Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[to #48] try to fix scheduler problem #90

Merged
merged 20 commits into from
Jun 15, 2022
17 changes: 13 additions & 4 deletions cdc/cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type KeySpanOperation struct {
// if the operation is a add operation, BoundaryTs is start ts
BoundaryTs uint64 `json:"boundary_ts"`
Status uint64 `json:"status,omitempty"`

RelatedKeySpans []KeySpanLocation `json:"related_key_spans"`
}

// KeySpanProcessed returns whether the keyspan has been processed by processor
Expand Down Expand Up @@ -270,7 +272,7 @@ func (ts *TaskStatus) RemoveKeySpan(id KeySpanID, boundaryTs Ts, isMoveKeySpan b
}

// AddKeySpan add the keyspan in KeySpanInfos and add a add kyespan operation.
func (ts *TaskStatus) AddKeySpan(id KeySpanID, keyspan *KeySpanReplicaInfo, boundaryTs Ts) {
func (ts *TaskStatus) AddKeySpan(id KeySpanID, keyspan *KeySpanReplicaInfo, boundaryTs Ts, relatedKeySpans []KeySpanLocation) {
if ts.KeySpans == nil {
ts.KeySpans = make(map[KeySpanID]*KeySpanReplicaInfo)
}
Expand All @@ -284,9 +286,10 @@ func (ts *TaskStatus) AddKeySpan(id KeySpanID, keyspan *KeySpanReplicaInfo, boun
ts.Operation = make(map[KeySpanID]*KeySpanOperation)
}
ts.Operation[id] = &KeySpanOperation{
Delete: false,
BoundaryTs: boundaryTs,
Status: OperDispatched,
Delete: false,
BoundaryTs: boundaryTs,
Status: OperDispatched,
RelatedKeySpans: relatedKeySpans,
}
}

Expand Down Expand Up @@ -448,3 +451,9 @@ type ProcInfoSnap struct {
CaptureID string `json:"capture-id"`
KeySpans map[KeySpanID]*KeySpanReplicaInfo `json:"-"`
}

// KeySpanLocation records which capture a keyspan is in
type KeySpanLocation struct {
CaptureID string `json:"capture_id"`
KeySpanID KeySpanID `json:"keyspan_id"`
}
23 changes: 4 additions & 19 deletions cdc/cdc/model/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,6 @@ func TestAdminJobType(t *testing.T) {
}
}

func TestDDLStateString(t *testing.T) {
t.Parallel()

names := map[ChangeFeedDDLState]string{
ChangeFeedSyncDML: "SyncDML",
ChangeFeedWaitToExecDDL: "WaitToExecDDL",
ChangeFeedExecDDL: "ExecDDL",
ChangeFeedDDLExecuteFailed: "DDLExecuteFailed",
ChangeFeedDDLState(100): "Unknown",
}
for state, name := range names {
require.Equal(t, name, state.String())
}
}

func TestTaskPositionMarshal(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -265,11 +250,11 @@ func TestAddKeySpan(t *testing.T) {
},
}
status := &TaskStatus{}
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts}, ts)
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts}, ts, nil)
require.Equal(t, expected, status)

// add existing keyspan does nothing
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: 1}, 1)
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: 1}, 1, nil)
require.Equal(t, expected, status)
}

Expand All @@ -279,8 +264,8 @@ func TestTaskStatusApplyState(t *testing.T) {
ts1 := uint64(420875042036766723)
ts2 := uint64(420876783269969921)
status := &TaskStatus{}
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts1}, ts1)
status.AddKeySpan(2, &KeySpanReplicaInfo{StartTs: ts2}, ts2)
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts1}, ts1, nil)
status.AddKeySpan(2, &KeySpanReplicaInfo{StartTs: ts2}, ts2, nil)
require.True(t, status.SomeOperationsUnapplied())
require.Equal(t, ts1, status.AppliedTs())

Expand Down
59 changes: 50 additions & 9 deletions cdc/cdc/owner/scheduler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type schedulerJob struct {
// if the operation is an add operation, boundaryTs is start ts
BoundaryTs uint64
TargetCapture model.CaptureID

RelatedKeySpans []model.KeySpanLocation
}

