diff --git a/dm/dm/worker/relay_test.go b/dm/dm/worker/relay_test.go index 589843c80ca..9935b0a193c 100644 --- a/dm/dm/worker/relay_test.go +++ b/dm/dm/worker/relay_test.go @@ -47,6 +47,10 @@ type DummyRelay struct { reloadErr error } +func (d *DummyRelay) IsActive(uuid, filename string) (bool, int64) { + return false, 0 +} + func (d *DummyRelay) NewReader(logger log.Logger, cfg *relay.BinlogReaderConfig) *relay.BinlogReader { return nil } diff --git a/dm/pkg/binlog/position.go b/dm/pkg/binlog/position.go index 91c8fc6ac9a..85de199999f 100644 --- a/dm/pkg/binlog/position.go +++ b/dm/pkg/binlog/position.go @@ -37,6 +37,8 @@ const ( posUUIDSuffixSeparator = "|" // MinUUIDSuffix is same as relay.MinUUIDSuffix. MinUUIDSuffix = 1 + // FileHeaderLen is the length of binlog file header. + FileHeaderLen = 4 ) // MinPosition is the min binlog position. diff --git a/dm/pkg/utils/util.go b/dm/pkg/utils/util.go index 619609cee12..d6975f668a0 100644 --- a/dm/pkg/utils/util.go +++ b/dm/pkg/utils/util.go @@ -263,3 +263,12 @@ func proxyFields() []zap.Field { } return fields } + +func NewStoppedTimer() *time.Timer { + // stopped timer should be Reset with correct duration, so use 0 here + t := time.NewTimer(0) + if !t.Stop() { + <-t.C + } + return t +} diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index 622ed0801a8..89f19a48394 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -17,6 +17,7 @@ import ( "encoding/json" "fmt" "os" + "path/filepath" "sync" "github.com/pingcap/errors" @@ -33,6 +34,8 @@ type BinlogWriter struct { offset atomic.Int64 file *os.File + relayDir string + uuid string filename string logger log.Logger @@ -55,14 +58,16 @@ func (s *BinlogWriterStatus) String() string { } // NewBinlogWriter creates a BinlogWriter instance. 
-func NewBinlogWriter(logger log.Logger) *BinlogWriter { +func NewBinlogWriter(logger log.Logger, relayDir string) *BinlogWriter { return &BinlogWriter{ - logger: logger, + logger: logger, + relayDir: relayDir, } } -func (w *BinlogWriter) Open(filename string) error { - f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644) +func (w *BinlogWriter) Open(uuid, filename string) error { + fullName := filepath.Join(w.relayDir, uuid, filename) + f, err := os.OpenFile(fullName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644) if err != nil { return terror.ErrBinlogWriterOpenFile.Delegate(err) } @@ -80,6 +85,7 @@ func (w *BinlogWriter) Open(filename string) error { w.offset.Store(fs.Size()) w.file = f + w.uuid = uuid w.filename = filename return nil @@ -100,6 +106,7 @@ func (w *BinlogWriter) Close() error { w.file = nil w.offset.Store(0) + w.uuid = "" w.filename = "" return err @@ -132,3 +139,9 @@ func (w *BinlogWriter) Status() *BinlogWriterStatus { func (w *BinlogWriter) Offset() int64 { return w.offset.Load() } + +func (w *BinlogWriter) isActive(uuid, filename string) (bool, int64) { + w.mu.RLock() + defer w.mu.RUnlock() + return uuid == w.uuid && filename == w.filename, w.offset.Load() +} diff --git a/dm/relay/binlog_writer_test.go b/dm/relay/binlog_writer_test.go index a8c5644fe10..e21cbb0a2e9 100644 --- a/dm/relay/binlog_writer_test.go +++ b/dm/relay/binlog_writer_test.go @@ -30,16 +30,20 @@ type testBinlogWriterSuite struct{} func (t *testBinlogWriterSuite) TestWrite(c *C) { dir := c.MkDir() - filename := filepath.Join(dir, "test-mysql-bin.000001") + uuid := "3ccc475b-2343-11e7-be21-6c0b84d59f30.000001" + binlogDir := filepath.Join(dir, uuid) + c.Assert(os.Mkdir(binlogDir, 0o755), IsNil) + + filename := "test-mysql-bin.000001" var ( allData bytes.Buffer data1 = []byte("test-data") ) { - w := NewBinlogWriter(log.L()) + w := NewBinlogWriter(log.L(), dir) c.Assert(w, NotNil) - c.Assert(w.Open(filename), IsNil) + c.Assert(w.Open(uuid, filename), IsNil) 
fwStatus := w.Status() c.Assert(fwStatus.Filename, Equals, filename) c.Assert(fwStatus.Offset, Equals, int64(allData.Len())) @@ -50,19 +54,19 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) { { // not opened - w := NewBinlogWriter(log.L()) + w := NewBinlogWriter(log.L(), dir) err := w.Write(data1) c.Assert(err, ErrorMatches, "*not opened") // open non exist dir - err = w.Open(filepath.Join(dir, "not-exist", "bin.000001")) + err = w.Open("not-exist-uuid", "bin.000001") c.Assert(err, ErrorMatches, "*no such file or directory") } { // normal call flow - w := NewBinlogWriter(log.L()) - err := w.Open(filename) + w := NewBinlogWriter(log.L(), dir) + err := w.Open(uuid, filename) c.Assert(err, IsNil) c.Assert(w.file, NotNil) c.Assert(w.filename, Equals, filename) @@ -93,7 +97,8 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) { c.Assert(w.Close(), IsNil) // noop // try to read the data back - dataInFile, err := os.ReadFile(filename) + fullName := filepath.Join(binlogDir, filename) + dataInFile, err := os.ReadFile(fullName) c.Assert(err, IsNil) c.Assert(dataInFile, DeepEquals, allData.Bytes()) } diff --git a/dm/relay/error.go b/dm/relay/error.go deleted file mode 100644 index e5284e33c44..00000000000 --- a/dm/relay/error.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package relay - -import ( - "context" - - "github.com/pingcap/errors" -) - -// isRetryableError checks whether the error is retryable. 
-func isRetryableError(err error) bool { - if err = errors.Cause(err); err == context.DeadlineExceeded { - return true - } - return false -} diff --git a/dm/relay/error_test.go b/dm/relay/error_test.go deleted file mode 100644 index e2ab04d2046..00000000000 --- a/dm/relay/error_test.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package relay - -import ( - "context" - - "github.com/pingcap/check" - "github.com/pingcap/errors" -) - -var _ = check.Suite(&testErrorSuite{}) - -type testErrorSuite struct{} - -func (t *testErrorSuite) TestRetryable(c *check.C) { - err := errors.New("custom error") - c.Assert(isRetryableError(err), check.IsFalse) - - cases := []error{ - context.DeadlineExceeded, - errors.Annotate(context.DeadlineExceeded, "annotated"), - } - for _, cs := range cases { - c.Assert(isRetryableError(cs), check.IsTrue) - } -} diff --git a/dm/relay/file.go b/dm/relay/file.go index 0317b2f2fac..7f284a6eab0 100644 --- a/dm/relay/file.go +++ b/dm/relay/file.go @@ -14,14 +14,10 @@ package relay import ( - "context" "os" - "path" "path/filepath" "sort" - "time" - "github.com/BurntSushi/toml" "go.uber.org/zap" "github.com/pingcap/ticdc/dm/pkg/binlog" @@ -42,12 +38,6 @@ const ( FileCmpBigger ) -// SwitchPath represents next binlog file path which should be switched. -type SwitchPath struct { - nextUUID string - nextBinlogName string -} - // EventNotifier notifies whether there is new binlog event written to the file. 
type EventNotifier interface { // Notified returns a channel used to check whether there is new binlog event written to the file @@ -206,122 +196,3 @@ func fileSizeUpdated(path string, latestSize int64) (int, error) { return -1, nil } } - -type relayLogFileChecker struct { - notifier EventNotifier - relayDir, currentUUID string - latestRelayLogDir, latestFilePath, latestFile string - beginOffset, endOffset int64 -} - -// relayLogUpdatedOrNewCreated checks whether current relay log file is updated or new relay log is created. -func (r *relayLogFileChecker) relayLogUpdatedOrNewCreated(ctx context.Context, updatePathCh chan string, switchCh chan SwitchPath, errCh chan error) { - // binlog file may have rotated if we read nothing last time(either it's the first read or after notified) - lastReadCnt := r.endOffset - r.beginOffset - if lastReadCnt == 0 { - meta := &LocalMeta{} - _, err := toml.DecodeFile(filepath.Join(r.latestRelayLogDir, utils.MetaFilename), meta) - if err != nil { - errCh <- terror.Annotate(err, "decode relay meta toml file failed") - return - } - // current watched file size have no change means that no new writes have been made - // our relay meta file will be updated immediately after receive the rotate event, - // although we cannot ensure that the binlog filename in the meta is the next file after latestFile - // but if we return a different filename with latestFile, the outer logic (parseDirAsPossible) - // will find the right one - if meta.BinLogName != r.latestFile { - // we need check file size again, as the file may have been changed during our metafile check - cmp, err2 := fileSizeUpdated(r.latestFilePath, r.endOffset) - if err2 != nil { - errCh <- terror.Annotatef(err2, "latestFilePath=%s endOffset=%d", r.latestFilePath, r.endOffset) - return - } - switch { - case cmp < 0: - errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(r.latestFilePath) - case cmp > 0: - updatePathCh <- r.latestFilePath - default: - nextFilePath := 
filepath.Join(r.latestRelayLogDir, meta.BinLogName) - log.L().Info("newer relay log file is already generated", - zap.String("now file path", r.latestFilePath), - zap.String("new file path", nextFilePath)) - updatePathCh <- nextFilePath - } - return - } - - // maybe UUID index file changed - switchPath, err := r.getSwitchPath() - if err != nil { - errCh <- err - return - } - if switchPath != nil { - // we need check file size again, as the file may have been changed during path check - cmp, err := fileSizeUpdated(r.latestFilePath, r.endOffset) - if err != nil { - errCh <- terror.Annotatef(err, "latestFilePath=%s endOffset=%d", r.latestFilePath, r.endOffset) - return - } - switch { - case cmp < 0: - errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(r.latestFilePath) - case cmp > 0: - updatePathCh <- r.latestFilePath - default: - log.L().Info("newer relay uuid path is already generated", - zap.String("current path", r.latestRelayLogDir), - zap.Any("new path", switchPath)) - switchCh <- *switchPath - } - return - } - } - - timer := time.NewTimer(watcherInterval) - defer timer.Stop() - select { - case <-ctx.Done(): - errCh <- terror.Annotate(ctx.Err(), "context meet error") - case <-r.notifier.Notified(): - // the notified event may not be the current relay file - // in that case we may read 0 bytes and check again - updatePathCh <- r.latestFilePath - case <-timer.C: - // for a task start after source shutdown or there's no new write, it'll not be notified, - // and if it's reading from dir 000001 and there's need to switch dir to 000002, - // after the task read files in dir 000001, the read size > 0, so it goes to the select directly, - // since there is no notify, it blocks, that'll leave dir 000002 un-synced. 
- // so we stop waiting after watcherInterval to give it a chance to check again - updatePathCh <- r.latestFilePath - } -} - -func (r *relayLogFileChecker) getSwitchPath() (*SwitchPath, error) { - // reload uuid - uuids, err := utils.ParseUUIDIndex(path.Join(r.relayDir, utils.UUIDIndexFilename)) - if err != nil { - return nil, err - } - nextUUID, _, err := getNextUUID(r.currentUUID, uuids) - if err != nil { - return nil, err - } - if len(nextUUID) == 0 { - return nil, nil - } - - // try to get the first binlog file in next subdirectory - nextBinlogName, err := getFirstBinlogName(r.relayDir, nextUUID) - if err != nil { - // because creating subdirectory and writing relay log file are not atomic - if terror.ErrBinlogFilesNotFound.Equal(err) { - return nil, nil - } - return nil, err - } - - return &SwitchPath{nextUUID, nextBinlogName}, nil -} diff --git a/dm/relay/file_test.go b/dm/relay/file_test.go index 22c5abd6acf..e1f96cfd66d 100644 --- a/dm/relay/file_test.go +++ b/dm/relay/file_test.go @@ -14,15 +14,9 @@ package relay import ( - "bytes" - "context" - "fmt" "os" - "path" "path/filepath" - "time" - "github.com/BurntSushi/toml" . 
"github.com/pingcap/check" "github.com/pingcap/errors" @@ -294,388 +288,6 @@ func (t *testFileSuite) TestFileSizeUpdated(c *C) { c.Assert(cmp, Equals, 1) } -type dummyEventNotifier struct { - ch chan interface{} -} - -func (d *dummyEventNotifier) Notified() chan interface{} { - return d.ch -} - -func newDummyEventNotifier(n int) EventNotifier { - d := &dummyEventNotifier{ - ch: make(chan interface{}, n), - } - for i := 0; i < n; i++ { - d.ch <- struct{}{} - } - return d -} - -func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { - var ( - relayFiles = []string{ - "mysql-bin.000001", - "mysql-bin.000002", - } - binlogPos = uint32(4) - binlogGTID = "ba8f633f-1f15-11eb-b1c7-0242ac110002:1" - relayPaths = make([]string, len(relayFiles)) - data = []byte("meaningless file content") - size = int64(len(data)) - updatePathCh = make(chan string, 1) - switchCh = make(chan SwitchPath, 1) - errCh = make(chan error, 1) - ) - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - en := newDummyEventNotifier(0) - // a. 
relay log dir not exist - checker := &relayLogFileChecker{ - notifier: en, - relayDir: "", - currentUUID: "", - latestRelayLogDir: "/not-exists-directory", - latestFilePath: "/not-exists-filepath", - latestFile: "not-exists-file", - beginOffset: 0, - endOffset: 0, - } - checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) - c.Assert(len(errCh), Equals, 1) - c.Assert(len(updatePathCh), Equals, 0) - c.Assert(len(switchCh), Equals, 0) - err := <-errCh - c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") - - // create relay log dir - subDir := c.MkDir() - // join the file path - for i, rf := range relayFiles { - relayPaths[i] = filepath.Join(subDir, rf) - } - - rotateRelayFile := func(filename string) { - meta := LocalMeta{BinLogName: filename, BinLogPos: binlogPos, BinlogGTID: binlogGTID} - metaFile, err2 := os.Create(path.Join(subDir, utils.MetaFilename)) - c.Assert(err2, IsNil) - err = toml.NewEncoder(metaFile).Encode(&meta) - c.Assert(err, IsNil) - _ = metaFile.Close() - } - - // meta not found - checker = &relayLogFileChecker{ - notifier: en, - relayDir: "", - currentUUID: "", - latestRelayLogDir: subDir, - latestFilePath: relayPaths[0], - latestFile: relayFiles[0], - beginOffset: 0, - endOffset: 0, - } - checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) - c.Assert(len(errCh), Equals, 1) - c.Assert(len(updatePathCh), Equals, 0) - c.Assert(len(switchCh), Equals, 0) - err = <-errCh - c.Assert(err, ErrorMatches, ".*no such file or directory*") - - // write meta - rotateRelayFile(relayFiles[0]) - - // relay file not found - checker = &relayLogFileChecker{ - notifier: en, - relayDir: "", - currentUUID: "", - latestRelayLogDir: subDir, - latestFilePath: relayPaths[0], - latestFile: "not-exists-file", - beginOffset: 0, - endOffset: 0, - } - checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) - c.Assert(len(errCh), Equals, 1) - c.Assert(len(updatePathCh), 
Equals, 0) - c.Assert(len(switchCh), Equals, 0) - err = <-errCh - c.Assert(err, ErrorMatches, ".*no such file or directory*") - - // create the first relay file - err = os.WriteFile(relayPaths[0], data, 0o600) - c.Assert(err, IsNil) - // rotate relay file - rotateRelayFile(relayFiles[1]) - - // file decreased when meta changed - err = os.WriteFile(relayPaths[0], nil, 0o600) - c.Assert(err, IsNil) - checker = &relayLogFileChecker{ - notifier: en, - relayDir: "", - currentUUID: "", - latestRelayLogDir: subDir, - latestFilePath: relayPaths[0], - latestFile: relayFiles[0], - beginOffset: size, - endOffset: size, - } - checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) - c.Assert(len(errCh), Equals, 1) - c.Assert(len(updatePathCh), Equals, 0) - c.Assert(len(switchCh), Equals, 0) - err = <-errCh - c.Assert(err, ErrorMatches, ".*file size of relay log.*become smaller.*") - - // return changed file in meta - checker = &relayLogFileChecker{ - notifier: en, - relayDir: "", - currentUUID: "", - latestRelayLogDir: subDir, - latestFilePath: relayPaths[0], - latestFile: relayFiles[0], - beginOffset: 0, - endOffset: 0, - } - checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) - c.Assert(len(errCh), Equals, 0) - c.Assert(len(updatePathCh), Equals, 1) - c.Assert(len(switchCh), Equals, 0) - up := <-updatePathCh - c.Assert(up, Equals, relayPaths[1]) - - // file increased when checking meta - err = os.WriteFile(relayPaths[0], data, 0o600) - checker = &relayLogFileChecker{ - notifier: en, - relayDir: "", - currentUUID: "", - latestRelayLogDir: subDir, - latestFilePath: relayPaths[0], - latestFile: relayFiles[0], - beginOffset: 0, - endOffset: 0, - } - checker.relayLogUpdatedOrNewCreated(ctx, updatePathCh, switchCh, errCh) - up = <-updatePathCh - c.Assert(up, Equals, relayPaths[0]) - c.Assert(len(switchCh), Equals, 0) - c.Assert(len(errCh), Equals, 0) - - // context timeout (no new write) - newCtx, cancel := context.WithTimeout(ctx, 
time.Millisecond) - defer cancel() - relayDir := c.MkDir() - t.writeUUIDs(c, relayDir, []string{"xxx.000001", "xxx.000002"}) - checker = &relayLogFileChecker{ - notifier: en, - relayDir: relayDir, - currentUUID: "xxx.000002", - latestRelayLogDir: subDir, - latestFilePath: relayPaths[0], - latestFile: relayFiles[0], - beginOffset: 0, - endOffset: size, - } - checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) - c.Assert(len(errCh), Equals, 1) - c.Assert(len(updatePathCh), Equals, 0) - c.Assert(len(switchCh), Equals, 0) - err7 := <-errCh - c.Assert(err7, ErrorMatches, "context meet error:.*") - - // binlog dir switched, but last file not exists - _ = os.MkdirAll(filepath.Join(relayDir, "xxx.000002"), 0o700) - _ = os.WriteFile(filepath.Join(relayDir, "xxx.000002", "mysql.000001"), nil, 0o600) - checker = &relayLogFileChecker{ - notifier: en, - relayDir: relayDir, - currentUUID: "xxx.000001", - latestRelayLogDir: subDir, - latestFilePath: relayPaths[1], - latestFile: relayFiles[1], - beginOffset: 0, - endOffset: 0, - } - checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) - c.Assert(len(errCh), Equals, 1) - c.Assert(len(updatePathCh), Equals, 0) - c.Assert(len(switchCh), Equals, 0) - err = <-errCh - c.Assert(err, ErrorMatches, ".*no such file or directory*") - - // binlog dir switched, but last file smaller - err = os.WriteFile(relayPaths[1], nil, 0o600) - checker = &relayLogFileChecker{ - notifier: en, - relayDir: relayDir, - currentUUID: "xxx.000001", - latestRelayLogDir: subDir, - latestFilePath: relayPaths[1], - latestFile: relayFiles[1], - beginOffset: size, - endOffset: size, - } - checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) - c.Assert(len(errCh), Equals, 1) - c.Assert(len(updatePathCh), Equals, 0) - c.Assert(len(switchCh), Equals, 0) - err = <-errCh - c.Assert(err, ErrorMatches, ".*file size of relay log.*become smaller.*") - - // binlog dir switched, but last file bigger - err = 
os.WriteFile(relayPaths[1], data, 0o600) - checker = &relayLogFileChecker{ - notifier: en, - relayDir: relayDir, - currentUUID: "xxx.000001", - latestRelayLogDir: subDir, - latestFilePath: relayPaths[1], - latestFile: relayFiles[1], - beginOffset: 0, - endOffset: 0, - } - checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) - c.Assert(len(errCh), Equals, 0) - c.Assert(len(updatePathCh), Equals, 1) - c.Assert(len(switchCh), Equals, 0) - up = <-updatePathCh - c.Assert(up, Equals, relayPaths[1]) - - // binlog dir switched, but last file not changed - err = os.WriteFile(relayPaths[1], nil, 0o600) - checker = &relayLogFileChecker{ - notifier: en, - relayDir: relayDir, - currentUUID: "xxx.000001", - latestRelayLogDir: subDir, - latestFilePath: relayPaths[1], - latestFile: relayFiles[1], - beginOffset: 0, - endOffset: 0, - } - checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, errCh) - c.Assert(len(errCh), Equals, 0) - c.Assert(len(updatePathCh), Equals, 0) - c.Assert(len(switchCh), Equals, 1) - sp := <-switchCh - c.Assert(sp.nextUUID, Equals, "xxx.000002") - c.Assert(sp.nextBinlogName, Equals, "mysql.000001") - - // got notified - en = newDummyEventNotifier(1) - checker = &relayLogFileChecker{ - notifier: en, - relayDir: relayDir, - currentUUID: "xxx.000002", - latestRelayLogDir: subDir, - latestFilePath: relayPaths[0], - latestFile: relayFiles[0], - beginOffset: 0, - endOffset: size, - } - checker.relayLogUpdatedOrNewCreated(context.Background(), updatePathCh, switchCh, errCh) - c.Assert(len(errCh), Equals, 0) - c.Assert(len(updatePathCh), Equals, 1) - c.Assert(len(switchCh), Equals, 0) - up = <-updatePathCh - c.Assert(up, Equals, relayPaths[0]) - c.Assert(len(en.Notified()), Equals, 0) - - // got notified on timer - en = newDummyEventNotifier(0) - checker = &relayLogFileChecker{ - notifier: en, - relayDir: relayDir, - currentUUID: "xxx.000002", - latestRelayLogDir: subDir, - latestFilePath: relayPaths[0], - latestFile: 
relayFiles[0], - beginOffset: 0, - endOffset: size, - } - checker.relayLogUpdatedOrNewCreated(context.Background(), updatePathCh, switchCh, errCh) - c.Assert(len(errCh), Equals, 0) - c.Assert(len(updatePathCh), Equals, 1) - c.Assert(len(switchCh), Equals, 0) - up = <-updatePathCh - c.Assert(up, Equals, relayPaths[0]) -} - -func (t *testFileSuite) TestGetSwitchPath(c *C) { - var ( - relayDir = c.MkDir() - UUIDs = []string{ - "53ea0ed1-9bf8-11e6-8bea-64006a897c73.000001", - "53ea0ed1-9bf8-11e6-8bea-64006a897c72.000002", - "53ea0ed1-9bf8-11e6-8bea-64006a897c71.000003", - } - currentUUID = UUIDs[len(UUIDs)-1] // no next UUID - ) - - // invalid UUID in UUIDs, error - UUIDs = append(UUIDs, "invalid.uuid") - t.writeUUIDs(c, relayDir, UUIDs) - checker := &relayLogFileChecker{ - relayDir: relayDir, - currentUUID: currentUUID, - } - switchPath, err := checker.getSwitchPath() - c.Assert(switchPath, IsNil) - c.Assert(err, ErrorMatches, ".*not valid.*") - - UUIDs = UUIDs[:len(UUIDs)-1] // remove the invalid UUID - t.writeUUIDs(c, relayDir, UUIDs) - - // no next sub directory - checker.currentUUID = UUIDs[0] - switchPath, err = checker.getSwitchPath() - c.Assert(switchPath, IsNil) - c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*(no such file or directory|The system cannot find the file specified).*", UUIDs[1])) - - // create next sub directory, block - err = os.Mkdir(filepath.Join(relayDir, UUIDs[1]), 0o700) - c.Assert(err, IsNil) - switchPath, err = checker.getSwitchPath() - c.Assert(switchPath, IsNil) - c.Assert(err, IsNil) - - // create a relay log file in the next sub directory - nextBinlogPath := filepath.Join(relayDir, UUIDs[1], "mysql-bin.000001") - err = os.MkdirAll(filepath.Dir(nextBinlogPath), 0o700) - c.Assert(err, IsNil) - err = os.WriteFile(nextBinlogPath, nil, 0o600) - c.Assert(err, IsNil) - - // switch to the next - switchPath, err = checker.getSwitchPath() - c.Assert(switchPath.nextUUID, Equals, UUIDs[1]) - c.Assert(switchPath.nextBinlogName, Equals, 
filepath.Base(nextBinlogPath)) - c.Assert(err, IsNil) -} - -// nolint:unparam -func (t *testFileSuite) writeUUIDs(c *C, relayDir string, uuids []string) []byte { - indexPath := path.Join(relayDir, utils.UUIDIndexFilename) - var buf bytes.Buffer - for _, uuid := range uuids { - _, err := buf.WriteString(uuid) - c.Assert(err, IsNil) - _, err = buf.WriteString("\n") - c.Assert(err, IsNil) - } - - // write the index file - err := os.WriteFile(indexPath, buf.Bytes(), 0o600) - c.Assert(err, IsNil) - return buf.Bytes() -} - func (t *testFileSuite) TestReadSortedBinlogFromDir(c *C) { dir := c.MkDir() filenames := []string{ diff --git a/dm/relay/local_reader.go b/dm/relay/local_reader.go index 49f51d29e0a..7e6bd3f3a05 100644 --- a/dm/relay/local_reader.go +++ b/dm/relay/local_reader.go @@ -16,6 +16,7 @@ package relay import ( "context" "fmt" + "io" "os" "path" "path/filepath" @@ -23,6 +24,7 @@ import ( "sync" "time" + "github.com/BurntSushi/toml" "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" "github.com/google/uuid" @@ -43,9 +45,6 @@ import ( // this is mainly happened when upstream master changed when relay log not finish reading a transaction. var ErrorMaybeDuplicateEvent = errors.New("truncate binlog file found, event may be duplicated") -// polling interval for watcher. -var watcherInterval = 100 * time.Millisecond - // BinlogReaderConfig is the configuration for BinlogReader. type BinlogReaderConfig struct { RelayDir string @@ -74,6 +73,8 @@ type BinlogReader struct { // ch with size = 1, we only need to be notified whether binlog file of relay changed, not how many times notifyCh chan interface{} relay Process + + currentUUID string // current UUID(with suffix) } // newBinlogReader creates a new BinlogReader. 
@@ -125,7 +126,7 @@ func (r *BinlogReader) checkRelayPos(pos mysql.Position) error { func (r *BinlogReader) IsGTIDCoverPreviousFiles(ctx context.Context, filePath string, gset mysql.GTIDSet) (bool, error) { fileReader := reader.NewFileReader(&reader.FileReaderConfig{Timezone: r.cfg.Timezone}) defer fileReader.Close() - err := fileReader.StartSyncByPos(mysql.Position{Name: filePath, Pos: 4}) + err := fileReader.StartSyncByPos(mysql.Position{Name: filePath, Pos: binlog.FileHeaderLen}) if err != nil { return false, err } @@ -202,7 +203,7 @@ func (r *BinlogReader) getPosByGTID(gset mysql.GTIDSet) (*mysql.Position, error) // Start at the beginning of the file return &mysql.Position{ Name: binlog.ConstructFilenameWithUUIDSuffix(fileName, utils.SuffixIntToStr(suffix)), - Pos: 4, + Pos: binlog.FileHeaderLen, }, nil } } @@ -293,69 +294,97 @@ func (r *BinlogReader) StartSyncByGTID(gset mysql.GTIDSet) (reader.Streamer, err return s, nil } +// SwitchPath represents next binlog file path which should be switched. +type SwitchPath struct { + nextUUID string + nextBinlogName string +} + // parseRelay parses relay root directory, it support master-slave switch (switching to next sub directory). 
func (r *BinlogReader) parseRelay(ctx context.Context, s *LocalStreamer, pos mysql.Position) error { - var ( - needSwitch bool - nextUUID string - nextBinlogName string - err error - ) - + currentUUID, _, realPos, err := binlog.ExtractPos(pos, r.uuids) + if err != nil { + return terror.Annotatef(err, "parse relay dir with pos %v", pos) + } + r.currentUUID = currentUUID for { select { case <-ctx.Done(): return ctx.Err() default: } - needSwitch, nextUUID, nextBinlogName, err = r.parseDirAsPossible(ctx, s, pos) + needSwitch, err := r.parseDirAsPossible(ctx, s, realPos) if err != nil { return err } if !needSwitch { return terror.ErrNoSubdirToSwitch.Generate() } - - _, suffixInt, err2 := utils.ParseSuffixForUUID(nextUUID) - if err2 != nil { - return terror.Annotatef(err2, "parse suffix for UUID %s", nextUUID) + // update new uuid + if err = r.updateUUIDs(); err != nil { + return err } - uuidSuffix := utils.SuffixIntToStr(suffixInt) - - parsed, err2 := binlog.ParseFilename(nextBinlogName) - if err2 != nil { - return terror.Annotatef(err2, "parse binlog file name %s", nextBinlogName) + switchPath, err := r.getSwitchPath() + if err != nil { + return err + } + if switchPath == nil { + // should not happen, we have just called it inside parseDirAsPossible successfully. 
+ return errors.New("failed to get switch path") } + r.currentUUID = switchPath.nextUUID // update pos, so can switch to next sub directory - pos.Name = binlog.ConstructFilenameWithUUIDSuffix(parsed, uuidSuffix) - pos.Pos = 4 // start from pos 4 for next sub directory / file - r.tctx.L().Info("switching to next ready sub directory", zap.String("next uuid", nextUUID), zap.Stringer("position", pos)) + realPos.Name = switchPath.nextBinlogName + realPos.Pos = binlog.FileHeaderLen // start from pos 4 for next sub directory / file + r.tctx.L().Info("switching to next ready sub directory", zap.String("next uuid", r.currentUUID), zap.Stringer("position", pos)) } } -// parseDirAsPossible parses relay sub directory as far as possible. -func (r *BinlogReader) parseDirAsPossible(ctx context.Context, s *LocalStreamer, pos mysql.Position) (needSwitch bool, nextUUID string, nextBinlogName string, err error) { - currentUUID, _, realPos, err := binlog.ExtractPos(pos, r.uuids) +func (r *BinlogReader) getSwitchPath() (*SwitchPath, error) { + // reload uuid + uuids, err := utils.ParseUUIDIndex(r.indexPath) if err != nil { - return false, "", "", terror.Annotatef(err, "parse relay dir with pos %v", pos) + return nil, err } - pos = realPos // use realPos to do syncing + nextUUID, _, err := getNextUUID(r.currentUUID, uuids) + if err != nil { + return nil, err + } + if len(nextUUID) == 0 { + return nil, nil + } + + // try to get the first binlog file in next subdirectory + nextBinlogName, err := getFirstBinlogName(r.cfg.RelayDir, nextUUID) + if err != nil { + // because creating subdirectory and writing relay log file are not atomic + if terror.ErrBinlogFilesNotFound.Equal(err) { + return nil, nil + } + return nil, err + } + + return &SwitchPath{nextUUID, nextBinlogName}, nil +} + +// parseDirAsPossible parses relay sub directory as far as possible. 
+func (r *BinlogReader) parseDirAsPossible(ctx context.Context, s *LocalStreamer, pos mysql.Position) (needSwitch bool, err error) { firstParse := true // the first parse time for the relay log file - dir := path.Join(r.cfg.RelayDir, currentUUID) + dir := path.Join(r.cfg.RelayDir, r.currentUUID) r.tctx.L().Info("start to parse relay log files in sub directory", zap.String("directory", dir), zap.Stringer("position", pos)) for { select { case <-ctx.Done(): - return false, "", "", ctx.Err() + return false, ctx.Err() default: } files, err := CollectBinlogFilesCmp(dir, pos.Name, FileCmpBiggerEqual) if err != nil { - return false, "", "", terror.Annotatef(err, "parse relay dir %s with pos %s", dir, pos) + return false, terror.Annotatef(err, "parse relay dir %s with pos %s", dir, pos) } else if len(files) == 0 { - return false, "", "", terror.ErrNoRelayLogMatchPos.Generate(dir, pos) + return false, terror.ErrNoRelayLogMatchPos.Generate(dir, pos) } r.tctx.L().Debug("start read relay log files", zap.Strings("files", files), zap.String("directory", dir), zap.Stringer("position", pos)) @@ -365,28 +394,33 @@ func (r *BinlogReader) parseDirAsPossible(ctx context.Context, s *LocalStreamer, latestName string offset = int64(pos.Pos) ) + // TODO will this happen? 
+ // previously, we use ParseFile which will handle offset < 4, now we use ParseReader which won't + if offset < binlog.FileHeaderLen { + offset = binlog.FileHeaderLen + } for i, relayLogFile := range files { select { case <-ctx.Done(): - return false, "", "", ctx.Err() + return false, ctx.Err() default: } if i == 0 { if !strings.HasSuffix(relayLogFile, pos.Name) { - return false, "", "", terror.ErrFirstRelayLogNotMatchPos.Generate(relayLogFile, pos) + return false, terror.ErrFirstRelayLogNotMatchPos.Generate(relayLogFile, pos) } } else { - offset = 4 // for other relay log file, start parse from 4 - firstParse = true // new relay log file need to parse + offset = binlog.FileHeaderLen // for other relay log file, start parse from 4 + firstParse = true // new relay log file need to parse } - needSwitch, latestPos, nextUUID, nextBinlogName, err = r.parseFileAsPossible(ctx, s, relayLogFile, offset, dir, firstParse, currentUUID, i == len(files)-1) + needSwitch, latestPos, err = r.parseFileAsPossible(ctx, s, relayLogFile, offset, dir, firstParse, i == len(files)-1) if err != nil { - return false, "", "", terror.Annotatef(err, "parse relay log file %s from offset %d in dir %s", relayLogFile, offset, dir) + return false, terror.Annotatef(err, "parse relay log file %s from offset %d in dir %s", relayLogFile, offset, dir) } firstParse = false // already parsed if needSwitch { // need switch to next relay sub directory - return true, nextUUID, nextBinlogName, nil + return true, nil } latestName = relayLogFile // record the latest file name } @@ -397,29 +431,57 @@ func (r *BinlogReader) parseDirAsPossible(ctx context.Context, s *LocalStreamer, } } +type binlogFileParseState struct { + // readonly states + possibleLast bool + fullPath string + relayLogFile, relayLogDir string + + f *os.File + + // states may change + replaceWithHeartbeat bool + formatDescEventRead bool + latestPos int64 +} + // parseFileAsPossible parses single relay log file as far as possible. 
-func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer, relayLogFile string, offset int64, relayLogDir string, firstParse bool, currentUUID string, possibleLast bool) (needSwitch bool, latestPos int64, nextUUID string, nextBinlogName string, err error) { - var needReParse bool - latestPos = offset - replaceWithHeartbeat := false - r.tctx.L().Debug("start to parse relay log file", zap.String("file", relayLogFile), zap.Int64("position", latestPos), zap.String("directory", relayLogDir)) +func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer, relayLogFile string, offset int64, relayLogDir string, firstParse bool, possibleLast bool) (bool, int64, error) { + r.tctx.L().Debug("start to parse relay log file", zap.String("file", relayLogFile), zap.Int64("position", offset), zap.String("directory", relayLogDir)) + + fullPath := filepath.Join(relayLogDir, relayLogFile) + f, err := os.Open(fullPath) + if err != nil { + return false, 0, errors.Trace(err) + } + defer f.Close() + + state := &binlogFileParseState{ + possibleLast: possibleLast, + fullPath: fullPath, + relayLogFile: relayLogFile, + relayLogDir: relayLogDir, + f: f, + latestPos: offset, + replaceWithHeartbeat: false, + } for { select { case <-ctx.Done(): - return false, 0, "", "", ctx.Err() + return false, 0, ctx.Err() default: } - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile(ctx, s, relayLogFile, latestPos, relayLogDir, firstParse, currentUUID, possibleLast, replaceWithHeartbeat) + needSwitch, needReParse, err := r.parseFile(ctx, s, firstParse, state) if err != nil { - return false, 0, "", "", terror.Annotatef(err, "parse relay log file %s from offset %d in dir %s", relayLogFile, latestPos, relayLogDir) + return false, 0, terror.Annotatef(err, "parse relay log file %s from offset %d in dir %s", relayLogFile, state.latestPos, relayLogDir) } firstParse = false // set to false to handle the `continue` below if 
needReParse { r.tctx.L().Debug("continue to re-parse relay log file", zap.String("file", relayLogFile), zap.String("directory", relayLogDir)) continue // should continue to parse this file } - return needSwitch, latestPos, nextUUID, nextBinlogName, nil + return needSwitch, state.latestPos, nil } } @@ -428,43 +490,36 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer func (r *BinlogReader) parseFile( ctx context.Context, s *LocalStreamer, - relayLogFile string, - offset int64, - relayLogDir string, firstParse bool, - currentUUID string, - possibleLast bool, - replaceWithHeartbeat bool, -) (needSwitch, needReParse bool, latestPos int64, nextUUID, nextBinlogName string, currentReplaceFlag bool, err error) { - _, suffixInt, err := utils.ParseSuffixForUUID(currentUUID) + state *binlogFileParseState, +) (needSwitch, needReParse bool, err error) { + _, suffixInt, err := utils.ParseSuffixForUUID(r.currentUUID) if err != nil { - return false, false, 0, "", "", false, err + return false, false, err } - uuidSuffix := utils.SuffixIntToStr(suffixInt) // current UUID's suffix, which will be added to binlog name - latestPos = offset // set to argument passed in + offset := state.latestPos onEventFunc := func(e *replication.BinlogEvent) error { - r.tctx.L().Debug("read event", zap.Reflect("header", e.Header)) + if ce := r.tctx.L().Check(zap.DebugLevel, ""); ce != nil { + r.tctx.L().Debug("read event", zap.Reflect("header", e.Header)) + } r.latestServerID = e.Header.ServerID // record server_id switch ev := e.Event.(type) { case *replication.FormatDescriptionEvent: - // go-mysql will send a duplicate FormatDescriptionEvent event when offset > 4, ignore it - if offset > 4 { - return nil - } - // else just update lastPos - latestPos = int64(e.Header.LogPos) + state.formatDescEventRead = true + state.latestPos = int64(e.Header.LogPos) case *replication.RotateEvent: // add master UUID suffix to pos.Name parsed, _ := 
binlog.ParseFilename(string(ev.NextLogName)) + uuidSuffix := utils.SuffixIntToStr(suffixInt) // current UUID's suffix, which will be added to binlog name nameWithSuffix := binlog.ConstructFilenameWithUUIDSuffix(parsed, uuidSuffix) ev.NextLogName = []byte(nameWithSuffix) if e.Header.Timestamp != 0 && e.Header.LogPos != 0 { // not fake rotate event, update file pos - latestPos = int64(e.Header.LogPos) + state.latestPos = int64(e.Header.LogPos) } else { r.tctx.L().Debug("skip fake rotate event", zap.Reflect("header", e.Header)) } @@ -480,35 +535,35 @@ func (r *BinlogReader) parseFile( r.tctx.L().Info("rotate binlog", zap.Stringer("position", currentPos)) case *replication.GTIDEvent: if r.prevGset == nil { - latestPos = int64(e.Header.LogPos) + state.latestPos = int64(e.Header.LogPos) break } u, _ := uuid.FromBytes(ev.SID) - replaceWithHeartbeat, err = r.advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), ev.GNO)) + state.replaceWithHeartbeat, err = r.advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), ev.GNO)) if err != nil { return errors.Trace(err) } - latestPos = int64(e.Header.LogPos) + state.latestPos = int64(e.Header.LogPos) case *replication.MariadbGTIDEvent: if r.prevGset == nil { - latestPos = int64(e.Header.LogPos) + state.latestPos = int64(e.Header.LogPos) break } GTID := ev.GTID - replaceWithHeartbeat, err = r.advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)) + state.replaceWithHeartbeat, err = r.advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)) if err != nil { return errors.Trace(err) } - latestPos = int64(e.Header.LogPos) + state.latestPos = int64(e.Header.LogPos) case *replication.XIDEvent: ev.GSet = r.getCurrentGtidSet() - latestPos = int64(e.Header.LogPos) + state.latestPos = int64(e.Header.LogPos) case *replication.QueryEvent: ev.GSet = r.getCurrentGtidSet() - latestPos = int64(e.Header.LogPos) + state.latestPos = int64(e.Header.LogPos) default: // 
update file pos - latestPos = int64(e.Header.LogPos) + state.latestPos = int64(e.Header.LogPos) } // align with MySQL @@ -516,7 +571,7 @@ func (r *BinlogReader) parseFile( // replace it with HEARTBEAT event // for Mariadb, it will bee replaced with MARIADB_GTID_LIST_EVENT // In DM, we replace both of them with HEARTBEAT event - if replaceWithHeartbeat { + if state.replaceWithHeartbeat { switch e.Event.(type) { // Only replace transaction event // Other events such as FormatDescriptionEvent, RotateEvent, etc. should be the same as before @@ -534,29 +589,44 @@ func (r *BinlogReader) parseFile( return nil } - fullPath := filepath.Join(relayLogDir, relayLogFile) - if firstParse { // if the file is the first time to parse, send a fake ROTATE_EVENT before parse binlog file // ref: https://github.com/mysql/mysql-server/blob/4f1d7cf5fcb11a3f84cff27e37100d7295e7d5ca/sql/rpl_binlog_sender.cc#L248 - e, err2 := utils.GenFakeRotateEvent(relayLogFile, uint64(offset), r.latestServerID) + e, err2 := utils.GenFakeRotateEvent(state.relayLogFile, uint64(offset), r.latestServerID) if err2 != nil { - return false, false, 0, "", "", false, terror.Annotatef(err2, "generate fake RotateEvent for (%s: %d)", relayLogFile, offset) + return false, false, terror.Annotatef(err2, "generate fake RotateEvent for (%s: %d)", state.relayLogFile, offset) } err2 = onEventFunc(e) if err2 != nil { - return false, false, 0, "", "", false, terror.Annotatef(err2, "send event %+v", e.Header) + return false, false, terror.Annotatef(err2, "send event %+v", e.Header) } - r.tctx.L().Info("start parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset)) + r.tctx.L().Info("start parse relay log file", zap.String("file", state.fullPath), zap.Int64("offset", offset)) } else { - r.tctx.L().Debug("start parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset)) + r.tctx.L().Debug("start parse relay log file", zap.String("file", state.fullPath), zap.Int64("offset", offset)) 
+ } + + // parser needs the FormatDescriptionEvent to work correctly + // if we start parsing from the middle, we need to read FORMAT DESCRIPTION event first + if !state.formatDescEventRead && offset > binlog.FileHeaderLen { + if err = r.parseFormatDescEvent(state); err != nil { + if state.possibleLast && isIgnorableParseError(err) { + return r.waitBinlogChanged(ctx, state) + } + return false, false, terror.ErrParserParseRelayLog.Delegate(err, state.fullPath) + } + state.formatDescEventRead = true + } + + // we need to seek explicitly, as parser may read in-complete event and return error(ignorable) last time + // and offset may be messed up + if _, err = state.f.Seek(offset, io.SeekStart); err != nil { + return false, false, terror.ErrParserParseRelayLog.Delegate(err, state.fullPath) } - // use parser.ParseFile directly now, if needed we can change to use FileReader. - err = r.parser.ParseFile(fullPath, offset, onEventFunc) + err = r.parser.ParseReader(state.f, onEventFunc) if err != nil { - if possibleLast && isIgnorableParseError(err) { - r.tctx.L().Warn("fail to parse relay log file, meet some ignorable error", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err)) + if state.possibleLast && isIgnorableParseError(err) { + r.tctx.L().Warn("fail to parse relay log file, meet some ignorable error", zap.String("file", state.fullPath), zap.Int64("offset", offset), zap.Error(err)) // the file is truncated, we send a mock event with `IGNORABLE_EVENT` to notify the the consumer // TODO: should add a integration test for this e := &replication.BinlogEvent{ @@ -567,66 +637,126 @@ func (r *BinlogReader) parseFile( } s.ch <- e } else { - r.tctx.L().Error("parse relay log file", zap.String("file", fullPath), zap.Int64("offset", offset), zap.Error(err)) - return false, false, 0, "", "", false, terror.ErrParserParseRelayLog.Delegate(err, fullPath) + r.tctx.L().Error("parse relay log file", zap.String("file", state.fullPath), zap.Int64("offset", offset), 
zap.Error(err)) + return false, false, terror.ErrParserParseRelayLog.Delegate(err, state.fullPath) } } - r.tctx.L().Debug("parse relay log file", zap.String("file", fullPath), zap.Int64("offset", latestPos)) + r.tctx.L().Debug("parse relay log file", zap.String("file", state.fullPath), zap.Int64("offset", state.latestPos)) - if !possibleLast { + if !state.possibleLast { // there are more relay log files in current sub directory, continue to re-collect them - r.tctx.L().Info("more relay log files need to parse", zap.String("directory", relayLogDir)) - return false, false, latestPos, "", "", false, nil + r.tctx.L().Info("more relay log files need to parse", zap.String("directory", state.relayLogDir)) + return false, false, nil } - switchCh := make(chan SwitchPath, 1) - updatePathCh := make(chan string, 1) - updateErrCh := make(chan error, 1) - newCtx, cancel := context.WithCancel(ctx) - var wg sync.WaitGroup - defer func() { - cancel() - wg.Wait() - }() + return r.waitBinlogChanged(ctx, state) +} - wg.Add(1) - go func(latestPos int64) { - defer wg.Done() - checker := relayLogFileChecker{ - notifier: r, - relayDir: r.cfg.RelayDir, - currentUUID: currentUUID, - latestRelayLogDir: relayLogDir, - latestFilePath: fullPath, - latestFile: relayLogFile, - beginOffset: offset, - endOffset: latestPos, - } - // TODO no need to be a goroutine now, maybe refactored when refactoring parseFile itself. 
- checker.relayLogUpdatedOrNewCreated(newCtx, updatePathCh, switchCh, updateErrCh) - }(latestPos) +func (r *BinlogReader) waitBinlogChanged(ctx context.Context, state *binlogFileParseState) (needSwitch, needReParse bool, err error) { + active, relayOffset := r.relay.IsActive(r.currentUUID, state.relayLogFile) + if active && relayOffset > state.latestPos { + return false, true, nil + } + if !active { + meta := &LocalMeta{} + _, err := toml.DecodeFile(filepath.Join(state.relayLogDir, utils.MetaFilename), meta) + if err != nil { + return false, false, terror.Annotate(err, "decode relay meta toml file failed") + } + // current watched file size have no change means that no new writes have been made + // our relay meta file will be updated immediately after receive the rotate event, + // although we cannot ensure that the binlog filename in the meta is the next file after latestFile + // but if we return a different filename with latestFile, the outer logic (parseDirAsPossible) + // will find the right one + if meta.BinLogName != state.relayLogFile { + // we need check file size again, as the file may have been changed during our metafile check + cmp, err2 := fileSizeUpdated(state.fullPath, state.latestPos) + if err2 != nil { + return false, false, terror.Annotatef(err2, "latestFilePath=%s endOffset=%d", state.fullPath, state.latestPos) + } + switch { + case cmp < 0: + return false, false, terror.ErrRelayLogFileSizeSmaller.Generate(state.fullPath) + case cmp > 0: + return false, true, nil + default: + nextFilePath := filepath.Join(state.relayLogDir, meta.BinLogName) + log.L().Info("newer relay log file is already generated", + zap.String("now file path", state.fullPath), + zap.String("new file path", nextFilePath)) + return false, false, nil + } + } - select { - case <-ctx.Done(): - return false, false, 0, "", "", false, nil - case switchResp := <-switchCh: - // update new uuid - if err = r.updateUUIDs(); err != nil { - return false, false, 0, "", "", false, nil + // 
maybe UUID index file changed + switchPath, err := r.getSwitchPath() + if err != nil { + return false, false, err } - return true, false, 0, switchResp.nextUUID, switchResp.nextBinlogName, false, nil - case updatePath := <-updatePathCh: - if strings.HasSuffix(updatePath, relayLogFile) { - // current relay log file updated, need to re-parse it - return false, true, latestPos, "", "", replaceWithHeartbeat, nil + if switchPath != nil { + // we need check file size again, as the file may have been changed during path check + cmp, err := fileSizeUpdated(state.fullPath, state.latestPos) + if err != nil { + return false, false, terror.Annotatef(err, "latestFilePath=%s endOffset=%d", state.fullPath, state.latestPos) + } + switch { + case cmp < 0: + return false, false, terror.ErrRelayLogFileSizeSmaller.Generate(state.fullPath) + case cmp > 0: + return false, true, nil + default: + log.L().Info("newer relay uuid path is already generated", + zap.String("current path", state.relayLogDir), + zap.Any("new path", switchPath)) + return true, false, nil + } + } + } + + for { + select { + case <-ctx.Done(): + return false, false, nil + case <-r.Notified(): + active, relayOffset = r.relay.IsActive(r.currentUUID, state.relayLogFile) + if active { + if relayOffset > state.latestPos { + return false, true, nil + } + // already read to relayOffset, try again + continue + } + // file may have changed, try parse and check again + return false, true, nil } - need parse next relay log file or re-collect files - return false, false, latestPos, "", "", false, nil - case err := <-updateErrCh: - return false, false, 0, "", "", false, err } } +func (r *BinlogReader) parseFormatDescEvent(state *binlogFileParseState) error { + // FORMAT_DESCRIPTION event should always be read by default (despite the fact that the passed offset may be higher than 4) + if _, err := state.f.Seek(binlog.FileHeaderLen, io.SeekStart); err != nil { + return errors.Errorf("seek to 4, error %v", err) + } + + onEvent := func(e 
*replication.BinlogEvent) error { + if _, ok := e.Event.(*replication.FormatDescriptionEvent); ok { + return nil + } + // the first event in binlog file must be FORMAT_DESCRIPTION event. + return errors.New("corrupted binlog file") + } + eofWhenReadHeader, err := r.parser.ParseSingleEvent(state.f, onEvent) + if err != nil { + return errors.Annotatef(err, "parse FormatDescriptionEvent") + } + if eofWhenReadHeader { + // when parser met EOF when reading event header, ParseSingleEvent returns nil error + // return EOF so isIgnorableParseError can capture + return io.EOF + } + return nil +} + // updateUUIDs re-parses UUID index file and updates UUID list. func (r *BinlogReader) updateUUIDs() error { uuids, err := utils.ParseUUIDIndex(r.indexPath) diff --git a/dm/relay/local_reader_test.go b/dm/relay/local_reader_test.go index f57456a72f8..2b9fb27491b 100644 --- a/dm/relay/local_reader_test.go +++ b/dm/relay/local_reader_test.go @@ -64,21 +64,58 @@ func (t *testReaderSuite) TearDownSuite(c *C) { c.Assert(failpoint.Disable("github.com/pingcap/ticdc/dm/relay/SetHeartbeatInterval"), IsNil) } -func newBinlogReaderForTest(logger log.Logger, cfg *BinlogReaderConfig, notify bool) *BinlogReader { +func newBinlogReaderForTest(logger log.Logger, cfg *BinlogReaderConfig, notify bool, uuid string) *BinlogReader { relay := NewRealRelay(&Config{Flavor: gmysql.MySQLFlavor}) - reader := newBinlogReader(logger, cfg, relay) + r := newBinlogReader(logger, cfg, relay) if notify { - reader.notifyCh <- struct{}{} + r.notifyCh <- struct{}{} } - return reader + r.currentUUID = uuid + return r +} + +func (t *testReaderSuite) setActiveRelayLog(r Process, uuid, filename string, offset int64) { + relay := r.(*Relay) + writer := relay.writer.(*FileWriter) + writer.out.uuid, writer.out.filename = uuid, filename + writer.out.offset.Store(offset) +} + +func (t *testReaderSuite) createBinlogFileParseState(c *C, relayLogDir, relayLogFile string, offset int64, possibleLast bool) *binlogFileParseState { + 
fullPath := filepath.Join(relayLogDir, relayLogFile) + f, err := os.Open(fullPath) + c.Assert(err, IsNil) + + return &binlogFileParseState{ + possibleLast: possibleLast, + fullPath: fullPath, + relayLogFile: relayLogFile, + relayLogDir: relayLogDir, + f: f, + latestPos: offset, + replaceWithHeartbeat: false, + } +} + +func (t *testReaderSuite) TestparseFileAsPossibleFileNotExist(c *C) { + var ( + baseDir = c.MkDir() + currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" + filename = "test-mysql-bin.000001" + relayDir = path.Join(baseDir, currentUUID) + ) + cfg := &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, true, currentUUID) + needSwitch, lastestPos, err := r.parseFileAsPossible(context.Background(), nil, filename, 4, relayDir, true, false) + c.Assert(needSwitch, IsFalse) + c.Assert(lastestPos, Equals, int64(0)) + c.Assert(err, ErrorMatches, ".*no such file or directory.*") } func (t *testReaderSuite) TestParseFileBase(c *C) { var ( filename = "test-mysql-bin.000001" baseDir = c.MkDir() - offset int64 - firstParse = true possibleLast = false baseEvents, _, _ = t.genBinlogEvents(c, t.lastPos, t.lastGTID) s = newLocalStreamer() @@ -86,177 +123,168 @@ func (t *testReaderSuite) TestParseFileBase(c *C) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // no valid currentUUID provide, failed - currentUUID := "invalid-current-uuid" - relayDir := filepath.Join(baseDir, currentUUID) - cfg := &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r := newBinlogReaderForTest(log.L(), cfg, true) - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err := r.parseFile( - ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, ErrorMatches, ".*invalid-current-uuid.*") - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(latestPos, Equals, int64(0)) - c.Assert(nextUUID, 
Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) - // change to valid currentUUID - currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" - relayDir = filepath.Join(baseDir, currentUUID) + currentUUID := "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" + relayDir := filepath.Join(baseDir, currentUUID) fullPath := filepath.Join(relayDir, filename) - cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = newBinlogReaderForTest(log.L(), cfg, true) - - // relay log file not exists, failed - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( - ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the path specified).*") - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(latestPos, Equals, int64(0)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) - - // empty relay log file, failed, got EOF - err = os.MkdirAll(relayDir, 0o700) - c.Assert(err, IsNil) - f, err := os.OpenFile(fullPath, os.O_CREATE|os.O_WRONLY, 0o600) - c.Assert(err, IsNil) + err1 := os.MkdirAll(relayDir, 0o700) + c.Assert(err1, IsNil) + f, err1 := os.OpenFile(fullPath, os.O_CREATE|os.O_WRONLY, 0o600) + c.Assert(err1, IsNil) defer f.Close() - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( - ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(errors.Cause(err), Equals, io.EOF) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(latestPos, Equals, int64(0)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) + + // empty relay log file, got EOF when reading format description event separately 
and possibleLast = false + { + cfg := &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, true, currentUUID) + t.setActiveRelayLog(r.relay, currentUUID, filename, 0) + state := t.createBinlogFileParseState(c, relayDir, filename, 100, possibleLast) + needSwitch, needReParse, err := r.parseFile(ctx, s, true, state) + c.Assert(errors.Cause(err), Equals, io.EOF) + c.Assert(needSwitch, IsFalse) + c.Assert(needReParse, IsFalse) + c.Assert(state.latestPos, Equals, int64(100)) + c.Assert(state.formatDescEventRead, IsFalse) + c.Assert(state.replaceWithHeartbeat, Equals, false) + } // write some events to binlog file - _, err = f.Write(replication.BinLogFileHeader) - c.Assert(err, IsNil) + _, err1 = f.Write(replication.BinLogFileHeader) + c.Assert(err1, IsNil) for _, ev := range baseEvents { - _, err = f.Write(ev.RawData) - c.Assert(err, IsNil) + _, err1 = f.Write(ev.RawData) + c.Assert(err1, IsNil) } + fileSize, _ := f.Seek(0, io.SeekCurrent) t.purgeStreamer(c, s) // base test with only one valid binlog file - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( - ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) - - // try get events back, firstParse should have fake RotateEvent - var fakeRotateEventCount int - i := 0 - for { - ev, err2 := s.GetEvent(ctx) - c.Assert(err2, IsNil) - if ev.Header.Timestamp == 0 || ev.Header.LogPos == 0 { - if ev.Header.EventType == replication.ROTATE_EVENT { - fakeRotateEventCount++ + { + cfg := &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, true, 
currentUUID) + t.setActiveRelayLog(r.relay, currentUUID, filename, fileSize) + state := t.createBinlogFileParseState(c, relayDir, filename, 4, possibleLast) + needSwitch, needReParse, err := r.parseFile(ctx, s, true, state) + c.Assert(err, IsNil) + c.Assert(needSwitch, IsFalse) + c.Assert(needReParse, IsFalse) + c.Assert(state.latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos)) + c.Assert(state.formatDescEventRead, IsTrue) + c.Assert(state.replaceWithHeartbeat, IsFalse) + + // try get events back, firstParse should have fake RotateEvent + var fakeRotateEventCount int + i := 0 + for { + ev, err2 := s.GetEvent(ctx) + c.Assert(err2, IsNil) + if ev.Header.Timestamp == 0 || ev.Header.LogPos == 0 { + if ev.Header.EventType == replication.ROTATE_EVENT { + fakeRotateEventCount++ + } + continue // ignore fake event + } + c.Assert(ev, DeepEquals, baseEvents[i]) + i++ + if i >= len(baseEvents) { + break } - continue // ignore fake event - } - c.Assert(ev, DeepEquals, baseEvents[i]) - i++ - if i >= len(baseEvents) { - break } + c.Assert(fakeRotateEventCount, Equals, 1) + t.verifyNoEventsInStreamer(c, s) } - c.Assert(fakeRotateEventCount, Equals, 1) - t.verifyNoEventsInStreamer(c, s) - // try get events back, not firstParse should have no fake RotateEvent - firstParse = false - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( - ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) - fakeRotateEventCount = 0 - i = 0 - for { - ev, err2 := s.GetEvent(ctx) - c.Assert(err2, IsNil) - if ev.Header.Timestamp == 0 || ev.Header.LogPos == 0 { - if ev.Header.EventType == replication.ROTATE_EVENT { - 
fakeRotateEventCount++ + // try get events back, since firstParse=false, should have no fake RotateEvent + { + cfg := &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, true, currentUUID) + t.setActiveRelayLog(r.relay, currentUUID, filename, fileSize) + state := t.createBinlogFileParseState(c, relayDir, filename, 4, possibleLast) + needSwitch, needReParse, err := r.parseFile(ctx, s, false, state) + c.Assert(err, IsNil) + c.Assert(needSwitch, IsFalse) + c.Assert(needReParse, IsFalse) + c.Assert(state.latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos)) + c.Assert(state.formatDescEventRead, IsTrue) + c.Assert(state.replaceWithHeartbeat, Equals, false) + fakeRotateEventCount := 0 + i := 0 + for { + ev, err2 := s.GetEvent(ctx) + c.Assert(err2, IsNil) + if ev.Header.Timestamp == 0 || ev.Header.LogPos == 0 { + if ev.Header.EventType == replication.ROTATE_EVENT { + fakeRotateEventCount++ + } + continue // ignore fake event + } + c.Assert(ev, DeepEquals, baseEvents[i]) + i++ + if i >= len(baseEvents) { + break } - continue // ignore fake event - } - c.Assert(ev, DeepEquals, baseEvents[i]) - i++ - if i >= len(baseEvents) { - break } + c.Assert(fakeRotateEventCount, Equals, 0) + t.verifyNoEventsInStreamer(c, s) } - c.Assert(fakeRotateEventCount, Equals, 0) - t.verifyNoEventsInStreamer(c, s) // generate another non-fake RotateEvent - rotateEv, err := event.GenRotateEvent(baseEvents[0].Header, uint32(latestPos), []byte("mysql-bin.888888"), 4) + rotateEv, err := event.GenRotateEvent(baseEvents[0].Header, uint32(fileSize), []byte("mysql-bin.888888"), 4) c.Assert(err, IsNil) _, err = f.Write(rotateEv.RawData) c.Assert(err, IsNil) + fileSize, _ = f.Seek(0, io.SeekCurrent) // latest is still the end_log_pos of the last event, not the next relay file log file's position - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( - ctx, s, filename, offset, 
relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(latestPos, Equals, int64(rotateEv.Header.LogPos)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) - t.purgeStreamer(c, s) - - // parse from a non-zero offset - offset = int64(rotateEv.Header.LogPos - rotateEv.Header.EventSize) - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( - ctx, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(latestPos, Equals, int64(rotateEv.Header.LogPos)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) + { + cfg := &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, true, currentUUID) + t.setActiveRelayLog(r.relay, currentUUID, filename, fileSize) + state := t.createBinlogFileParseState(c, relayDir, filename, 4, possibleLast) + needSwitch, needReParse, err := r.parseFile(ctx, s, true, state) + c.Assert(err, IsNil) + c.Assert(needSwitch, IsFalse) + c.Assert(needReParse, IsFalse) + c.Assert(state.latestPos, Equals, int64(rotateEv.Header.LogPos)) + c.Assert(state.formatDescEventRead, IsTrue) + c.Assert(state.replaceWithHeartbeat, Equals, false) + t.purgeStreamer(c, s) + } - // should only get a RotateEvent - i = 0 - for { - ev, err2 := s.GetEvent(ctx) - c.Assert(err2, IsNil) - switch ev.Header.EventType { - case replication.ROTATE_EVENT: - c.Assert(ev.RawData, DeepEquals, rotateEv.RawData) - i++ - default: - c.Fatalf("got unexpected event %+v", ev.Header) - } - if i >= 1 { - break + // parse from offset > 4 + { + cfg := &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} + r := 
newBinlogReaderForTest(log.L(), cfg, true, currentUUID) + t.setActiveRelayLog(r.relay, currentUUID, filename, fileSize) + offset := int64(rotateEv.Header.LogPos - rotateEv.Header.EventSize) + state := t.createBinlogFileParseState(c, relayDir, filename, offset, possibleLast) + needSwitch, needReParse, err := r.parseFile(ctx, s, false, state) + c.Assert(err, IsNil) + c.Assert(needSwitch, IsFalse) + c.Assert(needReParse, IsFalse) + c.Assert(state.latestPos, Equals, int64(rotateEv.Header.LogPos)) + c.Assert(state.formatDescEventRead, IsTrue) + c.Assert(state.replaceWithHeartbeat, Equals, false) + + // should only get a RotateEvent + i := 0 + for { + ev, err2 := s.GetEvent(ctx) + c.Assert(err2, IsNil) + switch ev.Header.EventType { + case replication.ROTATE_EVENT: + c.Assert(ev.RawData, DeepEquals, rotateEv.RawData) + i++ + default: + c.Fatalf("got unexpected event %+v", ev.Header) + } + if i >= 1 { + break + } } + t.verifyNoEventsInStreamer(c, s) } - t.verifyNoEventsInStreamer(c, s) - - cancel() } func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { @@ -266,7 +294,6 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { notUsedGTIDSetStr = t.lastGTID.String() baseDir = c.MkDir() offset int64 - firstParse = true possibleLast = true currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" switchedUUID = "b60868af-5a6f-11e9-9ea3-0242ac160007.000002" @@ -276,7 +303,7 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { nextFullPath = filepath.Join(nextRelayDir, nextFilename) s = newLocalStreamer() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = newBinlogReaderForTest(log.L(), cfg, true) + r = newBinlogReaderForTest(log.L(), cfg, true, currentUUID) ) // create the current relay log file and meta @@ -290,23 +317,6 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { c.Assert(err, IsNil) t.createMetaFile(c, relayDir, filename, uint32(offset), notUsedGTIDSetStr) - // 
invalid UUID in UUID list, error - r.uuids = []string{currentUUID, "invalid.uuid"} - t.writeUUIDs(c, baseDir, r.uuids) - ctx1, cancel1 := context.WithTimeout(context.Background(), parseFileTimeout) - defer cancel1() - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err := r.parseFile( - ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, ErrorMatches, ".*not valid.*") - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsFalse) - c.Assert(latestPos, Equals, int64(0)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) - t.purgeStreamer(c, s) - - // next sub dir exits, need to switch r.uuids = []string{currentUUID, switchedUUID} t.writeUUIDs(c, baseDir, r.uuids) err = os.MkdirAll(nextRelayDir, 0o700) @@ -318,15 +328,16 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) { ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout) defer cancel2() t.createMetaFile(c, nextRelayDir, filename, uint32(offset), notUsedGTIDSetStr) - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( - ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) + state := t.createBinlogFileParseState(c, relayDir, filename, offset, possibleLast) + state.formatDescEventRead = true + t.setActiveRelayLog(r.relay, "next", "next", 4) + needSwitch, needReParse, err := r.parseFile(ctx2, s, true, state) c.Assert(err, IsNil) c.Assert(needSwitch, IsTrue) c.Assert(needReParse, IsFalse) - c.Assert(latestPos, Equals, int64(0)) - c.Assert(nextUUID, Equals, switchedUUID) - c.Assert(nextBinlogName, Equals, nextFilename) - c.Assert(replaceWithHeartbeat, Equals, false) + c.Assert(state.latestPos, Equals, int64(4)) + c.Assert(state.formatDescEventRead, IsTrue) + c.Assert(state.replaceWithHeartbeat, Equals, false) t.purgeStreamer(c, s) // NOTE: if we 
want to test the returned `needReParse` of `needSwitchSubDir`, @@ -338,8 +349,6 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { filename = "test-mysql-bin.000001" notUsedGTIDSetStr = t.lastGTID.String() baseDir = c.MkDir() - offset int64 - firstParse = true possibleLast = true baseEvents, _, _ = t.genBinlogEvents(c, t.lastPos, t.lastGTID) currentUUID = "b60868af-5a6f-11e9-9ea3-0242ac160006.000001" @@ -347,7 +356,6 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { fullPath = filepath.Join(relayDir, filename) s = newLocalStreamer() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = newBinlogReaderForTest(log.L(), cfg, true) ) // create the current relay log file and write some events @@ -356,50 +364,34 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) { f, err := os.OpenFile(fullPath, os.O_CREATE|os.O_WRONLY, 0o600) c.Assert(err, IsNil) defer f.Close() - t.createMetaFile(c, relayDir, filename, uint32(offset), notUsedGTIDSetStr) - - // file has no data, meet io.EOF error (when reading file header) and ignore it. 
- ctx1, cancel1 := context.WithTimeout(context.Background(), parseFileTimeout) - defer cancel1() - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err := r.parseFile( - ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsTrue) - c.Assert(latestPos, Equals, int64(0)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) + t.createMetaFile(c, relayDir, filename, 0, notUsedGTIDSetStr) _, err = f.Write(replication.BinLogFileHeader) c.Assert(err, IsNil) - for _, ev := range baseEvents { - _, err = f.Write(ev.RawData) - c.Assert(err, IsNil) - } - _, err = f.Write([]byte("some invalid binlog event data")) + _, err = f.Write(baseEvents[0].RawData[:replication.EventHeaderSize]) c.Assert(err, IsNil) - // meet `err EOF` error (when parsing binlog event) ignored - ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout) - defer cancel2() - r.notifyCh <- struct{}{} // notify again - needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, replaceWithHeartbeat, err = r.parseFile( - ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast, false) - c.Assert(err, IsNil) - c.Assert(needSwitch, IsFalse) - c.Assert(needReParse, IsTrue) - c.Assert(latestPos, Equals, int64(baseEvents[len(baseEvents)-1].Header.LogPos)) - c.Assert(nextUUID, Equals, "") - c.Assert(nextBinlogName, Equals, "") - c.Assert(replaceWithHeartbeat, Equals, false) + // meet io.EOF error when read event and ignore it. 
+ { + r := newBinlogReaderForTest(log.L(), cfg, true, currentUUID) + state := t.createBinlogFileParseState(c, relayDir, filename, 4, possibleLast) + state.formatDescEventRead = true + t.setActiveRelayLog(r.relay, currentUUID, filename, 100) + needSwitch, needReParse, err := r.parseFile(context.Background(), s, true, state) + c.Assert(err, IsNil) + c.Assert(needSwitch, IsFalse) + c.Assert(needReParse, IsTrue) + c.Assert(state.latestPos, Equals, int64(4)) + c.Assert(state.formatDescEventRead, IsTrue) + c.Assert(state.replaceWithHeartbeat, Equals, false) + } } func (t *testReaderSuite) TestUpdateUUIDs(c *C) { var ( baseDir = c.MkDir() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = newBinlogReaderForTest(log.L(), cfg, true) + r = newBinlogReaderForTest(log.L(), cfg, true, "") ) c.Assert(r.uuids, HasLen, 0) @@ -435,7 +427,7 @@ func (t *testReaderSuite) TestStartSyncByPos(c *C) { "b60868af-5a6f-11e9-9ea3-0242ac160008.000003", } cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = newBinlogReaderForTest(log.L(), cfg, false) + r = newBinlogReaderForTest(log.L(), cfg, false, "") startPos = gmysql.Position{Name: "test-mysql-bin|000001.000001"} // from the first relay log file in the first sub directory ) @@ -497,6 +489,7 @@ func (t *testReaderSuite) TestStartSyncByPos(c *C) { _, err = lastF.Write(ev.RawData) c.Assert(err, IsNil) } + r.notifyCh <- struct{}{} // read extra events back obtainExtraEvents := make([]*replication.BinlogEvent, 0, len(extraEvents)) @@ -521,6 +514,7 @@ func (t *testReaderSuite) TestStartSyncByPos(c *C) { err = os.WriteFile(lastFilename, eventsBuf.Bytes(), 0o600) c.Assert(err, IsNil) t.createMetaFile(c, path.Join(baseDir, UUIDs[2]), lastFilename, lastPos, notUsedGTIDSetStr) + r.notifyCh <- struct{}{} obtainExtraEvents2 := make([]*replication.BinlogEvent, 0, len(baseEvents)-1) for { @@ -567,7 +561,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { baseDir = c.MkDir() events 
[]*replication.BinlogEvent cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = newBinlogReaderForTest(log.L(), cfg, false) + r = newBinlogReaderForTest(log.L(), cfg, false, "") lastPos uint32 lastGTID gtid.Set previousGset, _ = gtid.ParserGTID(gmysql.MySQLFlavor, "") @@ -758,7 +752,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { } r.Close() - r = newBinlogReaderForTest(log.L(), cfg, true) + r = newBinlogReaderForTest(log.L(), cfg, true, "") excludeStrs := []string{} // exclude first uuid @@ -802,12 +796,12 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { c.Assert(os.Remove(path.Join(baseDir, excludeUUID, "mysql.000001")), IsNil) r.Close() - r = newBinlogReaderForTest(log.L(), cfg, true) + r = newBinlogReaderForTest(log.L(), cfg, true, "") _, err = r.StartSyncByGTID(preGset) c.Assert(err, IsNil) r.Close() - r = newBinlogReaderForTest(log.L(), cfg, true) + r = newBinlogReaderForTest(log.L(), cfg, true, "") _, err = r.StartSyncByGTID(excludeGset) // error because file has been purge c.Assert(terror.ErrNoRelayPosMatchGTID.Equal(err), IsTrue) @@ -816,12 +810,12 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { c.Assert(os.RemoveAll(path.Join(baseDir, excludeUUID)), IsNil) r.Close() - r = newBinlogReaderForTest(log.L(), cfg, true) + r = newBinlogReaderForTest(log.L(), cfg, true, "") _, err = r.StartSyncByGTID(preGset) c.Assert(err, IsNil) r.Close() - r = newBinlogReaderForTest(log.L(), cfg, true) + r = newBinlogReaderForTest(log.L(), cfg, true, "") _, err = r.StartSyncByGTID(excludeGset) // error because subdir has been purge c.Assert(err, ErrorMatches, ".*no such file or directory.*") @@ -838,7 +832,7 @@ func (t *testReaderSuite) TestStartSyncError(c *C) { startPos = gmysql.Position{Name: "test-mysql-bin|000001.000001"} // from the first relay log file in the first sub directory ) - r := newBinlogReaderForTest(log.L(), cfg, true) + r := newBinlogReaderForTest(log.L(), cfg, true, "") err := r.checkRelayPos(startPos) 
c.Assert(err, ErrorMatches, ".*empty UUIDs not valid.*") @@ -857,7 +851,7 @@ func (t *testReaderSuite) TestStartSyncError(c *C) { c.Assert(s, IsNil) // write UUIDs into index file - r = newBinlogReaderForTest(log.L(), cfg, true) // create a new reader + r = newBinlogReaderForTest(log.L(), cfg, true, "") // create a new reader uuidBytes := t.uuidListToBytes(c, UUIDs) err = os.WriteFile(r.indexPath, uuidBytes, 0o600) c.Assert(err, IsNil) @@ -902,7 +896,7 @@ func (t *testReaderSuite) TestAdvanceCurrentGTIDSet(c *C) { var ( baseDir = c.MkDir() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = newBinlogReaderForTest(log.L(), cfg, true) + r = newBinlogReaderForTest(log.L(), cfg, true, "") mysqlGset, _ = gmysql.ParseMysqlGTIDSet("b60868af-5a6f-11e9-9ea3-0242ac160006:1-6") mariadbGset, _ = gmysql.ParseMariadbGTIDSet("0-1-5") ) @@ -936,7 +930,7 @@ func (t *testReaderSuite) TestReParseUsingGTID(c *C) { var ( baseDir = c.MkDir() cfg = &BinlogReaderConfig{RelayDir: baseDir, Flavor: gmysql.MySQLFlavor} - r = newBinlogReaderForTest(log.L(), cfg, true) + r = newBinlogReaderForTest(log.L(), cfg, true, "") uuid = "ba8f633f-1f15-11eb-b1c7-0242ac110002.000001" gtidStr = "ba8f633f-1f15-11eb-b1c7-0242ac110002:1" file = "mysql.000001" @@ -1190,3 +1184,342 @@ func (t *testReaderSuite) createMetaFile(c *C, relayDirPath, binlogFileName stri c.Assert(err, IsNil) metaFile.Close() } + +type mockActiveCase struct { + active bool + offset int64 +} + +type mockFileWriterForActiveTest struct { + cnt int + cases []mockActiveCase +} + +func (m *mockFileWriterForActiveTest) Init(uuid, filename string) { + panic("should be used") +} + +func (m *mockFileWriterForActiveTest) Close() error { + panic("should be used") +} + +func (m *mockFileWriterForActiveTest) WriteEvent(ev *replication.BinlogEvent) (WResult, error) { + panic("should be used") +} + +func (m *mockFileWriterForActiveTest) IsActive(uuid, filename string) (bool, int64) { + v := m.cases[m.cnt] + m.cnt++ + return 
v.active, v.offset +} + +func (t *testReaderSuite) TestwaitBinlogChanged(c *C) { + var ( + relayFiles = []string{ + "mysql-bin.000001", + "mysql-bin.000002", + } + binlogPos = uint32(4) + binlogGTID = "ba8f633f-1f15-11eb-b1c7-0242ac110002:1" + relayPaths = make([]string, len(relayFiles)) + data = []byte("meaningless file content") + size = int64(len(data)) + ) + + // create relay log dir + subDir := c.MkDir() + // join the file path + for i, rf := range relayFiles { + relayPaths[i] = filepath.Join(subDir, rf) + f, _ := os.Create(relayPaths[i]) + _ = f.Close() + } + + rotateRelayFile := func(filename string) { + meta := LocalMeta{BinLogName: filename, BinLogPos: binlogPos, BinlogGTID: binlogGTID} + metaFile, err2 := os.Create(path.Join(subDir, utils.MetaFilename)) + c.Assert(err2, IsNil) + err := toml.NewEncoder(metaFile).Encode(&meta) + c.Assert(err, IsNil) + _ = metaFile.Close() + } + + // meta not found + { + cfg := &BinlogReaderConfig{RelayDir: "", Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, false, "") + t.setActiveRelayLog(r.relay, "next", "next", 0) + state := t.createBinlogFileParseState(c, subDir, relayFiles[0], 0, true) + needSwitch, reParse, err := r.waitBinlogChanged(context.Background(), state) + c.Assert(needSwitch, IsFalse) + c.Assert(reParse, IsFalse) + c.Assert(err, NotNil) + c.Assert(err, ErrorMatches, ".*no such file or directory*") + } + + // write meta + rotateRelayFile(relayFiles[0]) + + // relay file not found + { + cfg := &BinlogReaderConfig{RelayDir: "", Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, false, "") + t.setActiveRelayLog(r.relay, "next", "next", 0) + state := &binlogFileParseState{ + relayLogDir: subDir, + relayLogFile: "not-exist-file", + } + needSwitch, reParse, err := r.waitBinlogChanged(context.Background(), state) + c.Assert(needSwitch, IsFalse) + c.Assert(reParse, IsFalse) + c.Assert(err, NotNil) + c.Assert(err, ErrorMatches, ".*no such file or directory*") + } + + // 
create the first relay file + err1 := os.WriteFile(relayPaths[0], data, 0o600) + c.Assert(err1, IsNil) + // rotate relay file + rotateRelayFile(relayFiles[1]) + + // file decreased when meta changed + { + cfg := &BinlogReaderConfig{RelayDir: "", Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, false, "") + t.setActiveRelayLog(r.relay, "next", "next", 0) + state := t.createBinlogFileParseState(c, subDir, relayFiles[0], size+100, true) + needSwitch, reParse, err := r.waitBinlogChanged(context.Background(), state) + c.Assert(needSwitch, IsFalse) + c.Assert(reParse, IsFalse) + c.Assert(err, NotNil) + c.Assert(terror.ErrRelayLogFileSizeSmaller.Equal(err), IsTrue) + } + + // return changed file in meta + { + cfg := &BinlogReaderConfig{RelayDir: "", Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, false, "") + t.setActiveRelayLog(r.relay, "next", "next", 0) + state := t.createBinlogFileParseState(c, subDir, relayFiles[0], size, true) + needSwitch, reParse, err := r.waitBinlogChanged(context.Background(), state) + c.Assert(needSwitch, IsFalse) + c.Assert(reParse, IsFalse) + c.Assert(err, IsNil) + } + + // file increased when checking meta + { + cfg := &BinlogReaderConfig{RelayDir: "", Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, false, "") + t.setActiveRelayLog(r.relay, "next", "next", 0) + state := t.createBinlogFileParseState(c, subDir, relayFiles[0], 0, true) + needSwitch, reParse, err := r.waitBinlogChanged(context.Background(), state) + c.Assert(needSwitch, IsFalse) + c.Assert(reParse, IsTrue) + c.Assert(err, IsNil) + } + + // context timeout (no new write) + { + newCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + defer cancel() + cfg := &BinlogReaderConfig{RelayDir: "", Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, false, "current-uuid") // no notify + t.setActiveRelayLog(r.relay, "current-uuid", relayFiles[0], 0) + state := 
t.createBinlogFileParseState(c, subDir, relayFiles[0], 0, true) + needSwitch, reParse, err := r.waitBinlogChanged(newCtx, state) + c.Assert(needSwitch, IsFalse) + c.Assert(reParse, IsFalse) + c.Assert(err, IsNil) + } + + // this dir is different from the dir of current binlog file, but for test it doesn't matter + relayDir := c.MkDir() + t.writeUUIDs(c, relayDir, []string{"xxx.000001", "invalid uuid"}) + + // getSwitchPath return error(invalid uuid file) + { + cfg := &BinlogReaderConfig{RelayDir: relayDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, false, "xxx.000001") + t.setActiveRelayLog(r.relay, "next", "next", 0) + state := t.createBinlogFileParseState(c, subDir, relayFiles[1], 0, true) + needSwitch, reParse, err := r.waitBinlogChanged(context.Background(), state) + c.Assert(needSwitch, IsFalse) + c.Assert(reParse, IsFalse) + c.Assert(terror.ErrRelayParseUUIDSuffix.Equal(err), IsTrue) + } + + t.writeUUIDs(c, relayDir, []string{"xxx.000001", "xxx.000002"}) + _ = os.MkdirAll(filepath.Join(relayDir, "xxx.000002"), 0o700) + _ = os.WriteFile(filepath.Join(relayDir, "xxx.000002", "mysql.000001"), nil, 0o600) + + // binlog dir switched, but last file not exists so failed to check change of file length + // should not happen in real, just for branch test + { + cfg := &BinlogReaderConfig{RelayDir: relayDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, false, "xxx.000001") + t.setActiveRelayLog(r.relay, "next", "next", 0) + state := t.createBinlogFileParseState(c, subDir, relayFiles[1], 0, true) + _ = os.Remove(relayPaths[1]) + needSwitch, reParse, err := r.waitBinlogChanged(context.Background(), state) + c.Assert(needSwitch, IsFalse) + c.Assert(reParse, IsFalse) + c.Assert(terror.ErrGetRelayLogStat.Equal(err), IsTrue) + } + + err1 = os.WriteFile(relayPaths[1], nil, 0o600) + c.Assert(err1, IsNil) + + // binlog dir switched, but last file smaller + { + cfg := &BinlogReaderConfig{RelayDir: relayDir, Flavor: 
gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, false, "xxx.000001") + t.setActiveRelayLog(r.relay, "next", "next", 0) + state := t.createBinlogFileParseState(c, subDir, relayFiles[1], size, true) + needSwitch, reParse, err := r.waitBinlogChanged(context.Background(), state) + c.Assert(needSwitch, IsFalse) + c.Assert(reParse, IsFalse) + c.Assert(terror.ErrRelayLogFileSizeSmaller.Equal(err), IsTrue) + } + + err1 = os.WriteFile(relayPaths[1], data, 0o600) + c.Assert(err1, IsNil) + + // binlog dir switched, but last file bigger + { + cfg := &BinlogReaderConfig{RelayDir: relayDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, false, "xxx.000001") + t.setActiveRelayLog(r.relay, "next", "next", 0) + state := t.createBinlogFileParseState(c, subDir, relayFiles[1], 0, true) + needSwitch, reParse, err := r.waitBinlogChanged(context.Background(), state) + c.Assert(needSwitch, IsFalse) + c.Assert(reParse, IsTrue) + c.Assert(err, IsNil) + } + + // binlog dir switched, but last file not changed + { + cfg := &BinlogReaderConfig{RelayDir: relayDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, false, "xxx.000001") + t.setActiveRelayLog(r.relay, "next", "next", 0) + state := t.createBinlogFileParseState(c, subDir, relayFiles[1], size, true) + needSwitch, reParse, err := r.waitBinlogChanged(context.Background(), state) + c.Assert(needSwitch, IsTrue) + c.Assert(reParse, IsFalse) + c.Assert(err, IsNil) + } + + // got notified and active pos > current read pos + { + cfg := &BinlogReaderConfig{RelayDir: relayDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, true, "xxx.000001") + t.setActiveRelayLog(r.relay, r.currentUUID, relayFiles[1], size) + state := t.createBinlogFileParseState(c, subDir, relayFiles[1], 0, true) + needSwitch, reParse, err := r.waitBinlogChanged(context.Background(), state) + c.Assert(needSwitch, IsFalse) + c.Assert(reParse, IsTrue) + c.Assert(err, IsNil) + } + + // got 
notified but not active + { + cfg := &BinlogReaderConfig{RelayDir: relayDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, true, "xxx.000001") + relay := r.relay.(*Relay) + relay.writer = &mockFileWriterForActiveTest{cases: []mockActiveCase{ + {true, 0}, + {false, 0}, + }} + state := t.createBinlogFileParseState(c, subDir, relayFiles[1], 0, true) + needSwitch, reParse, err := r.waitBinlogChanged(context.Background(), state) + c.Assert(needSwitch, IsFalse) + c.Assert(reParse, IsTrue) + c.Assert(err, IsNil) + } + + // got notified, first notified is active but has already read to that offset + // second notify, got new data + { + cfg := &BinlogReaderConfig{RelayDir: relayDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, true, "xxx.000001") + r.notifyCh = make(chan interface{}, 2) + r.notifyCh <- struct{}{} + r.notifyCh <- struct{}{} + relay := r.relay.(*Relay) + relay.writer = &mockFileWriterForActiveTest{cases: []mockActiveCase{ + {true, 0}, + {true, 0}, + {true, size}, + }} + state := t.createBinlogFileParseState(c, subDir, relayFiles[1], 0, true) + needSwitch, reParse, err := r.waitBinlogChanged(context.Background(), state) + c.Assert(needSwitch, IsFalse) + c.Assert(reParse, IsTrue) + c.Assert(err, IsNil) + } +} + +func (t *testReaderSuite) TestGetSwitchPath(c *C) { + var ( + relayDir = c.MkDir() + UUIDs = []string{ + "53ea0ed1-9bf8-11e6-8bea-64006a897c73.000001", + "53ea0ed1-9bf8-11e6-8bea-64006a897c72.000002", + "53ea0ed1-9bf8-11e6-8bea-64006a897c71.000003", + } + currentUUID = UUIDs[len(UUIDs)-1] // no next UUID + ) + + UUIDs = append(UUIDs, "invalid.uuid") + + // invalid UUID in UUIDs, error + t.writeUUIDs(c, relayDir, UUIDs) + { + cfg := &BinlogReaderConfig{RelayDir: relayDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, true, currentUUID) + switchPath, err := r.getSwitchPath() + c.Assert(switchPath, IsNil) + c.Assert(terror.ErrRelayParseUUIDSuffix.Equal(err), IsTrue) + } + + 
UUIDs = UUIDs[:len(UUIDs)-1] // remove the invalid UUID + t.writeUUIDs(c, relayDir, UUIDs) + + // no next sub directory + { + cfg := &BinlogReaderConfig{RelayDir: relayDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, true, UUIDs[0]) + switchPath, err := r.getSwitchPath() + c.Assert(switchPath, IsNil) + c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*(no such file or directory|The system cannot find the file specified).*", UUIDs[1])) + } + + err1 := os.Mkdir(filepath.Join(relayDir, UUIDs[1]), 0o700) + c.Assert(err1, IsNil) + + // uuid directory exist, but no binlog file inside + { + cfg := &BinlogReaderConfig{RelayDir: relayDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, true, UUIDs[0]) + switchPath, err := r.getSwitchPath() + c.Assert(switchPath, IsNil) + c.Assert(err, IsNil) + } + + // create a relay log file in the next sub directory + nextBinlogPath := filepath.Join(relayDir, UUIDs[1], "mysql-bin.000001") + err1 = os.MkdirAll(filepath.Dir(nextBinlogPath), 0o700) + c.Assert(err1, IsNil) + err1 = os.WriteFile(nextBinlogPath, nil, 0o600) + c.Assert(err1, IsNil) + + // switch to the next + { + cfg := &BinlogReaderConfig{RelayDir: relayDir, Flavor: gmysql.MySQLFlavor} + r := newBinlogReaderForTest(log.L(), cfg, true, UUIDs[0]) + switchPath, err := r.getSwitchPath() + c.Assert(switchPath.nextUUID, Equals, UUIDs[1]) + c.Assert(switchPath.nextBinlogName, Equals, filepath.Base(nextBinlogPath)) + c.Assert(err, IsNil) + } +} diff --git a/dm/relay/relay.go b/dm/relay/relay.go index 329c42fc2d1..908fa5ba796 100644 --- a/dm/relay/relay.go +++ b/dm/relay/relay.go @@ -107,6 +107,8 @@ type Process interface { UnRegisterListener(el Listener) // NewReader creates a new relay reader NewReader(logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader + // IsActive check whether given uuid+filename is active binlog file, if true return current file offset + IsActive(uuid, filename string) (bool, int64) } // Relay relays mysql 
binlog to local file. @@ -138,7 +140,7 @@ func NewRealRelay(cfg *Config) Process { logger: log.With(zap.String("component", "relay log")), listeners: make(map[Listener]struct{}), } - r.writer = NewFileWriter(r.logger) + r.writer = NewFileWriter(r.logger, cfg.RelayDir) return r } @@ -288,7 +290,7 @@ func (r *Relay) process(ctx context.Context) error { }() uuid, pos := r.meta.Pos() - r.writer.Init(r.meta.Dir(), pos.Name) + r.writer.Init(uuid, pos.Name) r.logger.Info("started underlying writer", zap.String("UUID", uuid), zap.String("filename", pos.Name)) defer func() { err = r.writer.Close() @@ -686,7 +688,9 @@ func (r *Relay) handleEvents( // 3. save events into file writeTimer := time.Now() - r.logger.Debug("writing binlog event", zap.Reflect("header", e.Header)) + if ce := r.logger.Check(zap.DebugLevel, ""); ce != nil { + r.logger.Debug("writing binlog event", zap.Reflect("header", e.Header)) + } wResult, err := r.writer.WriteEvent(e) if err != nil { relayLogWriteErrorCounter.Inc() @@ -1017,6 +1021,10 @@ func (r *Relay) Close() { r.logger.Info("relay unit closed") } +func (r *Relay) IsActive(uuid, filename string) (bool, int64) { + return r.writer.IsActive(uuid, filename) +} + // Status implements the dm.Unit interface. func (r *Relay) Status(sourceStatus *binlog.SourceStatus) interface{} { r.RLock() diff --git a/dm/relay/relay_test.go b/dm/relay/relay_test.go index f9582e87292..154db3f36ee 100644 --- a/dm/relay/relay_test.go +++ b/dm/relay/relay_test.go @@ -122,6 +122,10 @@ type mockWriter struct { latestEvent *replication.BinlogEvent } +func (w *mockWriter) IsActive(uuid, filename string) (bool, int64) { + return false, 0 +} + func (w *mockWriter) Close() error { return nil } diff --git a/dm/relay/relay_writer.go b/dm/relay/relay_writer.go index 5d66063ae3a..bcabe681cf6 100644 --- a/dm/relay/relay_writer.go +++ b/dm/relay/relay_writer.go @@ -49,13 +49,15 @@ type WResult struct { // 5. 
rollback/discard unfinished binlog entries(events or transactions) type Writer interface { // Init inits the writer, should be called before any other method - Init(relayDir, filename string) + Init(uuid, filename string) // Close closes the writer and release the resource. Close() error // WriteEvent writes an binlog event's data into disk or any other places. // It is not safe for concurrent use by multiple goroutines. WriteEvent(ev *replication.BinlogEvent) (WResult, error) + // IsActive check whether given uuid+filename is active binlog file, if true return current file offset + IsActive(uuid, filename string) (bool, int64) } // FileWriter implements Writer interface. @@ -64,23 +66,25 @@ type FileWriter struct { // it will be created/started until needed. out *BinlogWriter - relayDir string // this dir contains the UUID + relayDir string // base directory of relay files, without UUID part + uuid string // with suffix, like 3ccc475b-2343-11e7-be21-6c0b84d59f30.000001 filename atomic.String // current binlog filename logger log.Logger } // NewFileWriter creates a FileWriter instances. 
-func NewFileWriter(logger log.Logger) Writer { +func NewFileWriter(logger log.Logger, relayDir string) Writer { w := &FileWriter{ - logger: logger.WithFields(zap.String("sub component", "relay writer")), + relayDir: relayDir, + logger: logger.WithFields(zap.String("sub component", "relay writer")), } - w.out = NewBinlogWriter(w.logger) + w.out = NewBinlogWriter(w.logger, relayDir) return w } -func (w *FileWriter) Init(relayDir, filename string) { - w.relayDir = relayDir +func (w *FileWriter) Init(uuid, filename string) { + w.uuid = uuid w.filename.Store(filename) } @@ -126,32 +130,32 @@ func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) ( } // open/create a new binlog file - filename := filepath.Join(w.relayDir, w.filename.Load()) - err = w.out.Open(filename) + fullName := filepath.Join(w.relayDir, w.uuid, w.filename.Load()) + err = w.out.Open(w.uuid, w.filename.Load()) if err != nil { - return WResult{}, terror.Annotatef(err, "start underlying binlog writer for %s", filename) + return WResult{}, terror.Annotatef(err, "start underlying binlog writer for %s", fullName) } w.logger.Info("open underlying binlog writer", zap.Reflect("status", w.out.Status())) // write the binlog file header if not exists - exist, err := checkBinlogHeaderExist(filename) + exist, err := checkBinlogHeaderExist(fullName) if err != nil { - return WResult{}, terror.Annotatef(err, "check binlog file header for %s", filename) + return WResult{}, terror.Annotatef(err, "check binlog file header for %s", fullName) } else if !exist { err = w.out.Write(replication.BinLogFileHeader) if err != nil { - return WResult{}, terror.Annotatef(err, "write binlog file header for %s", filename) + return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName) } } // write the FormatDescriptionEvent if not exists one - exist, err = checkFormatDescriptionEventExist(filename) + exist, err = checkFormatDescriptionEventExist(fullName) if err != nil { - return 
WResult{}, terror.Annotatef(err, "check FormatDescriptionEvent for %s", filename) + return WResult{}, terror.Annotatef(err, "check FormatDescriptionEvent for %s", fullName) } else if !exist { err = w.out.Write(ev.RawData) if err != nil { - return WResult{}, terror.Annotatef(err, "write FormatDescriptionEvent %+v for %s", ev.Header, filename) + return WResult{}, terror.Annotatef(err, "write FormatDescriptionEvent %+v for %s", ev.Header, fullName) } } var reason string @@ -212,7 +216,7 @@ func (w *FileWriter) handleRotateEvent(ev *replication.BinlogEvent) (result WRes err = w.out.Write(ev.RawData) if err != nil { - return result, terror.Annotatef(err, "write RotateEvent %+v for %s", ev.Header, filepath.Join(w.relayDir, currFile)) + return result, terror.Annotatef(err, "write RotateEvent %+v for %s", ev.Header, filepath.Join(w.relayDir, w.uuid, currFile)) } return WResult{ @@ -302,7 +306,7 @@ func (w *FileWriter) handleFileHoleExist(ev *replication.BinlogEvent) (bool, err // handleDuplicateEventsExist tries to handle a potential duplicate event in the binlog file. 
func (w *FileWriter) handleDuplicateEventsExist(ev *replication.BinlogEvent) (WResult, error) { - filename := filepath.Join(w.relayDir, w.filename.Load()) + filename := filepath.Join(w.relayDir, w.uuid, w.filename.Load()) duplicate, err := checkIsDuplicateEvent(filename, ev) if err != nil { return WResult{}, terror.Annotatef(err, "check event %+v whether duplicate in %s", ev.Header, filename) @@ -320,3 +324,7 @@ func (w *FileWriter) handleDuplicateEventsExist(ev *replication.BinlogEvent) (WR IgnoreReason: reason, }, nil } + +func (w *FileWriter) IsActive(uuid, filename string) (bool, int64) { + return w.out.isActive(uuid, filename) +} diff --git a/dm/relay/relay_writer_test.go b/dm/relay/relay_writer_test.go index c6afcdaaa59..d387ba971f7 100644 --- a/dm/relay/relay_writer_test.go +++ b/dm/relay/relay_writer_test.go @@ -16,6 +16,7 @@ package relay import ( "bytes" "os" + "path" "path/filepath" "time" @@ -35,6 +36,7 @@ type testFileWriterSuite struct{} func (t *testFileWriterSuite) TestInterfaceMethods(c *check.C) { var ( relayDir = c.MkDir() + uuid = "3ccc475b-2343-11e7-be21-6c0b84d59f30.000001" filename = "test-mysql-bin.000001" header = &replication.EventHeader{ Timestamp: uint32(time.Now().Unix()), @@ -45,14 +47,16 @@ func (t *testFileWriterSuite) TestInterfaceMethods(c *check.C) { ev, _ = event.GenFormatDescriptionEvent(header, latestPos) ) - w := NewFileWriter(log.L()) + c.Assert(os.MkdirAll(path.Join(relayDir, uuid), 0o755), check.IsNil) + + w := NewFileWriter(log.L(), relayDir) c.Assert(w, check.NotNil) // not prepared _, err := w.WriteEvent(ev) c.Assert(err, check.ErrorMatches, ".*not valid.*") - w.Init(relayDir, filename) + w.Init(uuid, filename) // write event res, err := w.WriteEvent(ev) @@ -65,7 +69,9 @@ func (t *testFileWriterSuite) TestInterfaceMethods(c *check.C) { func (t *testFileWriterSuite) TestRelayDir(c *check.C) { var ( - header = &replication.EventHeader{ + relayDir = c.MkDir() + uuid = "3ccc475b-2343-11e7-be21-6c0b84d59f30.000001" + header = 
&replication.EventHeader{ Timestamp: uint32(time.Now().Unix()), ServerID: 11, Flags: 0x01, @@ -76,37 +82,38 @@ func (t *testFileWriterSuite) TestRelayDir(c *check.C) { c.Assert(err, check.IsNil) // not inited - w1 := NewFileWriter(log.L()) + w1 := NewFileWriter(log.L(), relayDir) defer w1.Close() _, err = w1.WriteEvent(ev) c.Assert(err, check.ErrorMatches, ".*not valid.*") // invalid dir - w2 := NewFileWriter(log.L()) + w2 := NewFileWriter(log.L(), relayDir) defer w2.Close() - w2.Init("invalid\x00path", "bin.000001") + w2.Init("invalid\x00uuid", "bin.000001") _, err = w2.WriteEvent(ev) c.Assert(err, check.ErrorMatches, ".*invalid argument.*") // valid directory, but no filename specified - tmpRelayDir := c.MkDir() - w3 := NewFileWriter(log.L()) + w3 := NewFileWriter(log.L(), relayDir) defer w3.Close() - w3.Init(tmpRelayDir, "") + w3.Init(uuid, "") _, err = w3.WriteEvent(ev) c.Assert(err, check.ErrorMatches, ".*not valid.*") // valid directory, but invalid filename - w4 := NewFileWriter(log.L()) + w4 := NewFileWriter(log.L(), relayDir) defer w4.Close() - w4.Init(tmpRelayDir, "test-mysql-bin.666abc") + w4.Init(uuid, "test-mysql-bin.666abc") _, err = w4.WriteEvent(ev) c.Assert(err, check.ErrorMatches, ".*not valid.*") + c.Assert(os.MkdirAll(filepath.Join(relayDir, uuid), 0o755), check.IsNil) + // valid directory, valid filename - w5 := NewFileWriter(log.L()) + w5 := NewFileWriter(log.L(), relayDir) defer w5.Close() - w5.Init(tmpRelayDir, "test-mysql-bin.000001") + w5.Init(uuid, "test-mysql-bin.000001") result, err := w5.WriteEvent(ev) c.Assert(err, check.IsNil) c.Assert(result.Ignore, check.IsFalse) @@ -116,6 +123,7 @@ func (t *testFileWriterSuite) TestFormatDescriptionEvent(c *check.C) { var ( relayDir = c.MkDir() filename = "test-mysql-bin.000001" + uuid = "3ccc475b-2343-11e7-be21-6c0b84d59f30.000001" header = &replication.EventHeader{ Timestamp: uint32(time.Now().Unix()), ServerID: 11, @@ -125,11 +133,12 @@ func (t *testFileWriterSuite) TestFormatDescriptionEvent(c 
*check.C) { ) formatDescEv, err := event.GenFormatDescriptionEvent(header, latestPos) c.Assert(err, check.IsNil) + c.Assert(os.Mkdir(path.Join(relayDir, uuid), 0o755), check.IsNil) // write FormatDescriptionEvent to empty file - w := NewFileWriter(log.L()) + w := NewFileWriter(log.L(), relayDir) defer w.Close() - w.Init(relayDir, filename) + w.Init(uuid, filename) result, err := w.WriteEvent(formatDescEv) c.Assert(err, check.IsNil) c.Assert(result.Ignore, check.IsFalse) @@ -171,7 +180,7 @@ func (t *testFileWriterSuite) TestFormatDescriptionEvent(c *check.C) { events = append(events, e) return nil } - fullName := filepath.Join(relayDir, filename) + fullName := filepath.Join(relayDir, uuid, filename) err = replication.NewBinlogParser().ParseFile(fullName, 0, onEventFunc) c.Assert(err, check.IsNil) c.Assert(events, check.HasLen, 2) @@ -189,6 +198,7 @@ func (t *testFileWriterSuite) verifyFilenameOffset(c *check.C, w Writer, filenam func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check.C) { var ( relayDir = c.MkDir() + uuid = "3ccc475b-2343-11e7-be21-6c0b84d59f30.000001" filename = "test-mysql-bin.000001" nextFilename = "test-mysql-bin.000002" nextFilePos uint64 = 4 @@ -224,17 +234,18 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check c.Assert(holeRotateEv, check.NotNil) // 1: non-fake RotateEvent before FormatDescriptionEvent, invalid - w1 := NewFileWriter(log.L()) + w1 := NewFileWriter(log.L(), relayDir) defer w1.Close() - w1.Init(relayDir, filename) + w1.Init(uuid, filename) _, err = w1.WriteEvent(rotateEv) c.Assert(err, check.ErrorMatches, ".*file not opened.*") // 2. 
fake RotateEvent before FormatDescriptionEvent relayDir = c.MkDir() // use a new relay directory - w2 := NewFileWriter(log.L()) + c.Assert(os.MkdirAll(filepath.Join(relayDir, uuid), 0o755), check.IsNil) + w2 := NewFileWriter(log.L(), relayDir) defer w2.Close() - w2.Init(relayDir, filename) + w2.Init(uuid, filename) result, err := w2.WriteEvent(fakeRotateEv) c.Assert(err, check.IsNil) c.Assert(result.Ignore, check.IsTrue) // ignore fake RotateEvent @@ -248,8 +259,8 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check t.verifyFilenameOffset(c, w2, nextFilename, fileSize) // filename should be empty, next file should contain only one FormatDescriptionEvent - filename1 := filepath.Join(relayDir, filename) - filename2 := filepath.Join(relayDir, nextFilename) + filename1 := filepath.Join(relayDir, uuid, filename) + filename2 := filepath.Join(relayDir, uuid, nextFilename) _, err = os.Stat(filename1) c.Assert(os.IsNotExist(err), check.IsTrue) data, err := os.ReadFile(filename2) @@ -260,9 +271,10 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check // 3. 
FormatDescriptionEvent before fake RotateEvent relayDir = c.MkDir() // use a new relay directory - w3 := NewFileWriter(log.L()) + c.Assert(os.MkdirAll(filepath.Join(relayDir, uuid), 0o755), check.IsNil) + w3 := NewFileWriter(log.L(), relayDir) defer w3.Close() - w3.Init(relayDir, filename) + w3.Init(uuid, filename) result, err = w3.WriteEvent(formatDescEv) c.Assert(err, check.IsNil) c.Assert(result, check.NotNil) @@ -277,8 +289,8 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check t.verifyFilenameOffset(c, w3, nextFilename, fileSize) // filename should contain only one FormatDescriptionEvent, next file should be empty - filename1 = filepath.Join(relayDir, filename) - filename2 = filepath.Join(relayDir, nextFilename) + filename1 = filepath.Join(relayDir, uuid, filename) + filename2 = filepath.Join(relayDir, uuid, nextFilename) _, err = os.Stat(filename2) c.Assert(os.IsNotExist(err), check.IsTrue) data, err = os.ReadFile(filename1) @@ -288,9 +300,10 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check // 4. 
FormatDescriptionEvent before non-fake RotateEvent relayDir = c.MkDir() // use a new relay directory - w4 := NewFileWriter(log.L()) + c.Assert(os.MkdirAll(filepath.Join(relayDir, uuid), 0o755), check.IsNil) + w4 := NewFileWriter(log.L(), relayDir) defer w4.Close() - w4.Init(relayDir, filename) + w4.Init(uuid, filename) result, err = w4.WriteEvent(formatDescEv) c.Assert(err, check.IsNil) c.Assert(result, check.NotNil) @@ -312,8 +325,8 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check c.Assert(err, check.ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*") // filename should contain both one FormatDescriptionEvent and one RotateEvent, next file should be empty - filename1 = filepath.Join(relayDir, filename) - filename2 = filepath.Join(relayDir, nextFilename) + filename1 = filepath.Join(relayDir, uuid, filename) + filename2 = filepath.Join(relayDir, uuid, nextFilename) _, err = os.Stat(filename2) c.Assert(os.IsNotExist(err), check.IsTrue) data, err = os.ReadFile(filename1) @@ -333,6 +346,7 @@ func (t *testFileWriterSuite) TestWriteMultiEvents(c *check.C) { latestXID uint64 = 10 relayDir = c.MkDir() + uuid = "3ccc475b-2343-11e7-be21-6c0b84d59f30.000001" filename = "test-mysql-bin.000001" ) previousGTIDSet, err := gtid.ParserGTID(flavor, previousGTIDSetStr) @@ -375,9 +389,11 @@ func (t *testFileWriterSuite) TestWriteMultiEvents(c *check.C) { allEvents = append(allEvents, events...) 
allData.Write(data) + c.Assert(os.MkdirAll(filepath.Join(relayDir, uuid), 0o755), check.IsNil) + // write the events to the file - w := NewFileWriter(log.L()) - w.Init(relayDir, filename) + w := NewFileWriter(log.L(), relayDir) + w.Init(uuid, filename) for _, ev := range allEvents { result, err2 := w.WriteEvent(ev) c.Assert(err2, check.IsNil) @@ -387,7 +403,7 @@ func (t *testFileWriterSuite) TestWriteMultiEvents(c *check.C) { t.verifyFilenameOffset(c, w, filename, int64(allData.Len())) // read the data back from the file - fullName := filepath.Join(relayDir, filename) + fullName := filepath.Join(relayDir, uuid, filename) obtainData, err := os.ReadFile(fullName) c.Assert(err, check.IsNil) c.Assert(obtainData, check.DeepEquals, allData.Bytes()) @@ -396,6 +412,7 @@ func (t *testFileWriterSuite) TestWriteMultiEvents(c *check.C) { func (t *testFileWriterSuite) TestHandleFileHoleExist(c *check.C) { var ( relayDir = c.MkDir() + uuid = "3ccc475b-2343-11e7-be21-6c0b84d59f30.000001" filename = "test-mysql-bin.000001" header = &replication.EventHeader{ Timestamp: uint32(time.Now().Unix()), @@ -407,9 +424,11 @@ func (t *testFileWriterSuite) TestHandleFileHoleExist(c *check.C) { c.Assert(err, check.IsNil) c.Assert(formatDescEv, check.NotNil) - w := NewFileWriter(log.L()) + c.Assert(os.MkdirAll(filepath.Join(relayDir, uuid), 0o755), check.IsNil) + + w := NewFileWriter(log.L(), relayDir) defer w.Close() - w.Init(relayDir, filename) + w.Init(uuid, filename) // write the FormatDescriptionEvent, no hole exists result, err := w.WriteEvent(formatDescEv) @@ -444,7 +463,7 @@ func (t *testFileWriterSuite) TestHandleFileHoleExist(c *check.C) { events = append(events, e) return nil } - fullName := filepath.Join(relayDir, filename) + fullName := filepath.Join(relayDir, uuid, filename) err = replication.NewBinlogParser().ParseFile(fullName, 0, onEventFunc) c.Assert(err, check.IsNil) c.Assert(events, check.HasLen, 3) @@ -462,6 +481,7 @@ func (t *testFileWriterSuite) 
TestHandleDuplicateEventsExist(c *check.C) { var ( relayDir = c.MkDir() + uuid = "3ccc475b-2343-11e7-be21-6c0b84d59f30.000001" filename = "test-mysql-bin.000001" header = &replication.EventHeader{ Timestamp: uint32(time.Now().Unix()), @@ -469,9 +489,10 @@ func (t *testFileWriterSuite) TestHandleDuplicateEventsExist(c *check.C) { } latestPos uint32 = 4 ) - w := NewFileWriter(log.L()) + c.Assert(os.MkdirAll(filepath.Join(relayDir, uuid), 0o755), check.IsNil) + w := NewFileWriter(log.L(), relayDir) defer w.Close() - w.Init(relayDir, filename) + w.Init(uuid, filename) // write a FormatDescriptionEvent, not duplicate formatDescEv, err := event.GenFormatDescriptionEvent(header, latestPos) diff --git a/dm/relay/streamer.go b/dm/relay/streamer.go index 891dbda1d3f..ed78feadce4 100644 --- a/dm/relay/streamer.go +++ b/dm/relay/streamer.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/ticdc/dm/pkg/binlog/event" "github.com/pingcap/ticdc/dm/pkg/log" "github.com/pingcap/ticdc/dm/pkg/terror" + "github.com/pingcap/ticdc/dm/pkg/utils" "github.com/go-mysql-org/go-mysql/replication" "github.com/pingcap/failpoint" @@ -34,9 +35,10 @@ var heartbeatInterval = common.MasterHeartbeatPeriod // LocalStreamer reads and parses binlog events from local binlog file. type LocalStreamer struct { - ch chan *replication.BinlogEvent - ech chan error - err error + ch chan *replication.BinlogEvent + ech chan error + heatBeatTimer *time.Timer + err error } // GetEvent gets the binlog event one by one, it will block until parser occurs some errors. 
@@ -57,8 +59,18 @@ func (s *LocalStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, heartbeatInterval = time.Duration(i) * time.Second }) + fired := false + s.heatBeatTimer.Reset(heartbeatInterval) + defer func() { + if !fired { + if !s.heatBeatTimer.Stop() { + <-s.heatBeatTimer.C + } + } + }() select { - case <-time.After(heartbeatInterval): + case <-s.heatBeatTimer.C: + fired = true // MySQL will send heartbeat event 30s by default heartbeatHeader := &replication.EventHeader{} return event.GenHeartbeatEvent(heartbeatHeader), nil @@ -97,6 +109,7 @@ func newLocalStreamer() *LocalStreamer { s.ch = make(chan *replication.BinlogEvent, 10240) s.ech = make(chan error, 4) + s.heatBeatTimer = utils.NewStoppedTimer() return s } diff --git a/dm/relay/upstream_reader.go b/dm/relay/upstream_reader.go index 240cd26ca11..528d28e1112 100644 --- a/dm/relay/upstream_reader.go +++ b/dm/relay/upstream_reader.go @@ -22,7 +22,7 @@ import ( "go.uber.org/zap" "github.com/pingcap/ticdc/dm/pkg/binlog/common" - br "github.com/pingcap/ticdc/dm/pkg/binlog/reader" + "github.com/pingcap/ticdc/dm/pkg/binlog/reader" "github.com/pingcap/ticdc/dm/pkg/gtid" "github.com/pingcap/ticdc/dm/pkg/log" "github.com/pingcap/ticdc/dm/pkg/terror" @@ -67,7 +67,7 @@ type upstreamReader struct { mu sync.RWMutex stage common.Stage - in br.Reader // the underlying reader used to read binlog events. + in reader.Reader // the underlying reader used to read binlog events. 
out chan *replication.BinlogEvent logger log.Logger @@ -77,7 +77,7 @@ type upstreamReader struct { func NewUpstreamReader(cfg *RConfig) Reader { return &upstreamReader{ cfg: cfg, - in: br.NewTCPReader(cfg.SyncConfig), + in: reader.NewTCPReader(cfg.SyncConfig), out: make(chan *replication.BinlogEvent), logger: log.With(zap.String("component", "relay reader")), } @@ -133,19 +133,12 @@ func (r *upstreamReader) GetEvent(ctx context.Context) (RResult, error) { return result, terror.ErrRelayReaderNeedStart.Generate(r.stage, common.StagePrepared) } - for { - ctx2, cancel2 := context.WithTimeout(ctx, common.SlaveReadTimeout) - ev, err := r.in.GetEvent(ctx2) - cancel2() - - if err == nil { - result.Event = ev - } else if isRetryableError(err) { - r.logger.Info("get retryable error when reading binlog event", log.ShortError(err)) - continue - } - return result, err + ev, err := r.in.GetEvent(ctx) + + if err == nil { + result.Event = ev } + return result, err } func (r *upstreamReader) setUpReaderByGTID() error { diff --git a/dm/relay/upstream_reader_test.go b/dm/relay/upstream_reader_test.go index 3feb2392f18..1485db05ad3 100644 --- a/dm/relay/upstream_reader_test.go +++ b/dm/relay/upstream_reader_test.go @@ -21,7 +21,7 @@ import ( "github.com/pingcap/check" "github.com/pingcap/errors" - br "github.com/pingcap/ticdc/dm/pkg/binlog/reader" + "github.com/pingcap/ticdc/dm/pkg/binlog/reader" ) var _ = check.Suite(&testRemoteReaderSuite{}) @@ -56,7 +56,7 @@ func (t *testRemoteReaderSuite) testInterfaceWithReader(c *check.C, r Reader, ca // replace underlying reader with a mock reader for testing concreteR := r.(*upstreamReader) c.Assert(concreteR, check.NotNil) - mockR := br.NewMockReader() + mockR := reader.NewMockReader() concreteR.in = mockR // start reader @@ -68,7 +68,7 @@ func (t *testRemoteReaderSuite) testInterfaceWithReader(c *check.C, r Reader, ca // getEvent by pushing event to mock reader ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer 
cancel() - concreteMR := mockR.(*br.MockReader) + concreteMR := mockR.(*reader.MockReader) go func() { for _, cs := range cases { c.Assert(concreteMR.PushEvent(ctx, cs), check.IsNil) @@ -109,13 +109,12 @@ func (t *testRemoteReaderSuite) TestGetEventWithError(c *check.C) { // replace underlying reader with a mock reader for testing concreteR := r.(*upstreamReader) c.Assert(concreteR, check.NotNil) - mockR := br.NewMockReader() + mockR := reader.NewMockReader() concreteR.in = mockR errOther := errors.New("other error") in := []error{ context.Canceled, - context.DeadlineExceeded, // retried without return errOther, } expected := []error{ @@ -129,7 +128,7 @@ func (t *testRemoteReaderSuite) TestGetEventWithError(c *check.C) { // getEvent by pushing event to mock reader ctx, cancel := context.WithCancel(context.Background()) defer cancel() - concreteMR := mockR.(*br.MockReader) + concreteMR := mockR.(*reader.MockReader) go func() { for _, cs := range in { c.Assert(concreteMR.PushError(ctx, cs), check.IsNil) diff --git a/go.mod b/go.mod index 227afad2b8c..b4cdd5df0ed 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/frankban/quicktest v1.11.1 // indirect github.com/getkin/kin-openapi v0.80.0 github.com/gin-gonic/gin v1.7.4 - github.com/go-mysql-org/go-mysql v1.1.3-0.20210705101833-83965e516929 + github.com/go-mysql-org/go-mysql v1.1.3-0.20211111092245-d7f36836d6b7 github.com/go-sql-driver/mysql v1.6.0 github.com/gogo/gateway v1.1.0 github.com/gogo/protobuf v1.3.2 diff --git a/go.sum b/go.sum index 01dd4640f42..9cebcb64c37 100644 --- a/go.sum +++ b/go.sum @@ -315,8 +315,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8= 
-github.com/go-mysql-org/go-mysql v1.1.3-0.20210705101833-83965e516929 h1:cjv3hcFlmma66+fYTvhHt/sbwZWWJs09iv2ipVRIr0I= -github.com/go-mysql-org/go-mysql v1.1.3-0.20210705101833-83965e516929/go.mod h1:3lFZKf7l95Qo70+3XB2WpiSf9wu2s3na3geLMaIIrqQ= +github.com/go-mysql-org/go-mysql v1.1.3-0.20211111092245-d7f36836d6b7 h1:7e1SUamaZct05YYDhshX7Mi95F79O9U+O8CbCJL5Cx8= +github.com/go-mysql-org/go-mysql v1.1.3-0.20211111092245-d7f36836d6b7/go.mod h1:3lFZKf7l95Qo70+3XB2WpiSf9wu2s3na3geLMaIIrqQ= github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-openapi/jsonpointer v0.17.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M=