Skip to content

Commit

Permalink
[to #48] try to fix scheduler problem (#90)
Browse files Browse the repository at this point in the history
* try fix scheduler problem

Signed-off-by: zeminzhou <[email protected]>

* optimization

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* remove commented codes

Signed-off-by: zeminzhou <[email protected]>

* fix ut for owner

Signed-off-by: zeminzhou <[email protected]>

* add ut for processor

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* remove spaceline

Signed-off-by: zeminzhou <[email protected]>

* int64 -> uint64

Signed-off-by: zeminzhou <[email protected]>

* .

Signed-off-by: zeminzhou <[email protected]>

* .

Signed-off-by: zeminzhou <[email protected]>

Co-authored-by: zeminzhou <[email protected]>
Co-authored-by: Ping Yu <[email protected]>
  • Loading branch information
3 people authored Jun 15, 2022
1 parent 572ac00 commit 7aefe87
Show file tree
Hide file tree
Showing 6 changed files with 320 additions and 58 deletions.
18 changes: 13 additions & 5 deletions cdc/cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type KeySpanOperation struct {
// if the operation is a add operation, BoundaryTs is start ts
BoundaryTs uint64 `json:"boundary_ts"`
Status uint64 `json:"status,omitempty"`

RelatedKeySpans []KeySpanLocation `json:"related_key_spans"`
}

// KeySpanProcessed returns whether the keyspan has been processed by processor
Expand Down Expand Up @@ -217,7 +219,6 @@ type KeySpanReplicaInfo struct {
StartTs Ts `json:"start-ts"`
Start []byte
End []byte
// MarkKeySpanID KeySpanID `json:"mark-keyspan-id"`
}

// Clone clones a KeySpanReplicaInfo
Expand Down Expand Up @@ -270,7 +271,7 @@ func (ts *TaskStatus) RemoveKeySpan(id KeySpanID, boundaryTs Ts, isMoveKeySpan b
}

// AddKeySpan add the keyspan in KeySpanInfos and add a add kyespan operation.
func (ts *TaskStatus) AddKeySpan(id KeySpanID, keyspan *KeySpanReplicaInfo, boundaryTs Ts) {
func (ts *TaskStatus) AddKeySpan(id KeySpanID, keyspan *KeySpanReplicaInfo, boundaryTs Ts, relatedKeySpans []KeySpanLocation) {
if ts.KeySpans == nil {
ts.KeySpans = make(map[KeySpanID]*KeySpanReplicaInfo)
}
Expand All @@ -284,9 +285,10 @@ func (ts *TaskStatus) AddKeySpan(id KeySpanID, keyspan *KeySpanReplicaInfo, boun
ts.Operation = make(map[KeySpanID]*KeySpanOperation)
}
ts.Operation[id] = &KeySpanOperation{
Delete: false,
BoundaryTs: boundaryTs,
Status: OperDispatched,
Delete: false,
BoundaryTs: boundaryTs,
Status: OperDispatched,
RelatedKeySpans: relatedKeySpans,
}
}

Expand Down Expand Up @@ -448,3 +450,9 @@ type ProcInfoSnap struct {
CaptureID string `json:"capture-id"`
KeySpans map[KeySpanID]*KeySpanReplicaInfo `json:"-"`
}

// KeySpanLocation records which capture a keyspan is in
type KeySpanLocation struct {
CaptureID string `json:"capture_id"`
KeySpanID KeySpanID `json:"keyspan_id"`
}
23 changes: 4 additions & 19 deletions cdc/cdc/model/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,6 @@ func TestAdminJobType(t *testing.T) {
}
}

func TestDDLStateString(t *testing.T) {
t.Parallel()

names := map[ChangeFeedDDLState]string{
ChangeFeedSyncDML: "SyncDML",
ChangeFeedWaitToExecDDL: "WaitToExecDDL",
ChangeFeedExecDDL: "ExecDDL",
ChangeFeedDDLExecuteFailed: "DDLExecuteFailed",
ChangeFeedDDLState(100): "Unknown",
}
for state, name := range names {
require.Equal(t, name, state.String())
}
}

func TestTaskPositionMarshal(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -265,11 +250,11 @@ func TestAddKeySpan(t *testing.T) {
},
}
status := &TaskStatus{}
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts}, ts)
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts}, ts, nil)
require.Equal(t, expected, status)

