Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[to #48] try to fix scheduler problem #90

Merged
merged 20 commits into from
Jun 15, 2022
18 changes: 13 additions & 5 deletions cdc/cdc/model/owner.go
Original file line number Diff line number Diff line change
@@ -165,6 +165,8 @@ type KeySpanOperation struct {
// if the operation is a add operation, BoundaryTs is start ts
BoundaryTs uint64 `json:"boundary_ts"`
Status uint64 `json:"status,omitempty"`

RelatedKeySpans []KeySpanLocation `json:"related_key_spans"`
}

// KeySpanProcessed returns whether the keyspan has been processed by processor
@@ -217,7 +219,6 @@ type KeySpanReplicaInfo struct {
StartTs Ts `json:"start-ts"`
Start []byte
End []byte
// MarkKeySpanID KeySpanID `json:"mark-keyspan-id"`
}

// Clone clones a KeySpanReplicaInfo
@@ -270,7 +271,7 @@ func (ts *TaskStatus) RemoveKeySpan(id KeySpanID, boundaryTs Ts, isMoveKeySpan b
}

// AddKeySpan add the keyspan in KeySpanInfos and add a add kyespan operation.
func (ts *TaskStatus) AddKeySpan(id KeySpanID, keyspan *KeySpanReplicaInfo, boundaryTs Ts) {
func (ts *TaskStatus) AddKeySpan(id KeySpanID, keyspan *KeySpanReplicaInfo, boundaryTs Ts, relatedKeySpans []KeySpanLocation) {
if ts.KeySpans == nil {
ts.KeySpans = make(map[KeySpanID]*KeySpanReplicaInfo)
}
@@ -284,9 +285,10 @@ func (ts *TaskStatus) AddKeySpan(id KeySpanID, keyspan *KeySpanReplicaInfo, boun
ts.Operation = make(map[KeySpanID]*KeySpanOperation)
}
ts.Operation[id] = &KeySpanOperation{
Delete: false,
BoundaryTs: boundaryTs,
Status: OperDispatched,
Delete: false,
BoundaryTs: boundaryTs,
Status: OperDispatched,
RelatedKeySpans: relatedKeySpans,
}
}

@@ -448,3 +450,9 @@ type ProcInfoSnap struct {
CaptureID string `json:"capture-id"`
KeySpans map[KeySpanID]*KeySpanReplicaInfo `json:"-"`
}

// KeySpanLocation records which capture a keyspan is in
type KeySpanLocation struct {
CaptureID string `json:"capture_id"`
KeySpanID KeySpanID `json:"keyspan_id"`
}
23 changes: 4 additions & 19 deletions cdc/cdc/model/owner_test.go
Original file line number Diff line number Diff line change
@@ -48,21 +48,6 @@ func TestAdminJobType(t *testing.T) {
}
}

func TestDDLStateString(t *testing.T) {
t.Parallel()

names := map[ChangeFeedDDLState]string{
ChangeFeedSyncDML: "SyncDML",
ChangeFeedWaitToExecDDL: "WaitToExecDDL",
ChangeFeedExecDDL: "ExecDDL",
ChangeFeedDDLExecuteFailed: "DDLExecuteFailed",
ChangeFeedDDLState(100): "Unknown",
}
for state, name := range names {
require.Equal(t, name, state.String())
}
}

func TestTaskPositionMarshal(t *testing.T) {
t.Parallel()

@@ -265,11 +250,11 @@ func TestAddKeySpan(t *testing.T) {
},
}
status := &TaskStatus{}
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts}, ts)
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts}, ts, nil)
require.Equal(t, expected, status)

// add existing keyspan does nothing
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: 1}, 1)
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: 1}, 1, nil)
require.Equal(t, expected, status)
}

@@ -279,8 +264,8 @@ func TestTaskStatusApplyState(t *testing.T) {
ts1 := uint64(420875042036766723)
ts2 := uint64(420876783269969921)
status := &TaskStatus{}
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts1}, ts1)
status.AddKeySpan(2, &KeySpanReplicaInfo{StartTs: ts2}, ts2)
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts1}, ts1, nil)
status.AddKeySpan(2, &KeySpanReplicaInfo{StartTs: ts2}, ts2, nil)
require.True(t, status.SomeOperationsUnapplied())
require.Equal(t, ts1, status.AppliedTs())

64 changes: 53 additions & 11 deletions cdc/cdc/owner/scheduler_v1.go
Original file line number Diff line number Diff line change
@@ -46,6 +46,8 @@ type schedulerJob struct {
// if the operation is an add operation, boundaryTs is start ts
BoundaryTs uint64
TargetCapture model.CaptureID

RelatedKeySpans []model.KeySpanLocation
}

