diff --git a/client/internal/vanus/eventlog/name_service.go b/client/internal/vanus/eventlog/name_service.go index 42534a8df..5291c79ca 100644 --- a/client/internal/vanus/eventlog/name_service.go +++ b/client/internal/vanus/eventlog/name_service.go @@ -128,7 +128,7 @@ func toSegment(segment *metapb.Segment) *record.Segment { StartOffset: segment.GetStartOffsetInLog(), EndOffset: segment.GetEndOffsetInLog(), FirstEventBornAt: time.UnixMilli(segment.FirstEventBornAtByUnixMs), - LastEventBornAt: time.UnixMilli(segment.LastEvnetBornAtByUnixMs), + LastEventBornAt: time.UnixMilli(segment.LastEventBornAtByUnixMs), Writable: segment.State == "working", // TODO: writable Blocks: blocks, LeaderBlockID: segment.GetLeaderBlockId(), diff --git a/internal/controller/eventbus/eventlog/eventlog.go b/internal/controller/eventbus/eventlog/eventlog.go index e2147088d..dac39cfdc 100644 --- a/internal/controller/eventbus/eventlog/eventlog.go +++ b/internal/controller/eventbus/eventlog/eventlog.go @@ -603,27 +603,17 @@ func (mgr *eventlogManager) checkSegmentExpired(ctx context.Context) { executionID := uuid.NewString() mgr.eventlogMap.Range(func(key, value interface{}) bool { elog, _ := value.(*eventlog) - head := elog.head() - checkCtx := context.Background() - for head != nil { + for head, next := elog.headAndNext(); head != nil; head, next = elog.headAndNext() { switch { - case head.LastEventBornTime.Second() == 0: - // TODO(wenfeng.wang) fix if set - head.LastEventBornTime = time.Now().Add(mgr.segmentExpiredTime) - elog.lock() - if err := elog.updateSegment(checkCtx, head); err != nil { - log.Warning(ctx, "update segment's metadata failed", map[string]interface{}{ - log.KeyError: err, - "segment": head.String(), - "eventlog": elog.md.ID.String(), - }) - head.LastEventBornTime = time.Time{} - } - elog.unlock() + case !head.isFull() || next == nil: return true - case !head.isFull(): - return true - case time.Since(head.LastEventBornTime.Add(mgr.segmentExpiredTime)) > 0: + case next.StartOffsetInLog == 0: + // StartOffsetInLog must be set when mark previous segment full. + panic("next segment has not StartOffsetInLog") // unreachable + case head.LastEventBornTime.IsZero(): + // LastEventBornTime must be set when mark the segment full. + panic("full segment has not LastEventBornTime") // unreachable + case time.Since(head.LastEventBornTime) > mgr.segmentExpiredTime: err := elog.deleteHead(ctx) if err != nil { log.Warning(ctx, "delete segment error", map[string]interface{}{ @@ -652,7 +642,6 @@ func (mgr *eventlogManager) checkSegmentExpired(ctx context.Context) { default: return true } - head = elog.head() } return true }) @@ -1079,6 +1068,24 @@ func (el *eventlog) head() *Segment { return ptr.Value.(*Segment) } +// headAndNext returns copies of head and next segment in the eventlog. +func (el *eventlog) headAndNext() (*Segment, *Segment) { + el.mutex.RLock() + defer el.mutex.RUnlock() + + switch el.size() { + case 0: + return nil, nil + case 1: + head := *el.segmentList.Front().Value.(*Segment) + return &head, nil + default: + ptr := el.segmentList.Front() + head, next := *ptr.Value.(*Segment), *ptr.Next().Value.(*Segment) + return &head, &next + } +} + func (el *eventlog) tail() *Segment { if el.size() == 0 { return nil @@ -1179,18 +1186,22 @@ func (el *eventlog) listOfPrevious(seg *Segment) []*Segment { //nolint:unused // func (el *eventlog) deleteHead(ctx context.Context) error { el.mutex.Lock() defer el.mutex.Unlock() + if el.segmentList.Len() == 0 { return nil } + headV := el.segmentList.Front() nextV := headV.Next() head, _ := headV.Value.(*Segment) + segments := make([]vanus.ID, 0, len(el.segments)-1) for _, v := range el.segments { if v.Uint64() != head.ID.Uint64() { segments = append(segments, v) } } + if err := el.kvClient.Delete(ctx, metadata.GetEventlogSegmentsMetadataKey(el.md.ID, head.ID)); err != nil { log.Warning(ctx, "delete segment failed when delete head", map[string]interface{}{ log.KeyError: err, @@ -1213,11 +1224,14 @@ func (el *eventlog) deleteHead(ctx context.Context) error { return err } } + if el.writePtr == head { el.writePtr = nil } + _ = el.segmentList.RemoveFront() el.segments = segments + return nil } diff --git a/internal/controller/eventbus/eventlog/eventlog_test.go b/internal/controller/eventbus/eventlog/eventlog_test.go index 85ac2afc4..7f308eade 100644 --- a/internal/controller/eventbus/eventlog/eventlog_test.go +++ b/internal/controller/eventbus/eventlog/eventlog_test.go @@ -15,9 +15,8 @@ package eventlog import ( - stdCtx "context" + "context" stdJson "encoding/json" - "math" "path/filepath" "testing" "time" @@ -115,7 +114,7 @@ func TestEventlogManager_RunWithoutTask(t *testing.T) { } kvCli.EXPECT().List(gomock.Any(), gomock.Any()).Times(4).DoAndReturn(func( - ctx stdCtx.Context, path string, + ctx context.Context, path string, ) ([]kv.Pair, error) { if path == metadata.EventlogKeyPrefixInKVStore { return elPairs, nil @@ -139,7 +138,7 @@ func TestEventlogManager_RunWithoutTask(t *testing.T) { _data3, _ := stdJson.Marshal(segment3) kvCli.EXPECT().Get(gomock.Any(), gomock.Any()).Times(3).DoAndReturn(func( - ctx stdCtx.Context, path string, + ctx context.Context, path string, ) ([]byte, error) { if path == filepath.Join(metadata.SegmentKeyPrefixInKVStore, segment1.ID.String()) { return _data1, nil @@ -153,7 +152,7 @@ func TestEventlogManager_RunWithoutTask(t *testing.T) { return nil, nil }) vanus.InitFakeSnowflake() - err := utMgr.Run(stdCtx.Background(), kvCli, false) + err := utMgr.Run(context.Background(), kvCli, false) So(err, ShouldBeNil) So(util.MapLen(&utMgr.eventlogMap), ShouldEqual, 3) v, exist := utMgr.eventlogMap.Load(el1.ID.Key()) @@ -182,7 +181,7 @@ func TestEventlogManager_ScaleSegmentTask(t *testing.T) { kvCli := kv.NewMockClient(ctrl) utMgr.kvClient = kvCli - ctx := stdCtx.Background() + ctx := context.Background() kvCli.EXPECT().Set(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) alloc := block.NewMockAllocator(ctrl) utMgr.allocator = alloc @@ -192,7 +191,9 @@ func TestEventlogManager_ScaleSegmentTask(t *testing.T) { } vanus.InitFakeSnowflake() alloc.EXPECT().Run(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(nil) - alloc.EXPECT().Pick(gomock.Any(), 3).AnyTimes().DoAndReturn(func(ctx stdCtx.Context, num int) ([]*metadata.Block, error) { + alloc.EXPECT().Pick(gomock.Any(), 3).AnyTimes().DoAndReturn(func( + ctx context.Context, num int, + ) ([]*metadata.Block, error) { return []*metadata.Block{ { ID: vanus.NewTestID(), @@ -213,7 +214,7 @@ func TestEventlogManager_ScaleSegmentTask(t *testing.T) { }) alloc.EXPECT().PickByVolumes(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func( - ctx stdCtx.Context, volumes []vanus.ID, + ctx context.Context, volumes []vanus.ID, ) ([]*metadata.Block, error) { return []*metadata.Block{ { @@ -319,7 +320,7 @@ func TestEventlogManager_CleanSegmentTask(t *testing.T) { utMgr.kvClient = kvCli vanus.InitFakeSnowflake() - ctx := stdCtx.Background() + ctx := context.Background() kvCli.EXPECT().Set(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) alloc := block.NewMockAllocator(ctrl) utMgr.allocator = alloc @@ -328,7 +329,9 @@ func TestEventlogManager_CleanSegmentTask(t *testing.T) { Capacity: 64 * 1024 * 1024 * 1024, } alloc.EXPECT().Run(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(nil) - alloc.EXPECT().Pick(gomock.Any(), 3).AnyTimes().DoAndReturn(func(ctx stdCtx.Context, num int) ([]*metadata.Block, error) { + alloc.EXPECT().Pick(gomock.Any(), 3).AnyTimes().DoAndReturn(func( + ctx context.Context, num int, + ) ([]*metadata.Block, error) { return []*metadata.Block{ { ID: vanus.NewTestID(), @@ -348,7 +351,7 @@ func TestEventlogManager_CleanSegmentTask(t *testing.T) { }, nil }) alloc.EXPECT().PickByVolumes(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func( - ctx stdCtx.Context, volumes []vanus.ID, + ctx context.Context, volumes []vanus.ID, ) ([]*metadata.Block, error) { return []*metadata.Block{ { @@ -436,7 +439,7 @@ func TestEventlogManager_CreateAndGetEventlog(t *testing.T) { utMgr.kvClient = kvCli vanus.InitFakeSnowflake() - ctx := stdCtx.Background() + ctx := context.Background() kvCli.EXPECT().Set(gomock.Any(), gomock.Any(), gomock.Any()).Times(14).Return(nil) alloc := block.NewMockAllocator(ctrl) utMgr.allocator = alloc @@ -444,7 +447,7 @@ func TestEventlogManager_CreateAndGetEventlog(t *testing.T) { ID: vanus.NewTestID(), Capacity: 64 * 1024 * 1024 * 1024, } - alloc.EXPECT().Pick(ctx, 3).Times(1).DoAndReturn(func(ctx stdCtx.Context, num int) ([]*metadata.Block, error) { + alloc.EXPECT().Pick(ctx, 3).Times(1).DoAndReturn(func(ctx context.Context, num int) ([]*metadata.Block, error) { return []*metadata.Block{ { ID: vanus.NewTestID(), @@ -464,7 +467,7 @@ func TestEventlogManager_CreateAndGetEventlog(t *testing.T) { }, nil }) alloc.EXPECT().PickByVolumes(gomock.Any(), gomock.Any()).Times(1).DoAndReturn(func( - ctx stdCtx.Context, volumes []vanus.ID, + ctx context.Context, volumes []vanus.ID, ) ([]*metadata.Block, error) { return []*metadata.Block{ { @@ -548,7 +551,7 @@ func TestEventlogManager_DeleteEventlog(t *testing.T) { kvCli := kv.NewMockClient(ctrl) utMgr.kvClient = kvCli - ctx := stdCtx.Background() + ctx := context.Background() Convey("test deleting", func() { // the eventlog doesn't exist @@ -604,7 +607,7 @@ func TestEventlogManager_GetAppendableSegment(t *testing.T) { kvCli := kv.NewMockClient(ctrl) mgr.kvClient = kvCli - ctx := stdCtx.Background() + ctx := context.Background() kvCli.EXPECT().Set(gomock.Any(), gomock.Any(), gomock.Any()).Times(6).Return(nil) alloc := block.NewMockAllocator(ctrl) mgr.allocator = alloc @@ -612,7 +615,7 @@ func TestEventlogManager_GetAppendableSegment(t *testing.T) { ID: vanus.NewTestID(), Capacity: 64 * 1024 * 1024 * 1024, } - alloc.EXPECT().Pick(ctx, 3).Times(1).DoAndReturn(func(ctx stdCtx.Context, num int) ([]*metadata.Block, error) { + alloc.EXPECT().Pick(ctx, 3).Times(1).DoAndReturn(func(ctx context.Context, num int) ([]*metadata.Block, error) { return []*metadata.Block{ { ID: vanus.NewTestID(), @@ -666,7 +669,7 @@ func TestEventlogManager_UpdateSegment(t *testing.T) { kvCli := kv.NewMockClient(ctrl) utMgr.kvClient = kvCli - ctx := stdCtx.Background() + ctx := context.Background() Convey("case: the eventlog doesn't exist", func() { utMgr.UpdateSegment(ctx, map[string][]Segment{ "dont_exist": { @@ -799,7 +802,7 @@ func TestEventlogManager_UpdateSegmentReplicas(t *testing.T) { kvCli := kv.NewMockClient(ctrl) utMgr.kvClient = kvCli - ctx := stdCtx.Background() + ctx := context.Background() el, err := newEventlog(ctx, &metadata.Eventlog{ ID: vanus.NewTestID(), EventbusID: vanus.NewTestID(), @@ -841,38 +844,50 @@ func Test_ExpiredSegmentDeleting(t *testing.T) { Convey("test expired segment deleting", t, func() { ctrl := gomock.NewController(t) kvCli := kv.NewMockClient(ctrl) - ctx := stdCtx.Background() + + ctx := context.Background() + utMgr := &eventlogManager{ segmentReplicaNum: 3, checkSegmentExpiredInterval: 100 * time.Millisecond, segmentExpiredTime: time.Hour, } - el1, err1 := newEventlog(ctx, &metadata.Eventlog{ + el1, err := newEventlog(ctx, &metadata.Eventlog{ ID: vanus.NewTestID(), EventbusID: vanus.NewTestID(), }, kvCli, false) - el2, err2 := newEventlog(ctx, &metadata.Eventlog{ + So(err, ShouldBeNil) + utMgr.eventlogMap.Store(el1.md.ID.Key(), el1) + + el2, err := newEventlog(ctx, &metadata.Eventlog{ ID: vanus.NewTestID(), EventbusID: vanus.NewTestID(), }, kvCli, false) - el3, err3 := newEventlog(ctx, &metadata.Eventlog{ + So(err, ShouldBeNil) + utMgr.eventlogMap.Store(el2.md.ID.Key(), el2) + + el3, err := newEventlog(ctx, &metadata.Eventlog{ ID: vanus.NewTestID(), EventbusID: vanus.NewTestID(), }, kvCli, false) - So(err1, ShouldBeNil) - So(err2, ShouldBeNil) - So(err3, ShouldBeNil) - utMgr.eventlogMap.Store(el1.md.ID.Key(), el1) - utMgr.eventlogMap.Store(el2.md.ID.Key(), el2) + So(err, ShouldBeNil) utMgr.eventlogMap.Store(el3.md.ID.Key(), el3) + el4, err := newEventlog(ctx, &metadata.Eventlog{ + ID: vanus.NewTestID(), + EventbusID: vanus.NewTestID(), + }, kvCli, false) + So(err, ShouldBeNil) + utMgr.eventlogMap.Store(el4.md.ID.Key(), el4) + Convey("test clean expired segment", func() { kvCli.EXPECT().Delete(gomock.Any(), gomock.Any()).AnyTimes().Return(nil) kvCli.EXPECT().Set(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) s11 := &Segment{ ID: vanus.NewTestID(), + State: StateWorking, FirstEventBornTime: time.Now().Add(-6 * time.Hour), LastEventBornTime: time.Now().Add(-3 * time.Hour), } @@ -881,49 +896,64 @@ func Test_ExpiredSegmentDeleting(t *testing.T) { s21 := &Segment{ ID: vanus.NewTestID(), + State: StateFrozen, FirstEventBornTime: time.Now().Add(-6 * time.Hour), LastEventBornTime: time.Now().Add(-3 * time.Hour), + } + el2.segmentList.Set(s21.ID.Uint64(), s21) + el2.segments = []vanus.ID{s21.ID} + + s31 := &Segment{ + ID: vanus.NewTestID(), State: StateFrozen, + FirstEventBornTime: time.Now().Add(-6 * time.Hour), + LastEventBornTime: time.Now().Add(-3 * time.Hour), } - s22 := &Segment{ + s32 := &Segment{ ID: vanus.NewTestID(), + StartOffsetInLog: 1, + State: StateWorking, FirstEventBornTime: time.Now().Add(-3 * time.Hour), LastEventBornTime: time.Now().Add(-1 * time.Minute), } - el2.segmentList.Set(s21.ID.Uint64(), s21) - el2.segmentList.Set(s22.ID.Uint64(), s22) - el2.segments = []vanus.ID{s21.ID, s22.ID} + el3.segmentList.Set(s31.ID.Uint64(), s31) + el3.segmentList.Set(s32.ID.Uint64(), s32) + el3.segments = []vanus.ID{s31.ID, s32.ID} - s31 := &Segment{ + s41 := &Segment{ ID: vanus.NewTestID(), + State: StateFrozen, FirstEventBornTime: time.Now().Add(-6 * time.Hour), LastEventBornTime: time.Now().Add(-3 * time.Hour), - State: StateFrozen, } - s32 := &Segment{ + s42 := &Segment{ ID: vanus.NewTestID(), + StartOffsetInLog: 1, + State: StateFrozen, FirstEventBornTime: time.Now().Add(-3 * time.Hour), LastEventBornTime: time.Now().Add(-1*time.Hour - time.Minute), - State: StateFrozen, } - s33 := &Segment{ + s43 := &Segment{ ID: vanus.NewTestID(), + StartOffsetInLog: 2, + State: StateFrozen, FirstEventBornTime: time.Now().Add(-1 * time.Hour), LastEventBornTime: time.Now().Add(-1 * time.Minute), - State: StateFrozen, } - s34 := &Segment{ + s44 := &Segment{ ID: vanus.NewTestID(), + StartOffsetInLog: 3, + State: StateFrozen, FirstEventBornTime: time.Now().Add(-1 * time.Minute), LastEventBornTime: time.Now().Add(-1 * time.Millisecond), - State: StateFrozen, } - el3.segmentList.Set(s31.ID.Uint64(), s31) - el3.segmentList.Set(s32.ID.Uint64(), s32) - el3.segmentList.Set(s33.ID.Uint64(), s33) - el3.segmentList.Set(s34.ID.Uint64(), s34) - el3.segments = []vanus.ID{s31.ID, s32.ID, s33.ID, s34.ID} - cCtx, cancel := stdCtx.WithCancel(ctx) + el4.segmentList.Set(s41.ID.Uint64(), s41) + el4.segmentList.Set(s42.ID.Uint64(), s42) + el4.segmentList.Set(s43.ID.Uint64(), s43) + el4.segmentList.Set(s44.ID.Uint64(), s44) + el4.segments = []vanus.ID{s41.ID, s42.ID, s43.ID, s44.ID} + + cCtx, cancel := context.WithCancel(ctx) ch := make(chan struct{}) go func() { utMgr.checkSegmentExpired(cCtx) @@ -932,17 +962,21 @@ func Test_ExpiredSegmentDeleting(t *testing.T) { time.Sleep(time.Second) cancel() <-ch + So(el1.segmentList.Len(), ShouldEqual, 1) So(el1.segments, ShouldHaveLength, 1) - So(el2.segmentList.Len(), ShouldEqual, 1) - So(el2.segments, ShouldHaveLength, 1) - So(el2.segments[0], ShouldEqual, s22.ID) + So(el3.segmentList.Len(), ShouldEqual, 1) + So(el3.segments, ShouldHaveLength, 1) + + So(el3.segmentList.Len(), ShouldEqual, 1) + So(el3.segments, ShouldHaveLength, 1) + So(el3.segments[0], ShouldEqual, s32.ID) - So(el3.segmentList.Len(), ShouldEqual, 2) - So(el3.segments, ShouldHaveLength, 2) - So(el3.segments[0], ShouldEqual, s33.ID) - So(el3.segments[1], ShouldEqual, s34.ID) + So(el4.segmentList.Len(), ShouldEqual, 2) + So(el4.segments, ShouldHaveLength, 2) + So(el4.segments[0], ShouldEqual, s43.ID) + So(el4.segments[1], ShouldEqual, s44.ID) So(util.MapLen(&utMgr.segmentNeedBeClean), ShouldEqual, 3) }) @@ -958,36 +992,40 @@ func Test_ExpiredSegmentDeleting(t *testing.T) { el1.segmentList.Set(s11.ID.Uint64(), s11) el1.segments = []vanus.ID{s11.ID} - s31 := &Segment{ + s21 := &Segment{ ID: vanus.NewTestID(), + State: StateFrozen, FirstEventBornTime: time.Now().Add(-6 * time.Hour), LastEventBornTime: time.Now().Add(-3 * time.Hour), - State: StateFrozen, } - s32 := &Segment{ + s22 := &Segment{ ID: vanus.NewTestID(), - FirstEventBornTime: time.Now().Add(-3 * time.Hour), - LastEventBornTime: time.Now().Add(-1*time.Hour - time.Minute), + StartOffsetInLog: 1, State: StateFrozen, + FirstEventBornTime: time.Now().Add(-3 * time.Hour), + LastEventBornTime: time.Now().Add(-time.Hour - time.Minute), } - s33 := &Segment{ + s23 := &Segment{ ID: vanus.NewTestID(), - FirstEventBornTime: time.Now().Add(-1 * time.Hour), - LastEventBornTime: time.Now().Add(-1 * time.Minute), + StartOffsetInLog: 2, State: StateFrozen, + FirstEventBornTime: time.Now().Add(-time.Hour), + LastEventBornTime: time.Now().Add(-time.Minute), } - s34 := &Segment{ + s24 := &Segment{ ID: vanus.NewTestID(), - FirstEventBornTime: time.Now().Add(-1 * time.Minute), - LastEventBornTime: time.Now().Add(-1 * time.Millisecond), + StartOffsetInLog: 3, State: StateFrozen, + FirstEventBornTime: time.Now().Add(-time.Minute), + LastEventBornTime: time.Now().Add(-time.Millisecond), } - el3.segmentList.Set(s31.ID.Uint64(), s31) - el3.segmentList.Set(s32.ID.Uint64(), s32) - el3.segmentList.Set(s33.ID.Uint64(), s33) - el3.segmentList.Set(s34.ID.Uint64(), s34) - el3.segments = []vanus.ID{s31.ID, s32.ID, s33.ID, s34.ID} - cCtx, cancel := stdCtx.WithCancel(ctx) + el2.segmentList.Set(s21.ID.Uint64(), s21) + el2.segmentList.Set(s22.ID.Uint64(), s22) + el2.segmentList.Set(s23.ID.Uint64(), s23) + el2.segmentList.Set(s24.ID.Uint64(), s24) + el2.segments = []vanus.ID{s21.ID, s22.ID, s23.ID, s24.ID} + + cCtx, cancel := context.WithCancel(ctx) ch := make(chan struct{}) go func() { utMgr.checkSegmentExpired(cCtx) @@ -1001,21 +1039,60 @@ func Test_ExpiredSegmentDeleting(t *testing.T) { So(el1.segments, ShouldHaveLength, 1) So(el1.head().LastEventBornTime, ShouldEqual, time.Time{}) - So(el3.segmentList.Len(), ShouldEqual, 4) - So(el3.segments, ShouldHaveLength, 4) + So(el2.segmentList.Len(), ShouldEqual, 4) + So(el2.segments, ShouldHaveLength, 4) }) - Convey("test update segment no last event time", func() { - kvCli.EXPECT().Set(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(nil) + Convey("test segment has not start offset", func() { + s11 := &Segment{ + ID: vanus.NewTestID(), + State: StateWorking, + } + s12 := &Segment{ + ID: vanus.NewTestID(), + State: StateWorking, + } + el1.segmentList.Set(s11.ID.Uint64(), s11) + el1.segmentList.Set(s12.ID.Uint64(), s12) + el1.segments = []vanus.ID{s11.ID, s12.ID} + + cCtx, cancel := context.WithCancel(ctx) + ch := make(chan struct{}) + go func() { + utMgr.checkSegmentExpired(cCtx) + ch <- struct{}{} + }() + time.Sleep(time.Second) + cancel() + <-ch + + So(el1.segmentList.Len(), ShouldEqual, 2) + So(el1.segments, ShouldHaveLength, 2) + So(el1.head().LastEventBornTime, ShouldBeZeroValue) + // Mark segment is full. + s11.State = StateFrozen + s11.LastEventBornTime = time.Now() + + So(func() { + utMgr.checkSegmentExpired(context.Background()) + }, ShouldPanic) + }) + + Convey("test segment has not last event time", func() { s11 := &Segment{ ID: vanus.NewTestID(), - State: StateFrozen, + State: StateWorking, + } + s12 := &Segment{ + ID: vanus.NewTestID(), + State: StateWorking, } el1.segmentList.Set(s11.ID.Uint64(), s11) - el1.segments = []vanus.ID{s11.ID} + el1.segmentList.Set(s12.ID.Uint64(), s12) + el1.segments = []vanus.ID{s11.ID, s12.ID} - cCtx, cancel := stdCtx.WithCancel(ctx) + cCtx, cancel := context.WithCancel(ctx) ch := make(chan struct{}) go func() { utMgr.checkSegmentExpired(cCtx) @@ -1025,10 +1102,17 @@ func Test_ExpiredSegmentDeleting(t *testing.T) { cancel() <-ch - So(el1.segmentList.Len(), ShouldEqual, 1) - So(el1.segments, ShouldHaveLength, 1) - minutes := math.Ceil(float64(time.Until(el1.head().LastEventBornTime)) / float64(time.Minute)) - So(minutes, ShouldEqual, 60) + So(el1.segmentList.Len(), ShouldEqual, 2) + So(el1.segments, ShouldHaveLength, 2) + So(el1.head().LastEventBornTime, ShouldBeZeroValue) + + // Mark segment is full. + s11.State = StateFrozen + s12.StartOffsetInLog = 1 + + So(func() { + utMgr.checkSegmentExpired(context.Background()) + }, ShouldPanic) }) }) } @@ -1037,7 +1121,7 @@ func TestEventlog_All(t *testing.T) { Convey("test eventlog operation", t, func() { ctrl := gomock.NewController(t) kvCli := kv.NewMockClient(ctrl) - ctx := stdCtx.Background() + ctx := context.Background() md := &metadata.Eventlog{ ID: vanus.NewTestID(), EventbusID: vanus.NewTestID(), @@ -1101,7 +1185,7 @@ func TestEventlog_All(t *testing.T) { func TestEventlog_MarkSegmentFull(t *testing.T) { Convey("test eventlog operation", t, func() { - ctx := stdCtx.Background() + ctx := context.Background() ctrl := gomock.NewController(t) kvCli := kv.NewMockClient(ctrl) md := &metadata.Eventlog{} diff --git a/internal/controller/eventbus/eventlog/segment.go b/internal/controller/eventbus/eventlog/segment.go index df2085592..893f826db 100644 --- a/internal/controller/eventbus/eventlog/segment.go +++ b/internal/controller/eventbus/eventlog/segment.go @@ -171,7 +171,7 @@ func Convert2ProtoSegment(ctx context.Context, ins ...Segment) []*metapb.Segment Replicas: blocks, State: string(seg.State), FirstEventBornAtByUnixMs: seg.FirstEventBornTime.UnixMilli(), - LastEvnetBornAtByUnixMs: seg.LastEventBornTime.UnixMilli(), + LastEventBornAtByUnixMs: seg.LastEventBornTime.UnixMilli(), } if seg.GetLeaderBlock() != nil { segs[idx].LeaderBlockId = seg.GetLeaderBlock().ID.Uint64() diff --git a/proto/pkg/meta/meta.pb.go b/proto/pkg/meta/meta.pb.go index 2958b9765..92748631b 100644 --- a/proto/pkg/meta/meta.pb.go +++ b/proto/pkg/meta/meta.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.29.0 -// protoc v3.21.12 +// protoc-gen-go v1.30.0 +// protoc (unknown) // source: meta.proto package meta @@ -668,7 +668,7 @@ type Segment struct { Replicas map[uint64]*Block `protobuf:"bytes,12,rep,name=replicas,proto3" json:"replicas,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` LeaderBlockId uint64 `protobuf:"varint,13,opt,name=leader_block_id,json=leaderBlockId,proto3" json:"leader_block_id,omitempty"` FirstEventBornAtByUnixMs int64 `protobuf:"varint,14,opt,name=first_event_born_at_by_unix_ms,json=firstEventBornAtByUnixMs,proto3" json:"first_event_born_at_by_unix_ms,omitempty"` - LastEvnetBornAtByUnixMs int64 `protobuf:"varint,15,opt,name=last_evnet_born_at_by_unix_ms,json=lastEvnetBornAtByUnixMs,proto3" json:"last_evnet_born_at_by_unix_ms,omitempty"` + LastEventBornAtByUnixMs int64 `protobuf:"varint,15,opt,name=last_event_born_at_by_unix_ms,json=lastEventBornAtByUnixMs,proto3" json:"last_event_born_at_by_unix_ms,omitempty"` } func (x *Segment) Reset() { @@ -801,9 +801,9 @@ func (x *Segment) GetFirstEventBornAtByUnixMs() int64 { return 0 } -func (x *Segment) GetLastEvnetBornAtByUnixMs() int64 { +func (x *Segment) GetLastEventBornAtByUnixMs() int64 { if x != nil { - return x.LastEvnetBornAtByUnixMs + return x.LastEventBornAtByUnixMs } return 0 } @@ -1127,6 +1127,7 @@ type SinkCredential struct { CredentialType SinkCredential_CredentialType `protobuf:"varint,1,opt,name=credential_type,json=credentialType,proto3,enum=vanus.core.meta.SinkCredential_CredentialType" json:"credential_type,omitempty"` // Types that are assignable to Credential: + // // *SinkCredential_Plain // *SinkCredential_Aws // *SinkCredential_Gcloud @@ -2006,7 +2007,8 @@ type UserRole struct { ResourceKind string `protobuf:"bytes,4,opt,name=resource_kind,json=resourceKind,proto3" json:"resource_kind,omitempty"` BuiltIn bool `protobuf:"varint,5,opt,name=built_in,json=builtIn,proto3" json:"built_in,omitempty"` // for custom define role - // string role_id = 6; + // + // string role_id = 6; CreatedAt int64 `protobuf:"varint,7,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` } @@ -2095,7 +2097,8 @@ type ResourceRole struct { UserIdentifier string `protobuf:"bytes,4,opt,name=user_identifier,json=userIdentifier,proto3" json:"user_identifier,omitempty"` BuiltIn bool `protobuf:"varint,5,opt,name=built_in,json=builtIn,proto3" json:"built_in,omitempty"` // for custom define role - // string role_id = 6; + // + // string role_id = 6; CreatedAt int64 `protobuf:"varint,7,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` } @@ -2262,9 +2265,9 @@ var file_meta_proto_rawDesc = []byte{ 0x6e, 0x69, 0x78, 0x5f, 0x6d, 0x73, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x03, 0x52, 0x18, 0x66, 0x69, 0x72, 0x73, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x42, 0x6f, 0x72, 0x6e, 0x41, 0x74, 0x42, 0x79, 0x55, 0x6e, 0x69, 0x78, 0x4d, 0x73, 0x12, 0x3e, 0x0a, 0x1d, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x65, - 0x76, 0x6e, 0x65, 0x74, 0x5f, 0x62, 0x6f, 0x72, 0x6e, 0x5f, 0x61, 0x74, 0x5f, 0x62, 0x79, 0x5f, + 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x62, 0x6f, 0x72, 0x6e, 0x5f, 0x61, 0x74, 0x5f, 0x62, 0x79, 0x5f, 0x75, 0x6e, 0x69, 0x78, 0x5f, 0x6d, 0x73, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x03, 0x52, 0x17, 0x6c, - 0x61, 0x73, 0x74, 0x45, 0x76, 0x6e, 0x65, 0x74, 0x42, 0x6f, 0x72, 0x6e, 0x41, 0x74, 0x42, 0x79, + 0x61, 0x73, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x42, 0x6f, 0x72, 0x6e, 0x41, 0x74, 0x42, 0x79, 0x55, 0x6e, 0x69, 0x78, 0x4d, 0x73, 0x1a, 0x53, 0x0a, 0x0d, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2c, 0x0a, 0x05, 0x76, 0x61, 0x6c, diff --git a/proto/proto/meta.proto b/proto/proto/meta.proto index 4c0babbcd..43c6b7971 100644 --- a/proto/proto/meta.proto +++ b/proto/proto/meta.proto @@ -72,7 +72,7 @@ message Segment { map replicas = 12; uint64 leader_block_id = 13; int64 first_event_born_at_by_unix_ms = 14; - int64 last_evnet_born_at_by_unix_ms = 15; + int64 last_event_born_at_by_unix_ms = 15; } message SegmentHealthInfo {