Skip to content

Commit

Permalink
enhance: make serialization be part of sync task to support file form…
Browse files Browse the repository at this point in the history
…at change (#38946)

See #38945

---------

Signed-off-by: Ted Xu <[email protected]>
  • Loading branch information
tedxu authored Jan 23, 2025
1 parent 176ef63 commit 56659ba
Show file tree
Hide file tree
Showing 20 changed files with 839 additions and 924 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ generate-mockery-flushcommon: getdeps
$(INSTALL_PATH)/mockery --name=MetaCache --dir=$(PWD)/internal/flushcommon/metacache --output=$(PWD)/internal/flushcommon/metacache --filename=mock_meta_cache.go --with-expecter --structname=MockMetaCache --outpkg=metacache --inpackage
$(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=MetaWriter --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_meta_writer.go --with-expecter --structname=MockMetaWriter --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=Serializer --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_serializer.go --with-expecter --structname=MockSerializer --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=PackWriter --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_pack_writer.go --with-expecter --structname=MockPackWriter --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=Task --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_task.go --with-expecter --structname=MockTask --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/flushcommon/writebuffer --output=$(PWD)/internal/flushcommon/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/flushcommon/writebuffer --output=$(PWD)/internal/flushcommon/writebuffer --filename=mock_manager.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/importv2/task_l0_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (s *L0ImportSuite) TestL0Import() {
task.(*syncmgr.SyncTask).WithAllocator(alloc)

s.cm.(*mocks.ChunkManager).EXPECT().RootPath().Return("mock-rootpath")
s.cm.(*mocks.ChunkManager).EXPECT().MultiWrite(mock.Anything, mock.Anything).Return(nil)
s.cm.(*mocks.ChunkManager).EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil)
task.(*syncmgr.SyncTask).WithChunkManager(s.cm)

err := task.Run(context.Background())
Expand Down
14 changes: 2 additions & 12 deletions internal/datanode/importv2/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,6 @@ func NewSyncTask(ctx context.Context,
}, metacache.NewBM25StatsFactory)
}

var serializer syncmgr.Serializer
var err error
serializer, err = syncmgr.NewStorageSerializer(
allocator,
metaCache,
nil,
)
if err != nil {
return nil, err
}

segmentLevel := datapb.SegmentLevel_L1
if insertData == nil && deleteData != nil {
segmentLevel = datapb.SegmentLevel_L0
Expand All @@ -100,7 +89,8 @@ func NewSyncTask(ctx context.Context,
syncPack.WithBM25Stats(bm25Stats)
}

return serializer.EncodeBuffer(ctx, syncPack)
task := syncmgr.NewSyncTask().WithAllocator(allocator).WithMetaCache(metaCache).WithSyncPack(syncPack)
return task, nil
}

func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache.MetaCache) (*datapb.ImportSegmentInfo, error) {
Expand Down
8 changes: 4 additions & 4 deletions internal/flushcommon/syncmgr/meta_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error

insertFieldBinlogs := lo.MapToSlice(pack.insertBinlogs, func(_ int64, fieldBinlog *datapb.FieldBinlog) *datapb.FieldBinlog { return fieldBinlog })
statsFieldBinlogs := lo.MapToSlice(pack.statsBinlogs, func(_ int64, fieldBinlog *datapb.FieldBinlog) *datapb.FieldBinlog { return fieldBinlog })
if len(pack.deltaBinlog.Binlogs) > 0 {
if pack.deltaBinlog != nil && len(pack.deltaBinlog.Binlogs) > 0 {
deltaFieldBinlogs = append(deltaFieldBinlogs, pack.deltaBinlog)
}

Expand Down Expand Up @@ -102,16 +102,16 @@ func (b *brokerMetaWriter) UpdateSync(ctx context.Context, pack *SyncTask) error
CheckPoints: checkPoints,

StartPositions: startPos,
Flushed: pack.isFlush,
Dropped: pack.isDrop,
Flushed: pack.pack.isFlush,
Dropped: pack.pack.isDrop,
Channel: pack.channelName,
SegLevel: pack.level,
}
err := retry.Handle(ctx, func() (bool, error) {
err := b.broker.SaveBinlogPaths(ctx, req)
// Segment not found during stale segment flush. Segment might get compacted already.
// Stop retry and still proceed to the end, ignoring this error.
if !pack.isFlush && errors.Is(err, merr.ErrSegmentNotFound) {
if !pack.pack.isFlush && errors.Is(err, merr.ErrSegmentNotFound) {
log.Warn("stale segment not found, could be compacted",
zap.Int64("segmentID", pack.segmentID))
log.Warn("failed to SaveBinlogPaths",
Expand Down
6 changes: 2 additions & 4 deletions internal/flushcommon/syncmgr/meta_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ func (s *MetaWriterSuite) TestNormalSave() {
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
task := NewSyncTask()
task.WithMetaCache(s.metacache)
task := NewSyncTask().WithMetaCache(s.metacache).WithSyncPack(new(SyncPack))
err := s.writer.UpdateSync(ctx, task)
s.NoError(err)
}
Expand All @@ -62,8 +61,7 @@ func (s *MetaWriterSuite) TestReturnError() {
metacache.UpdateNumOfRows(1000)(seg)
s.metacache.EXPECT().GetSegmentByID(mock.Anything).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
task := NewSyncTask()
task.WithMetaCache(s.metacache)
task := NewSyncTask().WithMetaCache(s.metacache).WithSyncPack(new(SyncPack))
err := s.writer.UpdateSync(ctx, task)
s.Error(err)
}
Expand Down
130 changes: 130 additions & 0 deletions internal/flushcommon/syncmgr/mock_pack_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

95 changes: 0 additions & 95 deletions internal/flushcommon/syncmgr/mock_serializer.go

This file was deleted.

Loading

0 comments on commit 56659ba

Please sign in to comment.