From 838d439fdb1b6749c54c5fa7bc83e05e159365af Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Sat, 9 Oct 2021 16:09:04 +0800 Subject: [PATCH 01/20] notify syncer on new binlog event --- dm/worker/relay.go | 8 ++++---- dm/worker/source_worker.go | 10 +++++++++- dm/worker/subtask.go | 15 +++++++++++++++ pkg/streamer/file.go | 10 +++++++++- pkg/streamer/reader.go | 6 ++++-- relay/relay.go | 14 ++++++++++++-- syncer/streamer_controller.go | 6 ++++-- syncer/syncer.go | 11 +++++++++-- 8 files changed, 66 insertions(+), 14 deletions(-) diff --git a/dm/worker/relay.go b/dm/worker/relay.go index 1ef228b099..e4fe38bf8e 100644 --- a/dm/worker/relay.go +++ b/dm/worker/relay.go @@ -76,13 +76,13 @@ type realRelayHolder struct { } // NewRealRelayHolder creates a new RelayHolder. -func NewRealRelayHolder(sourceCfg *config.SourceConfig) RelayHolder { - cfg := relay.FromSourceCfg(sourceCfg) +func NewRealRelayHolder(w *SourceWorker) RelayHolder { + cfg := relay.FromSourceCfg(w.cfg) h := &realRelayHolder{ - cfg: sourceCfg, + cfg: w.cfg, stage: pb.Stage_New, - relay: relay.NewRelay(cfg), + relay: relay.NewRelay(w, cfg), l: log.With(zap.String("component", "relay holder")), } h.closed.Store(true) diff --git a/dm/worker/source_worker.go b/dm/worker/source_worker.go index 854283885a..b3151fd822 100644 --- a/dm/worker/source_worker.go +++ b/dm/worker/source_worker.go @@ -19,6 +19,8 @@ import ( "sync" "time" + "github.com/go-mysql-org/go-mysql/replication" + "github.com/golang/protobuf/proto" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -320,7 +322,7 @@ func (w *SourceWorker) EnableRelay() (err error) { } // 2. 
initial relay holder, the cfg's password need decrypt - w.relayHolder = NewRelayHolder(w.cfg) + w.relayHolder = NewRelayHolder(w) relayPurger, err := w.relayHolder.Init(w.relayCtx, []purger.PurgeInterceptor{ w, }) @@ -1038,3 +1040,9 @@ func (w *SourceWorker) HandleError(ctx context.Context, req *pb.HandleWorkerErro return st.HandleError(ctx, req) } + +func (w *SourceWorker) OnEvent(e *replication.BinlogEvent) { + for _, s := range w.subTaskHolder.getAllSubTasks() { + s.relayNotify() + } +} diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index a01954da8f..25a3d2ced5 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -100,6 +100,7 @@ type SubTask struct { units []unit.Unit // units do job one by one currUnit unit.Unit prevUnit unit.Unit + syncer *syncer.Syncer resultWg sync.WaitGroup stage pb.Stage // stage of current sub task @@ -190,6 +191,11 @@ func (st *SubTask) initUnits() error { needCloseUnits = st.units[:skipIdx] st.units = st.units[skipIdx:] + for _, u := range st.units { + if s, ok := u.(*syncer.Syncer); ok { + st.syncer = s + } + } st.setCurrUnit(st.units[0]) return nil @@ -724,3 +730,12 @@ func updateTaskMetric(task, sourceID string, stage pb.Stage, workerName string) taskState.WithLabelValues(task, sourceID, workerName).Set(float64(stage)) } } + +func (st *SubTask) relayNotify() { + if st.syncer != nil { + select { + case st.syncer.RelayNotifyCh <- true: + default: + } + } +} diff --git a/pkg/streamer/file.go b/pkg/streamer/file.go index a38677f73a..f9c45143af 100644 --- a/pkg/streamer/file.go +++ b/pkg/streamer/file.go @@ -49,6 +49,11 @@ type SwitchPath struct { nextBinlogName string } +// EventNotifier notifies +type EventNotifier interface { + Notified() chan bool +} + // CollectAllBinlogFiles collects all valid binlog files in dir, and returns filenames in binlog ascending order. 
func CollectAllBinlogFiles(dir string) ([]string, error) { if dir == "" { @@ -207,12 +212,15 @@ func fileSizeUpdated(path string, latestSize int64) (int, error) { // so we need to check relay meta file to see if the new relay log is created. // this func will be blocked until current filesize changed or meta file updated or context cancelled. // we need to make sure that only one channel (updatePathCh or errCh) has events written to it. -func relayLogUpdatedOrNewCreated(ctx context.Context, watcherInterval time.Duration, dir string, +func relayLogUpdatedOrNewCreated(n EventNotifier, ctx context.Context, watcherInterval time.Duration, dir string, latestFilePath, latestFile string, latestFileSize int64, updatePathCh chan string, errCh chan error) { ticker := time.NewTicker(watcherInterval) defer ticker.Stop() for { select { + case <-n.Notified(): + updatePathCh <- latestFilePath + return case <-ctx.Done(): errCh <- terror.Annotate(ctx.Err(), "context meet error") return diff --git a/pkg/streamer/reader.go b/pkg/streamer/reader.go index a9a5ac4105..4f4dfd717c 100644 --- a/pkg/streamer/reader.go +++ b/pkg/streamer/reader.go @@ -79,10 +79,11 @@ type BinlogReader struct { usingGTID bool prevGset, currGset mysql.GTIDSet + en EventNotifier } // NewBinlogReader creates a new BinlogReader. 
-func NewBinlogReader(logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader { +func NewBinlogReader(en EventNotifier, logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader { ctx, cancel := context.WithCancel(context.Background()) // only can be canceled in `Close` parser := replication.NewBinlogParser() parser.SetVerifyChecksum(true) @@ -100,6 +101,7 @@ func NewBinlogReader(logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader { indexPath: path.Join(cfg.RelayDir, utils.UUIDIndexFilename), cancel: cancel, tctx: newtctx, + en: en, } } @@ -595,7 +597,7 @@ func (r *BinlogReader) parseFile( wg.Add(1) go func(latestPos int64) { defer wg.Done() - relayLogUpdatedOrNewCreated(newCtx, watcherInterval, relayLogDir, fullPath, relayLogFile, latestPos, updatePathCh, updateErrCh) + relayLogUpdatedOrNewCreated(r.en, newCtx, watcherInterval, relayLogDir, fullPath, relayLogFile, latestPos, updatePathCh, updateErrCh) }(latestPos) select { diff --git a/relay/relay.go b/relay/relay.go index 12cdf3a62f..dde4cea3ed 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -69,6 +69,12 @@ var NewRelay = NewRealRelay var _ Process = &Relay{} +// EventListener defines a binlog event listener of relay +type EventListener interface { + // OnEvent get called when relay processed an event successfully. + OnEvent(e *replication.BinlogEvent) +} + // Process defines mysql-like relay log process unit. type Process interface { // Init initial relat log unit @@ -117,14 +123,16 @@ type Relay struct { sync.RWMutex info *pkgstreamer.RelayLogInfo } + el EventListener } // NewRealRelay creates an instance of Relay. 
-func NewRealRelay(cfg *Config) Process { +func NewRealRelay(el EventListener, cfg *Config) Process { return &Relay{ cfg: cfg, meta: NewLocalMeta(cfg.Flavor, cfg.RelayDir), logger: log.With(zap.String("component", "relay log")), + el: el, } } @@ -562,7 +570,9 @@ func (r *Relay) handleEvents( if err != nil { relayLogWriteErrorCounter.Inc() return eventIndex, err - } else if wResult.Ignore { + } + r.el.OnEvent(e) + if wResult.Ignore { r.logger.Info("ignore event by writer", zap.Reflect("header", e.Header), zap.String("reason", wResult.IgnoreReason)) diff --git a/syncer/streamer_controller.go b/syncer/streamer_controller.go index b0f37a74e2..e426c125ad 100644 --- a/syncer/streamer_controller.go +++ b/syncer/streamer_controller.go @@ -121,10 +121,11 @@ type StreamerController struct { // whether the server id is updated serverIDUpdated bool + s *Syncer } // NewStreamerController creates a new streamer controller. -func NewStreamerController( +func NewStreamerController(s *Syncer, syncCfg replication.BinlogSyncerConfig, enableGTID bool, fromDB *dbconn.UpStreamConn, @@ -154,6 +155,7 @@ func NewStreamerController( timezone: timezone, fromDB: fromDB, closed: true, + s: s, } return streamerController @@ -232,7 +234,7 @@ func (c *StreamerController) resetReplicationSyncer(tctx *tcontext.Context, loca if c.currentBinlogType == RemoteBinlog { c.streamerProducer = &remoteBinlogReader{replication.NewBinlogSyncer(c.syncCfg), tctx, c.syncCfg.Flavor, c.enableGTID} } else { - c.streamerProducer = &localBinlogReader{streamer.NewBinlogReader(tctx.L(), &streamer.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor}), c.enableGTID} + c.streamerProducer = &localBinlogReader{streamer.NewBinlogReader(c.s, tctx.L(), &streamer.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor}), c.enableGTID} } c.streamer, err = c.streamerProducer.generateStreamer(location) diff --git a/syncer/syncer.go b/syncer/syncer.go 
index 4436a89a68..5ba3989aca 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -222,6 +222,8 @@ type Syncer struct { secondsBehindMaster atomic.Int64 // current task delay second behind upstream workerJobTSArray []*atomic.Int64 // worker's sync job TS array, note that idx=0 is skip idx and idx=1 is ddl idx,sql worker job idx=(queue id + 2) lastCheckpointFlushedTime time.Time + + RelayNotifyCh chan bool } // NewSyncer creates a new Syncer. @@ -264,6 +266,7 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer { syncer.workerJobTSArray[i] = atomic.NewInt64(0) } syncer.lastCheckpointFlushedTime = time.Time{} + syncer.RelayNotifyCh = make(chan bool, 1) return syncer } @@ -322,7 +325,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) { return terror.ErrSchemaTrackerInit.Delegate(err) } - s.streamerController = NewStreamerController(s.syncCfg, s.cfg.EnableGTID, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone) + s.streamerController = NewStreamerController(s, s.syncCfg, s.cfg.EnableGTID, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone) s.baList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BAList) if err != nil { @@ -3504,7 +3507,7 @@ func (s *Syncer) adjustGlobalPointGTID(tctx *tcontext.Context) (bool, error) { return false, nil } // set enableGTID to false for new streamerController - streamerController := NewStreamerController(s.syncCfg, false, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone) + streamerController := NewStreamerController(s, s.syncCfg, false, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone) endPos := binlog.AdjustPosition(location.Position) startPos := mysql.Position{ @@ -3561,3 +3564,7 @@ func (s *Syncer) delLoadTask() error { s.tctx.Logger.Info("delete load worker in etcd for all mode", zap.String("task", s.cfg.Name), zap.String("source", s.cfg.SourceID)) return nil } + +func (s *Syncer) Notified() chan bool { + return s.RelayNotifyCh +} \ No newline at end of file From 
7a14bef2d86d3579e5c0f4b07ece3f2c06cfa319 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 12 Oct 2021 11:43:09 +0800 Subject: [PATCH 02/20] reader notify streamer reader --- dm/worker/relay.go | 30 +++++++++++-- dm/worker/source_worker.go | 13 ++---- dm/worker/subtask.go | 16 +++++-- dm/worker/subtask_holder.go | 12 ++++++ pkg/streamer/file.go | 84 ++++++++++++------------------------- pkg/streamer/reader.go | 2 +- relay/relay.go | 41 ++++++++++++++---- syncer/syncer.go | 9 ++-- 8 files changed, 120 insertions(+), 87 deletions(-) diff --git a/dm/worker/relay.go b/dm/worker/relay.go index e4fe38bf8e..650d7c2518 100644 --- a/dm/worker/relay.go +++ b/dm/worker/relay.go @@ -51,6 +51,10 @@ type RelayHolder interface { Result() *pb.ProcessResult // Update updates relay config online Update(ctx context.Context, cfg *config.SourceConfig) error + // RegisterListener registers a relay listener + RegisterListener(el relay.Listener) + // UnRegisterListener unregisters a relay listener + UnRegisterListener(el relay.Listener) } // NewRelayHolder is relay holder initializer @@ -76,13 +80,13 @@ type realRelayHolder struct { } // NewRealRelayHolder creates a new RelayHolder. 
-func NewRealRelayHolder(w *SourceWorker) RelayHolder { - cfg := relay.FromSourceCfg(w.cfg) +func NewRealRelayHolder(sourceCfg *config.SourceConfig) RelayHolder { + cfg := relay.FromSourceCfg(sourceCfg) h := &realRelayHolder{ - cfg: w.cfg, + cfg: sourceCfg, stage: pb.Stage_New, - relay: relay.NewRelay(w, cfg), + relay: relay.NewRelay(cfg), l: log.With(zap.String("component", "relay holder")), } h.closed.Store(true) @@ -315,6 +319,18 @@ func (h *realRelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo { return h.relay.ActiveRelayLog() } +func (h *realRelayHolder) RegisterListener(el relay.Listener) { + h.Lock() + defer h.Unlock() + h.relay.RegisterListener(el) +} + +func (h *realRelayHolder) UnRegisterListener(el relay.Listener) { + h.Lock() + defer h.Unlock() + h.relay.UnRegisterListener(el) +} + /******************** dummy relay holder ********************/ type dummyRelayHolder struct { @@ -432,3 +448,9 @@ func (d *dummyRelayHolder) Stage() pb.Stage { defer d.Unlock() return d.stage } + +func (d *dummyRelayHolder) RegisterListener(el relay.Listener) { +} + +func (d *dummyRelayHolder) UnRegisterListener(el relay.Listener) { +} diff --git a/dm/worker/source_worker.go b/dm/worker/source_worker.go index b3151fd822..66ad1af6fb 100644 --- a/dm/worker/source_worker.go +++ b/dm/worker/source_worker.go @@ -19,8 +19,6 @@ import ( "sync" "time" - "github.com/go-mysql-org/go-mysql/replication" - "github.com/golang/protobuf/proto" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -322,7 +320,7 @@ func (w *SourceWorker) EnableRelay() (err error) { } // 2. 
initial relay holder, the cfg's password need decrypt - w.relayHolder = NewRelayHolder(w) + w.relayHolder = NewRelayHolder(w.cfg) relayPurger, err := w.relayHolder.Init(w.relayCtx, []purger.PurgeInterceptor{ w, }) @@ -355,6 +353,8 @@ func (w *SourceWorker) EnableRelay() (err error) { w.observeRelayStage(w.relayCtx, w.etcdClient, revRelay) }() + w.relayHolder.RegisterListener(w.subTaskHolder) + w.relayEnabled.Store(true) w.l.Info("relay enabled") w.subTaskHolder.resetAllSubTasks(true) @@ -387,6 +387,7 @@ func (w *SourceWorker) DisableRelay() { if w.relayHolder != nil { r := w.relayHolder w.relayHolder = nil + r.UnRegisterListener(w.subTaskHolder) r.Close() } if w.relayPurger != nil { @@ -1040,9 +1041,3 @@ func (w *SourceWorker) HandleError(ctx context.Context, req *pb.HandleWorkerErro return st.HandleError(ctx, req) } - -func (w *SourceWorker) OnEvent(e *replication.BinlogEvent) { - for _, s := range w.subTaskHolder.getAllSubTasks() { - s.relayNotify() - } -} diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index 25a3d2ced5..dca91bb12a 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -191,14 +191,21 @@ func (st *SubTask) initUnits() error { needCloseUnits = st.units[:skipIdx] st.units = st.units[skipIdx:] + st.postInitSyncer() + + st.setCurrUnit(st.units[0]) + return nil +} + +func (st *SubTask) postInitSyncer() { + // TODO, right now initUnits create units first and then remove unnecessary units(before first non fresh unit) + // maybe can be refactored into check first, then create, so we don't need to loop all units to get syncer here for _, u := range st.units { if s, ok := u.(*syncer.Syncer); ok { st.syncer = s + break } } - - st.setCurrUnit(st.units[0]) - return nil } // Run runs the sub task. 
@@ -733,8 +740,9 @@ func updateTaskMetric(task, sourceID string, stage pb.Stage, workerName string) func (st *SubTask) relayNotify() { if st.syncer != nil { + // skip if it's there's pending notify select { - case st.syncer.RelayNotifyCh <- true: + case st.syncer.Notified() <- struct{}{}: default: } } diff --git a/dm/worker/subtask_holder.go b/dm/worker/subtask_holder.go index 20d3bcde41..96fc6e54b5 100644 --- a/dm/worker/subtask_holder.go +++ b/dm/worker/subtask_holder.go @@ -15,6 +15,7 @@ package worker import ( "context" + "github.com/go-mysql-org/go-mysql/replication" "sync" ) @@ -88,3 +89,14 @@ func (h *subTaskHolder) getAllSubTasks() map[string]*SubTask { } return result } + +// OnEvent implements relay.Listener +// only syncer unit of subtask need to be notified, but it's much simpler and less error-prone to manage it here +// as relay event need to broadcast to every syncer(most subtask have a syncer) +func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) { + h.mu.RLock() + defer h.mu.RUnlock() + for _, s := range h.subTasks { + s.relayNotify() + } +} diff --git a/pkg/streamer/file.go b/pkg/streamer/file.go index f9c45143af..a6674c205b 100644 --- a/pkg/streamer/file.go +++ b/pkg/streamer/file.go @@ -22,7 +22,6 @@ import ( "time" "github.com/BurntSushi/toml" - "github.com/pingcap/failpoint" "go.uber.org/zap" "github.com/pingcap/dm/pkg/binlog" @@ -49,9 +48,10 @@ type SwitchPath struct { nextBinlogName string } -// EventNotifier notifies +// EventNotifier notifies whether there is new binlog event written to the file type EventNotifier interface { - Notified() chan bool + // Notified returns a channel used to check whether there is new binlog event written to the file + Notified() chan interface{} } // CollectAllBinlogFiles collects all valid binlog files in dir, and returns filenames in binlog ascending order. 
@@ -208,75 +208,45 @@ func fileSizeUpdated(path string, latestSize int64) (int, error) { } // relayLogUpdatedOrNewCreated checks whether current relay log file is updated or new relay log is created. -// we check the size of the file first, if the size is the same as the latest file, we assume there is no new write -// so we need to check relay meta file to see if the new relay log is created. -// this func will be blocked until current filesize changed or meta file updated or context cancelled. -// we need to make sure that only one channel (updatePathCh or errCh) has events written to it. -func relayLogUpdatedOrNewCreated(n EventNotifier, ctx context.Context, watcherInterval time.Duration, dir string, - latestFilePath, latestFile string, latestFileSize int64, updatePathCh chan string, errCh chan error) { - ticker := time.NewTicker(watcherInterval) - defer ticker.Stop() - for { - select { - case <-n.Notified(): - updatePathCh <- latestFilePath - return - case <-ctx.Done(): - errCh <- terror.Annotate(ctx.Err(), "context meet error") +func relayLogUpdatedOrNewCreated(n EventNotifier, ctx context.Context, dir, latestFilePath, latestFile string, + beginOffset, latestFileSize int64, updatePathCh chan string, errCh chan error) { + // binlog file may have rotated if we read nothing last time(either it's the first read or after notified) + lastReadCnt := latestFileSize - beginOffset + if lastReadCnt == 0 { + meta := &Meta{} + _, err := toml.DecodeFile(filepath.Join(dir, utils.MetaFilename), meta) + if err != nil { + errCh <- terror.Annotate(err, "decode relay meta toml file failed") return - case <-ticker.C: + } + if meta.BinLogName != latestFile { + // we need check file size again, as the file may have been changed during our metafile check cmp, err := fileSizeUpdated(latestFilePath, latestFileSize) if err != nil { errCh <- terror.Annotatef(err, "latestFilePath=%s latestFileSize=%d", latestFilePath, latestFileSize) return } - failpoint.Inject("CMPAlwaysReturn0", func() 
{ - cmp = 0 - }) switch { case cmp < 0: errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(latestFilePath) - return case cmp > 0: updatePathCh <- latestFilePath - return default: - // current watched file size have no change means that no new writes have been made - // our relay meta file will be updated immediately after receive the rotate event - // although we cannot ensure that the binlog filename in the meta is the next file after latestFile - // but if we return a different filename with latestFile, the outer logic (parseDirAsPossible) - // will find the right one - meta := &Meta{} - _, err = toml.DecodeFile(filepath.Join(dir, utils.MetaFilename), meta) - if err != nil { - errCh <- terror.Annotate(err, "decode relay meta toml file failed") - return - } - if meta.BinLogName != latestFile { - // we need check file size again, as the file may have been changed during our metafile check - cmp, err := fileSizeUpdated(latestFilePath, latestFileSize) - if err != nil { - errCh <- terror.Annotatef(err, "latestFilePath=%s latestFileSize=%d", - latestFilePath, latestFileSize) - return - } - switch { - case cmp < 0: - errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(latestFilePath) - case cmp > 0: - updatePathCh <- latestFilePath - default: - nextFilePath := filepath.Join(dir, meta.BinLogName) - log.L().Info("newer relay log file is already generated", - zap.String("now file path", latestFilePath), - zap.String("new file path", nextFilePath)) - updatePathCh <- nextFilePath - } - return - } + nextFilePath := filepath.Join(dir, meta.BinLogName) + log.L().Info("newer relay log file is already generated", + zap.String("now file path", latestFilePath), + zap.String("new file path", nextFilePath)) + updatePathCh <- nextFilePath } + return } } + select { + case <-ctx.Done(): + errCh <- terror.Annotate(ctx.Err(), "context meet error") + case <-n.Notified(): + updatePathCh <- latestFilePath + } } // needSwitchSubDir checks whether the reader need to switch to next relay sub 
directory. diff --git a/pkg/streamer/reader.go b/pkg/streamer/reader.go index 4f4dfd717c..0e9a03466c 100644 --- a/pkg/streamer/reader.go +++ b/pkg/streamer/reader.go @@ -597,7 +597,7 @@ func (r *BinlogReader) parseFile( wg.Add(1) go func(latestPos int64) { defer wg.Done() - relayLogUpdatedOrNewCreated(r.en, newCtx, watcherInterval, relayLogDir, fullPath, relayLogFile, latestPos, updatePathCh, updateErrCh) + relayLogUpdatedOrNewCreated(r.en, newCtx, relayLogDir, fullPath, relayLogFile, offset, latestPos, updatePathCh, updateErrCh) }(latestPos) select { diff --git a/relay/relay.go b/relay/relay.go index dde4cea3ed..6185b18f02 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -69,8 +69,8 @@ var NewRelay = NewRealRelay var _ Process = &Relay{} -// EventListener defines a binlog event listener of relay -type EventListener interface { +// Listener defines a binlog event listener of relay log +type Listener interface { // OnEvent get called when relay processed an event successfully. OnEvent(e *replication.BinlogEvent) } @@ -105,6 +105,10 @@ type Process interface { ResetMeta() // PurgeRelayDir will clear all contents under w.cfg.RelayDir PurgeRelayDir() error + // RegisterListener registers a relay listener + RegisterListener(el Listener) + // UnRegisterListener unregisters a relay listener + UnRegisterListener(el Listener) } // Relay relays mysql binlog to local file. @@ -123,16 +127,16 @@ type Relay struct { sync.RWMutex info *pkgstreamer.RelayLogInfo } - el EventListener + els map[Listener]struct{} } // NewRealRelay creates an instance of Relay. 
-func NewRealRelay(el EventListener, cfg *Config) Process { +func NewRealRelay(cfg *Config) Process { return &Relay{ cfg: cfg, meta: NewLocalMeta(cfg.Flavor, cfg.RelayDir), logger: log.With(zap.String("component", "relay log")), - el: el, + els: make(map[Listener]struct{}), } } @@ -570,15 +574,16 @@ func (r *Relay) handleEvents( if err != nil { relayLogWriteErrorCounter.Inc() return eventIndex, err - } - r.el.OnEvent(e) - if wResult.Ignore { + } else if wResult.Ignore { r.logger.Info("ignore event by writer", zap.Reflect("header", e.Header), zap.String("reason", wResult.IgnoreReason)) r.tryUpdateActiveRelayLog(e, lastPos.Name) // even the event ignored we still need to try this update. continue } + + r.notify(e) + relayLogWriteDurationHistogram.Observe(time.Since(writeTimer).Seconds()) r.tryUpdateActiveRelayLog(e, lastPos.Name) // wrote a event, try update the current active relay log. @@ -1097,3 +1102,23 @@ func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error) defer dbConn.Close() return utils.AddGSetWithPurged(ctx, resultGs, dbConn) } + +func (r *Relay) notify(e *replication.BinlogEvent) { + r.RLock() + defer r.RUnlock() + for el, _ := range r.els { + el.OnEvent(e) + } +} + +func (r *Relay) RegisterListener(el Listener) { + r.Lock() + defer r.Unlock() + r.els[el] = struct{}{} +} + +func (r *Relay) UnRegisterListener(el Listener) { + r.Lock() + defer r.Unlock() + delete(r.els, el) +} diff --git a/syncer/syncer.go b/syncer/syncer.go index 5ba3989aca..03bef163cb 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -223,7 +223,8 @@ type Syncer struct { workerJobTSArray []*atomic.Int64 // worker's sync job TS array, note that idx=0 is skip idx and idx=1 is ddl idx,sql worker job idx=(queue id + 2) lastCheckpointFlushedTime time.Time - RelayNotifyCh chan bool + // relayNotifyCh with size = 1, we only need to be notified whether binlog file of relay changed, not how many times + relayNotifyCh chan interface{} } // NewSyncer creates a new 
Syncer. @@ -266,7 +267,7 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer { syncer.workerJobTSArray[i] = atomic.NewInt64(0) } syncer.lastCheckpointFlushedTime = time.Time{} - syncer.RelayNotifyCh = make(chan bool, 1) + syncer.relayNotifyCh = make(chan interface{}, 1) return syncer } @@ -3565,6 +3566,6 @@ func (s *Syncer) delLoadTask() error { return nil } -func (s *Syncer) Notified() chan bool { - return s.RelayNotifyCh +func (s *Syncer) Notified() chan interface{} { + return s.relayNotifyCh } \ No newline at end of file From e4f2c7afe069257c50f18855f8ac12bce90003e2 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 13 Oct 2021 20:29:43 +0800 Subject: [PATCH 03/20] move switch path check into binlog file change check --- pkg/streamer/file.go | 140 +++++++++++++++++++++-------------------- pkg/streamer/reader.go | 24 ++++--- 2 files changed, 83 insertions(+), 81 deletions(-) diff --git a/pkg/streamer/file.go b/pkg/streamer/file.go index a6674c205b..ed0a629bfb 100644 --- a/pkg/streamer/file.go +++ b/pkg/streamer/file.go @@ -15,19 +15,16 @@ package streamer import ( "context" - "os" - "path" - "path/filepath" - "sort" - "time" - "github.com/BurntSushi/toml" - "go.uber.org/zap" - "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" + "go.uber.org/zap" + "os" + "path" + "path/filepath" + "sort" ) // FileCmp is a compare condition used when collecting binlog files. @@ -207,96 +204,103 @@ func fileSizeUpdated(path string, latestSize int64) (int, error) { } } +type relayLogFileChecker struct { + n EventNotifier + relayDir, currentUUID string + latestRelayLogDir, latestFilePath, latestFile string + beginOffset, endOffset int64 +} + // relayLogUpdatedOrNewCreated checks whether current relay log file is updated or new relay log is created. 
-func relayLogUpdatedOrNewCreated(n EventNotifier, ctx context.Context, dir, latestFilePath, latestFile string, - beginOffset, latestFileSize int64, updatePathCh chan string, errCh chan error) { +func (r *relayLogFileChecker) relayLogUpdatedOrNewCreated(ctx context.Context, updatePathCh chan string, switchCh chan SwitchPath, errCh chan error) { // binlog file may have rotated if we read nothing last time(either it's the first read or after notified) - lastReadCnt := latestFileSize - beginOffset + lastReadCnt := r.endOffset - r.beginOffset if lastReadCnt == 0 { meta := &Meta{} - _, err := toml.DecodeFile(filepath.Join(dir, utils.MetaFilename), meta) + _, err := toml.DecodeFile(filepath.Join(r.latestRelayLogDir, utils.MetaFilename), meta) if err != nil { errCh <- terror.Annotate(err, "decode relay meta toml file failed") return } - if meta.BinLogName != latestFile { + if meta.BinLogName != r.latestFile { // we need check file size again, as the file may have been changed during our metafile check - cmp, err := fileSizeUpdated(latestFilePath, latestFileSize) + cmp, err := fileSizeUpdated(r.latestFilePath, r.endOffset) if err != nil { - errCh <- terror.Annotatef(err, "latestFilePath=%s latestFileSize=%d", latestFilePath, latestFileSize) + errCh <- terror.Annotatef(err, "latestFilePath=%s endOffset=%d", r.latestFilePath, r.endOffset) return } switch { case cmp < 0: - errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(latestFilePath) + errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(r.latestFilePath) case cmp > 0: - updatePathCh <- latestFilePath + updatePathCh <- r.latestFilePath default: - nextFilePath := filepath.Join(dir, meta.BinLogName) + nextFilePath := filepath.Join(r.latestRelayLogDir, meta.BinLogName) log.L().Info("newer relay log file is already generated", - zap.String("now file path", latestFilePath), + zap.String("now file path", r.latestFilePath), zap.String("new file path", nextFilePath)) updatePathCh <- nextFilePath } return + } else { + switchPath, 
err := r.getSwitchPath() + if err != nil { + errCh <- err + return + } + if switchPath != nil { + // we need check file size again, as the file may have been changed during path check + cmp, err := fileSizeUpdated(r.latestFilePath, r.endOffset) + if err != nil { + errCh <- terror.Annotatef(err, "latestFilePath=%s endOffset=%d", r.latestFilePath, r.endOffset) + return + } + switch { + case cmp < 0: + errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(r.latestFilePath) + case cmp > 0: + updatePathCh <- r.latestFilePath + default: + switchCh <- *switchPath + } + return + } } } + select { case <-ctx.Done(): errCh <- terror.Annotate(ctx.Err(), "context meet error") - case <-n.Notified(): - updatePathCh <- latestFilePath + case <-r.n.Notified(): + // the notified event may not be the current relay file + // in that case we may read 0 bytes and check at upper "if statement" + updatePathCh <- r.latestFilePath } } -// needSwitchSubDir checks whether the reader need to switch to next relay sub directory. 
-func needSwitchSubDir(ctx context.Context, relayDir, currentUUID string, switchCh chan SwitchPath, errCh chan error) { - var ( - err error - nextUUID string - nextBinlogName string - uuids []string - ) - - ticker := time.NewTicker(watcherInterval) - defer func() { - ticker.Stop() - if err != nil { - errCh <- err - } - }() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - // reload uuid - uuids, err = utils.ParseUUIDIndex(path.Join(relayDir, utils.UUIDIndexFilename)) - if err != nil { - return - } - nextUUID, _, err = getNextUUID(currentUUID, uuids) - if err != nil { - return - } - if len(nextUUID) == 0 { - continue - } - - // try get the first binlog file in next sub directory - nextBinlogName, err = getFirstBinlogName(relayDir, nextUUID) - if err != nil { - // because creating sub directory and writing relay log file are not atomic - // just continue to observe subdir - if terror.ErrBinlogFilesNotFound.Equal(err) { - err = nil - continue - } - return - } +func (r *relayLogFileChecker) getSwitchPath() (*SwitchPath, error) { + // reload uuid + uuids, err := utils.ParseUUIDIndex(path.Join(r.relayDir, utils.UUIDIndexFilename)) + if err != nil { + return nil, err + } + nextUUID, _, err := getNextUUID(r.currentUUID, uuids) + if err != nil { + return nil, err + } + if len(nextUUID) == 0 { + return nil, nil + } - switchCh <- SwitchPath{nextUUID, nextBinlogName} - return + // try to get the first binlog file in next subdirectory + nextBinlogName, err := getFirstBinlogName(r.relayDir, nextUUID) + if err != nil { + // because creating subdirectory and writing relay log file are not atomic + if terror.ErrBinlogFilesNotFound.Equal(err) { + return nil, nil } + return nil, err } + + return &SwitchPath{nextUUID, nextBinlogName}, nil } diff --git a/pkg/streamer/reader.go b/pkg/streamer/reader.go index 0e9a03466c..25ebbe28d4 100644 --- a/pkg/streamer/reader.go +++ b/pkg/streamer/reader.go @@ -588,28 +588,26 @@ func (r *BinlogReader) parseFile( wg.Wait() }() - 
wg.Add(1) - go func() { - defer wg.Done() - needSwitchSubDir(newCtx, r.cfg.RelayDir, currentUUID, switchCh, switchErrCh) - }() - wg.Add(1) go func(latestPos int64) { defer wg.Done() - relayLogUpdatedOrNewCreated(r.en, newCtx, relayLogDir, fullPath, relayLogFile, offset, latestPos, updatePathCh, updateErrCh) + checker := relayLogFileChecker{ + n: r.en, + relayDir: r.cfg.RelayDir, + currentUUID: currentUUID, + latestRelayLogDir: relayLogDir, + latestFilePath: fullPath, + latestFile: relayLogFile, + beginOffset: offset, + endOffset: latestPos, + } + checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, updateErrCh) }(latestPos) select { case <-ctx.Done(): return false, false, 0, "", "", false, nil case switchResp := <-switchCh: - // wait to ensure old file not updated - pathUpdated := utils.WaitSomething(3, watcherInterval, func() bool { return len(updatePathCh) > 0 }) - if pathUpdated { - // re-parse it - return false, true, latestPos, "", "", replaceWithHeartbeat, nil - } // update new uuid if err = r.updateUUIDs(); err != nil { return false, false, 0, "", "", false, nil From b067a070ebf4e1950b6fcc965bbc1e63bbe038b4 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 14 Oct 2021 13:39:37 +0800 Subject: [PATCH 04/20] add timer to relayLogUpdatedOrNewCreated to handle special case --- pkg/streamer/file.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/streamer/file.go b/pkg/streamer/file.go index ed0a629bfb..6f8cd09e45 100644 --- a/pkg/streamer/file.go +++ b/pkg/streamer/file.go @@ -25,6 +25,7 @@ import ( "path" "path/filepath" "sort" + "time" ) // FileCmp is a compare condition used when collecting binlog files. 
@@ -268,12 +269,19 @@ func (r *relayLogFileChecker) relayLogUpdatedOrNewCreated(ctx context.Context, u
 		}
 	}
 
+	timer := time.NewTimer(watcherInterval)
+	defer timer.Stop()
 	select {
 	case <-ctx.Done():
 		errCh <- terror.Annotate(ctx.Err(), "context meet error")
 	case <-r.n.Notified():
 		// the notified event may not be the current relay file
-		// in that case we may read 0 bytes and check at upper "if statement"
+		// in that case we may read 0 bytes and check again
+		updatePathCh <- r.latestFilePath
+	case <-timer.C:
+		// for a task started after source shutdown, or when there's no new write, it'll not be notified,
+		// and if it's reading from dir 000001 and there's a need to switch dir to 000002,
+		// we stop waiting after watcherInterval to give it a chance to check again
 		updatePathCh <- r.latestFilePath
 	}
 }

From 41568ea642461e24245544e46f17697c5e139945 Mon Sep 17 00:00:00 2001
From: D3Hunter
Date: Thu, 14 Oct 2021 15:30:48 +0800
Subject: [PATCH 05/20] ignore duplicate FORMAT event

---
 pkg/streamer/reader.go | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/pkg/streamer/reader.go b/pkg/streamer/reader.go
index 25ebbe28d4..321be4f98c 100644
--- a/pkg/streamer/reader.go
+++ b/pkg/streamer/reader.go
@@ -453,7 +453,12 @@ func (r *BinlogReader) parseFile(
 
 	switch ev := e.Event.(type) {
 	case *replication.FormatDescriptionEvent:
-		// ignore FORMAT_DESCRIPTION event, because go-mysql will send this fake event
+		// go-mysql will send a duplicate FormatDescriptionEvent when offset > 4; ignore it
+		if offset > 4 {
+			return nil
+		}
+		// else just update lastPos
+		latestPos = int64(e.Header.LogPos)
 	case *replication.RotateEvent:
 		// add master UUID suffix to pos.Name
 		parsed, _ := binlog.ParseFilename(string(ev.NextLogName))

From 29c9fd45f29b32374989a0f91770948b125c499b Mon Sep 17 00:00:00 2001
From: D3Hunter
Date: Thu, 14 Oct 2021 14:38:06 +0800
Subject: [PATCH 06/20] add/fix unit test

---
 dm/worker/relay_test.go            |   6 +
 pkg/streamer/file_test.go          | 420 
++++++++++++++++++----------- pkg/streamer/reader_test.go | 211 ++++----------- syncer/streamer_controller_test.go | 4 +- 4 files changed, 326 insertions(+), 315 deletions(-) diff --git a/dm/worker/relay_test.go b/dm/worker/relay_test.go index cccc9649be..f6a110b35e 100644 --- a/dm/worker/relay_test.go +++ b/dm/worker/relay_test.go @@ -47,6 +47,12 @@ type DummyRelay struct { reloadErr error } +func (d *DummyRelay) RegisterListener(el relay.Listener) { +} + +func (d *DummyRelay) UnRegisterListener(el relay.Listener) { +} + // NewDummyRelay creates an instance of dummy Relay. func NewDummyRelay(cfg *relay.Config) relay.Process { return &DummyRelay{} diff --git a/pkg/streamer/file_test.go b/pkg/streamer/file_test.go index 42b6e09a83..07524ad81f 100644 --- a/pkg/streamer/file_test.go +++ b/pkg/streamer/file_test.go @@ -20,16 +20,13 @@ import ( "os" "path" "path/filepath" - "sync" "time" "github.com/BurntSushi/toml" . "github.com/pingcap/check" - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" + "github.com/pingcap/errors" ) var _ = Suite(&testFileSuite{}) @@ -296,30 +293,59 @@ func (t *testFileSuite) TestFileSizeUpdated(c *C) { c.Assert(cmp, Equals, 1) } +type dummyEventNotifier struct { + ch chan interface{} +} + +func (d *dummyEventNotifier) Notified() chan interface{} { + return d.ch +} + +func newDummyEventNotifier(n int) EventNotifier { + d := &dummyEventNotifier{ + ch: make(chan interface{}, n), + } + for i := 0; i < n; i++ { + d.ch <- struct{}{} + } + return d +} + func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { var ( relayFiles = []string{ "mysql-bin.000001", "mysql-bin.000002", - "mysql-bin.000003", } - binlogPos = uint32(4) - binlogGTID = "ba8f633f-1f15-11eb-b1c7-0242ac110002:1" - relayPaths = make([]string, len(relayFiles)) - data = []byte("meaningless file content") - size = int64(len(data)) - watcherInterval = 100 * time.Millisecond - updatePathCh = 
make(chan string, 1) - errCh = make(chan error, 1) + binlogPos = uint32(4) + binlogGTID = "ba8f633f-1f15-11eb-b1c7-0242ac110002:1" + relayPaths = make([]string, len(relayFiles)) + data = []byte("meaningless file content") + size = int64(len(data)) + updatePathCh = make(chan string, 1) + switchCh = make(chan SwitchPath, 1) + errCh = make(chan error, 1) ) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() + en := newDummyEventNotifier(0) // a. relay log dir not exist - relayLogUpdatedOrNewCreated(ctx, watcherInterval, "/not-exists-directory", "/not-exists-filepath", "not-exists-file", 0, updatePathCh, errCh) + checker := &relayLogFileChecker{ + n: en, + relayDir: "", + currentUUID: "", + latestRelayLogDir: "/not-exists-directory", + latestFilePath: "/not-exists-filepath", + latestFile: "not-exists-file", + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) c.Assert(len(errCh), Equals, 1) c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) err := <-errCh c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") @@ -330,134 +356,239 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { relayPaths[i] = filepath.Join(subDir, rf) } - // b. 
relay file not found - relayLogUpdatedOrNewCreated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], 0, updatePathCh, errCh) + rotateRelayFile := func(filename string) { + meta := Meta{BinLogName: filename, BinLogPos: binlogPos, BinlogGTID: binlogGTID} + metaFile, err2 := os.Create(path.Join(subDir, utils.MetaFilename)) + c.Assert(err2, IsNil) + err = toml.NewEncoder(metaFile).Encode(meta) + c.Assert(err, IsNil) + _ = metaFile.Close() + } + + // meta not found + checker = &relayLogFileChecker{ + n: en, + relayDir: "", + currentUUID: "", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: relayFiles[0], + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) c.Assert(len(errCh), Equals, 1) c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) + err = <-errCh + c.Assert(err, ErrorMatches, ".*no such file or directory*") + + // write meta + rotateRelayFile(relayFiles[0]) + + // relay file not found + checker = &relayLogFileChecker{ + n: en, + relayDir: "", + currentUUID: "", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: "not-exists-file", + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 1) + c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) err = <-errCh c.Assert(err, ErrorMatches, ".*no such file or directory*") // create the first relay file - err = os.WriteFile(relayPaths[0], nil, 0o600) + err = os.WriteFile(relayPaths[0], data, 0o600) c.Assert(err, IsNil) + // rotate relay file + rotateRelayFile(relayFiles[1]) - // write meta file - meta := Meta{BinLogName: relayFiles[0], BinLogPos: binlogPos, BinlogGTID: binlogGTID} - metaFile, err2 := os.Create(path.Join(subDir, utils.MetaFilename)) - c.Assert(err2, IsNil) - err = toml.NewEncoder(metaFile).Encode(meta) + // file decreased when meta changed + err = 
os.WriteFile(relayPaths[0], nil, 0o600) c.Assert(err, IsNil) - metaFile.Close() - - // c. latest file path not exist - relayLogUpdatedOrNewCreated(ctx, watcherInterval, subDir, "/no-exists-filepath", relayFiles[0], 0, updatePathCh, errCh) + checker = &relayLogFileChecker{ + n: en, + relayDir: "", + currentUUID: "", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: relayFiles[0], + beginOffset: size, + endOffset: size, + } + checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) c.Assert(len(errCh), Equals, 1) c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) err = <-errCh - c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") + c.Assert(err, ErrorMatches, ".*file size of relay log.*become smaller.*") + + // return changed file in meta + checker = &relayLogFileChecker{ + n: en, + relayDir: "", + currentUUID: "", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: relayFiles[0], + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 1) + c.Assert(len(switchCh), Equals, 0) + up := <-updatePathCh + c.Assert(up, Equals, relayPaths[1]) - // 1. file increased + // file increased when checking meta err = os.WriteFile(relayPaths[0], data, 0o600) - c.Assert(err, IsNil) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - relayLogUpdatedOrNewCreated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], 0, updatePathCh, errCh) - c.Assert(len(errCh), Equals, 0) - c.Assert(len(updatePathCh), Equals, 1) - up := <-updatePathCh - c.Assert(up, Equals, relayPaths[0]) - }() - wg.Wait() - - // 2. 
file decreased when adding watching - err = os.WriteFile(relayPaths[0], nil, 0o600) - c.Assert(err, IsNil) - wg.Add(1) - go func() { - defer wg.Done() - relayLogUpdatedOrNewCreated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], size, updatePathCh, errCh) - c.Assert(len(errCh), Equals, 1) - c.Assert(len(updatePathCh), Equals, 0) - err = <-errCh - c.Assert(err, ErrorMatches, ".*file size of relay log.*become smaller.*") - }() - wg.Wait() - - // 3. new binlog file created - err = os.WriteFile(relayPaths[1], nil, 0o600) - c.Assert(err, IsNil) - wg.Add(1) - go func() { - defer wg.Done() - // update meta file - meta = Meta{BinLogName: relayFiles[1], BinLogPos: binlogPos, BinlogGTID: binlogGTID} - var buf bytes.Buffer - enc := toml.NewEncoder(&buf) - c.Assert(enc.Encode(meta), IsNil) - c.Assert(utils.WriteFileAtomic(path.Join(subDir, utils.MetaFilename), buf.Bytes(), 0o644), IsNil) - - relayLogUpdatedOrNewCreated(ctx, watcherInterval, subDir, relayPaths[0], relayFiles[0], 0, updatePathCh, errCh) - c.Assert(len(errCh), Equals, 0) - c.Assert(len(updatePathCh), Equals, 1) - up := <-updatePathCh - c.Assert(up, Equals, relayPaths[1]) - }() - wg.Wait() - - // 4. 
file increased when checking meta - wg.Add(1) - go func() { - defer wg.Done() - // update meta file to new file - meta = Meta{BinLogName: relayFiles[2], BinLogPos: binlogPos, BinlogGTID: binlogGTID} - var buf bytes.Buffer - enc := toml.NewEncoder(&buf) - c.Assert(enc.Encode(meta), IsNil) - c.Assert(utils.WriteFileAtomic(path.Join(subDir, utils.MetaFilename), buf.Bytes(), 0o644), IsNil) - fi, err4 := os.Stat(relayPaths[1]) - c.Assert(err4, IsNil) - curSize := fi.Size() - // write old binlog file - err5 := os.WriteFile(relayPaths[1], data, 0o600) - c.Assert(err5, IsNil) - c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/streamer/CMPAlwaysReturn0", `return(true)`), IsNil) - //nolint:errcheck - defer failpoint.Disable("github.com/pingcap/dm/pkg/streamer/CMPAlwaysReturn0") - wg.Add(1) - go func() { - defer wg.Done() - relayLogUpdatedOrNewCreated(ctx, watcherInterval, subDir, relayPaths[1], relayFiles[1], curSize, updatePathCh, errCh) - }() - up := <-updatePathCh - c.Assert(up, Equals, relayPaths[1]) - c.Assert(len(errCh), Equals, 0) - c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/streamer/CMPAlwaysReturn0"), IsNil) - }() - wg.Wait() - - // 5. 
context timeout (no new write) - // revert meta file to old file - meta = Meta{BinLogName: relayFiles[1], BinLogPos: binlogPos, BinlogGTID: binlogGTID} - var buf bytes.Buffer - enc := toml.NewEncoder(&buf) - c.Assert(enc.Encode(meta), IsNil) - c.Assert(utils.WriteFileAtomic(path.Join(subDir, utils.MetaFilename), buf.Bytes(), 0o644), IsNil) - - fi, err2 := os.Stat(relayPaths[1]) - c.Assert(err2, IsNil) - curSize := fi.Size() - newCtx, cancel := context.WithTimeout(ctx, time.Second) + checker = &relayLogFileChecker{ + n: en, + relayDir: "", + currentUUID: "", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: relayFiles[0], + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) + up = <-updatePathCh + c.Assert(up, Equals, relayPaths[0]) + c.Assert(len(switchCh), Equals, 0) + c.Assert(len(errCh), Equals, 0) + + // context timeout (no new write) + newCtx, cancel := context.WithTimeout(ctx, time.Millisecond) defer cancel() - relayLogUpdatedOrNewCreated(newCtx, watcherInterval, subDir, relayPaths[1], relayFiles[1], curSize, updatePathCh, errCh) + relayDir := c.MkDir() + t.writeUUIDs(c, relayDir, []string{"xxx.000001", "xxx.000002"}) + checker = &relayLogFileChecker{ + n: en, + relayDir: relayDir, + currentUUID: "xxx.000002", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: relayFiles[0], + beginOffset: 0, + endOffset: size, + } + checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) c.Assert(len(errCh), Equals, 1) c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) err7 := <-errCh c.Assert(err7, ErrorMatches, "context meet error:.*") + + // binlog dir switched, but last file not exists + _ = os.MkdirAll(filepath.Join(relayDir, "xxx.000002"), 0o700) + _ = os.WriteFile(filepath.Join(relayDir, "xxx.000002", "mysql.000001"), nil, 0o600) + checker = &relayLogFileChecker{ + n: en, + relayDir: relayDir, + currentUUID: 
"xxx.000001", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[1], + latestFile: relayFiles[1], + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 1) + c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) + err = <-errCh + c.Assert(err, ErrorMatches, ".*no such file or directory*") + + // binlog dir switched, but last file smaller + err = os.WriteFile(relayPaths[1], nil, 0o600) + checker = &relayLogFileChecker{ + n: en, + relayDir: relayDir, + currentUUID: "xxx.000001", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[1], + latestFile: relayFiles[1], + beginOffset: size, + endOffset: size, + } + checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 1) + c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 0) + err = <-errCh + c.Assert(err, ErrorMatches, ".*file size of relay log.*become smaller.*") + + // binlog dir switched, but last file bigger + err = os.WriteFile(relayPaths[1], data, 0o600) + checker = &relayLogFileChecker{ + n: en, + relayDir: relayDir, + currentUUID: "xxx.000001", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[1], + latestFile: relayFiles[1], + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 1) + c.Assert(len(switchCh), Equals, 0) + up = <-updatePathCh + c.Assert(up, Equals, relayPaths[1]) + + // binlog dir switched, but last file not changed + err = os.WriteFile(relayPaths[1], nil, 0o600) + checker = &relayLogFileChecker{ + n: en, + relayDir: relayDir, + currentUUID: "xxx.000001", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[1], + latestFile: relayFiles[1], + beginOffset: 0, + endOffset: 0, + } + checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) + 
c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 0) + c.Assert(len(switchCh), Equals, 1) + sp := <-switchCh + c.Assert(sp.nextUUID, Equals, "xxx.000002") + c.Assert(sp.nextBinlogName, Equals, "mysql.000001") + + // got notified + en = newDummyEventNotifier(1) + newCtx, cancel = context.WithTimeout(ctx, time.Second) + defer cancel() + checker = &relayLogFileChecker{ + n: en, + relayDir: relayDir, + currentUUID: "xxx.000002", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: relayFiles[0], + beginOffset: 0, + endOffset: size, + } + checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 1) + c.Assert(len(switchCh), Equals, 0) + up = <-updatePathCh + c.Assert(up, Equals, relayPaths[0]) } -func (t *testFileSuite) TestNeedSwitchSubDir(c *C) { +func (t *testFileSuite) TestGetSwitchPath(c *C) { var ( relayDir = c.MkDir() UUIDs = []string{ @@ -466,47 +597,34 @@ func (t *testFileSuite) TestNeedSwitchSubDir(c *C) { "53ea0ed1-9bf8-11e6-8bea-64006a897c71.000003", } currentUUID = UUIDs[len(UUIDs)-1] // no next UUID - switchCh = make(chan SwitchPath, 1) - errCh = make(chan error, 1) ) - ctx := context.Background() - // invalid UUID in UUIDs, error UUIDs = append(UUIDs, "invalid.uuid") t.writeUUIDs(c, relayDir, UUIDs) - needSwitchSubDir(ctx, relayDir, currentUUID, switchCh, errCh) - c.Assert(len(errCh), Equals, 1) - c.Assert(len(switchCh), Equals, 0) - err := <-errCh + checker := &relayLogFileChecker{ + relayDir: relayDir, + currentUUID: currentUUID, + } + switchPath, err := checker.getSwitchPath() + c.Assert(switchPath, IsNil) c.Assert(err, ErrorMatches, ".*not valid.*") UUIDs = UUIDs[:len(UUIDs)-1] // remove the invalid UUID t.writeUUIDs(c, relayDir, UUIDs) - // no next UUID, no need switch - newCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - needSwitchSubDir(newCtx, relayDir, currentUUID, switchCh, errCh) - 
c.Assert(len(errCh), Equals, 0) - c.Assert(len(switchCh), Equals, 0) - // no next sub directory - currentUUID = UUIDs[0] - needSwitchSubDir(ctx, relayDir, currentUUID, switchCh, errCh) - c.Assert(len(errCh), Equals, 1) - c.Assert(len(switchCh), Equals, 0) - err = <-errCh + checker.currentUUID = UUIDs[0] + switchPath, err = checker.getSwitchPath() + c.Assert(switchPath, IsNil) c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*(no such file or directory|The system cannot find the file specified).*", UUIDs[1])) // create next sub directory, block err = os.Mkdir(filepath.Join(relayDir, UUIDs[1]), 0o700) c.Assert(err, IsNil) - newCtx2, cancel2 := context.WithTimeout(ctx, 5*time.Second) - defer cancel2() - needSwitchSubDir(newCtx2, relayDir, currentUUID, switchCh, errCh) - c.Assert(len(errCh), Equals, 0) - c.Assert(len(switchCh), Equals, 0) + switchPath, err = checker.getSwitchPath() + c.Assert(switchPath, IsNil) + c.Assert(err, IsNil) // create a relay log file in the next sub directory nextBinlogPath := filepath.Join(relayDir, UUIDs[1], "mysql-bin.000001") @@ -516,12 +634,10 @@ func (t *testFileSuite) TestNeedSwitchSubDir(c *C) { c.Assert(err, IsNil) // switch to the next - needSwitchSubDir(ctx, relayDir, currentUUID, switchCh, errCh) - c.Assert(len(errCh), Equals, 0) - c.Assert(len(switchCh), Equals, 1) - res := <-switchCh - c.Assert(res.nextUUID, Equals, UUIDs[1]) - c.Assert(res.nextBinlogName, Equals, filepath.Base(nextBinlogPath)) + switchPath, err = checker.getSwitchPath() + c.Assert(switchPath.nextUUID, Equals, UUIDs[1]) + c.Assert(switchPath.nextBinlogName, Equals, filepath.Base(nextBinlogPath)) + c.Assert(err, IsNil) } // nolint:unparam diff --git a/pkg/streamer/reader_test.go b/pkg/streamer/reader_test.go index 84eca75b78..cdfcd6b7ef 100644 --- a/pkg/streamer/reader_test.go +++ b/pkg/streamer/reader_test.go @@ -85,7 +85,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { currentUUID := "invalid-current-uuid" relayDir := filepath.Join(baseDir, currentUUID) 
cfg := &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r := NewBinlogReader(log.L(), cfg) + r := NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err := r.parseFile( ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, ErrorMatches, ".*invalid-current-uuid.*") @@ -101,7 +101,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { relayDir = filepath.Join(baseDir, currentUUID) fullPath := filepath.Join(relayDir, filename) cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) // relay log file not exists, failed needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( @@ -233,7 +233,7 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { c.Assert(nextBinlogName, Equals, "") c.Assert(replaceWithHeartbeat, Equals, false) - // should only got a RotateEvent and a FormatDescriptionEven + // should only get a RotateEvent i = 0 for { ev, err2 := s.GetEvent(ctx) @@ -242,12 +242,10 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { case replication.ROTATE_EVENT: c.Assert(ev.RawData, DeepEquals, rotateEv.RawData) i++ - case replication.FORMAT_DESCRIPTION_EVENT: - i++ default: c.Fatalf("got unexpected event %+v", ev.Header) } - if i >= 2 { + if i >= 1 { break } } @@ -256,102 +254,6 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { cancel() } -func (t *testReaderSuite) TestParseFileRelayLogUpdatedOrNewCreated(c *C) { - var ( - filename = "test-mysql-bin.000001" - nextFilename = "test-mysql-bin.000002" - notUsedGTIDSetStr = t.lastGTID.String() - baseDir = c.MkDir() - offset int64 - firstParse = true - possibleLast = true - baseEvents, lastPos, lastGTID = t.genBinlogEvents(c, t.lastPos, t.lastGTID) - currentUUID = 
"b60868af-5a6f-11e9-9ea3-0242ac160006.000001" - relayDir = filepath.Join(baseDir, currentUUID) - fullPath = filepath.Join(relayDir, filename) - nextPath = filepath.Join(relayDir, nextFilename) - s = newLocalStreamer() - cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) - ) - - // create the current relay log file and write some events - err := os.MkdirAll(relayDir, 0o700) - c.Assert(err, IsNil) - f, err := os.OpenFile(fullPath, os.O_CREATE|os.O_WRONLY, 0o600) - c.Assert(err, IsNil) - defer f.Close() - _, err = f.Write(replication.BinLogFileHeader) - c.Assert(err, IsNil) - for _, ev := range baseEvents { - _, err = f.Write(ev.RawData) - c.Assert(err, IsNil) - } - t.createMetaFile(c, relayDir, filename, uint32(offset), notUsedGTIDSetStr) - - // no valid update for relay sub dir, timeout, no error - ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second) - defer cancel1() - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err := r.parseFile( - ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(latestPos, Equals, int64(0)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) - t.purgeStreamer(c, s) - - // current relay log file updated, need to re-parse it - var wg sync.WaitGroup - wg.Add(1) - extraEvents, _, _ := t.genBinlogEvents(c, lastPos, lastGTID) - go func() { - defer wg.Done() - time.Sleep(500 * time.Millisecond) // wait parseFile started - _, err2 := f.Write(extraEvents[0].RawData) - c.Assert(err2, IsNil) - }() - ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout) - defer cancel2() - t.createMetaFile(c, relayDir, filename, uint32(offset), notUsedGTIDSetStr) - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, 
replaceWithHeartbeat, err = r.parseFile( - ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsTrue) - c.Assert(latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) - wg.Wait() - t.purgeStreamer(c, s) - - // new relay log file created, need to re-collect files - wg.Add(1) - go func() { - defer wg.Done() - time.Sleep(500 * time.Millisecond) // wait parseFile started - err2 := os.WriteFile(nextPath, replication.BinLogFileHeader, 0o600) - c.Assert(err2, IsNil) - }() - ctx3, cancel3 := context.WithTimeout(context.Background(), parseFileTimeout) - defer cancel3() - t.createMetaFile(c, relayDir, nextFilename, uint32(offset), notUsedGTIDSetStr) - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( - ctx3, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(latestPos, Equals, int64(extraEvents[0].Header.LogPos)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) - wg.Wait() - t.purgeStreamer(c, s) -} - func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { var ( filename = "test-mysql-bin.000001" @@ -361,7 +263,6 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { offset int64 firstParse = true possibleLast = true - baseEvents, _, _ = t.genBinlogEvents(c, t.lastPos, t.lastGTID) currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" switchedUUID = "b60868af-5a6f-11e9-9ea3-0242ac160007.000002" relayDir = filepath.Join(baseDir, currentUUID) @@ -370,7 +271,7 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { nextFullPath = 
filepath.Join(nextRelayDir, nextFilename) s = newLocalStreamer() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) ) // create the current relay log file and write some events @@ -380,11 +281,8 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { c.Assert(err, IsNil) defer f.Close() _, err = f.Write(replication.BinLogFileHeader) + offset = 4 c.Assert(err, IsNil) - for _, ev := range baseEvents { - _, err = f.Write(ev.RawData) - c.Assert(err, IsNil) - } t.createMetaFile(c, relayDir, filename, uint32(offset), notUsedGTIDSetStr) // invalid UUID in UUID list, error @@ -444,7 +342,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { fullPath = filepath.Join(relayDir, filename) s = newLocalStreamer() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) ) // create the current relay log file and write some events @@ -462,7 +360,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) + c.Assert(needReParse, IsTrue) c.Assert(latestPos, Equals, int64(0)) c.Assert(nextUUID, Equals, "") c.Assert(nextBinlogName, Equals, "") @@ -480,6 +378,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { // meet `err EOF` error (when parsing binlog event) ignored ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel2() + r.en = newDummyEventNotifier(1) needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) @@ -493,9 +392,10 
@@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { func (t *testReaderSuite) TestUpdateUUIDs(c *C) { var ( + en = newDummyEventNotifier(1) baseDir = c.MkDir() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(en, log.L(), cfg) ) c.Assert(r.uuids, HasLen, 0) @@ -520,6 +420,7 @@ func (t *testReaderSuite) TestUpdateUUIDs(c *C) { func (t *testReaderSuite) TestStartSyncByPos(c *C) { var ( + en = newDummyEventNotifier(1) filenamePrefix = "test-mysql-bin.00000" notUsedGTIDSetStr = t.lastGTID.String() baseDir = c.MkDir() @@ -531,7 +432,7 @@ func (t *testReaderSuite) TestStartSyncByPos(c *C) { "b60868af-5a6f-11e9-9ea3-0242ac160008.000003", } cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(en, log.L(), cfg) startPos = gmysql.Position{Name: "test-mysql-bin|000001.000001"} // from the first relay log file in the first sub directory ) @@ -575,18 +476,8 @@ func (t *testReaderSuite) TestStartSyncByPos(c *C) { // get events from the streamer ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - obtainBaseEvents := make([]*replication.BinlogEvent, 0, (1+2+3)*len(baseEvents)) - for { - ev, err2 := s.GetEvent(ctx) - c.Assert(err2, IsNil) - if ev.Header.Timestamp == 0 || ev.Header.LogPos == 0 { - continue // ignore fake event - } - obtainBaseEvents = append(obtainBaseEvents, ev) - if len(obtainBaseEvents) == cap(obtainBaseEvents) { - break - } - } + + obtainBaseEvents := readNEvents(ctx, c, s, (1+2+3)*len(baseEvents)) t.verifyNoEventsInStreamer(c, s) // verify obtain base events for i := 0; i < len(obtainBaseEvents); i += len(baseEvents) { @@ -651,12 +542,29 @@ func (t *testReaderSuite) TestStartSyncByPos(c *C) { r.Close() } +func readNEvents(ctx context.Context, c *C, s Streamer, l int) []*replication.BinlogEvent { + var result []*replication.BinlogEvent + for { + 
ev, err2 := s.GetEvent(ctx) + c.Assert(err2, IsNil) + if ev.Header.Timestamp == 0 && ev.Header.LogPos == 0 { + continue // ignore fake event + } + result = append(result, ev) + // start from the first format description event + if len(result) == l { + break + } + } + return result +} + func (t *testReaderSuite) TestStartSyncByGTID(c *C) { var ( baseDir = c.MkDir() events []*replication.BinlogEvent cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(0), log.L(), cfg) lastPos uint32 lastGTID gtid.Set previousGset, _ = gtid.ParserGTID(gmysql.MySQLFlavor, "") @@ -824,19 +732,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var obtainBaseEvents []*replication.BinlogEvent - for { - ev, err2 := s.GetEvent(ctx) - c.Assert(err2, IsNil) - if ev.Header.Timestamp == 0 && ev.Header.LogPos == 0 { - continue // ignore fake event - } - obtainBaseEvents = append(obtainBaseEvents, ev) - // start from the first format description event - if len(obtainBaseEvents) == len(allEvents) { - break - } - } + obtainBaseEvents := readNEvents(ctx, c, s, len(allEvents)) preGset, err := gmysql.ParseGTIDSet(gmysql.MySQLFlavor, "") c.Assert(err, IsNil) @@ -859,7 +755,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { } r.Close() - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) excludeStrs := []string{} // exclude first uuid @@ -877,20 +773,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { // StartSyncByGtid exclude first uuid s, err = r.StartSyncByGTID(excludeGset) c.Assert(err, IsNil) - obtainBaseEvents = []*replication.BinlogEvent{} - - for { - ev, err2 := s.GetEvent(ctx) - c.Assert(err2, IsNil) - if ev.Header.Timestamp == 0 && ev.Header.LogPos == 0 { - continue // ignore fake event - } - obtainBaseEvents = append(obtainBaseEvents, ev) - // start from 
the first format description event - if len(obtainBaseEvents) == len(allEvents) { - break - } - } + obtainBaseEvents = readNEvents(ctx, c, s, len(allEvents)) gset := excludeGset.Clone() // every gtid event not from first uuid should become heartbeat event @@ -916,12 +799,12 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { c.Assert(os.Remove(path.Join(baseDir, excludeUUID, "mysql.000001")), IsNil) r.Close() - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) _, err = r.StartSyncByGTID(preGset) c.Assert(err, IsNil) r.Close() - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) _, err = r.StartSyncByGTID(excludeGset) // error because file has been purge c.Assert(terror.ErrNoRelayPosMatchGTID.Equal(err), IsTrue) @@ -930,12 +813,12 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { c.Assert(os.RemoveAll(path.Join(baseDir, excludeUUID)), IsNil) r.Close() - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) _, err = r.StartSyncByGTID(preGset) c.Assert(err, IsNil) r.Close() - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) _, err = r.StartSyncByGTID(excludeGset) // error because subdir has been purge c.Assert(err, ErrorMatches, ".*no such file or directory.*") @@ -952,7 +835,7 @@ func (t *testReaderSuite) TestStartSyncError(c *C) { startPos = gmysql.Position{Name: "test-mysql-bin|000001.000001"} // from the first relay log file in the first sub directory ) - r := NewBinlogReader(log.L(), cfg) + r := NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) err := r.checkRelayPos(startPos) c.Assert(err, ErrorMatches, ".*empty UUIDs not valid.*") @@ -971,7 +854,7 @@ func (t *testReaderSuite) TestStartSyncError(c *C) { c.Assert(s, IsNil) // write UUIDs into index file - r = NewBinlogReader(log.L(), cfg) // create a new reader + r = NewBinlogReader(newDummyEventNotifier(1), log.L(), 
cfg) // create a new reader uuidBytes := t.uuidListToBytes(c, UUIDs) err = os.WriteFile(r.indexPath, uuidBytes, 0o600) c.Assert(err, IsNil) @@ -1014,9 +897,10 @@ func (t *testReaderSuite) TestStartSyncError(c *C) { func (t *testReaderSuite) TestAdvanceCurrentGTIDSet(c *C) { var ( + en = newDummyEventNotifier(1) baseDir = c.MkDir() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(en, log.L(), cfg) mysqlGset, _ = gmysql.ParseMysqlGTIDSet("b60868af-5a6f-11e9-9ea3-0242ac160006:1-6") mariadbGset, _ = gmysql.ParseMariadbGTIDSet("0-1-5") ) @@ -1048,9 +932,10 @@ func (t *testReaderSuite) TestAdvanceCurrentGTIDSet(c *C) { func (t *testReaderSuite) TestReParseUsingGTID(c *C) { var ( + en = newDummyEventNotifier(1) baseDir = c.MkDir() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = NewBinlogReader(log.L(), cfg) + r = NewBinlogReader(en, log.L(), cfg) uuid = "ba8f633f-1f15-11eb-b1c7-0242ac110002.000001" gtidStr = "ba8f633f-1f15-11eb-b1c7-0242ac110002:1" file = "mysql.000001" @@ -1133,6 +1018,10 @@ func (t *testReaderSuite) TestReParseUsingGTID(c *C) { time.Sleep(time.Second) _, err = f.Write(events[i].RawData) c.Assert(err, IsNil) + select { + case en.Notified() <- struct{}{}: + default: + } } wg.Wait() } diff --git a/syncer/streamer_controller_test.go b/syncer/streamer_controller_test.go index f9c34cd320..8afc75fd1e 100644 --- a/syncer/streamer_controller_test.go +++ b/syncer/streamer_controller_test.go @@ -20,7 +20,7 @@ func (s *testSyncerSuite) TestIsConnectionRefusedError(c *C) { } func (s *testSyncerSuite) TestCanErrorRetry(c *C) { - controller := NewStreamerController(replication.BinlogSyncerConfig{}, true, nil, + controller := NewStreamerController(&Syncer{}, replication.BinlogSyncerConfig{}, true, nil, LocalBinlog, "", nil) mockErr := errors.New("test") @@ -37,7 +37,7 @@ func (s *testSyncerSuite) TestCanErrorRetry(c *C) { }() // test with remote binlog - 
controller = NewStreamerController(replication.BinlogSyncerConfig{}, true, nil, + controller = NewStreamerController(&Syncer{}, replication.BinlogSyncerConfig{}, true, nil, RemoteBinlog, "", nil) c.Assert(controller.CanRetry(mockErr), IsTrue) From 9df7eed10a9f9b252433a64f46d5982dade23c90 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 14 Oct 2021 16:20:28 +0800 Subject: [PATCH 07/20] make lint happy --- dm/worker/subtask_holder.go | 5 +-- loader/lightning.go | 19 +++++------ pkg/streamer/file.go | 63 +++++++++++++++++++------------------ pkg/streamer/file_test.go | 3 +- relay/relay.go | 4 +-- syncer/syncer.go | 2 +- 6 files changed, 51 insertions(+), 45 deletions(-) diff --git a/dm/worker/subtask_holder.go b/dm/worker/subtask_holder.go index 96fc6e54b5..e90aa9fd98 100644 --- a/dm/worker/subtask_holder.go +++ b/dm/worker/subtask_holder.go @@ -15,8 +15,9 @@ package worker import ( "context" - "github.com/go-mysql-org/go-mysql/replication" "sync" + + "github.com/go-mysql-org/go-mysql/replication" ) // subTaskHolder holds subtask instances. @@ -92,7 +93,7 @@ func (h *subTaskHolder) getAllSubTasks() map[string]*SubTask { // OnEvent implements relay.Listener // only syncer unit of subtask need to be notified, but it's much simpler and less error-prone to manage it here -// as relay event need to broadcast to every syncer(most subtask have a syncer) +// as relay event need to broadcast to every syncer(most subtask have a syncer). 
func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) { h.mu.RLock() defer h.mu.RUnlock() diff --git a/loader/lightning.go b/loader/lightning.go index d87fb8005a..51cbc17a8b 100644 --- a/loader/lightning.go +++ b/loader/lightning.go @@ -18,16 +18,7 @@ import ( "path/filepath" "sync" - "github.com/docker/go-units" - "github.com/pingcap/failpoint" - "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-tools/pkg/dbutil" - "github.com/pingcap/tidb/br/pkg/lightning" - "github.com/pingcap/tidb/br/pkg/lightning/common" - lcfg "github.com/pingcap/tidb/br/pkg/lightning/config" - "go.etcd.io/etcd/clientv3" - "go.uber.org/atomic" - "go.uber.org/zap" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" @@ -37,6 +28,16 @@ import ( tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/utils" + + "github.com/docker/go-units" + "github.com/pingcap/failpoint" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/br/pkg/lightning" + "github.com/pingcap/tidb/br/pkg/lightning/common" + lcfg "github.com/pingcap/tidb/br/pkg/lightning/config" + "go.etcd.io/etcd/clientv3" + "go.uber.org/atomic" + "go.uber.org/zap" ) const ( diff --git a/pkg/streamer/file.go b/pkg/streamer/file.go index 6f8cd09e45..ec0b20aa3f 100644 --- a/pkg/streamer/file.go +++ b/pkg/streamer/file.go @@ -15,17 +15,19 @@ package streamer import ( "context" - "github.com/BurntSushi/toml" - "github.com/pingcap/dm/pkg/binlog" - "github.com/pingcap/dm/pkg/log" - "github.com/pingcap/dm/pkg/terror" - "github.com/pingcap/dm/pkg/utils" - "go.uber.org/zap" "os" "path" "path/filepath" "sort" "time" + + "github.com/BurntSushi/toml" + "go.uber.org/zap" + + "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" ) // FileCmp is a compare condition used when collecting binlog files. 
@@ -46,7 +48,7 @@ type SwitchPath struct { nextBinlogName string } -// EventNotifier notifies whether there is new binlog event written to the file +// EventNotifier notifies whether there is new binlog event written to the file. type EventNotifier interface { // Notified returns a channel used to check whether there is new binlog event written to the file Notified() chan interface{} @@ -206,7 +208,7 @@ func fileSizeUpdated(path string, latestSize int64) (int, error) { } type relayLogFileChecker struct { - n EventNotifier + n EventNotifier relayDir, currentUUID string latestRelayLogDir, latestFilePath, latestFile string beginOffset, endOffset int64 @@ -225,9 +227,9 @@ func (r *relayLogFileChecker) relayLogUpdatedOrNewCreated(ctx context.Context, u } if meta.BinLogName != r.latestFile { // we need check file size again, as the file may have been changed during our metafile check - cmp, err := fileSizeUpdated(r.latestFilePath, r.endOffset) - if err != nil { - errCh <- terror.Annotatef(err, "latestFilePath=%s endOffset=%d", r.latestFilePath, r.endOffset) + cmp, err2 := fileSizeUpdated(r.latestFilePath, r.endOffset) + if err2 != nil { + errCh <- terror.Annotatef(err2, "latestFilePath=%s endOffset=%d", r.latestFilePath, r.endOffset) return } switch { @@ -243,29 +245,30 @@ func (r *relayLogFileChecker) relayLogUpdatedOrNewCreated(ctx context.Context, u updatePathCh <- nextFilePath } return - } else { - switchPath, err := r.getSwitchPath() + } + + // maybe UUID index file changed + switchPath, err := r.getSwitchPath() + if err != nil { + errCh <- err + return + } + if switchPath != nil { + // we need check file size again, as the file may have been changed during path check + cmp, err := fileSizeUpdated(r.latestFilePath, r.endOffset) if err != nil { - errCh <- err + errCh <- terror.Annotatef(err, "latestFilePath=%s endOffset=%d", r.latestFilePath, r.endOffset) return } - if switchPath != nil { - // we need check file size again, as the file may have been changed during 
path check - cmp, err := fileSizeUpdated(r.latestFilePath, r.endOffset) - if err != nil { - errCh <- terror.Annotatef(err, "latestFilePath=%s endOffset=%d", r.latestFilePath, r.endOffset) - return - } - switch { - case cmp < 0: - errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(r.latestFilePath) - case cmp > 0: - updatePathCh <- r.latestFilePath - default: - switchCh <- *switchPath - } - return + switch { + case cmp < 0: + errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(r.latestFilePath) + case cmp > 0: + updatePathCh <- r.latestFilePath + default: + switchCh <- *switchPath } + return } } diff --git a/pkg/streamer/file_test.go b/pkg/streamer/file_test.go index 07524ad81f..76b6485837 100644 --- a/pkg/streamer/file_test.go +++ b/pkg/streamer/file_test.go @@ -24,9 +24,10 @@ import ( "github.com/BurntSushi/toml" . "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" - "github.com/pingcap/errors" ) var _ = Suite(&testFileSuite{}) diff --git a/relay/relay.go b/relay/relay.go index 6185b18f02..38767b6271 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -69,7 +69,7 @@ var NewRelay = NewRealRelay var _ Process = &Relay{} -// Listener defines a binlog event listener of relay log +// Listener defines a binlog event listener of relay log. type Listener interface { // OnEvent get called when relay processed an event successfully. 
OnEvent(e *replication.BinlogEvent) @@ -1106,7 +1106,7 @@ func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error) func (r *Relay) notify(e *replication.BinlogEvent) { r.RLock() defer r.RUnlock() - for el, _ := range r.els { + for el := range r.els { el.OnEvent(e) } } diff --git a/syncer/syncer.go b/syncer/syncer.go index 03bef163cb..7c5b47cc70 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -3568,4 +3568,4 @@ func (s *Syncer) delLoadTask() error { func (s *Syncer) Notified() chan interface{} { return s.relayNotifyCh -} \ No newline at end of file +} From 4b2aa47e67dc3763c515125461d6c170814ffc9f Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Fri, 15 Oct 2021 12:15:19 +0800 Subject: [PATCH 08/20] add comments to public if --- relay/relay.go | 2 ++ syncer/syncer.go | 1 + 2 files changed, 3 insertions(+) diff --git a/relay/relay.go b/relay/relay.go index 38767b6271..58caafc26f 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -1111,12 +1111,14 @@ func (r *Relay) notify(e *replication.BinlogEvent) { } } +// RegisterListener implements Process.RegisterListener func (r *Relay) RegisterListener(el Listener) { r.Lock() defer r.Unlock() r.els[el] = struct{}{} } +// UnRegisterListener implements Process.UnRegisterListener func (r *Relay) UnRegisterListener(el Listener) { r.Lock() defer r.Unlock() diff --git a/syncer/syncer.go b/syncer/syncer.go index 7c5b47cc70..efab5e69e5 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -3566,6 +3566,7 @@ func (s *Syncer) delLoadTask() error { return nil } +// Notified implements streamer.EventNotifier func (s *Syncer) Notified() chan interface{} { return s.relayNotifyCh } From b865044545808b9fbce610d71329fcc26cd20611 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Fri, 15 Oct 2021 12:21:07 +0800 Subject: [PATCH 09/20] fix comments --- dm/worker/subtask.go | 2 +- relay/relay.go | 4 ++-- syncer/syncer.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dm/worker/subtask.go 
b/dm/worker/subtask.go index dca91bb12a..8c2afbd057 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -740,7 +740,7 @@ func updateTaskMetric(task, sourceID string, stage pb.Stage, workerName string) func (st *SubTask) relayNotify() { if st.syncer != nil { - // skip if it's there's pending notify + // skip if there's pending notify select { case st.syncer.Notified() <- struct{}{}: default: diff --git a/relay/relay.go b/relay/relay.go index 58caafc26f..a602f9dda4 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -1111,14 +1111,14 @@ func (r *Relay) notify(e *replication.BinlogEvent) { } } -// RegisterListener implements Process.RegisterListener +// RegisterListener implements Process.RegisterListener. func (r *Relay) RegisterListener(el Listener) { r.Lock() defer r.Unlock() r.els[el] = struct{}{} } -// UnRegisterListener implements Process.UnRegisterListener +// UnRegisterListener implements Process.UnRegisterListener. func (r *Relay) UnRegisterListener(el Listener) { r.Lock() defer r.Unlock() diff --git a/syncer/syncer.go b/syncer/syncer.go index efab5e69e5..4f10d85bf2 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -3566,7 +3566,7 @@ func (s *Syncer) delLoadTask() error { return nil } -// Notified implements streamer.EventNotifier +// Notified implements streamer.EventNotifier. 
func (s *Syncer) Notified() chan interface{} { return s.relayNotifyCh } From 3ab6475b1a626470c9d60b9c560d6d9f933b49b6 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 18 Oct 2021 16:24:30 +0800 Subject: [PATCH 10/20] fix lint --- loader/lightning.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/loader/lightning.go b/loader/lightning.go index 51cbc17a8b..d87fb8005a 100644 --- a/loader/lightning.go +++ b/loader/lightning.go @@ -18,7 +18,16 @@ import ( "path/filepath" "sync" + "github.com/docker/go-units" + "github.com/pingcap/failpoint" + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-tools/pkg/dbutil" + "github.com/pingcap/tidb/br/pkg/lightning" + "github.com/pingcap/tidb/br/pkg/lightning/common" + lcfg "github.com/pingcap/tidb/br/pkg/lightning/config" + "go.etcd.io/etcd/clientv3" + "go.uber.org/atomic" + "go.uber.org/zap" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" @@ -28,16 +37,6 @@ import ( tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/utils" - - "github.com/docker/go-units" - "github.com/pingcap/failpoint" - "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/br/pkg/lightning" - "github.com/pingcap/tidb/br/pkg/lightning/common" - lcfg "github.com/pingcap/tidb/br/pkg/lightning/config" - "go.etcd.io/etcd/clientv3" - "go.uber.org/atomic" - "go.uber.org/zap" ) const ( From ea8fd6ee2ff3848bb714a8da6a9891b3f52d323f Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 18 Oct 2021 17:09:13 +0800 Subject: [PATCH 11/20] abstruct notifier chan in syncer into a struct and pass in from subtask --- cmd/dm-syncer/main.go | 2 +- dm/worker/server_test.go | 4 ++- dm/worker/source_worker_test.go | 6 ++-- dm/worker/subtask.go | 46 +++++++++++++++--------------- dm/worker/subtask_test.go | 20 +++++++------ syncer/ddl_test.go | 12 ++++---- syncer/error_test.go | 2 +- syncer/filter_test.go | 6 ++-- syncer/handle_error_test.go | 2 +- 
syncer/streamer_controller.go | 8 +++--- syncer/streamer_controller_test.go | 4 +-- syncer/syncer.go | 16 ++++------- syncer/syncer_test.go | 22 +++++++------- 13 files changed, 75 insertions(+), 75 deletions(-) diff --git a/cmd/dm-syncer/main.go b/cmd/dm-syncer/main.go index 5414e0d876..d678944054 100644 --- a/cmd/dm-syncer/main.go +++ b/cmd/dm-syncer/main.go @@ -74,7 +74,7 @@ func main() { log.L().Info("", zap.Stringer("dm-syncer conf", conf)) }) - sync := syncer.NewSyncer(conf, nil) // do not support shard DDL for singleton syncer. + sync := syncer.NewSyncer(conf, nil, nil) // do not support shard DDL for singleton syncer. sc := make(chan os.Signal, 1) signal.Notify(sc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index 31d52cb80a..fa76cdfdc8 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/pingcap/dm/pkg/streamer" + "github.com/go-mysql-org/go-mysql/mysql" . "github.com/pingcap/check" "github.com/pingcap/failpoint" @@ -120,7 +122,7 @@ func (t *testServer) TestServer(c *C) { cfg.UseRelay = false return NewRealSubTask(cfg, etcdClient, worker) } - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) mockSync := NewMockUnit(pb.UnitType_Sync) diff --git a/dm/worker/source_worker_test.go b/dm/worker/source_worker_test.go index 4d401a414b..90bd5008f1 100644 --- a/dm/worker/source_worker_test.go +++ b/dm/worker/source_worker_test.go @@ -20,6 +20,8 @@ import ( "sync/atomic" "time" + "github.com/pingcap/dm/pkg/streamer" + "github.com/DATA-DOG/go-sqlmock" . 
"github.com/pingcap/check" "github.com/pingcap/failpoint" @@ -241,7 +243,7 @@ var _ = Suite(&testWorkerFunctionalities{}) func (t *testWorkerFunctionalities) SetUpSuite(c *C) { NewRelayHolder = NewDummyRelayHolder NewSubTask = NewRealSubTask - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { atomic.AddInt32(&t.createUnitCount, 1) mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) @@ -416,7 +418,7 @@ func (t *testWorkerEtcdCompact) SetUpSuite(c *C) { cfg.UseRelay = false return NewRealSubTask(cfg, etcdClient, worker) } - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) mockSync := NewMockUnit(pb.UnitType_Sync) diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index 8c2afbd057..b15a315d82 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -18,6 +18,8 @@ import ( "sync" "time" + "github.com/pingcap/dm/pkg/streamer" + "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/failpoint" "github.com/prometheus/client_golang/prometheus" @@ -44,12 +46,22 @@ const ( waitRelayCatchupTimeout = 30 * time.Second ) +type relayNotifier struct { + // ch with size = 1, we only need to be notified whether binlog file of relay changed, not how many times + ch chan interface{} +} + +// Notified implements streamer.EventNotifier. +func (r relayNotifier) Notified() chan interface{} { + return r.ch +} + // createRealUnits is subtask units initializer // it can be used for testing. 
var createUnits = createRealUnits // createRealUnits creates process units base on task mode. -func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string) []unit.Unit { +func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier streamer.EventNotifier) []unit.Unit { failpoint.Inject("mockCreateUnitsDumpOnly", func(_ failpoint.Value) { log.L().Info("create mock worker units with dump unit only", zap.String("failpoint", "mockCreateUnitsDumpOnly")) failpoint.Return([]unit.Unit{dumpling.NewDumpling(cfg)}) @@ -64,7 +76,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor } else { us = append(us, loader.NewLightning(cfg, etcdClient, workerName)) } - us = append(us, syncer.NewSyncer(cfg, etcdClient)) + us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier)) case config.ModeFull: // NOTE: maybe need another checker in the future? us = append(us, dumpling.NewDumpling(cfg)) @@ -74,7 +86,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor us = append(us, loader.NewLightning(cfg, etcdClient, workerName)) } case config.ModeIncrement: - us = append(us, syncer.NewSyncer(cfg, etcdClient)) + us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier)) default: log.L().Error("unsupported task mode", zap.String("subtask", cfg.Name), zap.String("task mode", cfg.Mode)) } @@ -100,7 +112,6 @@ type SubTask struct { units []unit.Unit // units do job one by one currUnit unit.Unit prevUnit unit.Unit - syncer *syncer.Syncer resultWg sync.WaitGroup stage pb.Stage // stage of current sub task @@ -109,6 +120,8 @@ type SubTask struct { etcdClient *clientv3.Client workerName string + + notifier streamer.EventNotifier } // NewSubTask is subtask initializer @@ -131,6 +144,7 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient * cancel: cancel, etcdClient: etcdClient, workerName: workerName, + notifier: 
&relayNotifier{ch: make(chan interface{}, 1)}, } updateTaskMetric(st.cfg.Name, st.cfg.SourceID, st.stage, st.workerName) return &st @@ -138,7 +152,7 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient * // initUnits initializes the sub task processing units. func (st *SubTask) initUnits() error { - st.units = createUnits(st.cfg, st.etcdClient, st.workerName) + st.units = createUnits(st.cfg, st.etcdClient, st.workerName, st.notifier) if len(st.units) < 1 { return terror.ErrWorkerNoAvailUnits.Generate(st.cfg.Name, st.cfg.Mode) } @@ -191,23 +205,11 @@ func (st *SubTask) initUnits() error { needCloseUnits = st.units[:skipIdx] st.units = st.units[skipIdx:] - st.postInitSyncer() st.setCurrUnit(st.units[0]) return nil } -func (st *SubTask) postInitSyncer() { - // TODO, right now initUnits create units first and then remove unnecessary units(before first non fresh unit) - // maybe can be refactored into check first, then create, so we don't need to loop all units to get syncer here - for _, u := range st.units { - if s, ok := u.(*syncer.Syncer); ok { - st.syncer = s - break - } - } -} - // Run runs the sub task. // TODO: check concurrent problems. 
func (st *SubTask) Run(expectStage pb.Stage) { @@ -739,11 +741,9 @@ func updateTaskMetric(task, sourceID string, stage pb.Stage, workerName string) } func (st *SubTask) relayNotify() { - if st.syncer != nil { - // skip if there's pending notify - select { - case st.syncer.Notified() <- struct{}{}: - default: - } + // skip if there's pending notify + select { + case st.notifier.Notified() <- struct{}{}: + default: } } diff --git a/dm/worker/subtask_test.go b/dm/worker/subtask_test.go index a2f94a18c8..a79e45d994 100644 --- a/dm/worker/subtask_test.go +++ b/dm/worker/subtask_test.go @@ -18,6 +18,8 @@ import ( "strings" "time" + "github.com/pingcap/dm/pkg/streamer" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" @@ -47,10 +49,10 @@ func (t *testSubTask) TestCreateUnits(c *C) { Mode: "xxx", } worker := "worker" - c.Assert(createUnits(cfg, nil, worker), HasLen, 0) + c.Assert(createUnits(cfg, nil, worker, nil), HasLen, 0) cfg.Mode = config.ModeFull - unitsFull := createUnits(cfg, nil, worker) + unitsFull := createUnits(cfg, nil, worker, nil) c.Assert(unitsFull, HasLen, 2) _, ok := unitsFull[0].(*dumpling.Dumpling) c.Assert(ok, IsTrue) @@ -58,13 +60,13 @@ func (t *testSubTask) TestCreateUnits(c *C) { c.Assert(ok, IsTrue) cfg.Mode = config.ModeIncrement - unitsIncr := createUnits(cfg, nil, worker) + unitsIncr := createUnits(cfg, nil, worker, nil) c.Assert(unitsIncr, HasLen, 1) _, ok = unitsIncr[0].(*syncer.Syncer) c.Assert(ok, IsTrue) cfg.Mode = config.ModeAll - unitsAll := createUnits(cfg, nil, worker) + unitsAll := createUnits(cfg, nil, worker, nil) c.Assert(unitsAll, HasLen, 3) _, ok = unitsAll[0].(*dumpling.Dumpling) c.Assert(ok, IsTrue) @@ -176,7 +178,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) { defer func() { createUnits = createRealUnits }() - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient 
*clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { return nil } st.Run(pb.Stage_Running) @@ -185,7 +187,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) { mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } @@ -296,7 +298,7 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) { defer func() { createUnits = createRealUnits }() - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } @@ -420,7 +422,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) { defer func() { createUnits = createRealUnits }() - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } @@ -445,7 +447,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) { st = NewSubTaskWithStage(cfg, pb.Stage_Finished, nil, "worker") c.Assert(st.Stage(), DeepEquals, pb.Stage_Finished) - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } diff --git a/syncer/ddl_test.go b/syncer/ddl_test.go index bc163f5d3a..7b60f14bd0 100644 --- a/syncer/ddl_test.go +++ 
b/syncer/ddl_test.go @@ -204,7 +204,7 @@ func (s *testDDLSuite) TestResolveDDLSQL(c *C) { }, } var err error - syncer := NewSyncer(cfg, nil) + syncer := NewSyncer(cfg, nil, nil) syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList) c.Assert(err, IsNil) @@ -347,7 +347,7 @@ func (s *testDDLSuite) TestParseDDLSQL(c *C) { }, } var err error - syncer := NewSyncer(cfg, nil) + syncer := NewSyncer(cfg, nil, nil) syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList) c.Assert(err, IsNil) @@ -435,7 +435,7 @@ func (s *testDDLSuite) TestResolveOnlineDDL(c *C) { for _, ca := range cases { plugin, err := onlineddl.NewRealOnlinePlugin(tctx, cfg) c.Assert(err, IsNil) - syncer := NewSyncer(cfg, nil) + syncer := NewSyncer(cfg, nil, nil) syncer.onlineDDL = plugin c.Assert(plugin.Clear(tctx), IsNil) // real table @@ -529,7 +529,7 @@ func (s *testDDLSuite) TestMistakeOnlineDDLRegex(c *C) { for _, ca := range cases { plugin, err := onlineddl.NewRealOnlinePlugin(tctx, cfg) c.Assert(err, IsNil) - syncer := NewSyncer(cfg, nil) + syncer := NewSyncer(cfg, nil, nil) syncer.onlineDDL = plugin c.Assert(plugin.Clear(tctx), IsNil) @@ -576,7 +576,7 @@ func (s *testDDLSuite) TestDropSchemaInSharding(c *C) { dbCfg := config.GetDBConfigForTest() cfg := s.newSubTaskCfg(dbCfg) cfg.ShardMode = config.ShardPessimistic - syncer := NewSyncer(cfg, nil) + syncer := NewSyncer(cfg, nil, nil) // nolint:dogsled _, _, _, _, err := syncer.sgk.AddGroup(targetTable, []string{source1}, nil, true) c.Assert(err, IsNil) @@ -603,7 +603,7 @@ func (s *testDDLSuite) TestClearOnlineDDL(c *C) { dbCfg := config.GetDBConfigForTest() cfg := s.newSubTaskCfg(dbCfg) cfg.ShardMode = config.ShardPessimistic - syncer := NewSyncer(cfg, nil) + syncer := NewSyncer(cfg, nil, nil) mock := mockOnlinePlugin{ map[string]struct{}{key1: {}, key2: {}}, } diff --git a/syncer/error_test.go b/syncer/error_test.go index 70551cdd5f..9a68ef2b15 100644 --- a/syncer/error_test.go +++ b/syncer/error_test.go @@ 
-54,7 +54,7 @@ func (s *testSyncerSuite) TestIgnoreDDLError(c *C) { func (s *testSyncerSuite) TestHandleSpecialDDLError(c *C) { var ( - syncer = NewSyncer(s.cfg, nil) + syncer = NewSyncer(s.cfg, nil, nil) tctx = tcontext.Background() conn2 = &dbconn.DBConn{Cfg: s.cfg, ResetBaseConnFn: func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error) { return nil, nil diff --git a/syncer/filter_test.go b/syncer/filter_test.go index 58ad3ce12f..55d120d333 100644 --- a/syncer/filter_test.go +++ b/syncer/filter_test.go @@ -56,7 +56,7 @@ func (s *testFilterSuite) TestSkipQueryEvent(c *C) { IgnoreTables: []*filter.Table{{Schema: "s1", Name: "test"}}, }, } - syncer := NewSyncer(cfg, nil) + syncer := NewSyncer(cfg, nil, nil) var err error syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList) c.Assert(err, IsNil) @@ -171,7 +171,7 @@ func (s *testFilterSuite) TestFilterOneEvent(c *C) { IgnoreDBs: []string{"s1"}, }, } - syncer := NewSyncer(cfg, nil) + syncer := NewSyncer(cfg, nil, nil) var err error syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList) c.Assert(err, IsNil) @@ -253,7 +253,7 @@ func (s *testFilterSuite) TestSkipByTable(c *C) { IgnoreDBs: []string{"s1"}, }, } - syncer := NewSyncer(cfg, nil) + syncer := NewSyncer(cfg, nil, nil) var err error syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList) c.Assert(err, IsNil) diff --git a/syncer/handle_error_test.go b/syncer/handle_error_test.go index 1720661c8a..a4405037ce 100644 --- a/syncer/handle_error_test.go +++ b/syncer/handle_error_test.go @@ -26,7 +26,7 @@ import ( func (s *testSyncerSuite) TestHandleError(c *C) { var ( - syncer = NewSyncer(s.cfg, nil) + syncer = NewSyncer(s.cfg, nil, nil) task = "test" ctx = context.Background() cases = []struct { diff --git a/syncer/streamer_controller.go b/syncer/streamer_controller.go index e426c125ad..c58dc89c3b 100644 --- a/syncer/streamer_controller.go +++ b/syncer/streamer_controller.go @@ -121,11 +121,11 @@ 
type StreamerController struct { // whether the server id is updated serverIDUpdated bool - s *Syncer + notifier streamer.EventNotifier } // NewStreamerController creates a new streamer controller. -func NewStreamerController(s *Syncer, +func NewStreamerController(notifier streamer.EventNotifier, syncCfg replication.BinlogSyncerConfig, enableGTID bool, fromDB *dbconn.UpStreamConn, @@ -155,7 +155,7 @@ func NewStreamerController(s *Syncer, timezone: timezone, fromDB: fromDB, closed: true, - s: s, + notifier: notifier, } return streamerController @@ -234,7 +234,7 @@ func (c *StreamerController) resetReplicationSyncer(tctx *tcontext.Context, loca if c.currentBinlogType == RemoteBinlog { c.streamerProducer = &remoteBinlogReader{replication.NewBinlogSyncer(c.syncCfg), tctx, c.syncCfg.Flavor, c.enableGTID} } else { - c.streamerProducer = &localBinlogReader{streamer.NewBinlogReader(c.s, tctx.L(), &streamer.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor}), c.enableGTID} + c.streamerProducer = &localBinlogReader{streamer.NewBinlogReader(c.notifier, tctx.L(), &streamer.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor}), c.enableGTID} } c.streamer, err = c.streamerProducer.generateStreamer(location) diff --git a/syncer/streamer_controller_test.go b/syncer/streamer_controller_test.go index 8afc75fd1e..16c87e9dda 100644 --- a/syncer/streamer_controller_test.go +++ b/syncer/streamer_controller_test.go @@ -20,7 +20,7 @@ func (s *testSyncerSuite) TestIsConnectionRefusedError(c *C) { } func (s *testSyncerSuite) TestCanErrorRetry(c *C) { - controller := NewStreamerController(&Syncer{}, replication.BinlogSyncerConfig{}, true, nil, + controller := NewStreamerController(nil, replication.BinlogSyncerConfig{}, true, nil, LocalBinlog, "", nil) mockErr := errors.New("test") @@ -37,7 +37,7 @@ func (s *testSyncerSuite) TestCanErrorRetry(c *C) { }() // test with remote binlog - controller = 
NewStreamerController(&Syncer{}, replication.BinlogSyncerConfig{}, true, nil, + controller = NewStreamerController(nil, replication.BinlogSyncerConfig{}, true, nil, RemoteBinlog, "", nil) c.Assert(controller.CanRetry(mockErr), IsTrue) diff --git a/syncer/syncer.go b/syncer/syncer.go index 4f10d85bf2..265ca0af16 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -223,12 +223,11 @@ type Syncer struct { workerJobTSArray []*atomic.Int64 // worker's sync job TS array, note that idx=0 is skip idx and idx=1 is ddl idx,sql worker job idx=(queue id + 2) lastCheckpointFlushedTime time.Time - // relayNotifyCh with size = 1, we only need to be notified whether binlog file of relay changed, not how many times - relayNotifyCh chan interface{} + notifier streamer.EventNotifier } // NewSyncer creates a new Syncer. -func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer { +func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, notifier streamer.EventNotifier) *Syncer { logger := log.With(zap.String("task", cfg.Name), zap.String("unit", "binlog replication")) syncer := &Syncer{ pessimist: shardddl.NewPessimist(&logger, etcdClient, cfg.Name, cfg.SourceID), @@ -267,7 +266,7 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer { syncer.workerJobTSArray[i] = atomic.NewInt64(0) } syncer.lastCheckpointFlushedTime = time.Time{} - syncer.relayNotifyCh = make(chan interface{}, 1) + syncer.notifier = notifier return syncer } @@ -326,7 +325,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) { return terror.ErrSchemaTrackerInit.Delegate(err) } - s.streamerController = NewStreamerController(s, s.syncCfg, s.cfg.EnableGTID, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone) + s.streamerController = NewStreamerController(s.notifier, s.syncCfg, s.cfg.EnableGTID, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone) s.baList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BAList) if err != nil { @@ -3508,7 +3507,7 @@ func 
(s *Syncer) adjustGlobalPointGTID(tctx *tcontext.Context) (bool, error) { return false, nil } // set enableGTID to false for new streamerController - streamerController := NewStreamerController(s, s.syncCfg, false, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone) + streamerController := NewStreamerController(s.notifier, s.syncCfg, false, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone) endPos := binlog.AdjustPosition(location.Position) startPos := mysql.Position{ @@ -3565,8 +3564,3 @@ func (s *Syncer) delLoadTask() error { s.tctx.Logger.Info("delete load worker in etcd for all mode", zap.String("task", s.cfg.Name), zap.String("source", s.cfg.SourceID)) return nil } - -// Notified implements streamer.EventNotifier. -func (s *Syncer) Notified() chan interface{} { - return s.relayNotifyCh -} diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 7f56f481b3..8b60969d2c 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -262,7 +262,7 @@ func (s *testSyncerSuite) TestSelectDB(c *C) { p := parser.New() var err error - syncer := NewSyncer(s.cfg, nil) + syncer := NewSyncer(s.cfg, nil, nil) syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList) c.Assert(err, IsNil) err = syncer.genRouter() @@ -368,7 +368,7 @@ func (s *testSyncerSuite) TestSelectTable(c *C) { p := parser.New() var err error - syncer := NewSyncer(s.cfg, nil) + syncer := NewSyncer(s.cfg, nil, nil) syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList) c.Assert(err, IsNil) c.Assert(syncer.genRouter(), IsNil) @@ -403,7 +403,7 @@ func (s *testSyncerSuite) TestIgnoreDB(c *C) { p := parser.New() var err error - syncer := NewSyncer(s.cfg, nil) + syncer := NewSyncer(s.cfg, nil, nil) syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList) c.Assert(err, IsNil) c.Assert(syncer.genRouter(), IsNil) @@ -495,7 +495,7 @@ func (s *testSyncerSuite) TestIgnoreTable(c *C) { p := parser.New() var err error - syncer := NewSyncer(s.cfg, 
nil) + syncer := NewSyncer(s.cfg, nil, nil) syncer.baList, err = filter.New(syncer.cfg.CaseSensitive, syncer.cfg.BAList) c.Assert(err, IsNil) c.Assert(syncer.genRouter(), IsNil) @@ -587,7 +587,7 @@ func (s *testSyncerSuite) TestSkipDML(c *C) { p := parser.New() var err error - syncer := NewSyncer(s.cfg, nil) + syncer := NewSyncer(s.cfg, nil, nil) c.Assert(syncer.genRouter(), IsNil) syncer.binlogFilter, err = bf.NewBinlogEvent(false, s.cfg.FilterRules) @@ -704,7 +704,7 @@ func (s *testSyncerSuite) TestColumnMapping(c *C) { } func (s *testSyncerSuite) TestcheckpointID(c *C) { - syncer := NewSyncer(s.cfg, nil) + syncer := NewSyncer(s.cfg, nil, nil) checkpointID := syncer.checkpointID() c.Assert(checkpointID, Equals, "101") } @@ -727,7 +727,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) { } s.cfg.WorkerCount = 1 - syncer := NewSyncer(s.cfg, nil) + syncer := NewSyncer(s.cfg, nil, nil) syncer.jobs = []chan *job{make(chan *job, 1)} syncer.queueBucketMapping = []string{"queue_0", adminQueueName} @@ -816,7 +816,7 @@ func (s *testSyncerSuite) TestRun(c *C) { s.cfg.WorkerCount = 2 s.cfg.MaxRetry = 1 - syncer := NewSyncer(s.cfg, nil) + syncer := NewSyncer(s.cfg, nil, nil) syncer.cfg.CheckpointFlushInterval = 30 syncer.fromDB = &dbconn.UpStreamConn{BaseDB: conn.NewBaseDB(db, func() {})} syncer.toDBConns = []*dbconn.DBConn{ @@ -1058,7 +1058,7 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { }, } - syncer := NewSyncer(s.cfg, nil) + syncer := NewSyncer(s.cfg, nil, nil) syncer.fromDB = &dbconn.UpStreamConn{BaseDB: conn.NewBaseDB(db, func() {})} syncer.toDBConns = []*dbconn.DBConn{ {Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}, @@ -1201,7 +1201,7 @@ func (s *testSyncerSuite) TestRemoveMetadataIsFine(c *C) { cfg, err := s.cfg.Clone() c.Assert(err, IsNil) cfg.Mode = config.ModeAll - syncer := NewSyncer(cfg, nil) + syncer := NewSyncer(cfg, nil, nil) fresh, err := syncer.IsFreshTask(context.Background()) c.Assert(err, IsNil) 
c.Assert(fresh, IsTrue) @@ -1240,7 +1240,7 @@ func (s *testSyncerSuite) TestTrackDDL(c *C) { checkPointDBConn, err := checkPointDB.Conn(context.Background()) c.Assert(err, IsNil) - syncer := NewSyncer(s.cfg, nil) + syncer := NewSyncer(s.cfg, nil, nil) syncer.toDBConns = []*dbconn.DBConn{ {Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}, {Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}, From 64d388f302e31084ee3fa160dca4511719702ab5 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 18 Oct 2021 17:13:18 +0800 Subject: [PATCH 12/20] rename fields to make code more verbose --- pkg/streamer/file.go | 4 ++-- pkg/streamer/file_test.go | 24 ++++++++++++------------ pkg/streamer/reader.go | 8 ++++---- pkg/streamer/reader_test.go | 2 +- relay/relay.go | 16 ++++++++-------- 5 files changed, 27 insertions(+), 27 deletions(-) diff --git a/pkg/streamer/file.go b/pkg/streamer/file.go index ec0b20aa3f..a993350d66 100644 --- a/pkg/streamer/file.go +++ b/pkg/streamer/file.go @@ -208,7 +208,7 @@ func fileSizeUpdated(path string, latestSize int64) (int, error) { } type relayLogFileChecker struct { - n EventNotifier + notifier EventNotifier relayDir, currentUUID string latestRelayLogDir, latestFilePath, latestFile string beginOffset, endOffset int64 @@ -277,7 +277,7 @@ func (r *relayLogFileChecker) relayLogUpdatedOrNewCreated(ctx context.Context, u select { case <-ctx.Done(): errCh <- terror.Annotate(ctx.Err(), "context meet error") - case <-r.n.Notified(): + case <-r.notifier.Notified(): // the notified event may not be the current relay file // in that case we may read 0 bytes and check again updatePathCh <- r.latestFilePath diff --git a/pkg/streamer/file_test.go b/pkg/streamer/file_test.go index 76b6485837..73fef8f4be 100644 --- a/pkg/streamer/file_test.go +++ b/pkg/streamer/file_test.go @@ -334,7 +334,7 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { en := newDummyEventNotifier(0) // a. 
relay log dir not exist checker := &relayLogFileChecker{ - n: en, + notifier: en, relayDir: "", currentUUID: "", latestRelayLogDir: "/not-exists-directory", @@ -368,7 +368,7 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { // meta not found checker = &relayLogFileChecker{ - n: en, + notifier: en, relayDir: "", currentUUID: "", latestRelayLogDir: subDir, @@ -389,7 +389,7 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { // relay file not found checker = &relayLogFileChecker{ - n: en, + notifier: en, relayDir: "", currentUUID: "", latestRelayLogDir: subDir, @@ -415,7 +415,7 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { err = os.WriteFile(relayPaths[0], nil, 0o600) c.Assert(err, IsNil) checker = &relayLogFileChecker{ - n: en, + notifier: en, relayDir: "", currentUUID: "", latestRelayLogDir: subDir, @@ -433,7 +433,7 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { // return changed file in meta checker = &relayLogFileChecker{ - n: en, + notifier: en, relayDir: "", currentUUID: "", latestRelayLogDir: subDir, @@ -452,7 +452,7 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { // file increased when checking meta err = os.WriteFile(relayPaths[0], data, 0o600) checker = &relayLogFileChecker{ - n: en, + notifier: en, relayDir: "", currentUUID: "", latestRelayLogDir: subDir, @@ -473,7 +473,7 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { relayDir := c.MkDir() t.writeUUIDs(c, relayDir, []string{"xxx.000001", "xxx.000002"}) checker = &relayLogFileChecker{ - n: en, + notifier: en, relayDir: relayDir, currentUUID: "xxx.000002", latestRelayLogDir: subDir, @@ -493,7 +493,7 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { _ = os.MkdirAll(filepath.Join(relayDir, "xxx.000002"), 0o700) _ = os.WriteFile(filepath.Join(relayDir, "xxx.000002", "mysql.000001"), nil, 0o600) checker = &relayLogFileChecker{ - n: en, + notifier: en, relayDir: relayDir, currentUUID: 
"xxx.000001", latestRelayLogDir: subDir, @@ -512,7 +512,7 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { // binlog dir switched, but last file smaller err = os.WriteFile(relayPaths[1], nil, 0o600) checker = &relayLogFileChecker{ - n: en, + notifier: en, relayDir: relayDir, currentUUID: "xxx.000001", latestRelayLogDir: subDir, @@ -531,7 +531,7 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { // binlog dir switched, but last file bigger err = os.WriteFile(relayPaths[1], data, 0o600) checker = &relayLogFileChecker{ - n: en, + notifier: en, relayDir: relayDir, currentUUID: "xxx.000001", latestRelayLogDir: subDir, @@ -550,7 +550,7 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { // binlog dir switched, but last file not changed err = os.WriteFile(relayPaths[1], nil, 0o600) checker = &relayLogFileChecker{ - n: en, + notifier: en, relayDir: relayDir, currentUUID: "xxx.000001", latestRelayLogDir: subDir, @@ -572,7 +572,7 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { newCtx, cancel = context.WithTimeout(ctx, time.Second) defer cancel() checker = &relayLogFileChecker{ - n: en, + notifier: en, relayDir: relayDir, currentUUID: "xxx.000002", latestRelayLogDir: subDir, diff --git a/pkg/streamer/reader.go b/pkg/streamer/reader.go index 321be4f98c..35c38c5fbc 100644 --- a/pkg/streamer/reader.go +++ b/pkg/streamer/reader.go @@ -79,11 +79,11 @@ type BinlogReader struct { usingGTID bool prevGset, currGset mysql.GTIDSet - en EventNotifier + notifier EventNotifier } // NewBinlogReader creates a new BinlogReader. 
-func NewBinlogReader(en EventNotifier, logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader { +func NewBinlogReader(notifier EventNotifier, logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader { ctx, cancel := context.WithCancel(context.Background()) // only can be canceled in `Close` parser := replication.NewBinlogParser() parser.SetVerifyChecksum(true) @@ -101,7 +101,7 @@ func NewBinlogReader(en EventNotifier, logger log.Logger, cfg *BinlogReaderConfi indexPath: path.Join(cfg.RelayDir, utils.UUIDIndexFilename), cancel: cancel, tctx: newtctx, - en: en, + notifier: notifier, } } @@ -597,7 +597,7 @@ func (r *BinlogReader) parseFile( go func(latestPos int64) { defer wg.Done() checker := relayLogFileChecker{ - n: r.en, + notifier: r.notifier, relayDir: r.cfg.RelayDir, currentUUID: currentUUID, latestRelayLogDir: relayLogDir, diff --git a/pkg/streamer/reader_test.go b/pkg/streamer/reader_test.go index cdfcd6b7ef..6fb7915e18 100644 --- a/pkg/streamer/reader_test.go +++ b/pkg/streamer/reader_test.go @@ -378,7 +378,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { // meet `err EOF` error (when parsing binlog event) ignored ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel2() - r.en = newDummyEventNotifier(1) + r.notifier = newDummyEventNotifier(1) needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) c.Assert(err, IsNil) diff --git a/relay/relay.go b/relay/relay.go index a602f9dda4..24d970b11f 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -127,16 +127,16 @@ type Relay struct { sync.RWMutex info *pkgstreamer.RelayLogInfo } - els map[Listener]struct{} + listeners map[Listener]struct{} // make it a set to make it easier to remove listener } // NewRealRelay creates an instance of Relay. 
func NewRealRelay(cfg *Config) Process { return &Relay{ - cfg: cfg, - meta: NewLocalMeta(cfg.Flavor, cfg.RelayDir), - logger: log.With(zap.String("component", "relay log")), - els: make(map[Listener]struct{}), + cfg: cfg, + meta: NewLocalMeta(cfg.Flavor, cfg.RelayDir), + logger: log.With(zap.String("component", "relay log")), + listeners: make(map[Listener]struct{}), } } @@ -1106,7 +1106,7 @@ func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error) func (r *Relay) notify(e *replication.BinlogEvent) { r.RLock() defer r.RUnlock() - for el := range r.els { + for el := range r.listeners { el.OnEvent(e) } } @@ -1115,12 +1115,12 @@ func (r *Relay) notify(e *replication.BinlogEvent) { func (r *Relay) RegisterListener(el Listener) { r.Lock() defer r.Unlock() - r.els[el] = struct{}{} + r.listeners[el] = struct{}{} } // UnRegisterListener implements Process.UnRegisterListener. func (r *Relay) UnRegisterListener(el Listener) { r.Lock() defer r.Unlock() - delete(r.els, el) + delete(r.listeners, el) } From ac2f391dcb5415ffe231a7329a87205a6730bb94 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 18 Oct 2021 17:32:57 +0800 Subject: [PATCH 13/20] add comment --- pkg/streamer/file.go | 5 +++++ pkg/streamer/reader.go | 4 +--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/streamer/file.go b/pkg/streamer/file.go index a993350d66..29d97f3f6c 100644 --- a/pkg/streamer/file.go +++ b/pkg/streamer/file.go @@ -225,6 +225,11 @@ func (r *relayLogFileChecker) relayLogUpdatedOrNewCreated(ctx context.Context, u errCh <- terror.Annotate(err, "decode relay meta toml file failed") return } + // current watched file size have no change means that no new writes have been made + // our relay meta file will be updated immediately after receive the rotate event, + // although we cannot ensure that the binlog filename in the meta is the next file after latestFile + // but if we return a different filename with latestFile, the outer logic (parseDirAsPossible) 
+ // will find the right one if meta.BinLogName != r.latestFile { // we need check file size again, as the file may have been changed during our metafile check cmp, err2 := fileSizeUpdated(r.latestFilePath, r.endOffset) diff --git a/pkg/streamer/reader.go b/pkg/streamer/reader.go index 35c38c5fbc..189bdc4a0a 100644 --- a/pkg/streamer/reader.go +++ b/pkg/streamer/reader.go @@ -583,7 +583,6 @@ func (r *BinlogReader) parseFile( } switchCh := make(chan SwitchPath, 1) - switchErrCh := make(chan error, 1) updatePathCh := make(chan string, 1) updateErrCh := make(chan error, 1) newCtx, cancel := context.WithCancel(ctx) @@ -606,6 +605,7 @@ func (r *BinlogReader) parseFile( beginOffset: offset, endOffset: latestPos, } + // TODO no need to be a goroutine now, maybe refactored when refactoring parseFile itself. checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, updateErrCh) }(latestPos) @@ -625,8 +625,6 @@ func (r *BinlogReader) parseFile( } // need parse next relay log file or re-collect files return false, false, latestPos, "", "", false, nil - case err := <-switchErrCh: - return false, false, 0, "", "", false, err case err := <-updateErrCh: return false, false, 0, "", "", false, err } From af5f7e2d8f63deda79812140b3c414474be7efd3 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 18 Oct 2021 18:45:16 +0800 Subject: [PATCH 14/20] add comments --- pkg/streamer/file.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/streamer/file.go b/pkg/streamer/file.go index 29d97f3f6c..9dc8a32f70 100644 --- a/pkg/streamer/file.go +++ b/pkg/streamer/file.go @@ -289,7 +289,9 @@ func (r *relayLogFileChecker) relayLogUpdatedOrNewCreated(ctx context.Context, u case <-timer.C: // for a task start after source shutdown or there's no new write, it'll not be notified, // and if it's reading from dir 000001 and there's need to switch dir to 000002, - // we stop waiting after watcherInterval to give it a chance to check again + // after the task read files in 
dir 000001, the read size > 0, so it goes to the select directly, + // since there is no notify, it blocks, that'll leave dir 000002 un-synced. + // so we stop waiting after watcherInterval to give it a chance to check again updatePathCh <- r.latestFilePath } } From 100788c732b4f692194311c0a8a22cbfd3a04b47 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Mon, 25 Oct 2021 17:51:03 +0800 Subject: [PATCH 15/20] remove unnecessiary lock operation --- dm/worker/relay.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dm/worker/relay.go b/dm/worker/relay.go index 8378ec6b57..342bf3d9e4 100644 --- a/dm/worker/relay.go +++ b/dm/worker/relay.go @@ -312,14 +312,10 @@ func (h *realRelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo { } func (h *realRelayHolder) RegisterListener(el relay.Listener) { - h.Lock() - defer h.Unlock() h.relay.RegisterListener(el) } func (h *realRelayHolder) UnRegisterListener(el relay.Listener) { - h.Lock() - defer h.Unlock() h.relay.UnRegisterListener(el) } From 026d9fbd0ba2786678911e5e0497527eff4a2a59 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 26 Oct 2021 14:29:31 +0800 Subject: [PATCH 16/20] fix unstable integration test case ha_cases3 --- tests/_utils/run_sql | 2 +- tests/ha_cases3/run.sh | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/_utils/run_sql b/tests/_utils/run_sql index ed5609dc41..71e018db29 100755 --- a/tests/_utils/run_sql +++ b/tests/_utils/run_sql @@ -12,4 +12,4 @@ if [[ "$2" = $TIDB_PORT ]]; then fi echo "[$(date)] Executing SQL: $1" >"$TEST_DIR/sql_res.$TEST_NAME.txt" -mysql -u$user -h127.0.0.1 -P$2 -p$3 --default-character-set utf8 -E -e "$1" >>"$TEST_DIR/sql_res.$TEST_NAME.txt" +mysql -u$user -h127.0.0.1 -P$2 -p$3 --default-character-set utf8 -E -e "$1" >>"$TEST_DIR/sql_res.$TEST_NAME.txt" 2>&1 diff --git a/tests/ha_cases3/run.sh b/tests/ha_cases3/run.sh index 4345d63f70..22c695b83f 100755 --- a/tests/ha_cases3/run.sh +++ b/tests/ha_cases3/run.sh @@ -65,11 +65,11 @@ function 
test_stop_task() { task_config=(dm-task.yaml dm-task2.yaml) for name in ${task_name[@]}; do echo "stop tasks $name" - run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "stop-task $name" \ "\"result\": true" 3 - run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status $name" \ "\"result\": false" 1 done @@ -80,13 +80,14 @@ function test_stop_task() { for idx in $(seq 0 1); do echo "start tasks $cur/conf/${task_config[$idx]}" + # if relay hasn't catch up with current sync pos(rarely happens), start-task may start with an error + # so we don't check "\"result\": true" here, and check it using query-status run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "start-task $cur/conf/${task_config[$idx]}" \ - "\"result\": true" 3 \ "\"source\": \"$SOURCE_ID1\"" 1 \ "\"source\": \"$SOURCE_ID2\"" 1 - run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status ${task_name[$idx]}" \ "\"stage\": \"Running\"" 4 done From 11511a30f173700002f8132ee528a8303de504df Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 26 Oct 2021 14:36:06 +0800 Subject: [PATCH 17/20] fix comments --- tests/ha_cases3/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/ha_cases3/run.sh b/tests/ha_cases3/run.sh index 22c695b83f..6e68f2d40d 100755 --- a/tests/ha_cases3/run.sh +++ b/tests/ha_cases3/run.sh @@ -80,7 +80,7 @@ function test_stop_task() { for idx in $(seq 0 1); do echo "start tasks $cur/conf/${task_config[$idx]}" - # if relay hasn't catch up with current sync pos(rarely happens), start-task may start with an error + # if relay hasn't catch up with current sync pos(such as when workload too high), start-task may start with an error # so we don't check "\"result\": true" here, and check it using query-status run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "start-task 
$cur/conf/${task_config[$idx]}" \ From 9e93be12805aacef914f24af7251f41dc64de999 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 26 Oct 2021 15:05:27 +0800 Subject: [PATCH 18/20] undo redirect stderr in run_sql, some case depends on the output --- tests/_utils/run_sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/_utils/run_sql b/tests/_utils/run_sql index 71e018db29..ed5609dc41 100755 --- a/tests/_utils/run_sql +++ b/tests/_utils/run_sql @@ -12,4 +12,4 @@ if [[ "$2" = $TIDB_PORT ]]; then fi echo "[$(date)] Executing SQL: $1" >"$TEST_DIR/sql_res.$TEST_NAME.txt" -mysql -u$user -h127.0.0.1 -P$2 -p$3 --default-character-set utf8 -E -e "$1" >>"$TEST_DIR/sql_res.$TEST_NAME.txt" 2>&1 +mysql -u$user -h127.0.0.1 -P$2 -p$3 --default-character-set utf8 -E -e "$1" >>"$TEST_DIR/sql_res.$TEST_NAME.txt" From ebd1d2ba0cdce4e6e1456f54f98ebde6ba51792d Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 27 Oct 2021 10:52:11 +0800 Subject: [PATCH 19/20] add more unit test --- pkg/streamer/file_test.go | 24 +++++++++++++++++++++--- pkg/streamer/reader_test.go | 2 +- relay/relay_test.go | 24 ++++++++++++++++++++++++ 3 files changed, 46 insertions(+), 4 deletions(-) diff --git a/pkg/streamer/file_test.go b/pkg/streamer/file_test.go index 73fef8f4be..1a28229452 100644 --- a/pkg/streamer/file_test.go +++ b/pkg/streamer/file_test.go @@ -569,8 +569,6 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { // got notified en = newDummyEventNotifier(1) - newCtx, cancel = context.WithTimeout(ctx, time.Second) - defer cancel() checker = &relayLogFileChecker{ notifier: en, relayDir: relayDir, @@ -581,7 +579,27 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { beginOffset: 0, endOffset: size, } - checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) + checker.relayLogUpdatedOrNewCreated(context.Background(), updatePathCh, switchCh, errCh) + c.Assert(len(errCh), Equals, 0) + c.Assert(len(updatePathCh), Equals, 1) + 
c.Assert(len(switchCh), Equals, 0) + up = <-updatePathCh + c.Assert(up, Equals, relayPaths[0]) + c.Assert(len(en.Notified()), Equals, 0) + + // got notified on timer + en = newDummyEventNotifier(0) + checker = &relayLogFileChecker{ + notifier: en, + relayDir: relayDir, + currentUUID: "xxx.000002", + latestRelayLogDir: subDir, + latestFilePath: relayPaths[0], + latestFile: relayFiles[0], + beginOffset: 0, + endOffset: size, + } + checker.relayLogUpdatedOrNewCreated(context.Background(), updatePathCh, switchCh, errCh) c.Assert(len(errCh), Equals, 0) c.Assert(len(updatePathCh), Equals, 1) c.Assert(len(switchCh), Equals, 0) diff --git a/pkg/streamer/reader_test.go b/pkg/streamer/reader_test.go index 6fb7915e18..ddbcc1a1f7 100644 --- a/pkg/streamer/reader_test.go +++ b/pkg/streamer/reader_test.go @@ -274,7 +274,7 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { r = NewBinlogReader(newDummyEventNotifier(1), log.L(), cfg) ) - // create the current relay log file and write some events + // create the current relay log file and meta err := os.MkdirAll(relayDir, 0o700) c.Assert(err, IsNil) f, err := os.OpenFile(fullPath, os.O_CREATE|os.O_WRONLY, 0o600) diff --git a/relay/relay_test.go b/relay/relay_test.go index 28ad060d97..6cf3f3172e 100644 --- a/relay/relay_test.go +++ b/relay/relay_test.go @@ -319,6 +319,30 @@ func (t *testRelaySuite) TestTryRecoverMeta(c *C) { c.Assert(latestGTIDs.Equal(recoverGTIDSet), IsTrue) } +type dummyListener bool + +func (d *dummyListener) OnEvent(e *replication.BinlogEvent) { + *d = true +} + +func (t *testRelaySuite) TestListener(c *C) { + relay := NewRelay(&Config{}).(*Relay) + c.Assert(len(relay.listeners), Equals, 0) + + lis := dummyListener(false) + relay.RegisterListener(&lis) + c.Assert(len(relay.listeners), Equals, 1) + + relay.notify(nil) + c.Assert(bool(lis), Equals, true) + + relay.UnRegisterListener(&lis) + c.Assert(len(relay.listeners), Equals, 0) + lis = false + relay.notify(nil) + c.Assert(bool(lis), Equals, 
false) +} + // genBinlogEventsWithGTIDs generates some binlog events used by testFileUtilSuite and testFileWriterSuite. // now, its generated events including 3 DDL and 10 DML. func genBinlogEventsWithGTIDs(c *C, flavor string, previousGTIDSet, latestGTID1, latestGTID2 gtid.Set) (*event.Generator, []*replication.BinlogEvent, []byte) { From 99b2ed7847a692900cdb159248d6140dcf3312b0 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 27 Oct 2021 12:17:04 +0800 Subject: [PATCH 20/20] fix import groups --- cmd/dm-portal/main.go | 5 +++-- cmd/dm-worker/main.go | 3 ++- dm/pbmock/dmmaster.go | 3 ++- dm/pbmock/dmworker.go | 3 ++- dm/worker/server_test.go | 3 +-- dm/worker/source_worker_test.go | 3 +-- dm/worker/subtask.go | 3 +-- dm/worker/subtask_test.go | 3 +-- tests/_dmctl_tools/check_master_online.go | 3 ++- tests/_dmctl_tools/check_worker_online.go | 3 ++- 10 files changed, 17 insertions(+), 15 deletions(-) diff --git a/cmd/dm-portal/main.go b/cmd/dm-portal/main.go index 44eee7b83a..44d3772e66 100644 --- a/cmd/dm-portal/main.go +++ b/cmd/dm-portal/main.go @@ -18,11 +18,12 @@ import ( "net/http" "os" + "github.com/rakyll/statik/fs" + "go.uber.org/zap" + "github.com/pingcap/dm/dm/portal" _ "github.com/pingcap/dm/dm/portal/statik" "github.com/pingcap/dm/pkg/log" - "github.com/rakyll/statik/fs" - "go.uber.org/zap" ) func main() { diff --git a/cmd/dm-worker/main.go b/cmd/dm-worker/main.go index 220c354b59..3085238931 100644 --- a/cmd/dm-worker/main.go +++ b/cmd/dm-worker/main.go @@ -25,12 +25,13 @@ import ( globalLog "github.com/pingcap/log" "go.uber.org/zap" + lightningLog "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/dm/dm/ctl/common" "github.com/pingcap/dm/dm/worker" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" - lightningLog "github.com/pingcap/tidb/br/pkg/lightning/log" ) func main() { diff --git a/dm/pbmock/dmmaster.go b/dm/pbmock/dmmaster.go index 39f9123c3f..8b3db82173 100644 --- 
a/dm/pbmock/dmmaster.go +++ b/dm/pbmock/dmmaster.go @@ -9,8 +9,9 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - pb "github.com/pingcap/dm/dm/pb" grpc "google.golang.org/grpc" + + pb "github.com/pingcap/dm/dm/pb" ) // MockMasterClient is a mock of MasterClient interface. diff --git a/dm/pbmock/dmworker.go b/dm/pbmock/dmworker.go index a7601b37b9..bd6e3710d1 100644 --- a/dm/pbmock/dmworker.go +++ b/dm/pbmock/dmworker.go @@ -9,8 +9,9 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - pb "github.com/pingcap/dm/dm/pb" grpc "google.golang.org/grpc" + + pb "github.com/pingcap/dm/dm/pb" ) // MockWorkerClient is a mock of WorkerClient interface. diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index fa76cdfdc8..b696f8d083 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -22,8 +22,6 @@ import ( "testing" "time" - "github.com/pingcap/dm/pkg/streamer" - "github.com/go-mysql-org/go-mysql/mysql" . "github.com/pingcap/check" "github.com/pingcap/failpoint" @@ -40,6 +38,7 @@ import ( "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/ha" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" ) diff --git a/dm/worker/source_worker_test.go b/dm/worker/source_worker_test.go index 90bd5008f1..e3977dc805 100644 --- a/dm/worker/source_worker_test.go +++ b/dm/worker/source_worker_test.go @@ -20,8 +20,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/dm/pkg/streamer" - "github.com/DATA-DOG/go-sqlmock" . 
"github.com/pingcap/check" "github.com/pingcap/failpoint" @@ -34,6 +32,7 @@ import ( "github.com/pingcap/dm/pkg/conn" "github.com/pingcap/dm/pkg/ha" "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/utils" ) diff --git a/dm/worker/subtask.go b/dm/worker/subtask.go index b15a315d82..ae63704dcb 100644 --- a/dm/worker/subtask.go +++ b/dm/worker/subtask.go @@ -18,8 +18,6 @@ import ( "sync" "time" - "github.com/pingcap/dm/pkg/streamer" - "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/failpoint" "github.com/prometheus/client_golang/prometheus" @@ -36,6 +34,7 @@ import ( "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/shardddl/pessimism" + "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/dm/syncer" diff --git a/dm/worker/subtask_test.go b/dm/worker/subtask_test.go index a79e45d994..53c961f33d 100644 --- a/dm/worker/subtask_test.go +++ b/dm/worker/subtask_test.go @@ -18,14 +18,13 @@ import ( "strings" "time" - "github.com/pingcap/dm/pkg/streamer" - "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" "github.com/pingcap/dm/dumpling" "github.com/pingcap/dm/loader" "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/streamer" "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/dm/syncer" diff --git a/tests/_dmctl_tools/check_master_online.go b/tests/_dmctl_tools/check_master_online.go index a1a986fb78..dec45991e8 100644 --- a/tests/_dmctl_tools/check_master_online.go +++ b/tests/_dmctl_tools/check_master_online.go @@ -20,9 +20,10 @@ import ( "google.golang.org/grpc" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" + "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/tests/utils" - toolutils "github.com/pingcap/tidb-tools/pkg/utils" ) // use show-ddl-locks request to test DM-master is online diff --git 
a/tests/_dmctl_tools/check_worker_online.go b/tests/_dmctl_tools/check_worker_online.go index 0bcbb804c2..bd4b33e9dd 100644 --- a/tests/_dmctl_tools/check_worker_online.go +++ b/tests/_dmctl_tools/check_worker_online.go @@ -20,9 +20,10 @@ import ( "google.golang.org/grpc" + toolutils "github.com/pingcap/tidb-tools/pkg/utils" + "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/tests/utils" - toolutils "github.com/pingcap/tidb-tools/pkg/utils" ) // use query status request to test DM-worker is online