Skip to content

Commit

Permalink
fix(ctrl): did not delete expired segments in time (vanus-labs#576)
Browse files Browse the repository at this point in the history
Signed-off-by: James Yin <[email protected]>
  • Loading branch information
ifplusor authored Mar 26, 2023
1 parent 4ec5540 commit 9c695ff
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 115 deletions.
2 changes: 1 addition & 1 deletion client/internal/vanus/eventlog/name_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func toSegment(segment *metapb.Segment) *record.Segment {
StartOffset: segment.GetStartOffsetInLog(),
EndOffset: segment.GetEndOffsetInLog(),
FirstEventBornAt: time.UnixMilli(segment.FirstEventBornAtByUnixMs),
LastEventBornAt: time.UnixMilli(segment.LastEvnetBornAtByUnixMs),
LastEventBornAt: time.UnixMilli(segment.LastEventBornAtByUnixMs),
Writable: segment.State == "working", // TODO: writable
Blocks: blocks,
LeaderBlockID: segment.GetLeaderBlockId(),
Expand Down
54 changes: 34 additions & 20 deletions internal/controller/eventbus/eventlog/eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,27 +603,17 @@ func (mgr *eventlogManager) checkSegmentExpired(ctx context.Context) {
executionID := uuid.NewString()
mgr.eventlogMap.Range(func(key, value interface{}) bool {
elog, _ := value.(*eventlog)
head := elog.head()
checkCtx := context.Background()
for head != nil {
for head, next := elog.headAndNext(); head != nil; head, next = elog.headAndNext() {
switch {
case head.LastEventBornTime.Second() == 0:
// TODO(wenfeng.wang) fix if set
head.LastEventBornTime = time.Now().Add(mgr.segmentExpiredTime)
elog.lock()
if err := elog.updateSegment(checkCtx, head); err != nil {
log.Warning(ctx, "update segment's metadata failed", map[string]interface{}{
log.KeyError: err,
"segment": head.String(),
"eventlog": elog.md.ID.String(),
})
head.LastEventBornTime = time.Time{}
}
elog.unlock()
case !head.isFull() || next == nil:
return true
case !head.isFull():
return true
case time.Since(head.LastEventBornTime.Add(mgr.segmentExpiredTime)) > 0:
case next.StartOffsetInLog == 0:
// StartOffsetInLog must be set when mark previous segment full.
panic("next segment has not StartOffsetInLog") // unreachable
case head.LastEventBornTime.IsZero():
// LastEventBornTime must be set when mark the segment full.
panic("full segment has not LastEventBornTime") // unreachable
case time.Since(head.LastEventBornTime) > mgr.segmentExpiredTime:
err := elog.deleteHead(ctx)
if err != nil {
log.Warning(ctx, "delete segment error", map[string]interface{}{
Expand Down Expand Up @@ -652,7 +642,6 @@ func (mgr *eventlogManager) checkSegmentExpired(ctx context.Context) {
default:
return true
}
head = elog.head()
}
return true
})
Expand Down Expand Up @@ -1079,6 +1068,24 @@ func (el *eventlog) head() *Segment {
return ptr.Value.(*Segment)
}

// headAndNext returns copies of head and next segment in the eventlog.
func (el *eventlog) headAndNext() (*Segment, *Segment) {
el.mutex.RLock()
defer el.mutex.RUnlock()

switch el.size() {
case 0:
return nil, nil
case 1:
head := *el.segmentList.Front().Value.(*Segment)
return &head, nil
default:
ptr := el.segmentList.Front()
head, next := *ptr.Value.(*Segment), *ptr.Next().Value.(*Segment)
return &head, &next
}
}

func (el *eventlog) tail() *Segment {
if el.size() == 0 {
return nil
Expand Down Expand Up @@ -1179,18 +1186,22 @@ func (el *eventlog) listOfPrevious(seg *Segment) []*Segment { //nolint:unused //
func (el *eventlog) deleteHead(ctx context.Context) error {
el.mutex.Lock()
defer el.mutex.Unlock()

if el.segmentList.Len() == 0 {
return nil
}

headV := el.segmentList.Front()
nextV := headV.Next()
head, _ := headV.Value.(*Segment)

segments := make([]vanus.ID, 0, len(el.segments)-1)
for _, v := range el.segments {
if v.Uint64() != head.ID.Uint64() {
segments = append(segments, v)
}
}

if err := el.kvClient.Delete(ctx, metadata.GetEventlogSegmentsMetadataKey(el.md.ID, head.ID)); err != nil {
log.Warning(ctx, "delete segment failed when delete head", map[string]interface{}{
log.KeyError: err,
Expand All @@ -1213,11 +1224,14 @@ func (el *eventlog) deleteHead(ctx context.Context) error {
return err
}
}

if el.writePtr == head {
el.writePtr = nil
}

_ = el.segmentList.RemoveFront()
el.segments = segments

return nil
}

Expand Down
Loading

0 comments on commit 9c695ff

Please sign in to comment.