// add existing keyspan does nothing
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: 1}, 1)
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: 1}, 1, nil)
require.Equal(t, expected, status)
}

Expand All @@ -279,8 +264,8 @@ func TestTaskStatusApplyState(t *testing.T) {
ts1 := uint64(420875042036766723)
ts2 := uint64(420876783269969921)
status := &TaskStatus{}
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts1}, ts1)
status.AddKeySpan(2, &KeySpanReplicaInfo{StartTs: ts2}, ts2)
status.AddKeySpan(1, &KeySpanReplicaInfo{StartTs: ts1}, ts1, nil)
status.AddKeySpan(2, &KeySpanReplicaInfo{StartTs: ts2}, ts2, nil)
require.True(t, status.SomeOperationsUnapplied())
require.Equal(t, ts1, status.AppliedTs())

Expand Down
64 changes: 53 additions & 11 deletions cdc/cdc/owner/scheduler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type schedulerJob struct {
// if the operation is an add operation, boundaryTs is start ts
BoundaryTs uint64
TargetCapture model.CaptureID

RelatedKeySpans []model.KeySpanLocation
}

type moveKeySpanJob struct {
Expand All @@ -55,7 +57,7 @@ type moveKeySpanJob struct {

type oldScheduler struct {
state *orchestrator.ChangefeedReactorState
currentKeySpansID []model.KeySpanID
currentKeySpanIDs []model.KeySpanID
currentKeySpans map[model.KeySpanID]regionspan.Span
captures map[model.CaptureID]*model.CaptureInfo

Expand All @@ -80,20 +82,21 @@ func newSchedulerV1(f updateCurrentKeySpansFunc) scheduler {
func (s *oldScheduler) Tick(
ctx cdcContext.Context,
state *orchestrator.ChangefeedReactorState,
// currentKeySpans []model.KeySpanID,
captures map[model.CaptureID]*model.CaptureInfo,
) (shouldUpdateState bool, err error) {

s.state = state
s.captures = captures

s.currentKeySpansID, s.currentKeySpans, err = s.updateCurrentKeySpans(ctx)
currentKeySpanIDs, currentKeySpans, err := s.updateCurrentKeySpans(ctx)
if err != nil {
return false, errors.Trace(err)
}

newKeySpans, needRemoveKeySpans := s.diffCurrentKeySpans(currentKeySpans)
s.currentKeySpanIDs, s.currentKeySpans = currentKeySpanIDs, currentKeySpans

s.cleanUpFinishedOperations()
pendingJob, err := s.syncKeySpansWithCurrentKeySpans()
pendingJob, err := s.syncKeySpansWithCurrentKeySpans(newKeySpans, needRemoveKeySpans)
if err != nil {
return false, errors.Trace(err)
}
Expand All @@ -116,6 +119,27 @@ func (s *oldScheduler) Tick(
return shouldUpdateState, nil
}

func (s *oldScheduler) diffCurrentKeySpans(currentKeySpans map[model.KeySpanID]regionspan.Span) (map[model.KeySpanID]struct{}, []model.KeySpanID) {
oldKeySpans := s.currentKeySpans

newKeySpans := map[model.KeySpanID]struct{}{}
needRemoveKeySpans := []model.KeySpanID{}

for keyspanID := range oldKeySpans {
if _, ok := currentKeySpans[keyspanID]; !ok {
needRemoveKeySpans = append(needRemoveKeySpans, keyspanID)
}
}

for keyspanID := range currentKeySpans {
if _, ok := oldKeySpans[keyspanID]; !ok {
newKeySpans[keyspanID] = struct{}{}
}
}

return newKeySpans, needRemoveKeySpans
}

func (s *oldScheduler) MoveKeySpan(keyspanID model.KeySpanID, target model.CaptureID) {
s.moveKeySpanJobQueue = append(s.moveKeySpanJobQueue, &moveKeySpanJob{
keyspanID: keyspanID,
Expand Down Expand Up @@ -221,7 +245,9 @@ func (s *oldScheduler) dispatchToTargetCaptures(pendingJobs []*schedulerJob) {
}
}

count := 0
getMinWorkloadCapture := func() model.CaptureID {
count++
minCapture := ""
minWorkLoad := uint64(math.MaxUint64)
for captureID, workload := range workloads {
Expand Down Expand Up @@ -249,26 +275,41 @@ func (s *oldScheduler) dispatchToTargetCaptures(pendingJobs []*schedulerJob) {

// syncKeySpansWithCurrentKeySpans iterates all current keyspans to check whether it should be listened or not.
// this function will return schedulerJob to make sure all keyspans will be listened.
func (s *oldScheduler) syncKeySpansWithCurrentKeySpans() ([]*schedulerJob, error) {
func (s *oldScheduler) syncKeySpansWithCurrentKeySpans(newKeySpans map[model.KeySpanID]struct{}, needRemoveKeySpans []model.KeySpanID) ([]*schedulerJob, error) {
var pendingJob []*schedulerJob
allKeySpanListeningNow, err := s.keyspan2CaptureIndex()
if err != nil {
return nil, errors.Trace(err)
}
relatedKeySpans := make([]model.KeySpanLocation, 0, len(needRemoveKeySpans))
for _, keyspanID := range needRemoveKeySpans {
if captureID, ok := allKeySpanListeningNow[keyspanID]; ok {
location := model.KeySpanLocation{
CaptureID: captureID,
KeySpanID: keyspanID,
}
relatedKeySpans = append(relatedKeySpans, location)
}
}

globalCheckpointTs := s.state.Status.CheckpointTs
for _, keyspanID := range s.currentKeySpansID {
for _, keyspanID := range s.currentKeySpanIDs {
if _, exist := allKeySpanListeningNow[keyspanID]; exist {
delete(allKeySpanListeningNow, keyspanID)
continue
}
// For each keyspan which should be listened but is not, add an adding-keyspan job to the pending job list
pendingJob = append(pendingJob, &schedulerJob{
job := &schedulerJob{
Tp: schedulerJobTypeAddKeySpan,
KeySpanID: keyspanID,
Start: s.currentKeySpans[keyspanID].Start,
End: s.currentKeySpans[keyspanID].End,
BoundaryTs: globalCheckpointTs,
})
}
if _, ok := newKeySpans[keyspanID]; ok {
job.RelatedKeySpans = relatedKeySpans
}
// For each keyspan which should be listened but is not, add an adding-keyspan job to the pending job list
pendingJob = append(pendingJob, job)
}
// The remaining keyspans are the keyspans which should be not listened
keyspansThatShouldNotBeListened := allKeySpanListeningNow
Expand Down Expand Up @@ -303,7 +344,7 @@ func (s *oldScheduler) handleJobs(jobs []*schedulerJob) {
StartTs: job.BoundaryTs,
Start: job.Start,
End: job.End,
}, job.BoundaryTs)
}, job.BoundaryTs, job.RelatedKeySpans)
case schedulerJobTypeRemoveKeySpan:
failpoint.Inject("OwnerRemoveKeySpanError", func() {
// just skip removing this keyspan
Expand Down Expand Up @@ -517,6 +558,7 @@ func (w *schedulerV1CompatWrapper) calculateWatermarks(
if resolvedTs == model.Ts(math.MaxUint64) {
return schedulerv2.CheckpointCannotProceed, 0
}

checkpointTs := resolvedTs
for _, position := range state.TaskPositions {
if checkpointTs > position.CheckPointTs {
Expand Down
Loading

0 comments on commit 7aefe87

Please sign in to comment.