Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[to #48] try to fix scheduler problem #90

Merged
merged 20 commits into from
Jun 15, 2022
18 changes: 13 additions & 5 deletions cdc/cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type KeySpanOperation struct {
// if the operation is a add operation, BoundaryTs is start ts
BoundaryTs uint64 `json:"boundary_ts"`
Status uint64 `json:"status,omitempty"`

RelatedKeySpans []KeySpanLocation `json:"related_key_spans"`
}

// KeySpanProcessed returns whether the keyspan has been processed by processor
Expand Down Expand Up @@ -217,7 +219,6 @@ type KeySpanReplicaInfo struct {
StartTs Ts `json:"start-ts"`
Start []byte
End []byte
// MarkKeySpanID KeySpanID `json:"mark-keyspan-id"`
}

// Clone clones a KeySpanReplicaInfo
Expand Down Expand Up @@ -270,7 +271,7 @@ func (ts *TaskStatus) RemoveKeySpan(id KeySpanID, boundaryTs Ts, isMoveKeySpan b
}

// AddKeySpan add the keyspan in KeySpanInfos and add a add kyespan operation.
func (ts *TaskStatus) AddKeySpan(id KeySpanID, keyspan *KeySpanReplicaInfo, boundaryTs Ts) {
func (ts *TaskStatus) AddKeySpan(id KeySpanID, keyspan *KeySpanReplicaInfo, boundaryTs Ts, relatedKeySpans []KeySpanLocation) {
if ts.KeySpans == nil {
ts.KeySpans = make(map[KeySpanID]*KeySpanReplicaInfo)
}
Expand All @@ -284,9 +285,10 @@ func (ts *TaskStatus) AddKeySpan(id KeySpanID, keyspan *KeySpanReplicaInfo, boun
ts.Operation = make(map[KeySpanID]*KeySpanOperation)
}
ts.Operation[id] = &KeySpanOperation{
Delete: false,
BoundaryTs: boundaryTs,
Status: OperDispatched,
Delete: false,
BoundaryTs: boundaryTs,
Status: OperDispatched,
RelatedKeySpans: relatedKeySpans,
}
}

Expand Down Expand Up @@ -448,3 +450,9 @@ type ProcInfoSnap struct {
CaptureID string `json:"capture-id"`
KeySpans map[KeySpanID]*KeySpanReplicaInfo `json:"-"`
}

// KeySpanLocation records which capture a keyspan is in
type KeySpanLocation struct {
CaptureID string `json:"capture_id"`
KeySpanID KeySpanID `json:"keyspan_id"`
}
23 changes: 4 additions & 19 deletions cdc/cdc/model/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,6 @@ func TestAdminJobType(t *testing.T) {
}
}

func TestDDLStateString(t *testing.T) {
t.Parallel()

names := map[ChangeFeedDDLState]string{
ChangeFeedSyncDML: "SyncDML",
ChangeFeedWaitToExecDDL: "WaitToExecDDL",
ChangeFeedExecDDL: "ExecDDL",
ChangeFeedDDLExecuteFailed: "DDLExecuteFailed",
ChangeFeedDDLState(100): "Unknown",
}
for state, name := range names {
require.Equal(t, name, state.String())
}
}

func TestTaskPositionMarshal(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -265,11 +250,11 @@ func TestAddKeySpan(t *testing.T) {
},
}
status := &TaskStatus{}
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts}, ts)
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts}, ts, nil)
require.Equal(t, expected, status)

// add existing keyspan does nothing
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: 1}, 1)
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: 1}, 1, nil)
require.Equal(t, expected, status)
}

Expand All @@ -279,8 +264,8 @@ func TestTaskStatusApplyState(t *testing.T) {
ts1 := uint64(420875042036766723)
ts2 := uint64(420876783269969921)
status := &TaskStatus{}
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts1}, ts1)
status.AddKeySpan(2, &KeySpanReplicaInfo{StartTs: ts2}, ts2)
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts1}, ts1, nil)
status.AddKeySpan(2, &KeySpanReplicaInfo{StartTs: ts2}, ts2, nil)
require.True(t, status.SomeOperationsUnapplied())
require.Equal(t, ts1, status.AppliedTs())

Expand Down
64 changes: 53 additions & 11 deletions cdc/cdc/owner/scheduler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type schedulerJob struct {
// if the operation is an add operation, boundaryTs is start ts
BoundaryTs uint64
TargetCapture model.CaptureID

RelatedKeySpans []model.KeySpanLocation
}

