diff --git a/internal/datanode/metacache/meta_cache.go b/internal/datanode/metacache/meta_cache.go index 4ebbe9e3e4672..15e7ff5346bef 100644 --- a/internal/datanode/metacache/meta_cache.go +++ b/internal/datanode/metacache/meta_cache.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type MetaCache interface { @@ -126,18 +127,20 @@ func (c *metaCacheImpl) CompactSegments(newSegmentID, partitionID int64, numOfRo bfs: bfs, } } + log.Info("add compactTo segment info metacache", zap.Int64("segmentID", compactTo)) } - for _, segID := range oldSegmentIDs { - if segmentInfo, ok := c.segmentInfos[segID]; ok { - updated := segmentInfo.Clone() + oldSet := typeutil.NewSet(oldSegmentIDs...) + for _, segment := range c.segmentInfos { + if oldSet.Contain(segment.segmentID) || + oldSet.Contain(segment.compactTo) { + updated := segment.Clone() updated.compactTo = compactTo - c.segmentInfos[segID] = updated - } else { - log.Warn("some dropped segment not exist in meta cache", - zap.String("channel", c.vChannelName), - zap.Int64("collectionID", c.collectionID), - zap.Int64("segmentID", segID)) + c.segmentInfos[segment.segmentID] = updated + log.Info("update segment compactTo", + zap.Int64("segmentID", segment.segmentID), + zap.Int64("originalCompactTo", segment.compactTo), + zap.Int64("compactTo", compactTo)) } } } diff --git a/internal/datanode/writebuffer/bf_write_buffer_test.go b/internal/datanode/writebuffer/bf_write_buffer_test.go index 9437603aed3fe..48d69d7a72a8d 100644 --- a/internal/datanode/writebuffer/bf_write_buffer_test.go +++ b/internal/datanode/writebuffer/bf_write_buffer_test.go @@ -176,7 +176,7 @@ func (s *BFWriteBufferSuite) TestAutoSync() { s.Run("normal_auto_sync", func() { wb, err := NewBFWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, &writeBufferOption{ syncPolicies: []SyncPolicy{ - SyncFullBuffer, + GetFullBufferPolicy(), GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), GetFlushingSegmentsPolicy(s.metacache), }, @@ -248,7 +248,7 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() { s.Run("normal_auto_sync", func() { wb, err := NewBFWriteBuffer(s.channelName, s.metacache, s.storageV2Cache, s.syncMgr, &writeBufferOption{ syncPolicies: []SyncPolicy{ - SyncFullBuffer, + GetFullBufferPolicy(), GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), GetFlushingSegmentsPolicy(s.metacache), }, diff --git a/internal/datanode/writebuffer/options.go b/internal/datanode/writebuffer/options.go index d0849293e5db2..e603651c3cea1 100644 --- a/internal/datanode/writebuffer/options.go +++ b/internal/datanode/writebuffer/options.go @@ -28,13 +28,15 @@ type writeBufferOption struct { metaWriter syncmgr.MetaWriter } -func defaultWBOption() *writeBufferOption { +func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption { return &writeBufferOption{ // TODO use l0 delta as default after implementation. deletePolicy: paramtable.Get().DataNodeCfg.DeltaPolicy.GetValue(), syncPolicies: []SyncPolicy{ - SyncFullBuffer, + GetFullBufferPolicy(), GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), + GetCompactedSegmentsPolicy(metacache), + GetFlushingSegmentsPolicy(metacache), }, } } diff --git a/internal/datanode/writebuffer/sync_policy.go b/internal/datanode/writebuffer/sync_policy.go index e243dc84cf5ca..06217004a75e2 100644 --- a/internal/datanode/writebuffer/sync_policy.go +++ b/internal/datanode/writebuffer/sync_policy.go @@ -13,16 +13,49 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -type SyncPolicy func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 +type SyncPolicy interface { + SelectSegments(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 + Reason() string +} + +type SelectSegmentFunc func(buffer []*segmentBuffer, ts typeutil.Timestamp) []int64 + +type SelectSegmentFnPolicy struct { + fn SelectSegmentFunc + reason string +} + +func (f SelectSegmentFnPolicy) SelectSegments(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 { + return f.fn(buffers, ts) +} + +func (f SelectSegmentFnPolicy) Reason() string { return f.reason } + +func wrapSelectSegmentFuncPolicy(fn SelectSegmentFunc, reason string) SelectSegmentFnPolicy { + return SelectSegmentFnPolicy{ + fn: fn, + reason: reason, + } +} + +func GetFullBufferPolicy() SyncPolicy { + return wrapSelectSegmentFuncPolicy( + func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 { + return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) { + return buf.segmentID, buf.IsFull() + }) + }, "buffer full") +} -func SyncFullBuffer(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 { - return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) { - return buf.segmentID, buf.IsFull() - }) +func GetCompactedSegmentsPolicy(meta metacache.MetaCache) SyncPolicy { + return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 { + segmentIDs := lo.Map(buffers, func(buffer *segmentBuffer, _ int) int64 { return buffer.segmentID }) + return meta.GetSegmentIDsBy(metacache.WithSegmentIDs(segmentIDs...), metacache.WithCompacted()) + }, "segment compacted") } func GetSyncStaleBufferPolicy(staleDuration time.Duration) SyncPolicy { - return func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 { + return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 { current := tsoutil.PhysicalTime(ts) return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) { minTs := buf.MinTimestamp() @@ -30,17 +63,17 @@ func GetSyncStaleBufferPolicy(staleDuration time.Duration) SyncPolicy { return buf.segmentID, current.Sub(start) > staleDuration }) - } + }, "buffer stale") } func GetFlushingSegmentsPolicy(meta metacache.MetaCache) SyncPolicy { - return func(_ []*segmentBuffer, _ typeutil.Timestamp) []int64 { + return wrapSelectSegmentFuncPolicy(func(_ []*segmentBuffer, _ typeutil.Timestamp) []int64 { return meta.GetSegmentIDsBy(metacache.WithSegmentState(commonpb.SegmentState_Flushing)) - } + }, "segment flushing") } func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) SyncPolicy { - return func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 { + return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 { flushTs := flushTimestamp.Load() if flushTs != nonFlushTS && ts >= flushTs { // flush segment start pos < flushTs && checkpoint > flushTs @@ -61,5 +94,5 @@ func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) S return ids } return nil - } + }, "flush ts") } diff --git a/internal/datanode/writebuffer/sync_policy_test.go b/internal/datanode/writebuffer/sync_policy_test.go index 1d35a7bbcb66e..f63cd227c35e8 100644 --- a/internal/datanode/writebuffer/sync_policy_test.go +++ b/internal/datanode/writebuffer/sync_policy_test.go @@ -39,12 +39,13 @@ func (s *SyncPolicySuite) TestSyncFullBuffer() { buffer, err := newSegmentBuffer(100, s.collSchema) s.Require().NoError(err) - ids := SyncFullBuffer([]*segmentBuffer{buffer}, 0) + policy := GetFullBufferPolicy() + ids := policy.SelectSegments([]*segmentBuffer{buffer}, 0) s.Equal(0, len(ids), "empty buffer shall not be synced") buffer.insertBuffer.size = buffer.insertBuffer.sizeLimit + 1 - ids = SyncFullBuffer([]*segmentBuffer{buffer}, 0) + ids = policy.SelectSegments([]*segmentBuffer{buffer}, 0) s.ElementsMatch([]int64{100}, ids) } @@ -54,14 +55,14 @@ func (s *SyncPolicySuite) TestSyncStalePolicy() { buffer, err := newSegmentBuffer(100, s.collSchema) s.Require().NoError(err) - ids := policy([]*segmentBuffer{buffer}, tsoutil.ComposeTSByTime(time.Now(), 0)) + ids := policy.SelectSegments([]*segmentBuffer{buffer}, tsoutil.ComposeTSByTime(time.Now(), 0)) s.Equal(0, len(ids), "empty buffer shall not be synced") buffer.insertBuffer.startPos = &msgpb.MsgPosition{ Timestamp: tsoutil.ComposeTSByTime(time.Now().Add(-time.Minute*2), 0), } - ids = policy([]*segmentBuffer{buffer}, tsoutil.ComposeTSByTime(time.Now(), 0)) + ids = policy.SelectSegments([]*segmentBuffer{buffer}, tsoutil.ComposeTSByTime(time.Now(), 0)) s.ElementsMatch([]int64{100}, ids) } @@ -71,7 +72,17 @@ func (s *SyncPolicySuite) TestFlushingSegmentsPolicy() { ids := []int64{1, 2, 3} metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return(ids) - result := policy([]*segmentBuffer{}, tsoutil.ComposeTSByTime(time.Now(), 0)) + result := policy.SelectSegments([]*segmentBuffer{}, tsoutil.ComposeTSByTime(time.Now(), 0)) + s.ElementsMatch(ids, result) +} + +func (s *SyncPolicySuite) TestCompactedSegmentsPolicy() { + metacache := metacache.NewMockMetaCache(s.T()) + policy := GetCompactedSegmentsPolicy(metacache) + ids := []int64{1, 2} + metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return(ids) + + result := policy.SelectSegments([]*segmentBuffer{{segmentID: 1}, {segmentID: 2}}, tsoutil.ComposeTSByTime(time.Now(), 0)) s.ElementsMatch(ids, result) } diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index e3d139a6865dc..8f32ae20be38c 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -56,8 +56,7 @@ type WriteBuffer interface { } func NewWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) { - option := defaultWBOption() - option.syncPolicies = append(option.syncPolicies, GetFlushingSegmentsPolicy(metacache)) + option := defaultWBOption(metacache) for _, opt := range opts { opt(option) } @@ -212,7 +211,11 @@ func (wb *writeBufferBase) getSegmentsToSync(ts typeutil.Timestamp) []int64 { buffers := lo.Values(wb.buffers) segments := typeutil.NewSet[int64]() for _, policy := range wb.syncPolicies { - segments.Insert(policy(buffers, ts)...) + result := policy.SelectSegments(buffers, ts) + if len(result) > 0 { + log.Info("SyncPolicy selects segments", zap.Int64s("segmentIDs", result), zap.String("reason", policy.Reason())) + segments.Insert(result...) + } } return segments.Collect()