diff --git a/cmd/dm-portal/main.go b/cmd/dm-portal/main.go index 44eee7b83a..44d3772e66 100644 --- a/cmd/dm-portal/main.go +++ b/cmd/dm-portal/main.go @@ -18,11 +18,12 @@ import ( "net/http" "os" + "github.com/rakyll/statik/fs" + "go.uber.org/zap" + "github.com/pingcap/dm/dm/portal" _ "github.com/pingcap/dm/dm/portal/statik" "github.com/pingcap/dm/pkg/log" - "github.com/rakyll/statik/fs" - "go.uber.org/zap" ) func main() { diff --git a/cmd/dm-syncer/main.go b/cmd/dm-syncer/main.go index 5414e0d876..d678944054 100644 --- a/cmd/dm-syncer/main.go +++ b/cmd/dm-syncer/main.go @@ -74,7 +74,7 @@ func main() { log.L().Info("", zap.Stringer("dm-syncer conf", conf)) }) - sync := syncer.NewSyncer(conf, nil) // do not support shard DDL for singleton syncer. + sync := syncer.NewSyncer(conf, nil, nil) // do not support shard DDL for singleton syncer. sc := make(chan os.Signal, 1) signal.Notify(sc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) diff --git a/cmd/dm-worker/main.go b/cmd/dm-worker/main.go index 220c354b59..3085238931 100644 --- a/cmd/dm-worker/main.go +++ b/cmd/dm-worker/main.go @@ -25,12 +25,13 @@ import ( globalLog "github.com/pingcap/log" "go.uber.org/zap" + lightningLog "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/dm/dm/ctl/common" "github.com/pingcap/dm/dm/worker" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" - lightningLog "github.com/pingcap/tidb/br/pkg/lightning/log" ) func main() { diff --git a/dm/pbmock/dmmaster.go b/dm/pbmock/dmmaster.go index 39f9123c3f..8b3db82173 100644 --- a/dm/pbmock/dmmaster.go +++ b/dm/pbmock/dmmaster.go @@ -9,8 +9,9 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - pb "github.com/pingcap/dm/dm/pb" grpc "google.golang.org/grpc" + + pb "github.com/pingcap/dm/dm/pb" ) // MockMasterClient is a mock of MasterClient interface. diff --git a/dm/pbmock/dmworker.go b/dm/pbmock/dmworker.go index a7601b37b9..bd6e3710d1 100644 --- a/dm/pbmock/dmworker.go +++ b/dm/pbmock/dmworker.go @@ -9,8 +9,9 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - pb "github.com/pingcap/dm/dm/pb" grpc "google.golang.org/grpc" + + pb "github.com/pingcap/dm/dm/pb" ) // MockWorkerClient is a mock of WorkerClient interface. 
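The extra `nil` passed to `syncer.NewSyncer` above is the relay `EventNotifier` that this patch threads from the relay unit down into the syncer; the standalone dm-syncer has no local relay, so it passes nil. Below is a minimal, self-contained sketch of the coalescing-notification idea, assuming only what this patch shows: `EventNotifier`/`Notified` match the interface added in pkg/streamer/file.go, while `chanNotifier` and `notify` are hypothetical stand-ins for the `relayNotifier` and `SubTask.relayNotify` added in dm/worker/subtask.go.

```go
package main

import (
	"fmt"
	"time"
)

// EventNotifier mirrors the interface added in pkg/streamer/file.go: the
// reader only needs to know *that* the relay log changed, not what changed.
type EventNotifier interface {
	Notified() chan interface{}
}

// chanNotifier is a hypothetical stand-in for relayNotifier: a channel of
// capacity 1 coalesces repeated signals into a single pending notification.
type chanNotifier struct{ ch chan interface{} }

func (n chanNotifier) Notified() chan interface{} { return n.ch }

// notify mimics SubTask.relayNotify: a non-blocking send, so the signal is
// simply dropped when one is already pending.
func notify(n EventNotifier) {
	select {
	case n.Notified() <- struct{}{}:
	default:
	}
}

func main() {
	n := chanNotifier{ch: make(chan interface{}, 1)}
	for i := 0; i < 3; i++ {
		notify(n) // three writes coalesce into one pending notification
	}
	select {
	case <-n.Notified():
		fmt.Println("relay binlog file changed at least once")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("no relay activity")
	}
}
```

A buffered channel of capacity 1 plus a non-blocking send gives a wakeup without unbounded queuing: the reader only cares that the relay log changed, not how many events were written.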
diff --git a/dm/worker/relay.go b/dm/worker/relay.go index c34e41f40e..342bf3d9e4 100644 --- a/dm/worker/relay.go +++ b/dm/worker/relay.go @@ -51,6 +51,10 @@ type RelayHolder interface { Result() *pb.ProcessResult // Update updates relay config online Update(ctx context.Context, cfg *config.SourceConfig) error + // RegisterListener registers a relay listener + RegisterListener(el relay.Listener) + // UnRegisterListener unregisters a relay listener + UnRegisterListener(el relay.Listener) } // NewRelayHolder is relay holder initializer @@ -307,6 +311,14 @@ func (h *realRelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo { return h.relay.ActiveRelayLog() } +func (h *realRelayHolder) RegisterListener(el relay.Listener) { + h.relay.RegisterListener(el) +} + +func (h *realRelayHolder) UnRegisterListener(el relay.Listener) { + h.relay.UnRegisterListener(el) +} + /******************** dummy relay holder ********************/ type dummyRelayHolder struct { @@ -424,3 +436,9 @@ func (d *dummyRelayHolder) Stage() pb.Stage { defer d.Unlock() return d.stage } + +func (d *dummyRelayHolder) RegisterListener(el relay.Listener) { +} + +func (d *dummyRelayHolder) UnRegisterListener(el relay.Listener) { +} diff --git a/dm/worker/relay_test.go b/dm/worker/relay_test.go index bd51d24ef4..fcabf03da9 100644 --- a/dm/worker/relay_test.go +++ b/dm/worker/relay_test.go @@ -47,6 +47,12 @@ type DummyRelay struct { reloadErr error } +func (d *DummyRelay) RegisterListener(el relay.Listener) { +} + +func (d *DummyRelay) UnRegisterListener(el relay.Listener) { +} + // NewDummyRelay creates an instance of dummy Relay. func NewDummyRelay(cfg *relay.Config) relay.Process { return &DummyRelay{} diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index 31d52cb80a..b696f8d083 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -38,6 +38,7 @@ import ( "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/ha" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" ) @@ -120,7 +121,7 @@ func (t *testServer) TestServer(c *C) { cfg.UseRelay = false return NewRealSubTask(cfg, etcdClient, worker) } - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) mockSync := NewMockUnit(pb.UnitType_Sync) diff --git a/dm/worker/source_worker.go b/dm/worker/source_worker.go index 854283885a..66ad1af6fb 100644 --- a/dm/worker/source_worker.go +++ b/dm/worker/source_worker.go @@ -353,6 +353,8 @@ func (w *SourceWorker) EnableRelay() (err error) { w.observeRelayStage(w.relayCtx, w.etcdClient, revRelay) }() + w.relayHolder.RegisterListener(w.subTaskHolder) + w.relayEnabled.Store(true) w.l.Info("relay enabled") w.subTaskHolder.resetAllSubTasks(true) @@ -385,6 +387,7 @@ func (w *SourceWorker) DisableRelay() { if w.relayHolder != nil { r := w.relayHolder w.relayHolder = nil + r.UnRegisterListener(w.subTaskHolder) r.Close() } if w.relayPurger != nil { diff --git a/dm/worker/source_worker_test.go b/dm/worker/source_worker_test.go index 4d401a414b..e3977dc805 100644 --- a/dm/worker/source_worker_test.go +++ b/dm/worker/source_worker_test.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/dm/pkg/conn" "github.com/pingcap/dm/pkg/ha" 
"github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/utils" ) @@ -241,7 +242,7 @@ var _ = Suite(&testWorkerFunctionalities{}) func (t *testWorkerFunctionalities) SetUpSuite(c *C) { NewRelayHolder = NewDummyRelayHolder NewSubTask = NewRealSubTask - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { atomic.AddInt32(&t.createUnitCount, 1) mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) @@ -416,7 +417,7 @@ func (t *testWorkerEtcdCompact) SetUpSuite(c *C) { cfg.UseRelay = false return NewRealSubTask(cfg, etcdClient, worker) } - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) mockSync := NewMockUnit(pb.UnitType_Sync) diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index a01954da8f..ae63704dcb 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/shardddl/pessimism" + "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/dm/syncer" @@ -44,12 +45,22 @@ const ( waitRelayCatchupTimeout = 30 * time.Second ) +type relayNotifier struct { + // ch with size = 1, we only need to be notified whether binlog file of relay changed, not how many times + ch chan interface{} +} + +// Notified implements streamer.EventNotifier. +func (r relayNotifier) Notified() chan interface{} { + return r.ch +} + // createRealUnits is subtask units initializer // it can be used for testing. var createUnits = createRealUnits // createRealUnits creates process units base on task mode. -func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string) []unit.Unit { +func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier streamer.EventNotifier) []unit.Unit { failpoint.Inject("mockCreateUnitsDumpOnly", func(_ failpoint.Value) { log.L().Info("create mock worker units with dump unit only", zap.String("failpoint", "mockCreateUnitsDumpOnly")) failpoint.Return([]unit.Unit{dumpling.NewDumpling(cfg)}) @@ -64,7 +75,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor } else { us = append(us, loader.NewLightning(cfg, etcdClient, workerName)) } - us = append(us, syncer.NewSyncer(cfg, etcdClient)) + us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier)) case config.ModeFull: // NOTE: maybe need another checker in the future? 
us = append(us, dumpling.NewDumpling(cfg)) @@ -74,7 +85,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor us = append(us, loader.NewLightning(cfg, etcdClient, workerName)) } case config.ModeIncrement: - us = append(us, syncer.NewSyncer(cfg, etcdClient)) + us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier)) default: log.L().Error("unsupported task mode", zap.String("subtask", cfg.Name), zap.String("task mode", cfg.Mode)) } @@ -108,6 +119,8 @@ type SubTask struct { etcdClient *clientv3.Client workerName string + + notifier streamer.EventNotifier } // NewSubTask is subtask initializer @@ -130,6 +143,7 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient * cancel: cancel, etcdClient: etcdClient, workerName: workerName, + notifier: &relayNotifier{ch: make(chan interface{}, 1)}, } updateTaskMetric(st.cfg.Name, st.cfg.SourceID, st.stage, st.workerName) return &st @@ -137,7 +151,7 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient * // initUnits initializes the sub task processing units. func (st *SubTask) initUnits() error { - st.units = createUnits(st.cfg, st.etcdClient, st.workerName) + st.units = createUnits(st.cfg, st.etcdClient, st.workerName, st.notifier) if len(st.units) < 1 { return terror.ErrWorkerNoAvailUnits.Generate(st.cfg.Name, st.cfg.Mode) } @@ -724,3 +738,11 @@ func updateTaskMetric(task, sourceID string, stage pb.Stage, workerName string) taskState.WithLabelValues(task, sourceID, workerName).Set(float64(stage)) } } + +func (st *SubTask) relayNotify() { + // skip if there's pending notify + select { + case st.notifier.Notified() <- struct{}{}: + default: + } +} diff --git a/dm/worker/subtask_holder.go b/dm/worker/subtask_holder.go index 20d3bcde41..e90aa9fd98 100644 --- a/dm/worker/subtask_holder.go +++ b/dm/worker/subtask_holder.go @@ -16,6 +16,8 @@ package worker import ( "context" "sync" + + "github.com/go-mysql-org/go-mysql/replication" ) // subTaskHolder holds subtask instances. @@ -88,3 +90,14 @@ func (h *subTaskHolder) getAllSubTasks() map[string]*SubTask { } return result } + +// OnEvent implements relay.Listener +// only syncer unit of subtask need to be notified, but it's much simpler and less error-prone to manage it here +// as relay event need to broadcast to every syncer(most subtask have a syncer). 
+func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) { + h.mu.RLock() + defer h.mu.RUnlock() + for _, s := range h.subTasks { + s.relayNotify() + } +} diff --git a/dm/worker/subtask_test.go b/dm/worker/subtask_test.go index a2f94a18c8..53c961f33d 100644 --- a/dm/worker/subtask_test.go +++ b/dm/worker/subtask_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/dm/dumpling" "github.com/pingcap/dm/loader" "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/dm/syncer" @@ -47,10 +48,10 @@ func (t *testSubTask) TestCreateUnits(c *C) { Mode: "xxx", } worker := "worker" - c.Assert(createUnits(cfg, nil, worker), HasLen, 0) + c.Assert(createUnits(cfg, nil, worker, nil), HasLen, 0) cfg.Mode = config.ModeFull - unitsFull := createUnits(cfg, nil, worker) + unitsFull := createUnits(cfg, nil, worker, nil) c.Assert(unitsFull, HasLen, 2) _, ok := unitsFull[0].(*dumpling.Dumpling) c.Assert(ok, IsTrue) @@ -58,13 +59,13 @@ func (t *testSubTask) TestCreateUnits(c *C) { c.Assert(ok, IsTrue) cfg.Mode = config.ModeIncrement - unitsIncr := createUnits(cfg, nil, worker) + unitsIncr := createUnits(cfg, nil, worker, nil) c.Assert(unitsIncr, HasLen, 1) _, ok = unitsIncr[0].(*syncer.Syncer) c.Assert(ok, IsTrue) cfg.Mode = config.ModeAll - unitsAll := createUnits(cfg, nil, worker) + unitsAll := createUnits(cfg, nil, worker, nil) c.Assert(unitsAll, HasLen, 3) _, ok = unitsAll[0].(*dumpling.Dumpling) c.Assert(ok, IsTrue) @@ -176,7 +177,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) { defer func() { createUnits = createRealUnits }() - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { return nil } st.Run(pb.Stage_Running) @@ -185,7 +186,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) { mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } @@ -296,7 +297,7 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) { defer func() { createUnits = createRealUnits }() - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } @@ -420,7 +421,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) { defer func() { createUnits = createRealUnits }() - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } @@ -445,7 +446,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) { st = NewSubTaskWithStage(cfg, pb.Stage_Finished, nil, "worker") c.Assert(st.Stage(), DeepEquals, pb.Stage_Finished) - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, 
worker string, notifier streamer.EventNotifier) []unit.Unit {
 		return []unit.Unit{mockDumper, mockLoader}
 	}
diff --git a/pkg/streamer/file.go b/pkg/streamer/file.go
index a38677f73a..9dc8a32f70 100644
--- a/pkg/streamer/file.go
+++ b/pkg/streamer/file.go
@@ -22,7 +22,6 @@ import (
 	"time"
 
 	"github.com/BurntSushi/toml"
-	"github.com/pingcap/failpoint"
 	"go.uber.org/zap"
 
 	"github.com/pingcap/dm/pkg/binlog"
@@ -49,6 +48,12 @@ type SwitchPath struct {
 	nextBinlogName string
 }
 
+// EventNotifier notifies whether a new binlog event has been written to the file.
+type EventNotifier interface {
+	// Notified returns a channel used to check whether a new binlog event has been written to the file
+	Notified() chan interface{}
+}
+
 // CollectAllBinlogFiles collects all valid binlog files in dir, and returns filenames in binlog ascending order.
 func CollectAllBinlogFiles(dir string) ([]string, error) {
 	if dir == "" {
@@ -202,123 +207,118 @@ func fileSizeUpdated(path string, latestSize int64) (int, error) {
 	}
 }
 
+type relayLogFileChecker struct {
+	notifier                                      EventNotifier
+	relayDir, currentUUID                         string
+	latestRelayLogDir, latestFilePath, latestFile string
+	beginOffset, endOffset                        int64
+}
+
 // relayLogUpdatedOrNewCreated checks whether current relay log file is updated or new relay log is created.
-// we check the size of the file first, if the size is the same as the latest file, we assume there is no new write
-// so we need to check relay meta file to see if the new relay log is created.
-// this func will be blocked until current filesize changed or meta file updated or context cancelled.
-// we need to make sure that only one channel (updatePathCh or errCh) has events written to it.
-func relayLogUpdatedOrNewCreated(ctx context.Context, watcherInterval time.Duration, dir string,
-	latestFilePath, latestFile string, latestFileSize int64, updatePathCh chan string, errCh chan error) {
-	ticker := time.NewTicker(watcherInterval)
-	defer ticker.Stop()
-	for {
-		select {
-		case <-ctx.Done():
-			errCh <- terror.Annotate(ctx.Err(), "context meet error")
+func (r *relayLogFileChecker) relayLogUpdatedOrNewCreated(ctx context.Context, updatePathCh chan string, switchCh chan SwitchPath, errCh chan error) {
+	// the binlog file may have rotated if we read nothing last time (either it's the first read, or a read right after being notified)
+	lastReadCnt := r.endOffset - r.beginOffset
+	if lastReadCnt == 0 {
+		meta := &Meta{}
+		_, err := toml.DecodeFile(filepath.Join(r.latestRelayLogDir, utils.MetaFilename), meta)
+		if err != nil {
+			errCh <- terror.Annotate(err, "decode relay meta toml file failed")
 			return
-		case <-ticker.C:
-			cmp, err := fileSizeUpdated(latestFilePath, latestFileSize)
-			if err != nil {
-				errCh <- terror.Annotatef(err, "latestFilePath=%s latestFileSize=%d", latestFilePath, latestFileSize)
+		}
+		// the currently watched file size has not changed, which means no new writes have been made;
+		// our relay meta file will be updated immediately after receiving the rotate event,
+		// and although we cannot ensure that the binlog filename in the meta is the next file after latestFile,
+		// if we return a filename different from latestFile, the outer logic (parseDirAsPossible)
+		// will find the right one
+		if meta.BinLogName != r.latestFile {
+			// we need to check the file size again, as the file may have changed during our meta file check
+			cmp, err2 := fileSizeUpdated(r.latestFilePath, r.endOffset)
+			if err2 != nil {
+				errCh <- terror.Annotatef(err2, "latestFilePath=%s endOffset=%d", r.latestFilePath, r.endOffset)
 				return
 			}
-			failpoint.Inject("CMPAlwaysReturn0", func() {
-				cmp = 0
-			})
 			switch {
 			case cmp < 0:
-				errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(latestFilePath)
-				return
+				errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(r.latestFilePath)
 			case cmp > 0:
-				updatePathCh <- latestFilePath
-				return
+				updatePathCh <- r.latestFilePath
 			default:
-				// current watched file size have no change means that no new writes have been made
-				// our relay meta file will be updated immediately after receive the rotate event
-				// although we cannot ensure that the binlog filename in the meta is the next file after latestFile
-				// but if we return a different filename with latestFile, the outer logic (parseDirAsPossible)
-				// will find the right one
-				meta := &Meta{}
-				_, err = toml.DecodeFile(filepath.Join(dir, utils.MetaFilename), meta)
-				if err != nil {
-					errCh <- terror.Annotate(err, "decode relay meta toml file failed")
-					return
-				}
-				if meta.BinLogName != latestFile {
-					// we need check file size again, as the file may have been changed during our metafile check
-					cmp, err := fileSizeUpdated(latestFilePath, latestFileSize)
-					if err != nil {
-						errCh <- terror.Annotatef(err, "latestFilePath=%s latestFileSize=%d",
-							latestFilePath, latestFileSize)
-						return
-					}
-					switch {
-					case cmp < 0:
-						errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(latestFilePath)
-					case cmp > 0:
-						updatePathCh <- latestFilePath
-					default:
-						nextFilePath := filepath.Join(dir, meta.BinLogName)
-						log.L().Info("newer relay log file is already generated",
-							zap.String("now file path", latestFilePath),
-							zap.String("new file path", nextFilePath))
-						updatePathCh <- nextFilePath
-					}
-					return
-				}
+				nextFilePath := filepath.Join(r.latestRelayLogDir, meta.BinLogName)
+				log.L().Info("newer relay log file is already generated",
+					zap.String("now file path", r.latestFilePath),
+					zap.String("new file path", nextFilePath))
+				updatePathCh <- nextFilePath
 			}
+			return
 		}
-	}
-}
 
-// needSwitchSubDir checks whether the reader need to switch to next relay sub directory.
-func needSwitchSubDir(ctx context.Context, relayDir, currentUUID string, switchCh chan SwitchPath, errCh chan error) {
-	var (
-		err            error
-		nextUUID       string
-		nextBinlogName string
-		uuids          []string
-	)
-
-	ticker := time.NewTicker(watcherInterval)
-	defer func() {
-		ticker.Stop()
+		// maybe the UUID index file changed
+		switchPath, err := r.getSwitchPath()
 		if err != nil {
 			errCh <- err
-		}
-	}()
-	for {
-		select {
-		case <-ctx.Done():
 			return
-		case <-ticker.C:
-			// reload uuid
-			uuids, err = utils.ParseUUIDIndex(path.Join(relayDir, utils.UUIDIndexFilename))
-			if err != nil {
-				return
-			}
-			nextUUID, _, err = getNextUUID(currentUUID, uuids)
+		}
+		if switchPath != nil {
+			// we need to check the file size again, as the file may have changed during the path check
+			cmp, err := fileSizeUpdated(r.latestFilePath, r.endOffset)
 			if err != nil {
+				errCh <- terror.Annotatef(err, "latestFilePath=%s endOffset=%d", r.latestFilePath, r.endOffset)
 				return
 			}
-			if len(nextUUID) == 0 {
-				continue
+			switch {
+			case cmp < 0:
+				errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(r.latestFilePath)
+			case cmp > 0:
+				updatePathCh <- r.latestFilePath
+			default:
+				switchCh <- *switchPath
 			}
+			return
+		}
+	}
 
-			// try get the first binlog file in next sub directory
-			nextBinlogName, err = getFirstBinlogName(relayDir, nextUUID)
-			if err != nil {
-				// because creating sub directory and writing relay log file are not atomic
-				// just continue to observe subdir
-				if terror.ErrBinlogFilesNotFound.Equal(err) {
-					err = nil
-					continue
-				}
-				return
-			}
+	timer := time.NewTimer(watcherInterval)
+	defer timer.Stop()
+	select {
+	case <-ctx.Done():
+		errCh <- terror.Annotate(ctx.Err(), "context meet error")
+	case <-r.notifier.Notified():
+		// the notified event may not be for the current relay file;
+		// in that case we may read 0 bytes and check again
+		updatePathCh <- r.latestFilePath
+	case <-timer.C:
+		// a task started after the source shut down, or one with no new writes, will never be notified;
+		// if it's reading from dir 000001 and needs to switch to dir 000002,
+		// then after reading the files in dir 000001 the read size is > 0, so it goes to this select directly,
+		// and since there is no notification it blocks, which would leave dir 000002 un-synced.
+		// so we stop waiting after watcherInterval to give it a chance to check again
+		updatePathCh <- r.latestFilePath
+	}
+}
 
-			switchCh <- SwitchPath{nextUUID, nextBinlogName}
-			return
+func (r *relayLogFileChecker) getSwitchPath() (*SwitchPath, error) {
+	// reload uuid
+	uuids, err := utils.ParseUUIDIndex(path.Join(r.relayDir, utils.UUIDIndexFilename))
+	if err != nil {
+		return nil, err
+	}
+	nextUUID, _, err := getNextUUID(r.currentUUID, uuids)
+	if err != nil {
+		return nil, err
+	}
+	if len(nextUUID) == 0 {
+		return nil, nil
+	}
+
+	// try to get the first binlog file in the next subdirectory
+	nextBinlogName, err := getFirstBinlogName(r.relayDir, nextUUID)
+	if err != nil {
+		// because creating the subdirectory and writing the relay log file are not atomic
+		if terror.ErrBinlogFilesNotFound.Equal(err) {
+			return nil, nil
 		}
+		return nil, err
 	}
+
+	return &SwitchPath{nextUUID, nextBinlogName}, nil
 }
diff --git a/pkg/streamer/file_test.go b/pkg/streamer/file_test.go
index 42b6e09a83..1a28229452 100644
--- a/pkg/streamer/file_test.go
+++ b/pkg/streamer/file_test.go
@@ -20,13 +20,11 @@ import (
 	"os"
 	"path"
 	"path/filepath"
-	"sync"
 	"time"
 
 	"github.com/BurntSushi/toml"
 	.
"github.com/pingcap/check" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" @@ -296,30 +294,59 @@ func (t *testFileSuite) TestFileSizeUpdated(c *C) { c.Assert(cmp, Equals, 1) } +type dummyEventNotifier struct { + ch chan interface{} +} + +func (d *dummyEventNotifier) Notified() chan interface{} { + return d.ch +} + +func newDummyEventNotifier(n int) EventNotifier { + d := &dummyEventNotifier{ + ch: make(chan interface{}, n), + } + for i := 0; i < n; i++ { + d.ch <- struct{}{} + } + return d +} + func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { var ( relayFiles = []string{ "mysql-bin.000001", "mysql-bin.000002", - "mysql-bin.000003", } - binlogPos = uint32(4) - binlogGTID = "ba8f633f-1f15-11eb-b1c7-0242ac110002:1" - relayPaths = make([]string, len(relayFiles)) - data = []byte("meaningless file content") - size = int64(len(data)) - watcherInterval = 100 * time.Millisecond - updatePathCh = make(chan string, 1) - errCh = make(chan error, 1) + binlogPos = uint32(4) + binlogGTID = "ba8f633f-1f15-11eb-b1c7-0242ac110002:1" + relayPaths = make([]string, len(relayFiles)) + data = []byte("meaningless file content") + size = int64(len(data)) + updatePathCh = make(chan string, 1) + switchCh = make(chan SwitchPath, 1) + errCh = make(chan error, 1) ) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() + en := newDummyEventNotifier(0) // a. relay log dir not exist - relayLogUpdatedOrNewCreated(ctx, watcherInterval, "/not-exists-directory", "/not-exists-filepath", "not-exists-file", 0, updatePathCh, errCh) + checker := &relayLogFileChecker{ + notifier: en, + relayDir: "", + currentUUID: "", + latestRelayLogDir: "/not-exists-directory", + latestFilePath: "/not-exists-filepath", + latestFile: "not-exists-file", + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) c.Assert(len(errCh), Equals, 1) c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) err := <-errCh c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") @@ -330,134 +357,257 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { relayPaths[i] = filepath.Join(subDir, rf) } - // b. 
relay file not found - relayLogUpdatedOrNewCreated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], 0, updatePathCh, errCh) + rotateRelayFile := func(filename string) { + meta := Meta{BinLogName: filename, BinLogPos: binlogPos, BinlogGTID: binlogGTID} + metaFile, err2 := os.Create(path.Join(subDir, utils.MetaFilename)) + c.Assert(err2, IsNil) + err = toml.NewEncoder(metaFile).Encode(meta) + c.Assert(err, IsNil) + _ = metaFile.Close() + } + + // meta not found + checker = &relayLogFileChecker{ + notifier: en, + relayDir: "", + currentUUID: "", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: relayFiles[0], + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) c.Assert(len(errCh), Equals, 1) c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) + err = <-errCh + c.Assert(err, ErrorMatches, ".*no such file or directory*") + + // write meta + rotateRelayFile(relayFiles[0]) + + // relay file not found + checker = &relayLogFileChecker{ + notifier: en, + relayDir: "", + currentUUID: "", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: "not-exists-file", + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 1) + c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) err = <-errCh c.Assert(err, ErrorMatches, ".*no such file or directory*") // create the first relay file - err = os.WriteFile(relayPaths[0], nil, 0o600) + err = os.WriteFile(relayPaths[0], data, 0o600) c.Assert(err, IsNil) + // rotate relay file + rotateRelayFile(relayFiles[1]) - // write meta file - meta := Meta{BinLogName: relayFiles[0], BinLogPos: binlogPos, BinlogGTID: binlogGTID} - metaFile, err2 := os.Create(path.Join(subDir, utils.MetaFilename)) - c.Assert(err2, IsNil) - err = toml.NewEncoder(metaFile).Encode(meta) + // file decreased when meta changed + err = os.WriteFile(relayPaths[0], nil, 0o600) c.Assert(err, IsNil) - metaFile.Close() - - // c. latest file path not exist - relayLogUpdatedOrNewCreated(ctx, watcherInterval, subDir, "/no-exists-filepath", relayFiles[0], 0, updatePathCh, errCh) + checker = &relayLogFileChecker{ + notifier: en, + relayDir: "", + currentUUID: "", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: relayFiles[0], + beginOffset: size, + endOffset: size, + } + checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) c.Assert(len(errCh), Equals, 1) c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) err = <-errCh - c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") + c.Assert(err, ErrorMatches, ".*file size of relay log.*become smaller.*") + + // return changed file in meta + checker = &relayLogFileChecker{ + notifier: en, + relayDir: "", + currentUUID: "", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: relayFiles[0], + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 1) + c.Assert(len(switchCh), Equals, 0) + up := <-updatePathCh + c.Assert(up, Equals, relayPaths[1]) - // 1. 
file increased + // file increased when checking meta err = os.WriteFile(relayPaths[0], data, 0o600) - c.Assert(err, IsNil) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - relayLogUpdatedOrNewCreated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], 0, updatePathCh, errCh) - c.Assert(len(errCh), Equals, 0) - c.Assert(len(updatePathCh), Equals, 1) - up := <-updatePathCh - c.Assert(up, Equals, relayPaths[0]) - }() - wg.Wait() - - // 2. file decreased when adding watching - err = os.WriteFile(relayPaths[0], nil, 0o600) - c.Assert(err, IsNil) - wg.Add(1) - go func() { - defer wg.Done() - relayLogUpdatedOrNewCreated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], size, updatePathCh, errCh) - c.Assert(len(errCh), Equals, 1) - c.Assert(len(updatePathCh), Equals, 0) - err = <-errCh - c.Assert(err, ErrorMatches, ".*file size of relay log.*become smaller.*") - }() - wg.Wait() - - // 3. new binlog file created - err = os.WriteFile(relayPaths[1], nil, 0o600) - c.Assert(err, IsNil) - wg.Add(1) - go func() { - defer wg.Done() - // update meta file - meta = Meta{BinLogName: relayFiles[1], BinLogPos: binlogPos, BinlogGTID: binlogGTID} - var buf bytes.Buffer - enc := toml.NewEncoder(&buf) - c.Assert(enc.Encode(meta), IsNil) - c.Assert(utils.WriteFileAtomic(path.Join(subDir, utils.MetaFilename), buf.Bytes(), 0o644), IsNil) - - relayLogUpdatedOrNewCreated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], 0, updatePathCh, errCh) - c.Assert(len(errCh), Equals, 0) - c.Assert(len(updatePathCh), Equals, 1) - up := <-updatePathCh - c.Assert(up, Equals, relayPaths[1]) - }() - wg.Wait() - - // 4. file increased when checking meta - wg.Add(1) - go func() { - defer wg.Done() - // update meta file to new file - meta = Meta{BinLogName: relayFiles[2], BinLogPos: binlogPos, BinlogGTID: binlogGTID} - var buf bytes.Buffer - enc := toml.NewEncoder(&buf) - c.Assert(enc.Encode(meta), IsNil) - c.Assert(utils.WriteFileAtomic(path.Join(subDir, utils.MetaFilename), buf.Bytes(), 0o644), IsNil) - fi, err4 := os.Stat(relayPaths[1]) - c.Assert(err4, IsNil) - curSize := fi.Size() - // write old binlog file - err5 := os.WriteFile(relayPaths[1], data, 0o600) - c.Assert(err5, IsNil) - c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/streamer/CMPAlwaysReturn0", `return(true)`), IsNil) - //nolint:errcheck - defer failpoint.Disable("github.com/pingcap/dm/pkg/streamer/CMPAlwaysReturn0") - wg.Add(1) - go func() { - defer wg.Done() - relayLogUpdatedOrNewCreated(ctx, watcherInterval, subDir, relayPaths[1], relayFiles[1], curSize, updatePathCh, errCh) - }() - up := <-updatePathCh - c.Assert(up, Equals, relayPaths[1]) - c.Assert(len(errCh), Equals, 0) - c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/streamer/CMPAlwaysReturn0"), IsNil) - }() - wg.Wait() - - // 5. 
context timeout (no new write) - // revert meta file to old file - meta = Meta{BinLogName: relayFiles[1], BinLogPos: binlogPos, BinlogGTID: binlogGTID} - var buf bytes.Buffer - enc := toml.NewEncoder(&buf) - c.Assert(enc.Encode(meta), IsNil) - c.Assert(utils.WriteFileAtomic(path.Join(subDir, utils.MetaFilename), buf.Bytes(), 0o644), IsNil) - - fi, err2 := os.Stat(relayPaths[1]) - c.Assert(err2, IsNil) - curSize := fi.Size() - newCtx, cancel := context.WithTimeout(ctx, time.Second) + checker = &relayLogFileChecker{ + notifier: en, + relayDir: "", + currentUUID: "", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: relayFiles[0], + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) + up = <-updatePathCh + c.Assert(up, Equals, relayPaths[0]) + c.Assert(len(switchCh), Equals, 0) + c.Assert(len(errCh), Equals, 0) + + // context timeout (no new write) + newCtx, cancel := context.WithTimeout(ctx, time.Millisecond) defer cancel() - relayLogUpdatedOrNewCreated(newCtx, watcherInterval, subDir, relayPaths[1], relayFiles[1], curSize, updatePathCh, errCh) + relayDir := c.MkDir() + t.writeUUIDs(c, relayDir, []string{"xxx.000001", "xxx.000002"}) + checker = &relayLogFileChecker{ + notifier: en, + relayDir: relayDir, + currentUUID: "xxx.000002", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: relayFiles[0], + beginOffset: 0, + endOffset: size, + } + checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) c.Assert(len(errCh), Equals, 1) c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) err7 := <-errCh c.Assert(err7, ErrorMatches, "context meet error:.*") + + // binlog dir switched, but last file not exists + _ = os.MkdirAll(filepath.Join(relayDir, "xxx.000002"), 0o700) + _ = os.WriteFile(filepath.Join(relayDir, "xxx.000002", "mysql.000001"), nil, 0o600) + checker = &relayLogFileChecker{ + notifier: en, + relayDir: relayDir, + currentUUID: "xxx.000001", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[1], + latestFile: relayFiles[1], + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 1) + c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) + err = <-errCh + c.Assert(err, ErrorMatches, ".*no such file or directory*") + + // binlog dir switched, but last file smaller + err = os.WriteFile(relayPaths[1], nil, 0o600) + checker = &relayLogFileChecker{ + notifier: en, + relayDir: relayDir, + currentUUID: "xxx.000001", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[1], + latestFile: relayFiles[1], + beginOffset: size, + endOffset: size, + } + checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 1) + c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) + err = <-errCh + c.Assert(err, ErrorMatches, ".*file size of relay log.*become smaller.*") + + // binlog dir switched, but last file bigger + err = os.WriteFile(relayPaths[1], data, 0o600) + checker = &relayLogFileChecker{ + notifier: en, + relayDir: relayDir, + currentUUID: "xxx.000001", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[1], + latestFile: relayFiles[1], + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 1) + c.Assert(len(switchCh), Equals, 0) 
+ up = <-updatePathCh + c.Assert(up, Equals, relayPaths[1]) + + // binlog dir switched, but last file not changed + err = os.WriteFile(relayPaths[1], nil, 0o600) + checker = &relayLogFileChecker{ + notifier: en, + relayDir: relayDir, + currentUUID: "xxx.000001", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[1], + latestFile: relayFiles[1], + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 1) + sp := <-switchCh + c.Assert(sp.nextUUID, Equals, "xxx.000002") + c.Assert(sp.nextBinlogName, Equals, "mysql.000001") + + // got notified + en = newDummyEventNotifier(1) + checker = &relayLogFileChecker{ + notifier: en, + relayDir: relayDir, + currentUUID: "xxx.000002", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: relayFiles[0], + beginOffset: 0, + endOffset: size, + } + checker.relayLogUpdatedOrNewCreated(context.Background(), updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 1) + c.Assert(len(switchCh), Equals, 0) + up = <-updatePathCh + c.Assert(up, Equals, relayPaths[0]) + c.Assert(len(en.Notified()), Equals, 0) + + // got notified on timer + en = newDummyEventNotifier(0) + checker = &relayLogFileChecker{ + notifier: en, + relayDir: relayDir, + currentUUID: "xxx.000002", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: relayFiles[0], + beginOffset: 0, + endOffset: size, + } + checker.relayLogUpdatedOrNewCreated(context.Background(), updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 1) + c.Assert(len(switchCh), Equals, 0) + up = <-updatePathCh + c.Assert(up, Equals, relayPaths[0]) } -func (t *testFileSuite) TestNeedSwitchSubDir(c *C) { +func (t *testFileSuite) TestGetSwitchPath(c *C) { var ( relayDir = c.MkDir() UUIDs = []string{ @@ -466,47 +616,34 @@ func (t *testFileSuite) TestNeedSwitchSubDir(c *C) { "53ea0ed1-9bf8-11e6-8bea-64006a897c71.000003", } currentUUID = UUIDs[len(UUIDs)-1] // no next UUID - switchCh = make(chan SwitchPath, 1) - errCh = make(chan error, 1) ) - ctx := context.Background() - // invalid UUID in UUIDs, error UUIDs = append(UUIDs, "invalid.uuid") t.writeUUIDs(c, relayDir, UUIDs) - needSwitchSubDir(ctx, relayDir, currentUUID, switchCh, errCh) - c.Assert(len(errCh), Equals, 1) - c.Assert(len(switchCh), Equals, 0) - err := <-errCh + checker := &relayLogFileChecker{ + relayDir: relayDir, + currentUUID: currentUUID, + } + switchPath, err := checker.getSwitchPath() + c.Assert(switchPath, IsNil) c.Assert(err, ErrorMatches, ".*not valid.*") UUIDs = UUIDs[:len(UUIDs)-1] // remove the invalid UUID t.writeUUIDs(c, relayDir, UUIDs) - // no next UUID, no need switch - newCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - needSwitchSubDir(newCtx, relayDir, currentUUID, switchCh, errCh) - c.Assert(len(errCh), Equals, 0) - c.Assert(len(switchCh), Equals, 0) - // no next sub directory - currentUUID = UUIDs[0] - needSwitchSubDir(ctx, relayDir, currentUUID, switchCh, errCh) - c.Assert(len(errCh), Equals, 1) - c.Assert(len(switchCh), Equals, 0) - err = <-errCh + checker.currentUUID = UUIDs[0] + switchPath, err = checker.getSwitchPath() + c.Assert(switchPath, IsNil) c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*(no such file or directory|The system cannot find the file specified).*", UUIDs[1])) // create next sub directory, block 
 	err = os.Mkdir(filepath.Join(relayDir, UUIDs[1]), 0o700)
 	c.Assert(err, IsNil)
-	newCtx2, cancel2 := context.WithTimeout(ctx, 5*time.Second)
-	defer cancel2()
-	needSwitchSubDir(newCtx2, relayDir, currentUUID, switchCh, errCh)
-	c.Assert(len(errCh), Equals, 0)
-	c.Assert(len(switchCh), Equals, 0)
+	switchPath, err = checker.getSwitchPath()
+	c.Assert(switchPath, IsNil)
+	c.Assert(err, IsNil)
 
 	// create a relay log file in the next sub directory
 	nextBinlogPath := filepath.Join(relayDir, UUIDs[1], "mysql-bin.000001")
@@ -516,12 +653,10 @@ func (t *testFileSuite) TestNeedSwitchSubDir(c *C) {
 	c.Assert(err, IsNil)
 
 	// switch to the next
-	needSwitchSubDir(ctx, relayDir, currentUUID, switchCh, errCh)
-	c.Assert(len(errCh), Equals, 0)
-	c.Assert(len(switchCh), Equals, 1)
-	res := <-switchCh
-	c.Assert(res.nextUUID, Equals, UUIDs[1])
-	c.Assert(res.nextBinlogName, Equals, filepath.Base(nextBinlogPath))
+	switchPath, err = checker.getSwitchPath()
+	c.Assert(switchPath.nextUUID, Equals, UUIDs[1])
+	c.Assert(switchPath.nextBinlogName, Equals, filepath.Base(nextBinlogPath))
+	c.Assert(err, IsNil)
 }
 
 // nolint:unparam
diff --git a/pkg/streamer/reader.go b/pkg/streamer/reader.go
index a9a5ac4105..189bdc4a0a 100644
--- a/pkg/streamer/reader.go
+++ b/pkg/streamer/reader.go
@@ -79,10 +79,11 @@ type BinlogReader struct {
 	usingGTID          bool
 	prevGset, currGset mysql.GTIDSet
+
+	notifier EventNotifier
 }
 
 // NewBinlogReader creates a new BinlogReader.
-func NewBinlogReader(logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader {
+func NewBinlogReader(notifier EventNotifier, logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader {
 	ctx, cancel := context.WithCancel(context.Background()) // only can be canceled in `Close`
 	parser := replication.NewBinlogParser()
 	parser.SetVerifyChecksum(true)
@@ -100,6 +101,7 @@ func NewBinlogReader(logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader {
 		indexPath: path.Join(cfg.RelayDir, utils.UUIDIndexFilename),
 		cancel:    cancel,
 		tctx:      newtctx,
+		notifier:  notifier,
 	}
 }
 
@@ -451,7 +453,12 @@ func (r *BinlogReader) parseFile(
 	switch ev := e.Event.(type) {
 	case *replication.FormatDescriptionEvent:
-		// ignore FORMAT_DESCRIPTION event, because go-mysql will send this fake event
+		// go-mysql sends a duplicate FormatDescriptionEvent when offset > 4; ignore it
+		if offset > 4 {
+			return nil
+		}
+		// otherwise just update latestPos
+		latestPos = int64(e.Header.LogPos)
 	case *replication.RotateEvent:
 		// add master UUID suffix to pos.Name
 		parsed, _ := binlog.ParseFilename(string(ev.NextLogName))
@@ -576,7 +583,6 @@ func (r *BinlogReader) parseFile(
 	}
 
 	switchCh := make(chan SwitchPath, 1)
-	switchErrCh := make(chan error, 1)
 	updatePathCh := make(chan string, 1)
 	updateErrCh := make(chan error, 1)
 	newCtx, cancel := context.WithCancel(ctx)
@@ -586,28 +592,27 @@ func (r *BinlogReader) parseFile(
 		wg.Wait()
 	}()
 
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		needSwitchSubDir(newCtx, r.cfg.RelayDir, currentUUID, switchCh, switchErrCh)
-	}()
-
 	wg.Add(1)
 	go func(latestPos int64) {
 		defer wg.Done()
-		relayLogUpdatedOrNewCreated(newCtx, watcherInterval, relayLogDir, fullPath, relayLogFile, latestPos, updatePathCh, updateErrCh)
+		checker := relayLogFileChecker{
+			notifier:          r.notifier,
+			relayDir:          r.cfg.RelayDir,
+			currentUUID:       currentUUID,
+			latestRelayLogDir: relayLogDir,
+			latestFilePath:    fullPath,
+			latestFile:        relayLogFile,
+			beginOffset:       offset,
+			endOffset:         latestPos,
+		}
+		// TODO: no need to be a goroutine now; may be refactored when refactoring parseFile itself.
+ checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, updateErrCh) }(latestPos) select { case <-ctx.Done(): return false, false, 0, "", "", false, nil case switchResp := <-switchCh: - // wait to ensure old file not updated - pathUpdated := utils.WaitSomething(3, watcherInterval, func() bool { return len(updatePathCh) > 0 }) - if pathUpdated { - // re-parse it - return false, true, latestPos, "", "", replaceWithHeartbeat, nil - } // update new uuid if err = r.updateUUIDs(); err != nil { return false, false, 0, "", "", false, nil @@ -620,8 +625,6 @@ func (r *BinlogReader) parseFile( } // need parse next relay log file or re-collect files return false, false, latestPos, "", "", false, nil - case err := <-switchErrCh: - return false, false, 0, "", "", false, err case err := <-updateErrCh: return false, false, 0, "", "", false, err } diff --git a/pkg/streamer/reader_test.go b/pkg/streamer/reader_test.go index 84eca75b78..ddbcc1a1f7 100644 --- a/pkg/streamer/reader_test.go +++ b/pkg/streamer/reader_test.go @@ -85,7 +85,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { currentUUID := "invalid-current-uuid" relayDir := filepath.Join(baseDir, currentUUID) cfg := &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r := NewBinlogReader(log.L(), cfg) + r := NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err := r.parseFile( ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, ErrorMatches, ".*invalid-current-uuid.*") @@ -101,7 +101,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { relayDir = filepath.Join(baseDir, currentUUID) fullPath := filepath.Join(relayDir, filename) cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) // relay log file not exists, failed needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( @@ -233,7 +233,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { c.Assert(nextBinlogName, Equals, "") c.Assert(replaceWithHeartbeat, Equals, false) - // should only got a RotateEvent and a FormatDescriptionEven + // should only get a RotateEvent i = 0 for { ev, err2 := s.GetEvent(ctx) @@ -242,12 +242,10 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { case replication.ROTATE_EVENT: c.Assert(ev.RawData, DeepEquals, rotateEv.RawData) i++ - case replication.FORMAT_DESCRIPTION_EVENT: - i++ default: c.Fatalf("got unexpected event %+v", ev.Header) } - if i >= 2 { + if i >= 1 { break } } @@ -256,102 +254,6 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { cancel() } -func (t *testReaderSuite) TestParseFileRelayLogUpdatedOrNewCreated(c *C) { - var ( - filename = "test-mysql-bin.000001" - nextFilename = "test-mysql-bin.000002" - notUsedGTIDSetStr = t.lastGTID.String() - baseDir = c.MkDir() - offset int64 - firstParse = true - possibleLast = true - baseEvents, lastPos, lastGTID = t.genBinlogEvents(c, t.lastPos, t.lastGTID) - currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" - relayDir = filepath.Join(baseDir, currentUUID) - fullPath = filepath.Join(relayDir, filename) - nextPath = filepath.Join(relayDir, nextFilename) - s = newLocalStreamer() - cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) - ) - - // create the current relay log file and write some events - err := 
os.MkdirAll(relayDir, 0o700) - c.Assert(err, IsNil) - f, err := os.OpenFile(fullPath, os.O_CREATE|os.O_WRONLY, 0o600) - c.Assert(err, IsNil) - defer f.Close() - _, err = f.Write(replication.BinLogFileHeader) - c.Assert(err, IsNil) - for _, ev := range baseEvents { - _, err = f.Write(ev.RawData) - c.Assert(err, IsNil) - } - t.createMetaFile(c, relayDir, filename, uint32(offset), notUsedGTIDSetStr) - - // no valid update for relay sub dir, timeout, no error - ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second) - defer cancel1() - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err := r.parseFile( - ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(latestPos, Equals, int64(0)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) - t.purgeStreamer(c, s) - - // current relay log file updated, need to re-parse it - var wg sync.WaitGroup - wg.Add(1) - extraEvents, _, _ := t.genBinlogEvents(c, lastPos, lastGTID) - go func() { - defer wg.Done() - time.Sleep(500 * time.Millisecond) // wait parseFile started - _, err2 := f.Write(extraEvents[0].RawData) - c.Assert(err2, IsNil) - }() - ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout) - defer cancel2() - t.createMetaFile(c, relayDir, filename, uint32(offset), notUsedGTIDSetStr) - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( - ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsTrue) - c.Assert(latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) - wg.Wait() - t.purgeStreamer(c, s) - - // new relay log file created, need to re-collect files - wg.Add(1) - go func() { - defer wg.Done() - time.Sleep(500 * time.Millisecond) // wait parseFile started - err2 := os.WriteFile(nextPath, replication.BinLogFileHeader, 0o600) - c.Assert(err2, IsNil) - }() - ctx3, cancel3 := context.WithTimeout(context.Background(), parseFileTimeout) - defer cancel3() - t.createMetaFile(c, relayDir, nextFilename, uint32(offset), notUsedGTIDSetStr) - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( - ctx3, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(latestPos, Equals, int64(extraEvents[0].Header.LogPos)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) - wg.Wait() - t.purgeStreamer(c, s) -} - func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { var ( filename = "test-mysql-bin.000001" @@ -361,7 +263,6 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { offset int64 firstParse = true possibleLast = true - baseEvents, _, _ = t.genBinlogEvents(c, t.lastPos, t.lastGTID) currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" switchedUUID = "b60868af-5a6f-11e9-9ea3-0242ac160007.000002" relayDir = filepath.Join(baseDir, currentUUID) @@ -370,21 +271,18 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) 
{ nextFullPath = filepath.Join(nextRelayDir, nextFilename) s = newLocalStreamer() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) ) - // create the current relay log file and write some events + // create the current relay log file and meta err := os.MkdirAll(relayDir, 0o700) c.Assert(err, IsNil) f, err := os.OpenFile(fullPath, os.O_CREATE|os.O_WRONLY, 0o600) c.Assert(err, IsNil) defer f.Close() _, err = f.Write(replication.BinLogFileHeader) + offset = 4 c.Assert(err, IsNil) - for _, ev := range baseEvents { - _, err = f.Write(ev.RawData) - c.Assert(err, IsNil) - } t.createMetaFile(c, relayDir, filename, uint32(offset), notUsedGTIDSetStr) // invalid UUID in UUID list, error @@ -444,7 +342,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { fullPath = filepath.Join(relayDir, filename) s = newLocalStreamer() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) ) // create the current relay log file and write some events @@ -462,7 +360,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) + c.Assert(needReParse, IsTrue) c.Assert(latestPos, Equals, int64(0)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") @@ -480,6 +378,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { // meet `err EOF` error (when parsing binlog event) ignored ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel2() + r.notifier = newDummyEventNotifier(1) needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) @@ -493,9 +392,10 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { func (t *testReaderSuite) TestUpdateUUIDs(c *C) { var ( + en = newDummyEventNotifier(1) baseDir = c.MkDir() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(en, log.L(), cfg) ) c.Assert(r.uuids, HasLen, 0) @@ -520,6 +420,7 @@ func (t *testReaderSuite) TestUpdateUUIDs(c *C) { func (t *testReaderSuite) TestStartSyncByPos(c *C) { var ( + en = newDummyEventNotifier(1) filenamePrefix = "test-mysql-bin.00000" notUsedGTIDSetStr = t.lastGTID.String() baseDir = c.MkDir() @@ -531,7 +432,7 @@ func (t *testReaderSuite) TestStartSyncByPos(c *C) { "b60868af-5a6f-11e9-9ea3-0242ac160008.000003", } cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(en, log.L(), cfg) startPos = gmysql.Position{Name: "test-mysql-bin|000001.000001"} // from the first relay log file in the first sub directory ) @@ -575,18 +476,8 @@ func (t *testReaderSuite) TestStartSyncByPos(c *C) { // get events from the streamer ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - obtainBaseEvents := make([]*replication.BinlogEvent, 0, (1+2+3)*len(baseEvents)) - for { - ev, err2 := s.GetEvent(ctx) - c.Assert(err2, IsNil) - if ev.Header.Timestamp == 0 || ev.Header.LogPos == 0 { - continue // ignore fake event - } 
- obtainBaseEvents = append(obtainBaseEvents, ev) - if len(obtainBaseEvents) == cap(obtainBaseEvents) { - break - } - } + + obtainBaseEvents := readNEvents(ctx, c, s, (1+2+3)*len(baseEvents)) t.verifyNoEventsInStreamer(c, s) // verify obtain base events for i := 0; i < len(obtainBaseEvents); i += len(baseEvents) { @@ -651,12 +542,29 @@ func (t *testReaderSuite) TestStartSyncByPos(c *C) { r.Close() } +func readNEvents(ctx context.Context, c *C, s Streamer, l int) []*replication.BinlogEvent { + var result []*replication.BinlogEvent + for { + ev, err2 := s.GetEvent(ctx) + c.Assert(err2, IsNil) + if ev.Header.Timestamp == 0 && ev.Header.LogPos == 0 { + continue // ignore fake event + } + result = append(result, ev) + // start from the first format description event + if len(result) == l { + break + } + } + return result +} + func (t *testReaderSuite) TestStartSyncByGTID(c *C) { var ( baseDir = c.MkDir() events []*replication.BinlogEvent cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(0), log.L(), cfg) lastPos uint32 lastGTID gtid.Set previousGset, _ = gtid.ParserGTID(gmysql.MySQLFlavor, "") @@ -824,19 +732,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var obtainBaseEvents []*replication.BinlogEvent - for { - ev, err2 := s.GetEvent(ctx) - c.Assert(err2, IsNil) - if ev.Header.Timestamp == 0 && ev.Header.LogPos == 0 { - continue // ignore fake event - } - obtainBaseEvents = append(obtainBaseEvents, ev) - // start from the first format description event - if len(obtainBaseEvents) == len(allEvents) { - break - } - } + obtainBaseEvents := readNEvents(ctx, c, s, len(allEvents)) preGset, err := gmysql.ParseGTIDSet(gmysql.MySQLFlavor, "") c.Assert(err, IsNil) @@ -859,7 +755,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { } r.Close() - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) excludeStrs := []string{} // exclude first uuid @@ -877,20 +773,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { // StartSyncByGtid exclude first uuid s, err = r.StartSyncByGTID(excludeGset) c.Assert(err, IsNil) - obtainBaseEvents = []*replication.BinlogEvent{} - - for { - ev, err2 := s.GetEvent(ctx) - c.Assert(err2, IsNil) - if ev.Header.Timestamp == 0 && ev.Header.LogPos == 0 { - continue // ignore fake event - } - obtainBaseEvents = append(obtainBaseEvents, ev) - // start from the first format description event - if len(obtainBaseEvents) == len(allEvents) { - break - } - } + obtainBaseEvents = readNEvents(ctx, c, s, len(allEvents)) gset := excludeGset.Clone() // every gtid event not from first uuid should become heartbeat event @@ -916,12 +799,12 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { c.Assert(os.Remove(path.Join(baseDir, excludeUUID, "mysql.000001")), IsNil) r.Close() - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) _, err = r.StartSyncByGTID(preGset) c.Assert(err, IsNil) r.Close() - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) _, err = r.StartSyncByGTID(excludeGset) // error because file has been purge c.Assert(terror.ErrNoRelayPosMatchGTID.Equal(err), IsTrue) @@ -930,12 +813,12 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { c.Assert(os.RemoveAll(path.Join(baseDir, excludeUUID)), IsNil) r.Close() - r = NewBinlogReader(log.L(), cfg) + r = 
 	_, err = r.StartSyncByGTID(preGset)
 	c.Assert(err, IsNil)

 	r.Close()
-	r = NewBinlogReader(log.L(), cfg)
+	r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg)
 	_, err = r.StartSyncByGTID(excludeGset)
 	// error because the subdir has been purged
 	c.Assert(err, ErrorMatches, ".*no such file or directory.*")
@@ -952,7 +835,7 @@ func (t *testReaderSuite) TestStartSyncError(c *C) {
 		startPos = gmysql.Position{Name: "test-mysql-bin|000001.000001"} // from the first relay log file in the first sub directory
 	)

-	r := NewBinlogReader(log.L(), cfg)
+	r := NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg)
 	err := r.checkRelayPos(startPos)
 	c.Assert(err, ErrorMatches, ".*empty UUIDs not valid.*")
@@ -971,7 +854,7 @@
 	c.Assert(s, IsNil)

 	// write UUIDs into index file
-	r = NewBinlogReader(log.L(), cfg) // create a new reader
+	r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) // create a new reader
 	uuidBytes := t.uuidListToBytes(c, UUIDs)
 	err = os.WriteFile(r.indexPath, uuidBytes, 0o600)
 	c.Assert(err, IsNil)
@@ -1014,9 +897,10 @@ func (t *testReaderSuite) TestAdvanceCurrentGTIDSet(c *C) {
 	var (
+		en             = newDummyEventNotifier(1)
 		baseDir        = c.MkDir()
 		cfg            = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor}
-		r              = NewBinlogReader(log.L(), cfg)
+		r              = NewBinlogReader(en, log.L(), cfg)
 		mysqlGset, _   = gmysql.ParseMysqlGTIDSet("b60868af-5a6f-11e9-9ea3-0242ac160006:1-6")
 		mariadbGset, _ = gmysql.ParseMariadbGTIDSet("0-1-5")
 	)
@@ -1048,9 +932,10 @@ func (t *testReaderSuite) TestReParseUsingGTID(c *C) {
 	var (
+		en      = newDummyEventNotifier(1)
 		baseDir = c.MkDir()
 		cfg     = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor}
-		r       = NewBinlogReader(log.L(), cfg)
+		r       = NewBinlogReader(en, log.L(), cfg)
 		uuid    = "ba8f633f-1f15-11eb-b1c7-0242ac110002.000001"
 		gtidStr = "ba8f633f-1f15-11eb-b1c7-0242ac110002:1"
 		file    = "mysql.000001"
@@ -1133,6 +1018,10 @@ func (t *testReaderSuite) TestReParseUsingGTID(c *C) {
 		time.Sleep(time.Second)
 		_, err = f.Write(events[i].RawData)
 		c.Assert(err, IsNil)
+		select {
+		case en.Notified() <- struct{}{}:
+		default:
+		}
 	}
 	wg.Wait()
 }
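Reviewer note: the tests above lean on newDummyEventNotifier, whose definition is outside this section of the diff. From the way it is used (a Notified() channel the test sends struct{}{} into without blocking, pre-loaded with n wake-ups so a reader can proceed immediately), a minimal channel-backed sketch could look like the following; the type name and constructor come from the tests, the body is an assumption:

package streamer

// dummyEventNotifier is an illustrative channel-backed notifier for tests;
// only the usage visible in this diff is guaranteed.
type dummyEventNotifier struct {
	ch chan interface{}
}

// Notified returns the channel a binlog reader waits on before re-scanning
// the relay log for newly written events.
func (d *dummyEventNotifier) Notified() chan interface{} {
	return d.ch
}

// newDummyEventNotifier pre-fills n wake-ups so a reader can make n passes
// over the relay log without a running relay writer.
func newDummyEventNotifier(n int) *dummyEventNotifier {
	d := &dummyEventNotifier{ch: make(chan interface{}, n)}
	for i := 0; i < n; i++ {
		d.ch <- struct{}{}
	}
	return d
}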
diff --git a/relay/relay.go b/relay/relay.go
index 34e9f859f1..c91716082f 100644
--- a/relay/relay.go
+++ b/relay/relay.go
@@ -68,6 +68,12 @@ var NewRelay = NewRealRelay

 var _ Process = &Relay{}

+// Listener defines a binlog event listener of relay log.
+type Listener interface {
+	// OnEvent is called when relay has processed an event successfully.
+	OnEvent(e *replication.BinlogEvent)
+}
+
 // Process defines mysql-like relay log process unit.
 type Process interface {
 	// Init initializes the relay log unit
@@ -98,6 +104,10 @@ type Process interface {
 	ResetMeta()
 	// PurgeRelayDir will clear all contents under w.cfg.RelayDir
 	PurgeRelayDir() error
+	// RegisterListener registers a relay listener
+	RegisterListener(el Listener)
+	// UnRegisterListener unregisters a relay listener
+	UnRegisterListener(el Listener)
 }

 // Relay relays mysql binlog to local file.
@@ -116,14 +126,16 @@ type Relay struct {
 		sync.RWMutex
 		info *pkgstreamer.RelayLogInfo
 	}
+	listeners map[Listener]struct{} // make it a set to make it easier to remove a listener
 }

 // NewRealRelay creates an instance of Relay.
 func NewRealRelay(cfg *Config) Process {
 	return &Relay{
-		cfg:    cfg,
-		meta:   NewLocalMeta(cfg.Flavor, cfg.RelayDir),
-		logger: log.With(zap.String("component", "relay log")),
+		cfg:       cfg,
+		meta:      NewLocalMeta(cfg.Flavor, cfg.RelayDir),
+		logger:    log.With(zap.String("component", "relay log")),
+		listeners: make(map[Listener]struct{}),
 	}
 }
@@ -560,6 +572,9 @@ func (r *Relay) handleEvents(
 			r.tryUpdateActiveRelayLog(e, lastPos.Name) // even if the event is ignored, we still need to try this update.
 			continue
 		}
+
+		r.notify(e)
+
 		relayLogWriteDurationHistogram.Observe(time.Since(writeTimer).Seconds())
 		r.tryUpdateActiveRelayLog(e, lastPos.Name) // wrote an event, try to update the current active relay log.
@@ -1078,3 +1093,25 @@ func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error)
 	defer dbConn.Close()
 	return utils.AddGSetWithPurged(ctx, resultGs, dbConn)
 }
+
+func (r *Relay) notify(e *replication.BinlogEvent) {
+	r.RLock()
+	defer r.RUnlock()
+	for el := range r.listeners {
+		el.OnEvent(e)
+	}
+}
+
+// RegisterListener implements Process.RegisterListener.
+func (r *Relay) RegisterListener(el Listener) {
+	r.Lock()
+	defer r.Unlock()
+	r.listeners[el] = struct{}{}
+}
+
+// UnRegisterListener implements Process.UnRegisterListener.
+func (r *Relay) UnRegisterListener(el Listener) {
+	r.Lock()
+	defer r.Unlock()
+	delete(r.listeners, el)
+}
diff --git a/relay/relay_test.go b/relay/relay_test.go
index 28ad060d97..6cf3f3172e 100644
--- a/relay/relay_test.go
+++ b/relay/relay_test.go
@@ -319,6 +319,30 @@ func (t *testRelaySuite) TestTryRecoverMeta(c *C) {
 	c.Assert(latestGTIDs.Equal(recoverGTIDSet), IsTrue)
 }

+type dummyListener bool
+
+func (d *dummyListener) OnEvent(e *replication.BinlogEvent) {
+	*d = true
+}
+
+func (t *testRelaySuite) TestListener(c *C) {
+	relay := NewRelay(&Config{}).(*Relay)
+	c.Assert(len(relay.listeners), Equals, 0)
+
+	lis := dummyListener(false)
+	relay.RegisterListener(&lis)
+	c.Assert(len(relay.listeners), Equals, 1)
+
+	relay.notify(nil)
+	c.Assert(bool(lis), Equals, true)
+
+	relay.UnRegisterListener(&lis)
+	c.Assert(len(relay.listeners), Equals, 0)
+	lis = false
+	relay.notify(nil)
+	c.Assert(bool(lis), Equals, false)
+}
+
 // genBinlogEventsWithGTIDs generates some binlog events used by testFileUtilSuite and testFileWriterSuite.
 // now, the generated events include 3 DDLs and 10 DMLs.
 func genBinlogEventsWithGTIDs(c *C, flavor string, previousGTIDSet, latestGTID1, latestGTID2 gtid.Set) (*event.Generator, []*replication.BinlogEvent, []byte) {
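The Listener contract above is deliberately tiny: relay invokes OnEvent synchronously, under its listener lock, once per successfully written event, so implementations should be cheap and must not block. A minimal sketch of a consumer, using only the interface shown in this diff (the counting type itself is illustrative, not part of the change; import paths follow the ones this repository already uses):

package relay

import (
	"github.com/go-mysql-org/go-mysql/replication"

	"go.uber.org/atomic"
)

// countingListener tallies the events relay has written; an illustrative
// Listener implementation, not code from this change.
type countingListener struct {
	count atomic.Int64
}

// OnEvent runs on relay's event-handling goroutine, so it only does an
// atomic increment and returns immediately.
func (l *countingListener) OnEvent(e *replication.BinlogEvent) {
	l.count.Inc()
}

A caller would do relay.RegisterListener(&countingListener{}) before the process starts and UnRegisterListener on teardown, mirroring TestListener above. Keeping listeners in a map[Listener]struct{} set is what makes UnRegisterListener an O(1) delete.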
diff --git a/syncer/ddl_test.go b/syncer/ddl_test.go
index 2f861397eb..c15c2ee223 100644
--- a/syncer/ddl_test.go
+++ b/syncer/ddl_test.go
@@ -114,7 +114,7 @@ func (s *testDDLSuite) TestCommentQuote(c *C) {
 	qec.splitDDLs, err = parserpkg.SplitDDL(stmt, qec.ddlSchema)
 	c.Assert(err, IsNil)

-	syncer := NewSyncer(&config.SubTaskConfig{}, nil)
+	syncer := NewSyncer(&config.SubTaskConfig{}, nil, nil)
 	syncer.tctx = tctx
 	c.Assert(syncer.genRouter(), IsNil)
@@ -217,7 +217,7 @@ func (s *testDDLSuite) TestResolveDDLSQL(c *C) {
 		},
 	}
 	var err error
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	syncer.tctx = tctx
 	syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList)
 	c.Assert(err, IsNil)
@@ -376,7 +376,7 @@ func (s *testDDLSuite) TestResolveGeneratedColumnSQL(c *C) {
 	}

 	tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestResolveGeneratedColumnSQL")))
-	syncer := NewSyncer(&config.SubTaskConfig{}, nil)
+	syncer := NewSyncer(&config.SubTaskConfig{}, nil, nil)
 	syncer.tctx = tctx
 	c.Assert(syncer.genRouter(), IsNil)
 	p := parser.New()
@@ -460,7 +460,7 @@ func (s *testDDLSuite) TestResolveOnlineDDL(c *C) {
 	for _, ca := range cases {
 		plugin, err := onlineddl.NewRealOnlinePlugin(tctx, cfg)
 		c.Assert(err, IsNil)
-		syncer := NewSyncer(cfg, nil)
+		syncer := NewSyncer(cfg, nil, nil)
 		syncer.tctx = tctx
 		syncer.onlineDDL = plugin
 		c.Assert(plugin.Clear(tctx), IsNil)
@@ -541,7 +541,7 @@ func (s *testDDLSuite) TestMistakeOnlineDDLRegex(c *C) {
 	for _, ca := range cases {
 		plugin, err := onlineddl.NewRealOnlinePlugin(tctx, cfg)
 		c.Assert(err, IsNil)
-		syncer := NewSyncer(cfg, nil)
+		syncer := NewSyncer(cfg, nil, nil)
 		c.Assert(syncer.genRouter(), IsNil)
 		syncer.onlineDDL = plugin
 		c.Assert(plugin.Clear(tctx), IsNil)
@@ -593,7 +593,7 @@ func (s *testDDLSuite) TestDropSchemaInSharding(c *C) {
 	dbCfg := config.GetDBConfigForTest()
 	cfg := s.newSubTaskCfg(dbCfg)
 	cfg.ShardMode = config.ShardPessimistic
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	// nolint:dogsled
 	_, _, _, _, err := syncer.sgk.AddGroup(targetTable, []string{source1}, nil, true)
 	c.Assert(err, IsNil)
@@ -620,7 +620,7 @@ func (s *testDDLSuite) TestClearOnlineDDL(c *C) {
 	dbCfg := config.GetDBConfigForTest()
 	cfg := s.newSubTaskCfg(dbCfg)
 	cfg.ShardMode = config.ShardPessimistic
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	mock := mockOnlinePlugin{
 		map[string]struct{}{key1: {}, key2: {}},
 	}
diff --git a/syncer/error_test.go b/syncer/error_test.go
index beee35e233..7e80055a14 100644
--- a/syncer/error_test.go
+++ b/syncer/error_test.go
@@ -54,7 +54,7 @@ func (s *testSyncerSuite) TestIgnoreDDLError(c *C) {

 func (s *testSyncerSuite) TestHandleSpecialDDLError(c *C) {
 	var (
-		syncer = NewSyncer(s.cfg, nil)
+		syncer = NewSyncer(s.cfg, nil, nil)
 		tctx   = tcontext.Background()
 		conn2  = &dbconn.DBConn{Cfg: s.cfg, ResetBaseConnFn: func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error) {
 			return nil, nil
diff --git a/syncer/filter_test.go b/syncer/filter_test.go
index a541b84ed9..d8ffcd157f 100644
--- a/syncer/filter_test.go
+++ b/syncer/filter_test.go
@@ -56,7 +56,7 @@ func (s *testFilterSuite) TestSkipQueryEvent(c *C) {
 			IgnoreTables: []*filter.Table{{Schema: "s1", Name: "test"}},
 		},
 	}
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	c.Assert(syncer.genRouter(), IsNil)
 	var err error
 	syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList)
@@ -180,7 +180,7 @@ func (s *testFilterSuite) TestSkipByFilter(c *C) {
 			IgnoreDBs: []string{"s1"},
 		},
 	}
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	var err error
 	syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList)
 	c.Assert(err, IsNil)
@@ -250,7 +250,7 @@ func (s *testFilterSuite) TestSkipByTable(c *C) {
 			IgnoreDBs: []string{"s1"},
 		},
 	}
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	var err error
 	syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList)
 	c.Assert(err, IsNil)
diff --git a/syncer/handle_error_test.go b/syncer/handle_error_test.go
index 1720661c8a..a4405037ce 100644
--- a/syncer/handle_error_test.go
+++ b/syncer/handle_error_test.go
@@ -26,7 +26,7 @@ import (

 func (s *testSyncerSuite) TestHandleError(c *C) {
 	var (
-		syncer = NewSyncer(s.cfg, nil)
+		syncer = NewSyncer(s.cfg, nil, nil)
 		task   = "test"
 		ctx    = context.Background()
 		cases  = []struct {
diff --git a/syncer/streamer_controller.go b/syncer/streamer_controller.go
index b0f37a74e2..c58dc89c3b 100644
--- a/syncer/streamer_controller.go
+++ b/syncer/streamer_controller.go
@@ -121,10 +121,11 @@ type StreamerController struct {

 	// whether the server id is updated
 	serverIDUpdated bool
+	notifier        streamer.EventNotifier
 }

 // NewStreamerController creates a new streamer controller.
-func NewStreamerController(
+func NewStreamerController(notifier streamer.EventNotifier,
 	syncCfg replication.BinlogSyncerConfig,
 	enableGTID bool,
 	fromDB *dbconn.UpStreamConn,
@@ -154,6 +155,7 @@ func NewStreamerController(
 		timezone: timezone,
 		fromDB:   fromDB,
 		closed:   true,
+		notifier: notifier,
 	}

 	return streamerController
@@ -232,7 +234,7 @@ func (c *StreamerController) resetReplicationSyncer(tctx *tcontext.Context, loca
 	if c.currentBinlogType == RemoteBinlog {
 		c.streamerProducer = &remoteBinlogReader{replication.NewBinlogSyncer(c.syncCfg), tctx, c.syncCfg.Flavor, c.enableGTID}
 	} else {
-		c.streamerProducer = &localBinlogReader{streamer.NewBinlogReader(tctx.L(), &streamer.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor}), c.enableGTID}
+		c.streamerProducer = &localBinlogReader{streamer.NewBinlogReader(c.notifier, tctx.L(), &streamer.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor}), c.enableGTID}
 	}

 	c.streamer, err = c.streamerProducer.generateStreamer(location)
diff --git a/syncer/streamer_controller_test.go b/syncer/streamer_controller_test.go
index 0b49ff937b..2f1de17efe 100644
--- a/syncer/streamer_controller_test.go
+++ b/syncer/streamer_controller_test.go
@@ -33,7 +33,7 @@ func (s *testSyncerSuite) TestIsConnectionRefusedError(c *C) {
 }

 func (s *testSyncerSuite) TestCanErrorRetry(c *C) {
-	controller := NewStreamerController(replication.BinlogSyncerConfig{}, true, nil,
+	controller := NewStreamerController(nil, replication.BinlogSyncerConfig{}, true, nil,
 		LocalBinlog, "", nil)

 	mockErr := errors.New("test")
@@ -50,7 +50,7 @@ func (s *testSyncerSuite) TestCanErrorRetry(c *C) {
 	}()

 	// test with remote binlog
-	controller = NewStreamerController(replication.BinlogSyncerConfig{}, true, nil,
+	controller = NewStreamerController(nil, replication.BinlogSyncerConfig{}, true, nil,
 		RemoteBinlog, "", nil)

 	c.Assert(controller.CanRetry(mockErr), IsTrue)
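Why an EventNotifier is threaded down to the local binlog reader: when a subtask reads from the relay directory, the reader previously had to poll for file growth, whereas with a notifier it can block until relay reports a newly written event (which appears to be why the parseFile test earlier in this diff now expects needReParse to be true for a possibly-last file). A sketch of the waiting shape, assuming only the Notified() channel seen in the tests; the helper name and the reader's actual internals are not from this diff:

package syncer

import (
	"context"

	"github.com/pingcap/dm/pkg/streamer"
)

// waitForRelayEvent blocks until relay signals a newly written event or the
// context ends. Illustrative only; the real reader combines such a wait with
// its re-parse loop over the current relay log file.
func waitForRelayEvent(ctx context.Context, n streamer.EventNotifier) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-n.Notified():
		// at least one new event was written; re-parse the file
		return nil
	}
}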
diff --git a/syncer/syncer.go b/syncer/syncer.go
index fec9640747..8150772e7d 100644
--- a/syncer/syncer.go
+++ b/syncer/syncer.go
@@ -219,10 +219,12 @@ type Syncer struct {
 	secondsBehindMaster       atomic.Int64    // the current task's delay in seconds behind the upstream
 	workerJobTSArray          []*atomic.Int64 // worker's sync job TS array; note that idx=0 is the skip idx, idx=1 is the ddl idx, and sql worker jobs use idx=(queue id + 2)
 	lastCheckpointFlushedTime time.Time
+
+	notifier streamer.EventNotifier
 }

 // NewSyncer creates a new Syncer.
-func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer {
+func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, notifier streamer.EventNotifier) *Syncer {
 	logger := log.With(zap.String("task", cfg.Name), zap.String("unit", "binlog replication"))
 	syncer := &Syncer{
 		pessimist: shardddl.NewPessimist(&logger, etcdClient, cfg.Name, cfg.SourceID),
@@ -260,6 +262,7 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer {
 		syncer.workerJobTSArray[i] = atomic.NewInt64(0)
 	}
 	syncer.lastCheckpointFlushedTime = time.Time{}
+	syncer.notifier = notifier

 	return syncer
 }
@@ -315,7 +318,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) {
 		return terror.ErrSchemaTrackerInit.Delegate(err)
 	}

-	s.streamerController = NewStreamerController(s.syncCfg, s.cfg.EnableGTID, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone)
+	s.streamerController = NewStreamerController(s.notifier, s.syncCfg, s.cfg.EnableGTID, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone)

 	s.baList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BAList)
 	if err != nil {
@@ -3289,7 +3292,7 @@ func (s *Syncer) adjustGlobalPointGTID(tctx *tcontext.Context) (bool, error) {
 		return false, nil
 	}
 	// set enableGTID to false for new streamerController
-	streamerController := NewStreamerController(s.syncCfg, false, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone)
+	streamerController := NewStreamerController(s.notifier, s.syncCfg, false, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone)

 	endPos := binlog.AdjustPosition(location.Position)
 	startPos := mysql.Position{
diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go
index a6a68fc9ee..7600472a4f 100644
--- a/syncer/syncer_test.go
+++ b/syncer/syncer_test.go
@@ -263,7 +263,7 @@ func (s *testSyncerSuite) TestSelectDB(c *C) {
 	p := parser.New()
 	cfg, err := s.cfg.Clone()
 	c.Assert(err, IsNil)
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList)
 	c.Assert(err, IsNil)
 	err = syncer.genRouter()
@@ -367,7 +367,7 @@ func (s *testSyncerSuite) TestSelectTable(c *C) {
 	p := parser.New()
 	cfg, err := s.cfg.Clone()
 	c.Assert(err, IsNil)
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList)
 	c.Assert(err, IsNil)
 	c.Assert(syncer.genRouter(), IsNil)
@@ -403,7 +403,7 @@ func (s *testSyncerSuite) TestIgnoreDB(c *C) {
 	p := parser.New()
 	cfg, err := s.cfg.Clone()
 	c.Assert(err, IsNil)
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList)
 	c.Assert(err, IsNil)
 	c.Assert(syncer.genRouter(), IsNil)
@@ -494,7 +494,7 @@ func (s *testSyncerSuite) TestIgnoreTable(c *C) {
 	p := parser.New()
 	cfg, err := s.cfg.Clone()
 	c.Assert(err, IsNil)
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList)
 	c.Assert(err, IsNil)
 	c.Assert(syncer.genRouter(), IsNil)
@@ -588,7 +588,7 @@ func (s *testSyncerSuite) TestSkipDML(c *C) {

 	cfg, err := s.cfg.Clone()
 	c.Assert(err, IsNil)
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	c.Assert(syncer.genRouter(), IsNil)

 	syncer.binlogFilter, err = bf.NewBinlogEvent(false, s.cfg.FilterRules)
@@ -707,7 +707,7 @@ func (s *testSyncerSuite) TestColumnMapping(c *C) {

 func (s *testSyncerSuite) TestcheckpointID(c *C) {
 	cfg, err := s.cfg.Clone()
 	c.Assert(err, IsNil)
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	checkpointID := syncer.checkpointID()
 	c.Assert(checkpointID, Equals, "101")
 }
@@ -758,7 +758,7 @@ func (s *testSyncerSuite) TestRun(c *C) {

 	cfg, err := s.cfg.Clone()
 	c.Assert(err, IsNil)
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	syncer.cfg.CheckpointFlushInterval = 30
 	syncer.fromDB = &dbconn.UpStreamConn{BaseDB: conn.NewBaseDB(db, func() {})}
 	syncer.toDBConns = []*dbconn.DBConn{
@@ -1001,7 +1001,7 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) {

 	cfg, err := s.cfg.Clone()
 	c.Assert(err, IsNil)
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	syncer.fromDB = &dbconn.UpStreamConn{BaseDB: conn.NewBaseDB(db, func() {})}
 	syncer.toDBConns = []*dbconn.DBConn{
 		{Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})},
@@ -1144,7 +1144,7 @@ func (s *testSyncerSuite) TestRemoveMetadataIsFine(c *C) {
 	cfg, err := s.cfg.Clone()
 	c.Assert(err, IsNil)
 	cfg.Mode = config.ModeAll
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)

 	fresh, err := syncer.IsFreshTask(context.Background())
 	c.Assert(err, IsNil)
 	c.Assert(fresh, IsTrue)
@@ -1190,7 +1190,7 @@ func (s *testSyncerSuite) TestTrackDDL(c *C) {

 	cfg, err := s.cfg.Clone()
 	c.Assert(err, IsNil)
-	syncer := NewSyncer(cfg, nil)
+	syncer := NewSyncer(cfg, nil, nil)
 	syncer.toDBConns = []*dbconn.DBConn{
 		{Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})},
 		{Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})},
diff --git a/tests/_dmctl_tools/check_master_online.go b/tests/_dmctl_tools/check_master_online.go
index a1a986fb78..dec45991e8 100644
--- a/tests/_dmctl_tools/check_master_online.go
+++ b/tests/_dmctl_tools/check_master_online.go
@@ -20,9 +20,10 @@ import (

 	"google.golang.org/grpc"

+	toolutils "github.com/pingcap/tidb-tools/pkg/utils"
+
 	"github.com/pingcap/dm/dm/pb"
 	"github.com/pingcap/dm/tests/utils"
-	toolutils "github.com/pingcap/tidb-tools/pkg/utils"
 )

 // use show-ddl-locks request to test DM-master is online
diff --git a/tests/_dmctl_tools/check_worker_online.go b/tests/_dmctl_tools/check_worker_online.go
index 0bcbb804c2..bd4b33e9dd 100644
--- a/tests/_dmctl_tools/check_worker_online.go
+++ b/tests/_dmctl_tools/check_worker_online.go
@@ -20,9 +20,10 @@ import (

 	"google.golang.org/grpc"

+	toolutils "github.com/pingcap/tidb-tools/pkg/utils"
+
 	"github.com/pingcap/dm/dm/pb"
 	"github.com/pingcap/dm/tests/utils"
-	toolutils "github.com/pingcap/tidb-tools/pkg/utils"
 )

 // use query status request to test DM-worker is online
false" 1 done @@ -80,13 +80,14 @@ function test_stop_task() { for idx in $(seq 0 1); do echo "start tasks $cur/conf/${task_config[$idx]}" + # if relay hasn't catch up with current sync pos(such as when workload too high), start-task may start with an error + # so we don't check "\"result\": true" here, and check it using query-status run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "start-task $cur/conf/${task_config[$idx]}" \ - "\"result\": true" 3 \ "\"source\": \"$SOURCE_ID1\"" 1 \ "\"source\": \"$SOURCE_ID2\"" 1 - run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status ${task_name[$idx]}" \ "\"stage\": \"Running\"" 4 done