Skip to content

Commit

Permalink
fix: Update segment compactTo when compactTo segment is compacted (#2…
Browse files Browse the repository at this point in the history
…8755)

Related to #28736 #28748
See also #27675
Previous PR: #28646

This PR fixes the `SegmentNotFound` issue that occurs when compaction happens
multiple times and the buffer of a first-generation segment is synced due to
the stale-buffer policy

Now the `CompactSegments` API of metacache shall update the compactTo
field of segmentInfo if the compactTo segment is also compacted to keep
the bloodline clean

Also, add the `CompactedSegment` SyncPolicy to sync the compacted
segment asap to keep metacache clean

Now the `SyncPolicy` is an interface instead of a function type so that
when it selects some segments to sync, we could log the reason and
target segment

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Nov 27, 2023
1 parent b1e0a27 commit eaabe02
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 32 deletions.
21 changes: 12 additions & 9 deletions internal/datanode/metacache/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type MetaCache interface {
Expand Down Expand Up @@ -126,18 +127,20 @@ func (c *metaCacheImpl) CompactSegments(newSegmentID, partitionID int64, numOfRo
bfs: bfs,
}
}
log.Info("add compactTo segment info metacache", zap.Int64("segmentID", compactTo))
}

for _, segID := range oldSegmentIDs {
if segmentInfo, ok := c.segmentInfos[segID]; ok {
updated := segmentInfo.Clone()
oldSet := typeutil.NewSet(oldSegmentIDs...)
for _, segment := range c.segmentInfos {
if oldSet.Contain(segment.segmentID) ||
oldSet.Contain(segment.compactTo) {
updated := segment.Clone()
updated.compactTo = compactTo
c.segmentInfos[segID] = updated
} else {
log.Warn("some dropped segment not exist in meta cache",
zap.String("channel", c.vChannelName),
zap.Int64("collectionID", c.collectionID),
zap.Int64("segmentID", segID))
c.segmentInfos[segment.segmentID] = updated
log.Info("update segment compactTo",
zap.Int64("segmentID", segment.segmentID),
zap.Int64("originalCompactTo", segment.compactTo),
zap.Int64("compactTo", compactTo))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/writebuffer/bf_write_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (s *BFWriteBufferSuite) TestAutoSync() {
s.Run("normal_auto_sync", func() {
wb, err := NewBFWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr, &writeBufferOption{
syncPolicies: []SyncPolicy{
SyncFullBuffer,
GetFullBufferPolicy(),
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
GetFlushingSegmentsPolicy(s.metacache),
},
Expand Down Expand Up @@ -248,7 +248,7 @@ func (s *BFWriteBufferSuite) TestAutoSyncWithStorageV2() {
s.Run("normal_auto_sync", func() {
wb, err := NewBFWriteBuffer(s.channelName, s.metacache, s.storageV2Cache, s.syncMgr, &writeBufferOption{
syncPolicies: []SyncPolicy{
SyncFullBuffer,
GetFullBufferPolicy(),
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
GetFlushingSegmentsPolicy(s.metacache),
},
Expand Down
6 changes: 4 additions & 2 deletions internal/datanode/writebuffer/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ type writeBufferOption struct {
metaWriter syncmgr.MetaWriter
}

func defaultWBOption() *writeBufferOption {
func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
return &writeBufferOption{
// TODO use l0 delta as default after implementation.
deletePolicy: paramtable.Get().DataNodeCfg.DeltaPolicy.GetValue(),
syncPolicies: []SyncPolicy{
SyncFullBuffer,
GetFullBufferPolicy(),
GetSyncStaleBufferPolicy(paramtable.Get().DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)),
GetCompactedSegmentsPolicy(metacache),
GetFlushingSegmentsPolicy(metacache),
},
}
}
Expand Down
55 changes: 44 additions & 11 deletions internal/datanode/writebuffer/sync_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,67 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type SyncPolicy func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64
// SyncPolicy selects, from the currently buffered segments, those that
// shall be synced at the given timestamp.
type SyncPolicy interface {
// SelectSegments returns the IDs of the segments that shall be synced.
SelectSegments(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64
// Reason returns a human-readable description of why this policy
// selects segments; used for logging.
Reason() string
}

// SelectSegmentFunc is the function form of SyncPolicy.SelectSegments.
type SelectSegmentFunc func(buffer []*segmentBuffer, ts typeutil.Timestamp) []int64

// SelectSegmentFnPolicy adapts a SelectSegmentFunc plus a static reason
// string into a SyncPolicy implementation.
type SelectSegmentFnPolicy struct {
// fn performs the actual segment selection.
fn SelectSegmentFunc
// reason is the logged explanation for selections made by fn.
reason string
}

// SelectSegments implements SyncPolicy by delegating to the wrapped function.
func (f SelectSegmentFnPolicy) SelectSegments(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
return f.fn(buffers, ts)
}

// Reason implements SyncPolicy, returning the static reason string.
func (f SelectSegmentFnPolicy) Reason() string { return f.reason }

// wrapSelectSegmentFuncPolicy packs a selection function together with the
// reason that will be logged whenever the function picks segments to sync.
func wrapSelectSegmentFuncPolicy(fn SelectSegmentFunc, reason string) SelectSegmentFnPolicy {
	var policy SelectSegmentFnPolicy
	policy.fn = fn
	policy.reason = reason
	return policy
}

// GetFullBufferPolicy returns a SyncPolicy that selects every buffered
// segment whose buffer has reached its size limit.
func GetFullBufferPolicy() SyncPolicy {
	selectFull := func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {
		ids := make([]int64, 0, len(buffers))
		for _, buf := range buffers {
			if buf.IsFull() {
				ids = append(ids, buf.segmentID)
			}
		}
		return ids
	}
	return wrapSelectSegmentFuncPolicy(selectFull, "buffer full")
}

func SyncFullBuffer(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {
return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) {
return buf.segmentID, buf.IsFull()
})
// GetCompactedSegmentsPolicy returns a SyncPolicy selecting the buffered
// segments that metacache reports as compacted, so their buffers get synced
// as soon as possible and the metacache can be cleaned up.
func GetCompactedSegmentsPolicy(meta metacache.MetaCache) SyncPolicy {
	selectCompacted := func(buffers []*segmentBuffer, _ typeutil.Timestamp) []int64 {
		ids := make([]int64, 0, len(buffers))
		for _, buffer := range buffers {
			ids = append(ids, buffer.segmentID)
		}
		// Intersect the buffered segment IDs with the compacted ones known to metacache.
		return meta.GetSegmentIDsBy(metacache.WithSegmentIDs(ids...), metacache.WithCompacted())
	}
	return wrapSelectSegmentFuncPolicy(selectCompacted, "segment compacted")
}

func GetSyncStaleBufferPolicy(staleDuration time.Duration) SyncPolicy {
return func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
current := tsoutil.PhysicalTime(ts)
return lo.FilterMap(buffers, func(buf *segmentBuffer, _ int) (int64, bool) {
minTs := buf.MinTimestamp()
start := tsoutil.PhysicalTime(minTs)

return buf.segmentID, current.Sub(start) > staleDuration
})
}
}, "buffer stale")
}

func GetFlushingSegmentsPolicy(meta metacache.MetaCache) SyncPolicy {
return func(_ []*segmentBuffer, _ typeutil.Timestamp) []int64 {
return wrapSelectSegmentFuncPolicy(func(_ []*segmentBuffer, _ typeutil.Timestamp) []int64 {
return meta.GetSegmentIDsBy(metacache.WithSegmentState(commonpb.SegmentState_Flushing))
}
}, "segment flushing")
}

func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) SyncPolicy {
return func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
flushTs := flushTimestamp.Load()
if flushTs != nonFlushTS && ts >= flushTs {
// flush segment start pos < flushTs && checkpoint > flushTs
Expand All @@ -61,5 +94,5 @@ func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) S
return ids
}
return nil
}
}, "flush ts")
}
21 changes: 16 additions & 5 deletions internal/datanode/writebuffer/sync_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ func (s *SyncPolicySuite) TestSyncFullBuffer() {
buffer, err := newSegmentBuffer(100, s.collSchema)
s.Require().NoError(err)

ids := SyncFullBuffer([]*segmentBuffer{buffer}, 0)
policy := GetFullBufferPolicy()
ids := policy.SelectSegments([]*segmentBuffer{buffer}, 0)
s.Equal(0, len(ids), "empty buffer shall not be synced")

buffer.insertBuffer.size = buffer.insertBuffer.sizeLimit + 1

ids = SyncFullBuffer([]*segmentBuffer{buffer}, 0)
ids = policy.SelectSegments([]*segmentBuffer{buffer}, 0)
s.ElementsMatch([]int64{100}, ids)
}

Expand All @@ -54,14 +55,14 @@ func (s *SyncPolicySuite) TestSyncStalePolicy() {
buffer, err := newSegmentBuffer(100, s.collSchema)
s.Require().NoError(err)

ids := policy([]*segmentBuffer{buffer}, tsoutil.ComposeTSByTime(time.Now(), 0))
ids := policy.SelectSegments([]*segmentBuffer{buffer}, tsoutil.ComposeTSByTime(time.Now(), 0))
s.Equal(0, len(ids), "empty buffer shall not be synced")

buffer.insertBuffer.startPos = &msgpb.MsgPosition{
Timestamp: tsoutil.ComposeTSByTime(time.Now().Add(-time.Minute*2), 0),
}

ids = policy([]*segmentBuffer{buffer}, tsoutil.ComposeTSByTime(time.Now(), 0))
ids = policy.SelectSegments([]*segmentBuffer{buffer}, tsoutil.ComposeTSByTime(time.Now(), 0))
s.ElementsMatch([]int64{100}, ids)
}

Expand All @@ -71,7 +72,17 @@ func (s *SyncPolicySuite) TestFlushingSegmentsPolicy() {
ids := []int64{1, 2, 3}
metacache.EXPECT().GetSegmentIDsBy(mock.Anything).Return(ids)

result := policy([]*segmentBuffer{}, tsoutil.ComposeTSByTime(time.Now(), 0))
result := policy.SelectSegments([]*segmentBuffer{}, tsoutil.ComposeTSByTime(time.Now(), 0))
s.ElementsMatch(ids, result)
}

// TestCompactedSegmentsPolicy verifies that the compacted-segments policy
// returns exactly the segment IDs the metacache reports as compacted for
// the buffered segments.
func (s *SyncPolicySuite) TestCompactedSegmentsPolicy() {
metacache := metacache.NewMockMetaCache(s.T())
policy := GetCompactedSegmentsPolicy(metacache)
ids := []int64{1, 2}
// The mock returns the fixed ID set regardless of the filters passed in.
metacache.EXPECT().GetSegmentIDsBy(mock.Anything, mock.Anything).Return(ids)

result := policy.SelectSegments([]*segmentBuffer{{segmentID: 1}, {segmentID: 2}}, tsoutil.ComposeTSByTime(time.Now(), 0))
s.ElementsMatch(ids, result)
}

Expand Down
9 changes: 6 additions & 3 deletions internal/datanode/writebuffer/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ type WriteBuffer interface {
}

func NewWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, syncMgr syncmgr.SyncManager, opts ...WriteBufferOption) (WriteBuffer, error) {
option := defaultWBOption()
option.syncPolicies = append(option.syncPolicies, GetFlushingSegmentsPolicy(metacache))
option := defaultWBOption(metacache)
for _, opt := range opts {
opt(option)
}
Expand Down Expand Up @@ -212,7 +211,11 @@ func (wb *writeBufferBase) getSegmentsToSync(ts typeutil.Timestamp) []int64 {
buffers := lo.Values(wb.buffers)
segments := typeutil.NewSet[int64]()
for _, policy := range wb.syncPolicies {
segments.Insert(policy(buffers, ts)...)
result := policy.SelectSegments(buffers, ts)
if len(result) > 0 {
log.Info("SyncPolicy selects segments", zap.Int64s("segmentIDs", result), zap.String("reason", policy.Reason()))
segments.Insert(result...)
}
}

return segments.Collect()
Expand Down

0 comments on commit eaabe02

Please sign in to comment.