Skip to content

Commit

Permalink
changefeed(ticdc): send bootstrap message asynchronously to prevent b…
Browse files Browse the repository at this point in the history
…lock other changefeeds (#11573) (#11588)

close #11565
  • Loading branch information
ti-chi-bot authored Sep 12, 2024
1 parent 04bd8f6 commit 8dee428
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 94 deletions.
8 changes: 7 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,10 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
if err != nil {
return errors.Trace(err)
}
// bootstrap not finished yet, cannot send any event.
if !c.ddlManager.isBootstrapped() {
return nil
}

err = c.handleBarrier(ctx, barrier)
if err != nil {
Expand Down Expand Up @@ -687,7 +691,9 @@ LOOP2:
c.redoMetaMgr,
downstreamType,
util.GetOrZero(info.Config.BDRMode),
info.Config.Sink.ShouldSendAllBootstrapAtStart())
info.Config.Sink.ShouldSendAllBootstrapAtStart(),
ctx.Throw,
)

// create scheduler
cfg := *c.cfg
Expand Down
88 changes: 65 additions & 23 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ type mockDDLSink struct {
syncPoint model.Ts
syncPointHis []model.Ts

bootstrapError bool

wg sync.WaitGroup
}

Expand Down Expand Up @@ -146,16 +148,15 @@ func (m *mockDDLSink) getCheckpointTsAndTableNames() (uint64, []*model.TableInfo
return m.mu.checkpointTs, m.mu.currentTables
}

func (m *mockDDLSink) close(ctx context.Context) error {
func (m *mockDDLSink) close(_ context.Context) error {
m.wg.Wait()
return nil
}

// Barrier is a no-op in the mock DDL sink; it ignores the context and always
// returns nil.
func (m *mockDDLSink) Barrier(_ context.Context) error {
	return nil
}

func (m *mockDDLSink) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error {
func (m *mockDDLSink) emitBootstrap(_ context.Context, bootstrap *model.DDLEvent) error {
if m.bootstrapError {
return errors.New("emit bootstrap error")
}
if m.recordDDLHistory {
m.ddlHistory = append(m.ddlHistory, bootstrap)
}
Expand Down Expand Up @@ -196,8 +197,24 @@ func (m *mockScheduler) DrainCapture(target model.CaptureID) (int, error) {
// Close satisfies the scheduler's Close method. The mock implementation has an
// empty body, so calling it does nothing and the context is ignored.
func (m *mockScheduler) Close(_ context.Context) {
}

func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
) (
// newMockDDLSink is a DDL sink factory for tests. All arguments are ignored;
// it returns a mockDDLSink with resetDDLDone enabled and DDL history recording
// disabled.
func newMockDDLSink(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
	sink := new(mockDDLSink)
	sink.resetDDLDone = true
	sink.recordDDLHistory = false
	return sink
}

// newMockDDLSinkWithBootstrapError is a DDL sink factory for tests. All
// arguments are ignored; it returns a mockDDLSink configured like the one from
// newMockDDLSink but with bootstrapError set, which makes emitBootstrap fail.
func newMockDDLSinkWithBootstrapError(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
	sink := new(mockDDLSink)
	sink.resetDDLDone = true
	sink.recordDDLHistory = false
	sink.bootstrapError = true
	return sink
}

func createChangefeed4Test(ctx cdcContext.Context,
newMockDDLSink func(model.ChangeFeedID, *model.ChangeFeedInfo, func(error), func(error)) DDLSink,
t *testing.T) (
*changefeed, map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester,
) {
up := upstream.NewUpstream4Test(&gc.MockPDClient{
Expand Down Expand Up @@ -228,12 +245,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
return &mockDDLPuller{resolvedTs: startTs - 1, schemaStorage: schemaStorage}, nil
},
// new ddl ddlSink
func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
resetDDLDone: true,
recordDDLHistory: false,
}
},
newMockDDLSink,
// new scheduler
func(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64,
Expand Down Expand Up @@ -263,7 +275,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,

func TestPreCheck(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
cf.Tick(ctx, captures)
tester.MustApplyPatches()
require.NotNil(t, cf.state.Status)
Expand All @@ -283,7 +295,7 @@ func TestPreCheck(t *testing.T) {

func TestInitialize(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
defer cf.Close(ctx)
// pre check
cf.Tick(ctx, captures)
Expand All @@ -298,7 +310,7 @@ func TestInitialize(t *testing.T) {

func TestChangefeedHandleError(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
defer cf.Close(ctx)
// pre check
cf.Tick(ctx, captures)
Expand All @@ -316,6 +328,36 @@ func TestChangefeedHandleError(t *testing.T) {
require.Equal(t, cf.state.Info.Error.Message, "fake error")
}

// TestTrySendBootstrapMeetError runs a changefeed whose mock DDL sink fails
// every emitBootstrap call, and checks that the changefeed eventually records
// an error and moves to the pending state.
func TestTrySendBootstrapMeetError(t *testing.T) {
	helper := entry.NewSchemaTestHelper(t)
	defer helper.Close()
	_ = helper.DDL2Event("create table test.t(id int primary key, b int)")

	ctx := cdcContext.NewContext4Test(context.Background(), true)
	cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSinkWithBootstrapError, t)
	cf.upstream.KVStorage = helper.Storage()
	defer cf.Close(ctx)

	// tickOnce advances the changefeed by one tick and applies the patches
	// that tick produced.
	tickOnce := func() {
		cf.Tick(ctx, captures)
		tester.MustApplyPatches()
	}

	// pre check
	tickOnce()

	// initialize: ask for all bootstrap messages to be sent at start, using
	// the "simple" protocol.
	cf.state.Info.Config.Sink.Protocol = util.AddressOf("simple")
	cf.state.Info.Config.Sink.SendAllBootstrapAtStart = util.AddressOf(true)
	tickOnce()

	// Keep ticking until the changefeed has an error recorded and is pending.
	failedAndPending := func() bool {
		tickOnce()
		return cf.state.Info.Error != nil && cf.state.Info.State == model.StatePending
	}
	require.Eventually(t, failedAndPending, 5*time.Second, 100*time.Millisecond)
}

func TestExecDDL(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
Expand All @@ -328,7 +370,7 @@ func TestExecDDL(t *testing.T) {
ctx := cdcContext.NewContext4Test(context.Background(), true)
ctx.ChangefeedVars().Info.StartTs = startTs

cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
cf.upstream.KVStorage = helper.Storage()
defer cf.Close(ctx)
tickThreeTime := func() {
Expand Down Expand Up @@ -410,7 +452,7 @@ func TestEmitCheckpointTs(t *testing.T) {
ctx := cdcContext.NewContext4Test(context.Background(), true)
ctx.ChangefeedVars().Info.StartTs = startTs

cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
cf.upstream.KVStorage = helper.Storage()

defer cf.Close(ctx)
Expand Down Expand Up @@ -470,7 +512,7 @@ func TestSyncPoint(t *testing.T) {
ctx.ChangefeedVars().Info.Config.SyncPointInterval = util.AddressOf(1 * time.Second)
// SyncPoint option is only available for MySQL compatible database.
ctx.ChangefeedVars().Info.SinkURI = "mysql://"
cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
defer cf.Close(ctx)

// pre check
Expand Down Expand Up @@ -500,7 +542,7 @@ func TestSyncPoint(t *testing.T) {
func TestFinished(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
ctx.ChangefeedVars().Info.TargetTs = ctx.ChangefeedVars().Info.StartTs + 1000
cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
defer cf.Close(ctx)

// pre check
Expand Down Expand Up @@ -573,7 +615,7 @@ func testChangefeedReleaseResource(
redoLogDir string,
expectedInitialized bool,
) {
cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)

// pre check
cf.Tick(ctx, captures)
Expand Down Expand Up @@ -615,7 +657,7 @@ func TestBarrierAdvance(t *testing.T) {
}
ctx.ChangefeedVars().Info.SinkURI = "mysql://"

cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
defer cf.Close(ctx)

// The changefeed load the info from etcd.
Expand Down
117 changes: 76 additions & 41 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"math/rand"
"sort"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -134,8 +135,24 @@ type ddlManager struct {
sinkType model.DownstreamType
ddlResolvedTs model.Ts

shouldSendAllBootstrapAtStart bool
bootstraped bool
bootstrapState bootstrapState
reportError func(err error)
}

// bootstrapState tracks the progress of sending bootstrap messages. Its
// underlying type is int32 so that it can be accessed through the sync/atomic
// int32 functions used by storeBootstrapState and loadBootstrapState.
type bootstrapState int32

const (
	// bootstrapNotStarted is the zero value of bootstrapState.
	bootstrapNotStarted bootstrapState = iota
	// bootstrapInProgress follows bootstrapNotStarted.
	bootstrapInProgress
	// bootstrapFinished follows bootstrapInProgress.
	bootstrapFinished
)

// storeBootstrapState atomically writes state into the value addr points to.
func storeBootstrapState(addr *bootstrapState, state bootstrapState) {
	raw := (*int32)(addr)
	atomic.StoreInt32(raw, int32(state))
}

// loadBootstrapState atomically reads and returns the value addr points to.
func loadBootstrapState(addr *bootstrapState) bootstrapState {
	raw := atomic.LoadInt32((*int32)(addr))
	return bootstrapState(raw)
}

func newDDLManager(
Expand All @@ -151,6 +168,7 @@ func newDDLManager(
sinkType model.DownstreamType,
bdrMode bool,
shouldSendAllBootstrapAtStart bool,
reportError func(err error),
) *ddlManager {
log.Info("create ddl manager",
zap.String("namaspace", changefeedID.Namespace),
Expand All @@ -160,6 +178,11 @@ func newDDLManager(
zap.Bool("bdrMode", bdrMode),
zap.Stringer("sinkType", sinkType))

bootstrap := bootstrapFinished
if shouldSendAllBootstrapAtStart {
bootstrap = bootstrapNotStarted
}

return &ddlManager{
changfeedID: changefeedID,
ddlSink: ddlSink,
Expand All @@ -173,47 +196,61 @@ func newDDLManager(
ddlResolvedTs: startTs,
BDRMode: bdrMode,
// use the passed sinkType after we support get resolvedTs from sink
sinkType: model.DB,
tableCheckpoint: make(map[model.TableName]model.Ts),
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
shouldSendAllBootstrapAtStart: shouldSendAllBootstrapAtStart,
sinkType: model.DB,
tableCheckpoint: make(map[model.TableName]model.Ts),
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
bootstrapState: bootstrap,
reportError: reportError,
}
}

func (m *ddlManager) checkAndSendBootstrapMsgs(ctx context.Context) (bool, error) {
if !m.shouldSendAllBootstrapAtStart || m.bootstraped {
return true, nil
}
// isBootstrapped reports whether the manager's bootstrap state, read
// atomically, equals bootstrapFinished.
func (m *ddlManager) isBootstrapped() bool {
	current := loadBootstrapState(&m.bootstrapState)
	return current == bootstrapFinished
}

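// trySendBootstrap starts sending bootstrap messages for currentTables in a
// background goroutine if sending has not begun yet. It returns true only
// when bootstrap has already finished; until then no other event may be sent.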
func (m *ddlManager) trySendBootstrap(ctx context.Context, currentTables []*model.TableInfo) bool {
bootstrap := loadBootstrapState(&m.bootstrapState)
switch bootstrap {
case bootstrapFinished:
return true
case bootstrapInProgress:
return false
case bootstrapNotStarted:
}
storeBootstrapState(&m.bootstrapState, bootstrapInProgress)
start := time.Now()
defer func() {
go func() {
log.Info("start to send bootstrap messages",
zap.Stringer("changefeed", m.changfeedID),
zap.Int("tables", len(currentTables)))
for idx, table := range currentTables {
if table.TableInfo.IsView() {
continue
}
ddlEvent := &model.DDLEvent{
TableInfo: table,
IsBootstrap: true,
}
err := m.ddlSink.emitBootstrap(ctx, ddlEvent)
if err != nil {
log.Error("send bootstrap message failed",
zap.Stringer("changefeed", m.changfeedID),
zap.Int("tables", len(currentTables)),
zap.Int("emitted", idx+1),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
m.reportError(err)
return
}
}
storeBootstrapState(&m.bootstrapState, bootstrapFinished)
log.Info("send bootstrap messages finished",
zap.Stringer("changefeed", m.changfeedID),
zap.Int("tables", len(currentTables)),
zap.Duration("cost", time.Since(start)))
}()
// Send bootstrap messages to downstream.
tableInfo, err := m.allTables(ctx)
if err != nil {
return false, errors.Trace(err)
}
log.Info("start to send bootstrap messages",
zap.Stringer("changefeed", m.changfeedID),
zap.Int("tables", len(tableInfo)))

for _, table := range tableInfo {
if table.TableInfo.IsView() {
continue
}
ddlEvent := &model.DDLEvent{
TableInfo: table,
IsBootstrap: true,
}
err := m.ddlSink.emitBootstrap(ctx, ddlEvent)
if err != nil {
return false, errors.Trace(err)
}
}
m.bootstraped = true
return true, nil
return m.isBootstrapped()
}

// tick the ddlHandler, it does the following things:
Expand All @@ -233,19 +270,17 @@ func (m *ddlManager) tick(
m.justSentDDL = nil
m.updateCheckpointTs(checkpointTs, tableCheckpoint)

ok, err := m.checkAndSendBootstrapMsgs(ctx)
currentTables, err := m.allTables(ctx)
if err != nil {
return nil, nil, errors.Trace(err)
}

// before bootstrap finished, cannot send any event.
ok := m.trySendBootstrap(ctx, currentTables)
if !ok {
return nil, nil, nil
}

currentTables, err := m.allTables(ctx)
if err != nil {
return nil, nil, errors.Trace(err)
}

if m.executingDDL == nil {
m.ddlSink.emitCheckpointTs(m.checkpointTs, currentTables)
}
Expand Down
Loading

0 comments on commit 8dee428

Please sign in to comment.