type moveKeySpanJob struct {
@@ -55,7 +57,7 @@ type moveKeySpanJob struct {

type oldScheduler struct {
state *orchestrator.ChangefeedReactorState
currentKeySpansID []model.KeySpanID
currentKeySpanIDs []model.KeySpanID
currentKeySpans map[model.KeySpanID]regionspan.Span
captures map[model.CaptureID]*model.CaptureInfo

@@ -80,20 +82,21 @@ func newSchedulerV1(f updateCurrentKeySpansFunc) scheduler {
func (s *oldScheduler) Tick(
ctx cdcContext.Context,
state *orchestrator.ChangefeedReactorState,
// currentKeySpans []model.KeySpanID,
captures map[model.CaptureID]*model.CaptureInfo,
) (shouldUpdateState bool, err error) {

s.state = state
s.captures = captures

s.currentKeySpansID, s.currentKeySpans, err = s.updateCurrentKeySpans(ctx)
currentKeySpanIDs, currentKeySpans, err := s.updateCurrentKeySpans(ctx)
if err != nil {
return false, errors.Trace(err)
}

newKeySpans, needRemoveKeySpans := s.diffCurrentKeySpans(currentKeySpans)
s.currentKeySpanIDs, s.currentKeySpans = currentKeySpanIDs, currentKeySpans

s.cleanUpFinishedOperations()
pendingJob, err := s.syncKeySpansWithCurrentKeySpans()
pendingJob, err := s.syncKeySpansWithCurrentKeySpans(newKeySpans, needRemoveKeySpans)
if err != nil {
return false, errors.Trace(err)
}
@@ -116,6 +119,27 @@ func (s *oldScheduler) Tick(
return shouldUpdateState, nil
}

func (s *oldScheduler) diffCurrentKeySpans(currentKeySpans map[model.KeySpanID]regionspan.Span) (map[model.KeySpanID]struct{}, []model.KeySpanID) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit test for this method.

oldKeySpans := s.currentKeySpans

newKeySpans := map[model.KeySpanID]struct{}{}
needRemoveKeySpans := []model.KeySpanID{}

for keyspanID := range oldKeySpans {
if _, ok := currentKeySpans[keyspanID]; !ok {
needRemoveKeySpans = append(needRemoveKeySpans, keyspanID)
}
}

for keyspanID := range currentKeySpans {
if _, ok := oldKeySpans[keyspanID]; !ok {
newKeySpans[keyspanID] = struct{}{}
}
}

return newKeySpans, needRemoveKeySpans
}

func (s *oldScheduler) MoveKeySpan(keyspanID model.KeySpanID, target model.CaptureID) {
s.moveKeySpanJobQueue = append(s.moveKeySpanJobQueue, &moveKeySpanJob{
keyspanID: keyspanID,
@@ -221,7 +245,9 @@ func (s *oldScheduler) dispatchToTargetCaptures(pendingJobs []*schedulerJob) {
}
}

count := 0
getMinWorkloadCapture := func() model.CaptureID {
count++
minCapture := ""
minWorkLoad := uint64(math.MaxUint64)
for captureID, workload := range workloads {
@@ -249,26 +275,41 @@ func (s *oldScheduler) dispatchToTargetCaptures(pendingJobs []*schedulerJob) {

// syncKeySpansWithCurrentKeySpans iterates all current keyspans to check whether it should be listened or not.
// this function will return schedulerJob to make sure all keyspans will be listened.
func (s *oldScheduler) syncKeySpansWithCurrentKeySpans() ([]*schedulerJob, error) {
func (s *oldScheduler) syncKeySpansWithCurrentKeySpans(newKeySpans map[model.KeySpanID]struct{}, needRemoveKeySpans []model.KeySpanID) ([]*schedulerJob, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest to add unit test for this method.

var pendingJob []*schedulerJob
allKeySpanListeningNow, err := s.keyspan2CaptureIndex()
if err != nil {
return nil, errors.Trace(err)
}
relatedKeySpans := make([]model.KeySpanLocation, 0, len(needRemoveKeySpans))
for _, keyspanID := range needRemoveKeySpans {
if captureID, ok := allKeySpanListeningNow[keyspanID]; ok {
location := model.KeySpanLocation{
CaptureID: captureID,
KeySpanID: keyspanID,
}
relatedKeySpans = append(relatedKeySpans, location)
}
}

globalCheckpointTs := s.state.Status.CheckpointTs
for _, keyspanID := range s.currentKeySpansID {
for _, keyspanID := range s.currentKeySpanIDs {
if _, exist := allKeySpanListeningNow[keyspanID]; exist {
delete(allKeySpanListeningNow, keyspanID)
continue
}
// For each keyspan which should be listened but is not, add an adding-keyspan job to the pending job list
pendingJob = append(pendingJob, &schedulerJob{
job := &schedulerJob{
Tp: schedulerJobTypeAddKeySpan,
KeySpanID: keyspanID,
Start: s.currentKeySpans[keyspanID].Start,
End: s.currentKeySpans[keyspanID].End,
BoundaryTs: globalCheckpointTs,
})
}
if _, ok := newKeySpans[keyspanID]; ok {
job.RelatedKeySpans = relatedKeySpans
}
// For each keyspan which should be listened but is not, add an adding-keyspan job to the pending job list
pendingJob = append(pendingJob, job)
}
// The remaining keyspans are the keyspans which should be not listened
keyspansThatShouldNotBeListened := allKeySpanListeningNow
@@ -303,7 +344,7 @@ func (s *oldScheduler) handleJobs(jobs []*schedulerJob) {
StartTs: job.BoundaryTs,
Start: job.Start,
End: job.End,
}, job.BoundaryTs)
}, job.BoundaryTs, job.RelatedKeySpans)
case schedulerJobTypeRemoveKeySpan:
failpoint.Inject("OwnerRemoveKeySpanError", func() {
// just skip removing this keyspan
@@ -517,6 +558,7 @@ func (w *schedulerV1CompatWrapper) calculateWatermarks(
if resolvedTs == model.Ts(math.MaxUint64) {
return schedulerv2.CheckpointCannotProceed, 0
}

checkpointTs := resolvedTs
for _, position := range state.TaskPositions {
if checkpointTs > position.CheckPointTs {
Loading