diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go
index 99426a909269f..b29566ba878d7 100644
--- a/internal/datanode/binlog_io.go
+++ b/internal/datanode/binlog_io.go
@@ -18,21 +18,18 @@ package datanode
 
 import (
 	"context"
-	"path"
 	"strconv"
-	"time"
 
 	"github.com/cockroachdb/errors"
 	"go.opentelemetry.io/otel"
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus/internal/datanode/allocator"
+	"github.com/milvus-io/milvus/internal/datanode/io"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
-	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/metautil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -44,121 +41,27 @@ var (
 	errStart = errors.New("start")
 )
 
-type downloader interface {
-	// donload downloads insert-binlogs, stats-binlogs, and, delta-binlogs from blob storage for given paths.
-	// The paths are 1 group of binlog paths generated by 1 `Serialize`.
-	//
-	// errDownloadFromBlobStorage is returned if ctx is canceled from outside while a downloading is inprogress.
-	// Beware of the ctx here, if no timeout or cancel is applied to this ctx, this downloading may retry forever.
-	download(ctx context.Context, paths []string) ([]*Blob, error)
-}
-
-type uploader interface {
-	// upload saves InsertData and DeleteData into blob storage, stats binlogs are generated from InsertData.
-	//
-	// errUploadToBlobStorage is returned if ctx is canceled from outside while a uploading is inprogress.
-	// Beware of the ctx here, if no timeout or cancel is applied to this ctx, this uploading may retry forever.
-	uploadInsertLog(ctx context.Context, segID, partID UniqueID, iData *InsertData, meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, error)
-	uploadStatsLog(ctx context.Context, segID, partID UniqueID, iData *InsertData, stats *storage.PrimaryKeyStats, totRows int64, meta *etcdpb.CollectionMeta) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error)
-	uploadDeltaLog(ctx context.Context, segID, partID UniqueID, dData *DeleteData, meta *etcdpb.CollectionMeta) ([]*datapb.FieldBinlog, error)
-}
-
-type binlogIO struct {
-	storage.ChunkManager
-	allocator.Allocator
-}
-
-var (
-	_ downloader = (*binlogIO)(nil)
-	_ uploader   = (*binlogIO)(nil)
-)
-
-func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error) {
+func downloadBlobs(ctx context.Context, b io.BinlogIO, paths []string) ([]*Blob, error) {
 	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "download")
 	defer span.End()
 	log.Debug("down load", zap.Strings("path", paths))
+	bytes, err := b.Download(ctx, paths)
+	if err != nil {
+		log.Warn("failed to download blobs from blob storage", zap.Strings("paths", paths), zap.Error(err))
+		return nil, errDownloadFromBlobStorage
+	}
 	resp := make([]*Blob, len(paths))
 	if len(paths) == 0 {
 		return resp, nil
 	}
-	futures := make([]*conc.Future[any], len(paths))
-	for i, path := range paths {
-		localPath := path
-		future := getMultiReadPool().Submit(func() (any, error) {
-			var vs []byte
-			err := errStart
-			for err != nil {
-				select {
-				case <-ctx.Done():
-					log.Warn("ctx done when downloading kvs from blob storage", zap.Strings("paths", paths))
-					return nil, errDownloadFromBlobStorage
-				default:
-					if err != errStart {
-						time.Sleep(50 * time.Millisecond)
-					}
-					vs, err = b.Read(ctx, localPath)
-				}
-			}
-			return vs, nil
-		})
-
-		futures[i] = future
-	}
-
-	for i := range futures {
-		if !futures[i].OK() {
-			return nil, futures[i].Err()
-		}
-		resp[i] = &Blob{Value: futures[i].Value().([]byte)}
+	for i := range bytes {
+		resp[i] = &Blob{Value: bytes[i]}
 	}
-
 	return resp, nil
 }
 
-func (b *binlogIO) uploadSegmentFiles(
-	ctx context.Context,
-	CollectionID UniqueID,
-	segID UniqueID,
-	kvs map[string][]byte,
-) error {
-	log.Debug("update", zap.Int64("collectionID", CollectionID), zap.Int64("segmentID", segID))
-	if len(kvs) == 0 {
-		return nil
-	}
-	futures := make([]*conc.Future[any], 0)
-	for key, val := range kvs {
-		localPath := key
-		localVal := val
-		future := getMultiReadPool().Submit(func() (any, error) {
-			err := errStart
-			for err != nil {
-				select {
-				case <-ctx.Done():
-					log.Warn("ctx done when saving kvs to blob storage",
-						zap.Int64("collectionID", CollectionID),
-						zap.Int64("segmentID", segID),
-						zap.Int("number of kvs", len(kvs)))
-					return nil, errUploadToBlobStorage
-				default:
-					if err != errStart {
-						time.Sleep(50 * time.Millisecond)
-					}
-					err = b.Write(ctx, localPath, localVal)
-				}
-			}
-			return nil, nil
-		})
-		futures = append(futures, future)
-	}
-
-	err := conc.AwaitAll(futures...)
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
 // genDeltaBlobs returns key, value
-func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueID) (string, []byte, error) {
+func genDeltaBlobs(b io.BinlogIO, allocator allocator.Allocator, data *DeleteData, collID, partID, segID UniqueID) (string, []byte, error) {
 	dCodec := storage.NewDeleteCodec()
 
 	blob, err := dCodec.Serialize(collID, partID, segID, data)
@@ -166,19 +69,18 @@ func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueI
 		return "", nil, err
 	}
 
-	idx, err := b.AllocOne()
+	idx, err := allocator.AllocOne()
 	if err != nil {
 		return "", nil, err
 	}
 	k := metautil.JoinIDPath(collID, partID, segID, idx)
-
-	key := path.Join(b.ChunkManager.RootPath(), common.SegmentDeltaLogPath, k)
+	key := b.JoinFullPath(common.SegmentDeltaLogPath, k)
 
 	return key, blob.GetValue(), nil
 }
 
 // genInsertBlobs returns insert-paths and save blob to kvs
-func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte) (map[UniqueID]*datapb.FieldBinlog, error) {
+func genInsertBlobs(b io.BinlogIO, allocator allocator.Allocator, data *InsertData, collectionID, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte) (map[UniqueID]*datapb.FieldBinlog, error) {
 	inlogs, err := iCodec.Serialize(partID, segID, data)
 	if err != nil {
 		return nil, err
 	}
@@ -188,7 +90,7 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, iCod
 	notifyGenIdx := make(chan struct{})
 	defer close(notifyGenIdx)
 
-	generator, err := b.GetGenerator(len(inlogs), notifyGenIdx)
+	generator, err := allocator.GetGenerator(len(inlogs), notifyGenIdx)
 	if err != nil {
 		return nil, err
 	}
@@ -196,9 +98,8 @@ func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, iCod
 	for _, blob := range inlogs {
 		// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
 		fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
-		k := metautil.JoinIDPath(iCodec.Schema.GetID(), partID, segID, fID, <-generator)
-		key := path.Join(b.ChunkManager.RootPath(), common.SegmentInsertLogPath, k)
-
+		k := metautil.JoinIDPath(collectionID, partID, segID, fID, <-generator)
+		key := b.JoinFullPath(common.SegmentInsertLogPath, k)
 		value := blob.GetValue()
 		fileLen := len(value)
 
@@ -213,22 +114,20 @@
 }
 
 // genStatBlobs return stats log paths and save blob to kvs
-func (b *binlogIO) genStatBlobs(stats *storage.PrimaryKeyStats, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte, totRows int64) (map[UniqueID]*datapb.FieldBinlog, error) {
+func genStatBlobs(b io.BinlogIO, allocator allocator.Allocator, stats *storage.PrimaryKeyStats, collectionID, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte, totRows int64) (map[UniqueID]*datapb.FieldBinlog, error) {
 	statBlob, err := iCodec.SerializePkStats(stats, totRows)
 	if err != nil {
 		return nil, err
 	}
 	statPaths := make(map[UniqueID]*datapb.FieldBinlog)
 
-	idx, err := b.AllocOne()
+	idx, err := allocator.AllocOne()
 	if err != nil {
 		return nil, err
 	}
-
 	fID, _ := strconv.ParseInt(statBlob.GetKey(), 10, 64)
-	k := metautil.JoinIDPath(iCodec.Schema.GetID(), partID, segID, fID, idx)
-	key := path.Join(b.ChunkManager.RootPath(), common.SegmentStatslogPath, k)
-
+	k := metautil.JoinIDPath(collectionID, partID, segID, fID, idx)
+	key := b.JoinFullPath(common.SegmentStatslogPath, k)
 	value := statBlob.GetValue()
 	fileLen := len(value)
 
@@ -243,57 +142,46 @@
 // update stats log
 // also update with insert data if not nil
-func (b *binlogIO) uploadStatsLog(
+func uploadStatsLog(
 	ctx context.Context,
-	segID UniqueID,
+	b io.BinlogIO,
+	allocator allocator.Allocator,
+	collectionID UniqueID,
 	partID UniqueID,
-	iData *InsertData,
+	segID UniqueID,
 	stats *storage.PrimaryKeyStats,
 	totRows int64,
-	meta *etcdpb.CollectionMeta,
-) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
+	iCodec *storage.InsertCodec,
+) (map[UniqueID]*datapb.FieldBinlog, error) {
 	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "UploadStatslog")
 	defer span.End()
-	var inPaths map[int64]*datapb.FieldBinlog
-	var err error
-
-	iCodec := storage.NewInsertCodecWithSchema(meta)
 
 	kvs := make(map[string][]byte)
 
-	if !iData.IsEmpty() {
-		inPaths, err = b.genInsertBlobs(iData, partID, segID, iCodec, kvs)
-		if err != nil {
-			log.Warn("generate insert blobs wrong",
-				zap.Int64("collectionID", iCodec.Schema.GetID()),
-				zap.Int64("segmentID", segID),
-				zap.Error(err))
-			return nil, nil, err
-		}
-	}
-
-	statPaths, err := b.genStatBlobs(stats, partID, segID, iCodec, kvs, totRows)
+	statPaths, err := genStatBlobs(b, allocator, stats, collectionID, partID, segID, iCodec, kvs, totRows)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
-	err = b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs)
+	err = b.Upload(ctx, kvs)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
-	return inPaths, statPaths, nil
+	return statPaths, nil
 }
 
-func (b *binlogIO) uploadInsertLog(
+func uploadInsertLog(
 	ctx context.Context,
-	segID UniqueID,
+	b io.BinlogIO,
+	allocator allocator.Allocator,
+	collectionID UniqueID,
 	partID UniqueID,
+	segID UniqueID,
 	iData *InsertData,
-	meta *etcdpb.CollectionMeta,
+	iCodec *storage.InsertCodec,
 ) (map[UniqueID]*datapb.FieldBinlog, error) {
 	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "UploadInsertLog")
 	defer span.End()
-	iCodec := storage.NewInsertCodecWithSchema(meta)
 
 	kvs := make(map[string][]byte)
 
 	if iData.IsEmpty() {
@@ -304,12 +192,12 @@ func (b *binlogIO) uploadInsertLog(
 		return nil, nil
 	}
 
-	inpaths, err := b.genInsertBlobs(iData, partID, segID, iCodec, kvs)
+	inpaths, err := genInsertBlobs(b, allocator, iData, collectionID, partID, segID, iCodec, kvs)
 	if err != nil {
 		return nil, err
 	}
 
-	err = b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs)
+	err = b.Upload(ctx, kvs)
 	if err != nil {
 		return nil, err
 	}
@@ -317,12 +205,14 @@ func (b *binlogIO) uploadInsertLog(
 	return inpaths, nil
 }
 
-func (b *binlogIO) uploadDeltaLog(
+func uploadDeltaLog(
 	ctx context.Context,
-	segID UniqueID,
+	b io.BinlogIO,
+	allocator allocator.Allocator,
+	collectionID UniqueID,
 	partID UniqueID,
+	segID UniqueID,
 	dData *DeleteData,
-	meta *etcdpb.CollectionMeta,
 ) ([]*datapb.FieldBinlog, error) {
 	ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "UploadDeltaLog")
 	defer span.End()
@@ -332,10 +222,10 @@ func (b *binlogIO) uploadDeltaLog(
 	)
 
 	if dData.RowCount > 0 {
-		k, v, err := b.genDeltaBlobs(dData, meta.GetID(), partID, segID)
+		k, v, err := genDeltaBlobs(b, allocator, dData, collectionID, partID, segID)
 		if err != nil {
 			log.Warn("generate delta blobs wrong",
-				zap.Int64("collectionID", meta.GetID()),
+				zap.Int64("collectionID", collectionID),
 				zap.Int64("segmentID", segID),
 				zap.Error(err))
 			return nil, err
@@ -354,7 +244,7 @@ func (b *binlogIO) uploadDeltaLog(
 		return nil, nil
 	}
 
-	err := b.uploadSegmentFiles(ctx, meta.GetID(), segID, kvs)
+	err := b.Upload(ctx, kvs)
 	if err != nil {
 		return nil, err
 	}
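The binlog_io.go changes above replace the downloader/uploader interfaces on the private binlogIO struct with free functions that take their io.BinlogIO and allocator.Allocator dependencies explicitly; the hand-rolled retry loops and worker pools now live behind BinlogIO.Download and BinlogIO.Upload. A minimal caller-side sketch of the new flow, using only signatures from this patch; ctx, cm, alloc, meta, iData, stats, totRows, dData, partID and segID stand in for values a real caller already has:

    b := io.NewBinlogIO(cm, getOrCreateIOPool())
    iCodec := storage.NewInsertCodecWithSchema(meta)

    // Insert binlogs, stats binlogs and delta binlogs are now three independent calls.
    inPaths, err := uploadInsertLog(ctx, b, alloc, meta.GetID(), partID, segID, iData, iCodec)
    if err != nil {
        return err
    }
    statPaths, err := uploadStatsLog(ctx, b, alloc, meta.GetID(), partID, segID, stats, totRows, iCodec)
    if err != nil {
        return err
    }
    deltaPaths, err := uploadDeltaLog(ctx, b, alloc, meta.GetID(), partID, segID, dData)
    if err != nil {
        return err
    }
    log.Info("uploaded binlogs",
        zap.Int("insert fields", len(inPaths)),
        zap.Int("stats fields", len(statPaths)),
        zap.Int("delta logs", len(deltaPaths)))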
diff --git a/internal/datanode/binlog_io_test.go b/internal/datanode/binlog_io_test.go
index ebb9f9acaf7cc..eea1b18291e81 100644
--- a/internal/datanode/binlog_io_test.go
+++ b/internal/datanode/binlog_io_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/datanode/allocator"
+	"github.com/milvus-io/milvus/internal/datanode/io"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
@@ -53,8 +54,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 	defer cm.RemoveWithPrefix(ctx, cm.RootPath())
 
 	t.Run("Test download", func(t *testing.T) {
-		alloc := allocator.NewMockAllocator(t)
-		b := &binlogIO{cm, alloc}
+		binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
 		tests := []struct {
 			isvalid bool
 			ks      []string // for preparation
@@ -77,19 +77,19 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 				assert.NotEmpty(t, blob)
 				inkeys = append(inkeys, key)
 
-				loaded, err := b.download(test.inctx, []string{key})
+				loaded, err := downloadBlobs(test.inctx, binlogIO, []string{key})
 				assert.NoError(t, err)
 				assert.ElementsMatch(t, blob, loaded[0].GetValue())
 			}
 
-			loaded, err := b.download(test.inctx, inkeys)
+			loaded, err := downloadBlobs(test.inctx, binlogIO, inkeys)
 			assert.NoError(t, err)
 			assert.Equal(t, len(test.ks), len(loaded))
 		} else {
 			ctx, cancel := context.WithCancel(test.inctx)
 			cancel()
 
-			_, err := b.download(ctx, []string{"test"})
+			_, err := downloadBlobs(ctx, binlogIO, []string{"test"})
 			assert.EqualError(t, err, errDownloadFromBlobStorage.Error())
 		}
 	})
@@ -97,12 +97,10 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
 	})
 
 	t.Run("Test download twice", func(t *testing.T) {
-		mkc := &mockCm{errRead: true}
-		alloc := allocator.NewMockAllocator(t)
-		b := &binlogIO{mkc, alloc}
+		binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
 
 		ctx, cancel := context.WithTimeout(context.TODO(), time.Millisecond*20)
-		blobs, err := b.download(ctx, []string{"a"})
+		blobs, err := downloadBlobs(ctx, binlogIO, []string{"a"})
 		assert.Error(t, err)
 		assert.Empty(t, blobs)
 		cancel()
failed", func(t *testing.T) { alloc := allocator.NewMockAllocator(t) - alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err")) - b := binlogIO{cm, alloc} - _, _, err := b.uploadStatsLog(context.Background(), 1, 10, genInsertData(2), genTestStat(meta), 10, meta) + alloc.EXPECT().AllocOne().Call.Return(int64(0), fmt.Errorf("mock AllocOne error")) + binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool()) + iCodec := storage.NewInsertCodecWithSchema(meta) + _, err := uploadStatsLog(context.Background(), binlogIO, alloc, meta.GetID(), 10, 1, genTestStat(meta), 10, iCodec) assert.Error(t, err) }) }) @@ -127,34 +126,34 @@ func TestBinlogIOInterfaceMethods(t *testing.T) { t.Run("empty insert", func(t *testing.T) { alloc := allocator.NewMockAllocator(t) - b := binlogIO{cm, alloc} - - paths, err := b.uploadInsertLog(context.Background(), 1, 10, genEmptyInsertData(), meta) + binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool()) + iCodec := storage.NewInsertCodecWithSchema(meta) + paths, err := uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), 10, 1, genEmptyInsertData(), iCodec) assert.NoError(t, err) assert.Nil(t, paths) }) t.Run("gen insert blob failed", func(t *testing.T) { alloc := allocator.NewMockAllocator(t) - b := binlogIO{cm, alloc} - + binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool()) + iCodec := storage.NewInsertCodecWithSchema(meta) alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err")) - - _, err := b.uploadInsertLog(context.Background(), 1, 10, genInsertData(2), meta) + _, err := uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), 10, 1, genInsertData(2), iCodec) assert.Error(t, err) }) t.Run("upload failed", func(t *testing.T) { mkc := &mockCm{errRead: true, errSave: true} alloc := allocator.NewMockAllocator(t) - b := binlogIO{mkc, alloc} + binlogIO := io.NewBinlogIO(mkc, getOrCreateIOPool()) + iCodec := storage.NewInsertCodecWithSchema(meta) alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - _, err := b.uploadInsertLog(ctx, 1, 10, genInsertData(2), meta) + _, err := uploadInsertLog(ctx, binlogIO, alloc, meta.GetID(), 1, 10, genInsertData(2), iCodec) assert.Error(t, err) }) }) @@ -183,8 +182,7 @@ func TestBinlogIOInnerMethods(t *testing.T) { t.Run("Test genDeltaBlobs", func(t *testing.T) { alloc := allocator.NewMockAllocator(t) alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) - - b := &binlogIO{cm, alloc} + binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool()) f := &MetaFactory{} meta := f.GetCollectionMeta(UniqueID(10002), "test_gen_blobs", schemapb.DataType_Int64) @@ -201,7 +199,7 @@ func TestBinlogIOInnerMethods(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { if test.isvalid { - k, v, err := b.genDeltaBlobs(&DeleteData{ + k, v, err := genDeltaBlobs(binlogIO, alloc, &DeleteData{ Pks: []storage.PrimaryKey{test.deletepk}, Tss: []uint64{test.ts}, }, meta.GetID(), 10, 1) @@ -221,8 +219,8 @@ func TestBinlogIOInnerMethods(t *testing.T) { t.Run("Test serialize error", func(t *testing.T) { alloc := allocator.NewMockAllocator(t) - b := &binlogIO{cm, alloc} - k, v, err := b.genDeltaBlobs(&DeleteData{Pks: []storage.PrimaryKey{pk}, Tss: []uint64{}}, 1, 1, 1) + binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool()) + k, v, err := genDeltaBlobs(binlogIO, alloc, &DeleteData{Pks: 
@@ -221,8 +219,8 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 
 	t.Run("Test serialize error", func(t *testing.T) {
 		alloc := allocator.NewMockAllocator(t)
-		b := &binlogIO{cm, alloc}
-		k, v, err := b.genDeltaBlobs(&DeleteData{Pks: []storage.PrimaryKey{pk}, Tss: []uint64{}}, 1, 1, 1)
+		binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
+		k, v, err := genDeltaBlobs(binlogIO, alloc, &DeleteData{Pks: []storage.PrimaryKey{pk}, Tss: []uint64{}}, 1, 1, 1)
 		assert.Error(t, err)
 		assert.Empty(t, k)
 		assert.Empty(t, v)
@@ -231,8 +229,8 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 	t.Run("Test AllocOne error", func(t *testing.T) {
 		alloc := allocator.NewMockAllocator(t)
 		alloc.EXPECT().AllocOne().Call.Return(int64(0), fmt.Errorf("mock AllocOne error"))
-		bin := binlogIO{cm, alloc}
-		k, v, err := bin.genDeltaBlobs(&DeleteData{Pks: []storage.PrimaryKey{pk}, Tss: []uint64{1}}, 1, 1, 1)
+		binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
+		k, v, err := genDeltaBlobs(binlogIO, alloc, &DeleteData{Pks: []storage.PrimaryKey{pk}, Tss: []uint64{1}}, 1, 1, 1)
 		assert.Error(t, err)
 		assert.Empty(t, k)
 		assert.Empty(t, v)
@@ -243,7 +241,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 		f := &MetaFactory{}
 		alloc := allocator.NewMockAllocator(t)
 		alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
-		b := binlogIO{cm, alloc}
+		binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
 
 		tests := []struct {
 			pkType schemapb.DataType
@@ -260,7 +258,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 				iCodec := storage.NewInsertCodecWithSchema(meta)
 
 				kvs := make(map[string][]byte)
-				pin, err := b.genInsertBlobs(genInsertData(2), 10, 1, iCodec, kvs)
+				pin, err := genInsertBlobs(binlogIO, alloc, genInsertData(2), meta.GetID(), 10, 1, iCodec, kvs)
 
 				assert.NoError(t, err)
 				assert.Equal(t, 12, len(pin))
@@ -282,9 +280,10 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 
 		t.Run("serialize error", func(t *testing.T) {
 			iCodec := storage.NewInsertCodecWithSchema(nil)
-			bin := &binlogIO{cm, allocator.NewMockAllocator(t)}
+			alloc := allocator.NewMockAllocator(t)
+			binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
 			kvs := make(map[string][]byte)
-			pin, err := bin.genInsertBlobs(genEmptyInsertData(), 10, 1, iCodec, kvs)
+			pin, err := genInsertBlobs(binlogIO, alloc, genEmptyInsertData(), 0, 10, 1, iCodec, kvs)
 
 			assert.Error(t, err)
 			assert.Empty(t, kvs)
@@ -298,10 +297,10 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 			alloc := allocator.NewMockAllocator(t)
 			alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock GetGenerator error"))
-			bin := &binlogIO{cm, alloc}
+			binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
 			kvs := make(map[string][]byte)
 
-			pin, err := bin.genInsertBlobs(genInsertData(2), 10, 1, iCodec, kvs)
+			pin, err := genInsertBlobs(binlogIO, alloc, genInsertData(2), meta.GetID(), 10, 1, iCodec, kvs)
 
 			assert.Error(t, err)
 			assert.Empty(t, kvs)
@@ -314,7 +313,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 		alloc := allocator.NewMockAllocator(t)
 		alloc.EXPECT().AllocOne().Return(0, nil)
-		b := binlogIO{cm, alloc}
+		binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
 
 		tests := []struct {
 			pkType schemapb.DataType
@@ -331,7 +330,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 				iCodec := storage.NewInsertCodecWithSchema(meta)
 
 				kvs := make(map[string][]byte)
-				stat, err := b.genStatBlobs(genTestStat(meta), 10, 1, iCodec, kvs, 0)
+				stat, err := genStatBlobs(binlogIO, alloc, genTestStat(meta), meta.GetID(), 10, 1, iCodec, kvs, 0)
 
 				assert.NoError(t, err)
 				assert.Equal(t, 1, len(stat))
@@ -343,14 +342,14 @@ func TestBinlogIOInnerMethods(t *testing.T) {
 	t.Run("Test genStatsBlob error", func(t *testing.T) {
 		f := &MetaFactory{}
 		alloc := allocator.NewMockAllocator(t)
-		b := binlogIO{cm, alloc}
+		binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
 
 		t.Run("serialize error", func(t *testing.T) {
 			meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_stat_blobs_error", schemapb.DataType_Int64)
 			iCodec := storage.NewInsertCodecWithSchema(meta)
 
 			kvs := make(map[string][]byte)
-			_, err := b.genStatBlobs(nil, 10, 1, iCodec, kvs, 0)
+			_, err := genStatBlobs(binlogIO, alloc, nil, meta.GetID(), 10, 1, iCodec, kvs, 0)
 			assert.Error(t, err)
 		})
 	})
@@ -378,6 +377,9 @@ func (mk *mockCm) Write(ctx context.Context, filePath string, content []byte) er
 }
 
 func (mk *mockCm) MultiWrite(ctx context.Context, contents map[string][]byte) error {
+	if mk.errSave {
+		return errors.New("mockKv save error")
+	}
 	return nil
 }
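One behavioral note on the test-helper change just above: making mockCm.MultiWrite honor errSave is what lets the upload-failure cases fail, which suggests io.BinlogIO.Upload writes its kvs through ChunkManager.MultiWrite rather than per-key Write. A minimal sketch of the injection pattern these tests rely on, reusing this file's helpers (ctx is assumed to be an existing context):

    mkc := &mockCm{errSave: true}
    b := io.NewBinlogIO(mkc, getOrCreateIOPool())
    // Upload now surfaces the injected "mockKv save error" from MultiWrite,
    // which the "upload failed" subtest observes through uploadInsertLog.
    err := b.Upload(ctx, map[string][]byte{"key": []byte("value")})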
diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
index fde437dea49a7..60bc3fe554b7e 100644
--- a/internal/datanode/compactor.go
+++ b/internal/datanode/compactor.go
@@ -30,6 +30,7 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/datanode/allocator"
+	"github.com/milvus-io/milvus/internal/datanode/io"
 	"github.com/milvus-io/milvus/internal/datanode/metacache"
 	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
 	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
@@ -70,8 +71,7 @@ var _ compactor = (*compactionTask)(nil)
 
 // for MixCompaction only
 type compactionTask struct {
-	downloader
-	uploader
+	binlogIO io.BinlogIO
 	compactor
 	metaCache metacache.MetaCache
 	syncMgr   syncmgr.SyncManager
@@ -89,8 +89,7 @@ type compactionTask struct {
 
 func newCompactionTask(
 	ctx context.Context,
-	dl downloader,
-	ul uploader,
+	binlogIO io.BinlogIO,
 	metaCache metacache.MetaCache,
 	syncMgr syncmgr.SyncManager,
 	alloc allocator.Allocator,
@@ -98,17 +97,15 @@ func newCompactionTask(
 ) *compactionTask {
 	ctx1, cancel := context.WithCancel(ctx)
 	return &compactionTask{
-		ctx:    ctx1,
-		cancel: cancel,
-
-		downloader: dl,
-		uploader:   ul,
-		syncMgr:    syncMgr,
-		metaCache:  metaCache,
-		Allocator:  alloc,
-		plan:       plan,
-		tr:         timerecord.NewTimeRecorder("levelone compaction"),
-		done:       make(chan struct{}, 1),
+		ctx:       ctx1,
+		cancel:    cancel,
+		binlogIO:  binlogIO,
+		syncMgr:   syncMgr,
+		metaCache: metaCache,
+		Allocator: alloc,
+		plan:      plan,
+		tr:        timerecord.NewTimeRecorder("levelone compaction"),
+		done:      make(chan struct{}, 1),
 	}
 }
@@ -184,9 +181,18 @@ func (t *compactionTask) uploadRemainLog(
 	stats *storage.PrimaryKeyStats,
 	totRows int64,
 	writeBuffer *storage.InsertData,
-	fID2Type map[UniqueID]schemapb.DataType,
 ) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
-	inPaths, statPaths, err := t.uploadStatsLog(ctxTimeout, targetSegID, partID, writeBuffer, stats, totRows, meta)
+	iCodec := storage.NewInsertCodecWithSchema(meta)
+	inPaths := make(map[UniqueID]*datapb.FieldBinlog)
+	var err error
+	if !writeBuffer.IsEmpty() {
+		inPaths, err = uploadInsertLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, writeBuffer, iCodec)
+		if err != nil {
+			return nil, nil, err
+		}
+	}
+
+	statPaths, err := uploadStatsLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, stats, totRows, iCodec)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -200,9 +206,10 @@ func (t *compactionTask) uploadSingleInsertLog(
 	partID UniqueID,
 	meta *etcdpb.CollectionMeta,
 	writeBuffer *storage.InsertData,
-	fID2Type map[UniqueID]schemapb.DataType,
 ) (map[UniqueID]*datapb.FieldBinlog, error) {
-	inPaths, err := t.uploadInsertLog(ctxTimeout, targetSegID, partID, writeBuffer, meta)
+	iCodec := storage.NewInsertCodecWithSchema(meta)
+
+	inPaths, err := uploadInsertLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, writeBuffer, iCodec)
 	if err != nil {
 		return nil, err
 	}
@@ -228,8 +235,6 @@ func (t *compactionTask) merge(
 		numRows int64 // the number of rows uploaded
 		expired int64 // the number of expired entity
 
-		fID2Type = make(map[UniqueID]schemapb.DataType)
-
 		insertField2Path = make(map[UniqueID]*datapb.FieldBinlog)
 		insertPaths      = make([]*datapb.FieldBinlog, 0)
 
@@ -283,7 +288,6 @@ func (t *compactionTask) merge(
 	// get pkID, pkType, dim
 	var pkField *schemapb.FieldSchema
 	for _, fs := range meta.GetSchema().GetFields() {
-		fID2Type[fs.GetFieldID()] = fs.GetDataType()
 		if fs.GetIsPrimaryKey() && fs.GetFieldID() >= 100 && typeutil.IsPrimaryFieldType(fs.GetDataType()) {
 			pkField = fs
 		}
@@ -322,7 +326,7 @@ func (t *compactionTask) merge(
 	for _, path := range unMergedInsertlogs {
 		downloadStart := time.Now()
-		data, err := t.download(ctx, path)
+		data, err := downloadBlobs(ctx, t.binlogIO, path)
 		if err != nil {
 			log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err))
 			return nil, nil, 0, err
@@ -380,7 +384,7 @@ func (t *compactionTask) merge(
 			if (currentRows+1)%100 == 0 && writeBuffer.GetMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsInt() {
 				numRows += int64(writeBuffer.GetRowNum())
 				uploadInsertStart := time.Now()
-				inPaths, err := t.uploadSingleInsertLog(ctx, targetSegID, partID, meta, writeBuffer, fID2Type)
+				inPaths, err := t.uploadSingleInsertLog(ctx, targetSegID, partID, meta, writeBuffer)
 				if err != nil {
 					log.Warn("failed to upload single insert log", zap.Error(err))
 					return nil, nil, 0, err
@@ -402,7 +406,7 @@ func (t *compactionTask) merge(
 		numRows += int64(writeBuffer.GetRowNum())
 		uploadStart := time.Now()
 		inPaths, statsPaths, err := t.uploadRemainLog(ctx, targetSegID, partID, meta,
-			stats, numRows+int64(currentRows), writeBuffer, fID2Type)
+			stats, numRows+int64(currentRows), writeBuffer)
 		if err != nil {
 			return nil, nil, 0, err
 		}
@@ -511,7 +515,7 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
 	}
 
 	if len(paths) != 0 {
-		bs, err := t.download(ctxTimeout, paths)
+		bs, err := downloadBlobs(ctxTimeout, t.binlogIO, paths)
 		if err != nil {
 			log.Warn("compact wrong, fail to download deltalogs", zap.Int64("segment", segID), zap.Strings("path", paths), zap.Error(err))
 			return nil, err
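With compactor.go now carrying one binlogIO plus an explicit Allocator, wiring a mix-compaction task reduces to the sketch below; ctx, cm, metaCache, syncMgr, alloc and plan are placeholders for dependencies the caller already owns (the test file that follows constructs them the same way):

    task := newCompactionTask(
        ctx,
        io.NewBinlogIO(cm, getOrCreateIOPool()),
        metaCache,
        syncMgr,
        alloc,
        plan,
    )
    // compact() drives download -> merge -> upload through the single BinlogIO.
    result, err := task.compact()
    if err != nil {
        return err
    }
    _ = result // *datapb.CompactionPlanResult describing the merged segment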
diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go
index 9902be4555e03..eee7bf97e4f49 100644
--- a/internal/datanode/compactor_test.go
+++ b/internal/datanode/compactor_test.go
@@ -34,6 +34,7 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/datanode/allocator"
 	"github.com/milvus-io/milvus/internal/datanode/broker"
+	"github.com/milvus-io/milvus/internal/datanode/io"
 	"github.com/milvus-io/milvus/internal/datanode/metacache"
 	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
 	memkv "github.com/milvus-io/milvus/internal/kv/mem"
@@ -292,12 +293,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)
 		alloc.EXPECT().AllocOne().Return(0, nil)
 		t.Run("Merge without expiration", func(t *testing.T) {
-			mockbIO := &binlogIO{cm, alloc}
+			mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
 			paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
 			iData := genInsertDataWithExpiredTS()
-
+			iCodec := storage.NewInsertCodecWithSchema(meta)
 			var allPaths [][]string
-			inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
+			inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
 			assert.NoError(t, err)
 			assert.Equal(t, 12, len(inpath))
 			binlogNum := len(inpath[0].GetBinlogs())
@@ -316,10 +317,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			}
 
 			ct := &compactionTask{
-				metaCache:  metaCache,
-				downloader: mockbIO,
-				uploader:   mockbIO,
-				done:       make(chan struct{}, 1),
+				metaCache: metaCache,
+				binlogIO:  mockbIO,
+				Allocator: alloc,
+				done:      make(chan struct{}, 1),
 				plan: &datapb.CompactionPlan{
 					SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
 						{SegmentID: 1},
@@ -335,7 +336,8 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			assert.NotEqual(t, -1, inPaths[0].GetBinlogs()[0].GetTimestampTo())
 		})
 		t.Run("Merge without expiration2", func(t *testing.T) {
-			mockbIO := &binlogIO{cm, alloc}
+			mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
+			iCodec := storage.NewInsertCodecWithSchema(meta)
 			paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
 			BinLogMaxSize := Params.DataNodeCfg.BinLogMaxSize.GetValue()
 			defer func() {
@@ -346,7 +348,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
 
 			var allPaths [][]string
-			inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
+			inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
 			assert.NoError(t, err)
 			assert.Equal(t, 12, len(inpath))
 			binlogNum := len(inpath[0].GetBinlogs())
@@ -363,10 +365,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			dm := map[interface{}]Timestamp{}
 
 			ct := &compactionTask{
-				metaCache:  metaCache,
-				downloader: mockbIO,
-				uploader:   mockbIO,
-				done:       make(chan struct{}, 1),
+				metaCache: metaCache,
+				binlogIO:  mockbIO,
+				Allocator: alloc,
+				done:      make(chan struct{}, 1),
 				plan: &datapb.CompactionPlan{
 					SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
 						{SegmentID: 1},
@@ -384,7 +386,8 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		})
 		// set Params.DataNodeCfg.BinLogMaxSize.Key = 1 to generate multi binlogs, each has only one row
 		t.Run("merge_with_more_than_100rows", func(t *testing.T) {
-			mockbIO := &binlogIO{cm, alloc}
+			mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
+			iCodec := storage.NewInsertCodecWithSchema(meta)
 			paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
 			BinLogMaxSize := Params.DataNodeCfg.BinLogMaxSize.GetAsInt()
 			defer func() {
@@ -394,7 +397,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			iData := genInsertData(101)
 
 			var allPaths [][]string
-			inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
+			inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
 			assert.NoError(t, err)
 			assert.Equal(t, 12, len(inpath))
 			binlogNum := len(inpath[0].GetBinlogs())
@@ -413,10 +416,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			}
 
 			ct := &compactionTask{
-				metaCache:  metaCache,
-				downloader: mockbIO,
-				uploader:   mockbIO,
-				done:       make(chan struct{}, 1),
+				metaCache: metaCache,
+				binlogIO:  mockbIO,
+				Allocator: alloc,
+				done:      make(chan struct{}, 1),
 				plan: &datapb.CompactionPlan{
 					SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
 						{SegmentID: 1},
@@ -435,13 +438,13 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		})
 
 		t.Run("Merge with expiration", func(t *testing.T) {
-			mockbIO := &binlogIO{cm, alloc}
-
+			mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
+			iCodec := storage.NewInsertCodecWithSchema(meta)
 			iData := genInsertDataWithExpiredTS()
 			meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
 
 			var allPaths [][]string
-			inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
+			inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
 			assert.NoError(t, err)
 			assert.Equal(t, 12, len(inpath))
 			binlogNum := len(inpath[0].GetBinlogs())
@@ -461,9 +464,9 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			// 10 days in seconds
 			ct := &compactionTask{
-				metaCache:  metaCache,
-				downloader: mockbIO,
-				uploader:   mockbIO,
+				metaCache: metaCache,
+				binlogIO:  mockbIO,
+				Allocator: alloc,
 				plan: &datapb.CompactionPlan{
 					CollectionTtl: 864000,
 					SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
@@ -480,8 +483,9 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		})
 
 		t.Run("merge_with_rownum_zero", func(t *testing.T) {
-			mockbIO := &binlogIO{cm, alloc}
+			mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
 			iData := genInsertDataWithExpiredTS()
+			iCodec := storage.NewInsertCodecWithSchema(meta)
 			meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
 			metaCache := metacache.NewMockMetaCache(t)
 			metaCache.EXPECT().Schema().Return(meta.GetSchema()).Maybe()
@@ -496,7 +500,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			})
 
 			var allPaths [][]string
-			inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
+			inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
 			assert.NoError(t, err)
 			assert.Equal(t, 12, len(inpath))
 			binlogNum := len(inpath[0].GetBinlogs())
@@ -515,10 +519,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			}
 
 			ct := &compactionTask{
-				metaCache:  metaCache,
-				downloader: mockbIO,
-				uploader:   mockbIO,
-				done:       make(chan struct{}, 1),
+				metaCache: metaCache,
+				binlogIO:  mockbIO,
+				Allocator: alloc,
+				done:      make(chan struct{}, 1),
 				plan: &datapb.CompactionPlan{
 					SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
 						{SegmentID: 1},
@@ -533,13 +537,14 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		})
 
 		t.Run("Merge with meta error", func(t *testing.T) {
-			mockbIO := &binlogIO{cm, alloc}
+			mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
+			iCodec := storage.NewInsertCodecWithSchema(meta)
 			paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
 			iData := genInsertDataWithExpiredTS()
 			meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
 
 			var allPaths [][]string
-			inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
+			inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
 			assert.NoError(t, err)
 			assert.Equal(t, 12, len(inpath))
 			binlogNum := len(inpath[0].GetBinlogs())
@@ -558,10 +563,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			}
 
 			ct := &compactionTask{
-				metaCache:  metaCache,
-				downloader: mockbIO,
-				uploader:   mockbIO,
-				done:       make(chan struct{}, 1),
+				metaCache: metaCache,
+				binlogIO:  mockbIO,
+				Allocator: alloc,
+				done:      make(chan struct{}, 1),
 				plan: &datapb.CompactionPlan{
 					SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
 						{SegmentID: 1},
@@ -579,13 +584,14 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		})
 
 		t.Run("Merge with meta type param error", func(t *testing.T) {
-			mockbIO := &binlogIO{cm, alloc}
+			mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
+			iCodec := storage.NewInsertCodecWithSchema(meta)
 			paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
 			iData := genInsertDataWithExpiredTS()
 			meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64)
 
 			var allPaths [][]string
-			inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
+			inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec)
 			assert.NoError(t, err)
 			assert.Equal(t, 12, len(inpath))
 			binlogNum := len(inpath[0].GetBinlogs())
@@ -604,10 +610,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			}
 
 			ct := &compactionTask{
-				metaCache:  metaCache,
-				downloader: mockbIO,
-				uploader:   mockbIO,
-				done:       make(chan struct{}, 1),
+				metaCache: metaCache,
+				binlogIO:  mockbIO,
+				Allocator: alloc,
+				done:      make(chan struct{}, 1),
 			}
 
 			_, _, _, err = ct.merge(context.Background(), allPaths, 2, 0, &etcdpb.CollectionMeta{
@@ -727,11 +733,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		require.NoError(t, err)
 
 		ct := &compactionTask{
-			uploader: &binlogIO{&mockCm{errSave: true}, alloc},
-			done:     make(chan struct{}, 1),
+			binlogIO:  io.NewBinlogIO(&mockCm{errSave: true}, getOrCreateIOPool()),
+			Allocator: alloc,
+			done:      make(chan struct{}, 1),
 		}
 
-		_, _, err = ct.uploadRemainLog(ctx, 1, 2, meta, stats, 10, nil, nil)
+		_, _, err = ct.uploadRemainLog(ctx, 1, 2, meta, stats, 10, nil)
 		assert.Error(t, err)
 	})
 })
@@ -855,7 +862,8 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 			collName := "test_compact_coll_name"
 			meta := NewMetaFactory().GetCollectionMeta(c.colID, collName, c.pkType)
 
-			mockbIO := &binlogIO{cm, alloc}
+			mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
+			iCodec := storage.NewInsertCodecWithSchema(meta)
 			mockKv := memkv.NewMemoryKV()
 			metaCache := metacache.NewMockMetaCache(t)
 			metaCache.EXPECT().Collection().Return(c.colID)
@@ -906,17 +914,21 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 			stats1, err := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1)
 			require.NoError(t, err)
-			iPaths1, sPaths1, err := mockbIO.uploadStatsLog(context.TODO(), c.segID1, c.parID, iData1, stats1, 2, meta)
+			iPaths1, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID1, iData1, iCodec)
+			require.NoError(t, err)
+			sPaths1, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID1, stats1, 2, iCodec)
 			require.NoError(t, err)
-			dPaths1, err := mockbIO.uploadDeltaLog(context.TODO(), c.segID1, c.parID, dData1, meta)
+			dPaths1, err := uploadDeltaLog(context.TODO(), mockbIO, alloc, meta.GetID(), c.parID, c.segID1, dData1)
 			require.NoError(t, err)
 			require.Equal(t, 12, len(iPaths1))
 
 			stats2, err := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1)
 			require.NoError(t, err)
-			iPaths2, sPaths2, err := mockbIO.uploadStatsLog(context.TODO(), c.segID2, c.parID, iData2, stats2, 2, meta)
+			iPaths2, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID2, iData2, iCodec)
 			require.NoError(t, err)
-			dPaths2, err := mockbIO.uploadDeltaLog(context.TODO(), c.segID2, c.parID, dData2, meta)
+			sPaths2, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID2, stats2, 2, iCodec)
+			require.NoError(t, err)
+			dPaths2, err := uploadDeltaLog(context.TODO(), mockbIO, alloc, meta.GetID(), c.parID, c.segID2, dData2)
 			require.NoError(t, err)
 			require.Equal(t, 12, len(iPaths2))
@@ -942,7 +954,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 				Channel: "channelname",
 			}
 
-			task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan)
+			task := newCompactionTask(context.TODO(), mockbIO, metaCache, syncMgr, alloc, plan)
 			result, err := task.compact()
 			assert.NoError(t, err)
 			assert.NotNil(t, result)
@@ -988,7 +1000,8 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 
 		meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name", schemapb.DataType_Int64)
 
-		mockbIO := &binlogIO{cm, alloc}
+		mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool())
+		iCodec := storage.NewInsertCodecWithSchema(meta)
 		metaCache := metacache.NewMockMetaCache(t)
 		metaCache.EXPECT().Collection().Return(collID)
@@ -1044,17 +1057,21 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 		stats1, err := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1)
 		require.NoError(t, err)
-		iPaths1, sPaths1, err := mockbIO.uploadStatsLog(context.TODO(), segID1, partID, iData1, stats1, 1, meta)
+		iPaths1, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID1, iData1, iCodec)
+		require.NoError(t, err)
+		sPaths1, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID1, stats1, 1, iCodec)
 		require.NoError(t, err)
-		dPaths1, err := mockbIO.uploadDeltaLog(context.TODO(), segID1, partID, dData1, meta)
+		dPaths1, err := uploadDeltaLog(context.TODO(), mockbIO, alloc, meta.GetID(), partID, segID1, dData1)
 		require.NoError(t, err)
 		require.Equal(t, 12, len(iPaths1))
 
 		stats2, err := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1)
 		require.NoError(t, err)
-		iPaths2, sPaths2, err := mockbIO.uploadStatsLog(context.TODO(), segID2, partID, iData2, stats2, 1, meta)
+		iPaths2, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID2, iData2, iCodec)
+		require.NoError(t, err)
+		sPaths2, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID2, stats2, 1, iCodec)
 		require.NoError(t, err)
-		dPaths2, err := mockbIO.uploadDeltaLog(context.TODO(), segID2, partID, dData2, meta)
+		dPaths2, err := uploadDeltaLog(context.TODO(), mockbIO, alloc, meta.GetID(), partID, segID2, dData2)
 		require.NoError(t, err)
 		require.Equal(t, 12, len(iPaths2))
@@ -1080,7 +1097,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
 			Channel: "channelname",
 		}
 
-		task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan)
+		task := newCompactionTask(context.TODO(), mockbIO, metaCache, syncMgr, alloc, plan)
 		result, err := task.compact()
 		assert.NoError(t, err)
 		assert.NotNil(t, result)
diff --git a/internal/datanode/services.go b/internal/datanode/services.go
index fb9de915e1655..44316023ea7d3 100644
--- a/internal/datanode/services.go
+++ b/internal/datanode/services.go
@@ -285,11 +285,10 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
 			req,
 		)
 	case datapb.CompactionType_MixCompaction:
-		// TODO, replace this binlogIO with io.BinlogIO
-		binlogIO := &binlogIO{node.chunkManager, ds.idAllocator}
+		binlogIO := io.NewBinlogIO(node.chunkManager, getOrCreateIOPool())
 		task = newCompactionTask(
 			taskCtx,
-			binlogIO, binlogIO,
+			binlogIO,
 			ds.metacache,
 			node.syncMgr,
 			node.allocator,
diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go
index f4918d072d5be..111fca1d2abb8 100644
--- a/internal/storage/data_codec.go
+++ b/internal/storage/data_codec.go
@@ -205,6 +205,9 @@ func (insertCodec *InsertCodec) SerializePkStatsByData(data *InsertData) (*Blob,
 func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) {
 	blobs := make([]*Blob, 0)
 	var writer *InsertBinlogWriter
+	if insertCodec.Schema == nil {
+		return nil, fmt.Errorf("schema is not set")
+	}
 	timeFieldData, ok := data.Data[common.TimeStampField]
 	if !ok {
 		return nil, fmt.Errorf("data doesn't contains timestamp field")
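The data_codec.go hunk makes Serialize fail fast when the codec was built without a schema, instead of relying on downstream nil handling. A behavior sketch, assuming an existing *InsertData value named data:

    codec := storage.NewInsertCodecWithSchema(nil)
    blobs, err := codec.Serialize(1, 1, data)
    // blobs == nil; err reports "schema is not set" -- the case the
    // binlog_io_test.go "serialize error" subtest now exercises with a nil meta.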