diff --git a/pkg/storage/protocol/from_protocol.go b/pkg/storage/protocol/from_protocol.go index 64214d39..66b782e6 100644 --- a/pkg/storage/protocol/from_protocol.go +++ b/pkg/storage/protocol/from_protocol.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "sync" + "sync/atomic" "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/protocol/packets" @@ -24,6 +25,38 @@ type FromProtocol struct { alternateSourcesLock sync.Mutex alternateSources []packets.AlternateSource + + metricRecvEvents uint64 + metricRecvHashes uint64 + metricRecvDevInfo uint64 + metricRecvAltSources uint64 + metricRecvReadAt uint64 + metricRecvWriteAtHash uint64 + metricRecvWriteAtComp uint64 + metricRecvWriteAt uint64 + metricRecvWriteAtWithMap uint64 + metricRecvRemoveFromMap uint64 + metricRecvRemoveDev uint64 + metricRecvDirtyList uint64 + metricSentNeedAt uint64 + metricSentDontNeedAt uint64 +} + +type FromProtocolMetrics struct { + RecvEvents uint64 + RecvHashes uint64 + RecvDevInfo uint64 + RecvAltSources uint64 + RecvReadAt uint64 + RecvWriteAtHash uint64 + RecvWriteAtComp uint64 + RecvWriteAt uint64 + RecvWriteAtWithMap uint64 + RecvRemoveFromMap uint64 + RecvRemoveDev uint64 + RecvDirtyList uint64 + SentNeedAt uint64 + SentDontNeedAt uint64 } func NewFromProtocol(ctx context.Context, dev uint32, provFactory func(*packets.DevInfo) storage.Provider, protocol Protocol) *FromProtocol { @@ -38,6 +71,25 @@ func NewFromProtocol(ctx context.Context, dev uint32, provFactory func(*packets. return fp } +func (fp *FromProtocol) GetMetrics() *FromProtocolMetrics { + return &FromProtocolMetrics{ + RecvEvents: atomic.LoadUint64(&fp.metricRecvEvents), + RecvHashes: atomic.LoadUint64(&fp.metricRecvHashes), + RecvDevInfo: atomic.LoadUint64(&fp.metricRecvDevInfo), + RecvAltSources: atomic.LoadUint64(&fp.metricRecvAltSources), + RecvReadAt: atomic.LoadUint64(&fp.metricRecvReadAt), + RecvWriteAtHash: atomic.LoadUint64(&fp.metricRecvWriteAtHash), + RecvWriteAtComp: atomic.LoadUint64(&fp.metricRecvWriteAtComp), + RecvWriteAt: atomic.LoadUint64(&fp.metricRecvWriteAt), + RecvWriteAtWithMap: atomic.LoadUint64(&fp.metricRecvWriteAtWithMap), + RecvRemoveFromMap: atomic.LoadUint64(&fp.metricRecvRemoveFromMap), + RecvRemoveDev: atomic.LoadUint64(&fp.metricRecvRemoveDev), + RecvDirtyList: atomic.LoadUint64(&fp.metricRecvDirtyList), + SentNeedAt: atomic.LoadUint64(&fp.metricSentNeedAt), + SentDontNeedAt: atomic.LoadUint64(&fp.metricSentDontNeedAt), + } +} + func (fp *FromProtocol) GetAlternateSources() []packets.AlternateSource { fp.alternateSourcesLock.Lock() defer fp.alternateSourcesLock.Unlock() @@ -161,6 +213,8 @@ func (fp *FromProtocol) HandleEvent(cb func(*packets.Event)) error { return err } + atomic.AddUint64(&fp.metricRecvEvents, 1) + if ev.Type == packets.EventCompleted { // Deal with the sync here, and WAIT if needed. storage.SendSiloEvent(fp.prov, "sync.start", storage.SyncStartConfig{ @@ -196,6 +250,8 @@ func (fp *FromProtocol) HandleHashes(cb func(map[uint][sha256.Size]byte)) error return err } + atomic.AddUint64(&fp.metricRecvHashes, 1) + // Relay the hashes, wait and then respond cb(hashes) @@ -217,6 +273,8 @@ func (fp *FromProtocol) HandleDevInfo() error { return err } + atomic.AddUint64(&fp.metricRecvDevInfo, 1) + // Create storage fp.prov = fp.providerFactory(di) numBlocks := (int(fp.prov.Size()) + fp.presentBlockSize - 1) / fp.presentBlockSize @@ -236,6 +294,8 @@ func (fp *FromProtocol) HandleDevInfo() error { return } + atomic.AddUint64(&fp.metricRecvAltSources, 1) + // For now just set it. It only gets sent ONCE at the start of a migration at the moment. fp.alternateSourcesLock.Lock() fp.alternateSources = altSources @@ -274,6 +334,8 @@ func (fp *FromProtocol) HandleReadAt() error { return err } + atomic.AddUint64(&fp.metricRecvReadAt, 1) + // Handle them in goroutines go func(goffset int64, glength int32, gid uint32) { buff := make([]byte, glength) @@ -324,6 +386,8 @@ func (fp *FromProtocol) HandleWriteAt() error { return err } + atomic.AddUint64(&fp.metricRecvWriteAtHash, 1) + // For now, we will simply ack. We do NOT mark it as present. That part will be done when the alternateSources is retrieved. // fp.mark_range_present(int(offset), int(length)) @@ -342,8 +406,14 @@ func (fp *FromProtocol) HandleWriteAt() error { if len(data) > 1 && data[1] == packets.WriteAtCompRLE { offset, writeData, err = packets.DecodeWriteAtComp(data) + if err == nil { + atomic.AddUint64(&fp.metricRecvWriteAtComp, 1) + } } else { offset, writeData, err = packets.DecodeWriteAt(data) + if err == nil { + atomic.AddUint64(&fp.metricRecvWriteAt, 1) + } } if err != nil { return err @@ -388,6 +458,8 @@ func (fp *FromProtocol) HandleWriteAtWithMap(cb func(offset int64, data []byte, return err } + atomic.AddUint64(&fp.metricRecvWriteAtWithMap, 1) + err = cb(offset, writeData, idMap) if err == nil { fp.markRangePresent(int(offset), len(writeData)) @@ -421,6 +493,8 @@ func (fp *FromProtocol) HandleRemoveFromMap(cb func(ids []uint64)) error { return err } + atomic.AddUint64(&fp.metricRecvRemoveFromMap, 1) + cb(ids) /* // TODO: Should probably do this @@ -448,6 +522,8 @@ func (fp *FromProtocol) HandleRemoveDev(cb func()) error { return err } + atomic.AddUint64(&fp.metricRecvRemoveDev, 1) + cb() return nil } @@ -469,6 +545,8 @@ func (fp *FromProtocol) HandleDirtyList(cb func(blocks []uint)) error { return err } + atomic.AddUint64(&fp.metricRecvDirtyList, 1) + // Mark these as non-present (useful for debugging issues) for _, b := range blocks { offset := int(b) * blockSize @@ -488,11 +566,17 @@ func (fp *FromProtocol) HandleDirtyList(cb func(blocks []uint)) error { func (fp *FromProtocol) NeedAt(offset int64, length int32) error { b := packets.EncodeNeedAt(offset, length) _, err := fp.protocol.SendPacket(fp.dev, IDPickAny, b, UrgencyUrgent) + if err != nil { + atomic.AddUint64(&fp.metricSentNeedAt, 1) + } return err } func (fp *FromProtocol) DontNeedAt(offset int64, length int32) error { b := packets.EncodeDontNeedAt(offset, length) _, err := fp.protocol.SendPacket(fp.dev, IDPickAny, b, UrgencyUrgent) + if err != nil { + atomic.AddUint64(&fp.metricSentDontNeedAt, 1) + } return err } diff --git a/pkg/storage/protocol/protocol_rw.go b/pkg/storage/protocol/protocol_rw.go index 679abe0a..55ff356c 100644 --- a/pkg/storage/protocol/protocol_rw.go +++ b/pkg/storage/protocol/protocol_rw.go @@ -11,6 +11,8 @@ import ( "github.com/loopholelabs/silo/pkg/storage/protocol/packets" ) +const packetBufferSize = 32 + type packetinfo struct { id uint32 data []byte @@ -22,18 +24,40 @@ type Waiters struct { } type RW struct { - ctx context.Context - readers []io.Reader - writers []io.Writer - writerHeaders [][]byte - writerLocks []sync.Mutex - txID uint32 - activeDevs map[uint32]bool - activeDevsLock sync.Mutex - waiters map[uint32]Waiters - waitersLock sync.Mutex - newdevFn func(context.Context, Protocol, uint32) - newdevProtocol Protocol + ctx context.Context + readers []io.Reader + writers []io.Writer + writerHeaders [][]byte + writerLocks []sync.Mutex + txID uint32 + activeDevs map[uint32]bool + activeDevsLock sync.Mutex + waiters map[uint32]Waiters + waitersLock sync.Mutex + newdevFn func(context.Context, Protocol, uint32) + newdevProtocol Protocol + metricPacketsSent uint64 + metricDataSent uint64 + metricPacketsRecv uint64 + metricDataRecv uint64 + metricWaitingForID int64 + metricWrites uint64 + metricWriteErrors uint64 +} + +// Wrap the writers and gather metrics on them. +type writeWrapper struct { + w io.Writer + rw *RW +} + +func (ww *writeWrapper) Write(buffer []byte) (int, error) { + n, err := ww.w.Write(buffer) + atomic.AddUint64(&ww.rw.metricWrites, 1) + if err != nil { + atomic.AddUint64(&ww.rw.metricWriteErrors, 1) + } + return n, err } func NewRW(ctx context.Context, readers []io.Reader, writers []io.Writer, newdevFN func(context.Context, Protocol, uint32)) *RW { @@ -52,10 +76,11 @@ func NewRWWithBuffering(ctx context.Context, readers []io.Reader, writers []io.W prw.writers = make([]io.Writer, 0) for _, w := range writers { + ww := &writeWrapper{w: w, rw: prw} if bufferConfig != nil { - prw.writers = append(prw.writers, NewBufferedWriter(w, bufferConfig)) + prw.writers = append(prw.writers, NewBufferedWriter(ww, bufferConfig)) } else { - prw.writers = append(prw.writers, w) + prw.writers = append(prw.writers, ww) } } @@ -68,6 +93,30 @@ func NewRWWithBuffering(ctx context.Context, readers []io.Reader, writers []io.W return prw } +type Metrics struct { + PacketsSent uint64 + DataSent uint64 + UrgentPacketsSent uint64 + UrgentDataSent uint64 + PacketsRecv uint64 + DataRecv uint64 + Writes uint64 + WriteErrors uint64 + WaitingForID int64 +} + +func (p *RW) GetMetrics() *Metrics { + return &Metrics{ + PacketsSent: atomic.LoadUint64(&p.metricPacketsSent), + DataSent: atomic.LoadUint64(&p.metricDataSent), + PacketsRecv: atomic.LoadUint64(&p.metricPacketsRecv), + DataRecv: atomic.LoadUint64(&p.metricDataRecv), + Writes: atomic.LoadUint64(&p.metricWrites), + WriteErrors: atomic.LoadUint64(&p.metricWriteErrors), + WaitingForID: atomic.LoadInt64(&p.metricWaitingForID), + } +} + func (p *RW) SetNewDevProtocol(proto Protocol) { p.newdevProtocol = proto } @@ -120,7 +169,6 @@ func (p *RW) SendPacket(dev uint32, id uint32, data []byte, urgency Urgency) (ui binary.LittleEndian.PutUint32(p.writerHeaders[i][8:], uint32(len(data))) _, err := p.writers[i].Write(p.writerHeaders[i]) - if err != nil { return 0, err } @@ -133,6 +181,11 @@ func (p *RW) SendPacket(dev uint32, id uint32, data []byte, urgency Urgency) (ui _, err = p.writers[i].Write(data) } + if err == nil { + atomic.AddUint64(&p.metricPacketsSent, 1) + atomic.AddUint64(&p.metricDataSent, 4+4+4+uint64(len(data))) + } + return id, err } @@ -168,6 +221,9 @@ func (p *RW) Handle() error { return } + atomic.AddUint64(&p.metricPacketsRecv, 1) + atomic.AddUint64(&p.metricDataRecv, 4+4+4+uint64(length)) + err = p.handlePacket(dev, id, data) if err != nil { errs <- err @@ -205,48 +261,56 @@ func (p *RW) handlePacket(dev uint32, id uint32, data []byte) error { p.waiters[dev] = w } - wqID, okk := w.byID[id] - if !okk { - wqID = make(chan packetinfo, 8) // Some buffer here... - w.byID[id] = wqID - } - - wqCmd, okk := w.byCmd[cmd] - if !okk { - wqCmd = make(chan packetinfo, 8) // Some buffer here... - w.byCmd[cmd] = wqCmd + pi := packetinfo{ + id: id, + data: data, } - p.waitersLock.Unlock() - if packets.IsResponse(cmd) { - wqID <- packetinfo{ - id: id, - data: data, + wqID, okk := w.byID[id] + if !okk { + wqID = make(chan packetinfo, packetBufferSize) + w.byID[id] = wqID } + p.waitersLock.Unlock() + + wqID <- pi } else { - wqCmd <- packetinfo{ - id: id, - data: data, + wqCmd, okk := w.byCmd[cmd] + if !okk { + wqCmd = make(chan packetinfo, packetBufferSize) + w.byCmd[cmd] = wqCmd } + + p.waitersLock.Unlock() + + wqCmd <- pi } return nil } func (p *RW) WaitForPacket(dev uint32, id uint32) ([]byte, error) { + atomic.AddInt64(&p.metricWaitingForID, 1) + defer atomic.AddInt64(&p.metricWaitingForID, -1) + p.waitersLock.Lock() w := p.waiters[dev] wq, okk := w.byID[id] if !okk { - wq = make(chan packetinfo, 8) // Some buffer here... + wq = make(chan packetinfo, packetBufferSize) w.byID[id] = wq } p.waitersLock.Unlock() select { - case p := <-wq: - // TODO: Remove the channel, as we only expect a SINGLE response with this ID. - return p.data, nil + case pack := <-wq: + // Remove the channel, as we only expect a SINGLE response with this ID. + p.waitersLock.Lock() + w := p.waiters[dev] + delete(w.byID, id) + p.waitersLock.Unlock() + + return pack.data, nil case <-p.ctx.Done(): return nil, p.ctx.Err() } @@ -257,7 +321,7 @@ func (p *RW) WaitForCommand(dev uint32, cmd byte) (uint32, []byte, error) { w := p.waiters[dev] wq, okk := w.byCmd[cmd] if !okk { - wq = make(chan packetinfo, 8) // Some buffer here... + wq = make(chan packetinfo, packetBufferSize) w.byCmd[cmd] = wq } p.waitersLock.Unlock() diff --git a/pkg/storage/protocol/to_protocol.go b/pkg/storage/protocol/to_protocol.go index 705d1b7d..50202f7b 100644 --- a/pkg/storage/protocol/to_protocol.go +++ b/pkg/storage/protocol/to_protocol.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/sha256" "encoding/binary" + "sync/atomic" "github.com/loopholelabs/silo/pkg/storage" "github.com/loopholelabs/silo/pkg/storage/protocol/packets" @@ -11,11 +12,29 @@ import ( type ToProtocol struct { storage.ProviderWithEvents - size uint64 - dev uint32 - protocol Protocol - CompressedWrites bool - alternateSources []packets.AlternateSource + size uint64 + dev uint32 + protocol Protocol + CompressedWrites bool + alternateSources []packets.AlternateSource + metricSentEvents uint64 + metricSentAltSources uint64 + metricSentHashes uint64 + metricSentDevInfo uint64 + metricSentRemoveDev uint64 + metricSentDirtyList uint64 + metricSentReadAt uint64 + metricSentWriteAtHash uint64 + metricSentWriteAtHashBytes uint64 + metricSentWriteAtComp uint64 + metricSentWriteAtCompBytes uint64 + metricSentWriteAtCompDataBytes uint64 + metricSentWriteAt uint64 + metricSentWriteAtBytes uint64 + metricSentWriteAtWithMap uint64 + metricSentRemoveFromMap uint64 + metricRecvNeedAt uint64 + metricRecvDontNeedAt uint64 } func NewToProtocol(size uint64, deviceID uint32, p Protocol) *ToProtocol { @@ -27,18 +46,70 @@ func NewToProtocol(size uint64, deviceID uint32, p Protocol) *ToProtocol { } } +type ToProtocolMetrics struct { + SentEvents uint64 + SentAltSources uint64 + SentHashes uint64 + SentDevInfo uint64 + SentRemoveDev uint64 + SentDirtyList uint64 + SentReadAt uint64 + SentWriteAtHash uint64 + SentWriteAtHashBytes uint64 + SentWriteAtComp uint64 + SentWriteAtCompBytes uint64 + SentWriteAtCompDataBytes uint64 + SentWriteAt uint64 + SentWriteAtBytes uint64 + SentWriteAtWithMap uint64 + SentRemoveFromMap uint64 + RecvNeedAt uint64 + RecvDontNeedAt uint64 +} + +func (i *ToProtocol) GetMetrics() *ToProtocolMetrics { + return &ToProtocolMetrics{ + SentEvents: atomic.LoadUint64(&i.metricSentEvents), + SentAltSources: atomic.LoadUint64(&i.metricSentAltSources), + SentHashes: atomic.LoadUint64(&i.metricSentHashes), + SentDevInfo: atomic.LoadUint64(&i.metricSentDevInfo), + SentRemoveDev: atomic.LoadUint64(&i.metricSentRemoveDev), + SentDirtyList: atomic.LoadUint64(&i.metricSentDirtyList), + SentReadAt: atomic.LoadUint64(&i.metricSentReadAt), + SentWriteAtHash: atomic.LoadUint64(&i.metricSentWriteAtHash), + SentWriteAtHashBytes: atomic.LoadUint64(&i.metricSentWriteAtHashBytes), + SentWriteAtComp: atomic.LoadUint64(&i.metricSentWriteAtComp), + SentWriteAtCompBytes: atomic.LoadUint64(&i.metricSentWriteAtCompBytes), + SentWriteAtCompDataBytes: atomic.LoadUint64(&i.metricSentWriteAtCompDataBytes), + SentWriteAt: atomic.LoadUint64(&i.metricSentWriteAt), + SentWriteAtBytes: atomic.LoadUint64(&i.metricSentWriteAtBytes), + SentWriteAtWithMap: atomic.LoadUint64(&i.metricSentWriteAtWithMap), + SentRemoveFromMap: atomic.LoadUint64(&i.metricSentRemoveFromMap), + RecvNeedAt: atomic.LoadUint64(&i.metricRecvNeedAt), + RecvDontNeedAt: atomic.LoadUint64(&i.metricRecvDontNeedAt), + } +} + // Support Silo Events func (i *ToProtocol) SendSiloEvent(eventType storage.EventType, eventData storage.EventData) []storage.EventReturnData { if eventType == storage.EventType("sources") { i.alternateSources = eventData.([]packets.AlternateSource) // Send the list of alternate sources here... - h := packets.EncodeAlternateSources(i.alternateSources) - _, _ = i.protocol.SendPacket(i.dev, IDPickAny, h, UrgencyUrgent) + i.SendAltSources(i.alternateSources) // For now, we do not check the error. If there was a protocol / io error, we should see it on the next send } return nil } +func (i *ToProtocol) SendAltSources(s []packets.AlternateSource) error { + h := packets.EncodeAlternateSources(s) + _, err := i.protocol.SendPacket(i.dev, IDPickAny, h, UrgencyUrgent) + if err == nil { + atomic.AddUint64(&i.metricSentAltSources, 1) + } + return err +} + func (i *ToProtocol) SendEvent(e *packets.Event) error { b := packets.EncodeEvent(e) id, err := i.protocol.SendPacket(i.dev, IDPickAny, b, UrgencyUrgent) @@ -46,6 +117,8 @@ func (i *ToProtocol) SendEvent(e *packets.Event) error { return err } + atomic.AddUint64(&i.metricSentEvents, 1) + // Wait for acknowledgement r, err := i.protocol.WaitForPacket(i.dev, id) if err != nil { @@ -61,6 +134,9 @@ func (i *ToProtocol) SendHashes(hashes map[uint][sha256.Size]byte) error { if err != nil { return err } + + atomic.AddUint64(&i.metricSentHashes, 1) + // Wait for an ack r, err := i.protocol.WaitForPacket(i.dev, id) if err != nil { @@ -79,13 +155,24 @@ func (i *ToProtocol) SendDevInfo(name string, blockSize uint32, schema string) e } b := packets.EncodeDevInfo(di) _, err := i.protocol.SendPacket(i.dev, IDPickAny, b, UrgencyUrgent) + if err != nil { + return err + } + + atomic.AddUint64(&i.metricSentDevInfo, 1) return err } func (i *ToProtocol) RemoveDev() error { f := packets.EncodeRemoveDev() _, err := i.protocol.SendPacket(i.dev, IDPickAny, f, UrgencyUrgent) - return err + if err != nil { + return err + } + + atomic.AddUint64(&i.metricSentRemoveDev, 1) + + return nil } func (i *ToProtocol) DirtyList(blockSize int, blocks []uint) error { @@ -95,6 +182,8 @@ func (i *ToProtocol) DirtyList(blockSize int, blocks []uint) error { return err } + atomic.AddUint64(&i.metricSentDirtyList, 1) + // Wait for the response... r, err := i.protocol.WaitForPacket(i.dev, id) if err != nil { @@ -112,6 +201,9 @@ func (i *ToProtocol) ReadAt(buffer []byte, offset int64) (int, error) { if err != nil { return 0, err } + + atomic.AddUint64(&i.metricSentReadAt, 1) + // Wait for the response... r, err := i.protocol.WaitForPacket(i.dev, id) if err != nil { @@ -143,6 +235,10 @@ func (i *ToProtocol) WriteAt(buffer []byte, offset int64) (int, error) { if bytes.Equal(hash[:], as.Hash[:]) { data := packets.EncodeWriteAtHash(as.Offset, as.Length, as.Hash[:]) id, err = i.protocol.SendPacket(i.dev, IDPickAny, data, UrgencyNormal) + if err == nil { + atomic.AddUint64(&i.metricSentWriteAtHash, 1) + atomic.AddUint64(&i.metricSentWriteAtHashBytes, uint64(as.Length)) + } dontSendData = true } break @@ -153,9 +249,18 @@ func (i *ToProtocol) WriteAt(buffer []byte, offset int64) (int, error) { if i.CompressedWrites { data := packets.EncodeWriteAtComp(offset, buffer) id, err = i.protocol.SendPacket(i.dev, IDPickAny, data, UrgencyNormal) + if err == nil { + atomic.AddUint64(&i.metricSentWriteAtComp, 1) + atomic.AddUint64(&i.metricSentWriteAtCompBytes, uint64(len(buffer))) + atomic.AddUint64(&i.metricSentWriteAtCompDataBytes, uint64(len(data))) + } } else { data := packets.EncodeWriteAt(offset, buffer) id, err = i.protocol.SendPacket(i.dev, IDPickAny, data, UrgencyNormal) + if err == nil { + atomic.AddUint64(&i.metricSentWriteAt, 1) + atomic.AddUint64(&i.metricSentWriteAtBytes, uint64(len(buffer))) + } } } if err != nil { @@ -190,6 +295,9 @@ func (i *ToProtocol) WriteAtWithMap(buffer []byte, offset int64, idMap map[uint6 if err != nil { return 0, err } + + atomic.AddUint64(&i.metricSentWriteAtWithMap, 1) + // Wait for the response... r, err := i.protocol.WaitForPacket(i.dev, id) if err != nil { @@ -214,6 +322,9 @@ func (i *ToProtocol) WriteAtWithMap(buffer []byte, offset int64, idMap map[uint6 func (i *ToProtocol) RemoveFromMap(ids []uint64) error { f := packets.EncodeRemoveFromMap(ids) _, err := i.protocol.SendPacket(i.dev, IDPickAny, f, UrgencyUrgent) + if err == nil { + atomic.AddUint64(&i.metricSentRemoveFromMap, 1) + } return err } @@ -246,6 +357,8 @@ func (i *ToProtocol) HandleNeedAt(cb func(offset int64, length int32)) error { return err } + atomic.AddUint64(&i.metricRecvNeedAt, 1) + // We could spin up a goroutine here, but the assumption is that cb won't take long. cb(offset, length) } @@ -263,6 +376,8 @@ func (i *ToProtocol) HandleDontNeedAt(cb func(offset int64, length int32)) error return err } + atomic.AddUint64(&i.metricRecvDontNeedAt, 1) + // We could spin up a goroutine here, but the assumption is that cb won't take long. cb(offset, length) }