type moveKeySpanJob struct {
Expand All @@ -55,7 +57,7 @@ type moveKeySpanJob struct {

type oldScheduler struct {
state *orchestrator.ChangefeedReactorState
currentKeySpansID []model.KeySpanID
currentKeySpanIDs []model.KeySpanID
currentKeySpans map[model.KeySpanID]regionspan.Span
captures map[model.CaptureID]*model.CaptureInfo

Expand All @@ -80,20 +82,21 @@ func newSchedulerV1(f updateCurrentKeySpansFunc) scheduler {
func (s *oldScheduler) Tick(
ctx cdcContext.Context,
state *orchestrator.ChangefeedReactorState,
// currentKeySpans []model.KeySpanID,
captures map[model.CaptureID]*model.CaptureInfo,
) (shouldUpdateState bool, err error) {

s.state = state
s.captures = captures

s.currentKeySpansID, s.currentKeySpans, err = s.updateCurrentKeySpans(ctx)
currentKeySpanIDs, currentKeySpans, err := s.updateCurrentKeySpans(ctx)
if err != nil {
return false, errors.Trace(err)
}

newKeySpans, needRemoveKeySpans := s.diffCurrentKeySpans(currentKeySpans)
s.currentKeySpanIDs, s.currentKeySpans = currentKeySpanIDs, currentKeySpans

s.cleanUpFinishedOperations()
pendingJob, err := s.syncKeySpansWithCurrentKeySpans()
pendingJob, err := s.syncKeySpansWithCurrentKeySpans(newKeySpans, needRemoveKeySpans)
if err != nil {
return false, errors.Trace(err)
}
Expand All @@ -116,6 +119,27 @@ func (s *oldScheduler) Tick(
return shouldUpdateState, nil
}

func (s *oldScheduler) diffCurrentKeySpans(currentKeySpans map[model.KeySpanID]regionspan.Span) (map[model.KeySpanID]struct{}, []model.KeySpanID) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit test for this method.

oldKeySpans := s.currentKeySpans

newKeySpans := map[model.KeySpanID]struct{}{}
needRemoveKeySpans := []model.KeySpanID{}

for keyspanID := range oldKeySpans {
if _, ok := currentKeySpans[keyspanID]; !ok {
needRemoveKeySpans = append(needRemoveKeySpans, keyspanID)
}
}

for keyspanID := range currentKeySpans {
if _, ok := oldKeySpans[keyspanID]; !ok {
newKeySpans[keyspanID] = struct{}{}
}
}

return newKeySpans, needRemoveKeySpans
}

func (s *oldScheduler) MoveKeySpan(keyspanID model.KeySpanID, target model.CaptureID) {
s.moveKeySpanJobQueue = append(s.moveKeySpanJobQueue, &moveKeySpanJob{
keyspanID: keyspanID,
Expand Down Expand Up @@ -221,7 +245,9 @@ func (s *oldScheduler) dispatchToTargetCaptures(pendingJobs []*schedulerJob) {
}
}

count := 0
getMinWorkloadCapture := func() model.CaptureID {
count++
minCapture := ""
minWorkLoad := uint64(math.MaxUint64)
for captureID, workload := range workloads {
Expand Down Expand Up @@ -249,26 +275,41 @@ func (s *oldScheduler) dispatchToTargetCaptures(pendingJobs []*schedulerJob) {

// syncKeySpansWithCurrentKeySpans iterates all current keyspans to check whether it should be listened or not.
// this function will return schedulerJob to make sure all keyspans will be listened.
func (s *oldScheduler) syncKeySpansWithCurrentKeySpans() ([]*schedulerJob, error) {
func (s *oldScheduler) syncKeySpansWithCurrentKeySpans(newKeySpans map[model.KeySpanID]struct{}, needRemoveKeySpans []model.KeySpanID) ([]*schedulerJob, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest to add unit test for this method.

var pendingJob []*schedulerJob
allKeySpanListeningNow, err := s.keyspan2CaptureIndex()
if err != nil {
return nil, errors.Trace(err)
}
relatedKeySpans := make([]model.KeySpanLocation, 0, len(needRemoveKeySpans))
for _, keyspanID := range needRemoveKeySpans {
if captureID, ok := allKeySpanListeningNow[keyspanID]; ok {
location := model.KeySpanLocation{
CaptureID: captureID,
KeySpanID: keyspanID,
}
relatedKeySpans = append(relatedKeySpans, location)
}
}

globalCheckpointTs := s.state.Status.CheckpointTs
for _, keyspanID := range s.currentKeySpansID {
for _, keyspanID := range s.currentKeySpanIDs {
if _, exist := allKeySpanListeningNow[keyspanID]; exist {
delete(allKeySpanListeningNow, keyspanID)
continue
}
// For each keyspan which should be listened but is not, add an adding-keyspan job to the pending job list
pendingJob = append(pendingJob, &schedulerJob{
job := &schedulerJob{
Tp: schedulerJobTypeAddKeySpan,
KeySpanID: keyspanID,
Start: s.currentKeySpans[keyspanID].Start,
End: s.currentKeySpans[keyspanID].End,
BoundaryTs: globalCheckpointTs,
})
}
if _, ok := newKeySpans[keyspanID]; ok {
job.RelatedKeySpans = relatedKeySpans
}
// For each keyspan which should be listened but is not, add an adding-keyspan job to the pending job list
pendingJob = append(pendingJob, job)
}
// The remaining keyspans are the keyspans which should be not listened
keyspansThatShouldNotBeListened := allKeySpanListeningNow
Expand Down Expand Up @@ -303,7 +344,7 @@ func (s *oldScheduler) handleJobs(jobs []*schedulerJob) {
StartTs: job.BoundaryTs,
Start: job.Start,
End: job.End,
}, job.BoundaryTs)
}, job.BoundaryTs, job.RelatedKeySpans)
case schedulerJobTypeRemoveKeySpan:
failpoint.Inject("OwnerRemoveKeySpanError", func() {
// just skip removing this keyspan
Expand Down Expand Up @@ -517,6 +558,7 @@ func (w *schedulerV1CompatWrapper) calculateWatermarks(
if resolvedTs == model.Ts(math.MaxUint64) {
return schedulerv2.CheckpointCannotProceed, 0
}

checkpointTs := resolvedTs
for _, position := range state.TaskPositions {
if checkpointTs > position.CheckPointTs {
Expand Down
Loading