diff --git a/dm/dm/worker/relay.go b/dm/dm/worker/relay.go index b0ccc934293..0656e165ca3 100644 --- a/dm/dm/worker/relay.go +++ b/dm/dm/worker/relay.go @@ -50,10 +50,8 @@ type RelayHolder interface { Result() *pb.ProcessResult // Update updates relay config online Update(ctx context.Context, cfg *config.SourceConfig) error - // RegisterListener registers a relay listener - RegisterListener(el relay.Listener) - // UnRegisterListener unregisters a relay listener - UnRegisterListener(el relay.Listener) + // Relay returns relay object + Relay() relay.Process } // NewRelayHolder is relay holder initializer @@ -310,12 +308,8 @@ func (h *realRelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo { return h.relay.ActiveRelayLog() } -func (h *realRelayHolder) RegisterListener(el relay.Listener) { - h.relay.RegisterListener(el) -} - -func (h *realRelayHolder) UnRegisterListener(el relay.Listener) { - h.relay.UnRegisterListener(el) +func (h *realRelayHolder) Relay() relay.Process { + return h.relay } /******************** dummy relay holder ********************/ @@ -326,14 +320,16 @@ type dummyRelayHolder struct { stage pb.Stage relayBinlog string - cfg *config.SourceConfig + cfg *config.SourceConfig + relay2 relay.Process } // NewDummyRelayHolder creates a new RelayHolder. 
func NewDummyRelayHolder(cfg *config.SourceConfig) RelayHolder { return &dummyRelayHolder{ - cfg: cfg, - stage: pb.Stage_New, + cfg: cfg, + stage: pb.Stage_New, + relay2: &relay.Relay{}, } } @@ -436,8 +432,6 @@ func (d *dummyRelayHolder) Stage() pb.Stage { return d.stage } -func (d *dummyRelayHolder) RegisterListener(el relay.Listener) { -} - -func (d *dummyRelayHolder) UnRegisterListener(el relay.Listener) { +func (d *dummyRelayHolder) Relay() relay.Process { + return d.relay2 } diff --git a/dm/dm/worker/relay_test.go b/dm/dm/worker/relay_test.go index bbf12856de6..589843c80ca 100644 --- a/dm/dm/worker/relay_test.go +++ b/dm/dm/worker/relay_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/ticdc/dm/dm/unit" "github.com/pingcap/ticdc/dm/pkg/binlog" "github.com/pingcap/ticdc/dm/pkg/gtid" + "github.com/pingcap/ticdc/dm/pkg/log" pkgstreamer "github.com/pingcap/ticdc/dm/pkg/streamer" "github.com/pingcap/ticdc/dm/pkg/utils" "github.com/pingcap/ticdc/dm/relay" @@ -46,6 +47,10 @@ type DummyRelay struct { reloadErr error } +func (d *DummyRelay) NewReader(logger log.Logger, cfg *relay.BinlogReaderConfig) *relay.BinlogReader { + return nil +} + func (d *DummyRelay) RegisterListener(el relay.Listener) { } diff --git a/dm/dm/worker/server_test.go b/dm/dm/worker/server_test.go index b9470f34634..3d77b729b96 100644 --- a/dm/dm/worker/server_test.go +++ b/dm/dm/worker/server_test.go @@ -121,7 +121,7 @@ func (t *testServer) TestServer(c *C) { cfg.UseRelay = false return NewRealSubTask(cfg, etcdClient, worker) } - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, relay relay.Process) []unit.Unit { mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) mockSync := NewMockUnit(pb.UnitType_Sync) @@ -551,7 +551,9 @@ func (t *testServer) testSubTaskRecover(c *C, s *Server, dir 
string) { func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, etcd *embed.Etcd) { etcd.Close() - time.Sleep(retryConnectSleepTime + time.Duration(defaultKeepAliveTTL+3)*time.Second) + c.Assert(utils.WaitSomething(int(defaultKeepAliveTTL+3), time.Second, func() bool { + return s.getWorker(true) == nil + }), IsTrue) c.Assert(s.getWorker(true), IsNil) } diff --git a/dm/dm/worker/source_worker.go b/dm/dm/worker/source_worker.go index a456bcdec01..fd36f8e75a2 100644 --- a/dm/dm/worker/source_worker.go +++ b/dm/dm/worker/source_worker.go @@ -357,11 +357,9 @@ func (w *SourceWorker) EnableRelay() (err error) { w.observeRelayStage(w.relayCtx, w.etcdClient, revRelay) }() - w.relayHolder.RegisterListener(w.subTaskHolder) - w.relayEnabled.Store(true) w.l.Info("relay enabled") - w.subTaskHolder.resetAllSubTasks(true) + w.subTaskHolder.resetAllSubTasks(w.getRelayWithoutLock()) return nil } @@ -386,12 +384,11 @@ func (w *SourceWorker) DisableRelay() { w.l.Info("finish refreshing task checker") } - w.subTaskHolder.resetAllSubTasks(false) + w.subTaskHolder.resetAllSubTasks(nil) if w.relayHolder != nil { r := w.relayHolder w.relayHolder = nil - r.UnRegisterListener(w.subTaskHolder) r.Close() } if w.relayPurger != nil { @@ -523,7 +520,15 @@ func (w *SourceWorker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.St } w.l.Info("subtask created", zap.Stringer("config", cfg2)) - st.Run(expectStage) + st.Run(expectStage, w.getRelayWithoutLock()) + return nil +} + +// caller should make sure w.Lock is locked before calling this method. 
+func (w *SourceWorker) getRelayWithoutLock() relay.Process { + if w.relayHolder != nil { + return w.relayHolder.Relay() + } return nil } @@ -570,10 +575,10 @@ func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error { err = st.Pause() case pb.TaskOp_Resume: w.l.Info("resume sub task", zap.String("task", name)) - err = st.Resume() + err = st.Resume(w.getRelayWithoutLock()) case pb.TaskOp_AutoResume: w.l.Info("auto_resume sub task", zap.String("task", name)) - err = st.Resume() + err = st.Resume(w.getRelayWithoutLock()) default: err = terror.ErrWorkerUpdateTaskStage.Generatef("invalid operate %s on subtask %v", op, name) } @@ -1043,5 +1048,5 @@ func (w *SourceWorker) HandleError(ctx context.Context, req *pb.HandleWorkerErro return terror.ErrWorkerSubTaskNotFound.Generate(req.Task) } - return st.HandleError(ctx, req) + return st.HandleError(ctx, req, w.getRelayWithoutLock()) } diff --git a/dm/dm/worker/source_worker_test.go b/dm/dm/worker/source_worker_test.go index 7cf46630398..b6b8f7dd2c7 100644 --- a/dm/dm/worker/source_worker_test.go +++ b/dm/dm/worker/source_worker_test.go @@ -242,7 +242,7 @@ var _ = Suite(&testWorkerFunctionalities{}) func (t *testWorkerFunctionalities) SetUpSuite(c *C) { NewRelayHolder = NewDummyRelayHolder NewSubTask = NewRealSubTask - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, relay relay.Process) []unit.Unit { atomic.AddInt32(&t.createUnitCount, 1) mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) @@ -417,7 +417,7 @@ func (t *testWorkerEtcdCompact) SetUpSuite(c *C) { cfg.UseRelay = false return NewRealSubTask(cfg, etcdClient, worker) } - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { + createUnits = func(cfg 
*config.SubTaskConfig, etcdClient *clientv3.Client, worker string, relay relay.Process) []unit.Unit { mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) mockSync := NewMockUnit(pb.UnitType_Sync) diff --git a/dm/dm/worker/subtask.go b/dm/dm/worker/subtask.go index aa7cbeba221..3fdb67d8650 100644 --- a/dm/dm/worker/subtask.go +++ b/dm/dm/worker/subtask.go @@ -45,22 +45,12 @@ const ( waitRelayCatchupTimeout = 30 * time.Second ) -type relayNotifier struct { - // ch with size = 1, we only need to be notified whether binlog file of relay changed, not how many times - ch chan interface{} -} - -// Notified implements streamer.EventNotifier. -func (r relayNotifier) Notified() chan interface{} { - return r.ch -} - // createRealUnits is subtask units initializer // it can be used for testing. var createUnits = createRealUnits // createRealUnits creates process units base on task mode. -func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier relay.EventNotifier) []unit.Unit { +func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, relay relay.Process) []unit.Unit { failpoint.Inject("mockCreateUnitsDumpOnly", func(_ failpoint.Value) { log.L().Info("create mock worker units with dump unit only", zap.String("failpoint", "mockCreateUnitsDumpOnly")) failpoint.Return([]unit.Unit{dumpling.NewDumpling(cfg)}) @@ -75,7 +65,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor } else { us = append(us, loader.NewLightning(cfg, etcdClient, workerName)) } - us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier)) + us = append(us, syncer.NewSyncer(cfg, etcdClient, relay)) case config.ModeFull: // NOTE: maybe need another checker in the future? 
us = append(us, dumpling.NewDumpling(cfg)) @@ -85,7 +75,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor us = append(us, loader.NewLightning(cfg, etcdClient, workerName)) } case config.ModeIncrement: - us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier)) + us = append(us, syncer.NewSyncer(cfg, etcdClient, relay)) default: log.L().Error("unsupported task mode", zap.String("subtask", cfg.Name), zap.String("task mode", cfg.Mode)) } @@ -119,8 +109,6 @@ type SubTask struct { etcdClient *clientv3.Client workerName string - - notifier relay.EventNotifier } // NewSubTask is subtask initializer @@ -143,15 +131,14 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient * cancel: cancel, etcdClient: etcdClient, workerName: workerName, - notifier: &relayNotifier{ch: make(chan interface{}, 1)}, } updateTaskMetric(st.cfg.Name, st.cfg.SourceID, st.stage, st.workerName) return &st } // initUnits initializes the sub task processing units. -func (st *SubTask) initUnits() error { - st.units = createUnits(st.cfg, st.etcdClient, st.workerName, st.notifier) +func (st *SubTask) initUnits(relay relay.Process) error { + st.units = createUnits(st.cfg, st.etcdClient, st.workerName, relay) if len(st.units) < 1 { return terror.ErrWorkerNoAvailUnits.Generate(st.cfg.Name, st.cfg.Mode) } @@ -211,7 +198,7 @@ func (st *SubTask) initUnits() error { // Run runs the sub task. // TODO: check concurrent problems. 
-func (st *SubTask) Run(expectStage pb.Stage) { +func (st *SubTask) Run(expectStage pb.Stage, relay relay.Process) { if st.Stage() == pb.Stage_Finished || st.Stage() == pb.Stage_Running { st.l.Warn("prepare to run a subtask with invalid stage", zap.Stringer("current stage", st.Stage()), @@ -219,7 +206,7 @@ func (st *SubTask) Run(expectStage pb.Stage) { return } - if err := st.initUnits(); err != nil { + if err := st.initUnits(relay); err != nil { st.l.Error("fail to initial subtask", log.ShortError(err)) st.fail(err) return @@ -511,9 +498,9 @@ func (st *SubTask) Pause() error { // Resume resumes the paused sub task // TODO: similar to Run, refactor later. -func (st *SubTask) Resume() error { +func (st *SubTask) Resume(relay relay.Process) error { if !st.initialized.Load() { - st.Run(pb.Stage_Running) + st.Run(pb.Stage_Running, relay) return nil } @@ -714,7 +701,7 @@ func (st *SubTask) fail(err error) { } // HandleError handle error for syncer unit. -func (st *SubTask) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest) error { +func (st *SubTask) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest, relay relay.Process) error { syncUnit, ok := st.currUnit.(*syncer.Syncer) if !ok { return terror.ErrWorkerOperSyncUnitOnly.Generate(st.currUnit.Type()) @@ -726,7 +713,7 @@ func (st *SubTask) HandleError(ctx context.Context, req *pb.HandleWorkerErrorReq } if st.Stage() == pb.Stage_Paused { - err = st.Resume() + err = st.Resume(relay) } return err } @@ -738,11 +725,3 @@ func updateTaskMetric(task, sourceID string, stage pb.Stage, workerName string) taskState.WithLabelValues(task, sourceID, workerName).Set(float64(stage)) } } - -func (st *SubTask) relayNotify() { - // skip if there's pending notify - select { - case st.notifier.Notified() <- struct{}{}: - default: - } -} diff --git a/dm/dm/worker/subtask_holder.go b/dm/dm/worker/subtask_holder.go index e90aa9fd98e..8a87c057a8c 100644 --- a/dm/dm/worker/subtask_holder.go +++ 
b/dm/dm/worker/subtask_holder.go @@ -17,7 +17,7 @@ import ( "context" "sync" - "github.com/go-mysql-org/go-mysql/replication" + "github.com/pingcap/ticdc/dm/relay" ) // subTaskHolder holds subtask instances. @@ -50,7 +50,7 @@ func (h *subTaskHolder) removeSubTask(name string) { } // resetAllSubTasks does Close, change cfg.UseRelay then Init the subtasks. -func (h *subTaskHolder) resetAllSubTasks(useRelay bool) { +func (h *subTaskHolder) resetAllSubTasks(relay relay.Process) { h.mu.Lock() defer h.mu.Unlock() for _, st := range h.subTasks { @@ -58,8 +58,8 @@ func (h *subTaskHolder) resetAllSubTasks(useRelay bool) { st.Close() // TODO: make a st.reset st.ctx, st.cancel = context.WithCancel(context.Background()) - st.cfg.UseRelay = useRelay - st.Run(stage) + st.cfg.UseRelay = relay != nil + st.Run(stage, relay) } } @@ -90,14 +90,3 @@ func (h *subTaskHolder) getAllSubTasks() map[string]*SubTask { } return result } - -// OnEvent implements relay.Listener -// only syncer unit of subtask need to be notified, but it's much simpler and less error-prone to manage it here -// as relay event need to broadcast to every syncer(most subtask have a syncer). 
-func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) { - h.mu.RLock() - defer h.mu.RUnlock() - for _, s := range h.subTasks { - s.relayNotify() - } -} diff --git a/dm/dm/worker/subtask_test.go b/dm/dm/worker/subtask_test.go index 111ab901c1f..d59ce5de9ea 100644 --- a/dm/dm/worker/subtask_test.go +++ b/dm/dm/worker/subtask_test.go @@ -177,20 +177,20 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) { defer func() { createUnits = createRealUnits }() - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, relay relay.Process) []unit.Unit { return nil } - st.Run(pb.Stage_Running) + st.Run(pb.Stage_Running, nil) c.Assert(st.Stage(), Equals, pb.Stage_Paused) c.Assert(strings.Contains(st.Result().Errors[0].String(), "has no dm units for mode"), IsTrue) mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, relay relay.Process) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } - st.Run(pb.Stage_Running) + st.Run(pb.Stage_Running, nil) c.Assert(st.Stage(), Equals, pb.Stage_Running) c.Assert(st.CurrUnit(), Equals, mockDumper) c.Assert(st.Result(), IsNil) @@ -223,7 +223,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) { c.Assert(st.Stage(), Equals, pb.Stage_Paused) // restore from pausing - c.Assert(st.Resume(), IsNil) + c.Assert(st.Resume(nil), IsNil) c.Assert(st.CurrUnit(), Equals, mockLoader) c.Assert(st.Result(), IsNil) c.Assert(st.Stage(), Equals, pb.Stage_Running) @@ -251,7 +251,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) { } // run again - c.Assert(st.Resume(), IsNil) + c.Assert(st.Resume(nil), IsNil) 
c.Assert(st.CurrUnit(), Equals, mockLoader) c.Assert(st.Result(), IsNil) c.Assert(st.Stage(), Equals, pb.Stage_Running) @@ -265,7 +265,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) { } // run again - c.Assert(st.Resume(), IsNil) + c.Assert(st.Resume(nil), IsNil) c.Assert(st.CurrUnit(), Equals, mockLoader) c.Assert(st.Result(), IsNil) c.Assert(st.Stage(), Equals, pb.Stage_Running) @@ -297,11 +297,11 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) { defer func() { createUnits = createRealUnits }() - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, relay relay.Process) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } - st.Run(pb.Stage_Running) + st.Run(pb.Stage_Running, nil) c.Assert(st.Stage(), Equals, pb.Stage_Running) c.Assert(st.CurrUnit(), Equals, mockDumper) c.Assert(st.Result(), IsNil) @@ -334,7 +334,7 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) { } // resume - c.Assert(st.Resume(), IsNil) + c.Assert(st.Resume(nil), IsNil) c.Assert(st.Stage(), Equals, pb.Stage_Running) c.Assert(st.CurrUnit(), Equals, mockDumper) c.Assert(st.Result(), IsNil) @@ -347,7 +347,7 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) { } // resume - c.Assert(st.Resume(), IsNil) + c.Assert(st.Resume(nil), IsNil) c.Assert(st.Stage(), Equals, pb.Stage_Running) c.Assert(st.CurrUnit(), Equals, mockDumper) c.Assert(st.Result(), IsNil) @@ -374,12 +374,12 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) { c.Assert(strings.Contains(st.Result().Errors[0].Message, "dumper process error"), IsTrue) // resume twice - c.Assert(st.Resume(), IsNil) + c.Assert(st.Resume(nil), IsNil) c.Assert(st.Stage(), Equals, pb.Stage_Running) c.Assert(st.CurrUnit(), Equals, mockDumper) c.Assert(st.Result(), IsNil) - c.Assert(st.Resume(), NotNil) + c.Assert(st.Resume(nil), NotNil) 
c.Assert(st.Stage(), Equals, pb.Stage_Running) c.Assert(st.CurrUnit(), Equals, mockDumper) c.Assert(st.Result(), IsNil) @@ -401,7 +401,7 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) { c.Assert(st.Stage(), Equals, pb.Stage_Finished) c.Assert(st.Result().Errors, HasLen, 0) - st.Run(pb.Stage_Finished) + st.Run(pb.Stage_Finished, nil) c.Assert(st.CurrUnit(), Equals, mockLoader) c.Assert(st.Stage(), Equals, pb.Stage_Finished) c.Assert(st.Result().Errors, HasLen, 0) @@ -421,7 +421,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) { defer func() { createUnits = createRealUnits }() - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, relay relay.Process) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } @@ -431,7 +431,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) { c.Assert(st.CurrUnit(), Equals, nil) c.Assert(st.Result(), IsNil) - c.Assert(st.Resume(), IsNil) + c.Assert(st.Resume(nil), IsNil) c.Assert(st.Stage(), Equals, pb.Stage_Running) c.Assert(st.CurrUnit(), Equals, mockDumper) c.Assert(st.Result(), IsNil) @@ -446,11 +446,11 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) { st = NewSubTaskWithStage(cfg, pb.Stage_Finished, nil, "worker") c.Assert(st.Stage(), DeepEquals, pb.Stage_Finished) - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, relay relay.Process) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } - st.Run(pb.Stage_Finished) + st.Run(pb.Stage_Finished, nil) c.Assert(st.Stage(), Equals, pb.Stage_Finished) c.Assert(st.CurrUnit(), Equals, nil) c.Assert(st.Result(), IsNil) diff --git a/dm/pkg/binlog/reader/tcp.go b/dm/pkg/binlog/reader/tcp.go index 
7a27770f231..973b615b4bf 100644 --- a/dm/pkg/binlog/reader/tcp.go +++ b/dm/pkg/binlog/reader/tcp.go @@ -15,16 +15,11 @@ package reader import ( "context" - "database/sql" "encoding/json" - "fmt" - "strconv" "sync" - "sync/atomic" gmysql "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" - "github.com/go-sql-driver/mysql" "go.uber.org/zap" "github.com/pingcap/failpoint" @@ -33,11 +28,8 @@ import ( "github.com/pingcap/ticdc/dm/pkg/gtid" "github.com/pingcap/ticdc/dm/pkg/log" "github.com/pingcap/ticdc/dm/pkg/terror" - "github.com/pingcap/ticdc/dm/pkg/utils" ) -var customID int64 - // TCPReader is a binlog event reader which read binlog events from a TCP stream. type TCPReader struct { syncerCfg replication.BinlogSyncerConfig @@ -138,39 +130,8 @@ func (r *TCPReader) Close() error { failpoint.Return(nil) }) - defer r.syncer.Close() - connID := r.syncer.LastConnectionID() - if connID > 0 { - dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4", - r.syncerCfg.User, r.syncerCfg.Password, r.syncerCfg.Host, r.syncerCfg.Port) - if r.syncerCfg.TLSConfig != nil { - tlsName := "replicate" + strconv.FormatInt(atomic.AddInt64(&customID, 1), 10) - err := mysql.RegisterTLSConfig(tlsName, r.syncerCfg.TLSConfig) - if err != nil { - return terror.WithScope( - terror.Annotatef(terror.DBErrorAdapt(err, terror.ErrDBDriverError), - "fail to register tls config for master %s:%d", r.syncerCfg.Host, r.syncerCfg.Port), terror.ScopeUpstream) - } - dsn += "&tls=" + tlsName - defer mysql.DeregisterTLSConfig(tlsName) - } - db, err := sql.Open("mysql", dsn) - if err != nil { - return terror.WithScope( - terror.Annotatef(terror.DBErrorAdapt(err, terror.ErrDBDriverError), - "open connection to the master %s:%d", r.syncerCfg.Host, r.syncerCfg.Port), terror.ScopeUpstream) - } - defer db.Close() - - // try to KILL the conn in default timeout. 
- ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultDBTimeout) - defer cancel() - err = utils.KillConn(ctx, db, connID) - if err != nil { - return terror.WithScope(terror.Annotatef(err, "kill connection %d for master %s:%d", connID, r.syncerCfg.Host, r.syncerCfg.Port), terror.ScopeUpstream) - } - } - + // unclosed conn bug already fixed in go-mysql, https://github.com/go-mysql-org/go-mysql/pull/411 + r.syncer.Close() r.stage = common.StageClosed return nil } diff --git a/dm/pkg/binlog/reader/util.go b/dm/pkg/binlog/reader/util.go index 702ba9d7a17..73d268012b7 100644 --- a/dm/pkg/binlog/reader/util.go +++ b/dm/pkg/binlog/reader/util.go @@ -27,9 +27,9 @@ import ( "github.com/pingcap/ticdc/dm/pkg/binlog/event" "github.com/pingcap/ticdc/dm/pkg/gtid" "github.com/pingcap/ticdc/dm/pkg/log" + "github.com/pingcap/ticdc/dm/pkg/parser" "github.com/pingcap/ticdc/dm/pkg/terror" "github.com/pingcap/ticdc/dm/pkg/utils" - "github.com/pingcap/ticdc/dm/relay/common" ) // GetGTIDsForPosFromStreamer tries to get GTID sets for the specified binlog position (for the corresponding txn) from a Streamer. @@ -56,7 +56,7 @@ func GetGTIDsForPosFromStreamer(ctx context.Context, r Streamer, endPos gmysql.P log.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err2)) } - isDDL := common.CheckIsDDL(string(ev.Query), parser2) + isDDL := parser.CheckIsDDL(string(ev.Query), parser2) if isDDL { if latestGSet == nil { // GTID not enabled, can't get GTIDs for the position. diff --git a/dm/pkg/parser/common.go b/dm/pkg/parser/common.go index 7f32b718915..73a5420d6cc 100644 --- a/dm/pkg/parser/common.go +++ b/dm/pkg/parser/common.go @@ -335,3 +335,27 @@ func SplitDDL(stmt ast.StmtNode, schema string) (sqls []string, err error) { func genTableName(schema string, table string) *filter.Table { return &filter.Table{Schema: schema, Name: table} } + +// CheckIsDDL checks input SQL whether is a valid DDL statement. 
+func CheckIsDDL(sql string, p *parser.Parser) bool { + sql = utils.TrimCtrlChars(sql) + + if utils.IsBuildInSkipDDL(sql) { + return false + } + + // if parse error, treat it as not a DDL + stmts, err := Parse(p, sql, "", "") + if err != nil || len(stmts) == 0 { + return false + } + + stmt := stmts[0] + switch stmt.(type) { + case ast.DDLNode: + return true + default: + // other thing this like `BEGIN` + return false + } +} diff --git a/dm/pkg/parser/common_test.go b/dm/pkg/parser/common_test.go index 8450490ae38..9d9f272850b 100644 --- a/dm/pkg/parser/common_test.go +++ b/dm/pkg/parser/common_test.go @@ -417,3 +417,34 @@ func (t *testParserSuite) TestResolveDDL(c *C) { } } } + +func (t *testParserSuite) TestCheckIsDDL(c *C) { + var ( + cases = []struct { + sql string + isDDL bool + }{ + { + sql: "CREATE DATABASE test_is_ddl", + isDDL: true, + }, + { + sql: "BEGIN", + isDDL: false, + }, + { + sql: "INSERT INTO test_is_ddl.test_is_ddl_table VALUES (1)", + isDDL: false, + }, + { + sql: "INVAID SQL STATEMENT", + isDDL: false, + }, + } + parser2 = parser.New() + ) + + for _, cs := range cases { + c.Assert(CheckIsDDL(cs.sql, parser2), Equals, cs.isDDL) + } +} diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index 2a98638532a..622ed0801a8 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -19,30 +19,27 @@ import ( "os" "sync" + "github.com/pingcap/errors" "go.uber.org/atomic" "go.uber.org/zap" - "github.com/pingcap/ticdc/dm/pkg/binlog/common" "github.com/pingcap/ticdc/dm/pkg/log" "github.com/pingcap/ticdc/dm/pkg/terror" ) // BinlogWriter is a binlog event writer which writes binlog events to a file. type BinlogWriter struct { - cfg *BinlogWriterConfig + mu sync.RWMutex - mu sync.RWMutex - stage common.Stage - offset atomic.Int64 - - file *os.File + offset atomic.Int64 + file *os.File + filename string logger log.Logger } // BinlogWriterStatus represents the status of a BinlogWriter. 
type BinlogWriterStatus struct { - Stage string `json:"stage"` Filename string `json:"filename"` Offset int64 `json:"offset"` } @@ -57,29 +54,15 @@ func (s *BinlogWriterStatus) String() string { return string(data) } -// BinlogWriterConfig is the configuration used by a BinlogWriter. -type BinlogWriterConfig struct { - Filename string -} - // NewBinlogWriter creates a BinlogWriter instance. -func NewBinlogWriter(logger log.Logger, cfg *BinlogWriterConfig) *BinlogWriter { +func NewBinlogWriter(logger log.Logger) *BinlogWriter { return &BinlogWriter{ - cfg: cfg, logger: logger, } } -// Start implements Writer.Start. -func (w *BinlogWriter) Start() error { - w.mu.Lock() - defer w.mu.Unlock() - - if w.stage != common.StageNew { - return terror.ErrBinlogWriterNotStateNew.Generate(w.stage, common.StageNew) - } - - f, err := os.OpenFile(w.cfg.Filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644) +func (w *BinlogWriter) Open(filename string) error { + f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644) if err != nil { return terror.ErrBinlogWriterOpenFile.Delegate(err) } @@ -92,42 +75,42 @@ func (w *BinlogWriter) Start() error { return terror.ErrBinlogWriterGetFileStat.Delegate(err, f.Name()) } + w.mu.Lock() + defer w.mu.Unlock() + w.offset.Store(fs.Size()) w.file = f - w.stage = common.StagePrepared + w.filename = filename + return nil } -// Close implements Writer.Close. func (w *BinlogWriter) Close() error { w.mu.Lock() defer w.mu.Unlock() - if w.stage != common.StagePrepared { - return terror.ErrBinlogWriterStateCannotClose.Generate(w.stage, common.StagePrepared) - } - var err error if w.file != nil { - err2 := w.flush() // try flush manually before close. + err2 := w.file.Sync() // try sync manually before close. 
if err2 != nil { w.logger.Error("fail to flush buffered data", zap.String("component", "file writer"), zap.Error(err2)) } err = w.file.Close() - w.file = nil } - w.stage = common.StageClosed + w.file = nil + w.offset.Store(0) + w.filename = "" + return err } -// Write implements Writer.Write. func (w *BinlogWriter) Write(rawData []byte) error { w.mu.RLock() defer w.mu.RUnlock() - if w.stage != common.StagePrepared { - return terror.ErrBinlogWriterNeedStart.Generate(w.stage, common.StagePrepared) + if w.file == nil { + return terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened")) } n, err := w.file.Write(rawData) @@ -136,35 +119,16 @@ func (w *BinlogWriter) Write(rawData []byte) error { return terror.ErrBinlogWriterWriteDataLen.Delegate(err, len(rawData)) } -// Flush implements Writer.Flush. -func (w *BinlogWriter) Flush() error { +func (w *BinlogWriter) Status() *BinlogWriterStatus { w.mu.RLock() defer w.mu.RUnlock() - if w.stage != common.StagePrepared { - return terror.ErrBinlogWriterNeedStart.Generate(w.stage, common.StagePrepared) - } - - return w.flush() -} - -// Status implements Writer.Status. -func (w *BinlogWriter) Status() interface{} { - w.mu.RLock() - stage := w.stage - w.mu.RUnlock() - return &BinlogWriterStatus{ - Stage: stage.String(), - Filename: w.cfg.Filename, + Filename: w.filename, Offset: w.offset.Load(), } } -// flush flushes the buffered data to the disk. -func (w *BinlogWriter) flush() error { - if w.file == nil { - return terror.ErrBinlogWriterFileNotOpened.Generate(w.cfg.Filename) - } - return terror.ErrBinlogWriterFileSync.Delegate(w.file.Sync()) +func (w *BinlogWriter) Offset() int64 { + return w.offset.Load() } diff --git a/dm/relay/binlog_writer_test.go b/dm/relay/binlog_writer_test.go index 2d6bb0cbfd7..a8c5644fe10 100644 --- a/dm/relay/binlog_writer_test.go +++ b/dm/relay/binlog_writer_test.go @@ -15,22 +15,15 @@ package relay import ( "bytes" - "fmt" "os" "path/filepath" "strings" - "testing" . 
"github.com/pingcap/check" - "github.com/pingcap/ticdc/dm/pkg/binlog/common" "github.com/pingcap/ticdc/dm/pkg/log" ) -func TestBinlogWriterSuite(t *testing.T) { - TestingT(t) -} - var _ = Suite(&testBinlogWriterSuite{}) type testBinlogWriterSuite struct{} @@ -39,83 +32,69 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) { dir := c.MkDir() filename := filepath.Join(dir, "test-mysql-bin.000001") var ( - cfg = &BinlogWriterConfig{ - Filename: filename, - } allData bytes.Buffer + data1 = []byte("test-data") ) - w := NewBinlogWriter(log.L(), cfg) - c.Assert(w, NotNil) - - // check status, stageNew - status := w.Status() - fwStatus, ok := status.(*BinlogWriterStatus) - c.Assert(ok, IsTrue) - c.Assert(fwStatus.Stage, Equals, common.StageNew.String()) - c.Assert(fwStatus.Filename, Equals, filename) - c.Assert(fwStatus.Offset, Equals, int64(allData.Len())) - fwStatusStr := fwStatus.String() - c.Assert(strings.Contains(fwStatusStr, common.StageNew.String()), IsTrue) - - // not prepared - data1 := []byte("test-data") - err := w.Write(data1) - c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*", common.StageNew)) - err = w.Flush() - c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*", common.StageNew)) - - // start - err = w.Start() - c.Assert(err, IsNil) - - // check status, stagePrepared - status = w.Status() - fwStatus, ok = status.(*BinlogWriterStatus) - c.Assert(ok, IsTrue) - c.Assert(fwStatus.Stage, Equals, common.StagePrepared.String()) - c.Assert(fwStatus.Filename, Equals, filename) - c.Assert(fwStatus.Offset, Equals, int64(allData.Len())) - - // re-prepare is invalid - err = w.Start() - c.Assert(err, NotNil) - - // write the data - err = w.Write(data1) - c.Assert(err, IsNil) - allData.Write(data1) - - // write data again - data2 := []byte("another-data") - err = w.Write(data2) - c.Assert(err, IsNil) - allData.Write(data2) - - // test Flush interface method simply - err = w.Flush() - c.Assert(err, IsNil) - - // close the reader - c.Assert(w.Close(), IsNil) - - // check 
status, stageClosed - status = w.Status() - fwStatus, ok = status.(*BinlogWriterStatus) - c.Assert(ok, IsTrue) - c.Assert(fwStatus.Stage, Equals, common.StageClosed.String()) - c.Assert(fwStatus.Filename, Equals, filename) - c.Assert(fwStatus.Offset, Equals, int64(allData.Len())) - - // re-close is invalid - c.Assert(w.Close(), NotNil) - - // can not Writer/Flush anymore - c.Assert(w.Write(data2), NotNil) - c.Assert(w.Flush(), NotNil) - - // try to read the data back - dataInFile, err := os.ReadFile(filename) - c.Assert(err, IsNil) - c.Assert(dataInFile, DeepEquals, allData.Bytes()) + { + w := NewBinlogWriter(log.L()) + c.Assert(w, NotNil) + c.Assert(w.Open(filename), IsNil) + fwStatus := w.Status() + c.Assert(fwStatus.Filename, Equals, filename) + c.Assert(fwStatus.Offset, Equals, int64(allData.Len())) + fwStatusStr := fwStatus.String() + c.Assert(strings.Contains(fwStatusStr, "filename"), IsTrue) + c.Assert(w.Close(), IsNil) + } + + { + // not opened + w := NewBinlogWriter(log.L()) + err := w.Write(data1) + c.Assert(err, ErrorMatches, "*not opened") + + // open non exist dir + err = w.Open(filepath.Join(dir, "not-exist", "bin.000001")) + c.Assert(err, ErrorMatches, "*no such file or directory") + } + + { + // normal call flow + w := NewBinlogWriter(log.L()) + err := w.Open(filename) + c.Assert(err, IsNil) + c.Assert(w.file, NotNil) + c.Assert(w.filename, Equals, filename) + c.Assert(w.offset.Load(), Equals, int64(0)) + + err = w.Write(data1) + c.Assert(err, IsNil) + allData.Write(data1) + + fwStatus := w.Status() + c.Assert(fwStatus.Filename, Equals, filename) + c.Assert(fwStatus.Offset, Equals, int64(len(data1))) + + // write data again + data2 := []byte("another-data") + err = w.Write(data2) + c.Assert(err, IsNil) + allData.Write(data2) + + c.Assert(w.offset.Load(), Equals, int64(allData.Len())) + + err = w.Close() + c.Assert(err, IsNil) + c.Assert(w.file, IsNil) + c.Assert(w.filename, Equals, "") + c.Assert(w.offset.Load(), Equals, int64(0)) + + 
c.Assert(w.Close(), IsNil) // noop + + // try to read the data back + dataInFile, err := os.ReadFile(filename) + c.Assert(err, IsNil) + c.Assert(dataInFile, DeepEquals, allData.Bytes()) + } } diff --git a/dm/relay/common/util.go b/dm/relay/common/util.go deleted file mode 100644 index d75c6fc1ac1..00000000000 --- a/dm/relay/common/util.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package common - -import ( - "github.com/pingcap/tidb/parser" - "github.com/pingcap/tidb/parser/ast" - - parserpkg "github.com/pingcap/ticdc/dm/pkg/parser" - "github.com/pingcap/ticdc/dm/pkg/utils" -) - -// CheckIsDDL checks input SQL whether is a valid DDL statement. -func CheckIsDDL(sql string, p *parser.Parser) bool { - sql = utils.TrimCtrlChars(sql) - - if utils.IsBuildInSkipDDL(sql) { - return false - } - - // if parse error, treat it as not a DDL - stmts, err := parserpkg.Parse(p, sql, "", "") - if err != nil || len(stmts) == 0 { - return false - } - - stmt := stmts[0] - switch stmt.(type) { - case ast.DDLNode: - return true - default: - // other thing this like `BEGIN` - return false - } -} diff --git a/dm/relay/common/util_test.go b/dm/relay/common/util_test.go deleted file mode 100644 index d0e23a211b7..00000000000 --- a/dm/relay/common/util_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package common - -import ( - "testing" - - "github.com/pingcap/check" - "github.com/pingcap/tidb/parser" -) - -func TestSuite(t *testing.T) { - check.TestingT(t) -} - -var _ = check.Suite(&testUtilSuite{}) - -type testUtilSuite struct{} - -func (t *testUtilSuite) TestCheckIsDDL(c *check.C) { - var ( - cases = []struct { - sql string - isDDL bool - }{ - { - sql: "CREATE DATABASE test_is_ddl", - isDDL: true, - }, - { - sql: "BEGIN", - isDDL: false, - }, - { - sql: "INSERT INTO test_is_ddl.test_is_ddl_table VALUES (1)", - isDDL: false, - }, - { - sql: "INVAID SQL STATEMENT", - isDDL: false, - }, - } - parser2 = parser.New() - ) - - for _, cs := range cases { - c.Assert(CheckIsDDL(cs.sql, parser2), check.Equals, cs.isDDL) - } -} diff --git a/dm/relay/file_util.go b/dm/relay/file_util.go index a1190b99d30..526262f2350 100644 --- a/dm/relay/file_util.go +++ b/dm/relay/file_util.go @@ -29,8 +29,8 @@ import ( "github.com/pingcap/ticdc/dm/pkg/binlog/event" "github.com/pingcap/ticdc/dm/pkg/binlog/reader" "github.com/pingcap/ticdc/dm/pkg/gtid" + parserpkg "github.com/pingcap/ticdc/dm/pkg/parser" "github.com/pingcap/ticdc/dm/pkg/terror" - "github.com/pingcap/ticdc/dm/relay/common" ) // checkBinlogHeaderExist checks if the file has a binlog file header. 
@@ -197,7 +197,7 @@ func getTxnPosGTIDs(ctx context.Context, filename string, p *parser.Parser) (int case *replication.FormatDescriptionEvent: latestPos = int64(e.Header.LogPos) case *replication.QueryEvent: - isDDL := common.CheckIsDDL(string(ev.Query), p) + isDDL := parserpkg.CheckIsDDL(string(ev.Query), p) if isDDL { if latestGSet != nil { // GTID may not be enabled in the binlog err = latestGSet.Update(nextGTIDStr) diff --git a/dm/relay/local_reader.go b/dm/relay/local_reader.go index ad05656e4b5..49f51d29e0a 100644 --- a/dm/relay/local_reader.go +++ b/dm/relay/local_reader.go @@ -71,11 +71,13 @@ type BinlogReader struct { usingGTID bool prevGset, currGset mysql.GTIDSet - notifier EventNotifier + // ch with size = 1, we only need to be notified whether binlog file of relay changed, not how many times + notifyCh chan interface{} + relay Process } -// NewBinlogReader creates a new BinlogReader. -func NewBinlogReader(notifier EventNotifier, logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader { +// newBinlogReader creates a new BinlogReader. +func newBinlogReader(logger log.Logger, cfg *BinlogReaderConfig, relay Process) *BinlogReader { ctx, cancel := context.WithCancel(context.Background()) // only can be canceled in `Close` parser := replication.NewBinlogParser() parser.SetVerifyChecksum(true) @@ -87,14 +89,17 @@ func NewBinlogReader(notifier EventNotifier, logger log.Logger, cfg *BinlogReade newtctx := tcontext.NewContext(ctx, logger.WithFields(zap.String("component", "binlog reader"))) - return &BinlogReader{ + binlogReader := &BinlogReader{ cfg: cfg, parser: parser, indexPath: path.Join(cfg.RelayDir, utils.UUIDIndexFilename), cancel: cancel, tctx: newtctx, - notifier: notifier, + notifyCh: make(chan interface{}, 1), + relay: relay, } + binlogReader.relay.RegisterListener(binlogReader) + return binlogReader } // checkRelayPos will check whether the given relay pos is too big. 
@@ -588,7 +593,7 @@ func (r *BinlogReader) parseFile( go func(latestPos int64) { defer wg.Done() checker := relayLogFileChecker{ - notifier: r.notifier, + notifier: r, relayDir: r.cfg.RelayDir, currentUUID: currentUUID, latestRelayLogDir: relayLogDir, @@ -641,6 +646,7 @@ func (r *BinlogReader) Close() { r.cancel() r.parser.Stop() r.wg.Wait() + r.relay.UnRegisterListener(r) r.tctx.L().Info("binlog reader closed") } @@ -686,3 +692,15 @@ func (r *BinlogReader) advanceCurrentGtidSet(gtid string) (bool, error) { } return false, err } + +func (r *BinlogReader) Notified() chan interface{} { + return r.notifyCh +} + +func (r *BinlogReader) OnEvent(_ *replication.BinlogEvent) { + // skip if there's pending notify + select { + case r.notifyCh <- struct{}{}: + default: + } +} diff --git a/dm/relay/local_reader_test.go b/dm/relay/local_reader_test.go index f77d96b02b4..f57456a72f8 100644 --- a/dm/relay/local_reader_test.go +++ b/dm/relay/local_reader_test.go @@ -25,7 +25,6 @@ import ( "strconv" "strings" "sync" - "testing" "time" "github.com/BurntSushi/toml" @@ -53,10 +52,6 @@ type testReaderSuite struct { lastGTID gtid.Set } -func TestReader(t *testing.T) { - TestingT(t) -} - func (t *testReaderSuite) SetUpSuite(c *C) { var err error t.lastPos = 0 @@ -69,6 +64,15 @@ func (t *testReaderSuite) TearDownSuite(c *C) { c.Assert(failpoint.Disable("github.com/pingcap/ticdc/dm/relay/SetHeartbeatInterval"), IsNil) } +func newBinlogReaderForTest(logger log.Logger, cfg *BinlogReaderConfig, notify bool) *BinlogReader { + relay := NewRealRelay(&Config{Flavor: gmysql.MySQLFlavor}) + reader := newBinlogReader(logger, cfg, relay) + if notify { + reader.notifyCh <- struct{}{} + } + return reader +} + func (t *testReaderSuite) TestParseFileBase(c *C) { var ( filename = "test-mysql-bin.000001" @@ -86,7 +90,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { currentUUID := "invalid-current-uuid" relayDir := filepath.Join(baseDir, currentUUID) cfg := &BinlogReaderConfig{RelayDir: baseDir, 
Flavor: gmysql.MySQLFlavor} - r := NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) + r := newBinlogReaderForTest(log.L(), cfg, true) needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err := r.parseFile( ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, ErrorMatches, ".*invalid-current-uuid.*") @@ -102,7 +106,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { relayDir = filepath.Join(baseDir, currentUUID) fullPath := filepath.Join(relayDir, filename) cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) + r = newBinlogReaderForTest(log.L(), cfg, true) // relay log file not exists, failed needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( @@ -272,7 +276,7 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { nextFullPath = filepath.Join(nextRelayDir, nextFilename) s = newLocalStreamer() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) + r = newBinlogReaderForTest(log.L(), cfg, true) ) // create the current relay log file and meta @@ -343,7 +347,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { fullPath = filepath.Join(relayDir, filename) s = newLocalStreamer() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) + r = newBinlogReaderForTest(log.L(), cfg, true) ) // create the current relay log file and write some events @@ -379,7 +383,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { // meet `err EOF` error (when parsing binlog event) ignored ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel2() - r.notifier = newDummyEventNotifier(1) + r.notifyCh <- struct{}{} // notify 
again needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) @@ -393,10 +397,9 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { func (t *testReaderSuite) TestUpdateUUIDs(c *C) { var ( - en = newDummyEventNotifier(1) baseDir = c.MkDir() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(en, log.L(), cfg) + r = newBinlogReaderForTest(log.L(), cfg, true) ) c.Assert(r.uuids, HasLen, 0) @@ -421,7 +424,6 @@ func (t *testReaderSuite) TestUpdateUUIDs(c *C) { func (t *testReaderSuite) TestStartSyncByPos(c *C) { var ( - en = newDummyEventNotifier(1) filenamePrefix = "test-mysql-bin.00000" notUsedGTIDSetStr = t.lastGTID.String() baseDir = c.MkDir() @@ -433,7 +435,7 @@ func (t *testReaderSuite) TestStartSyncByPos(c *C) { "b60868af-5a6f-11e9-9ea3-0242ac160008.000003", } cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(en, log.L(), cfg) + r = newBinlogReaderForTest(log.L(), cfg, false) startPos = gmysql.Position{Name: "test-mysql-bin|000001.000001"} // from the first relay log file in the first sub directory ) @@ -565,7 +567,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { baseDir = c.MkDir() events []*replication.BinlogEvent cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(newDummyEventNotifier(0), log.L(), cfg) + r = newBinlogReaderForTest(log.L(), cfg, false) lastPos uint32 lastGTID gtid.Set previousGset, _ = gtid.ParserGTID(gmysql.MySQLFlavor, "") @@ -756,7 +758,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { } r.Close() - r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) + r = newBinlogReaderForTest(log.L(), cfg, true) excludeStrs := []string{} // exclude first uuid @@ -800,12 +802,12 @@ func (t *testReaderSuite) TestStartSyncByGTID(c 
*C) { c.Assert(os.Remove(path.Join(baseDir, excludeUUID, "mysql.000001")), IsNil) r.Close() - r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) + r = newBinlogReaderForTest(log.L(), cfg, true) _, err = r.StartSyncByGTID(preGset) c.Assert(err, IsNil) r.Close() - r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) + r = newBinlogReaderForTest(log.L(), cfg, true) _, err = r.StartSyncByGTID(excludeGset) // error because file has been purge c.Assert(terror.ErrNoRelayPosMatchGTID.Equal(err), IsTrue) @@ -814,12 +816,12 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { c.Assert(os.RemoveAll(path.Join(baseDir, excludeUUID)), IsNil) r.Close() - r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) + r = newBinlogReaderForTest(log.L(), cfg, true) _, err = r.StartSyncByGTID(preGset) c.Assert(err, IsNil) r.Close() - r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) + r = newBinlogReaderForTest(log.L(), cfg, true) _, err = r.StartSyncByGTID(excludeGset) // error because subdir has been purge c.Assert(err, ErrorMatches, ".*no such file or directory.*") @@ -836,7 +838,7 @@ func (t *testReaderSuite) TestStartSyncError(c *C) { startPos = gmysql.Position{Name: "test-mysql-bin|000001.000001"} // from the first relay log file in the first sub directory ) - r := NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) + r := newBinlogReaderForTest(log.L(), cfg, true) err := r.checkRelayPos(startPos) c.Assert(err, ErrorMatches, ".*empty UUIDs not valid.*") @@ -855,7 +857,7 @@ func (t *testReaderSuite) TestStartSyncError(c *C) { c.Assert(s, IsNil) // write UUIDs into index file - r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) // create a new reader + r = newBinlogReaderForTest(log.L(), cfg, true) // create a new reader uuidBytes := t.uuidListToBytes(c, UUIDs) err = os.WriteFile(r.indexPath, uuidBytes, 0o600) c.Assert(err, IsNil) @@ -898,10 +900,9 @@ func (t *testReaderSuite) TestStartSyncError(c *C) { func (t *testReaderSuite) 
TestAdvanceCurrentGTIDSet(c *C) { var ( - en = newDummyEventNotifier(1) baseDir = c.MkDir() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(en, log.L(), cfg) + r = newBinlogReaderForTest(log.L(), cfg, true) mysqlGset, _ = gmysql.ParseMysqlGTIDSet("b60868af-5a6f-11e9-9ea3-0242ac160006:1-6") mariadbGset, _ = gmysql.ParseMariadbGTIDSet("0-1-5") ) @@ -933,10 +934,9 @@ func (t *testReaderSuite) TestAdvanceCurrentGTIDSet(c *C) { func (t *testReaderSuite) TestReParseUsingGTID(c *C) { var ( - en = newDummyEventNotifier(1) baseDir = c.MkDir() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(en, log.L(), cfg) + r = newBinlogReaderForTest(log.L(), cfg, true) uuid = "ba8f633f-1f15-11eb-b1c7-0242ac110002.000001" gtidStr = "ba8f633f-1f15-11eb-b1c7-0242ac110002:1" file = "mysql.000001" @@ -1019,8 +1019,9 @@ func (t *testReaderSuite) TestReParseUsingGTID(c *C) { time.Sleep(time.Second) _, err = f.Write(events[i].RawData) c.Assert(err, IsNil) + _ = f.Sync() select { - case en.Notified() <- struct{}{}: + case r.notifyCh <- struct{}{}: default: } } diff --git a/dm/relay/purger_test.go b/dm/relay/purger_test.go index 0919077c60f..318226ca31a 100644 --- a/dm/relay/purger_test.go +++ b/dm/relay/purger_test.go @@ -19,7 +19,6 @@ import ( "os" "path/filepath" "strings" - "testing" "time" . 
"github.com/pingcap/check" @@ -49,10 +48,6 @@ var _ = Suite(&testPurgerSuite{ }, }) -func TestPurgerSuite(t *testing.T) { - TestingT(t) -} - type testPurgerSuite struct { uuids []string relayFiles [][]string diff --git a/dm/relay/relay.go b/dm/relay/relay.go index e0322a4ef41..f563ad9d16e 100644 --- a/dm/relay/relay.go +++ b/dm/relay/relay.go @@ -41,10 +41,10 @@ import ( "github.com/pingcap/ticdc/dm/pkg/conn" "github.com/pingcap/ticdc/dm/pkg/gtid" "github.com/pingcap/ticdc/dm/pkg/log" + parserpkg "github.com/pingcap/ticdc/dm/pkg/parser" pkgstreamer "github.com/pingcap/ticdc/dm/pkg/streamer" "github.com/pingcap/ticdc/dm/pkg/terror" "github.com/pingcap/ticdc/dm/pkg/utils" - "github.com/pingcap/ticdc/dm/relay/transformer" ) // used to fill RelayLogInfo. @@ -105,6 +105,8 @@ type Process interface { RegisterListener(el Listener) // UnRegisterListener unregisters a relay listener UnRegisterListener(el Listener) + // NewReader creates a new relay reader + NewReader(logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader } // Relay relays mysql binlog to local file. @@ -123,17 +125,21 @@ type Relay struct { sync.RWMutex info *pkgstreamer.RelayLogInfo } + + writer Writer listeners map[Listener]struct{} // make it a set to make it easier to remove listener } // NewRealRelay creates an instance of Relay. func NewRealRelay(cfg *Config) Process { - return &Relay{ + r := &Relay{ cfg: cfg, meta: NewLocalMeta(cfg.Flavor, cfg.RelayDir), logger: log.With(zap.String("component", "relay log")), listeners: make(map[Listener]struct{}), } + r.writer = NewFileWriter(r.logger) + return r } // Init implements the dm.Unit interface. 
@@ -281,12 +287,11 @@ func (r *Relay) process(ctx context.Context) error { } }() - writer2, err := r.setUpWriter(parser2) - if err != nil { - return err - } + uuid, pos := r.meta.Pos() + r.writer.Init(r.meta.Dir(), pos.Name) + r.logger.Info("started underlying writer", zap.String("UUID", uuid), zap.String("filename", pos.Name)) defer func() { - err = writer2.Close() + err = r.writer.Close() if err != nil { r.logger.Error("fail to close binlog event writer", zap.Error(err)) } @@ -297,14 +302,12 @@ func (r *Relay) process(ctx context.Context) error { return err } - transformer2 := transformer.NewTransformer(parser2) - go r.doIntervalOps(ctx) // handles binlog events with retry mechanism. // it only do the retry for some binlog reader error now. for { - eventIdx, err := r.handleEvents(ctx, reader2, transformer2, writer2) + eventIdx, err := r.handleEvents(ctx, reader2, parser2) checkError: if err == nil { return nil @@ -338,7 +341,7 @@ func (r *Relay) process(ctx context.Context) error { err = err2 goto checkError } - tResult := transformer2.Transform(res.Event) + tResult := r.preprocessEvent(res.Event, parser2) // do not count skip event if !tResult.Ignore { i++ @@ -392,25 +395,11 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser } // setup a special writer to do the recovering - cfg := &FileConfig{ - RelayDir: r.meta.Dir(), - Filename: latestPos.Name, - } - writer2 := NewFileWriter(r.logger, cfg, parser2) - err := writer2.Start() - if err != nil { - return terror.Annotatef(err, "start recover writer for UUID %s with config %+v", uuid, cfg) - } - defer func() { - err2 := writer2.Close() - if err2 != nil { - r.logger.Error("fail to close recover writer", zap.String("UUID", uuid), zap.Reflect("config", cfg), log.ShortError(err2)) - } - }() - r.logger.Info("started recover writer", zap.String("UUID", uuid), zap.Reflect("config", cfg)) + binlogDir := r.meta.Dir() + r.logger.Info("started recover", zap.String("binlog dir", binlogDir), 
zap.String("filename", latestPos.Name)) // NOTE: recover a relay log file with too many binlog events may take a little long time. - result, err := writer2.Recover(ctx) + result, err := r.doRecovering(ctx, r.meta.Dir(), latestPos.Name, parser2) if err == nil { relayLogHasMore := result.LatestPos.Compare(latestPos) > 0 || (result.LatestGTIDs != nil && !result.LatestGTIDs.Equal(latestGTID) && result.LatestGTIDs.Contain(latestGTID)) @@ -448,7 +437,136 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser } } } - return terror.Annotatef(err, "recover for UUID %s with config %+v", uuid, cfg) + return terror.Annotatef(err, "recover for UUID %s with relay dir %s, filename %s", uuid, binlogDir, latestPos.Name) +} + +// recoverResult represents a result for a binlog recover operation. +type recoverResult struct { + // if truncate trailing incomplete events during recovering in relay log + Truncated bool + // the latest binlog position after recover operation has done. + LatestPos mysql.Position + // the latest binlog GTID set after recover operation has done. + LatestGTIDs gtid.Set +} + +// doRecovering tries to recover the current binlog file. +// 1. read events from the file +// 2. +// a. update the position with the event's position if the transaction finished +// b. update the GTID set with the event's GTID if the transaction finished +// 3. truncate any incomplete events/transactions +// now, we think a transaction finished if we received a XIDEvent or DDL in QueryEvent +// NOTE: handle cases when file size > 4GB. 
+func (r *Relay) doRecovering(ctx context.Context, binlogDir, filename string, parser *parser.Parser) (recoverResult, error) { + fullName := filepath.Join(binlogDir, filename) + fs, err := os.Stat(fullName) + if (err != nil && os.IsNotExist(err)) || (err == nil && len(filename) == 0) { + return recoverResult{}, nil // no file need to recover + } else if err != nil { + return recoverResult{}, terror.ErrRelayWriterGetFileStat.Delegate(err, fullName) + } + + // get latest pos/GTID set for all completed transactions from the file + latestPos, latestGTIDs, err := getTxnPosGTIDs(ctx, fullName, parser) + if err != nil { + return recoverResult{}, terror.Annotatef(err, "get latest pos/GTID set from %s", fullName) + } + + // mock file truncated by recover + failpoint.Inject("MockRecoverRelayWriter", func() { + r.logger.Info("mock recover relay writer") + failpoint.Goto("bypass") + }) + + // in most cases, we think the file is fine, so compare the size is simpler. + if fs.Size() == latestPos { + return recoverResult{ + Truncated: false, + LatestPos: mysql.Position{Name: filename, Pos: uint32(latestPos)}, + LatestGTIDs: latestGTIDs, + }, nil + } else if fs.Size() < latestPos { + return recoverResult{}, terror.ErrRelayWriterLatestPosGTFileSize.Generate(latestPos, fs.Size()) + } + + failpoint.Label("bypass") + + // truncate the file + f, err := os.OpenFile(fullName, os.O_WRONLY, 0o644) + if err != nil { + return recoverResult{}, terror.Annotatef(terror.ErrRelayWriterFileOperate.New(err.Error()), "open %s", fullName) + } + defer f.Close() + err = f.Truncate(latestPos) + if err != nil { + return recoverResult{}, terror.Annotatef(terror.ErrRelayWriterFileOperate.New(err.Error()), "truncate %s to %d", fullName, latestPos) + } + + return recoverResult{ + Truncated: true, + LatestPos: mysql.Position{Name: filename, Pos: uint32(latestPos)}, + LatestGTIDs: latestGTIDs, + }, nil +} + +const ( + ignoreReasonHeartbeat = "heartbeat event" + ignoreReasonArtificialFlag = "artificial flag 
(0x0020) set" +) + +type preprocessResult struct { + Ignore bool // whether the event should be ignored + IgnoreReason string // why the transformer ignore the event + LogPos uint32 // binlog event's End_log_pos or Position in RotateEvent + NextLogName string // next binlog filename, only valid for RotateEvent + GTIDSet mysql.GTIDSet // GTIDSet got from QueryEvent and XIDEvent when RawModeEnabled not true + CanSaveGTID bool // whether can save GTID into meta, true for DDL query and XIDEvent +} + +func (r *Relay) preprocessEvent(e *replication.BinlogEvent, parser2 *parser.Parser) preprocessResult { + result := preprocessResult{ + LogPos: e.Header.LogPos, + } + + switch ev := e.Event.(type) { + case *replication.PreviousGTIDsEvent: + result.CanSaveGTID = true + case *replication.MariadbGTIDListEvent: + result.CanSaveGTID = true + case *replication.RotateEvent: + // NOTE: we need to get the first binlog filename from fake RotateEvent when using auto position + result.LogPos = uint32(ev.Position) // next event's position + result.NextLogName = string(ev.NextLogName) // for RotateEvent, update binlog name + case *replication.QueryEvent: + // when RawModeEnabled not true, QueryEvent will be parsed. + if parserpkg.CheckIsDDL(string(ev.Query), parser2) { + // we only update/save GTID for DDL/XID event + // if the query is something like `BEGIN`, we do not update/save GTID. + result.GTIDSet = ev.GSet + result.CanSaveGTID = true + } + case *replication.XIDEvent: + // when RawModeEnabled not true, XIDEvent will be parsed. 
+ result.GTIDSet = ev.GSet + result.CanSaveGTID = true // need save GTID for XID + case *replication.GenericEvent: + // handle some un-parsed events + if e.Header.EventType == replication.HEARTBEAT_EVENT { + // ignore artificial heartbeat event + // ref: https://dev.mysql.com/doc/internals/en/heartbeat-event.html + result.Ignore = true + result.IgnoreReason = ignoreReasonHeartbeat + } + default: + if e.Header.Flags&replication.LOG_EVENT_ARTIFICIAL_F != 0 { + // ignore events with LOG_EVENT_ARTIFICIAL_F flag(0x0020) set + // ref: https://dev.mysql.com/doc/internals/en/binlog-event-flag.html + result.Ignore = true + result.IgnoreReason = ignoreReasonArtificialFlag + } + } + return result } // handleEvents handles binlog events, including: @@ -460,8 +578,7 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser func (r *Relay) handleEvents( ctx context.Context, reader2 Reader, - transformer2 transformer.Transformer, - writer2 Writer, + parser2 *parser.Parser, ) (int, error) { var ( _, lastPos = r.meta.Pos() @@ -526,7 +643,7 @@ func (r *Relay) handleEvents( // 2. transform events transformTimer := time.Now() - tResult := transformer2.Transform(e) + tResult := r.preprocessEvent(e, parser2) binlogTransformDurationHistogram.Observe(time.Since(transformTimer).Seconds()) if len(tResult.NextLogName) > 0 && tResult.NextLogName > lastPos.Name { lastPos = mysql.Position{ @@ -570,7 +687,7 @@ func (r *Relay) handleEvents( // 3. save events into file writeTimer := time.Now() r.logger.Debug("writing binlog event", zap.Reflect("header", e.Header)) - wResult, err := writer2.WriteEvent(e) + wResult, err := r.writer.WriteEvent(e) if err != nil { relayLogWriteErrorCounter.Inc() return eventIndex, err @@ -843,22 +960,6 @@ func (r *Relay) setUpReader(ctx context.Context) (Reader, error) { return reader2, nil } -// setUpWriter setups the underlying writer used to writer binlog events into file or other places. 
-func (r *Relay) setUpWriter(parser2 *parser.Parser) (Writer, error) { - uuid, pos := r.meta.Pos() - cfg := &FileConfig{ - RelayDir: r.meta.Dir(), - Filename: pos.Name, - } - writer2 := NewFileWriter(r.logger, cfg, parser2) - if err := writer2.Start(); err != nil { - return nil, terror.Annotatef(err, "start writer for UUID %s with config %+v", uuid, cfg) - } - - r.logger.Info("started underlying writer", zap.String("UUID", uuid), zap.Reflect("config", cfg)) - return writer2, nil -} - func (r *Relay) masterNode() string { return fmt.Sprintf("%s:%d", r.cfg.From.Host, r.cfg.From.Port) } @@ -1115,6 +1216,10 @@ func (r *Relay) notify(e *replication.BinlogEvent) { } } +func (r *Relay) NewReader(logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader { + return newBinlogReader(logger, cfg, r) +} + // RegisterListener implements Process.RegisterListener. func (r *Relay) RegisterListener(el Listener) { r.Lock() diff --git a/dm/relay/relay_test.go b/dm/relay/relay_test.go index 86fe8df6647..060957a3928 100644 --- a/dm/relay/relay_test.go +++ b/dm/relay/relay_test.go @@ -37,7 +37,6 @@ import ( "github.com/pingcap/ticdc/dm/pkg/conn" "github.com/pingcap/ticdc/dm/pkg/gtid" "github.com/pingcap/ticdc/dm/pkg/utils" - "github.com/pingcap/ticdc/dm/relay/transformer" ) var _ = Suite(&testRelaySuite{}) @@ -123,16 +122,11 @@ type mockWriter struct { latestEvent *replication.BinlogEvent } -func (w *mockWriter) Start() error { - return nil -} - func (w *mockWriter) Close() error { return nil } -func (w *mockWriter) Recover(ctx context.Context) (RecoverResult, error) { - return RecoverResult{}, nil +func (w *mockWriter) Init(relayDir, filename string) { } func (w *mockWriter) WriteEvent(ev *replication.BinlogEvent) (WResult, error) { @@ -404,11 +398,11 @@ func genBinlogEventsWithGTIDs(c *C, flavor string, previousGTIDSet, latestGTID1, func (t *testRelaySuite) TestHandleEvent(c *C) { // NOTE: we can test metrics later. 
var ( - reader2 = &mockReader{} - transformer2 = transformer.NewTransformer(parser.New()) - writer2 = &mockWriter{} - relayCfg = newRelayCfg(c, gmysql.MariaDBFlavor) - r = NewRelay(relayCfg).(*Relay) + reader2 = &mockReader{} + parser2 = parser.New() + writer2 = &mockWriter{} + relayCfg = newRelayCfg(c, gmysql.MariaDBFlavor) + r = NewRelay(relayCfg).(*Relay) eventHeader = &replication.EventHeader{ Timestamp: uint32(time.Now().Unix()), @@ -419,6 +413,9 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { fakeRotateEv, _ = event.GenRotateEvent(eventHeader, 0, []byte(binlogPos.Name), uint64(1234)) queryEv, _ = event.GenQueryEvent(eventHeader, 123, 0, 0, 0, nil, nil, []byte("CREATE DATABASE db_relay_test")) ) + + r.writer = writer2 + cfg := getDBConfigForTest() conn.InitMockDB(c) db, err := conn.DefaultDBProvider.Apply(cfg) @@ -440,7 +437,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { replication.ErrSyncClosed, replication.ErrNeedSyncAgain, } { - _, handleErr := r.handleEvents(context.Background(), reader2, transformer2, writer2) + _, handleErr := r.handleEvents(context.Background(), reader2, parser2) c.Assert(errors.Cause(handleErr), Equals, reader2.err) } @@ -450,7 +447,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { // writer return error to force handleEvents return writer2.err = errors.New("writer error for testing") // return with the annotated writer error - _, err = r.handleEvents(context.Background(), reader2, transformer2, writer2) + _, err = r.handleEvents(context.Background(), reader2, parser2) c.Assert(errors.Cause(err), Equals, writer2.err) // after handle rotate event, we save and flush the meta immediately c.Assert(r.meta.Dirty(), Equals, false) @@ -469,7 +466,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { lm := r.meta.(*LocalMeta) backupUUID := lm.currentUUID lm.currentUUID = "not exist" - _, err = r.handleEvents(context.Background(), reader2, transformer2, writer2) + _, err = r.handleEvents(context.Background(), reader2, parser2) 
c.Assert(os.IsNotExist(errors.Cause(err)), Equals, true) lm.currentUUID = backupUUID } @@ -481,7 +478,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { // writer return error writer2.err = errors.New("writer error for testing") // return with the annotated writer error - _, err = r.handleEvents(context.Background(), reader2, transformer2, writer2) + _, err = r.handleEvents(context.Background(), reader2, parser2) c.Assert(errors.Cause(err), Equals, writer2.err) c.Assert(r.meta.Dirty(), Equals, false) @@ -489,7 +486,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() writer2.err = nil - _, err = r.handleEvents(ctx, reader2, transformer2, writer2) // returned when ctx timeout + _, err = r.handleEvents(ctx, reader2, parser2) // returned when ctx timeout c.Assert(errors.Cause(err), Equals, ctx.Err()) // check written event c.Assert(writer2.latestEvent, Equals, reader2.result.Event) @@ -504,7 +501,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { // write a QueryEvent with GTID sets reader2.result.Event = queryEv - _, err = r.handleEvents(ctx2, reader2, transformer2, writer2) + _, err = r.handleEvents(ctx2, reader2, parser2) c.Assert(errors.Cause(err), Equals, ctx.Err()) // check written event c.Assert(writer2.latestEvent, Equals, reader2.result.Event) @@ -523,7 +520,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { } ctx4, cancel4 := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel4() - _, err = r.handleEvents(ctx4, reader2, transformer2, writer2) + _, err = r.handleEvents(ctx4, reader2, parser2) c.Assert(errors.Cause(err), Equals, ctx.Err()) select { case <-ctx4.Done(): @@ -536,7 +533,7 @@ func (t *testRelaySuite) TestHandleEvent(c *C) { writer2.result.Ignore = true ctx5, cancel5 := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel5() - _, err = r.handleEvents(ctx5, reader2, transformer2, writer2) + _, err = 
r.handleEvents(ctx5, reader2, parser2) c.Assert(errors.Cause(err), Equals, ctx.Err()) select { case <-ctx5.Done(): @@ -636,3 +633,311 @@ func (t *testRelaySuite) verifyMetadata(c *C, r *Relay, uuidExpected string, c.Assert(err, IsNil) c.Assert(UUIDs, DeepEquals, uuidsExpected) } + +func (t *testRelaySuite) TestPreprocessEvent(c *C) { + type Case struct { + event *replication.BinlogEvent + result preprocessResult + } + relay := &Relay{} + parser2 := parser.New() + var ( + header = &replication.EventHeader{ + Timestamp: uint32(time.Now().Unix()), + ServerID: 11, + Flags: 0x01, + } + latestPos uint32 = 456789 + gtidStr = "9f61c5f9-1eef-11e9-b6cf-0242ac140003:5" + gtidSet, _ = gtid.ParserGTID(gmysql.MySQLFlavor, gtidStr) + schema = []byte("test_schema") + cases = make([]Case, 0, 10) + ) + + // RotateEvent + nextLogName := "mysql-bin.000123" + position := uint64(4) + ev, err := event.GenRotateEvent(header, latestPos, []byte(nextLogName), position) + c.Assert(err, IsNil) + cases = append(cases, Case{ + event: ev, + result: preprocessResult{ + LogPos: uint32(position), + NextLogName: nextLogName, + }, + }) + + // fake RotateEvent with zero timestamp + header.Timestamp = 0 + ev, err = event.GenRotateEvent(header, latestPos, []byte(nextLogName), position) + c.Assert(err, IsNil) + cases = append(cases, Case{ + event: ev, + result: preprocessResult{ + LogPos: uint32(position), + NextLogName: nextLogName, + }, + }) + header.Timestamp = uint32(time.Now().Unix()) // set to non-zero + + // fake RotateEvent with zero logPos + fakeRotateHeader := *header + ev, err = event.GenRotateEvent(&fakeRotateHeader, latestPos, []byte(nextLogName), position) + c.Assert(err, IsNil) + ev.Header.LogPos = 0 // set to zero + cases = append(cases, Case{ + event: ev, + result: preprocessResult{ + LogPos: uint32(position), + NextLogName: nextLogName, + }, + }) + + // QueryEvent for DDL + query := []byte("CREATE TABLE test_tbl (c1 INT)") + ev, err = event.GenQueryEvent(header, latestPos, 0, 0, 0, nil, 
schema, query) + c.Assert(err, IsNil) + ev.Event.(*replication.QueryEvent).GSet = gtidSet.Origin() // set GTIDs manually + cases = append(cases, Case{ + event: ev, + result: preprocessResult{ + LogPos: ev.Header.LogPos, + GTIDSet: gtidSet.Origin(), + CanSaveGTID: true, + }, + }) + + // QueryEvent for non-DDL + query = []byte("BEGIN") + ev, err = event.GenQueryEvent(header, latestPos, 0, 0, 0, nil, schema, query) + c.Assert(err, IsNil) + cases = append(cases, Case{ + event: ev, + result: preprocessResult{ + LogPos: ev.Header.LogPos, + }, + }) + + // XIDEvent + xid := uint64(135) + ev, err = event.GenXIDEvent(header, latestPos, xid) + c.Assert(err, IsNil) + ev.Event.(*replication.XIDEvent).GSet = gtidSet.Origin() // set GTIDs manually + cases = append(cases, Case{ + event: ev, + result: preprocessResult{ + LogPos: ev.Header.LogPos, + GTIDSet: gtidSet.Origin(), + CanSaveGTID: true, + }, + }) + + // GenericEvent, non-HEARTBEAT_EVENT + ev = &replication.BinlogEvent{Header: header, Event: &replication.GenericEvent{}} + cases = append(cases, Case{ + event: ev, + result: preprocessResult{ + LogPos: ev.Header.LogPos, + }, + }) + + // GenericEvent, HEARTBEAT_EVENT + genericHeader := *header + ev = &replication.BinlogEvent{Header: &genericHeader, Event: &replication.GenericEvent{}} + ev.Header.EventType = replication.HEARTBEAT_EVENT + cases = append(cases, Case{ + event: ev, + result: preprocessResult{ + Ignore: true, + IgnoreReason: ignoreReasonHeartbeat, + LogPos: ev.Header.LogPos, + }, + }) + + // other event type without LOG_EVENT_ARTIFICIAL_F + ev, err = event.GenCommonGTIDEvent(gmysql.MySQLFlavor, header.ServerID, latestPos, gtidSet) + c.Assert(err, IsNil) + cases = append(cases, Case{ + event: ev, + result: preprocessResult{ + LogPos: ev.Header.LogPos, + }, + }) + + // other event type with LOG_EVENT_ARTIFICIAL_F + ev, err = event.GenCommonGTIDEvent(gmysql.MySQLFlavor, header.ServerID, latestPos, gtidSet) + c.Assert(err, IsNil) + ev.Header.Flags |= 
replication.LOG_EVENT_ARTIFICIAL_F + cases = append(cases, Case{ + event: ev, + result: preprocessResult{ + Ignore: true, + IgnoreReason: ignoreReasonArtificialFlag, + LogPos: ev.Header.LogPos, + }, + }) + + for _, cs := range cases { + c.Assert(relay.preprocessEvent(cs.event, parser2), DeepEquals, cs.result) + } +} + +func (t *testRelaySuite) TestRecoverMySQL(c *C) { + var ( + relayDir = c.MkDir() + filename = "test-mysql-bin.000001" + parser2 = parser.New() + flavor = gmysql.MySQLFlavor + previousGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456,53bfca22-690d-11e7-8a62-18ded7a37b78:1-495,686e1ab6-c47e-11e7-a42c-6c92bf46f384:234-567" + latestGTIDStr1 = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14" + latestGTIDStr2 = "53bfca22-690d-11e7-8a62-18ded7a37b78:495" + ) + + r := NewRelay(&Config{Flavor: flavor}).(*Relay) + + // different SIDs in GTID set + previousGTIDSet, err := gtid.ParserGTID(flavor, previousGTIDSetStr) + c.Assert(err, IsNil) + latestGTID1, err := gtid.ParserGTID(flavor, latestGTIDStr1) + c.Assert(err, IsNil) + latestGTID2, err := gtid.ParserGTID(flavor, latestGTIDStr2) + c.Assert(err, IsNil) + + // generate binlog events + g, _, baseData := genBinlogEventsWithGTIDs(c, flavor, previousGTIDSet, latestGTID1, latestGTID2) + + // expected latest pos/GTID set + expectedPos := gmysql.Position{Name: filename, Pos: uint32(len(baseData))} + // 3 DDL + 10 DML + expectedGTIDsStr := "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456,686e1ab6-c47e-11e7-a42c-6c92bf46f384:234-567" + expectedGTIDs, err := gtid.ParserGTID(flavor, expectedGTIDsStr) + c.Assert(err, IsNil) + + // write the events to a file + fullName := filepath.Join(relayDir, filename) + err = os.WriteFile(fullName, baseData, 0o644) + c.Assert(err, IsNil) + + // try recover, but in fact do nothing + result, err := r.doRecovering(context.Background(), relayDir, filename, parser2) 
+ c.Assert(err, IsNil) + c.Assert(result.Truncated, IsFalse) + c.Assert(result.LatestPos, DeepEquals, expectedPos) + c.Assert(result.LatestGTIDs, DeepEquals, expectedGTIDs) + + // check file size, whether no recovering operation applied + fs, err := os.Stat(fullName) + c.Assert(err, IsNil) + c.Assert(fs.Size(), Equals, int64(len(baseData))) + + // generate another transaction, DDL + extraEvents, extraData, err := g.GenDDLEvents("db2", "CREATE DATABASE db2") + c.Assert(err, IsNil) + c.Assert(extraEvents, HasLen, 2) // [GTID, Query] + + // write an incomplete event to the file + corruptData := extraEvents[0].RawData[:len(extraEvents[0].RawData)-2] + f, err := os.OpenFile(fullName, os.O_WRONLY|os.O_APPEND, 0o644) + c.Assert(err, IsNil) + _, err = f.Write(corruptData) + c.Assert(err, IsNil) + c.Assert(f.Close(), IsNil) + + // check file size, increased + fs, err = os.Stat(fullName) + c.Assert(err, IsNil) + c.Assert(fs.Size(), Equals, int64(len(baseData)+len(corruptData))) + + // try recover, truncate the incomplete event + result, err = r.doRecovering(context.Background(), relayDir, filename, parser2) + c.Assert(err, IsNil) + c.Assert(result.Truncated, IsTrue) + c.Assert(result.LatestPos, DeepEquals, expectedPos) + c.Assert(result.LatestGTIDs, DeepEquals, expectedGTIDs) + + // check file size, truncated + fs, err = os.Stat(fullName) + c.Assert(err, IsNil) + c.Assert(fs.Size(), Equals, int64(len(baseData))) + + // write an incomplete transaction + f, err = os.OpenFile(fullName, os.O_WRONLY|os.O_APPEND, 0o644) + c.Assert(err, IsNil) + var extraLen int64 + for i := 0; i < len(extraEvents)-1; i++ { + _, err = f.Write(extraEvents[i].RawData) + c.Assert(err, IsNil) + extraLen += int64(len(extraEvents[i].RawData)) + } + c.Assert(f.Close(), IsNil) + + // check file size, increased + fs, err = os.Stat(fullName) + c.Assert(err, IsNil) + c.Assert(fs.Size(), Equals, int64(len(baseData))+extraLen) + + // try recover, truncate the incomplete transaction + result, err = 
r.doRecovering(context.Background(), relayDir, filename, parser2) + c.Assert(err, IsNil) + c.Assert(result.Truncated, IsTrue) + c.Assert(result.LatestPos, DeepEquals, expectedPos) + c.Assert(result.LatestGTIDs, DeepEquals, expectedGTIDs) + + // check file size, truncated + fs, err = os.Stat(fullName) + c.Assert(err, IsNil) + c.Assert(fs.Size(), Equals, int64(len(baseData))) + + // write a completed transaction + f, err = os.OpenFile(fullName, os.O_WRONLY|os.O_APPEND, 0o644) + c.Assert(err, IsNil) + for i := 0; i < len(extraEvents); i++ { + _, err = f.Write(extraEvents[i].RawData) + c.Assert(err, IsNil) + } + c.Assert(f.Close(), IsNil) + + // check file size, increased + fs, err = os.Stat(fullName) + c.Assert(err, IsNil) + c.Assert(fs.Size(), Equals, int64(len(baseData)+len(extraData))) + + // try recover, no operation applied + expectedPos.Pos += uint32(len(extraData)) + // 4 DDL + 10 DML + expectedGTIDsStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-506,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456,686e1ab6-c47e-11e7-a42c-6c92bf46f384:234-567" + expectedGTIDs, err = gtid.ParserGTID(flavor, expectedGTIDsStr) + c.Assert(err, IsNil) + result, err = r.doRecovering(context.Background(), relayDir, filename, parser2) + c.Assert(err, IsNil) + c.Assert(result.Truncated, IsFalse) + c.Assert(result.LatestPos, DeepEquals, expectedPos) + c.Assert(result.LatestGTIDs, DeepEquals, expectedGTIDs) + + // compare file data + var allData bytes.Buffer + allData.Write(baseData) + allData.Write(extraData) + fileData, err := os.ReadFile(fullName) + c.Assert(err, IsNil) + c.Assert(fileData, DeepEquals, allData.Bytes()) +} + +func (t *testRelaySuite) TestRecoverMySQLNone(c *C) { + relayDir := c.MkDir() + parser2 := parser.New() + + r := NewRelay(&Config{Flavor: gmysql.MySQLFlavor}).(*Relay) + + // no file specified to recover + result, err := r.doRecovering(context.Background(), relayDir, "", parser2) + c.Assert(err, IsNil) + 
c.Assert(result.Truncated, IsFalse) + + filename := "mysql-bin.000001" + + // file not exist, no need to recover + result, err = r.doRecovering(context.Background(), relayDir, filename, parser2) + c.Assert(err, IsNil) + c.Assert(result.Truncated, IsFalse) +} diff --git a/dm/relay/relay_writer.go b/dm/relay/relay_writer.go index 0cb0b52ac06..5d66063ae3a 100644 --- a/dm/relay/relay_writer.go +++ b/dm/relay/relay_writer.go @@ -14,23 +14,16 @@ package relay import ( - "context" - "os" "path/filepath" - "sync" "time" gmysql "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" - "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/parser" "go.uber.org/atomic" "go.uber.org/zap" "github.com/pingcap/ticdc/dm/pkg/binlog" - "github.com/pingcap/ticdc/dm/pkg/binlog/common" "github.com/pingcap/ticdc/dm/pkg/binlog/event" - "github.com/pingcap/ticdc/dm/pkg/gtid" "github.com/pingcap/ticdc/dm/pkg/log" "github.com/pingcap/ticdc/dm/pkg/terror" "github.com/pingcap/ticdc/dm/pkg/utils" @@ -47,16 +40,6 @@ type WResult struct { IgnoreReason string // why the writer ignore the event } -// RecoverResult represents a result for a binlog recover operation. -type RecoverResult struct { - // if truncate trailing incomplete events during recovering in relay log - Truncated bool - // the latest binlog position after recover operation has done. - LatestPos gmysql.Position - // the latest binlog GTID set after recover operation has done. - LatestGTIDs gtid.Set -} - // Writer writes binlog events into disk or any other memory structure. // The writer should support: // 1. write binlog events and report the operation result @@ -65,115 +48,49 @@ type RecoverResult struct { // 4. rotate binlog(relay) file if needed // 5. rollback/discard unfinished binlog entries(events or transactions) type Writer interface { - // Start prepares the writer for writing binlog events. 
- Start() error - + // Init inits the writer, should be called before any other method + Init(relayDir, filename string) // Close closes the writer and release the resource. Close() error - // Recover tries to recover the binlog file or any other memory structure associate with this writer. - // It is often used to recover a binlog file with some corrupt/incomplete binlog events/transactions at the end of the file. - // It is not safe for concurrent use by multiple goroutines. - // It should be called before writing to the file. - Recover(ctx context.Context) (RecoverResult, error) - // WriteEvent writes an binlog event's data into disk or any other places. // It is not safe for concurrent use by multiple goroutines. WriteEvent(ev *replication.BinlogEvent) (WResult, error) - - // Flush flushes the buffered data to a stable storage or sends through the network. - // It is not safe for concurrent use by multiple goroutines. - Flush() error -} - -// FileConfig is the configuration used by the FileWriter. -type FileConfig struct { - RelayDir string // directory to store relay log files. - Filename string // the startup relay log filename, if not set then a fake RotateEvent must be the first event. } // FileWriter implements Writer interface. type FileWriter struct { - cfg *FileConfig - - mu sync.Mutex - stage common.Stage - // underlying binlog writer, // it will be created/started until needed. out *BinlogWriter - // the parser often used to verify events's statement through parsing them. - parser *parser.Parser - + relayDir string // this dir contains the UUID filename atomic.String // current binlog filename logger log.Logger } // NewFileWriter creates a FileWriter instances. 
-func NewFileWriter(logger log.Logger, cfg *FileConfig, parser2 *parser.Parser) Writer { +func NewFileWriter(logger log.Logger) Writer { w := &FileWriter{ - cfg: cfg, - parser: parser2, logger: logger.WithFields(zap.String("sub component", "relay writer")), } - w.filename.Store(cfg.Filename) // set the startup filename + w.out = NewBinlogWriter(w.logger) return w } -// Start implements Writer.Start. -func (w *FileWriter) Start() error { - w.mu.Lock() - defer w.mu.Unlock() - - if w.stage != common.StageNew { - return terror.ErrRelayWriterNotStateNew.Generate(w.stage, common.StageNew) - } - w.stage = common.StagePrepared - - return nil +func (w *FileWriter) Init(relayDir, filename string) { + w.relayDir = relayDir + w.filename.Store(filename) } // Close implements Writer.Close. func (w *FileWriter) Close() error { - w.mu.Lock() - defer w.mu.Unlock() - - if w.stage != common.StagePrepared { - return terror.ErrRelayWriterStateCannotClose.Generate(w.stage, common.StagePrepared) - } - - var err error - if w.out != nil { - err = w.out.Close() - } - - w.stage = common.StageClosed - return err -} - -// Recover implements Writer.Recover. -func (w *FileWriter) Recover(ctx context.Context) (RecoverResult, error) { - w.mu.Lock() - defer w.mu.Unlock() - - if w.stage != common.StagePrepared { - return RecoverResult{}, terror.ErrRelayWriterNeedStart.Generate(w.stage, common.StagePrepared) - } - - return w.doRecovering(ctx) + return w.out.Close() } // WriteEvent implements Writer.WriteEvent. 
func (w *FileWriter) WriteEvent(ev *replication.BinlogEvent) (WResult, error) { - w.mu.Lock() - defer w.mu.Unlock() - - if w.stage != common.StagePrepared { - return WResult{}, terror.ErrRelayWriterNeedStart.Generate(w.stage, common.StagePrepared) - } - switch ev.Event.(type) { case *replication.FormatDescriptionEvent: return w.handleFormatDescriptionEvent(ev) @@ -184,29 +101,10 @@ func (w *FileWriter) WriteEvent(ev *replication.BinlogEvent) (WResult, error) { } } -// Flush implements Writer.Flush. -func (w *FileWriter) Flush() error { - w.mu.Lock() - defer w.mu.Unlock() - - if w.stage != common.StagePrepared { - return terror.ErrRelayWriterNeedStart.Generate(w.stage, common.StagePrepared) - } - - if w.out != nil { - return w.out.Flush() - } - return terror.ErrRelayWriterNotOpened.Generate() -} - // offset returns the current offset of the binlog file. // it is only used for testing now. func (w *FileWriter) offset() int64 { - if w.out == nil { - return 0 - } - status := w.out.Status().(*BinlogWriterStatus) - return status.Offset + return w.out.Offset() } // handle FormatDescriptionEvent: @@ -216,12 +114,10 @@ func (w *FileWriter) offset() int64 { // 4. 
write the FormatDescriptionEvent if not exists one func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) (WResult, error) { // close the previous binlog file - if w.out != nil { - w.logger.Info("closing previous underlying binlog writer", zap.Reflect("status", w.out.Status())) - err := w.out.Close() - if err != nil { - return WResult{}, terror.Annotate(err, "close previous underlying binlog writer") - } + w.logger.Info("closing previous underlying binlog writer", zap.Reflect("status", w.out.Status())) + err := w.out.Close() + if err != nil { + return WResult{}, terror.Annotate(err, "close previous underlying binlog writer") } // verify filename @@ -230,16 +126,11 @@ func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) ( } // open/create a new binlog file - filename := filepath.Join(w.cfg.RelayDir, w.filename.Load()) - outCfg := &BinlogWriterConfig{ - Filename: filename, - } - out := NewBinlogWriter(w.logger, outCfg) - err := out.Start() + filename := filepath.Join(w.relayDir, w.filename.Load()) + err = w.out.Open(filename) if err != nil { return WResult{}, terror.Annotatef(err, "start underlying binlog writer for %s", filename) } - w.out = out w.logger.Info("open underlying binlog writer", zap.Reflect("status", w.out.Status())) // write the binlog file header if not exists @@ -310,9 +201,6 @@ func (w *FileWriter) handleRotateEvent(ev *replication.BinlogEvent) (result WRes Ignore: true, IgnoreReason: ignoreReasonFakeRotate, }, nil - } else if w.out == nil { - // if not open a binlog file yet, then non-fake RotateEvent can't be handled - return result, terror.ErrRelayWriterRotateEvWithNoWriter.Generate(ev.Header) } result, err = w.handlePotentialHoleOrDuplicate(ev) @@ -324,7 +212,7 @@ func (w *FileWriter) handleRotateEvent(ev *replication.BinlogEvent) (result WRes err = w.out.Write(ev.RawData) if err != nil { - return result, terror.Annotatef(err, "write RotateEvent %+v for %s", ev.Header, 
filepath.Join(w.cfg.RelayDir, currFile)) + return result, terror.Annotatef(err, "write RotateEvent %+v for %s", ev.Header, filepath.Join(w.relayDir, currFile)) } return WResult{ @@ -385,11 +273,7 @@ func (w *FileWriter) handlePotentialHoleOrDuplicate(ev *replication.BinlogEvent) func (w *FileWriter) handleFileHoleExist(ev *replication.BinlogEvent) (bool, error) { // 1. detect whether a hole exists evStartPos := int64(ev.Header.LogPos - ev.Header.EventSize) - outFs, ok := w.out.Status().(*BinlogWriterStatus) - if !ok { - return false, terror.ErrRelayWriterStatusNotValid.Generate(w.out.Status()) - } - fileOffset := outFs.Offset + fileOffset := w.out.Offset() holeSize := evStartPos - fileOffset if holeSize <= 0 { // no hole exists, but duplicate events may exists, this should be handled in another place. @@ -418,7 +302,7 @@ func (w *FileWriter) handleFileHoleExist(ev *replication.BinlogEvent) (bool, err // handleDuplicateEventsExist tries to handle a potential duplicate event in the binlog file. func (w *FileWriter) handleDuplicateEventsExist(ev *replication.BinlogEvent) (WResult, error) { - filename := filepath.Join(w.cfg.RelayDir, w.filename.Load()) + filename := filepath.Join(w.relayDir, w.filename.Load()) duplicate, err := checkIsDuplicateEvent(filename, ev) if err != nil { return WResult{}, terror.Annotatef(err, "check event %+v whether duplicate in %s", ev.Header, filename) @@ -436,63 +320,3 @@ func (w *FileWriter) handleDuplicateEventsExist(ev *replication.BinlogEvent) (WR IgnoreReason: reason, }, nil } - -// doRecovering tries to recover the current binlog file. -// 1. read events from the file -// 2. -// a. update the position with the event's position if the transaction finished -// b. update the GTID set with the event's GTID if the transaction finished -// 3. truncate any incomplete events/transactions -// now, we think a transaction finished if we received a XIDEvent or DDL in QueryEvent -// NOTE: handle cases when file size > 4GB. 
-func (w *FileWriter) doRecovering(ctx context.Context) (RecoverResult, error) { - filename := filepath.Join(w.cfg.RelayDir, w.filename.Load()) - fs, err := os.Stat(filename) - if (err != nil && os.IsNotExist(err)) || (err == nil && len(w.filename.Load()) == 0) { - return RecoverResult{}, nil // no file need to recover - } else if err != nil { - return RecoverResult{}, terror.ErrRelayWriterGetFileStat.Delegate(err, filename) - } - - // get latest pos/GTID set for all completed transactions from the file - latestPos, latestGTIDs, err := getTxnPosGTIDs(ctx, filename, w.parser) - if err != nil { - return RecoverResult{}, terror.Annotatef(err, "get latest pos/GTID set from %s", filename) - } - - // mock file truncated by recover - failpoint.Inject("MockRecoverRelayWriter", func() { - w.logger.Info("mock recover relay writer") - failpoint.Goto("bypass") - }) - - // in most cases, we think the file is fine, so compare the size is simpler. - if fs.Size() == latestPos { - return RecoverResult{ - Truncated: false, - LatestPos: gmysql.Position{Name: w.filename.Load(), Pos: uint32(latestPos)}, - LatestGTIDs: latestGTIDs, - }, nil - } else if fs.Size() < latestPos { - return RecoverResult{}, terror.ErrRelayWriterLatestPosGTFileSize.Generate(latestPos, fs.Size()) - } - - failpoint.Label("bypass") - - // truncate the file - f, err := os.OpenFile(filename, os.O_WRONLY, 0o644) - if err != nil { - return RecoverResult{}, terror.Annotatef(terror.ErrRelayWriterFileOperate.New(err.Error()), "open %s", filename) - } - defer f.Close() - err = f.Truncate(latestPos) - if err != nil { - return RecoverResult{}, terror.Annotatef(terror.ErrRelayWriterFileOperate.New(err.Error()), "truncate %s to %d", filename, latestPos) - } - - return RecoverResult{ - Truncated: true, - LatestPos: gmysql.Position{Name: w.filename.Load(), Pos: uint32(latestPos)}, - LatestGTIDs: latestGTIDs, - }, nil -} diff --git a/dm/relay/relay_writer_test.go b/dm/relay/relay_writer_test.go index 3779afd420f..c6afcdaaa59 
100644 --- a/dm/relay/relay_writer_test.go +++ b/dm/relay/relay_writer_test.go @@ -15,19 +15,14 @@ package relay import ( "bytes" - "context" - "fmt" "os" "path/filepath" - "testing" "time" gmysql "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" "github.com/pingcap/check" - "github.com/pingcap/tidb/parser" - "github.com/pingcap/ticdc/dm/pkg/binlog/common" "github.com/pingcap/ticdc/dm/pkg/binlog/event" "github.com/pingcap/ticdc/dm/pkg/gtid" "github.com/pingcap/ticdc/dm/pkg/log" @@ -35,25 +30,13 @@ import ( var _ = check.Suite(&testFileWriterSuite{}) -func TestFileWriterSuite(t *testing.T) { - check.TestingT(t) -} - -type testFileWriterSuite struct { - parser *parser.Parser -} - -func (t *testFileWriterSuite) SetUpSuite(c *check.C) { - t.parser = parser.New() -} +type testFileWriterSuite struct{} func (t *testFileWriterSuite) TestInterfaceMethods(c *check.C) { var ( - cfg = &FileConfig{ - RelayDir: c.MkDir(), - Filename: "test-mysql-bin.000001", - } - header = &replication.EventHeader{ + relayDir = c.MkDir() + filename = "test-mysql-bin.000001" + header = &replication.EventHeader{ Timestamp: uint32(time.Now().Unix()), ServerID: 11, Flags: 0x01, @@ -62,47 +45,26 @@ func (t *testFileWriterSuite) TestInterfaceMethods(c *check.C) { ev, _ = event.GenFormatDescriptionEvent(header, latestPos) ) - w := NewFileWriter(log.L(), cfg, t.parser) + w := NewFileWriter(log.L()) c.Assert(w, check.NotNil) // not prepared - _, err := w.Recover(context.Background()) - c.Assert(err, check.ErrorMatches, fmt.Sprintf(".*%s.*", common.StageNew)) - _, err = w.WriteEvent(ev) - c.Assert(err, check.ErrorMatches, fmt.Sprintf(".*%s.*", common.StageNew)) - err = w.Flush() - c.Assert(err, check.ErrorMatches, fmt.Sprintf(".*%s.*", common.StageNew)) - - // start writer - err = w.Start() - c.Assert(err, check.IsNil) - c.Assert(w.Start(), check.NotNil) // re-start is invalid - - // flush without opened underlying writer - err = w.Flush() - c.Assert(err, 
check.ErrorMatches, ".*no underlying writer opened.*") + _, err := w.WriteEvent(ev) + c.Assert(err, check.ErrorMatches, ".*not valid.*") - // recover - rres, err := w.Recover(context.Background()) - c.Assert(err, check.IsNil) - c.Assert(rres.Truncated, check.IsFalse) + w.Init(relayDir, filename) // write event res, err := w.WriteEvent(ev) c.Assert(err, check.IsNil) c.Assert(res.Ignore, check.IsFalse) - // flush buffered data - c.Assert(w.Flush(), check.IsNil) - // close the writer c.Assert(w.Close(), check.IsNil) - c.Assert(w.Close(), check.ErrorMatches, fmt.Sprintf(".*%s.*", common.StageClosed)) // re-close is invalid } func (t *testFileWriterSuite) TestRelayDir(c *check.C) { var ( - cfg = &FileConfig{} header = &replication.EventHeader{ Timestamp: uint32(time.Now().Unix()), ServerID: 11, @@ -113,42 +75,38 @@ func (t *testFileWriterSuite) TestRelayDir(c *check.C) { ev, err := event.GenFormatDescriptionEvent(header, latestPos) c.Assert(err, check.IsNil) - // no dir specified - w1 := NewFileWriter(log.L(), cfg, t.parser) + // not inited + w1 := NewFileWriter(log.L()) defer w1.Close() - c.Assert(w1.Start(), check.IsNil) _, err = w1.WriteEvent(ev) c.Assert(err, check.ErrorMatches, ".*not valid.*") // invalid dir - cfg.RelayDir = "invalid\x00path" - w2 := NewFileWriter(log.L(), cfg, t.parser) + w2 := NewFileWriter(log.L()) defer w2.Close() - c.Assert(w2.Start(), check.IsNil) + w2.Init("invalid\x00path", "bin.000001") _, err = w2.WriteEvent(ev) - c.Assert(err, check.ErrorMatches, ".*not valid.*") + c.Assert(err, check.ErrorMatches, ".*invalid argument.*") // valid directory, but no filename specified - cfg.RelayDir = c.MkDir() - w3 := NewFileWriter(log.L(), cfg, t.parser) + tmpRelayDir := c.MkDir() + w3 := NewFileWriter(log.L()) defer w3.Close() - c.Assert(w3.Start(), check.IsNil) + w3.Init(tmpRelayDir, "") _, err = w3.WriteEvent(ev) c.Assert(err, check.ErrorMatches, ".*not valid.*") // valid directory, but invalid filename - cfg.Filename = "test-mysql-bin.666abc" - w4 
:= NewFileWriter(log.L(), cfg, t.parser) + w4 := NewFileWriter(log.L()) defer w4.Close() - c.Assert(w4.Start(), check.IsNil) + w4.Init(tmpRelayDir, "test-mysql-bin.666abc") _, err = w4.WriteEvent(ev) c.Assert(err, check.ErrorMatches, ".*not valid.*") // valid directory, valid filename - cfg.Filename = "test-mysql-bin.000001" - w5 := NewFileWriter(log.L(), cfg, t.parser) + w5 := NewFileWriter(log.L()) defer w5.Close() - c.Assert(w5.Start(), check.IsNil) + w5.Init(tmpRelayDir, "test-mysql-bin.000001") result, err := w5.WriteEvent(ev) c.Assert(err, check.IsNil) c.Assert(result.Ignore, check.IsFalse) @@ -156,11 +114,9 @@ func (t *testFileWriterSuite) TestRelayDir(c *check.C) { func (t *testFileWriterSuite) TestFormatDescriptionEvent(c *check.C) { var ( - cfg = &FileConfig{ - RelayDir: c.MkDir(), - Filename: "test-mysql-bin.000001", - } - header = &replication.EventHeader{ + relayDir = c.MkDir() + filename = "test-mysql-bin.000001" + header = &replication.EventHeader{ Timestamp: uint32(time.Now().Unix()), ServerID: 11, Flags: 0x01, @@ -171,14 +127,14 @@ func (t *testFileWriterSuite) TestFormatDescriptionEvent(c *check.C) { c.Assert(err, check.IsNil) // write FormatDescriptionEvent to empty file - w := NewFileWriter(log.L(), cfg, t.parser) + w := NewFileWriter(log.L()) defer w.Close() - c.Assert(w.Start(), check.IsNil) + w.Init(relayDir, filename) result, err := w.WriteEvent(formatDescEv) c.Assert(err, check.IsNil) c.Assert(result.Ignore, check.IsFalse) fileSize := int64(len(replication.BinLogFileHeader) + len(formatDescEv.RawData)) - t.verifyFilenameOffset(c, w, cfg.Filename, fileSize) + t.verifyFilenameOffset(c, w, filename, fileSize) latestPos = formatDescEv.Header.LogPos // write FormatDescriptionEvent again, ignore @@ -186,7 +142,7 @@ func (t *testFileWriterSuite) TestFormatDescriptionEvent(c *check.C) { c.Assert(err, check.IsNil) c.Assert(result.Ignore, check.IsTrue) c.Assert(result.IgnoreReason, check.Equals, ignoreReasonAlreadyExists) - t.verifyFilenameOffset(c, 
w, cfg.Filename, fileSize) + t.verifyFilenameOffset(c, w, filename, fileSize) // write another event queryEv, err := event.GenQueryEvent(header, latestPos, 0, 0, 0, nil, []byte("schema"), []byte("BEGIN")) @@ -195,14 +151,14 @@ func (t *testFileWriterSuite) TestFormatDescriptionEvent(c *check.C) { c.Assert(err, check.IsNil) c.Assert(result.Ignore, check.IsFalse) fileSize += int64(len(queryEv.RawData)) - t.verifyFilenameOffset(c, w, cfg.Filename, fileSize) + t.verifyFilenameOffset(c, w, filename, fileSize) // write FormatDescriptionEvent again, ignore result, err = w.WriteEvent(formatDescEv) c.Assert(err, check.IsNil) c.Assert(result.Ignore, check.IsTrue) c.Assert(result.IgnoreReason, check.Equals, ignoreReasonAlreadyExists) - t.verifyFilenameOffset(c, w, cfg.Filename, fileSize) + t.verifyFilenameOffset(c, w, filename, fileSize) // check events by reading them back events := make([]*replication.BinlogEvent, 0, 2) @@ -215,8 +171,8 @@ func (t *testFileWriterSuite) TestFormatDescriptionEvent(c *check.C) { events = append(events, e) return nil } - filename := filepath.Join(cfg.RelayDir, cfg.Filename) - err = replication.NewBinlogParser().ParseFile(filename, 0, onEventFunc) + fullName := filepath.Join(relayDir, filename) + err = replication.NewBinlogParser().ParseFile(fullName, 0, onEventFunc) c.Assert(err, check.IsNil) c.Assert(events, check.HasLen, 2) c.Assert(events[0], check.DeepEquals, formatDescEv) @@ -232,10 +188,8 @@ func (t *testFileWriterSuite) verifyFilenameOffset(c *check.C, w Writer, filenam func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check.C) { var ( - cfg = &FileConfig{ - RelayDir: c.MkDir(), - Filename: "test-mysql-bin.000001", - } + relayDir = c.MkDir() + filename = "test-mysql-bin.000001" nextFilename = "test-mysql-bin.000002" nextFilePos uint64 = 4 header = &replication.EventHeader{ @@ -270,17 +224,17 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check c.Assert(holeRotateEv, check.NotNil) // 
1: non-fake RotateEvent before FormatDescriptionEvent, invalid - w1 := NewFileWriter(log.L(), cfg, t.parser) + w1 := NewFileWriter(log.L()) defer w1.Close() - c.Assert(w1.Start(), check.IsNil) + w1.Init(relayDir, filename) _, err = w1.WriteEvent(rotateEv) - c.Assert(err, check.ErrorMatches, ".*no binlog file opened.*") + c.Assert(err, check.ErrorMatches, ".*file not opened.*") // 2. fake RotateEvent before FormatDescriptionEvent - cfg.RelayDir = c.MkDir() // use a new relay directory - w2 := NewFileWriter(log.L(), cfg, t.parser) + relayDir = c.MkDir() // use a new relay directory + w2 := NewFileWriter(log.L()) defer w2.Close() - c.Assert(w2.Start(), check.IsNil) + w2.Init(relayDir, filename) result, err := w2.WriteEvent(fakeRotateEv) c.Assert(err, check.IsNil) c.Assert(result.Ignore, check.IsTrue) // ignore fake RotateEvent @@ -293,9 +247,9 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check fileSize := int64(len(replication.BinLogFileHeader) + len(formatDescEv.RawData)) t.verifyFilenameOffset(c, w2, nextFilename, fileSize) - // cfg.Filename should be empty, next file should contain only one FormatDescriptionEvent - filename1 := filepath.Join(cfg.RelayDir, cfg.Filename) - filename2 := filepath.Join(cfg.RelayDir, nextFilename) + // filename should be empty, next file should contain only one FormatDescriptionEvent + filename1 := filepath.Join(relayDir, filename) + filename2 := filepath.Join(relayDir, nextFilename) _, err = os.Stat(filename1) c.Assert(os.IsNotExist(err), check.IsTrue) data, err := os.ReadFile(filename2) @@ -305,10 +259,10 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check c.Assert(data[fileHeaderLen:], check.DeepEquals, formatDescEv.RawData) // 3. 
FormatDescriptionEvent before fake RotateEvent - cfg.RelayDir = c.MkDir() // use a new relay directory - w3 := NewFileWriter(log.L(), cfg, t.parser) + relayDir = c.MkDir() // use a new relay directory + w3 := NewFileWriter(log.L()) defer w3.Close() - c.Assert(w3.Start(), check.IsNil) + w3.Init(relayDir, filename) result, err = w3.WriteEvent(formatDescEv) c.Assert(err, check.IsNil) c.Assert(result, check.NotNil) @@ -322,9 +276,9 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check t.verifyFilenameOffset(c, w3, nextFilename, fileSize) - // cfg.Filename should contain only one FormatDescriptionEvent, next file should be empty - filename1 = filepath.Join(cfg.RelayDir, cfg.Filename) - filename2 = filepath.Join(cfg.RelayDir, nextFilename) + // filename should contain only one FormatDescriptionEvent, next file should be empty + filename1 = filepath.Join(relayDir, filename) + filename2 = filepath.Join(relayDir, nextFilename) _, err = os.Stat(filename2) c.Assert(os.IsNotExist(err), check.IsTrue) data, err = os.ReadFile(filename1) @@ -333,10 +287,10 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check c.Assert(data[fileHeaderLen:], check.DeepEquals, formatDescEv.RawData) // 4. 
FormatDescriptionEvent before non-fake RotateEvent - cfg.RelayDir = c.MkDir() // use a new relay directory - w4 := NewFileWriter(log.L(), cfg, t.parser) + relayDir = c.MkDir() // use a new relay directory + w4 := NewFileWriter(log.L()) defer w4.Close() - c.Assert(w4.Start(), check.IsNil) + w4.Init(relayDir, filename) result, err = w4.WriteEvent(formatDescEv) c.Assert(err, check.IsNil) c.Assert(result, check.NotNil) @@ -357,9 +311,9 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check _, err = w4.WriteEvent(rotateEv) c.Assert(err, check.ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") - // cfg.Filename should contain both one FormatDescriptionEvent and one RotateEvent, next file should be empty - filename1 = filepath.Join(cfg.RelayDir, cfg.Filename) - filename2 = filepath.Join(cfg.RelayDir, nextFilename) + // filename should contain both one FormatDescriptionEvent and one RotateEvent, next file should be empty + filename1 = filepath.Join(relayDir, filename) + filename2 = filepath.Join(relayDir, nextFilename) _, err = os.Stat(filename2) c.Assert(os.IsNotExist(err), check.IsTrue) data, err = os.ReadFile(filename1) @@ -378,10 +332,8 @@ func (t *testFileWriterSuite) TestWriteMultiEvents(c *check.C) { latestGTIDStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14" latestXID uint64 = 10 - cfg = &FileConfig{ - RelayDir: c.MkDir(), - Filename: "test-mysql-bin.000001", - } + relayDir = c.MkDir() + filename = "test-mysql-bin.000001" ) previousGTIDSet, err := gtid.ParserGTID(flavor, previousGTIDSetStr) c.Assert(err, check.IsNil) @@ -424,30 +376,28 @@ func (t *testFileWriterSuite) TestWriteMultiEvents(c *check.C) { allData.Write(data) // write the events to the file - w := NewFileWriter(log.L(), cfg, t.parser) - c.Assert(w.Start(), check.IsNil) + w := NewFileWriter(log.L()) + w.Init(relayDir, filename) for _, ev := range allEvents { result, err2 := w.WriteEvent(ev) c.Assert(err2, check.IsNil) 
c.Assert(result.Ignore, check.IsFalse) // no event is ignored } - t.verifyFilenameOffset(c, w, cfg.Filename, int64(allData.Len())) + t.verifyFilenameOffset(c, w, filename, int64(allData.Len())) // read the data back from the file - filename := filepath.Join(cfg.RelayDir, cfg.Filename) - obtainData, err := os.ReadFile(filename) + fullName := filepath.Join(relayDir, filename) + obtainData, err := os.ReadFile(fullName) c.Assert(err, check.IsNil) c.Assert(obtainData, check.DeepEquals, allData.Bytes()) } func (t *testFileWriterSuite) TestHandleFileHoleExist(c *check.C) { var ( - cfg = &FileConfig{ - RelayDir: c.MkDir(), - Filename: "test-mysql-bin.000001", - } - header = &replication.EventHeader{ + relayDir = c.MkDir() + filename = "test-mysql-bin.000001" + header = &replication.EventHeader{ Timestamp: uint32(time.Now().Unix()), ServerID: 11, } @@ -457,9 +407,9 @@ func (t *testFileWriterSuite) TestHandleFileHoleExist(c *check.C) { c.Assert(err, check.IsNil) c.Assert(formatDescEv, check.NotNil) - w := NewFileWriter(log.L(), cfg, t.parser) + w := NewFileWriter(log.L()) defer w.Close() - c.Assert(w.Start(), check.IsNil) + w.Init(relayDir, filename) // write the FormatDescriptionEvent, no hole exists result, err := w.WriteEvent(formatDescEv) @@ -481,7 +431,7 @@ func (t *testFileWriterSuite) TestHandleFileHoleExist(c *check.C) { c.Assert(err, check.IsNil) c.Assert(result.Ignore, check.IsFalse) fileSize := int64(queryEv.Header.LogPos) - t.verifyFilenameOffset(c, w, cfg.Filename, fileSize) + t.verifyFilenameOffset(c, w, filename, fileSize) // read events back from the file to check the dummy event events := make([]*replication.BinlogEvent, 0, 3) @@ -494,8 +444,8 @@ func (t *testFileWriterSuite) TestHandleFileHoleExist(c *check.C) { events = append(events, e) return nil } - filename := filepath.Join(cfg.RelayDir, cfg.Filename) - err = replication.NewBinlogParser().ParseFile(filename, 0, onEventFunc) + fullName := filepath.Join(relayDir, filename) + err = 
replication.NewBinlogParser().ParseFile(fullName, 0, onEventFunc) c.Assert(err, check.IsNil) c.Assert(events, check.HasLen, 3) c.Assert(events[0], check.DeepEquals, formatDescEv) @@ -511,19 +461,17 @@ func (t *testFileWriterSuite) TestHandleDuplicateEventsExist(c *check.C) { // NOTE: not duplicate event already tested in other cases var ( - cfg = &FileConfig{ - RelayDir: c.MkDir(), - Filename: "test-mysql-bin.000001", - } - header = &replication.EventHeader{ + relayDir = c.MkDir() + filename = "test-mysql-bin.000001" + header = &replication.EventHeader{ Timestamp: uint32(time.Now().Unix()), ServerID: 11, } latestPos uint32 = 4 ) - w := NewFileWriter(log.L(), cfg, t.parser) + w := NewFileWriter(log.L()) defer w.Close() - c.Assert(w.Start(), check.IsNil) + w.Init(relayDir, filename) // write a FormatDescriptionEvent, not duplicate formatDescEv, err := event.GenFormatDescriptionEvent(header, latestPos) @@ -553,173 +501,3 @@ func (t *testFileWriterSuite) TestHandleDuplicateEventsExist(c *check.C) { _, err = w.WriteEvent(queryEv) c.Assert(err, check.ErrorMatches, ".*handle a potential duplicate event.*") } - -func (t *testFileWriterSuite) TestRecoverMySQL(c *check.C) { - var ( - cfg = &FileConfig{ - RelayDir: c.MkDir(), - Filename: "test-mysql-bin.000001", - } - - flavor = gmysql.MySQLFlavor - previousGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456,53bfca22-690d-11e7-8a62-18ded7a37b78:1-495,686e1ab6-c47e-11e7-a42c-6c92bf46f384:234-567" - latestGTIDStr1 = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14" - latestGTIDStr2 = "53bfca22-690d-11e7-8a62-18ded7a37b78:495" - ) - - w := NewFileWriter(log.L(), cfg, t.parser) - defer w.Close() - c.Assert(w.Start(), check.IsNil) - - // different SIDs in GTID set - previousGTIDSet, err := gtid.ParserGTID(flavor, previousGTIDSetStr) - c.Assert(err, check.IsNil) - latestGTID1, err := gtid.ParserGTID(flavor, latestGTIDStr1) - c.Assert(err, check.IsNil) - latestGTID2, err := 
gtid.ParserGTID(flavor, latestGTIDStr2) - c.Assert(err, check.IsNil) - - // generate binlog events - g, _, baseData := genBinlogEventsWithGTIDs(c, flavor, previousGTIDSet, latestGTID1, latestGTID2) - - // expected latest pos/GTID set - expectedPos := gmysql.Position{Name: cfg.Filename, Pos: uint32(len(baseData))} - // 3 DDL + 10 DML - expectedGTIDsStr := "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456,686e1ab6-c47e-11e7-a42c-6c92bf46f384:234-567" - expectedGTIDs, err := gtid.ParserGTID(flavor, expectedGTIDsStr) - c.Assert(err, check.IsNil) - - // write the events to a file - filename := filepath.Join(cfg.RelayDir, cfg.Filename) - err = os.WriteFile(filename, baseData, 0o644) - c.Assert(err, check.IsNil) - - // try recover, but in fact do nothing - result, err := w.Recover(context.Background()) - c.Assert(err, check.IsNil) - c.Assert(result.Truncated, check.IsFalse) - c.Assert(result.LatestPos, check.DeepEquals, expectedPos) - c.Assert(result.LatestGTIDs, check.DeepEquals, expectedGTIDs) - - // check file size, whether no recovering operation applied - fs, err := os.Stat(filename) - c.Assert(err, check.IsNil) - c.Assert(fs.Size(), check.Equals, int64(len(baseData))) - - // generate another transaction, DDL - extraEvents, extraData, err := g.GenDDLEvents("db2", "CREATE DATABASE db2") - c.Assert(err, check.IsNil) - c.Assert(extraEvents, check.HasLen, 2) // [GTID, Query] - - // write an incomplete event to the file - corruptData := extraEvents[0].RawData[:len(extraEvents[0].RawData)-2] - f, err := os.OpenFile(filename, os.O_WRONLY|os.O_APPEND, 0o644) - c.Assert(err, check.IsNil) - _, err = f.Write(corruptData) - c.Assert(err, check.IsNil) - c.Assert(f.Close(), check.IsNil) - - // check file size, increased - fs, err = os.Stat(filename) - c.Assert(err, check.IsNil) - c.Assert(fs.Size(), check.Equals, int64(len(baseData)+len(corruptData))) - - // try recover, truncate the incomplete event 
- result, err = w.Recover(context.Background()) - c.Assert(err, check.IsNil) - c.Assert(result.Truncated, check.IsTrue) - c.Assert(result.LatestPos, check.DeepEquals, expectedPos) - c.Assert(result.LatestGTIDs, check.DeepEquals, expectedGTIDs) - - // check file size, truncated - fs, err = os.Stat(filename) - c.Assert(err, check.IsNil) - c.Assert(fs.Size(), check.Equals, int64(len(baseData))) - - // write an incomplete transaction - f, err = os.OpenFile(filename, os.O_WRONLY|os.O_APPEND, 0o644) - c.Assert(err, check.IsNil) - var extraLen int64 - for i := 0; i < len(extraEvents)-1; i++ { - _, err = f.Write(extraEvents[i].RawData) - c.Assert(err, check.IsNil) - extraLen += int64(len(extraEvents[i].RawData)) - } - c.Assert(f.Close(), check.IsNil) - - // check file size, increased - fs, err = os.Stat(filename) - c.Assert(err, check.IsNil) - c.Assert(fs.Size(), check.Equals, int64(len(baseData))+extraLen) - - // try recover, truncate the incomplete transaction - result, err = w.Recover(context.Background()) - c.Assert(err, check.IsNil) - c.Assert(result.Truncated, check.IsTrue) - c.Assert(result.LatestPos, check.DeepEquals, expectedPos) - c.Assert(result.LatestGTIDs, check.DeepEquals, expectedGTIDs) - - // check file size, truncated - fs, err = os.Stat(filename) - c.Assert(err, check.IsNil) - c.Assert(fs.Size(), check.Equals, int64(len(baseData))) - - // write an completed transaction - f, err = os.OpenFile(filename, os.O_WRONLY|os.O_APPEND, 0o644) - c.Assert(err, check.IsNil) - for i := 0; i < len(extraEvents); i++ { - _, err = f.Write(extraEvents[i].RawData) - c.Assert(err, check.IsNil) - } - c.Assert(f.Close(), check.IsNil) - - // check file size, increased - fs, err = os.Stat(filename) - c.Assert(err, check.IsNil) - c.Assert(fs.Size(), check.Equals, int64(len(baseData)+len(extraData))) - - // try recover, no operation applied - expectedPos.Pos += uint32(len(extraData)) - // 4 DDL + 10 DML - expectedGTIDsStr = 
"3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-506,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456,686e1ab6-c47e-11e7-a42c-6c92bf46f384:234-567" - expectedGTIDs, err = gtid.ParserGTID(flavor, expectedGTIDsStr) - c.Assert(err, check.IsNil) - result, err = w.Recover(context.Background()) - c.Assert(err, check.IsNil) - c.Assert(result.Truncated, check.IsFalse) - c.Assert(result.LatestPos, check.DeepEquals, expectedPos) - c.Assert(result.LatestGTIDs, check.DeepEquals, expectedGTIDs) - - // compare file data - var allData bytes.Buffer - allData.Write(baseData) - allData.Write(extraData) - fileData, err := os.ReadFile(filename) - c.Assert(err, check.IsNil) - c.Assert(fileData, check.DeepEquals, allData.Bytes()) -} - -func (t *testFileWriterSuite) TestRecoverMySQLNone(c *check.C) { - cfg := &FileConfig{ - RelayDir: c.MkDir(), - } - - w1 := NewFileWriter(log.L(), cfg, t.parser) - defer w1.Close() - c.Assert(w1.Start(), check.IsNil) - - // no file specified to recover - result, err := w1.Recover(context.Background()) - c.Assert(err, check.IsNil) - c.Assert(result.Truncated, check.IsFalse) - - cfg.Filename = "mysql-bin.000001" - w2 := NewFileWriter(log.L(), cfg, t.parser) - defer w2.Close() - c.Assert(w2.Start(), check.IsNil) - - // file not exist, no need to recover - result, err = w2.Recover(context.Background()) - c.Assert(err, check.IsNil) - c.Assert(result.Truncated, check.IsFalse) -} diff --git a/dm/relay/remote_retry_test.go b/dm/relay/remote_retry_test.go index 20754274d09..a79b74fb48d 100644 --- a/dm/relay/remote_retry_test.go +++ b/dm/relay/remote_retry_test.go @@ -16,17 +16,12 @@ package relay import ( "context" "errors" - "testing" "time" gmysql "github.com/go-mysql-org/go-mysql/mysql" . 
"github.com/pingcap/check" ) -func TestReaderRetrySuite(t *testing.T) { - TestingT(t) -} - var _ = Suite(&testReaderRetrySuite{}) type testReaderRetrySuite struct{} diff --git a/dm/relay/transformer/transformer.go b/dm/relay/transformer/transformer.go deleted file mode 100644 index a9aa0d3c090..00000000000 --- a/dm/relay/transformer/transformer.go +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package transformer - -import ( - "github.com/go-mysql-org/go-mysql/mysql" - "github.com/go-mysql-org/go-mysql/replication" - "github.com/pingcap/tidb/parser" - - "github.com/pingcap/ticdc/dm/relay/common" -) - -const ( - ignoreReasonHeartbeat = "heartbeat event" - ignoreReasonArtificialFlag = "artificial flag (0x0020) set" -) - -// Result represents a transform result. -type Result struct { - Ignore bool // whether the event should be ignored - IgnoreReason string // why the transformer ignore the event - LogPos uint32 // binlog event's End_log_pos or Position in RotateEvent - NextLogName string // next binlog filename, only valid for RotateEvent - GTIDSet mysql.GTIDSet // GTIDSet got from QueryEvent and XIDEvent when RawModeEnabled not true - CanSaveGTID bool // whether can save GTID into meta, true for DDL query and XIDEvent -} - -// Transformer receives binlog events from a reader and transforms them. -// The transformed binlog events should be send to one or more writers. -// The transformer should support: -// 1. extract binlog position, GTID info from the event. -// 2. 
decide the event whether needed by a downstream writer. -// - the downstream writer may also drop some events according to its strategy. -// NOTE: more features maybe moved from outer into Transformer later. -type Transformer interface { - // Transform transforms a binlog event. - Transform(e *replication.BinlogEvent) Result -} - -// transformer implements Transformer interface. -type transformer struct { - parser2 *parser.Parser // used to parse query statement -} - -// NewTransformer creates a Transformer instance. -func NewTransformer(parser2 *parser.Parser) Transformer { - return &transformer{ - parser2: parser2, - } -} - -// Transform implements Transformer.Transform. -func (t *transformer) Transform(e *replication.BinlogEvent) Result { - result := Result{ - LogPos: e.Header.LogPos, - } - - switch ev := e.Event.(type) { - case *replication.PreviousGTIDsEvent: - result.CanSaveGTID = true - case *replication.MariadbGTIDListEvent: - result.CanSaveGTID = true - case *replication.RotateEvent: - // NOTE: we need to get the first binlog filename from fake RotateEvent when using auto position - result.LogPos = uint32(ev.Position) // next event's position - result.NextLogName = string(ev.NextLogName) // for RotateEvent, update binlog name - case *replication.QueryEvent: - // when RawModeEnabled not true, QueryEvent will be parsed. - if common.CheckIsDDL(string(ev.Query), t.parser2) { - // we only update/save GTID for DDL/XID event - // if the query is something like `BEGIN`, we do not update/save GTID. - result.GTIDSet = ev.GSet - result.CanSaveGTID = true - } - case *replication.XIDEvent: - // when RawModeEnabled not true, XIDEvent will be parsed. 
- result.GTIDSet = ev.GSet - result.CanSaveGTID = true // need save GTID for XID - case *replication.GenericEvent: - // handle some un-parsed events - if e.Header.EventType == replication.HEARTBEAT_EVENT { - // ignore artificial heartbeat event - // ref: https://dev.mysql.com/doc/internals/en/heartbeat-event.html - result.Ignore = true - result.IgnoreReason = ignoreReasonHeartbeat - } - default: - if e.Header.Flags&replication.LOG_EVENT_ARTIFICIAL_F != 0 { - // ignore events with LOG_EVENT_ARTIFICIAL_F flag(0x0020) set - // ref: https://dev.mysql.com/doc/internals/en/binlog-event-flag.html - result.Ignore = true - result.IgnoreReason = ignoreReasonArtificialFlag - } - } - return result -} diff --git a/dm/relay/transformer/transformer_test.go b/dm/relay/transformer/transformer_test.go deleted file mode 100644 index cbf4fbfee8b..00000000000 --- a/dm/relay/transformer/transformer_test.go +++ /dev/null @@ -1,183 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package transformer - -import ( - "testing" - "time" - - "github.com/go-mysql-org/go-mysql/mysql" - "github.com/go-mysql-org/go-mysql/replication" - "github.com/pingcap/check" - "github.com/pingcap/tidb/parser" - - "github.com/pingcap/ticdc/dm/pkg/binlog/event" - "github.com/pingcap/ticdc/dm/pkg/gtid" -) - -var _ = check.Suite(&testTransformerSuite{}) - -func TestSuite(t *testing.T) { - check.TestingT(t) -} - -type testTransformerSuite struct{} - -type Case struct { - event *replication.BinlogEvent - result Result -} - -func (t *testTransformerSuite) TestTransform(c *check.C) { - var ( - tran = NewTransformer(parser.New()) - header = &replication.EventHeader{ - Timestamp: uint32(time.Now().Unix()), - ServerID: 11, - Flags: 0x01, - } - latestPos uint32 = 456789 - gtidStr = "9f61c5f9-1eef-11e9-b6cf-0242ac140003:5" - gtidSet, _ = gtid.ParserGTID(mysql.MySQLFlavor, gtidStr) - schema = []byte("test_schema") - cases = make([]Case, 0, 10) - ) - - // RotateEvent - nextLogName := "mysql-bin.000123" - position := uint64(4) - ev, err := event.GenRotateEvent(header, latestPos, []byte(nextLogName), position) - c.Assert(err, check.IsNil) - cases = append(cases, Case{ - event: ev, - result: Result{ - LogPos: uint32(position), - NextLogName: nextLogName, - }, - }) - - // fake RotateEvent with zero timestamp - header.Timestamp = 0 - ev, err = event.GenRotateEvent(header, latestPos, []byte(nextLogName), position) - c.Assert(err, check.IsNil) - cases = append(cases, Case{ - event: ev, - result: Result{ - LogPos: uint32(position), - NextLogName: nextLogName, - }, - }) - header.Timestamp = uint32(time.Now().Unix()) // set to non-zero - - // fake RotateEvent with zero logPos - fakeRotateHeader := *header - ev, err = event.GenRotateEvent(&fakeRotateHeader, latestPos, []byte(nextLogName), position) - c.Assert(err, check.IsNil) - ev.Header.LogPos = 0 // set to zero - cases = append(cases, Case{ - event: ev, - result: Result{ - LogPos: uint32(position), - NextLogName: nextLogName, - }, - 
}) - - // QueryEvent for DDL - query := []byte("CREATE TABLE test_tbl (c1 INT)") - ev, err = event.GenQueryEvent(header, latestPos, 0, 0, 0, nil, schema, query) - c.Assert(err, check.IsNil) - ev.Event.(*replication.QueryEvent).GSet = gtidSet.Origin() // set GTIDs manually - cases = append(cases, Case{ - event: ev, - result: Result{ - LogPos: ev.Header.LogPos, - GTIDSet: gtidSet.Origin(), - CanSaveGTID: true, - }, - }) - - // QueryEvent for non-DDL - query = []byte("BEGIN") - ev, err = event.GenQueryEvent(header, latestPos, 0, 0, 0, nil, schema, query) - c.Assert(err, check.IsNil) - cases = append(cases, Case{ - event: ev, - result: Result{ - LogPos: ev.Header.LogPos, - }, - }) - - // XIDEvent - xid := uint64(135) - ev, err = event.GenXIDEvent(header, latestPos, xid) - c.Assert(err, check.IsNil) - ev.Event.(*replication.XIDEvent).GSet = gtidSet.Origin() // set GTIDs manually - cases = append(cases, Case{ - event: ev, - result: Result{ - LogPos: ev.Header.LogPos, - GTIDSet: gtidSet.Origin(), - CanSaveGTID: true, - }, - }) - - // GenericEvent, non-HEARTBEAT_EVENT - ev = &replication.BinlogEvent{Header: header, Event: &replication.GenericEvent{}} - cases = append(cases, Case{ - event: ev, - result: Result{ - LogPos: ev.Header.LogPos, - }, - }) - - // GenericEvent, HEARTBEAT_EVENT - genericHeader := *header - ev = &replication.BinlogEvent{Header: &genericHeader, Event: &replication.GenericEvent{}} - ev.Header.EventType = replication.HEARTBEAT_EVENT - cases = append(cases, Case{ - event: ev, - result: Result{ - Ignore: true, - IgnoreReason: ignoreReasonHeartbeat, - LogPos: ev.Header.LogPos, - }, - }) - - // other event type without LOG_EVENT_ARTIFICIAL_F - ev, err = event.GenCommonGTIDEvent(mysql.MySQLFlavor, header.ServerID, latestPos, gtidSet) - c.Assert(err, check.IsNil) - cases = append(cases, Case{ - event: ev, - result: Result{ - LogPos: ev.Header.LogPos, - }, - }) - - // other event type with LOG_EVENT_ARTIFICIAL_F - ev, err = 
event.GenCommonGTIDEvent(mysql.MySQLFlavor, header.ServerID, latestPos, gtidSet) - c.Assert(err, check.IsNil) - ev.Header.Flags |= replication.LOG_EVENT_ARTIFICIAL_F - cases = append(cases, Case{ - event: ev, - result: Result{ - Ignore: true, - IgnoreReason: ignoreReasonArtificialFlag, - LogPos: ev.Header.LogPos, - }, - }) - - for _, cs := range cases { - c.Assert(tran.Transform(cs.event), check.DeepEquals, cs.result) - } -} diff --git a/dm/relay/upstream_reader_test.go b/dm/relay/upstream_reader_test.go index 81cd77232c7..3feb2392f18 100644 --- a/dm/relay/upstream_reader_test.go +++ b/dm/relay/upstream_reader_test.go @@ -15,7 +15,6 @@ package relay import ( "context" - "testing" "time" "github.com/go-mysql-org/go-mysql/replication" @@ -27,10 +26,6 @@ import ( var _ = check.Suite(&testRemoteReaderSuite{}) -func TestRemoteReaderSuite(t *testing.T) { - check.TestingT(t) -} - type testRemoteReaderSuite struct{} func (t *testRemoteReaderSuite) TestInterface(c *check.C) { diff --git a/dm/relay/util_test.go b/dm/relay/util_test.go index 834051054fd..d573a8845ac 100644 --- a/dm/relay/util_test.go +++ b/dm/relay/util_test.go @@ -18,11 +18,10 @@ import ( "fmt" "io" - "github.com/pingcap/errors" - "github.com/DATA-DOG/go-sqlmock" gmysql "github.com/go-mysql-org/go-mysql/mysql" . "github.com/pingcap/check" + "github.com/pingcap/errors" "github.com/pingcap/ticdc/dm/pkg/conn" "github.com/pingcap/ticdc/dm/pkg/utils" diff --git a/dm/syncer/streamer_controller.go b/dm/syncer/streamer_controller.go index a4c3db51360..7912b4a057a 100644 --- a/dm/syncer/streamer_controller.go +++ b/dm/syncer/streamer_controller.go @@ -122,19 +122,20 @@ type StreamerController struct { // whether the server id is updated serverIDUpdated bool - notifier relay.EventNotifier + relay relay.Process } // NewStreamerController creates a new streamer controller. 
-func NewStreamerController(notifier relay.EventNotifier, +func NewStreamerController( syncCfg replication.BinlogSyncerConfig, enableGTID bool, fromDB *dbconn.UpStreamConn, - binlogType BinlogType, localBinlogDir string, timezone *time.Location, + relay relay.Process, ) *StreamerController { var strategy retryStrategy = alwaysRetryStrategy{} + binlogType := toBinlogType(relay) if binlogType != LocalBinlog { strategy = &maxIntervalRetryStrategy{ interval: minErrorRetryInterval, @@ -156,7 +157,7 @@ func NewStreamerController(notifier relay.EventNotifier, timezone: timezone, fromDB: fromDB, closed: true, - notifier: notifier, + relay: relay, } return streamerController @@ -178,7 +179,7 @@ func (c *StreamerController) Start(tctx *tcontext.Context, location binlog.Locat err = c.updateServerIDAndResetReplication(tctx, location) } if err != nil { - c.close(tctx) + c.close() return err } @@ -200,10 +201,8 @@ func (c *StreamerController) resetReplicationSyncer(tctx *tcontext.Context, loca if c.streamerProducer != nil { switch t := c.streamerProducer.(type) { case *remoteBinlogReader: - err2 := c.closeBinlogSyncer(tctx, t.reader) - if err2 != nil { - tctx.L().Warn("fail to close remote binlog reader", zap.Error(err)) - } + // unclosed conn bug already fixed in go-mysql, https://github.com/go-mysql-org/go-mysql/pull/411 + t.reader.Close() case *localBinlogReader: // check the uuid before close ctx, cancel := context.WithTimeout(tctx.Ctx, utils.DefaultDBTimeout) @@ -235,7 +234,7 @@ func (c *StreamerController) resetReplicationSyncer(tctx *tcontext.Context, loca if c.currentBinlogType == RemoteBinlog { c.streamerProducer = &remoteBinlogReader{replication.NewBinlogSyncer(c.syncCfg), tctx, c.syncCfg.Flavor, c.enableGTID} } else { - c.streamerProducer = &localBinlogReader{relay.NewBinlogReader(c.notifier, tctx.L(), &relay.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor}), c.enableGTID} + c.streamerProducer = 
&localBinlogReader{c.relay.NewReader(tctx.L(), &relay.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor}), c.enableGTID} } c.streamer, err = c.streamerProducer.generateStreamer(location) @@ -339,36 +338,14 @@ func (c *StreamerController) ReopenWithRetry(tctx *tcontext.Context, location bi return err } -func (c *StreamerController) closeBinlogSyncer(logtctx *tcontext.Context, binlogSyncer *replication.BinlogSyncer) error { - if binlogSyncer == nil { - return nil - } - - lastSlaveConnectionID := binlogSyncer.LastConnectionID() - defer binlogSyncer.Close() - if lastSlaveConnectionID > 0 { - // try to KILL the conn in default timeout, but it's not a big problem even failed. - ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultDBTimeout) - defer cancel() - err := c.fromDB.KillConn(ctx, lastSlaveConnectionID) - if err != nil { - logtctx.L().Error("fail to kill last connection", zap.Uint32("connection ID", lastSlaveConnectionID), log.ShortError(err)) - if !utils.IsNoSuchThreadError(err) { - return err - } - } - } - return nil -} - // Close closes streamer. 
 func (c *StreamerController) Close(tctx *tcontext.Context) {
 	c.Lock()
-	c.close(tctx)
+	c.close()
 	c.Unlock()
 }
 
-func (c *StreamerController) close(tctx *tcontext.Context) {
+func (c *StreamerController) close() {
 	if c.closed {
 		return
 	}
@@ -377,10 +354,7 @@ func (c *StreamerController) close(tctx *tcontext.Context) {
 	switch r := c.streamerProducer.(type) {
 	case *remoteBinlogReader:
 		// process remote binlog reader
-		err := c.closeBinlogSyncer(tctx, r.reader)
-		if err != nil {
-			tctx.L().Warn("fail to close remote binlog reader", zap.Error(err))
-		}
+		r.reader.Close()
 	case *localBinlogReader:
 		// process local binlog reader
 		r.reader.Close()
diff --git a/dm/syncer/streamer_controller_test.go b/dm/syncer/streamer_controller_test.go
index 2f1de17efe8..43292c0abf3 100644
--- a/dm/syncer/streamer_controller_test.go
+++ b/dm/syncer/streamer_controller_test.go
@@ -19,6 +19,8 @@ import (
 	"github.com/go-mysql-org/go-mysql/replication"
 	. "github.com/pingcap/check"
 	"github.com/pingcap/errors"
+
+	"github.com/pingcap/ticdc/dm/relay"
 )
 
 func (s *testSyncerSuite) TestIsConnectionRefusedError(c *C) {
@@ -33,8 +35,8 @@ func (s *testSyncerSuite) TestIsConnectionRefusedError(c *C) {
 }
 
 func (s *testSyncerSuite) TestCanErrorRetry(c *C) {
-	controller := NewStreamerController(nil, replication.BinlogSyncerConfig{}, true, nil,
-		LocalBinlog, "", nil)
+	relay2 := &relay.Relay{}
+	controller := NewStreamerController(replication.BinlogSyncerConfig{}, true, nil, "", nil, relay2)
 
 	mockErr := errors.New("test")
 
@@ -50,8 +52,7 @@ func (s *testSyncerSuite) TestCanErrorRetry(c *C) {
 	}()
 
 	// test with remote binlog
-	controller = NewStreamerController(nil, replication.BinlogSyncerConfig{}, true, nil,
-		RemoteBinlog, "", nil)
+	controller = NewStreamerController(replication.BinlogSyncerConfig{}, true, nil, "", nil, nil)
 
 	c.Assert(controller.CanRetry(mockErr), IsTrue)
 	c.Assert(controller.CanRetry(mockErr), IsFalse)
diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go
index 97873674df9..8c7f23e687a 100644
--- a/dm/syncer/syncer.go
+++ b/dm/syncer/syncer.go
@@ -133,7 +133,6 @@ type Syncer struct {
 	binlogType BinlogType
 	streamerController *StreamerController
 
-	enableRelay bool
 	wg sync.WaitGroup // counts goroutines
 	jobWg sync.WaitGroup // counts ddl/flush job in-flight in s.dmlJobCh and s.ddlJobCh
 
@@ -225,11 +224,11 @@ type Syncer struct {
 	workerJobTSArray []*atomic.Int64 // worker's sync job TS array, note that idx=0 is skip idx and idx=1 is ddl idx,sql worker job idx=(queue id + 2)
 	lastCheckpointFlushedTime time.Time
 
-	notifier relay.EventNotifier
+	relay relay.Process
 }
 
 // NewSyncer creates a new Syncer.
-func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, notifier relay.EventNotifier) *Syncer {
+func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, relay relay.Process) *Syncer {
 	logger := log.With(zap.String("task", cfg.Name), zap.String("unit", "binlog replication"))
 	syncer := &Syncer{
 		pessimist: shardddl.NewPessimist(&logger, etcdClient, cfg.Name, cfg.SourceID),
@@ -248,12 +247,11 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, notifier
 	syncer.done = nil
 	syncer.setTimezone()
 	syncer.addJobFunc = syncer.addJob
-	syncer.enableRelay = cfg.UseRelay
 	syncer.cli = etcdClient
 
 	syncer.checkpoint = NewRemoteCheckPoint(syncer.tctx, cfg, syncer.checkpointID())
 
-	syncer.binlogType = toBinlogType(cfg.UseRelay)
+	syncer.binlogType = toBinlogType(relay)
 	syncer.errOperatorHolder = operator.NewHolder(&logger)
 	syncer.readerHub = streamer.GetReaderHub()
 
@@ -267,7 +265,7 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, notifier
 		syncer.workerJobTSArray[i] = atomic.NewInt64(0)
 	}
 	syncer.lastCheckpointFlushedTime = time.Time{}
-	syncer.notifier = notifier
+	syncer.relay = relay
 
 	return syncer
 }
@@ -324,7 +322,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) {
 		return terror.ErrSchemaTrackerInit.Delegate(err)
 	}
 
-	s.streamerController = NewStreamerController(s.notifier, s.syncCfg,
s.cfg.EnableGTID, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone)
+	s.streamerController = NewStreamerController(s.syncCfg, s.cfg.EnableGTID, s.fromDB, s.cfg.RelayDir, s.timezone, s.relay)
 
 	s.baList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BAList)
 	if err != nil {
@@ -3321,7 +3319,7 @@ func (s *Syncer) adjustGlobalPointGTID(tctx *tcontext.Context) (bool, error) {
 		return false, nil
 	}
 	// set enableGTID to false for new streamerController
-	streamerController := NewStreamerController(s.notifier, s.syncCfg, false, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone)
+	streamerController := NewStreamerController(s.syncCfg, false, s.fromDB, s.cfg.RelayDir, s.timezone, s.relay)
 
 	endPos := binlog.AdjustPosition(location.Position)
 	startPos := mysql.Position{
diff --git a/dm/syncer/util.go b/dm/syncer/util.go
index 4fc914437d3..5c751bd7b3a 100644
--- a/dm/syncer/util.go
+++ b/dm/syncer/util.go
@@ -26,10 +26,11 @@ import (
 	"github.com/pingcap/ticdc/dm/pkg/conn"
 	tcontext "github.com/pingcap/ticdc/dm/pkg/context"
 	"github.com/pingcap/ticdc/dm/pkg/terror"
+	"github.com/pingcap/ticdc/dm/relay"
 )
 
-func toBinlogType(enableRelay bool) BinlogType {
-	if enableRelay {
+func toBinlogType(relay relay.Process) BinlogType {
+	if relay != nil {
 		return LocalBinlog
 	}
 
diff --git a/dm/syncer/util_test.go b/dm/syncer/util_test.go
index b2f1dc12647..2a7204f66f3 100644
--- a/dm/syncer/util_test.go
+++ b/dm/syncer/util_test.go
@@ -20,6 +20,8 @@ import (
 	"github.com/pingcap/tidb/parser/ast"
 	"github.com/pingcap/tidb/parser/model"
 	_ "github.com/pingcap/tidb/types/parser_driver"
+
+	"github.com/pingcap/ticdc/dm/relay"
 )
 
 var _ = Suite(&testUtilSuite{})
@@ -91,20 +93,20 @@ func (t *testUtilSuite) TestGetTableByDML(c *C) {
 
 func (t *testUtilSuite) TestToBinlogType(c *C) {
 	testCases := []struct {
-		enableRelay bool
-		tp BinlogType
+		relay relay.Process
+		tp BinlogType
 	}{
 		{
-			true,
+			&relay.Relay{},
 			LocalBinlog,
 		},
 		{
-			false,
+			nil,
 			RemoteBinlog,
 		},
 	}
 
 	for _, testCase := range testCases {
-		tp :=
toBinlogType(testCase.enableRelay)
+		tp := toBinlogType(testCase.relay)
 		c.Assert(tp, Equals, testCase.tp)
 	}
 }