From 56659bacbbadb395e5c186b991fd614b390ab417 Mon Sep 17 00:00:00 2001 From: Ted Xu Date: Thu, 23 Jan 2025 15:49:05 +0800 Subject: [PATCH] enhance: make serialization be part of sync task to support file format change (#38946) See #38945 --------- Signed-off-by: Ted Xu --- Makefile | 2 +- .../datanode/importv2/task_l0_import_test.go | 2 +- internal/datanode/importv2/util.go | 14 +- internal/flushcommon/syncmgr/meta_writer.go | 8 +- .../flushcommon/syncmgr/meta_writer_test.go | 6 +- .../flushcommon/syncmgr/mock_pack_writer.go | 130 +++++++ .../flushcommon/syncmgr/mock_serializer.go | 95 ------ internal/flushcommon/syncmgr/options.go | 102 ++---- internal/flushcommon/syncmgr/pack_writer.go | 318 ++++++++++++++++++ .../flushcommon/syncmgr/pack_writer_test.go | 190 +++++++++++ .../flushcommon/syncmgr/storage_serializer.go | 146 +------- .../syncmgr/storage_serializer_test.go | 101 ++---- .../flushcommon/syncmgr/sync_manager_test.go | 76 +---- internal/flushcommon/syncmgr/task.go | 263 ++------------- internal/flushcommon/syncmgr/task_test.go | 216 +++++------- .../writebuffer/l0_write_buffer_test.go | 10 - .../flushcommon/writebuffer/write_buffer.go | 23 +- .../writebuffer/write_buffer_test.go | 41 --- internal/storage/data_codec.go | 7 +- pkg/metrics/datanode_metrics.go | 13 - 20 files changed, 839 insertions(+), 924 deletions(-) create mode 100644 internal/flushcommon/syncmgr/mock_pack_writer.go delete mode 100644 internal/flushcommon/syncmgr/mock_serializer.go create mode 100644 internal/flushcommon/syncmgr/pack_writer.go create mode 100644 internal/flushcommon/syncmgr/pack_writer_test.go diff --git a/Makefile b/Makefile index 855129f3f4c32..89a2d8e13df7a 100644 --- a/Makefile +++ b/Makefile @@ -499,7 +499,7 @@ generate-mockery-flushcommon: getdeps $(INSTALL_PATH)/mockery --name=MetaCache --dir=$(PWD)/internal/flushcommon/metacache --output=$(PWD)/internal/flushcommon/metacache --filename=mock_meta_cache.go --with-expecter --structname=MockMetaCache --outpkg=metacache --inpackage $(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage $(INSTALL_PATH)/mockery --name=MetaWriter --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_meta_writer.go --with-expecter --structname=MockMetaWriter --outpkg=syncmgr --inpackage - $(INSTALL_PATH)/mockery --name=Serializer --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_serializer.go --with-expecter --structname=MockSerializer --outpkg=syncmgr --inpackage + $(INSTALL_PATH)/mockery --name=PackWriter --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_pack_writer.go --with-expecter --structname=MockPackWriter --outpkg=syncmgr --inpackage $(INSTALL_PATH)/mockery --name=Task --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_task.go --with-expecter --structname=MockTask --outpkg=syncmgr --inpackage $(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/flushcommon/writebuffer --output=$(PWD)/internal/flushcommon/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage $(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/flushcommon/writebuffer --output=$(PWD)/internal/flushcommon/writebuffer 
--filename=mock_manager.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage diff --git a/internal/datanode/importv2/task_l0_import_test.go b/internal/datanode/importv2/task_l0_import_test.go index a5793cfedfde5..8548ba604a880 100644 --- a/internal/datanode/importv2/task_l0_import_test.go +++ b/internal/datanode/importv2/task_l0_import_test.go @@ -138,7 +138,7 @@ func (s *L0ImportSuite) TestL0Import() { task.(*syncmgr.SyncTask).WithAllocator(alloc) s.cm.(*mocks.ChunkManager).EXPECT().RootPath().Return("mock-rootpath") - s.cm.(*mocks.ChunkManager).EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(nil) + s.cm.(*mocks.ChunkManager).EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil) task.(*syncmgr.SyncTask).WithChunkManager(s.cm) err := task.Run(context.Background()) diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index 6b2914b6eb7fe..9b5d7cc2505ce 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -69,17 +69,6 @@ func NewSyncTask(ctx context.Context, }, metacache.NewBM25StatsFactory) } - var serializer syncmgr.Serializer - var err error - serializer, err = syncmgr.NewStorageSerializer( - allocator, - metaCache, - nil, - ) - if err != nil { - return nil, err - } - segmentLevel := datapb.SegmentLevel_L1 if insertData == nil && deleteData != nil { segmentLevel = datapb.SegmentLevel_L0 @@ -100,7 +89,8 @@ func NewSyncTask(ctx context.Context, syncPack.WithBM25Stats(bm25Stats) } - return serializer.EncodeBuffer(ctx, syncPack) + task := syncmgr.NewSyncTask().WithAllocator(allocator).WithMetaCache(metaCache).WithSyncPack(syncPack) + return task, nil } func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache.MetaCache) (*datapb.ImportSegmentInfo, error) { diff --git a/internal/flushcommon/syncmgr/meta_writer.go b/internal/flushcommon/syncmgr/meta_writer.go index 0fbcd3f841077..6cc8f19b3611f 100644 --- a/internal/flushcommon/syncmgr/meta_writer.go +++ b/internal/flushcommon/syncmgr/meta_writer.go @@ -46,7 +46,7 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error insertFieldBinlogs := lo.MapToSlice(pack.insertBinlogs, func(_ int64, fieldBinlog *datapb.FieldBinlog) *datapb.FieldBinlog { return fieldBinlog }) statsFieldBinlogs := lo.MapToSlice(pack.statsBinlogs, func(_ int64, fieldBinlog *datapb.FieldBinlog) *datapb.FieldBinlog { return fieldBinlog }) - if len(pack.deltaBinlog.Binlogs) > 0 { + if pack.deltaBinlog != nil && len(pack.deltaBinlog.Binlogs) > 0 { deltaFieldBinlogs = append(deltaFieldBinlogs, pack.deltaBinlog) } @@ -102,8 +102,8 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error CheckPoints: checkPoints, StartPositions: startPos, - Flushed: pack.isFlush, - Dropped: pack.isDrop, + Flushed: pack.pack.isFlush, + Dropped: pack.pack.isDrop, Channel: pack.channelName, SegLevel: pack.level, } @@ -111,7 +111,7 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error err := b.broker.SaveBinlogPaths(ctx, req) // Segment not found during stale segment flush. Segment might get compacted already. // Stop retry and still proceed to the end, ignoring this error. 
- if !pack.isFlush && errors.Is(err, merr.ErrSegmentNotFound) { + if !pack.pack.isFlush && errors.Is(err, merr.ErrSegmentNotFound) { log.Warn("stale segment not found, could be compacted", zap.Int64("segmentID", pack.segmentID)) log.Warn("failed to SaveBinlogPaths", diff --git a/internal/flushcommon/syncmgr/meta_writer_test.go b/internal/flushcommon/syncmgr/meta_writer_test.go index 35a0aa44e66c4..3db8ce1f0d777 100644 --- a/internal/flushcommon/syncmgr/meta_writer_test.go +++ b/internal/flushcommon/syncmgr/meta_writer_test.go @@ -46,8 +46,7 @@ func (s *MetaWriterSuite) TestNormalSave() { s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() - task := NewSyncTask() - task.WithMetaCache(s.metacache) + task := NewSyncTask().WithMetaCache(s.metacache).WithSyncPack(new(SyncPack)) err := s.writer.UpdateSync(ctx, task) s.NoError(err) } @@ -62,8 +61,7 @@ func (s *MetaWriterSuite) TestReturnError() { metacache.UpdateNumOfRows(1000)(seg) s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) - task := NewSyncTask() - task.WithMetaCache(s.metacache) + task := NewSyncTask().WithMetaCache(s.metacache).WithSyncPack(new(SyncPack)) err := s.writer.UpdateSync(ctx, task) s.Error(err) } diff --git a/internal/flushcommon/syncmgr/mock_pack_writer.go b/internal/flushcommon/syncmgr/mock_pack_writer.go new file mode 100644 index 0000000000000..c6f0c07945d8b --- /dev/null +++ b/internal/flushcommon/syncmgr/mock_pack_writer.go @@ -0,0 +1,130 @@ +// Code generated by mockery v2.46.0. DO NOT EDIT. 
+ +package syncmgr + +import ( + context "context" + + datapb "github.com/milvus-io/milvus/pkg/proto/datapb" + mock "github.com/stretchr/testify/mock" +) + +// MockPackWriter is an autogenerated mock type for the PackWriter type +type MockPackWriter struct { + mock.Mock +} + +type MockPackWriter_Expecter struct { + mock *mock.Mock +} + +func (_m *MockPackWriter) EXPECT() *MockPackWriter_Expecter { + return &MockPackWriter_Expecter{mock: &_m.Mock} +} + +// Write provides a mock function with given fields: ctx, pack +func (_m *MockPackWriter) Write(ctx context.Context, pack *SyncPack) ([]*datapb.Binlog, *datapb.Binlog, *datapb.Binlog, *datapb.Binlog, int64, error) { + ret := _m.Called(ctx, pack) + + if len(ret) == 0 { + panic("no return value specified for Write") + } + + var r0 []*datapb.Binlog + var r1 *datapb.Binlog + var r2 *datapb.Binlog + var r3 *datapb.Binlog + var r4 int64 + var r5 error + if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) ([]*datapb.Binlog, *datapb.Binlog, *datapb.Binlog, *datapb.Binlog, int64, error)); ok { + return rf(ctx, pack) + } + if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) []*datapb.Binlog); ok { + r0 = rf(ctx, pack) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*datapb.Binlog) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *SyncPack) *datapb.Binlog); ok { + r1 = rf(ctx, pack) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*datapb.Binlog) + } + } + + if rf, ok := ret.Get(2).(func(context.Context, *SyncPack) *datapb.Binlog); ok { + r2 = rf(ctx, pack) + } else { + if ret.Get(2) != nil { + r2 = ret.Get(2).(*datapb.Binlog) + } + } + + if rf, ok := ret.Get(3).(func(context.Context, *SyncPack) *datapb.Binlog); ok { + r3 = rf(ctx, pack) + } else { + if ret.Get(3) != nil { + r3 = ret.Get(3).(*datapb.Binlog) + } + } + + if rf, ok := ret.Get(4).(func(context.Context, *SyncPack) int64); ok { + r4 = rf(ctx, pack) + } else { + r4 = ret.Get(4).(int64) + } + + if rf, ok := ret.Get(5).(func(context.Context, *SyncPack) error); ok { + r5 = rf(ctx, pack) + } else { + r5 = ret.Error(5) + } + + return r0, r1, r2, r3, r4, r5 +} + +// MockPackWriter_Write_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Write' +type MockPackWriter_Write_Call struct { + *mock.Call +} + +// Write is a helper method to define mock.On call +// - ctx context.Context +// - pack *SyncPack +func (_e *MockPackWriter_Expecter) Write(ctx interface{}, pack interface{}) *MockPackWriter_Write_Call { + return &MockPackWriter_Write_Call{Call: _e.mock.On("Write", ctx, pack)} +} + +func (_c *MockPackWriter_Write_Call) Run(run func(ctx context.Context, pack *SyncPack)) *MockPackWriter_Write_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*SyncPack)) + }) + return _c +} + +func (_c *MockPackWriter_Write_Call) Return(inserts []*datapb.Binlog, deletes *datapb.Binlog, stats *datapb.Binlog, bm25Stats *datapb.Binlog, size int64, err error) *MockPackWriter_Write_Call { + _c.Call.Return(inserts, deletes, stats, bm25Stats, size, err) + return _c +} + +func (_c *MockPackWriter_Write_Call) RunAndReturn(run func(context.Context, *SyncPack) ([]*datapb.Binlog, *datapb.Binlog, *datapb.Binlog, *datapb.Binlog, int64, error)) *MockPackWriter_Write_Call { + _c.Call.Return(run) + return _c +} + +// NewMockPackWriter creates a new instance of MockPackWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
+// The first argument is typically a *testing.T value. +func NewMockPackWriter(t interface { + mock.TestingT + Cleanup(func()) +}) *MockPackWriter { + mock := &MockPackWriter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/flushcommon/syncmgr/mock_serializer.go b/internal/flushcommon/syncmgr/mock_serializer.go deleted file mode 100644 index 03d05aa85574f..0000000000000 --- a/internal/flushcommon/syncmgr/mock_serializer.go +++ /dev/null @@ -1,95 +0,0 @@ -// Code generated by mockery v2.46.0. DO NOT EDIT. - -package syncmgr - -import ( - context "context" - - mock "github.com/stretchr/testify/mock" -) - -// MockSerializer is an autogenerated mock type for the Serializer type -type MockSerializer struct { - mock.Mock -} - -type MockSerializer_Expecter struct { - mock *mock.Mock -} - -func (_m *MockSerializer) EXPECT() *MockSerializer_Expecter { - return &MockSerializer_Expecter{mock: &_m.Mock} -} - -// EncodeBuffer provides a mock function with given fields: ctx, pack -func (_m *MockSerializer) EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) { - ret := _m.Called(ctx, pack) - - if len(ret) == 0 { - panic("no return value specified for EncodeBuffer") - } - - var r0 Task - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) (Task, error)); ok { - return rf(ctx, pack) - } - if rf, ok := ret.Get(0).(func(context.Context, *SyncPack) Task); ok { - r0 = rf(ctx, pack) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(Task) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, *SyncPack) error); ok { - r1 = rf(ctx, pack) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockSerializer_EncodeBuffer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EncodeBuffer' -type MockSerializer_EncodeBuffer_Call struct { - *mock.Call -} - -// EncodeBuffer is a helper method to define mock.On call -// - ctx context.Context -// - pack *SyncPack -func (_e *MockSerializer_Expecter) EncodeBuffer(ctx interface{}, pack interface{}) *MockSerializer_EncodeBuffer_Call { - return &MockSerializer_EncodeBuffer_Call{Call: _e.mock.On("EncodeBuffer", ctx, pack)} -} - -func (_c *MockSerializer_EncodeBuffer_Call) Run(run func(ctx context.Context, pack *SyncPack)) *MockSerializer_EncodeBuffer_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*SyncPack)) - }) - return _c -} - -func (_c *MockSerializer_EncodeBuffer_Call) Return(_a0 Task, _a1 error) *MockSerializer_EncodeBuffer_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockSerializer_EncodeBuffer_Call) RunAndReturn(run func(context.Context, *SyncPack) (Task, error)) *MockSerializer_EncodeBuffer_Call { - _c.Call.Return(run) - return _c -} - -// NewMockSerializer creates a new instance of MockSerializer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. 
-func NewMockSerializer(t interface { - mock.TestingT - Cleanup(func()) -}) *MockSerializer { - mock := &MockSerializer{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/internal/flushcommon/syncmgr/options.go b/internal/flushcommon/syncmgr/options.go index 57e78c63c14ce..4a95e50ae4fa9 100644 --- a/internal/flushcommon/syncmgr/options.go +++ b/internal/flushcommon/syncmgr/options.go @@ -1,90 +1,49 @@ package syncmgr import ( - "github.com/samber/lo" - - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/flushcommon/metacache" "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/util/retry" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) func NewSyncTask() *SyncTask { - return &SyncTask{ - isFlush: false, - insertBinlogs: make(map[int64]*datapb.FieldBinlog), - statsBinlogs: make(map[int64]*datapb.FieldBinlog), - deltaBinlog: &datapb.FieldBinlog{}, - bm25Binlogs: make(map[int64]*datapb.FieldBinlog), - segmentData: make(map[string][]byte), - binlogBlobs: make(map[int64]*storage.Blob), - } + return new(SyncTask) } -func (t *SyncTask) WithChunkManager(cm storage.ChunkManager) *SyncTask { - t.chunkManager = cm - return t -} +func (t *SyncTask) WithSyncPack(pack *SyncPack) *SyncTask { + t.pack = pack -func (t *SyncTask) WithAllocator(allocator allocator.Interface) *SyncTask { - t.allocator = allocator + // legacy code, remove later + t.collectionID = t.pack.collectionID + t.partitionID = t.pack.partitionID + t.channelName = t.pack.channelName + t.segmentID = t.pack.segmentID + t.batchRows = t.pack.batchRows + // t.metacache = t.pack.metacache + // t.schema = t.metacache.Schema() + t.startPosition = t.pack.startPosition + t.checkpoint = t.pack.checkpoint + t.level = t.pack.level + t.dataSource = t.pack.dataSource + t.tsFrom = t.pack.tsFrom + t.tsTo = t.pack.tsTo + t.failureCallback = t.pack.errHandler return t } -func (t *SyncTask) WithStartPosition(start *msgpb.MsgPosition) *SyncTask { - t.startPosition = start - return t -} - -func (t *SyncTask) WithCheckpoint(cp *msgpb.MsgPosition) *SyncTask { - t.checkpoint = cp - return t -} - -func (t *SyncTask) WithCollectionID(collID int64) *SyncTask { - t.collectionID = collID - return t -} - -func (t *SyncTask) WithPartitionID(partID int64) *SyncTask { - t.partitionID = partID - return t -} - -func (t *SyncTask) WithSegmentID(segID int64) *SyncTask { - t.segmentID = segID - return t -} - -func (t *SyncTask) WithChannelName(chanName string) *SyncTask { - t.channelName = chanName - return t -} - -func (t *SyncTask) WithSchema(schema *schemapb.CollectionSchema) *SyncTask { - t.schema = schema - t.pkField = lo.FindOrElse(schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { - return field.GetIsPrimaryKey() - }) - return t -} - -func (t *SyncTask) WithTimeRange(from, to typeutil.Timestamp) *SyncTask { - t.tsFrom, t.tsTo = from, to +func (t *SyncTask) WithChunkManager(cm storage.ChunkManager) *SyncTask { + t.chunkManager = cm return t } -func (t *SyncTask) WithFlush() *SyncTask { - t.isFlush = true +func (t *SyncTask) WithAllocator(allocator allocator.Interface) *SyncTask { + t.allocator = allocator return t } func (t *SyncTask) WithDrop() *SyncTask { - t.isDrop = true + t.pack.isDrop = true return t } @@ -107,18 +66,3 @@ func (t *SyncTask) WithFailureCallback(callback 
func(error)) *SyncTask { t.failureCallback = callback return t } - -func (t *SyncTask) WithBatchRows(batchRows int64) *SyncTask { - t.batchRows = batchRows - return t -} - -func (t *SyncTask) WithLevel(level datapb.SegmentLevel) *SyncTask { - t.level = level - return t -} - -func (t *SyncTask) WithDataSource(source string) *SyncTask { - t.dataSource = source - return t -} diff --git a/internal/flushcommon/syncmgr/pack_writer.go b/internal/flushcommon/syncmgr/pack_writer.go new file mode 100644 index 0000000000000..422f744318ac5 --- /dev/null +++ b/internal/flushcommon/syncmgr/pack_writer.go @@ -0,0 +1,318 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncmgr + +import ( + "context" + "path" + + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/flushcommon/metacache" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/metautil" + "github.com/milvus-io/milvus/pkg/util/retry" +) + +type PackWriter interface { + Write(ctx context.Context, pack *SyncPack) ( + inserts []*datapb.Binlog, deletes *datapb.Binlog, stats *datapb.Binlog, bm25Stats *datapb.Binlog, + size int64, err error) +} + +type BulkPackWriter struct { + metaCache metacache.MetaCache + chunkManager storage.ChunkManager + allocator allocator.Interface + writeRetryOpts []retry.Option + + // prefetched log ids + ids []int64 + sizeWritten int64 +} + +func NewBulkPackWriter(metaCache metacache.MetaCache, chunkManager storage.ChunkManager, + allocator allocator.Interface, writeRetryOpts ...retry.Option, +) *BulkPackWriter { + return &BulkPackWriter{ + metaCache: metaCache, + chunkManager: chunkManager, + allocator: allocator, + writeRetryOpts: writeRetryOpts, + } +} + +func (bw *BulkPackWriter) Write(ctx context.Context, pack *SyncPack) ( + inserts map[int64]*datapb.FieldBinlog, + deltas *datapb.FieldBinlog, + stats map[int64]*datapb.FieldBinlog, + bm25Stats map[int64]*datapb.FieldBinlog, + size int64, + err error, +) { + err = bw.prefetchIDs(pack) + if err != nil { + log.Warn("failed allocate ids for sync task", zap.Error(err)) + return + } + + if inserts, err = bw.writeInserts(ctx, pack); err != nil { + log.Error("failed to write insert data", zap.Error(err)) + return + } + if stats, err = bw.writeStats(ctx, pack); err != nil { + log.Error("failed to process stats blob", zap.Error(err)) + return + } + if deltas, err = bw.writeDelta(ctx, pack); err != nil { + log.Error("failed to process delta blob", zap.Error(err)) + return + } + if bm25Stats, err = bw.writeBM25Stasts(ctx, pack); err != nil { + log.Error("failed to process bm25 stats 
blob", zap.Error(err)) + return + } + + size = bw.sizeWritten + + return +} + +// prefetchIDs pre-allcates ids depending on the number of blobs current task contains. +func (bw *BulkPackWriter) prefetchIDs(pack *SyncPack) error { + totalIDCount := 0 + if len(pack.insertData) > 0 { + totalIDCount += len(pack.insertData[0].Data) * 2 // binlogs and statslogs + } + if pack.isFlush { + totalIDCount++ // merged stats log + } + if pack.deltaData != nil { + totalIDCount++ + } + if pack.bm25Stats != nil { + totalIDCount += len(pack.bm25Stats) + if pack.isFlush { + totalIDCount++ // merged bm25 stats + } + } + + if totalIDCount == 0 { + return nil + } + start, _, err := bw.allocator.Alloc(uint32(totalIDCount)) + if err != nil { + return err + } + bw.ids = lo.RangeFrom(start, totalIDCount) + return nil +} + +func (bw *BulkPackWriter) nextID() int64 { + if len(bw.ids) == 0 { + panic("pre-fetched ids exhausted") + } + r := bw.ids[0] + bw.ids = bw.ids[1:] + return r +} + +func (bw *BulkPackWriter) writeLog(ctx context.Context, blob *storage.Blob, + root, p string, pack *SyncPack, +) (*datapb.Binlog, error) { + key := path.Join(bw.chunkManager.RootPath(), root, p) + err := retry.Do(ctx, func() error { + return bw.chunkManager.Write(ctx, key, blob.Value) + }, bw.writeRetryOpts...) + if err != nil { + return nil, err + } + size := int64(len(blob.GetValue())) + bw.sizeWritten += size + return &datapb.Binlog{ + EntriesNum: blob.RowNum, + TimestampFrom: pack.tsFrom, + TimestampTo: pack.tsTo, + LogPath: key, + LogSize: size, + MemorySize: blob.MemorySize, + }, nil +} + +func (bw *BulkPackWriter) writeInserts(ctx context.Context, pack *SyncPack) (map[int64]*datapb.FieldBinlog, error) { + if len(pack.insertData) == 0 { + return make(map[int64]*datapb.FieldBinlog), nil + } + + serializer, err := NewStorageSerializer(bw.metaCache) + if err != nil { + return nil, err + } + + binlogBlobs, err := serializer.serializeBinlog(ctx, pack) + if err != nil { + return nil, err + } + + logs := make(map[int64]*datapb.FieldBinlog) + for fieldID, blob := range binlogBlobs { + k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, fieldID, bw.nextID()) + binlog, err := bw.writeLog(ctx, blob, common.SegmentInsertLogPath, k, pack) + if err != nil { + return nil, err + } + logs[fieldID] = &datapb.FieldBinlog{ + FieldID: fieldID, + Binlogs: []*datapb.Binlog{binlog}, + } + } + return logs, nil +} + +func (bw *BulkPackWriter) writeStats(ctx context.Context, pack *SyncPack) (map[int64]*datapb.FieldBinlog, error) { + if len(pack.insertData) == 0 { + return make(map[int64]*datapb.FieldBinlog), nil + } + serializer, err := NewStorageSerializer(bw.metaCache) + if err != nil { + return nil, err + } + singlePKStats, batchStatsBlob, err := serializer.serializeStatslog(pack) + if err != nil { + return nil, err + } + + actions := []metacache.SegmentAction{metacache.RollStats(singlePKStats)} + bw.metaCache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(pack.segmentID)) + + pkFieldID := serializer.pkField.GetFieldID() + binlogs := make([]*datapb.Binlog, 0) + k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, pkFieldID, bw.nextID()) + if binlog, err := bw.writeLog(ctx, batchStatsBlob, common.SegmentStatslogPath, k, pack); err != nil { + return nil, err + } else { + binlogs = append(binlogs, binlog) + } + + if pack.isFlush && pack.level != datapb.SegmentLevel_L0 { + mergedStatsBlob, err := serializer.serializeMergedPkStats(pack) + if err != nil { + return nil, err + 
} + + k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, pkFieldID, int64(storage.CompoundStatsType)) + binlog, err := bw.writeLog(ctx, mergedStatsBlob, common.SegmentStatslogPath, k, pack) + if err != nil { + return nil, err + } + binlogs = append(binlogs, binlog) + } + + logs := make(map[int64]*datapb.FieldBinlog) + logs[pkFieldID] = &datapb.FieldBinlog{ + FieldID: pkFieldID, + Binlogs: binlogs, + } + return logs, nil +} + +func (bw *BulkPackWriter) writeBM25Stasts(ctx context.Context, pack *SyncPack) (map[int64]*datapb.FieldBinlog, error) { + if len(pack.bm25Stats) == 0 { + return make(map[int64]*datapb.FieldBinlog), nil + } + + serializer, err := NewStorageSerializer(bw.metaCache) + if err != nil { + return nil, err + } + bm25Blobs, err := serializer.serializeBM25Stats(pack) + if err != nil { + return nil, err + } + + logs := make(map[int64]*datapb.FieldBinlog) + for fieldID, blob := range bm25Blobs { + k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, fieldID, bw.nextID()) + binlog, err := bw.writeLog(ctx, blob, common.SegmentBm25LogPath, k, pack) + if err != nil { + return nil, err + } + logs[fieldID] = &datapb.FieldBinlog{ + FieldID: fieldID, + Binlogs: []*datapb.Binlog{binlog}, + } + } + + actions := []metacache.SegmentAction{metacache.MergeBm25Stats(pack.bm25Stats)} + bw.metaCache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(pack.segmentID)) + + if pack.isFlush { + if pack.level != datapb.SegmentLevel_L0 { + if hasBM25Function(bw.metaCache.Schema()) { + mergedBM25Blob, err := serializer.serializeMergedBM25Stats(pack) + if err != nil { + return nil, err + } + for fieldID, blob := range mergedBM25Blob { + k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, fieldID, int64(storage.CompoundStatsType)) + binlog, err := bw.writeLog(ctx, blob, common.SegmentBm25LogPath, k, pack) + if err != nil { + return nil, err + } + fieldBinlog, ok := logs[fieldID] + if !ok { + fieldBinlog = &datapb.FieldBinlog{ + FieldID: fieldID, + } + logs[fieldID] = fieldBinlog + } + fieldBinlog.Binlogs = append(fieldBinlog.Binlogs, binlog) + } + } + } + } + return logs, nil +} + +func (bw *BulkPackWriter) writeDelta(ctx context.Context, pack *SyncPack) (*datapb.FieldBinlog, error) { + if pack.deltaData == nil { + return &datapb.FieldBinlog{}, nil + } + s, err := NewStorageSerializer(bw.metaCache) + if err != nil { + return nil, err + } + deltaBlob, err := s.serializeDeltalog(pack) + if err != nil { + return nil, err + } + + k := metautil.JoinIDPath(pack.collectionID, pack.partitionID, pack.segmentID, bw.nextID()) + deltalog, err := bw.writeLog(ctx, deltaBlob, common.SegmentDeltaLogPath, k, pack) + if err != nil { + return nil, err + } + return &datapb.FieldBinlog{ + FieldID: s.pkField.GetFieldID(), + Binlogs: []*datapb.Binlog{deltalog}, + }, nil +} diff --git a/internal/flushcommon/syncmgr/pack_writer_test.go b/internal/flushcommon/syncmgr/pack_writer_test.go new file mode 100644 index 0000000000000..66330d98e63ba --- /dev/null +++ b/internal/flushcommon/syncmgr/pack_writer_test.go @@ -0,0 +1,190 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncmgr + +import ( + "context" + "fmt" + "reflect" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/flushcommon/metacache" + "github.com/milvus-io/milvus/internal/mocks" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +func TestNextID(t *testing.T) { + al := allocator.NewMockGIDAllocator() + i := int64(0) + al.AllocF = func(count uint32) (int64, int64, error) { + rt := i + i += int64(count) + return rt, int64(count), nil + } + al.AllocOneF = func() (allocator.UniqueID, error) { + rt := i + i++ + return rt, nil + } + bw := NewBulkPackWriter(nil, nil, al) + bw.prefetchIDs(new(SyncPack).WithFlush()) + + t.Run("normal_next", func(t *testing.T) { + id := bw.nextID() + assert.Equal(t, int64(0), id) + }) + t.Run("id_exhausted", func(t *testing.T) { + assert.Panics(t, func() { + bw.nextID() + }) + }) +} + +func TestBulkPackWriter_Write(t *testing.T) { + paramtable.Get().Init(paramtable.NewBaseTable()) + + seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, nil, nil) + metacache.UpdateNumOfRows(1000)(seg) + collectionID := int64(123) + partitionID := int64(456) + segmentID := int64(789) + channelName := fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", collectionID) + schema := &schemapb.CollectionSchema{ + Name: "sync_task_test_col", + Fields: []*schemapb.FieldSchema{ + {FieldID: common.RowIDField, DataType: schemapb.DataType_Int64}, + {FieldID: common.TimeStampField, DataType: schemapb.DataType_Int64}, + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_Int64, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "vector", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + } + + mc := metacache.NewMockMetaCache(t) + mc.EXPECT().Collection().Return(collectionID).Maybe() + mc.EXPECT().Schema().Return(schema).Maybe() + mc.EXPECT().GetSegmentByID(segmentID).Return(seg, true).Maybe() + mc.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}).Maybe() + mc.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) { + action(seg) + }).Return().Maybe() + + cm := mocks.NewChunkManager(t) + cm.EXPECT().RootPath().Return("files").Maybe() + cm.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + + deletes := &storage.DeleteData{} + for i := 0; i < 10; i++ { + pk := storage.NewInt64PrimaryKey(int64(i + 1)) + ts := tsoutil.ComposeTSByTime(time.Now(), 0) + deletes.Append(pk, ts) + } + + bw := &BulkPackWriter{ + metaCache: mc, + chunkManager: cm, + allocator: allocator.NewLocalAllocator(10000, 100000), + } + + 
tests := []struct { + name string + pack *SyncPack + wantInserts map[int64]*datapb.FieldBinlog + wantDeltas *datapb.FieldBinlog + wantStats map[int64]*datapb.FieldBinlog + wantBm25Stats map[int64]*datapb.FieldBinlog + wantSize int64 + wantErr error + }{ + { + name: "empty", + pack: new(SyncPack).WithCollectionID(collectionID).WithPartitionID(partitionID).WithSegmentID(segmentID).WithChannelName(channelName), + wantInserts: map[int64]*datapb.FieldBinlog{}, + wantDeltas: &datapb.FieldBinlog{}, + wantStats: map[int64]*datapb.FieldBinlog{}, + wantBm25Stats: map[int64]*datapb.FieldBinlog{}, + wantSize: 0, + wantErr: nil, + }, + { + name: "with delete", + pack: new(SyncPack).WithCollectionID(collectionID).WithPartitionID(partitionID).WithSegmentID(segmentID).WithChannelName(channelName).WithDeleteData(deletes), + wantInserts: map[int64]*datapb.FieldBinlog{}, + wantDeltas: &datapb.FieldBinlog{ + FieldID: 100, + Binlogs: []*datapb.Binlog{ + { + EntriesNum: 10, + LogPath: "files/delta_log/123/456/789/10000", + LogSize: 642, + MemorySize: 433, + }, + }, + }, + wantStats: map[int64]*datapb.FieldBinlog{}, + wantBm25Stats: map[int64]*datapb.FieldBinlog{}, + wantSize: 642, + wantErr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotInserts, gotDeltas, gotStats, gotBm25Stats, gotSize, err := bw.Write(context.Background(), tt.pack) + if err != tt.wantErr { + t.Errorf("BulkPackWriter.Write() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(gotInserts, tt.wantInserts) { + t.Errorf("BulkPackWriter.Write() gotInserts = %v, want %v", gotInserts, tt.wantInserts) + } + if !reflect.DeepEqual(gotDeltas, tt.wantDeltas) { + t.Errorf("BulkPackWriter.Write() gotDeltas = %v, want %v", gotDeltas, tt.wantDeltas) + } + if !reflect.DeepEqual(gotStats, tt.wantStats) { + t.Errorf("BulkPackWriter.Write() gotStats = %v, want %v", gotStats, tt.wantStats) + } + if !reflect.DeepEqual(gotBm25Stats, tt.wantBm25Stats) { + t.Errorf("BulkPackWriter.Write() gotBm25Stats = %v, want %v", gotBm25Stats, tt.wantBm25Stats) + } + if gotSize != tt.wantSize { + t.Errorf("BulkPackWriter.Write() gotSize = %v, want %v", gotSize, tt.wantSize) + } + }) + } +} diff --git a/internal/flushcommon/syncmgr/storage_serializer.go b/internal/flushcommon/syncmgr/storage_serializer.go index 32379eb0c433f..e866a58bf06d1 100644 --- a/internal/flushcommon/syncmgr/storage_serializer.go +++ b/internal/flushcommon/syncmgr/storage_serializer.go @@ -25,32 +25,23 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/flushcommon/metacache" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/metrics" - "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/proto/etcdpb" "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/timerecord" ) type storageV1Serializer struct { - collectionID int64 - schema *schemapb.CollectionSchema - pkField *schemapb.FieldSchema + schema *schemapb.CollectionSchema + pkField *schemapb.FieldSchema inCodec *storage.InsertCodec - allocator allocator.Interface - metacache metacache.MetaCache - metaWriter MetaWriter + metacache metacache.MetaCache } -func NewStorageSerializer(allocator allocator.Interface, metacache metacache.MetaCache, metaWriter MetaWriter) (*storageV1Serializer, error) { - 
collectionID := metacache.Collection() +func NewStorageSerializer(metacache metacache.MetaCache) (*storageV1Serializer, error) { schema := metacache.Schema() pkField := lo.FindOrElse(schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { return field.GetIsPrimaryKey() }) if pkField == nil { @@ -58,130 +49,21 @@ func NewStorageSerializer(allocator allocator.Interface, metacache metacache.Met } meta := &etcdpb.CollectionMeta{ Schema: schema, - ID: collectionID, } inCodec := storage.NewInsertCodecWithSchema(meta) return &storageV1Serializer{ - collectionID: collectionID, - schema: schema, - pkField: pkField, + schema: schema, + pkField: pkField, - inCodec: inCodec, - allocator: allocator, - metacache: metacache, - metaWriter: metaWriter, + inCodec: inCodec, + metacache: metacache, }, nil } -func (s *storageV1Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack) (Task, error) { - task := NewSyncTask() - tr := timerecord.NewTimeRecorder("storage_serializer") - - log := log.Ctx(ctx).With( - zap.Int64("segmentID", pack.segmentID), - zap.Int64("collectionID", pack.collectionID), - zap.String("channel", pack.channelName), - ) - - if len(pack.insertData) > 0 { - memSize := make(map[int64]int64) - for _, chunk := range pack.insertData { - for fieldID, fieldData := range chunk.Data { - memSize[fieldID] += int64(fieldData.GetMemorySize()) - } - } - task.binlogMemsize = memSize - - binlogBlobs, err := s.serializeBinlog(ctx, pack) - if err != nil { - log.Warn("failed to serialize binlog", zap.Error(err)) - return nil, err - } - task.binlogBlobs = binlogBlobs - - actions := []metacache.SegmentAction{} - singlePKStats, batchStatsBlob, err := s.serializeStatslog(pack) - if err != nil { - log.Warn("failed to serialized statslog", zap.Error(err)) - return nil, err - } - - task.batchStatsBlob = batchStatsBlob - actions = append(actions, metacache.RollStats(singlePKStats)) - - if len(pack.bm25Stats) > 0 { - statsBlobs, err := s.serializeBM25Stats(pack) - if err != nil { - return nil, err - } - - task.bm25Blobs = statsBlobs - actions = append(actions, metacache.MergeBm25Stats(pack.bm25Stats)) - } - - s.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(pack.segmentID)) - } - - if pack.isFlush { - if pack.level != datapb.SegmentLevel_L0 { - mergedStatsBlob, err := s.serializeMergedPkStats(pack) - if err != nil { - log.Warn("failed to serialize merged stats log", zap.Error(err)) - return nil, err - } - task.mergedStatsBlob = mergedStatsBlob - - if hasBM25Function(s.schema) { - mergedBM25Blob, err := s.serializeMergedBM25Stats(pack) - if err != nil { - log.Warn("failed to serialize merged bm25 stats log", zap.Error(err)) - return nil, err - } - task.mergedBm25Blob = mergedBM25Blob - } - } - - task.WithFlush() - } - - if pack.deltaData != nil { - deltaBlob, err := s.serializeDeltalog(pack) - if err != nil { - log.Warn("failed to serialize delta log", zap.Error(err)) - return nil, err - } - task.deltaBlob = deltaBlob - task.deltaRowCount = pack.deltaData.RowCount - } - if pack.isDrop { - task.WithDrop() - } - - s.setTaskMeta(task, pack) - task.WithAllocator(s.allocator) - - metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), pack.level.String()).Observe(float64(tr.RecordSpan().Milliseconds())) - return task, nil -} - -func (s *storageV1Serializer) setTaskMeta(task *SyncTask, pack *SyncPack) { - task.WithCollectionID(pack.collectionID). - WithPartitionID(pack.partitionID). - WithChannelName(pack.channelName). 
- WithSegmentID(pack.segmentID). - WithBatchRows(pack.batchRows). - WithSchema(s.metacache.Schema()). - WithStartPosition(pack.startPosition). - WithCheckpoint(pack.checkpoint). - WithLevel(pack.level). - WithDataSource(pack.dataSource). - WithTimeRange(pack.tsFrom, pack.tsTo). - WithMetaCache(s.metacache). - WithMetaWriter(s.metaWriter). - WithFailureCallback(pack.errHandler) -} - func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPack) (map[int64]*storage.Blob, error) { + if len(pack.insertData) == 0 { + return make(map[int64]*storage.Blob), nil + } log := log.Ctx(ctx) blobs, err := s.inCodec.Serialize(pack.partitionID, pack.segmentID, pack.insertData...) if err != nil { @@ -202,6 +84,9 @@ func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPac } func (s *storageV1Serializer) serializeBM25Stats(pack *SyncPack) (map[int64]*storage.Blob, error) { + if len(pack.bm25Stats) == 0 { + return make(map[int64]*storage.Blob), nil + } blobs := make(map[int64]*storage.Blob) for fieldID, stats := range pack.bm25Stats { bytes, err := stats.Serialize() @@ -219,6 +104,9 @@ func (s *storageV1Serializer) serializeBM25Stats(pack *SyncPack) (map[int64]*sto } func (s *storageV1Serializer) serializeStatslog(pack *SyncPack) (*storage.PrimaryKeyStats, *storage.Blob, error) { + if len(pack.insertData) == 0 { + return nil, nil, nil + } var rowNum int64 var pkFieldData []storage.FieldData for _, chunk := range pack.insertData { diff --git a/internal/flushcommon/syncmgr/storage_serializer_test.go b/internal/flushcommon/syncmgr/storage_serializer_test.go index 6e83c72da428a..51d3618329bd4 100644 --- a/internal/flushcommon/syncmgr/storage_serializer_test.go +++ b/internal/flushcommon/syncmgr/storage_serializer_test.go @@ -24,7 +24,6 @@ import ( "time" "github.com/samber/lo" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -92,11 +91,10 @@ func (s *StorageV1SerializerSuite) SetupSuite() { } func (s *StorageV1SerializerSuite) SetupTest() { - s.mockCache.EXPECT().Collection().Return(s.collectionID) s.mockCache.EXPECT().Schema().Return(s.schema) var err error - s.serializer, err = NewStorageSerializer(s.mockAllocator, s.mockCache, s.mockMetaWriter) + s.serializer, err = NewStorageSerializer(s.mockCache) s.Require().NoError(err) } @@ -179,20 +177,8 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() { pack.WithTimeRange(50, 100) pack.WithDrop() - task, err := s.serializer.EncodeBuffer(ctx, pack) + _, err := s.serializer.serializeBinlog(ctx, pack) s.NoError(err) - taskV1, ok := task.(*SyncTask) - s.Require().True(ok) - s.Equal(s.collectionID, taskV1.collectionID) - s.Equal(s.partitionID, taskV1.partitionID) - s.Equal(s.channelName, taskV1.channelName) - s.Equal(&msgpb.MsgPosition{ - Timestamp: 1000, - ChannelName: s.channelName, - }, taskV1.checkpoint) - s.EqualValues(50, taskV1.tsFrom) - s.EqualValues(100, taskV1.tsTo) - s.True(taskV1.isDrop) }) s.Run("with_empty_data", func() { @@ -200,7 +186,7 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() { pack.WithTimeRange(50, 100) pack.WithInsertData([]*storage.InsertData{s.getEmptyInsertBuffer()}).WithBatchRows(0) - _, err := s.serializer.EncodeBuffer(ctx, pack) + _, err := s.serializer.serializeBinlog(ctx, pack) s.Error(err) }) @@ -209,32 +195,21 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() { pack.WithTimeRange(50, 100) pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchRows(10) - 
s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return().Once() - - task, err := s.serializer.EncodeBuffer(ctx, pack) + blobs, err := s.serializer.serializeBinlog(ctx, pack) s.NoError(err) - - taskV1, ok := task.(*SyncTask) - s.Require().True(ok) - s.Equal(s.collectionID, taskV1.collectionID) - s.Equal(s.partitionID, taskV1.partitionID) - s.Equal(s.channelName, taskV1.channelName) - s.Equal(&msgpb.MsgPosition{ - Timestamp: 1000, - ChannelName: s.channelName, - }, taskV1.checkpoint) - s.EqualValues(50, taskV1.tsFrom) - s.EqualValues(100, taskV1.tsTo) - s.Len(taskV1.binlogBlobs, 4) - s.NotNil(taskV1.batchStatsBlob) + s.Len(blobs, 4) + stats, blob, err := s.serializer.serializeStatslog(pack) + s.NoError(err) + s.NotNil(stats) + s.NotNil(blob) }) s.Run("with_flush_segment_not_found", func() { pack := s.getBasicPack() - pack.WithFlush() + pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithFlush() s.mockCache.EXPECT().GetSegmentByID(s.segmentID).Return(nil, false).Once() - _, err := s.serializer.EncodeBuffer(ctx, pack) + _, err := s.serializer.serializeMergedPkStats(pack) s.Error(err) }) @@ -247,63 +222,39 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() { bfs := s.getBfs() segInfo := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs, nil) metacache.UpdateNumOfRows(1000)(segInfo) - s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) { - action(segInfo) - }).Return().Once() - s.mockCache.EXPECT().GetSegmentByID(s.segmentID).Return(segInfo, true).Once() + s.mockCache.EXPECT().GetSegmentByID(s.segmentID).Return(segInfo, true) - task, err := s.serializer.EncodeBuffer(ctx, pack) + blobs, err := s.serializer.serializeBinlog(ctx, pack) s.NoError(err) - - taskV1, ok := task.(*SyncTask) - s.Require().True(ok) - s.Equal(s.collectionID, taskV1.collectionID) - s.Equal(s.partitionID, taskV1.partitionID) - s.Equal(s.channelName, taskV1.channelName) - s.Equal(&msgpb.MsgPosition{ - Timestamp: 1000, - ChannelName: s.channelName, - }, taskV1.checkpoint) - s.EqualValues(50, taskV1.tsFrom) - s.EqualValues(100, taskV1.tsTo) - s.Len(taskV1.binlogBlobs, 4) - s.NotNil(taskV1.batchStatsBlob) - s.NotNil(taskV1.mergedStatsBlob) + s.Len(blobs, 4) + stats, blob, err := s.serializer.serializeStatslog(pack) + s.NoError(err) + s.NotNil(stats) + s.NotNil(blob) + action := metacache.RollStats(stats) + action(segInfo) + blob, err = s.serializer.serializeMergedPkStats(pack) + s.NoError(err) + s.NotNil(blob) }) } func (s *StorageV1SerializerSuite) TestSerializeDelete() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - s.Run("serialize_normal", func() { pack := s.getBasicPack() pack.WithDeleteData(s.getDeleteBuffer()) pack.WithTimeRange(50, 100) - task, err := s.serializer.EncodeBuffer(ctx, pack) + blob, err := s.serializer.serializeDeltalog(pack) s.NoError(err) - - taskV1, ok := task.(*SyncTask) - s.Require().True(ok) - s.Equal(s.collectionID, taskV1.collectionID) - s.Equal(s.partitionID, taskV1.partitionID) - s.Equal(s.channelName, taskV1.channelName) - s.Equal(&msgpb.MsgPosition{ - Timestamp: 1000, - ChannelName: s.channelName, - }, taskV1.checkpoint) - s.EqualValues(50, taskV1.tsFrom) - s.EqualValues(100, taskV1.tsTo) - s.NotNil(taskV1.deltaBlob) + s.NotNil(blob) }) } func (s *StorageV1SerializerSuite) TestBadSchema() { mockCache := metacache.NewMockMetaCache(s.T()) - mockCache.EXPECT().Collection().Return(s.collectionID).Once() 
mockCache.EXPECT().Schema().Return(&schemapb.CollectionSchema{}).Once() - _, err := NewStorageSerializer(s.mockAllocator, mockCache, s.mockMetaWriter) + _, err := NewStorageSerializer(mockCache) s.Error(err) } diff --git a/internal/flushcommon/syncmgr/sync_manager_test.go b/internal/flushcommon/syncmgr/sync_manager_test.go index e674ba55b023f..af021748e7fed 100644 --- a/internal/flushcommon/syncmgr/sync_manager_test.go +++ b/internal/flushcommon/syncmgr/sync_manager_test.go @@ -2,13 +2,11 @@ package syncmgr import ( "context" - "math/rand" "strconv" "testing" "time" "github.com/cockroachdb/errors" - "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -23,13 +21,11 @@ import ( "github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/mocks" - "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/tsoutil" ) type SyncManagerSuite struct { @@ -95,59 +91,15 @@ func (s *SyncManagerSuite) SetupTest() { s.metacache = metacache.NewMockMetaCache(s.T()) } -func (s *SyncManagerSuite) getEmptyInsertBuffer() *storage.InsertData { - buf, err := storage.NewInsertData(s.schema) - s.Require().NoError(err) - - return buf -} - -func (s *SyncManagerSuite) getInsertBuffer() *storage.InsertData { - buf := s.getEmptyInsertBuffer() - - // generate data - for i := 0; i < 10; i++ { - data := make(map[storage.FieldID]any) - data[common.RowIDField] = int64(i + 1) - data[common.TimeStampField] = int64(i + 1) - data[100] = int64(i + 1) - vector := lo.RepeatBy(128, func(_ int) float32 { - return rand.Float32() - }) - data[101] = vector - err := buf.Append(data) - s.Require().NoError(err) - } - return buf -} - -func (s *SyncManagerSuite) getDeleteBuffer() *storage.DeleteData { - buf := &storage.DeleteData{} - for i := 0; i < 10; i++ { - pk := storage.NewInt64PrimaryKey(int64(i + 1)) - ts := tsoutil.ComposeTSByTime(time.Now(), 0) - buf.Append(pk, ts) - } - return buf -} - -func (s *SyncManagerSuite) getDeleteBufferZeroTs() *storage.DeleteData { - buf := &storage.DeleteData{} - for i := 0; i < 10; i++ { - pk := storage.NewInt64PrimaryKey(int64(i + 1)) - buf.Append(pk, 0) - } - return buf -} - func (s *SyncManagerSuite) getSuiteSyncTask() *SyncTask { - task := NewSyncTask().WithCollectionID(s.collectionID). - WithPartitionID(s.partitionID). - WithSegmentID(s.segmentID). - WithChannelName(s.channelName). - WithSchema(s.schema). - WithChunkManager(s.chunkManager). + task := NewSyncTask(). + WithSyncPack(new(SyncPack). + WithCollectionID(s.collectionID). + WithPartitionID(s.partitionID). + WithSegmentID(s.segmentID). + WithChannelName(s.channelName)). WithAllocator(s.allocator). + WithChunkManager(s.chunkManager). WithMetaCache(s.metacache). 
WithAllocator(s.allocator) @@ -166,12 +118,6 @@ func (s *SyncManagerSuite) TestSubmit() { manager := NewSyncManager(s.chunkManager) task := s.getSuiteSyncTask() task.WithMetaWriter(BrokerMetaWriter(s.broker, 1)) - task.WithTimeRange(50, 100) - task.WithCheckpoint(&msgpb.MsgPosition{ - ChannelName: s.channelName, - MsgID: []byte{1, 2, 3, 4}, - Timestamp: 100, - }) f, err := manager.SyncData(context.Background(), task) s.NoError(err) @@ -206,12 +152,6 @@ func (s *SyncManagerSuite) TestCompacted() { manager := NewSyncManager(s.chunkManager) task := s.getSuiteSyncTask() task.WithMetaWriter(BrokerMetaWriter(s.broker, 1)) - task.WithTimeRange(50, 100) - task.WithCheckpoint(&msgpb.MsgPosition{ - ChannelName: s.channelName, - MsgID: []byte{1, 2, 3, 4}, - Timestamp: 100, - }) f, err := manager.SyncData(context.Background(), task) s.NoError(err) @@ -295,7 +235,6 @@ func (s *SyncManagerSuite) TestSyncManager_TaskStatsJSON() { collectionID: 1, partitionID: 1, channelName: "channel1", - schema: &schemapb.CollectionSchema{}, checkpoint: &msgpb.MsgPosition{}, tsFrom: 1000, tsTo: 2000, @@ -306,7 +245,6 @@ func (s *SyncManagerSuite) TestSyncManager_TaskStatsJSON() { collectionID: 2, partitionID: 2, channelName: "channel2", - schema: &schemapb.CollectionSchema{}, checkpoint: &msgpb.MsgPosition{}, tsFrom: 3000, tsTo: 4000, diff --git a/internal/flushcommon/syncmgr/task.go b/internal/flushcommon/syncmgr/task.go index c04909c0f546c..9d0f46a2437e6 100644 --- a/internal/flushcommon/syncmgr/task.go +++ b/internal/flushcommon/syncmgr/task.go @@ -19,25 +19,20 @@ package syncmgr import ( "context" "fmt" - "path" "time" - "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/flushcommon/metacache" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" @@ -50,13 +45,10 @@ type SyncTask struct { chunkManager storage.ChunkManager allocator allocator.Interface - segment *metacache.SegmentInfo collectionID int64 partitionID int64 segmentID int64 channelName string - schema *schemapb.CollectionSchema - pkField *schemapb.FieldSchema startPosition *msgpb.MsgPosition checkpoint *msgpb.MsgPosition dataSource string @@ -68,34 +60,16 @@ type SyncTask struct { tsFrom typeutil.Timestamp tsTo typeutil.Timestamp - isFlush bool - isDrop bool - metacache metacache.MetaCache metaWriter MetaWriter + pack *SyncPack + insertBinlogs map[int64]*datapb.FieldBinlog // map[int64]*datapb.Binlog statsBinlogs map[int64]*datapb.FieldBinlog // map[int64]*datapb.Binlog bm25Binlogs map[int64]*datapb.FieldBinlog deltaBinlog *datapb.FieldBinlog - binlogBlobs map[int64]*storage.Blob // fieldID => blob - binlogMemsize map[int64]int64 // memory size - - bm25Blobs map[int64]*storage.Blob - mergedBm25Blob map[int64]*storage.Blob - - batchStatsBlob *storage.Blob - mergedStatsBlob *storage.Blob - - deltaBlob *storage.Blob - deltaRowCount int64 - - // prefetched log ids - ids []int64 - - 
segmentData map[string][]byte - writeRetryOpts []retry.Option failureCallback func(err error) @@ -122,7 +96,7 @@ func (t *SyncTask) HandleError(err error) { } metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel, t.level.String()).Inc() - if !t.isFlush { + if !t.pack.isFlush { metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel, t.level.String()).Inc() } } @@ -137,10 +111,9 @@ func (t *SyncTask) Run(ctx context.Context) (err error) { } }() - var has bool - t.segment, has = t.metacache.GetSegmentByID(t.segmentID) + _, has := t.metacache.GetSegmentByID(t.segmentID) if !has { - if t.isDrop { + if t.pack.isDrop { log.Info("segment dropped, discard sync task") return nil } @@ -149,35 +122,13 @@ func (t *SyncTask) Run(ctx context.Context) (err error) { return err } - err = t.prefetchIDs() - if err != nil { - log.Warn("failed allocate ids for sync task", zap.Error(err)) - return err - } - - t.processInsertBlobs() - t.processStatsBlob() - t.processDeltaBlob() - - if len(t.bm25Blobs) > 0 || len(t.mergedBm25Blob) > 0 { - t.processBM25StastBlob() - } - - err = t.writeLogs(ctx) + writer := NewBulkPackWriter(t.metacache, t.chunkManager, t.allocator, t.writeRetryOpts...) + t.insertBinlogs, t.deltaBinlog, t.statsBinlogs, t.bm25Binlogs, t.flushedSize, err = writer.Write(ctx, t.pack) if err != nil { - log.Warn("failed to save serialized data into storage", zap.Error(err)) + log.Warn("failed to write sync data", zap.Error(err)) return err } - var totalSize int64 - for _, size := range t.binlogMemsize { - totalSize += size - } - if t.deltaBlob != nil { - totalSize += int64(len(t.deltaBlob.Value)) - } - t.flushedSize = totalSize - metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource, t.level.String()).Add(float64(t.flushedSize)) metrics.DataNodeFlushedRows.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.dataSource).Add(float64(t.batchRows)) @@ -192,203 +143,27 @@ func (t *SyncTask) Run(ctx context.Context) (err error) { } actions := []metacache.SegmentAction{metacache.FinishSyncing(t.batchRows)} - if t.isFlush { + if t.pack.isFlush { actions = append(actions, metacache.UpdateState(commonpb.SegmentState_Flushed)) } - t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(t.segment.SegmentID())) + t.metacache.UpdateSegments(metacache.MergeSegmentAction(actions...), metacache.WithSegmentIDs(t.segmentID)) - if t.isDrop { - t.metacache.RemoveSegments(metacache.WithSegmentIDs(t.segment.SegmentID())) - log.Info("segment removed", zap.Int64("segmentID", t.segment.SegmentID()), zap.String("channel", t.channelName)) + if t.pack.isDrop { + t.metacache.RemoveSegments(metacache.WithSegmentIDs(t.segmentID)) + log.Info("segment removed", zap.Int64("segmentID", t.segmentID), zap.String("channel", t.channelName)) } t.execTime = t.tr.ElapseSpan() - log.Info("task done", zap.Int64("flushedSize", totalSize), zap.Duration("timeTaken", t.execTime)) + log.Info("task done", zap.Int64("flushedSize", t.flushedSize), zap.Duration("timeTaken", t.execTime)) - if !t.isFlush { + if !t.pack.isFlush { metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel, t.level.String()).Inc() } metrics.DataNodeFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel, t.level.String()).Inc() - // free blobs and data - t.binlogBlobs = nil - t.deltaBlob = nil - t.mergedStatsBlob = nil 
- t.batchStatsBlob = nil - t.segmentData = nil return nil } -// prefetchIDs pre-allcates ids depending on the number of blobs current task contains. -func (t *SyncTask) prefetchIDs() error { - totalIDCount := len(t.binlogBlobs) - if t.batchStatsBlob != nil { - totalIDCount++ - } - if t.deltaBlob != nil { - totalIDCount++ - } - if t.bm25Blobs != nil { - totalIDCount += len(t.bm25Blobs) - } - - start, _, err := t.allocator.Alloc(uint32(totalIDCount)) - if err != nil { - return err - } - t.ids = lo.RangeFrom(start, totalIDCount) - return nil -} - -func (t *SyncTask) nextID() int64 { - if len(t.ids) == 0 { - panic("pre-fetched ids exhausted") - } - r := t.ids[0] - t.ids = t.ids[1:] - return r -} - -func (t *SyncTask) processInsertBlobs() { - for fieldID, blob := range t.binlogBlobs { - k := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, t.nextID()) - key := path.Join(t.chunkManager.RootPath(), common.SegmentInsertLogPath, k) - t.segmentData[key] = blob.GetValue() - t.appendBinlog(fieldID, &datapb.Binlog{ - EntriesNum: blob.RowNum, - TimestampFrom: t.tsFrom, - TimestampTo: t.tsTo, - LogPath: key, - LogSize: int64(len(blob.GetValue())), - MemorySize: t.binlogMemsize[fieldID], - }) - } -} - -func (t *SyncTask) processBM25StastBlob() { - for fieldID, blob := range t.bm25Blobs { - k := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, t.nextID()) - key := path.Join(t.chunkManager.RootPath(), common.SegmentBm25LogPath, k) - t.segmentData[key] = blob.GetValue() - t.appendBM25Statslog(fieldID, &datapb.Binlog{ - EntriesNum: blob.RowNum, - TimestampFrom: t.tsFrom, - TimestampTo: t.tsTo, - LogPath: key, - LogSize: int64(len(blob.GetValue())), - MemorySize: blob.MemorySize, - }) - } - - for fieldID, blob := range t.mergedBm25Blob { - k := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, int64(storage.CompoundStatsType)) - key := path.Join(t.chunkManager.RootPath(), common.SegmentBm25LogPath, k) - t.segmentData[key] = blob.GetValue() - t.appendBM25Statslog(fieldID, &datapb.Binlog{ - EntriesNum: blob.RowNum, - TimestampFrom: t.tsFrom, - TimestampTo: t.tsTo, - LogPath: key, - LogSize: int64(len(blob.GetValue())), - MemorySize: blob.MemorySize, - }) - } -} - -func (t *SyncTask) processStatsBlob() { - if t.batchStatsBlob != nil { - t.convertBlob2StatsBinlog(t.batchStatsBlob, t.pkField.GetFieldID(), t.nextID(), t.batchRows) - } - if t.mergedStatsBlob != nil { - totalRowNum := t.segment.NumOfRows() - t.convertBlob2StatsBinlog(t.mergedStatsBlob, t.pkField.GetFieldID(), int64(storage.CompoundStatsType), totalRowNum) - } -} - -func (t *SyncTask) processDeltaBlob() { - if t.deltaBlob != nil { - value := t.deltaBlob.GetValue() - data := &datapb.Binlog{} - - blobKey := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, t.nextID()) - blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey) - - t.segmentData[blobPath] = value - data.LogSize = int64(len(t.deltaBlob.Value)) - data.LogPath = blobPath - data.TimestampFrom = t.tsFrom - data.TimestampTo = t.tsTo - data.EntriesNum = t.deltaRowCount - data.MemorySize = t.deltaBlob.GetMemorySize() - t.appendDeltalog(data) - } -} - -func (t *SyncTask) convertBlob2StatsBinlog(blob *storage.Blob, fieldID, logID int64, rowNum int64) { - key := metautil.JoinIDPath(t.collectionID, t.partitionID, t.segmentID, fieldID, logID) - key = path.Join(t.chunkManager.RootPath(), common.SegmentStatslogPath, key) - - value := blob.GetValue() - t.segmentData[key] = value - 
t.appendStatslog(fieldID, &datapb.Binlog{ - EntriesNum: rowNum, - TimestampFrom: t.tsFrom, - TimestampTo: t.tsTo, - LogPath: key, - LogSize: int64(len(value)), - MemorySize: int64(len(value)), - }) -} - -func (t *SyncTask) appendBinlog(fieldID int64, binlog *datapb.Binlog) { - fieldBinlog, ok := t.insertBinlogs[fieldID] - if !ok { - fieldBinlog = &datapb.FieldBinlog{ - FieldID: fieldID, - } - t.insertBinlogs[fieldID] = fieldBinlog - } - - fieldBinlog.Binlogs = append(fieldBinlog.Binlogs, binlog) -} - -func (t *SyncTask) appendBM25Statslog(fieldID int64, log *datapb.Binlog) { - fieldBinlog, ok := t.bm25Binlogs[fieldID] - if !ok { - fieldBinlog = &datapb.FieldBinlog{ - FieldID: fieldID, - } - t.bm25Binlogs[fieldID] = fieldBinlog - } - fieldBinlog.Binlogs = append(fieldBinlog.Binlogs, log) -} - -func (t *SyncTask) appendStatslog(fieldID int64, statlog *datapb.Binlog) { - fieldBinlog, ok := t.statsBinlogs[fieldID] - if !ok { - fieldBinlog = &datapb.FieldBinlog{ - FieldID: fieldID, - } - t.statsBinlogs[fieldID] = fieldBinlog - } - fieldBinlog.Binlogs = append(fieldBinlog.Binlogs, statlog) -} - -func (t *SyncTask) appendDeltalog(deltalog *datapb.Binlog) { - t.deltaBinlog.Binlogs = append(t.deltaBinlog.Binlogs, deltalog) -} - -// writeLogs writes log files (binlog/deltalog/statslog) into storage via chunkManger. -func (t *SyncTask) writeLogs(ctx context.Context) error { - return retry.Handle(ctx, func() (bool, error) { - err := t.chunkManager.MultiWrite(ctx, t.segmentData) - if err != nil { - return !merr.IsCanceledOrTimeout(err), err - } - return false, nil - }, t.writeRetryOpts...) -} - // writeMeta updates segments via meta writer in option. func (t *SyncTask) writeMeta(ctx context.Context) error { return t.metaWriter.UpdateSync(ctx, t) @@ -411,7 +186,7 @@ func (t *SyncTask) ChannelName() string { } func (t *SyncTask) IsFlush() bool { - return t.isFlush + return t.pack.isFlush } func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog, map[int64]*datapb.FieldBinlog) { @@ -419,13 +194,17 @@ func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.F } func (t *SyncTask) MarshalJSON() ([]byte, error) { + deltaRowCount := int64(0) + if t.pack != nil && t.pack.deltaData != nil { + deltaRowCount = t.pack.deltaData.RowCount + } return json.Marshal(&metricsinfo.SyncTask{ SegmentID: t.segmentID, BatchRows: t.batchRows, SegmentLevel: t.level.String(), TSFrom: tsoutil.PhysicalTimeFormat(t.tsFrom), TSTo: tsoutil.PhysicalTimeFormat(t.tsTo), - DeltaRowCount: t.deltaRowCount, + DeltaRowCount: deltaRowCount, FlushSize: t.flushedSize, RunningTime: t.execTime.String(), NodeID: paramtable.GetNodeID(), diff --git a/internal/flushcommon/syncmgr/task_test.go b/internal/flushcommon/syncmgr/task_test.go index ef1655fc8c6fe..595a1557f813a 100644 --- a/internal/flushcommon/syncmgr/task_test.go +++ b/internal/flushcommon/syncmgr/task_test.go @@ -102,10 +102,12 @@ func (s *SyncTaskSuite) SetupTest() { s.chunkManager = mocks.NewChunkManager(s.T()) s.chunkManager.EXPECT().RootPath().Return("files").Maybe() - s.chunkManager.EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(nil).Maybe() + s.chunkManager.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() s.broker = broker.NewMockBroker(s.T()) s.metacache = metacache.NewMockMetaCache(s.T()) + s.metacache.EXPECT().Collection().Return(s.collectionID).Maybe() + s.metacache.EXPECT().Schema().Return(s.schema).Maybe() } func (s *SyncTaskSuite) getEmptyInsertBuffer() 
*storage.InsertData { @@ -144,26 +146,16 @@ func (s *SyncTaskSuite) getDeleteBuffer() *storage.DeleteData { return buf } -func (s *SyncTaskSuite) getDeleteBufferZeroTs() *storage.DeleteData { - buf := &storage.DeleteData{} - for i := 0; i < 10; i++ { - pk := storage.NewInt64PrimaryKey(int64(i + 1)) - buf.Append(pk, 0) - } - return buf -} - -func (s *SyncTaskSuite) getSuiteSyncTask() *SyncTask { - task := NewSyncTask().WithCollectionID(s.collectionID). - WithPartitionID(s.partitionID). - WithSegmentID(s.segmentID). - WithChannelName(s.channelName). - WithSchema(s.schema). - WithChunkManager(s.chunkManager). +func (s *SyncTaskSuite) getSuiteSyncTask(pack *SyncPack) *SyncTask { + task := NewSyncTask(). + WithSyncPack(pack. + WithCollectionID(s.collectionID). + WithPartitionID(s.partitionID). + WithSegmentID(s.segmentID). + WithChannelName(s.channelName)). WithAllocator(s.allocator). + WithChunkManager(s.chunkManager). WithMetaCache(s.metacache) - task.binlogMemsize = map[int64]int64{0: 1, 1: 1, 100: 100} - return task } @@ -192,79 +184,64 @@ func (s *SyncTaskSuite) TestRunNormal() { seg.GetBloomFilterSet().Roll() s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) - s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() + s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) { + action(seg) + }).Return() s.Run("without_data", func() { - task := s.getSuiteSyncTask() + task := s.getSuiteSyncTask(new(SyncPack).WithCheckpoint( + &msgpb.MsgPosition{ + ChannelName: s.channelName, + MsgID: []byte{1, 2, 3, 4}, + Timestamp: 100, + })) task.WithMetaWriter(BrokerMetaWriter(s.broker, 1)) - task.WithTimeRange(50, 100) - task.WithCheckpoint(&msgpb.MsgPosition{ - ChannelName: s.channelName, - MsgID: []byte{1, 2, 3, 4}, - Timestamp: 100, - }) err := task.Run(ctx) s.NoError(err) }) s.Run("with_insert_delete_cp", func() { - task := s.getSuiteSyncTask() - task.WithTimeRange(50, 100) + task := s.getSuiteSyncTask( + new(SyncPack). + WithInsertData([]*storage.InsertData{s.getInsertBuffer()}). + WithCheckpoint(&msgpb.MsgPosition{ + ChannelName: s.channelName, + MsgID: []byte{1, 2, 3, 4}, + Timestamp: 100, + })) task.WithMetaWriter(BrokerMetaWriter(s.broker, 1)) - task.WithCheckpoint(&msgpb.MsgPosition{ - ChannelName: s.channelName, - MsgID: []byte{1, 2, 3, 4}, - Timestamp: 100, - }) - task.binlogBlobs[100] = &storage.Blob{ - Key: "100", - Value: []byte("test_data"), - } err := task.Run(ctx) s.NoError(err) }) - s.Run("with_statslog", func() { - task := s.getSuiteSyncTask() - task.WithTimeRange(50, 100) + s.Run("with_flush", func() { + task := s.getSuiteSyncTask( + new(SyncPack). + WithInsertData([]*storage.InsertData{s.getInsertBuffer()}). + WithFlush(). 
+ WithCheckpoint(&msgpb.MsgPosition{ + ChannelName: s.channelName, + MsgID: []byte{1, 2, 3, 4}, + Timestamp: 100, + })) task.WithMetaWriter(BrokerMetaWriter(s.broker, 1)) - task.WithCheckpoint(&msgpb.MsgPosition{ - ChannelName: s.channelName, - MsgID: []byte{1, 2, 3, 4}, - Timestamp: 100, - }) - task.WithFlush() - task.batchStatsBlob = &storage.Blob{ - Key: "100", - Value: []byte("test_data"), - } - task.mergedStatsBlob = &storage.Blob{ - Key: "1", - Value: []byte("test_data"), - } - err := task.Run(ctx) s.NoError(err) }) - s.Run("with_delta_data", func() { + s.Run("with_drop", func() { s.metacache.EXPECT().RemoveSegments(mock.Anything, mock.Anything).Return(nil).Once() - task := s.getSuiteSyncTask() - task.WithTimeRange(50, 100) + task := s.getSuiteSyncTask(new(SyncPack). + WithDeleteData(s.getDeleteBuffer()). + WithDrop(). + WithCheckpoint(&msgpb.MsgPosition{ + ChannelName: s.channelName, + MsgID: []byte{1, 2, 3, 4}, + Timestamp: 100, + })) task.WithMetaWriter(BrokerMetaWriter(s.broker, 1)) - task.WithCheckpoint(&msgpb.MsgPosition{ - ChannelName: s.channelName, - MsgID: []byte{1, 2, 3, 4}, - Timestamp: 100, - }) - task.WithDrop() - task.deltaBlob = &storage.Blob{ - Key: "100", - Value: []byte("test_data"), - } - err := task.Run(ctx) s.NoError(err) }) @@ -281,19 +258,15 @@ func (s *SyncTaskSuite) TestRunL0Segment() { s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() s.Run("pure_delete_l0_flush", func() { - task := s.getSuiteSyncTask() - task.deltaBlob = &storage.Blob{ - Key: "100", - Value: []byte("test_data"), - } - task.WithTimeRange(50, 100) + task := s.getSuiteSyncTask(new(SyncPack). + WithDeleteData(s.getDeleteBuffer()). + WithFlush(). + WithCheckpoint(&msgpb.MsgPosition{ + ChannelName: s.channelName, + MsgID: []byte{1, 2, 3, 4}, + Timestamp: 100, + })) task.WithMetaWriter(BrokerMetaWriter(s.broker, 1)) - task.WithCheckpoint(&msgpb.MsgPosition{ - ChannelName: s.channelName, - MsgID: []byte{1, 2, 3, 4}, - Timestamp: 100, - }) - task.WithFlush() err := task.Run(ctx) s.NoError(err) @@ -307,7 +280,7 @@ func (s *SyncTaskSuite) TestRunError() { s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(nil, false) flag := false handler := func(_ error) { flag = true } - task := s.getSuiteSyncTask().WithFailureCallback(handler) + task := s.getSuiteSyncTask(new(SyncPack)).WithFailureCallback(handler) err := task.Run(ctx) @@ -320,13 +293,14 @@ func (s *SyncTaskSuite) TestRunError() { metacache.UpdateNumOfRows(1000)(seg) s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg}) + s.metacache.EXPECT().Collection().Return(s.collectionID).Maybe() + s.metacache.EXPECT().Schema().Return(s.schema).Maybe() s.Run("allocate_id_fail", func() { mockAllocator := allocator.NewMockAllocator(s.T()) mockAllocator.EXPECT().Alloc(mock.Anything).Return(0, 0, errors.New("mocked")) - task := s.getSuiteSyncTask() - task.allocator = mockAllocator + task := s.getSuiteSyncTask(new(SyncPack).WithFlush()).WithAllocator(mockAllocator) err := task.Run(ctx) s.Error(err) @@ -335,14 +309,13 @@ func (s *SyncTaskSuite) TestRunError() { s.Run("metawrite_fail", func() { s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(errors.New("mocked")) - task := s.getSuiteSyncTask() + task := s.getSuiteSyncTask(new(SyncPack). 
+ WithCheckpoint(&msgpb.MsgPosition{ + ChannelName: s.channelName, + MsgID: []byte{1, 2, 3, 4}, + Timestamp: 100, + })) task.WithMetaWriter(BrokerMetaWriter(s.broker, 1, retry.Attempts(1))) - task.WithTimeRange(50, 100) - task.WithCheckpoint(&msgpb.MsgPosition{ - ChannelName: s.channelName, - MsgID: []byte{1, 2, 3, 4}, - Timestamp: 100, - }) err := task.Run(ctx) s.Error(err) @@ -353,14 +326,10 @@ func (s *SyncTaskSuite) TestRunError() { handler := func(_ error) { flag = true } s.chunkManager.ExpectedCalls = nil s.chunkManager.EXPECT().RootPath().Return("files") - s.chunkManager.EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(retry.Unrecoverable(errors.New("mocked"))) - task := s.getSuiteSyncTask().WithFailureCallback(handler) - task.binlogBlobs[100] = &storage.Blob{ - Key: "100", - Value: []byte("test_data"), - } - - task.WithWriteRetryOptions(retry.Attempts(1)) + s.chunkManager.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(retry.Unrecoverable(errors.New("mocked"))) + task := s.getSuiteSyncTask(new(SyncPack).WithInsertData([]*storage.InsertData{s.getInsertBuffer()})). + WithFailureCallback(handler). + WithWriteRetryOptions(retry.Attempts(1)) err := task.Run(ctx) @@ -369,43 +338,26 @@ func (s *SyncTaskSuite) TestRunError() { }) } -func (s *SyncTaskSuite) TestNextID() { - task := s.getSuiteSyncTask() - - task.ids = []int64{0} - s.Run("normal_next", func() { - id := task.nextID() - s.EqualValues(0, id) - }) - s.Run("id_exhausted", func() { - s.Panics(func() { - task.nextID() - }) - }) -} - func (s *SyncTaskSuite) TestSyncTask_MarshalJSON() { t := &SyncTask{ - segmentID: 12345, - batchRows: 100, - level: datapb.SegmentLevel_L0, - tsFrom: 1000, - tsTo: 2000, - deltaRowCount: 10, - flushedSize: 1024, - execTime: 2 * time.Second, + segmentID: 12345, + batchRows: 100, + level: datapb.SegmentLevel_L0, + tsFrom: 1000, + tsTo: 2000, + flushedSize: 1024, + execTime: 2 * time.Second, } tm := &metricsinfo.SyncTask{ - SegmentID: t.segmentID, - BatchRows: t.batchRows, - SegmentLevel: t.level.String(), - TSFrom: tsoutil.PhysicalTimeFormat(t.tsFrom), - TSTo: tsoutil.PhysicalTimeFormat(t.tsTo), - DeltaRowCount: t.deltaRowCount, - FlushSize: t.flushedSize, - RunningTime: t.execTime.String(), - NodeID: paramtable.GetNodeID(), + SegmentID: t.segmentID, + BatchRows: t.batchRows, + SegmentLevel: t.level.String(), + TSFrom: tsoutil.PhysicalTimeFormat(t.tsFrom), + TSTo: tsoutil.PhysicalTimeFormat(t.tsTo), + FlushSize: t.flushedSize, + RunningTime: t.execTime.String(), + NodeID: paramtable.GetNodeID(), } expectedBytes, err := json.Marshal(tm) s.NoError(err) diff --git a/internal/flushcommon/writebuffer/l0_write_buffer_test.go b/internal/flushcommon/writebuffer/l0_write_buffer_test.go index aa79ddaf2c90c..eccfdca42107c 100644 --- a/internal/flushcommon/writebuffer/l0_write_buffer_test.go +++ b/internal/flushcommon/writebuffer/l0_write_buffer_test.go @@ -204,16 +204,6 @@ func (s *L0WriteBufferSuite) TestBufferData() { }) } -func (s *L0WriteBufferSuite) TestCreateFailure() { - metacache := metacache.NewMockMetaCache(s.T()) - metacache.EXPECT().Collection().Return(s.collID) - metacache.EXPECT().Schema().Return(&schemapb.CollectionSchema{}) - _, err := NewL0WriteBuffer(s.channelName, metacache, s.syncMgr, &writeBufferOption{ - idAllocator: s.allocator, - }) - s.Error(err) -} - func TestL0WriteBuffer(t *testing.T) { suite.Run(t, new(L0WriteBufferSuite)) } diff --git a/internal/flushcommon/writebuffer/write_buffer.go b/internal/flushcommon/writebuffer/write_buffer.go index 
f5538b2810ac1..e1681c5d51a44 100644 --- a/internal/flushcommon/writebuffer/write_buffer.go +++ b/internal/flushcommon/writebuffer/write_buffer.go @@ -13,6 +13,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/flushcommon/metacache" "github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle" "github.com/milvus-io/milvus/internal/flushcommon/syncmgr" @@ -115,6 +116,7 @@ type writeBufferBase struct { channelName string metaWriter syncmgr.MetaWriter + allocator allocator.Interface collSchema *schemapb.CollectionSchema estSizePerRecord int metaCache metacache.MetaCache @@ -125,7 +127,6 @@ type writeBufferBase struct { syncPolicies []SyncPolicy syncCheckpoint *checkpointCandidates syncMgr syncmgr.SyncManager - serializer syncmgr.Serializer checkpoint *msgpb.MsgPosition flushTimestamp *atomic.Uint64 @@ -143,17 +144,6 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, syncMgr s flushTsPolicy := GetFlushTsPolicy(flushTs, metacache) option.syncPolicies = append(option.syncPolicies, flushTsPolicy) - var serializer syncmgr.Serializer - var err error - serializer, err = syncmgr.NewStorageSerializer( - option.idAllocator, - metacache, - option.metaWriter, - ) - if err != nil { - return nil, err - } - schema := metacache.Schema() estSize, err := typeutil.EstimateSizePerRecord(schema) if err != nil { @@ -167,9 +157,9 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, syncMgr s estSizePerRecord: estSize, syncMgr: syncMgr, metaWriter: option.metaWriter, + allocator: option.idAllocator, buffers: make(map[int64]*segmentBuffer), metaCache: metacache, - serializer: serializer, syncCheckpoint: newCheckpointCandiates(), syncPolicies: option.syncPolicies, flushTimestamp: flushTs, @@ -617,7 +607,12 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(wb.collectionID)).Sub(totalMemSize) - return wb.serializer.EncodeBuffer(ctx, pack) + task := syncmgr.NewSyncTask(). + WithAllocator(wb.allocator). + WithMetaWriter(wb.metaWriter). + WithMetaCache(wb.metaCache). + WithSyncPack(pack) + return task, nil } // getEstBatchSize returns the batch size based on estimated size per record and FlushBufferSize configuration value. 
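With serialization moved into the sync task, the write buffer only assembles a SyncPack and hands it to a SyncTask; the task serializes and uploads binlogs when it runs. A minimal sketch of the resulting call pattern (not part of the patch hunks), assuming illustrative variables for the chunk manager (cm), allocator (alloc), metacache (cache), meta writer (mw), and buffered data (insertBuffers, checkpoint):

    // Assemble the pack; builder names follow the SyncPack/SyncTask options used in this patch.
    pack := new(syncmgr.SyncPack).
        WithCollectionID(collectionID).
        WithPartitionID(partitionID).
        WithSegmentID(segmentID).
        WithChannelName(channelName).
        WithInsertData(insertBuffers).
        WithCheckpoint(checkpoint).
        WithFlush()

    // The task owns serialization and storage writes (via BulkPackWriter) once Run is called.
    task := syncmgr.NewSyncTask().
        WithAllocator(alloc).
        WithChunkManager(cm).
        WithMetaWriter(mw).
        WithMetaCache(cache).
        WithSyncPack(pack)

    if err := task.Run(ctx); err != nil {
        log.Warn("sync task failed", zap.Error(err))
    }
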
diff --git a/internal/flushcommon/writebuffer/write_buffer_test.go b/internal/flushcommon/writebuffer/write_buffer_test.go index 55f41f42f9d9d..55d595a01ebb0 100644 --- a/internal/flushcommon/writebuffer/write_buffer_test.go +++ b/internal/flushcommon/writebuffer/write_buffer_test.go @@ -17,7 +17,6 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/util/conc" - "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -225,39 +224,6 @@ func (s *WriteBufferSuite) TestGetCheckpoint() { }) } -func (s *WriteBufferSuite) TestSyncSegmentsError() { - wb, err := newWriteBufferBase(s.channelName, s.metacache, s.syncMgr, &writeBufferOption{ - pkStatsFactory: func(vchannel *datapb.SegmentInfo) pkoracle.PkStat { - return pkoracle.NewBloomFilterSet() - }, - }) - s.Require().NoError(err) - - serializer := syncmgr.NewMockSerializer(s.T()) - - wb.serializer = serializer - - segment := metacache.NewSegmentInfo(&datapb.SegmentInfo{ - ID: 1, - }, nil, nil) - s.metacache.EXPECT().GetSegmentByID(int64(1)).Return(segment, true) - s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() - - s.Run("segment_not_found", func() { - serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(nil, merr.WrapErrSegmentNotFound(1)).Once() - s.NotPanics(func() { - wb.syncSegments(context.Background(), []int64{1}) - }) - }) - - s.Run("other_err", func() { - serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(nil, merr.WrapErrServiceInternal("mocked")).Once() - s.Panics(func() { - wb.syncSegments(context.Background(), []int64{1}) - }) - }) -} - func (s *WriteBufferSuite) TestEvictBuffer() { wb, err := newWriteBufferBase(s.channelName, s.metacache, s.syncMgr, &writeBufferOption{ pkStatsFactory: func(vchannel *datapb.SegmentInfo) pkoracle.PkStat { @@ -266,10 +232,6 @@ func (s *WriteBufferSuite) TestEvictBuffer() { }) s.Require().NoError(err) - serializer := syncmgr.NewMockSerializer(s.T()) - - wb.serializer = serializer - s.Run("no_checkpoint", func() { wb.mut.Lock() wb.buffers[100] = &segmentBuffer{} @@ -281,8 +243,6 @@ func (s *WriteBufferSuite) TestEvictBuffer() { }() wb.EvictBuffer(GetOldestBufferPolicy(1)) - - serializer.AssertNotCalled(s.T(), "EncodeBuffer") }) s.Run("trigger_sync", func() { @@ -314,7 +274,6 @@ func (s *WriteBufferSuite) TestEvictBuffer() { }, nil, nil) s.metacache.EXPECT().GetSegmentByID(int64(2)).Return(segment, true) s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return() - serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(syncmgr.NewSyncTask(), nil) s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(conc.Go[struct{}](func() (struct{}, error) { return struct{}{}, nil }), nil) diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 561924489284d..0ae5006581b95 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -146,9 +146,10 @@ func (insertCodec *InsertCodec) SerializePkStats(stats *PrimaryKeyStats, rowNum buffer := statsWriter.GetBuffer() return &Blob{ - Key: blobKey, - Value: buffer, - RowNum: rowNum, + Key: blobKey, + Value: buffer, + RowNum: rowNum, + MemorySize: int64(len(buffer)), }, nil } diff --git a/pkg/metrics/datanode_metrics.go b/pkg/metrics/datanode_metrics.go index 173d9000ea369..1a44da1b164f1 100644 --- a/pkg/metrics/datanode_metrics.go +++ b/pkg/metrics/datanode_metrics.go @@ -115,18 +115,6 @@ 
var (
 collectionIDLabelName,
 })
- DataNodeEncodeBufferLatency = prometheus.NewHistogramVec(
- prometheus.HistogramOpts{
- Namespace: milvusNamespace,
- Subsystem: typeutil.DataNodeRole,
- Name: "encode_buffer_latency",
- Help: "latency of encode buffer data",
- Buckets: buckets,
- }, []string{
- nodeIDLabelName,
- segmentLevelLabelName,
- })
-
 DataNodeSave2StorageLatency = prometheus.NewHistogramVec(
 prometheus.HistogramOpts{
 Namespace: milvusNamespace,
@@ -268,7 +256,6 @@ func RegisterDataNode(registry *prometheus.Registry) {
 registry.MustRegister(DataNodeFlowGraphBufferDataSize)
 // output related
 registry.MustRegister(DataNodeAutoFlushBufferCount)
- registry.MustRegister(DataNodeEncodeBufferLatency)
 registry.MustRegister(DataNodeSave2StorageLatency)
 registry.MustRegister(DataNodeFlushBufferCount)
 registry.MustRegister(DataNodeFlushReqCounter)
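For reference, the encode-buffer latency metric goes away because serialization and upload now happen through the pack writer inside SyncTask.Run. A rough sketch of that call, with the return order taken from the task.go hunk above and illustrative variables (cache, cm, alloc, pack, ctx):

    // Write serializes the pack and uploads insert/delta/stats/BM25 binlogs,
    // returning their metadata plus the total flushed size in bytes.
    writer := syncmgr.NewBulkPackWriter(cache, cm, alloc, retry.Attempts(3))
    inserts, deltas, stats, bm25, flushedSize, err := writer.Write(ctx, pack)
    if err != nil {
        return err
    }
    log.Info("pack written",
        zap.Int("insertFields", len(inserts)),
        zap.Int("deltaLogs", len(deltas.GetBinlogs())),
        zap.Int("statsFields", len(stats)),
        zap.Int("bm25Fields", len(bm25)),
        zap.Int64("flushedSize", flushedSize))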