Skip to content

Commit

Permalink
better kv map & fix some tests (flowbehappy#254)
Browse files Browse the repository at this point in the history
* use more safety less function in btree_map

* fix some test
  • Loading branch information
CharlesCheung96 committed Sep 4, 2024
1 parent db88d35 commit 28d80e9
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 226 deletions.
20 changes: 11 additions & 9 deletions heartbeatpb/table_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,25 @@ var DDLSpanSchemaID int64 = 0
// DDLSpan is the special span for Table Trigger Event Dispatcher
var DDLSpan = &TableSpan{TableID: 0, StartKey: nil, EndKey: nil}

func LessTableSpan(t1, t2 *TableSpan) bool {
return t1.Less(t2)
}

// Less compares two Spans, defines the order between spans.
func (s *TableSpan) Less(other any) bool {
tbl := other.(*TableSpan)
if s.TableID < tbl.TableID {
func (s *TableSpan) Less(other *TableSpan) bool {
if s.TableID < other.TableID {
return true
}
if bytes.Compare(s.StartKey, tbl.StartKey) < 0 {
if bytes.Compare(s.StartKey, other.StartKey) < 0 {
return true
}
return false
}

func (s *TableSpan) Equal(inferior any) bool {
tbl := inferior.(*TableSpan)
return s.TableID == tbl.TableID &&
bytes.Equal(s.StartKey, tbl.StartKey) &&
bytes.Equal(s.EndKey, tbl.EndKey)
func (s *TableSpan) Equal(other *TableSpan) bool {
return s.TableID == other.TableID &&
bytes.Equal(s.StartKey, other.StartKey) &&
bytes.Equal(s.EndKey, other.EndKey)
}

func (s *TableSpan) Copy() *TableSpan {
Expand Down
2 changes: 1 addition & 1 deletion maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func (m *Maintainer) onBootstrapDone(cachedResp map[common.NodeID]*heartbeatpb.M
if stm.State == scheduler.SchedulerStatusWorking {
tableMap, ok := workingMap[span.TableID]
if !ok {
tableMap = utils.NewBtreeMap[*heartbeatpb.TableSpan, *scheduler.StateMachine]()
tableMap = utils.NewBtreeMap[*heartbeatpb.TableSpan, *scheduler.StateMachine](heartbeatpb.LessTableSpan)
workingMap[span.TableID] = tableMap
}
tableMap.ReplaceOrInsert(span, stm)
Expand Down
6 changes: 3 additions & 3 deletions maintainer/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func TestFinishBootstrap(t *testing.T) {
DDLStatus: nil,
},
}, NewReplicaSet(model.ChangeFeedID{}, dispatcherID2, 1, span, 1))
cached := utils.NewBtreeMap[*heartbeatpb.TableSpan, *scheduler.StateMachine]()
cached := utils.NewBtreeMap[*heartbeatpb.TableSpan, *scheduler.StateMachine](heartbeatpb.LessTableSpan)
cached.ReplaceOrInsert(span, stm2)
require.False(t, s.bootstrapped)
s.FinishBootstrap(map[uint64]utils.Map[*heartbeatpb.TableSpan, *scheduler.StateMachine]{
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestSplitTableWhenBootstrapFinished(t *testing.T) {
{TableID: 1, StartKey: appendNew(totalSpan.StartKey, 'a'), EndKey: appendNew(totalSpan.StartKey, 'b')}, // 1 region // 1 region
{TableID: 1, StartKey: appendNew(totalSpan.StartKey, 'b'), EndKey: appendNew(totalSpan.StartKey, 'c')},
}
cached := utils.NewBtreeMap[*heartbeatpb.TableSpan, *scheduler.StateMachine]()
cached := utils.NewBtreeMap[*heartbeatpb.TableSpan, *scheduler.StateMachine](heartbeatpb.LessTableSpan)
for _, span := range reportedSpans {
dispatcherID1 := common.NewDispatcherID()
stm1 := scheduler.NewStateMachine(dispatcherID1, map[model.CaptureID]scheduler.InferiorStatus{
Expand All @@ -355,7 +355,7 @@ func TestSplitTableWhenBootstrapFinished(t *testing.T) {
CheckpointTs: 10,
},
}, NewReplicaSet(model.ChangeFeedID{}, ddlDispatcherID, heartbeatpb.DDLSpanSchemaID, heartbeatpb.DDLSpan, 1))
ddlCache := utils.NewBtreeMap[*heartbeatpb.TableSpan, *scheduler.StateMachine]()
ddlCache := utils.NewBtreeMap[*heartbeatpb.TableSpan, *scheduler.StateMachine](heartbeatpb.LessTableSpan)
ddlCache.ReplaceOrInsert(heartbeatpb.DDLSpan, ddlStm)

require.False(t, s.bootstrapped)
Expand Down
2 changes: 1 addition & 1 deletion maintainer/split/splitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestMapFindHole(t *testing.T) {
}

for i, cs := range cases {
m := utils.NewBtreeMap[*heartbeatpb.TableSpan, *scheduler.StateMachine]()
m := utils.NewBtreeMap[*heartbeatpb.TableSpan, *scheduler.StateMachine](heartbeatpb.LessTableSpan)
for _, span := range cs.spans {
m.ReplaceOrInsert(span, &scheduler.StateMachine{})
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/eventservice/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestNewDispatcherStat(t *testing.T) {
require.Equal(t, startTs, stat.spanSubscription.watermark.Load())
require.Equal(t, 0, int(stat.spanSubscription.newEventCount.Load()))
require.NotEmpty(t, stat.workerIndex)
require.Nil(t, stat.filter)
}

func TestDispatcherStatUpdateWatermark(t *testing.T) {
Expand Down Expand Up @@ -91,6 +92,7 @@ func TestDispatcherStatUpdateWatermark(t *testing.T) {

wg.Wait()
}

func TestScanTaskPool_PushTask(t *testing.T) {
pool := newScanTaskPool()
span := newTableSpan(1, "a", "b")
Expand Down Expand Up @@ -150,20 +152,19 @@ func TestScanTaskPool_PushTask(t *testing.T) {
receivedTask := <-pool.pendingTaskQueue[dispatcherStat.workerIndex]
require.Equal(t, expectedTask, receivedTask)

// Verify that the task is set to nil in the taskSet
// Verify that the task is removed from taskSet
task, ok = pool.taskSet[dispatcherInfo.GetID()]
require.True(t, ok)
require.False(t, ok)
require.Nil(t, task)

}

func newTableSpan(tableID uint64, start, end string) *heartbeatpb.TableSpan {
res := &heartbeatpb.TableSpan{
return &heartbeatpb.TableSpan{
TableID: tableID,
StartKey: []byte(start),
EndKey: []byte(end),
}
return res
}

func TestResolvedTsCache(t *testing.T) {
Expand Down
12 changes: 3 additions & 9 deletions pkg/eventservice/event_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,6 @@ const (
defaultScanWorkerCount = 8192
)

// EventService accepts the requests of pulling events.
// The EventService is a singleton in the system.
type EventService interface {
Name() string
Run(ctx context.Context) error
Close(context.Context) error
}

type DispatcherInfo interface {
// GetID returns the ID of the dispatcher.
GetID() common.DispatcherID
Expand All @@ -43,6 +35,8 @@ type DispatcherInfo interface {
GetFilterConfig() *config.FilterConfig
}

// EventService accepts the requests of pulling events.
// The EventService is a singleton in the system.
type eventService struct {
mc messaging.MessageCenter
eventStore eventstore.EventStore
Expand All @@ -54,7 +48,7 @@ type eventService struct {
tz *time.Location
}

func NewEventService() EventService {
func NewEventService() common.SubModule {
mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
eventStore := appcontext.GetService[eventstore.EventStore](appcontext.EventStore)
schemaStore := appcontext.GetService[schemastore.SchemaStore](appcontext.SchemaStore)
Expand Down
14 changes: 1 addition & 13 deletions pkg/eventservice/event_service_performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/flowbehappy/tigate/pkg/common"
appcontext "github.com/flowbehappy/tigate/pkg/common/context"
"github.com/flowbehappy/tigate/pkg/messaging"
"github.com/pingcap/log"
"go.uber.org/zap"
Expand Down Expand Up @@ -48,18 +47,7 @@ func TestEventServiceOneMillionTable(t *testing.T) {
}
}()

appcontext.SetService(appcontext.MessageCenter, mc)
appcontext.SetService(appcontext.EventStore, mockStore)
es := NewEventService()
esImpl := es.(*eventService)
wg.Add(1)
go func() {
defer wg.Done()
err := es.Run(ctx)
if err != nil {
t.Errorf("EventService.Run() error = %v", err)
}
}()
esImpl := initEventService(ctx, t, mc, mockStore)

start := time.Now()
dispatchers := make([]DispatcherInfo, 0, tableNum)
Expand Down
Loading

0 comments on commit 28d80e9

Please sign in to comment.