[to #48] try to fix scheduler problem #90

Merged
merged 20 commits on Jun 15, 2022
3 changes: 1 addition & 2 deletions cdc/cdc/model/owner.go
@@ -195,7 +195,7 @@ type TaskWorkload map[KeySpanID]WorkloadInfo

// WorkloadInfo records the workload info of a keyspan
type WorkloadInfo struct {
Workload uint64 `json:"workload"`
Workload int64 `json:"workload"`
Contributor:
Why change to int64? Can it be negative?

}

// Unmarshal unmarshals into *TaskWorkload from json marshal byte slice
@@ -219,7 +219,6 @@ type KeySpanReplicaInfo struct {
StartTs Ts `json:"start-ts"`
Start []byte
End []byte
// MarkKeySpanID KeySpanID `json:"mark-keyspan-id"`
}

// Clone clones a KeySpanReplicaInfo
2 changes: 2 additions & 0 deletions cdc/cdc/owner/changefeed.go
@@ -44,6 +44,7 @@ type changefeed struct {
feedStateManager *feedStateManager
gcManager gc.Manager
initialized bool

// isRemoved is true if the changefeed is removed
isRemoved bool

@@ -83,6 +84,7 @@ func newChangefeed4Test(
id model.ChangeFeedID, gcManager gc.Manager,
) *changefeed {
c := newChangefeed(id, gcManager)
c.newScheduler = newScheduler4Test
return c
}

5 changes: 1 addition & 4 deletions cdc/cdc/owner/changefeed_test.go
@@ -15,7 +15,6 @@ package owner

import (
"context"
"os"
"path/filepath"

"github.com/pingcap/check"
@@ -105,6 +104,7 @@ func (s *changefeedSuite) TestHandleError(c *check.C) {
ctx := cdcContext.NewBackendContext4Test(true)
cf, state, captures, tester := createChangefeed4Test(ctx, c)
defer cf.Close(ctx)

// pre check
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
@@ -185,7 +185,4 @@ func testChangefeedReleaseResource(
err := cf.tick(ctx, state, captures)
c.Assert(err, check.IsNil)
cancel()
// check redo log dir is deleted
_, err = os.Stat(redoLogDir)
c.Assert(os.IsNotExist(err), check.IsTrue)
}
19 changes: 14 additions & 5 deletions cdc/cdc/owner/scheduler.go
@@ -79,13 +79,14 @@ func NewSchedulerV2(
checkpointTs model.Ts,
messageServer *p2p.MessageServer,
messageRouter p2p.MessageRouter,
f updateCurrentKeySpansFunc,
) (*schedulerV2, error) {
ret := &schedulerV2{
changeFeedID: changeFeedID,
messageServer: messageServer,
messageRouter: messageRouter,
stats: &schedulerStats{},
updateCurrentKeySpans: updateCurrentKeySpansImpl,
updateCurrentKeySpans: f,
}
ret.BaseScheduleDispatcher = pscheduler.NewBaseScheduleDispatcher(changeFeedID, ret, checkpointTs)
if err := ret.registerPeerMessageHandlers(ctx); err != nil {
@@ -97,11 +98,11 @@ func NewSchedulerV2(

// newSchedulerV2FromCtx creates a new schedulerV2 from context.
// This function is factored out to facilitate unit testing.
func newSchedulerV2FromCtx(ctx context.Context, startTs uint64) (scheduler, error) {
func newSchedulerV2FromCtx(ctx context.Context, startTs uint64, f updateCurrentKeySpansFunc) (scheduler, error) {
changeFeedID := ctx.ChangefeedVars().ID
messageServer := ctx.GlobalVars().MessageServer
messageRouter := ctx.GlobalVars().MessageRouter
ret, err := NewSchedulerV2(ctx, changeFeedID, startTs, messageServer, messageRouter)
ret, err := NewSchedulerV2(ctx, changeFeedID, startTs, messageServer, messageRouter, f)
if err != nil {
return nil, errors.Trace(err)
}
@@ -111,9 +112,17 @@ func newSchedulerV2FromCtx(ctx context.Context, startTs uint64) (scheduler, erro
func newScheduler(ctx context.Context, startTs uint64) (scheduler, error) {
conf := config.GetGlobalServerConfig()
if conf.Debug.EnableNewScheduler {
return newSchedulerV2FromCtx(ctx, startTs)
return newSchedulerV2FromCtx(ctx, startTs, updateCurrentKeySpansImpl)
}
return newSchedulerV1(), nil
return newSchedulerV1(updateCurrentKeySpansImpl), nil
}

func newScheduler4Test(ctx context.Context, startTs uint64) (scheduler, error) {
conf := config.GetGlobalServerConfig()
if conf.Debug.EnableNewScheduler {
return newSchedulerV2FromCtx(ctx, startTs, updateCurrentKeySpansImpl4Test)
}
return newSchedulerV1(updateCurrentKeySpansImpl4Test), nil
}

func (s *schedulerV2) Tick(
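
The hunks above change NewSchedulerV2, newSchedulerV2FromCtx, and newSchedulerV1 to take the key-span lookup as a parameter (updateCurrentKeySpansFunc) instead of hard-coding updateCurrentKeySpansImpl, which lets newScheduler4Test inject updateCurrentKeySpansImpl4Test. A minimal sketch of that injection pattern, using hypothetical simplified names (keySpanID, newScheduler) rather than the project's real types:

```go
// Sketch only: illustrates passing the key-span lookup into the constructor
// so tests can substitute a stub, analogous to the `f` parameter added to
// NewSchedulerV2 in this PR.
package main

import "fmt"

type keySpanID = uint64

// updateKeySpansFunc plays the role of updateCurrentKeySpansFunc: it returns
// the key spans the scheduler should currently manage.
type updateKeySpansFunc func() ([]keySpanID, error)

type scheduler struct {
    updateKeySpans updateKeySpansFunc
}

// newScheduler accepts the lookup function instead of hard-coding the
// production implementation.
func newScheduler(f updateKeySpansFunc) *scheduler {
    return &scheduler{updateKeySpans: f}
}

func main() {
    // Production-style wiring.
    prod := newScheduler(func() ([]keySpanID, error) { return []keySpanID{1, 2, 3}, nil })

    // Test-style wiring with a stub, analogous to updateCurrentKeySpansImpl4Test.
    test := newScheduler(func() ([]keySpanID, error) { return nil, nil })

    p, _ := prod.updateKeySpans()
    s, _ := test.updateKeySpans()
    fmt.Println(p, s)
}
```

Passing the function at construction time also removes the earlier test pattern of building the scheduler first and then overwriting sched.updateCurrentKeySpans, as the scheduler_test.go hunks below show.
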
39 changes: 21 additions & 18 deletions cdc/cdc/owner/scheduler_test.go
@@ -65,22 +65,24 @@ func TestSchedulerBasics(t *testing.T) {

mockOwnerNode := mockCluster.Nodes["capture-0"]

sched, err := NewSchedulerV2(
ctx,
"cf-1",
1000,
mockOwnerNode.Server,
mockOwnerNode.Router)
require.NoError(t, err)

sched.updateCurrentKeySpans = func(ctx cdcContext.Context) ([]model.KeySpanID, map[model.KeySpanID]regionspan.Span, error) {
f := func(ctx cdcContext.Context) ([]model.KeySpanID, map[model.KeySpanID]regionspan.Span, error) {
return []model.KeySpanID{1, 2, 3}, map[model.KeySpanID]regionspan.Span{
1: {Start: []byte{'1'}, End: []byte{'2'}},
2: {Start: []byte{'2'}, End: []byte{'3'}},
3: {Start: []byte{'3'}, End: []byte{'4'}},
}, nil
}

sched, err := NewSchedulerV2(
ctx,
"cf-1",
1000,
mockOwnerNode.Server,
mockOwnerNode.Router,
f)

require.NoError(t, err)

for atomic.LoadInt64(&sched.stats.AnnounceSentCount) < numNodes {
checkpointTs, resolvedTs, err := sched.Tick(ctx, &orchestrator.ChangefeedReactorState{
ID: "cf-1",
@@ -228,22 +230,23 @@ func TestSchedulerNoPeer(t *testing.T) {

mockOwnerNode := mockCluster.Nodes["capture-0"]

sched, err := NewSchedulerV2(
ctx,
"cf-1",
1000,
mockOwnerNode.Server,
mockOwnerNode.Router)
require.NoError(t, err)

sched.updateCurrentKeySpans = func(ctx cdcContext.Context) ([]model.KeySpanID, map[model.KeySpanID]regionspan.Span, error) {
f := func(ctx cdcContext.Context) ([]model.KeySpanID, map[model.KeySpanID]regionspan.Span, error) {
return []model.KeySpanID{1, 2, 3}, map[model.KeySpanID]regionspan.Span{
1: {Start: []byte{'1'}, End: []byte{'2'}},
2: {Start: []byte{'2'}, End: []byte{'3'}},
3: {Start: []byte{'3'}, End: []byte{'4'}},
}, nil
}

sched, err := NewSchedulerV2(
ctx,
"cf-1",
1000,
mockOwnerNode.Server,
mockOwnerNode.Router,
f)
require.NoError(t, err)

// Ticks the scheduler 10 times. It should not panic.
for i := 0; i < 10; i++ {
checkpointTs, resolvedTs, err := sched.Tick(ctx, &orchestrator.ChangefeedReactorState{
26 changes: 19 additions & 7 deletions cdc/cdc/owner/scheduler_v1.go
@@ -30,6 +30,7 @@ import (
)

type schedulerJobType string
type updateCurrentKeySpansFunc func(ctx cdcContext.Context) ([]model.KeySpanID, map[model.KeySpanID]regionspan.Span, error)

const (
schedulerJobTypeAddKeySpan schedulerJobType = "ADD"
@@ -65,13 +66,13 @@ type oldScheduler struct {
needRebalanceNextTick bool
lastTickCaptureCount int

updateCurrentKeySpans func(ctx cdcContext.Context) ([]model.KeySpanID, map[model.KeySpanID]regionspan.Span, error)
updateCurrentKeySpans updateCurrentKeySpansFunc
}

func newSchedulerV1() scheduler {
func newSchedulerV1(f updateCurrentKeySpansFunc) scheduler {
return &schedulerV1CompatWrapper{&oldScheduler{
moveKeySpanTargets: make(map[model.KeySpanID]model.CaptureID),
updateCurrentKeySpans: updateCurrentKeySpansImpl,
updateCurrentKeySpans: f,
}}
}

@@ -81,11 +82,11 @@ func newSchedulerV1() scheduler {
func (s *oldScheduler) Tick(
ctx cdcContext.Context,
state *orchestrator.ChangefeedReactorState,
// currentKeySpans []model.KeySpanID,
captures map[model.CaptureID]*model.CaptureInfo,
) (shouldUpdateState bool, err error) {

s.state = state
s.captures = captures

currentKeySpanIDs, currentKeySpans, err := s.updateCurrentKeySpans(ctx)
if err != nil {
return false, errors.Trace(err)
@@ -210,7 +211,7 @@ func (s *oldScheduler) keyspan2CaptureIndex() (map[model.KeySpanID]model.Capture
// If the TargetCapture of a job is not set, it chooses a capture with the minimum workload(minimum number of keyspans)
// and sets the TargetCapture to the capture.
func (s *oldScheduler) dispatchToTargetCaptures(pendingJobs []*schedulerJob) {
workloads := make(map[model.CaptureID]uint64)
workloads := make(map[model.CaptureID]int64)

for captureID := range s.captures {
workloads[captureID] = 0
@@ -244,9 +245,11 @@ func (s *oldScheduler) dispatchToTargetCaptures(pendingJobs []*schedulerJob) {
}
}

count := 0
getMinWorkloadCapture := func() model.CaptureID {
count++
minCapture := ""
minWorkLoad := uint64(math.MaxUint64)
minWorkLoad := int64(math.MaxInt64)
for captureID, workload := range workloads {
if workload < minWorkLoad {
minCapture = captureID
@@ -491,6 +494,10 @@ func updateCurrentKeySpansImpl(ctx cdcContext.Context) ([]model.KeySpanID, map[m
return currentKeySpansID, currentKeySpans, nil
}

func updateCurrentKeySpansImpl4Test(ctx cdcContext.Context) ([]model.KeySpanID, map[model.KeySpanID]regionspan.Span, error) {
return nil, nil, nil
}

// schedulerV1CompatWrapper is used to wrap the old scheduler to
// support the compatibility with the new scheduler.
// It incorporates watermark calculations into the scheduler, which
@@ -547,6 +554,11 @@ func (w *schedulerV1CompatWrapper) calculateWatermarks(
}
}
}

if resolvedTs == model.Ts(math.MaxUint64) {
return schedulerv2.CheckpointCannotProceed, 0
}

checkpointTs := resolvedTs
for _, position := range state.TaskPositions {
if checkpointTs > position.CheckPointTs {
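
The new check in calculateWatermarks guards the case where no key span contributes a resolved ts: the guard returns CheckpointCannotProceed when resolvedTs is still math.MaxUint64, which presumably is the seed value that survives when nothing lowered it, so the wrapper no longer advances the checkpoint to that sentinel. A minimal sketch of the idea, assuming a simplified (value, ok) return shape instead of the package's CheckpointCannotProceed constant:

```go
// Sketch only: the watermark is the minimum resolved ts across key spans,
// seeded with MaxUint64. An empty input leaves the seed untouched, which
// must be reported as "cannot proceed" rather than as progress.
package main

import (
    "fmt"
    "math"
)

func minResolvedTs(resolvedTsByKeySpan map[uint64]uint64) (uint64, bool) {
    resolvedTs := uint64(math.MaxUint64)
    for _, ts := range resolvedTsByKeySpan {
        if ts < resolvedTs {
            resolvedTs = ts
        }
    }
    if resolvedTs == math.MaxUint64 {
        // No key span contributed a resolved ts; signal "cannot proceed",
        // analogous to returning schedulerv2.CheckpointCannotProceed above.
        return 0, false
    }
    return resolvedTs, true
}

func main() {
    fmt.Println(minResolvedTs(map[uint64]uint64{1: 100, 2: 90})) // 90 true
    fmt.Println(minResolvedTs(nil))                              // 0 false
}
```
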
164 changes: 132 additions & 32 deletions cdc/cdc/owner/scheduler_v1_test.go

Large diffs are not rendered by default.

77 changes: 77 additions & 0 deletions cdc/cdc/processor/processor_test.go
@@ -926,3 +926,80 @@ func (s *processorSuite) TestIgnorableError(c *check.C) {
c.Assert(isProcessorIgnorableError(tc.err), check.Equals, tc.ignorable)
}
}

func (s *processorSuite) TestHandleKeySpanOperationWithRelatedKeySpans(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(true)
p, tester := initProcessor4Test(ctx, c)
var err error

// no operation
_, err = p.Tick(ctx, p.changefeed)
c.Assert(err, check.IsNil)
tester.MustApplyPatches()

// add keyspan1, in processing
p.changefeed.PatchTaskStatus(p.captureInfo.ID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) {
status.AddKeySpan(1, &model.KeySpanReplicaInfo{StartTs: 60}, 80, nil)
return status, true, nil
})
tester.MustApplyPatches()
_, err = p.Tick(ctx, p.changefeed)
c.Assert(err, check.IsNil)
tester.MustApplyPatches()
c.Assert(p.changefeed.TaskStatuses[p.captureInfo.ID], check.DeepEquals, &model.TaskStatus{
KeySpans: map[uint64]*model.KeySpanReplicaInfo{
1: {StartTs: 60},
},
Operation: map[uint64]*model.KeySpanOperation{
1: {Delete: false, BoundaryTs: 80, Status: model.OperProcessed},
},
})
c.Assert(p.keyspans, check.HasLen, 1)
c.Assert(p.changefeed.TaskPositions[p.captureInfo.ID].CheckPointTs, check.Equals, uint64(60))
c.Assert(p.changefeed.TaskPositions[p.captureInfo.ID].ResolvedTs, check.Equals, uint64(60))

// add keyspan2 & keyspan3, remove keyspan1
p.changefeed.PatchTaskStatus(p.captureInfo.ID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) {
status.AddKeySpan(2, &model.KeySpanReplicaInfo{StartTs: 60}, 60, []model.KeySpanLocation{{CaptureID: p.captureInfo.ID, KeySpanID: 1}})
status.AddKeySpan(3, &model.KeySpanReplicaInfo{StartTs: 60}, 60, []model.KeySpanLocation{{CaptureID: p.captureInfo.ID, KeySpanID: 1}})
status.RemoveKeySpan(1, 60, false)
return status, true, nil
})
tester.MustApplyPatches()
// try to stop keyspan1
_, err = p.Tick(ctx, p.changefeed)
c.Assert(err, check.IsNil)
tester.MustApplyPatches()
c.Assert(p.changefeed.TaskStatuses[p.captureInfo.ID].KeySpans, check.DeepEquals, map[uint64]*model.KeySpanReplicaInfo{
2: {StartTs: 60},
3: {StartTs: 60},
})
c.Assert(p.changefeed.TaskStatuses[p.captureInfo.ID].Operation, check.DeepEquals, map[uint64]*model.KeySpanOperation{
1: {Delete: true, BoundaryTs: 60, Status: model.OperProcessed},
2: {Delete: false, BoundaryTs: 60, Status: model.OperDispatched, RelatedKeySpans: []model.KeySpanLocation{{CaptureID: p.captureInfo.ID, KeySpanID: 1}}},
3: {Delete: false, BoundaryTs: 60, Status: model.OperDispatched, RelatedKeySpans: []model.KeySpanLocation{{CaptureID: p.captureInfo.ID, KeySpanID: 1}}},
})
keyspan1 := p.keyspans[1].(*mockKeySpanPipeline)
keyspan1.status = keyspanpipeline.KeySpanStatusStopped

// finish stopping keyspan1
_, err = p.Tick(ctx, p.changefeed)
c.Assert(err, check.IsNil)
tester.MustApplyPatches()
c.Assert(p.changefeed.TaskStatuses[p.captureInfo.ID].Operation, check.DeepEquals, map[uint64]*model.KeySpanOperation{
1: {Delete: true, BoundaryTs: 60, Status: model.OperFinished},
2: {Delete: false, BoundaryTs: 60, Status: model.OperDispatched, RelatedKeySpans: []model.KeySpanLocation{{CaptureID: p.captureInfo.ID, KeySpanID: 1}}},
3: {Delete: false, BoundaryTs: 60, Status: model.OperDispatched, RelatedKeySpans: []model.KeySpanLocation{{CaptureID: p.captureInfo.ID, KeySpanID: 1}}},
})
cleanUpFinishedOpOperation(p.changefeed, p.captureInfo.ID, tester)

// start keyspan2 & keyspan3
_, err = p.Tick(ctx, p.changefeed)
c.Assert(err, check.IsNil)
tester.MustApplyPatches()
c.Assert(p.changefeed.TaskStatuses[p.captureInfo.ID].Operation, check.DeepEquals, map[uint64]*model.KeySpanOperation{
2: {Delete: false, BoundaryTs: 60, Status: model.OperProcessed, RelatedKeySpans: []model.KeySpanLocation{{CaptureID: p.captureInfo.ID, KeySpanID: 1}}},
3: {Delete: false, BoundaryTs: 60, Status: model.OperProcessed, RelatedKeySpans: []model.KeySpanLocation{{CaptureID: p.captureInfo.ID, KeySpanID: 1}}},
})
}