type moveKeySpanJob struct {
Expand All @@ -54,7 +56,7 @@ type moveKeySpanJob struct {

type oldScheduler struct {
state *orchestrator.ChangefeedReactorState
currentKeySpansID []model.KeySpanID
currentKeySpanIDs []model.KeySpanID
currentKeySpans map[model.KeySpanID]regionspan.Span
captures map[model.CaptureID]*model.CaptureInfo

Expand Down Expand Up @@ -84,13 +86,16 @@ func (s *oldScheduler) Tick(
) (shouldUpdateState bool, err error) {

s.state = state
s.currentKeySpansID, s.currentKeySpans, err = s.updateCurrentKeySpans(ctx)
currentKeySpanIDs, currentKeySpans, err := s.updateCurrentKeySpans(ctx)
if err != nil {
return false, errors.Trace(err)
}

newKeySpans, needRemoveKeySpans := s.diffCurrentKeySpans(currentKeySpans)
s.currentKeySpanIDs, s.currentKeySpans = currentKeySpanIDs, currentKeySpans

s.cleanUpFinishedOperations()
pendingJob, err := s.syncKeySpansWithCurrentKeySpans()
pendingJob, err := s.syncKeySpansWithCurrentKeySpans(newKeySpans, needRemoveKeySpans)
if err != nil {
return false, errors.Trace(err)
}
Expand All @@ -113,6 +118,27 @@ func (s *oldScheduler) Tick(
return shouldUpdateState, nil
}

func (s *oldScheduler) diffCurrentKeySpans(currentKeySpans map[model.KeySpanID]regionspan.Span) (map[model.KeySpanID]struct{}, []model.KeySpanID) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit test for this method.

oldKeySpans := s.currentKeySpans

newKeySpans := map[model.KeySpanID]struct{}{}
needRemoveKeySpans := []model.KeySpanID{}

for keyspanID := range oldKeySpans {
if _, ok := currentKeySpans[keyspanID]; !ok {
needRemoveKeySpans = append(needRemoveKeySpans, keyspanID)
}
}

for keyspanID := range currentKeySpans {
if _, ok := oldKeySpans[keyspanID]; !ok {
newKeySpans[keyspanID] = struct{}{}
}
}

return newKeySpans, needRemoveKeySpans
}

func (s *oldScheduler) MoveKeySpan(keyspanID model.KeySpanID, target model.CaptureID) {
s.moveKeySpanJobQueue = append(s.moveKeySpanJobQueue, &moveKeySpanJob{
keyspanID: keyspanID,
Expand Down Expand Up @@ -246,26 +272,41 @@ func (s *oldScheduler) dispatchToTargetCaptures(pendingJobs []*schedulerJob) {

// syncKeySpansWithCurrentKeySpans iterates all current keyspans to check whether it should be listened or not.
// this function will return schedulerJob to make sure all keyspans will be listened.
func (s *oldScheduler) syncKeySpansWithCurrentKeySpans() ([]*schedulerJob, error) {
func (s *oldScheduler) syncKeySpansWithCurrentKeySpans(newKeySpans map[model.KeySpanID]struct{}, needRemoveKeySpans []model.KeySpanID) ([]*schedulerJob, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest to add unit test for this method.

var pendingJob []*schedulerJob
allKeySpanListeningNow, err := s.keyspan2CaptureIndex()
if err != nil {
return nil, errors.Trace(err)
}
relatedKeySpans := make([]model.KeySpanLocation, 0, len(needRemoveKeySpans))
for _, keyspanID := range needRemoveKeySpans {
if captureID, ok := allKeySpanListeningNow[keyspanID]; ok {
location := model.KeySpanLocation{
CaptureID: captureID,
KeySpanID: keyspanID,
}
relatedKeySpans = append(relatedKeySpans, location)
}
}

globalCheckpointTs := s.state.Status.CheckpointTs
for _, keyspanID := range s.currentKeySpansID {
for _, keyspanID := range s.currentKeySpanIDs {
if _, exist := allKeySpanListeningNow[keyspanID]; exist {
delete(allKeySpanListeningNow, keyspanID)
continue
}
// For each keyspan which should be listened but is not, add an adding-keyspan job to the pending job list
pendingJob = append(pendingJob, &schedulerJob{
job := &schedulerJob{
Tp: schedulerJobTypeAddKeySpan,
KeySpanID: keyspanID,
Start: s.currentKeySpans[keyspanID].Start,
End: s.currentKeySpans[keyspanID].End,
BoundaryTs: globalCheckpointTs,
})
}
if _, ok := newKeySpans[keyspanID]; ok {
job.RelatedKeySpans = relatedKeySpans
}
// For each keyspan which should be listened but is not, add an adding-keyspan job to the pending job list
pendingJob = append(pendingJob, job)
}
// The remaining keyspans are the keyspans which should be not listened
keyspansThatShouldNotBeListened := allKeySpanListeningNow
Expand Down Expand Up @@ -300,7 +341,7 @@ func (s *oldScheduler) handleJobs(jobs []*schedulerJob) {
StartTs: job.BoundaryTs,
Start: job.Start,
End: job.End,
}, job.BoundaryTs)
}, job.BoundaryTs, job.RelatedKeySpans)
case schedulerJobTypeRemoveKeySpan:
failpoint.Inject("OwnerRemoveKeySpanError", func() {
// just skip removing this keyspan
Expand Down
16 changes: 16 additions & 0 deletions cdc/cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,10 @@ func (p *processor) handleKeySpanOperation(ctx cdcContext.Context) error {
if replicaInfo.StartTs != opt.BoundaryTs {
log.Warn("the startTs and BoundaryTs of add keyspan operation should be always equaled", zap.Any("replicaInfo", replicaInfo))
}

if !p.checkRelatedKeyspans(opt.RelatedKeySpans) {
continue
}
err := p.addKeySpan(ctx, keyspanID, replicaInfo)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -597,6 +601,18 @@ func (p *processor) handleKeySpanOperation(ctx cdcContext.Context) error {
return nil
}

func (p *processor) checkRelatedKeyspans(relatedKeySpans []model.KeySpanLocation) bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit test for this method.

for _, location := range relatedKeySpans {
if taskStatus, ok := p.changefeed.TaskStatuses[location.CaptureID]; ok {
if operation, ok := taskStatus.Operation[location.KeySpanID]; ok && operation.Status != model.OperFinished {
return false
}
}

}
return true
}

func (p *processor) sendError(err error) {
if err == nil {
return
Expand Down
8 changes: 4 additions & 4 deletions cdc/cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (s *processorSuite) TestHandleKeySpanOperation4SingleKeySpan(c *check.C) {
// add keyspan, in processing
// in current implementation of owner, the startTs and BoundaryTs of add keyspan operation should be always equaled.
p.changefeed.PatchTaskStatus(p.captureInfo.ID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) {
status.AddKeySpan(66, &model.KeySpanReplicaInfo{StartTs: 60}, 60)
status.AddKeySpan(66, &model.KeySpanReplicaInfo{StartTs: 60}, 60, nil)
return status, true, nil
})
tester.MustApplyPatches()
Expand Down Expand Up @@ -351,9 +351,9 @@ func (s *processorSuite) TestHandleKeySpanOperation4MultiKeySpan(c *check.C) {
// add keyspan, in processing
// in current implementation of owner, the startTs and BoundaryTs of add keyspan operation should be always equaled.
p.changefeed.PatchTaskStatus(p.captureInfo.ID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) {
status.AddKeySpan(1, &model.KeySpanReplicaInfo{StartTs: 60}, 60)
status.AddKeySpan(2, &model.KeySpanReplicaInfo{StartTs: 50}, 50)
status.AddKeySpan(3, &model.KeySpanReplicaInfo{StartTs: 40}, 40)
status.AddKeySpan(1, &model.KeySpanReplicaInfo{StartTs: 60}, 60, nil)
status.AddKeySpan(2, &model.KeySpanReplicaInfo{StartTs: 50}, 50, nil)
status.AddKeySpan(3, &model.KeySpanReplicaInfo{StartTs: 40}, 40, nil)
status.KeySpans[4] = &model.KeySpanReplicaInfo{StartTs: 30}
return status, true, nil
})
Expand Down