diff --git a/dm/dm/worker/relay.go b/dm/dm/worker/relay.go index a4ff098ecad..b0ccc934293 100644 --- a/dm/dm/worker/relay.go +++ b/dm/dm/worker/relay.go @@ -28,13 +28,12 @@ import ( "github.com/pingcap/ticdc/dm/pkg/streamer" "github.com/pingcap/ticdc/dm/pkg/terror" "github.com/pingcap/ticdc/dm/relay" - "github.com/pingcap/ticdc/dm/relay/purger" ) // RelayHolder for relay unit. type RelayHolder interface { // Init initializes the holder - Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error) + Init(ctx context.Context, interceptors []relay.PurgeInterceptor) (relay.Purger, error) // Start starts run the relay Start() // Close closes the holder @@ -94,11 +93,11 @@ func NewRealRelayHolder(sourceCfg *config.SourceConfig) RelayHolder { } // Init initializes the holder. -func (h *realRelayHolder) Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error) { +func (h *realRelayHolder) Init(ctx context.Context, interceptors []relay.PurgeInterceptor) (relay.Purger, error) { h.closed.Store(false) // initial relay purger - operators := []purger.RelayOperator{ + operators := []relay.Operator{ h, streamer.GetReaderHub(), } @@ -107,7 +106,7 @@ func (h *realRelayHolder) Init(ctx context.Context, interceptors []purger.PurgeI return nil, terror.Annotate(err, "initial relay unit") } - return purger.NewPurger(h.cfg.Purge, h.cfg.RelayDir, operators, interceptors), nil + return relay.NewPurger(h.cfg.Purge, h.cfg.RelayDir, operators, interceptors), nil } // Start starts run the relay. @@ -306,7 +305,7 @@ func (h *realRelayHolder) Update(ctx context.Context, sourceCfg *config.SourceCo return nil } -// EarliestActiveRelayLog implements RelayOperator.EarliestActiveRelayLog. +// EarliestActiveRelayLog implements Operator.EarliestActiveRelayLog. 
func (h *realRelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo { return h.relay.ActiveRelayLog() } @@ -355,13 +354,13 @@ func NewDummyRelayHolderWithInitError(cfg *config.SourceConfig) RelayHolder { } // Init implements interface of RelayHolder. -func (d *dummyRelayHolder) Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error) { +func (d *dummyRelayHolder) Init(ctx context.Context, interceptors []relay.PurgeInterceptor) (relay.Purger, error) { // initial relay purger - operators := []purger.RelayOperator{ + operators := []relay.Operator{ d, } - return purger.NewDummyPurger(d.cfg.Purge, d.cfg.RelayDir, operators, interceptors), d.initError + return relay.NewDummyPurger(d.cfg.Purge, d.cfg.RelayDir, operators, interceptors), d.initError } // Start implements interface of RelayHolder. diff --git a/dm/dm/worker/relay_test.go b/dm/dm/worker/relay_test.go index 6c447c40048..bbf12856de6 100644 --- a/dm/dm/worker/relay_test.go +++ b/dm/dm/worker/relay_test.go @@ -29,7 +29,6 @@ import ( pkgstreamer "github.com/pingcap/ticdc/dm/pkg/streamer" "github.com/pingcap/ticdc/dm/pkg/utils" "github.com/pingcap/ticdc/dm/relay" - "github.com/pingcap/ticdc/dm/relay/purger" ) type testRelay struct{} @@ -139,11 +138,11 @@ func (d *DummyRelay) PurgeRelayDir() error { func (t *testRelay) TestRelay(c *C) { originNewRelay := relay.NewRelay relay.NewRelay = NewDummyRelay - originNewPurger := purger.NewPurger - purger.NewPurger = purger.NewDummyPurger + originNewPurger := relay.NewPurger + relay.NewPurger = relay.NewDummyPurger defer func() { relay.NewRelay = originNewRelay - purger.NewPurger = originNewPurger + relay.NewPurger = originNewPurger }() cfg := loadSourceConfigWithoutPassword(c) diff --git a/dm/dm/worker/server_test.go b/dm/dm/worker/server_test.go index 20d620c8e3f..b9470f34634 100644 --- a/dm/dm/worker/server_test.go +++ b/dm/dm/worker/server_test.go @@ -38,9 +38,9 @@ import ( "github.com/pingcap/ticdc/dm/pkg/gtid" 
"github.com/pingcap/ticdc/dm/pkg/ha" "github.com/pingcap/ticdc/dm/pkg/log" - "github.com/pingcap/ticdc/dm/pkg/streamer" "github.com/pingcap/ticdc/dm/pkg/terror" "github.com/pingcap/ticdc/dm/pkg/utils" + "github.com/pingcap/ticdc/dm/relay" ) // do not forget to update this path if the file removed/renamed. @@ -121,7 +121,7 @@ func (t *testServer) TestServer(c *C) { cfg.UseRelay = false return NewRealSubTask(cfg, etcdClient, worker) } - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) mockSync := NewMockUnit(pb.UnitType_Sync) diff --git a/dm/dm/worker/source_worker.go b/dm/dm/worker/source_worker.go index 9ff7097830d..d2fdf80aaf2 100644 --- a/dm/dm/worker/source_worker.go +++ b/dm/dm/worker/source_worker.go @@ -37,7 +37,7 @@ import ( "github.com/pingcap/ticdc/dm/pkg/streamer" "github.com/pingcap/ticdc/dm/pkg/terror" "github.com/pingcap/ticdc/dm/pkg/utils" - "github.com/pingcap/ticdc/dm/relay/purger" + "github.com/pingcap/ticdc/dm/relay" ) // SourceWorker manages a source(upstream) which is mainly related to subtasks and relay. @@ -76,7 +76,7 @@ type SourceWorker struct { relayCancel context.CancelFunc relayWg sync.WaitGroup relayHolder RelayHolder - relayPurger purger.Purger + relayPurger relay.Purger taskStatusChecker TaskStatusChecker @@ -321,7 +321,7 @@ func (w *SourceWorker) EnableRelay() (err error) { // 2. 
initial relay holder, the cfg's password need decrypt w.relayHolder = NewRelayHolder(w.cfg) - relayPurger, err := w.relayHolder.Init(w.relayCtx, []purger.PurgeInterceptor{ + relayPurger, err := w.relayHolder.Init(w.relayCtx, []relay.PurgeInterceptor{ w, }) if err != nil { diff --git a/dm/dm/worker/source_worker_test.go b/dm/dm/worker/source_worker_test.go index 8fff70dfcca..7cf46630398 100644 --- a/dm/dm/worker/source_worker_test.go +++ b/dm/dm/worker/source_worker_test.go @@ -32,8 +32,8 @@ import ( "github.com/pingcap/ticdc/dm/pkg/conn" "github.com/pingcap/ticdc/dm/pkg/ha" "github.com/pingcap/ticdc/dm/pkg/log" - "github.com/pingcap/ticdc/dm/pkg/streamer" "github.com/pingcap/ticdc/dm/pkg/utils" + "github.com/pingcap/ticdc/dm/relay" ) var emptyWorkerStatusInfoJSONLength = 25 @@ -242,7 +242,7 @@ var _ = Suite(&testWorkerFunctionalities{}) func (t *testWorkerFunctionalities) SetUpSuite(c *C) { NewRelayHolder = NewDummyRelayHolder NewSubTask = NewRealSubTask - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { atomic.AddInt32(&t.createUnitCount, 1) mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) @@ -417,7 +417,7 @@ func (t *testWorkerEtcdCompact) SetUpSuite(c *C) { cfg.UseRelay = false return NewRealSubTask(cfg, etcdClient, worker) } - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) mockSync := NewMockUnit(pb.UnitType_Sync) diff --git a/dm/dm/worker/subtask.go b/dm/dm/worker/subtask.go index 
c8205cfcfc7..aa7cbeba221 100644 --- a/dm/dm/worker/subtask.go +++ b/dm/dm/worker/subtask.go @@ -34,9 +34,9 @@ import ( "github.com/pingcap/ticdc/dm/pkg/gtid" "github.com/pingcap/ticdc/dm/pkg/log" "github.com/pingcap/ticdc/dm/pkg/shardddl/pessimism" - "github.com/pingcap/ticdc/dm/pkg/streamer" "github.com/pingcap/ticdc/dm/pkg/terror" "github.com/pingcap/ticdc/dm/pkg/utils" + "github.com/pingcap/ticdc/dm/relay" "github.com/pingcap/ticdc/dm/syncer" ) @@ -60,7 +60,7 @@ func (r relayNotifier) Notified() chan interface{} { var createUnits = createRealUnits // createRealUnits creates process units base on task mode. -func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier streamer.EventNotifier) []unit.Unit { +func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier relay.EventNotifier) []unit.Unit { failpoint.Inject("mockCreateUnitsDumpOnly", func(_ failpoint.Value) { log.L().Info("create mock worker units with dump unit only", zap.String("failpoint", "mockCreateUnitsDumpOnly")) failpoint.Return([]unit.Unit{dumpling.NewDumpling(cfg)}) @@ -120,7 +120,7 @@ type SubTask struct { workerName string - notifier streamer.EventNotifier + notifier relay.EventNotifier } // NewSubTask is subtask initializer diff --git a/dm/dm/worker/subtask_test.go b/dm/dm/worker/subtask_test.go index 06b77b2a9eb..111ab901c1f 100644 --- a/dm/dm/worker/subtask_test.go +++ b/dm/dm/worker/subtask_test.go @@ -24,8 +24,8 @@ import ( "github.com/pingcap/ticdc/dm/dumpling" "github.com/pingcap/ticdc/dm/loader" "github.com/pingcap/ticdc/dm/pkg/binlog" - "github.com/pingcap/ticdc/dm/pkg/streamer" "github.com/pingcap/ticdc/dm/pkg/utils" + "github.com/pingcap/ticdc/dm/relay" "github.com/pingcap/ticdc/dm/syncer" . 
"github.com/pingcap/check" @@ -177,7 +177,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) { defer func() { createUnits = createRealUnits }() - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { return nil } st.Run(pb.Stage_Running) @@ -186,7 +186,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) { mockDumper := NewMockUnit(pb.UnitType_Dump) mockLoader := NewMockUnit(pb.UnitType_Load) - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } @@ -297,7 +297,7 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) { defer func() { createUnits = createRealUnits }() - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } @@ -421,7 +421,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) { defer func() { createUnits = createRealUnits }() - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } @@ -446,7 +446,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) { st = NewSubTaskWithStage(cfg, pb.Stage_Finished, nil, "worker") c.Assert(st.Stage(), DeepEquals, 
pb.Stage_Finished) - createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit { + createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit { return []unit.Unit{mockDumper, mockLoader} } diff --git a/dm/pkg/binlog/writer/writer.go b/dm/pkg/binlog/writer/writer.go deleted file mode 100644 index bd9086fc0df..00000000000 --- a/dm/pkg/binlog/writer/writer.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package writer - -// Writer is a binlog event writer, it may write binlog events to a binlog file, an in-memory structure or a TCP stream. -type Writer interface { - // Start prepares the writer for writing binlog events. - Start() error - - // Close closes the writer and release the resource. - Close() error - - // Write writes/appends a binlog event's rawData. - Write(rawData []byte) error - - // Flush flushes the buffered data to a stable storage or sends through the network. - Flush() error - - // Status returns the status of the writer. - Status() interface{} -} diff --git a/dm/pkg/streamer/util.go b/dm/pkg/streamer/util.go deleted file mode 100644 index bdd29073e19..00000000000 --- a/dm/pkg/streamer/util.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package streamer - -import ( - "io" - "strings" - - "github.com/pingcap/errors" - - "github.com/pingcap/ticdc/dm/pkg/terror" - "github.com/pingcap/ticdc/dm/pkg/utils" -) - -// getNextUUID gets (the nextUUID and its suffix) after the current UUID. -func getNextUUID(currUUID string, uuids []string) (string, string, error) { - for i := len(uuids) - 2; i >= 0; i-- { - if uuids[i] == currUUID { - nextUUID := uuids[i+1] - _, suffixInt, err := utils.ParseSuffixForUUID(nextUUID) - if err != nil { - return "", "", terror.Annotatef(err, "UUID %s", nextUUID) - } - return nextUUID, utils.SuffixIntToStr(suffixInt), nil - } - } - return "", "", nil -} - -// isIgnorableParseError checks whether is a ignorable error for `BinlogParser.ParseFile`. -func isIgnorableParseError(err error) bool { - if err == nil { - return false - } - - if strings.Contains(err.Error(), "err EOF") { - // NOTE: go-mysql returned err not includes caused err, but as message, ref: parser.go `parseSingleEvent` - return true - } else if errors.Cause(err) == io.EOF { - return true - } - - return false -} diff --git a/dm/pkg/streamer/util_test.go b/dm/pkg/streamer/util_test.go deleted file mode 100644 index f82cfe5a727..00000000000 --- a/dm/pkg/streamer/util_test.go +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package streamer - -import ( - "io" - "testing" - - . "github.com/pingcap/check" - "github.com/pingcap/errors" -) - -var _ = Suite(&testUtilSuite{}) - -func TestSuite(t *testing.T) { - TestingT(t) -} - -type testUtilSuite struct{} - -func (t *testUtilSuite) TestGetNextUUID(c *C) { - UUIDs := []string{ - "b60868af-5a6f-11e9-9ea3-0242ac160006.000001", - "7acfedb5-3008-4fa2-9776-6bac42b025fe.000002", - "92ffd03b-813e-4391-b16a-177524e8d531.000003", - "338513ce-b24e-4ff8-9ded-9ac5aa8f4d74.000004", - } - cases := []struct { - currUUID string - UUIDs []string - nextUUID string - nextUUIDSuffix string - errMsgReg string - }{ - { - // empty current and UUID list - }, - { - // non-empty current UUID, but empty UUID list - currUUID: "b60868af-5a6f-11e9-9ea3-0242ac160006.000001", - }, - { - // empty current UUID, but non-empty UUID list - UUIDs: UUIDs, - }, - { - // current UUID in UUID list, has next UUID - currUUID: UUIDs[0], - UUIDs: UUIDs, - nextUUID: UUIDs[1], - nextUUIDSuffix: UUIDs[1][len(UUIDs[1])-6:], - }, - { - // current UUID in UUID list, but has no next UUID - currUUID: UUIDs[len(UUIDs)-1], - UUIDs: UUIDs, - }, - { - // current UUID not in UUID list - currUUID: "40ed16c1-f6f7-4012-aa9b-d360261d2b22.666666", - UUIDs: UUIDs, - }, - { - // invalid next UUID in UUID list - currUUID: UUIDs[len(UUIDs)-1], - UUIDs: append(UUIDs, "invalid-uuid"), - errMsgReg: ".*invalid-uuid.*", - }, - } - - for _, cs := range cases { - nu, nus, err := getNextUUID(cs.currUUID, cs.UUIDs) - if len(cs.errMsgReg) > 0 { - c.Assert(err, ErrorMatches, cs.errMsgReg) - } else { - c.Assert(err, IsNil) - } - c.Assert(nu, Equals, cs.nextUUID) - 
c.Assert(nus, Equals, cs.nextUUIDSuffix) - } -} - -func (t *testUtilSuite) TestIsIgnorableParseError(c *C) { - cases := []struct { - err error - ignorable bool - }{ - { - err: nil, - ignorable: false, - }, - { - err: io.EOF, - ignorable: true, - }, - { - err: errors.Annotate(io.EOF, "annotated end of file"), - ignorable: true, - }, - { - err: errors.New("get event header err EOF xxxx"), - ignorable: true, - }, - { - err: errors.New("some other error"), - ignorable: false, - }, - } - - for _, cs := range cases { - c.Assert(isIgnorableParseError(cs.err), Equals, cs.ignorable) - } -} diff --git a/dm/pkg/binlog/writer/file.go b/dm/relay/binlog_writer.go similarity index 80% rename from dm/pkg/binlog/writer/file.go rename to dm/relay/binlog_writer.go index 8295e89d223..2a98638532a 100644 --- a/dm/pkg/binlog/writer/file.go +++ b/dm/relay/binlog_writer.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package writer +package relay import ( "encoding/json" @@ -27,9 +27,9 @@ import ( "github.com/pingcap/ticdc/dm/pkg/terror" ) -// FileWriter is a binlog event writer which writes binlog events to a file. -type FileWriter struct { - cfg *FileWriterConfig +// BinlogWriter is a binlog event writer which writes binlog events to a file. +type BinlogWriter struct { + cfg *BinlogWriterConfig mu sync.RWMutex stage common.Stage @@ -40,15 +40,15 @@ type FileWriter struct { logger log.Logger } -// FileWriterStatus represents the status of a FileWriter. -type FileWriterStatus struct { +// BinlogWriterStatus represents the status of a BinlogWriter. +type BinlogWriterStatus struct { Stage string `json:"stage"` Filename string `json:"filename"` Offset int64 `json:"offset"` } // String implements Stringer.String. 
-func (s *FileWriterStatus) String() string { +func (s *BinlogWriterStatus) String() string { data, err := json.Marshal(s) if err != nil { // do not use %v/%+v for `s`, it will call this `String` recursively @@ -57,21 +57,21 @@ func (s *FileWriterStatus) String() string { return string(data) } -// FileWriterConfig is the configuration used by a FileWriter. -type FileWriterConfig struct { +// BinlogWriterConfig is the configuration used by a BinlogWriter. +type BinlogWriterConfig struct { Filename string } -// NewFileWriter creates a FileWriter instance. -func NewFileWriter(logger log.Logger, cfg *FileWriterConfig) Writer { - return &FileWriter{ +// NewBinlogWriter creates a BinlogWriter instance. +func NewBinlogWriter(logger log.Logger, cfg *BinlogWriterConfig) *BinlogWriter { + return &BinlogWriter{ cfg: cfg, logger: logger, } } // Start implements Writer.Start. -func (w *FileWriter) Start() error { +func (w *BinlogWriter) Start() error { w.mu.Lock() defer w.mu.Unlock() @@ -99,7 +99,7 @@ func (w *FileWriter) Start() error { } // Close implements Writer.Close. -func (w *FileWriter) Close() error { +func (w *BinlogWriter) Close() error { w.mu.Lock() defer w.mu.Unlock() @@ -122,7 +122,7 @@ func (w *FileWriter) Close() error { } // Write implements Writer.Write. -func (w *FileWriter) Write(rawData []byte) error { +func (w *BinlogWriter) Write(rawData []byte) error { w.mu.RLock() defer w.mu.RUnlock() @@ -137,7 +137,7 @@ func (w *FileWriter) Write(rawData []byte) error { } // Flush implements Writer.Flush. -func (w *FileWriter) Flush() error { +func (w *BinlogWriter) Flush() error { w.mu.RLock() defer w.mu.RUnlock() @@ -149,12 +149,12 @@ func (w *FileWriter) Flush() error { } // Status implements Writer.Status. 
-func (w *FileWriter) Status() interface{} { +func (w *BinlogWriter) Status() interface{} { w.mu.RLock() stage := w.stage w.mu.RUnlock() - return &FileWriterStatus{ + return &BinlogWriterStatus{ Stage: stage.String(), Filename: w.cfg.Filename, Offset: w.offset.Load(), @@ -162,7 +162,7 @@ func (w *FileWriter) Status() interface{} { } // flush flushes the buffered data to the disk. -func (w *FileWriter) flush() error { +func (w *BinlogWriter) flush() error { if w.file == nil { return terror.ErrBinlogWriterFileNotOpened.Generate(w.cfg.Filename) } diff --git a/dm/pkg/binlog/writer/file_test.go b/dm/relay/binlog_writer_test.go similarity index 87% rename from dm/pkg/binlog/writer/file_test.go rename to dm/relay/binlog_writer_test.go index 3b8ba49fb23..2d6bb0cbfd7 100644 --- a/dm/pkg/binlog/writer/file_test.go +++ b/dm/relay/binlog_writer_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package writer +package relay import ( "bytes" @@ -27,30 +27,30 @@ import ( "github.com/pingcap/ticdc/dm/pkg/log" ) -func TestSuite(t *testing.T) { +func TestBinlogWriterSuite(t *testing.T) { TestingT(t) } -var _ = Suite(&testFileWriterSuite{}) +var _ = Suite(&testBinlogWriterSuite{}) -type testFileWriterSuite struct{} +type testBinlogWriterSuite struct{} -func (t *testFileWriterSuite) TestWrite(c *C) { +func (t *testBinlogWriterSuite) TestWrite(c *C) { dir := c.MkDir() filename := filepath.Join(dir, "test-mysql-bin.000001") var ( - cfg = &FileWriterConfig{ + cfg = &BinlogWriterConfig{ Filename: filename, } allData bytes.Buffer ) - w := NewFileWriter(log.L(), cfg) + w := NewBinlogWriter(log.L(), cfg) c.Assert(w, NotNil) // check status, stageNew status := w.Status() - fwStatus, ok := status.(*FileWriterStatus) + fwStatus, ok := status.(*BinlogWriterStatus) c.Assert(ok, IsTrue) c.Assert(fwStatus.Stage, Equals, common.StageNew.String()) c.Assert(fwStatus.Filename, Equals, filename) @@ -71,7 +71,7 @@ func (t 
*testFileWriterSuite) TestWrite(c *C) { // check status, stagePrepared status = w.Status() - fwStatus, ok = status.(*FileWriterStatus) + fwStatus, ok = status.(*BinlogWriterStatus) c.Assert(ok, IsTrue) c.Assert(fwStatus.Stage, Equals, common.StagePrepared.String()) c.Assert(fwStatus.Filename, Equals, filename) @@ -101,7 +101,7 @@ func (t *testFileWriterSuite) TestWrite(c *C) { // check status, stageClosed status = w.Status() - fwStatus, ok = status.(*FileWriterStatus) + fwStatus, ok = status.(*BinlogWriterStatus) c.Assert(ok, IsTrue) c.Assert(fwStatus.Stage, Equals, common.StageClosed.String()) c.Assert(fwStatus.Filename, Equals, filename) diff --git a/dm/relay/config.go b/dm/relay/config.go index 5efac22e0ad..aa75d8ef527 100644 --- a/dm/relay/config.go +++ b/dm/relay/config.go @@ -18,7 +18,6 @@ import ( "github.com/pingcap/ticdc/dm/dm/config" "github.com/pingcap/ticdc/dm/pkg/log" - "github.com/pingcap/ticdc/dm/relay/retry" ) // Config is the configuration for Relay. @@ -38,7 +37,7 @@ type Config struct { UUIDSuffix int `toml:"-" json:"-"` // for binlog reader retry - ReaderRetry retry.ReaderRetryConfig `toml:"reader-retry" json:"reader-retry"` + ReaderRetry ReaderRetryConfig `toml:"reader-retry" json:"reader-retry"` } func (c *Config) String() string { @@ -63,7 +62,7 @@ func FromSourceCfg(sourceCfg *config.SourceConfig) *Config { BinLogName: clone.RelayBinLogName, BinlogGTID: clone.RelayBinlogGTID, UUIDSuffix: clone.UUIDSuffix, - ReaderRetry: retry.ReaderRetryConfig{ // we use config from TaskChecker now + ReaderRetry: ReaderRetryConfig{ // we use config from TaskChecker now BackoffRollback: clone.Checker.BackoffRollback.Duration, BackoffMax: clone.Checker.BackoffMax.Duration, BackoffMin: clone.Checker.BackoffMin.Duration, diff --git a/dm/relay/reader/error.go b/dm/relay/error.go similarity index 98% rename from dm/relay/reader/error.go rename to dm/relay/error.go index 0587a30a961..e5284e33c44 100644 --- a/dm/relay/reader/error.go +++ b/dm/relay/error.go @@ -11,7 
+11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package reader +package relay import ( "context" diff --git a/dm/relay/reader/error_test.go b/dm/relay/error_test.go similarity index 98% rename from dm/relay/reader/error_test.go rename to dm/relay/error_test.go index 38200a22bfc..e2ab04d2046 100644 --- a/dm/relay/reader/error_test.go +++ b/dm/relay/error_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package reader +package relay import ( "context" diff --git a/dm/pkg/streamer/file.go b/dm/relay/file.go similarity index 99% rename from dm/pkg/streamer/file.go rename to dm/relay/file.go index 8a426075efd..0317b2f2fac 100644 --- a/dm/pkg/streamer/file.go +++ b/dm/relay/file.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package streamer +package relay import ( "context" @@ -219,7 +219,7 @@ func (r *relayLogFileChecker) relayLogUpdatedOrNewCreated(ctx context.Context, u // binlog file may have rotated if we read nothing last time(either it's the first read or after notified) lastReadCnt := r.endOffset - r.beginOffset if lastReadCnt == 0 { - meta := &Meta{} + meta := &LocalMeta{} _, err := toml.DecodeFile(filepath.Join(r.latestRelayLogDir, utils.MetaFilename), meta) if err != nil { errCh <- terror.Annotate(err, "decode relay meta toml file failed") diff --git a/dm/pkg/streamer/file_test.go b/dm/relay/file_test.go similarity index 99% rename from dm/pkg/streamer/file_test.go rename to dm/relay/file_test.go index f02a89043ae..22c5abd6acf 100644 --- a/dm/pkg/streamer/file_test.go +++ b/dm/relay/file_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package streamer +package relay import ( "bytes" @@ -358,10 +358,10 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) { } rotateRelayFile := func(filename string) { - meta := Meta{BinLogName: filename, BinLogPos: binlogPos, BinlogGTID: binlogGTID} + meta := LocalMeta{BinLogName: filename, BinLogPos: binlogPos, BinlogGTID: binlogGTID} metaFile, err2 := os.Create(path.Join(subDir, utils.MetaFilename)) c.Assert(err2, IsNil) - err = toml.NewEncoder(metaFile).Encode(meta) + err = toml.NewEncoder(metaFile).Encode(&meta) c.Assert(err, IsNil) _ = metaFile.Close() } diff --git a/dm/relay/writer/file_util.go b/dm/relay/file_util.go similarity index 99% rename from dm/relay/writer/file_util.go rename to dm/relay/file_util.go index c8cf5a01d2d..a1190b99d30 100644 --- a/dm/relay/writer/file_util.go +++ b/dm/relay/file_util.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package writer +package relay import ( "bytes" diff --git a/dm/relay/writer/file_util_test.go b/dm/relay/file_util_test.go similarity index 90% rename from dm/relay/writer/file_util_test.go rename to dm/relay/file_util_test.go index 7b1d158a827..8f7584dbb4d 100644 --- a/dm/relay/writer/file_util_test.go +++ b/dm/relay/file_util_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package writer +package relay import ( "bytes" @@ -402,72 +402,6 @@ func (t *testFileUtilSuite) testGetTxnPosGTIDs(c *check.C, filename, flavor, pre c.Assert(gSet, check.DeepEquals, expectedGTIDs) } -// genBinlogEventsWithGTIDs generates some binlog events used by testFileUtilSuite and testFileWriterSuite. -// now, its generated events including 3 DDL and 10 DML. 
-// nolint:unparam -func genBinlogEventsWithGTIDs(c *check.C, flavor string, previousGTIDSet, latestGTID1, latestGTID2 gtid.Set) (*event.Generator, []*replication.BinlogEvent, []byte) { - var ( - serverID uint32 = 11 - latestPos uint32 - latestXID uint64 = 10 - - allEvents = make([]*replication.BinlogEvent, 0, 50) - allData bytes.Buffer - ) - - // use a binlog event generator to generate some binlog events. - g, err := event.NewGenerator(flavor, serverID, latestPos, latestGTID1, previousGTIDSet, latestXID) - c.Assert(err, check.IsNil) - - // file header with FormatDescriptionEvent and PreviousGTIDsEvent - events, data, err := g.GenFileHeader() - c.Assert(err, check.IsNil) - allEvents = append(allEvents, events...) - allData.Write(data) - - // CREATE DATABASE/TABLE, 3 DDL - queries := []string{ - "CREATE DATABASE `db`", - "CREATE TABLE `db`.`tbl1` (c1 INT)", - "CREATE TABLE `db`.`tbl2` (c1 INT)", - } - for _, query := range queries { - events, data, err = g.GenDDLEvents("db", query) - c.Assert(err, check.IsNil) - allEvents = append(allEvents, events...) - allData.Write(data) - } - - // DMLs, 10 DML - g.LatestGTID = latestGTID2 // use another latest GTID with different SID/DomainID - var ( - tableID uint64 = 8 - columnType = []byte{gmysql.MYSQL_TYPE_LONG} - eventType = replication.WRITE_ROWS_EVENTv2 - schema = "db" - table = "tbl1" - ) - for i := 0; i < 10; i++ { - insertRows := make([][]interface{}, 0, 1) - insertRows = append(insertRows, []interface{}{int32(i)}) - dmlData := []*event.DMLData{ - { - TableID: tableID, - Schema: schema, - Table: table, - ColumnType: columnType, - Rows: insertRows, - }, - } - events, data, err = g.GenDMLEvents(eventType, dmlData) - c.Assert(err, check.IsNil) - allEvents = append(allEvents, events...) 
- allData.Write(data) - } - - return g, allEvents, allData.Bytes() -} - func (t *testFileUtilSuite) TestGetTxnPosGTIDsNoGTID(c *check.C) { // generate some events but without GTID enabled var ( diff --git a/dm/pkg/streamer/reader.go b/dm/relay/local_reader.go similarity index 97% rename from dm/pkg/streamer/reader.go rename to dm/relay/local_reader.go index 4d01ea0cd8c..ad05656e4b5 100644 --- a/dm/pkg/streamer/reader.go +++ b/dm/relay/local_reader.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package streamer +package relay import ( "context" @@ -43,14 +43,6 @@ import ( // this is mainly happened when upstream master changed when relay log not finish reading a transaction. var ErrorMaybeDuplicateEvent = errors.New("truncate binlog file found, event may be duplicated") -// Meta represents binlog meta information in relay.meta. -type Meta struct { - BinLogName string `toml:"binlog-name" json:"binlog-name"` - BinLogPos uint32 `toml:"binlog-pos" json:"binlog-pos"` - BinlogGTID string `toml:"binlog-gtid" json:"binlog-gtid"` - UUID string `toml:"-" json:"-"` -} - // polling interval for watcher. var watcherInterval = 100 * time.Millisecond @@ -215,7 +207,7 @@ func (r *BinlogReader) getPosByGTID(gset mysql.GTIDSet) (*mysql.Position, error) // StartSyncByPos start sync by pos // TODO: thread-safe? -func (r *BinlogReader) StartSyncByPos(pos mysql.Position) (Streamer, error) { +func (r *BinlogReader) StartSyncByPos(pos mysql.Position) (reader.Streamer, error) { if pos.Name == "" { return nil, terror.ErrBinlogFileNotSpecified.Generate() } @@ -255,7 +247,7 @@ func (r *BinlogReader) StartSyncByPos(pos mysql.Position) (Streamer, error) { } // StartSyncByGTID start sync by gtid. 
-func (r *BinlogReader) StartSyncByGTID(gset mysql.GTIDSet) (Streamer, error) { +func (r *BinlogReader) StartSyncByGTID(gset mysql.GTIDSet) (reader.Streamer, error) { r.tctx.L().Info("begin to sync binlog", zap.Stringer("GTID Set", gset)) r.usingGTID = true diff --git a/dm/pkg/streamer/reader_test.go b/dm/relay/local_reader_test.go similarity index 98% rename from dm/pkg/streamer/reader_test.go rename to dm/relay/local_reader_test.go index 5e7942ac509..f77d96b02b4 100644 --- a/dm/pkg/streamer/reader_test.go +++ b/dm/relay/local_reader_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package streamer +package relay import ( "bytes" @@ -37,6 +37,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/ticdc/dm/pkg/binlog/event" + "github.com/pingcap/ticdc/dm/pkg/binlog/reader" "github.com/pingcap/ticdc/dm/pkg/gtid" "github.com/pingcap/ticdc/dm/pkg/log" "github.com/pingcap/ticdc/dm/pkg/terror" @@ -61,11 +62,11 @@ func (t *testReaderSuite) SetUpSuite(c *C) { t.lastPos = 0 t.lastGTID, err = gtid.ParserGTID(gmysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110002:0") c.Assert(err, IsNil) - c.Assert(failpoint.Enable("github.com/pingcap/ticdc/dm/pkg/streamer/SetHeartbeatInterval", "return(10000)"), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/ticdc/dm/relay/SetHeartbeatInterval", "return(10000)"), IsNil) } func (t *testReaderSuite) TearDownSuite(c *C) { - c.Assert(failpoint.Disable("github.com/pingcap/ticdc/dm/pkg/streamer/SetHeartbeatInterval"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/ticdc/dm/relay/SetHeartbeatInterval"), IsNil) } func (t *testReaderSuite) TestParseFileBase(c *C) { @@ -542,7 +543,7 @@ func (t *testReaderSuite) TestStartSyncByPos(c *C) { r.Close() } -func readNEvents(ctx context.Context, c *C, s Streamer, l int) []*replication.BinlogEvent { +func readNEvents(ctx context.Context, c *C, s reader.Streamer, l int) []*replication.BinlogEvent { 
var result []*replication.BinlogEvent for { ev, err2 := s.GetEvent(ctx) @@ -957,10 +958,10 @@ func (t *testReaderSuite) TestReParseUsingGTID(c *C) { _, err = f.Write(replication.BinLogFileHeader) c.Assert(err, IsNil) - meta := Meta{BinLogName: file, BinLogPos: latestPos, BinlogGTID: startGTID.String()} + meta := LocalMeta{BinLogName: file, BinLogPos: latestPos, BinlogGTID: startGTID.String()} metaFile, err := os.Create(path.Join(uuidDir, utils.MetaFilename)) c.Assert(err, IsNil) - c.Assert(toml.NewEncoder(metaFile).Encode(meta), IsNil) + c.Assert(toml.NewEncoder(metaFile).Encode(&meta), IsNil) c.Assert(metaFile.Close(), IsNil) // prepare some regular events, @@ -1125,7 +1126,7 @@ func (t *testReaderSuite) genEvents(c *C, eventTypes []replication.EventType, la return events, latestPos, latestGTID, pGset } -func (t *testReaderSuite) purgeStreamer(c *C, s Streamer) { +func (t *testReaderSuite) purgeStreamer(c *C, s reader.Streamer) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() @@ -1142,7 +1143,7 @@ func (t *testReaderSuite) purgeStreamer(c *C, s Streamer) { } } -func (t *testReaderSuite) verifyNoEventsInStreamer(c *C, s Streamer) { +func (t *testReaderSuite) verifyNoEventsInStreamer(c *C, s reader.Streamer) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() @@ -1181,10 +1182,10 @@ func (t *testReaderSuite) writeUUIDs(c *C, relayDir string, uuids []string) []by } func (t *testReaderSuite) createMetaFile(c *C, relayDirPath, binlogFileName string, pos uint32, gtid string) { - meta := Meta{BinLogName: binlogFileName, BinLogPos: pos, BinlogGTID: gtid} + meta := LocalMeta{BinLogName: binlogFileName, BinLogPos: pos, BinlogGTID: gtid} metaFile, err2 := os.Create(path.Join(relayDirPath, utils.MetaFilename)) c.Assert(err2, IsNil) - err := toml.NewEncoder(metaFile).Encode(meta) + err := toml.NewEncoder(metaFile).Encode(&meta) c.Assert(err, IsNil) metaFile.Close() } diff --git 
a/dm/relay/purge_strategy.go b/dm/relay/purge_strategy.go new file mode 100644 index 00000000000..0fa535dcf57 --- /dev/null +++ b/dm/relay/purge_strategy.go @@ -0,0 +1,360 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package relay + +import ( + "fmt" + "strings" + "time" + + "go.uber.org/atomic" + "go.uber.org/zap" + + "github.com/pingcap/ticdc/dm/pkg/log" + "github.com/pingcap/ticdc/dm/pkg/streamer" + "github.com/pingcap/ticdc/dm/pkg/terror" + "github.com/pingcap/ticdc/dm/pkg/utils" +) + +type strategyType uint32 + +const ( + strategyNone strategyType = iota + strategyInactive + strategyFilename + strategyTime + strategySpace +) + +func (s strategyType) String() string { + switch s { + case strategyInactive: + return "inactive strategy" + case strategyFilename: + return "filename strategy" + case strategyTime: + return "time strategy" + case strategySpace: + return "space strategy" + default: + return "unknown strategy" + } +} + +// PurgeStrategy represents a relay log purge strategy +// two purge behaviors +// 1. purge in the background +// 2. do one time purge process +// a strategy can support both or one of them. 
+type PurgeStrategy interface { + // Check checks whether need to do the purge in the background automatically + Check(args interface{}) (bool, error) + + // Do does the purge process one time + Do(args interface{}) error + + // Purging indicates whether is doing purge + Purging() bool + + // Type returns the strategy type + Type() strategyType +} + +// StrategyArgs represents args needed by purge strategy. +type StrategyArgs interface { + // SetActiveRelayLog sets active relay log info in args + // this should be called before do the purging + SetActiveRelayLog(active *streamer.RelayLogInfo) +} + +var fakeStrategyTaskName = strategyFilename.String() + +// filenameArgs represents args needed by filenameStrategy +// NOTE: should handle master-slave switch. +type filenameArgs struct { + relayBaseDir string + filename string // specified end safe filename + subDir string // sub dir for @filename, empty indicates latest sub dir + uuids []string + safeRelayLog *streamer.RelayLogInfo // all relay log files prior to this should be purged +} + +func (fa *filenameArgs) SetActiveRelayLog(active *streamer.RelayLogInfo) { + uuid := fa.subDir + if len(uuid) == 0 && len(fa.uuids) > 0 { + // no sub dir specified, use the latest one + uuid = fa.uuids[len(fa.uuids)-1] + } + _, endSuffix, _ := utils.ParseSuffixForUUID(uuid) + + safeRelayLog := &streamer.RelayLogInfo{ + TaskName: fakeStrategyTaskName, + UUID: uuid, + UUIDSuffix: endSuffix, + Filename: fa.filename, + } + + if active.Earlier(safeRelayLog) { + safeRelayLog = active + } + + fa.safeRelayLog = safeRelayLog + + // discard newer UUIDs + uuids := make([]string, 0, len(fa.uuids)) + for _, uuid := range fa.uuids { + _, suffix, _ := utils.ParseSuffixForUUID(uuid) + if suffix > endSuffix { + break + } + uuids = append(uuids, uuid) + } + fa.uuids = uuids +} + +func (fa *filenameArgs) String() string { + return fmt.Sprintf("(RelayBaseDir: %s, Filename: %s, SubDir: %s, UUIDs: %s, SafeRelayLog: %s)", + fa.relayBaseDir, fa.filename, 
fa.subDir, strings.Join(fa.uuids, ";"), fa.safeRelayLog) +} + +// filenameStrategy represents a relay purge strategy by filename +// similar to `PURGE BINARY LOGS TO`. +type filenameStrategy struct { + purging atomic.Bool + + logger log.Logger +} + +func newFilenameStrategy() PurgeStrategy { + return &filenameStrategy{ + logger: log.With(zap.String("component", "relay purger"), zap.String("strategy", "file name")), + } +} + +func (s *filenameStrategy) Check(args interface{}) (bool, error) { + // do not support purge in the background + return false, nil +} + +func (s *filenameStrategy) Do(args interface{}) error { + if !s.purging.CAS(false, true) { + return terror.ErrRelayThisStrategyIsPurging.Generate() + } + defer s.purging.Store(false) + + fa, ok := args.(*filenameArgs) + if !ok { + return terror.ErrRelayPurgeArgsNotValid.Generate(args, args) + } + + return purgeRelayFilesBeforeFile(s.logger, fa.relayBaseDir, fa.uuids, fa.safeRelayLog) +} + +func (s *filenameStrategy) Purging() bool { + return s.purging.Load() +} + +func (s *filenameStrategy) Type() strategyType { + return strategyFilename +} + +// inactiveArgs represents args needed by inactiveStrategy. 
+type inactiveArgs struct { + relayBaseDir string + uuids []string + activeRelayLog *streamer.RelayLogInfo // earliest active relay log info +} + +func (ia *inactiveArgs) SetActiveRelayLog(active *streamer.RelayLogInfo) { + ia.activeRelayLog = active +} + +func (ia *inactiveArgs) String() string { + return fmt.Sprintf("(RelayBaseDir: %s, UUIDs: %s, ActiveRelayLog: %s)", + ia.relayBaseDir, strings.Join(ia.uuids, ";"), ia.activeRelayLog) +} + +// inactiveStrategy represents a relay purge strategy which purge all inactive relay log files +// definition of inactive relay log files: +// * not writing by relay unit +// * not reading by sync unit and will not be read by any running tasks +// TODO zxc: judge tasks are running dumper / loader +type inactiveStrategy struct { + purging atomic.Bool + + logger log.Logger +} + +func newInactiveStrategy() PurgeStrategy { + return &inactiveStrategy{ + logger: log.With(zap.String("component", "relay purger"), zap.String("strategy", "inactive binlog file")), + } +} + +func (s *inactiveStrategy) Check(args interface{}) (bool, error) { + // do not support purge in the background + return false, nil +} + +func (s *inactiveStrategy) Do(args interface{}) error { + if !s.purging.CAS(false, true) { + return terror.ErrRelayThisStrategyIsPurging.Generate() + } + defer s.purging.Store(false) + + ia, ok := args.(*inactiveArgs) + if !ok { + return terror.ErrRelayPurgeArgsNotValid.Generate(args, args) + } + + return purgeRelayFilesBeforeFile(s.logger, ia.relayBaseDir, ia.uuids, ia.activeRelayLog) +} + +func (s *inactiveStrategy) Purging() bool { + return s.purging.Load() +} + +func (s *inactiveStrategy) Type() strategyType { + return strategyInactive +} + +// spaceArgs represents args needed by spaceStrategy. 
+type spaceArgs struct { + relayBaseDir string + remainSpace int64 // if remain space (GB) in @RelayBaseDir less than this, then it can be purged + uuids []string + activeRelayLog *streamer.RelayLogInfo // earliest active relay log info +} + +func (sa *spaceArgs) SetActiveRelayLog(active *streamer.RelayLogInfo) { + sa.activeRelayLog = active +} + +func (sa *spaceArgs) String() string { + return fmt.Sprintf("(RelayBaseDir: %s, AllowMinRemainSpace: %dGB, UUIDs: %s, ActiveRelayLog: %s)", + sa.relayBaseDir, sa.remainSpace, strings.Join(sa.uuids, ";"), sa.activeRelayLog) +} + +// spaceStrategy represents a relay purge strategy by remain space in dm-worker node. +type spaceStrategy struct { + purging atomic.Bool + + logger log.Logger +} + +func newSpaceStrategy() PurgeStrategy { + return &spaceStrategy{ + logger: log.With(zap.String("component", "relay purger"), zap.String("strategy", "space")), + } +} + +func (s *spaceStrategy) Check(args interface{}) (bool, error) { + sa, ok := args.(*spaceArgs) + if !ok { + return false, terror.ErrRelayPurgeArgsNotValid.Generate(args, args) + } + + storageSize, err := utils.GetStorageSize(sa.relayBaseDir) + if err != nil { + return false, terror.Annotatef(err, "get storage size for directory %s", sa.relayBaseDir) + } + + requiredBytes := uint64(sa.remainSpace) * 1024 * 1024 * 1024 + return storageSize.Available < requiredBytes, nil +} + +func (s *spaceStrategy) Do(args interface{}) error { + if !s.purging.CAS(false, true) { + return terror.ErrRelayThisStrategyIsPurging.Generate() + } + defer s.purging.Store(false) + + sa, ok := args.(*spaceArgs) + if !ok { + return terror.ErrRelayPurgeArgsNotValid.Generate(args, args) + } + + // NOTE: we purge all inactive relay log files when available space less than @remainSpace + // maybe we can refine this to purge only part of this files every time + return purgeRelayFilesBeforeFile(s.logger, sa.relayBaseDir, sa.uuids, sa.activeRelayLog) +} + +func (s *spaceStrategy) Purging() bool { + return 
s.purging.Load() +} + +func (s *spaceStrategy) Type() strategyType { + return strategySpace +} + +// timeArgs represents args needed by timeStrategy. +type timeArgs struct { + relayBaseDir string + safeTime time.Time // if file's modified time is older than this, then it can be purged + uuids []string + activeRelayLog *streamer.RelayLogInfo // earliest active relay log info +} + +func (ta *timeArgs) SetActiveRelayLog(active *streamer.RelayLogInfo) { + ta.activeRelayLog = active +} + +func (ta *timeArgs) String() string { + return fmt.Sprintf("(RelayBaseDir: %s, SafeTime: %s, UUIDs: %s, ActiveRelayLog: %s)", + ta.relayBaseDir, ta.safeTime, strings.Join(ta.uuids, ";"), ta.activeRelayLog) +} + +// timeStrategy represents a relay purge strategy by time +// similar to `PURGE BINARY LOGS BEFORE` in MySQL. +type timeStrategy struct { + purging atomic.Bool + + logger log.Logger +} + +func newTimeStrategy() PurgeStrategy { + return &timeStrategy{ + logger: log.With(zap.String("component", "relay purger"), zap.String("strategy", "time")), + } +} + +func (s *timeStrategy) Check(args interface{}) (bool, error) { + // for time strategy, we always try to do the purging + return true, nil +} + +func (s *timeStrategy) Stop() { +} + +func (s *timeStrategy) Do(args interface{}) error { + if !s.purging.CAS(false, true) { + return terror.ErrRelayThisStrategyIsPurging.Generate() + } + defer s.purging.Store(false) + + ta, ok := args.(*timeArgs) + if !ok { + return terror.ErrRelayPurgeArgsNotValid.Generate(args, args) + } + + return purgeRelayFilesBeforeFileAndTime(s.logger, ta.relayBaseDir, ta.uuids, ta.activeRelayLog, ta.safeTime) +} + +func (s *timeStrategy) Purging() bool { + return s.purging.Load() +} + +func (s *timeStrategy) Type() strategyType { + return strategyTime +} diff --git a/dm/relay/purger/purger.go b/dm/relay/purger.go similarity index 90% rename from dm/relay/purger/purger.go rename to dm/relay/purger.go index c869748536b..95e46e0eca7 100644 --- 
a/dm/relay/purger/purger.go +++ b/dm/relay/purger.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package purger +package relay import ( "context" @@ -30,8 +30,8 @@ import ( "github.com/pingcap/ticdc/dm/pkg/utils" ) -// RelayOperator represents an operator for relay log files, like writer, reader. -type RelayOperator interface { +// Operator represents an operator for relay log files, like writer, reader. +type Operator interface { // EarliestActiveRelayLog returns the earliest active relay log info in this operator EarliestActiveRelayLog() *streamer.RelayLogInfo } @@ -63,8 +63,8 @@ type Purger interface { // NewPurger creates a new purger. var NewPurger = NewRelayPurger -// RelayPurger purges relay log according to some strategies. -type RelayPurger struct { +// relayPurger purges relay log according to some strategies. +type relayPurger struct { lock sync.RWMutex wg sync.WaitGroup cancel context.CancelFunc @@ -74,7 +74,7 @@ type RelayPurger struct { cfg config.PurgeConfig baseRelayDir string indexPath string // server-uuid.index file path - operators []RelayOperator + operators []Operator interceptors []PurgeInterceptor strategies map[strategyType]PurgeStrategy @@ -82,8 +82,8 @@ type RelayPurger struct { } // NewRelayPurger creates a new purger. -func NewRelayPurger(cfg config.PurgeConfig, baseRelayDir string, operators []RelayOperator, interceptors []PurgeInterceptor) Purger { - p := &RelayPurger{ +func NewRelayPurger(cfg config.PurgeConfig, baseRelayDir string, operators []Operator, interceptors []PurgeInterceptor) Purger { + p := &relayPurger{ cfg: cfg, baseRelayDir: baseRelayDir, indexPath: filepath.Join(baseRelayDir, utils.UUIDIndexFilename), @@ -103,7 +103,7 @@ func NewRelayPurger(cfg config.PurgeConfig, baseRelayDir string, operators []Rel } // Start starts strategies by config. 
-func (p *RelayPurger) Start() { +func (p *relayPurger) Start() { if !p.running.CAS(stageNew, stageRunning) { return } @@ -124,7 +124,7 @@ func (p *RelayPurger) Start() { // run starts running the process // NOTE: ensure run is called at most once of a Purger. -func (p *RelayPurger) run() { +func (p *relayPurger) run() { ticker := time.NewTicker(time.Duration(p.cfg.Interval) * time.Second) defer ticker.Stop() @@ -143,7 +143,7 @@ func (p *RelayPurger) run() { } // Close stops the started strategies. -func (p *RelayPurger) Close() { +func (p *relayPurger) Close() { if !p.running.CAS(stageRunning, stageClosed) { return } @@ -159,12 +159,12 @@ func (p *RelayPurger) Close() { } // Purging returns whether the purger is purging. -func (p *RelayPurger) Purging() bool { +func (p *relayPurger) Purging() bool { return p.purgingStrategy.Load() != uint32(strategyNone) } // Do does the purge process one time. -func (p *RelayPurger) Do(ctx context.Context, req *pb.PurgeRelayRequest) error { +func (p *relayPurger) Do(ctx context.Context, req *pb.PurgeRelayRequest) error { uuids, err := utils.ParseUUIDIndex(p.indexPath) if err != nil { return terror.Annotatef(err, "parse UUID index file %s", p.indexPath) @@ -201,7 +201,7 @@ func (p *RelayPurger) Do(ctx context.Context, req *pb.PurgeRelayRequest) error { } // tryPurge tries to do purge by check condition first. -func (p *RelayPurger) tryPurge() { +func (p *relayPurger) tryPurge() { strategy, args, err := p.check() if err != nil { p.logger.Error("check whether need to purge relay log files in background", zap.Error(err)) @@ -217,7 +217,7 @@ func (p *RelayPurger) tryPurge() { } // doPurge does the purging operation. 
-func (p *RelayPurger) doPurge(ps PurgeStrategy, args StrategyArgs) error { +func (p *relayPurger) doPurge(ps PurgeStrategy, args StrategyArgs) error { if !p.purgingStrategy.CAS(uint32(strategyNone), uint32(ps.Type())) { return terror.ErrRelayOtherStrategyIsPurging.Generate(ps.Type()) } @@ -241,7 +241,7 @@ func (p *RelayPurger) doPurge(ps PurgeStrategy, args StrategyArgs) error { return ps.Do(args) } -func (p *RelayPurger) check() (PurgeStrategy, StrategyArgs, error) { +func (p *relayPurger) check() (PurgeStrategy, StrategyArgs, error) { p.logger.Info("checking whether needing to purge relay log files") uuids, err := utils.ParseUUIDIndex(p.indexPath) @@ -292,7 +292,7 @@ func (p *RelayPurger) check() (PurgeStrategy, StrategyArgs, error) { } // earliestActiveRelayLog returns the current earliest active relay log info. -func (p *RelayPurger) earliestActiveRelayLog() *streamer.RelayLogInfo { +func (p *relayPurger) earliestActiveRelayLog() *streamer.RelayLogInfo { var earliest *streamer.RelayLogInfo for _, op := range p.operators { info := op.EarliestActiveRelayLog() @@ -309,7 +309,7 @@ func (p *RelayPurger) earliestActiveRelayLog() *streamer.RelayLogInfo { type dummyPurger struct{} // NewDummyPurger returns a dummy purger. -func NewDummyPurger(cfg config.PurgeConfig, baseRelayDir string, operators []RelayOperator, interceptors []PurgeInterceptor) Purger { +func NewDummyPurger(cfg config.PurgeConfig, baseRelayDir string, operators []Operator, interceptors []PurgeInterceptor) Purger { return &dummyPurger{} } diff --git a/dm/relay/purger/strategy.go b/dm/relay/purger/strategy.go deleted file mode 100644 index 2ee1e5955f8..00000000000 --- a/dm/relay/purger/strategy.go +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package purger - -import "github.com/pingcap/ticdc/dm/pkg/streamer" - -type strategyType uint32 - -const ( - strategyNone strategyType = iota - strategyInactive - strategyFilename - strategyTime - strategySpace -) - -func (s strategyType) String() string { - switch s { - case strategyInactive: - return "inactive strategy" - case strategyFilename: - return "filename strategy" - case strategyTime: - return "time strategy" - case strategySpace: - return "space strategy" - default: - return "unknown strategy" - } -} - -// PurgeStrategy represents a relay log purge strategy -// two purge behaviors -// 1. purge in the background -// 2. do one time purge process -// a strategy can support both or one of them. -type PurgeStrategy interface { - // Check checks whether need to do the purge in the background automatically - Check(args interface{}) (bool, error) - - // Do does the purge process one time - Do(args interface{}) error - - // Purging indicates whether is doing purge - Purging() bool - - // Type returns the strategy type - Type() strategyType -} - -// StrategyArgs represents args needed by purge strategy. -type StrategyArgs interface { - // SetActiveRelayLog sets active relay log info in args - // this should be called before do the purging - SetActiveRelayLog(active *streamer.RelayLogInfo) -} diff --git a/dm/relay/purger/strategy_filename.go b/dm/relay/purger/strategy_filename.go deleted file mode 100644 index 6d346b1139d..00000000000 --- a/dm/relay/purger/strategy_filename.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2019 PingCAP, Inc. 
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package purger - -import ( - "fmt" - "strings" - - "go.uber.org/atomic" - "go.uber.org/zap" - - "github.com/pingcap/ticdc/dm/pkg/log" - "github.com/pingcap/ticdc/dm/pkg/streamer" - "github.com/pingcap/ticdc/dm/pkg/terror" - "github.com/pingcap/ticdc/dm/pkg/utils" -) - -var fakeTaskName = strategyFilename.String() - -// filenameArgs represents args needed by filenameStrategy -// NOTE: should handle master-slave switch. -type filenameArgs struct { - relayBaseDir string - filename string // specified end safe filename - subDir string // sub dir for @filename, empty indicates latest sub dir - uuids []string - safeRelayLog *streamer.RelayLogInfo // all relay log files prior to this should be purged -} - -func (fa *filenameArgs) SetActiveRelayLog(active *streamer.RelayLogInfo) { - uuid := fa.subDir - if len(uuid) == 0 && len(fa.uuids) > 0 { - // no sub dir specified, use the latest one - uuid = fa.uuids[len(fa.uuids)-1] - } - _, endSuffix, _ := utils.ParseSuffixForUUID(uuid) - - safeRelayLog := &streamer.RelayLogInfo{ - TaskName: fakeTaskName, - UUID: uuid, - UUIDSuffix: endSuffix, - Filename: fa.filename, - } - - if active.Earlier(safeRelayLog) { - safeRelayLog = active - } - - fa.safeRelayLog = safeRelayLog - - // discard newer UUIDs - uuids := make([]string, 0, len(fa.uuids)) - for _, uuid := range fa.uuids { - _, suffix, _ := utils.ParseSuffixForUUID(uuid) - if suffix > endSuffix { - break - } - uuids = append(uuids, uuid) - } - fa.uuids = uuids -} - -func (fa *filenameArgs) String() 
string { - return fmt.Sprintf("(RelayBaseDir: %s, Filename: %s, SubDir: %s, UUIDs: %s, SafeRelayLog: %s)", - fa.relayBaseDir, fa.filename, fa.subDir, strings.Join(fa.uuids, ";"), fa.safeRelayLog) -} - -// filenameStrategy represents a relay purge strategy by filename -// similar to `PURGE BINARY LOGS TO`. -type filenameStrategy struct { - purging atomic.Bool - - logger log.Logger -} - -func newFilenameStrategy() PurgeStrategy { - return &filenameStrategy{ - logger: log.With(zap.String("component", "relay purger"), zap.String("strategy", "file name")), - } -} - -func (s *filenameStrategy) Check(args interface{}) (bool, error) { - // do not support purge in the background - return false, nil -} - -func (s *filenameStrategy) Do(args interface{}) error { - if !s.purging.CAS(false, true) { - return terror.ErrRelayThisStrategyIsPurging.Generate() - } - defer s.purging.Store(false) - - fa, ok := args.(*filenameArgs) - if !ok { - return terror.ErrRelayPurgeArgsNotValid.Generate(args, args) - } - - return purgeRelayFilesBeforeFile(s.logger, fa.relayBaseDir, fa.uuids, fa.safeRelayLog) -} - -func (s *filenameStrategy) Purging() bool { - return s.purging.Load() -} - -func (s *filenameStrategy) Type() strategyType { - return strategyFilename -} diff --git a/dm/relay/purger/strategy_inactive.go b/dm/relay/purger/strategy_inactive.go deleted file mode 100644 index 0c404a2b29d..00000000000 --- a/dm/relay/purger/strategy_inactive.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package purger - -import ( - "fmt" - "strings" - - "go.uber.org/atomic" - "go.uber.org/zap" - - "github.com/pingcap/ticdc/dm/pkg/log" - "github.com/pingcap/ticdc/dm/pkg/streamer" - "github.com/pingcap/ticdc/dm/pkg/terror" -) - -// inactiveArgs represents args needed by inactiveStrategy. -type inactiveArgs struct { - relayBaseDir string - uuids []string - activeRelayLog *streamer.RelayLogInfo // earliest active relay log info -} - -func (ia *inactiveArgs) SetActiveRelayLog(active *streamer.RelayLogInfo) { - ia.activeRelayLog = active -} - -func (ia *inactiveArgs) String() string { - return fmt.Sprintf("(RelayBaseDir: %s, UUIDs: %s, ActiveRelayLog: %s)", - ia.relayBaseDir, strings.Join(ia.uuids, ";"), ia.activeRelayLog) -} - -// inactiveStrategy represents a relay purge strategy which purge all inactive relay log files -// definition of inactive relay log files: -// * not writing by relay unit -// * not reading by sync unit and will not be read by any running tasks -// TODO zxc: judge tasks are running dumper / loader -type inactiveStrategy struct { - purging atomic.Bool - - logger log.Logger -} - -func newInactiveStrategy() PurgeStrategy { - return &inactiveStrategy{ - logger: log.With(zap.String("component", "relay purger"), zap.String("strategy", "inactive binlog file")), - } -} - -func (s *inactiveStrategy) Check(args interface{}) (bool, error) { - // do not support purge in the background - return false, nil -} - -func (s *inactiveStrategy) Do(args interface{}) error { - if !s.purging.CAS(false, true) { - return terror.ErrRelayThisStrategyIsPurging.Generate() - } - defer s.purging.Store(false) - - ia, ok := args.(*inactiveArgs) - if !ok { - return terror.ErrRelayPurgeArgsNotValid.Generate(args, args) - } - - return purgeRelayFilesBeforeFile(s.logger, ia.relayBaseDir, ia.uuids, ia.activeRelayLog) -} - -func (s *inactiveStrategy) Purging() bool { - return s.purging.Load() -} - -func (s *inactiveStrategy) Type() strategyType { - return strategyInactive -} diff 
--git a/dm/relay/purger/strategy_space.go b/dm/relay/purger/strategy_space.go deleted file mode 100644 index ce281b01a35..00000000000 --- a/dm/relay/purger/strategy_space.go +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package purger - -import ( - "fmt" - "strings" - - "go.uber.org/atomic" - "go.uber.org/zap" - - "github.com/pingcap/ticdc/dm/pkg/log" - "github.com/pingcap/ticdc/dm/pkg/streamer" - "github.com/pingcap/ticdc/dm/pkg/terror" - "github.com/pingcap/ticdc/dm/pkg/utils" -) - -// spaceArgs represents args needed by spaceStrategy. -type spaceArgs struct { - relayBaseDir string - remainSpace int64 // if remain space (GB) in @RelayBaseDir less than this, then it can be purged - uuids []string - activeRelayLog *streamer.RelayLogInfo // earliest active relay log info -} - -func (sa *spaceArgs) SetActiveRelayLog(active *streamer.RelayLogInfo) { - sa.activeRelayLog = active -} - -func (sa *spaceArgs) String() string { - return fmt.Sprintf("(RelayBaseDir: %s, AllowMinRemainSpace: %dGB, UUIDs: %s, ActiveRelayLog: %s)", - sa.relayBaseDir, sa.remainSpace, strings.Join(sa.uuids, ";"), sa.activeRelayLog) -} - -// spaceStrategy represents a relay purge strategy by remain space in dm-worker node. 
-type spaceStrategy struct { - purging atomic.Bool - - logger log.Logger -} - -func newSpaceStrategy() PurgeStrategy { - return &spaceStrategy{ - logger: log.With(zap.String("component", "relay purger"), zap.String("strategy", "space")), - } -} - -func (s *spaceStrategy) Check(args interface{}) (bool, error) { - sa, ok := args.(*spaceArgs) - if !ok { - return false, terror.ErrRelayPurgeArgsNotValid.Generate(args, args) - } - - storageSize, err := utils.GetStorageSize(sa.relayBaseDir) - if err != nil { - return false, terror.Annotatef(err, "get storage size for directory %s", sa.relayBaseDir) - } - - requiredBytes := uint64(sa.remainSpace) * 1024 * 1024 * 1024 - return storageSize.Available < requiredBytes, nil -} - -func (s *spaceStrategy) Do(args interface{}) error { - if !s.purging.CAS(false, true) { - return terror.ErrRelayThisStrategyIsPurging.Generate() - } - defer s.purging.Store(false) - - sa, ok := args.(*spaceArgs) - if !ok { - return terror.ErrRelayPurgeArgsNotValid.Generate(args, args) - } - - // NOTE: we purge all inactive relay log files when available space less than @remainSpace - // maybe we can refine this to purge only part of this files every time - return purgeRelayFilesBeforeFile(s.logger, sa.relayBaseDir, sa.uuids, sa.activeRelayLog) -} - -func (s *spaceStrategy) Purging() bool { - return s.purging.Load() -} - -func (s *spaceStrategy) Type() strategyType { - return strategySpace -} diff --git a/dm/relay/purger/strategy_time.go b/dm/relay/purger/strategy_time.go deleted file mode 100644 index b52f143c37d..00000000000 --- a/dm/relay/purger/strategy_time.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package purger - -import ( - "fmt" - "strings" - "time" - - "go.uber.org/atomic" - "go.uber.org/zap" - - "github.com/pingcap/ticdc/dm/pkg/log" - "github.com/pingcap/ticdc/dm/pkg/streamer" - "github.com/pingcap/ticdc/dm/pkg/terror" -) - -// timeArgs represents args needed by timeStrategy. -type timeArgs struct { - relayBaseDir string - safeTime time.Time // if file's modified time is older than this, then it can be purged - uuids []string - activeRelayLog *streamer.RelayLogInfo // earliest active relay log info -} - -func (ta *timeArgs) SetActiveRelayLog(active *streamer.RelayLogInfo) { - ta.activeRelayLog = active -} - -func (ta *timeArgs) String() string { - return fmt.Sprintf("(RelayBaseDir: %s, SafeTime: %s, UUIDs: %s, ActiveRelayLog: %s)", - ta.relayBaseDir, ta.safeTime, strings.Join(ta.uuids, ";"), ta.activeRelayLog) -} - -// timeStrategy represents a relay purge strategy by time -// similar to `PURGE BINARY LOGS BEFORE` in MySQL. 
-type timeStrategy struct { - purging atomic.Bool - - logger log.Logger -} - -func newTimeStrategy() PurgeStrategy { - return &timeStrategy{ - logger: log.With(zap.String("component", "relay purger"), zap.String("strategy", "time")), - } -} - -func (s *timeStrategy) Check(args interface{}) (bool, error) { - // for time strategy, we always try to do the purging - return true, nil -} - -func (s *timeStrategy) Stop() { -} - -func (s *timeStrategy) Do(args interface{}) error { - if !s.purging.CAS(false, true) { - return terror.ErrRelayThisStrategyIsPurging.Generate() - } - defer s.purging.Store(false) - - ta, ok := args.(*timeArgs) - if !ok { - return terror.ErrRelayPurgeArgsNotValid.Generate(args, args) - } - - return purgeRelayFilesBeforeFileAndTime(s.logger, ta.relayBaseDir, ta.uuids, ta.activeRelayLog, ta.safeTime) -} - -func (s *timeStrategy) Purging() bool { - return s.purging.Load() -} - -func (s *timeStrategy) Type() strategyType { - return strategyTime -} diff --git a/dm/relay/purger/file.go b/dm/relay/purger_helper.go similarity index 97% rename from dm/relay/purger/file.go rename to dm/relay/purger_helper.go index 5e8c2cdb4d7..a840146ebc9 100644 --- a/dm/relay/purger/file.go +++ b/dm/relay/purger_helper.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package purger +package relay import ( "os" @@ -107,7 +107,7 @@ func collectRelayFilesBeforeFileAndTime(logger log.Logger, relayBaseDir string, ) if i+1 == len(uuids) { // same sub dir, only collect relay files newer than safeRelay.filename - shortFiles, err = streamer.CollectBinlogFilesCmp(dir, safeFilename, streamer.FileCmpLess) + shortFiles, err = CollectBinlogFilesCmp(dir, safeFilename, FileCmpLess) if err != nil { return nil, terror.Annotatef(err, "dir %s", dir) } @@ -117,7 +117,7 @@ func collectRelayFilesBeforeFileAndTime(logger log.Logger, relayBaseDir string, continue } // earlier sub dir, collect all relay files - shortFiles, err = streamer.CollectAllBinlogFiles(dir) + shortFiles, err = CollectAllBinlogFiles(dir) if err != nil { return nil, terror.Annotatef(err, "dir %s", dir) } diff --git a/dm/relay/purger/file_test.go b/dm/relay/purger_helper_test.go similarity index 99% rename from dm/relay/purger/file_test.go rename to dm/relay/purger_helper_test.go index c926bd47a35..cebc77abc62 100644 --- a/dm/relay/purger/file_test.go +++ b/dm/relay/purger_helper_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package purger +package relay import ( "os" diff --git a/dm/relay/purger/purger_test.go b/dm/relay/purger_test.go similarity index 95% rename from dm/relay/purger/purger_test.go rename to dm/relay/purger_test.go index c43def46daa..0919077c60f 100644 --- a/dm/relay/purger/purger_test.go +++ b/dm/relay/purger_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package purger +package relay import ( "bytes" @@ -42,14 +42,14 @@ var _ = Suite(&testPurgerSuite{ {"mysql-bin.000001", "mysql-bin.000002", "mysql-bin.000003"}, }, activeRelayLog: &streamer.RelayLogInfo{ - TaskName: fakeTaskName, + TaskName: fakeStrategyTaskName, UUID: "e9540a0d-f16d-11e8-8cb7-0242ac130008.000002", UUIDSuffix: 2, Filename: "mysql-bin.000003", // last in second sub dir }, }) -func TestSuite(t *testing.T) { +func TestPurgerSuite(t *testing.T) { TestingT(t) } @@ -82,7 +82,7 @@ func (t *testPurgerSuite) TestPurgeManuallyInactive(c *C) { Interval: 0, // disable automatically } - purger := NewPurger(cfg, baseDir, []RelayOperator{t}, nil) + purger := NewPurger(cfg, baseDir, []Operator{t}, nil) req := &pb.PurgeRelayRequest{ Inactive: true, @@ -121,7 +121,7 @@ func (t *testPurgerSuite) TestPurgeManuallyTime(c *C) { Interval: 0, // disable automatically } - purger := NewPurger(cfg, baseDir, []RelayOperator{t}, nil) + purger := NewPurger(cfg, baseDir, []Operator{t}, nil) req := &pb.PurgeRelayRequest{ Time: safeTime.Unix(), @@ -160,7 +160,7 @@ func (t *testPurgerSuite) TestPurgeManuallyFilename(c *C) { Interval: 0, // disable automatically } - purger := NewPurger(cfg, baseDir, []RelayOperator{t}, nil) + purger := NewPurger(cfg, baseDir, []Operator{t}, nil) req := &pb.PurgeRelayRequest{ Filename: t.relayFiles[0][2], @@ -214,7 +214,7 @@ func (t *testPurgerSuite) TestPurgeAutomaticallyTime(c *C) { } } - purger := NewPurger(cfg, baseDir, []RelayOperator{t}, nil) + purger := NewPurger(cfg, baseDir, []Operator{t}, nil) purger.Start() time.Sleep(2 * time.Second) // sleep enough time to purge all inactive relay log files purger.Close() @@ -254,7 +254,7 @@ func (t *testPurgerSuite) TestPurgeAutomaticallySpace(c *C) { RemainSpace: int64(storageSize.Available)/1024/1024/1024 + 1024, // always trigger purge } - purger := NewPurger(cfg, baseDir, []RelayOperator{t}, nil) + purger := NewPurger(cfg, baseDir, []Operator{t}, nil) purger.Start() time.Sleep(2 * time.Second) // 
sleep enough time to purge all inactive relay log files purger.Close() @@ -336,7 +336,7 @@ func (t *testPurgerSuite) TestPurgerInterceptor(c *C) { cfg := config.PurgeConfig{} interceptor := newFakeInterceptor() - purger := NewPurger(cfg, "", []RelayOperator{t}, []PurgeInterceptor{interceptor}) + purger := NewPurger(cfg, "", []Operator{t}, []PurgeInterceptor{interceptor}) req := &pb.PurgeRelayRequest{ Inactive: true, diff --git a/dm/relay/relay.go b/dm/relay/relay.go index 3db05c27de1..e0322a4ef41 100644 --- a/dm/relay/relay.go +++ b/dm/relay/relay.go @@ -44,14 +44,11 @@ import ( pkgstreamer "github.com/pingcap/ticdc/dm/pkg/streamer" "github.com/pingcap/ticdc/dm/pkg/terror" "github.com/pingcap/ticdc/dm/pkg/utils" - "github.com/pingcap/ticdc/dm/relay/reader" - "github.com/pingcap/ticdc/dm/relay/retry" "github.com/pingcap/ticdc/dm/relay/transformer" - "github.com/pingcap/ticdc/dm/relay/writer" ) // used to fill RelayLogInfo. -var fakeTaskName = "relay" +var fakeRelayTaskName = "relay" const ( flushMetaInterval = 30 * time.Second @@ -295,7 +292,7 @@ func (r *Relay) process(ctx context.Context) error { } }() - readerRetry, err := retry.NewReaderRetry(r.cfg.ReaderRetry) + readerRetry, err := NewReaderRetry(r.cfg.ReaderRetry) if err != nil { return err } @@ -395,11 +392,11 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser } // setup a special writer to do the recovering - cfg := &writer.FileConfig{ + cfg := &FileConfig{ RelayDir: r.meta.Dir(), Filename: latestPos.Name, } - writer2 := writer.NewFileWriter(r.logger, cfg, parser2) + writer2 := NewFileWriter(r.logger, cfg, parser2) err := writer2.Start() if err != nil { return terror.Annotatef(err, "start recover writer for UUID %s with config %+v", uuid, cfg) @@ -462,9 +459,9 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser // the first return value is the index of last read rows event if the transaction is not finished. 
func (r *Relay) handleEvents( ctx context.Context, - reader2 reader.Reader, + reader2 Reader, transformer2 transformer.Transformer, - writer2 writer.Writer, + writer2 Writer, ) (int, error) { var ( _, lastPos = r.meta.Pos() @@ -811,7 +808,7 @@ func (r *Relay) doIntervalOps(ctx context.Context) { } // setUpReader setups the underlying reader used to read binlog events from the upstream master server. -func (r *Relay) setUpReader(ctx context.Context) (reader.Reader, error) { +func (r *Relay) setUpReader(ctx context.Context) (Reader, error) { ctx2, cancel := context.WithTimeout(ctx, utils.DefaultDBTimeout) defer cancel() @@ -826,7 +823,7 @@ func (r *Relay) setUpReader(ctx context.Context) (reader.Reader, error) { uuid, pos := r.meta.Pos() _, gs := r.meta.GTID() - cfg := &reader.Config{ + cfg := &RConfig{ SyncConfig: r.syncerCfg, Pos: pos, GTIDs: gs, @@ -834,7 +831,7 @@ func (r *Relay) setUpReader(ctx context.Context) (reader.Reader, error) { EnableGTID: r.cfg.EnableGTID, } - reader2 := reader.NewReader(cfg) + reader2 := NewUpstreamReader(cfg) err = reader2.Start() if err != nil { // do not log the whole config to protect the password in `SyncConfig`. @@ -847,13 +844,13 @@ func (r *Relay) setUpReader(ctx context.Context) (reader.Reader, error) { } // setUpWriter setups the underlying writer used to writer binlog events into file or other places. 
-func (r *Relay) setUpWriter(parser2 *parser.Parser) (writer.Writer, error) { +func (r *Relay) setUpWriter(parser2 *parser.Parser) (Writer, error) { uuid, pos := r.meta.Pos() - cfg := &writer.FileConfig{ + cfg := &FileConfig{ RelayDir: r.meta.Dir(), Filename: pos.Name, } - writer2 := writer.NewFileWriter(r.logger, cfg, parser2) + writer2 := NewFileWriter(r.logger, cfg, parser2) if err := writer2.Start(); err != nil { return nil, terror.Annotatef(err, "start writer for UUID %s with config %+v", uuid, cfg) } @@ -1027,7 +1024,7 @@ func (r *Relay) setActiveRelayLog(filename string) { uuid := r.meta.UUID() _, suffix, _ := utils.ParseSuffixForUUID(uuid) rli := &pkgstreamer.RelayLogInfo{ - TaskName: fakeTaskName, + TaskName: fakeRelayTaskName, UUID: uuid, UUIDSuffix: suffix, Filename: filename, diff --git a/dm/relay/relay_test.go b/dm/relay/relay_test.go index 3cb01ec80be..30a5176777c 100644 --- a/dm/relay/relay_test.go +++ b/dm/relay/relay_test.go @@ -36,12 +36,8 @@ import ( "github.com/pingcap/ticdc/dm/pkg/binlog/event" "github.com/pingcap/ticdc/dm/pkg/conn" "github.com/pingcap/ticdc/dm/pkg/gtid" - "github.com/pingcap/ticdc/dm/pkg/log" "github.com/pingcap/ticdc/dm/pkg/utils" - "github.com/pingcap/ticdc/dm/relay/reader" - "github.com/pingcap/ticdc/dm/relay/retry" "github.com/pingcap/ticdc/dm/relay/transformer" - "github.com/pingcap/ticdc/dm/relay/writer" ) var _ = Suite(&testRelaySuite{}) @@ -52,10 +48,6 @@ func TestSuite(t *testing.T) { type testRelaySuite struct{} -func (t *testRelaySuite) SetUpSuite(c *C) { - c.Assert(log.InitLogger(&log.Config{}), IsNil) -} - func newRelayCfg(c *C, flavor string) *Config { dbCfg := getDBConfigForTest() return &Config{ @@ -69,7 +61,7 @@ func newRelayCfg(c *C, flavor string) *Config { User: dbCfg.User, Password: dbCfg.Password, }, - ReaderRetry: retry.ReaderRetryConfig{ + ReaderRetry: ReaderRetryConfig{ BackoffRollback: 200 * time.Millisecond, BackoffMax: 1 * time.Second, BackoffMin: 1 * time.Millisecond, @@ -103,7 +95,7 @@ func 
getDBConfigForTest() config.DBConfig { // mockReader is used only for relay testing. type mockReader struct { - result reader.Result + result RResult err error } @@ -115,10 +107,10 @@ func (r *mockReader) Close() error { return nil } -func (r *mockReader) GetEvent(ctx context.Context) (reader.Result, error) { +func (r *mockReader) GetEvent(ctx context.Context) (RResult, error) { select { case <-ctx.Done(): - return reader.Result{}, ctx.Err() + return RResult{}, ctx.Err() default: } return r.result, r.err @@ -126,7 +118,7 @@ func (r *mockReader) GetEvent(ctx context.Context) (reader.Result, error) { // mockWriter is used only for relay testing. type mockWriter struct { - result writer.Result + result WResult err error latestEvent *replication.BinlogEvent } @@ -139,11 +131,11 @@ func (w *mockWriter) Close() error { return nil } -func (w *mockWriter) Recover(ctx context.Context) (writer.RecoverResult, error) { - return writer.RecoverResult{}, nil +func (w *mockWriter) Recover(ctx context.Context) (RecoverResult, error) { + return RecoverResult{}, nil } -func (w *mockWriter) WriteEvent(ev *replication.BinlogEvent) (writer.Result, error) { +func (w *mockWriter) WriteEvent(ev *replication.BinlogEvent) (WResult, error) { w.latestEvent = ev // hold it return w.result, w.err } diff --git a/dm/relay/writer/file.go b/dm/relay/relay_writer.go similarity index 77% rename from dm/relay/writer/file.go rename to dm/relay/relay_writer.go index fc444245299..0cb0b52ac06 100644 --- a/dm/relay/writer/file.go +++ b/dm/relay/relay_writer.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package writer +package relay import ( "context" @@ -30,12 +30,62 @@ import ( "github.com/pingcap/ticdc/dm/pkg/binlog" "github.com/pingcap/ticdc/dm/pkg/binlog/common" "github.com/pingcap/ticdc/dm/pkg/binlog/event" - bw "github.com/pingcap/ticdc/dm/pkg/binlog/writer" + "github.com/pingcap/ticdc/dm/pkg/gtid" "github.com/pingcap/ticdc/dm/pkg/log" "github.com/pingcap/ticdc/dm/pkg/terror" "github.com/pingcap/ticdc/dm/pkg/utils" ) +const ( + ignoreReasonAlreadyExists = "already exists" + ignoreReasonFakeRotate = "fake rotate event" +) + +// WResult represents a write result. +type WResult struct { + Ignore bool // whether the event ignored by the writer + IgnoreReason string // why the writer ignore the event +} + +// RecoverResult represents a result for a binlog recover operation. +type RecoverResult struct { + // if truncate trailing incomplete events during recovering in relay log + Truncated bool + // the latest binlog position after recover operation has done. + LatestPos gmysql.Position + // the latest binlog GTID set after recover operation has done. + LatestGTIDs gtid.Set +} + +// Writer writes binlog events into disk or any other memory structure. +// The writer should support: +// 1. write binlog events and report the operation result +// 2. skip any obsolete binlog events +// 3. generate dummy events to fill the gap if needed +// 4. rotate binlog(relay) file if needed +// 5. rollback/discard unfinished binlog entries(events or transactions) +type Writer interface { + // Start prepares the writer for writing binlog events. + Start() error + + // Close closes the writer and release the resource. + Close() error + + // Recover tries to recover the binlog file or any other memory structure associate with this writer. + // It is often used to recover a binlog file with some corrupt/incomplete binlog events/transactions at the end of the file. + // It is not safe for concurrent use by multiple goroutines. + // It should be called before writing to the file. 
+ Recover(ctx context.Context) (RecoverResult, error) + + // WriteEvent writes an binlog event's data into disk or any other places. + // It is not safe for concurrent use by multiple goroutines. + WriteEvent(ev *replication.BinlogEvent) (WResult, error) + + // Flush flushes the buffered data to a stable storage or sends through the network. + // It is not safe for concurrent use by multiple goroutines. + Flush() error +} + // FileConfig is the configuration used by the FileWriter. type FileConfig struct { RelayDir string // directory to store relay log files. @@ -51,7 +101,7 @@ type FileWriter struct { // underlying binlog writer, // it will be created/started until needed. - out *bw.FileWriter + out *BinlogWriter // the parser often used to verify events's statement through parsing them. parser *parser.Parser @@ -116,12 +166,12 @@ func (w *FileWriter) Recover(ctx context.Context) (RecoverResult, error) { } // WriteEvent implements Writer.WriteEvent. -func (w *FileWriter) WriteEvent(ev *replication.BinlogEvent) (Result, error) { +func (w *FileWriter) WriteEvent(ev *replication.BinlogEvent) (WResult, error) { w.mu.Lock() defer w.mu.Unlock() if w.stage != common.StagePrepared { - return Result{}, terror.ErrRelayWriterNeedStart.Generate(w.stage, common.StagePrepared) + return WResult{}, terror.ErrRelayWriterNeedStart.Generate(w.stage, common.StagePrepared) } switch ev.Event.(type) { @@ -155,7 +205,7 @@ func (w *FileWriter) offset() int64 { if w.out == nil { return 0 } - status := w.out.Status().(*bw.FileWriterStatus) + status := w.out.Status().(*BinlogWriterStatus) return status.Offset } @@ -164,53 +214,53 @@ func (w *FileWriter) offset() int64 { // 2. open/create a new binlog file // 3. write the binlog file header if not exists // 4. 
write the FormatDescriptionEvent if not exists one -func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) (Result, error) { +func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) (WResult, error) { // close the previous binlog file if w.out != nil { w.logger.Info("closing previous underlying binlog writer", zap.Reflect("status", w.out.Status())) err := w.out.Close() if err != nil { - return Result{}, terror.Annotate(err, "close previous underlying binlog writer") + return WResult{}, terror.Annotate(err, "close previous underlying binlog writer") } } // verify filename if !binlog.VerifyFilename(w.filename.Load()) { - return Result{}, terror.ErrRelayBinlogNameNotValid.Generatef("binlog filename %s not valid", w.filename.Load()) + return WResult{}, terror.ErrRelayBinlogNameNotValid.Generatef("binlog filename %s not valid", w.filename.Load()) } // open/create a new binlog file filename := filepath.Join(w.cfg.RelayDir, w.filename.Load()) - outCfg := &bw.FileWriterConfig{ + outCfg := &BinlogWriterConfig{ Filename: filename, } - out := bw.NewFileWriter(w.logger, outCfg) + out := NewBinlogWriter(w.logger, outCfg) err := out.Start() if err != nil { - return Result{}, terror.Annotatef(err, "start underlying binlog writer for %s", filename) + return WResult{}, terror.Annotatef(err, "start underlying binlog writer for %s", filename) } - w.out = out.(*bw.FileWriter) + w.out = out w.logger.Info("open underlying binlog writer", zap.Reflect("status", w.out.Status())) // write the binlog file header if not exists exist, err := checkBinlogHeaderExist(filename) if err != nil { - return Result{}, terror.Annotatef(err, "check binlog file header for %s", filename) + return WResult{}, terror.Annotatef(err, "check binlog file header for %s", filename) } else if !exist { err = w.out.Write(replication.BinLogFileHeader) if err != nil { - return Result{}, terror.Annotatef(err, "write binlog file header for %s", filename) + return WResult{}, 
terror.Annotatef(err, "write binlog file header for %s", filename) } } // write the FormatDescriptionEvent if not exists one exist, err = checkFormatDescriptionEventExist(filename) if err != nil { - return Result{}, terror.Annotatef(err, "check FormatDescriptionEvent for %s", filename) + return WResult{}, terror.Annotatef(err, "check FormatDescriptionEvent for %s", filename) } else if !exist { err = w.out.Write(ev.RawData) if err != nil { - return Result{}, terror.Annotatef(err, "write FormatDescriptionEvent %+v for %s", ev.Header, filename) + return WResult{}, terror.Annotatef(err, "write FormatDescriptionEvent %+v for %s", ev.Header, filename) } } var reason string @@ -218,7 +268,7 @@ func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) ( reason = ignoreReasonAlreadyExists } - return Result{ + return WResult{ Ignore: exist, // ignore if exists IgnoreReason: reason, }, nil @@ -232,7 +282,7 @@ func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) ( // NOTE: we do not create a new binlog file when received a RotateEvent, // instead, we create a new binlog file when received a FormatDescriptionEvent. // because a binlog file without any events has no meaning. 
-func (w *FileWriter) handleRotateEvent(ev *replication.BinlogEvent) (result Result, err error) { +func (w *FileWriter) handleRotateEvent(ev *replication.BinlogEvent) (result WResult, err error) { rotateEv, ok := ev.Event.(*replication.RotateEvent) if !ok { return result, terror.ErrRelayWriterExpectRotateEv.Generate(ev.Header) @@ -256,7 +306,7 @@ func (w *FileWriter) handleRotateEvent(ev *replication.BinlogEvent) (result Resu // write the RotateEvent if not fake if utils.IsFakeRotateEvent(ev.Header) { // skip fake rotate event - return Result{ + return WResult{ Ignore: true, IgnoreReason: ignoreReasonFakeRotate, }, nil @@ -277,7 +327,7 @@ func (w *FileWriter) handleRotateEvent(ev *replication.BinlogEvent) (result Resu return result, terror.Annotatef(err, "write RotateEvent %+v for %s", ev.Header, filepath.Join(w.cfg.RelayDir, currFile)) } - return Result{ + return WResult{ Ignore: false, }, nil } @@ -286,27 +336,27 @@ func (w *FileWriter) handleRotateEvent(ev *replication.BinlogEvent) (result Resu // 1. handle a potential hole if exists // 2. handle any duplicate events if exist // 3. write the non-duplicate event -func (w *FileWriter) handleEventDefault(ev *replication.BinlogEvent) (Result, error) { +func (w *FileWriter) handleEventDefault(ev *replication.BinlogEvent) (WResult, error) { result, err := w.handlePotentialHoleOrDuplicate(ev) if err != nil { - return Result{}, err + return WResult{}, err } else if result.Ignore { return result, nil } // write the non-duplicate event err = w.out.Write(ev.RawData) - return Result{ + return WResult{ Ignore: false, }, terror.Annotatef(err, "write event %+v", ev.Header) } // handlePotentialHoleOrDuplicate combines handleFileHoleExist and handleDuplicateEventsExist. 
-func (w *FileWriter) handlePotentialHoleOrDuplicate(ev *replication.BinlogEvent) (Result, error) { +func (w *FileWriter) handlePotentialHoleOrDuplicate(ev *replication.BinlogEvent) (WResult, error) { // handle a potential hole mayDuplicate, err := w.handleFileHoleExist(ev) if err != nil { - return Result{}, terror.Annotatef(err, "handle a potential hole in %s before %+v", + return WResult{}, terror.Annotatef(err, "handle a potential hole in %s before %+v", w.filename.Load(), ev.Header) } @@ -314,7 +364,7 @@ func (w *FileWriter) handlePotentialHoleOrDuplicate(ev *replication.BinlogEvent) // handle any duplicate events if exist result, err2 := w.handleDuplicateEventsExist(ev) if err2 != nil { - return Result{}, terror.Annotatef(err2, "handle a potential duplicate event %+v in %s", + return WResult{}, terror.Annotatef(err2, "handle a potential duplicate event %+v in %s", ev.Header, w.filename.Load()) } if result.Ignore { @@ -323,7 +373,7 @@ func (w *FileWriter) handlePotentialHoleOrDuplicate(ev *replication.BinlogEvent) } } - return Result{ + return WResult{ Ignore: false, }, nil } @@ -335,7 +385,7 @@ func (w *FileWriter) handlePotentialHoleOrDuplicate(ev *replication.BinlogEvent) func (w *FileWriter) handleFileHoleExist(ev *replication.BinlogEvent) (bool, error) { // 1. detect whether a hole exists evStartPos := int64(ev.Header.LogPos - ev.Header.EventSize) - outFs, ok := w.out.Status().(*bw.FileWriterStatus) + outFs, ok := w.out.Status().(*BinlogWriterStatus) if !ok { return false, terror.ErrRelayWriterStatusNotValid.Generate(w.out.Status()) } @@ -367,11 +417,11 @@ func (w *FileWriter) handleFileHoleExist(ev *replication.BinlogEvent) (bool, err } // handleDuplicateEventsExist tries to handle a potential duplicate event in the binlog file. 
-func (w *FileWriter) handleDuplicateEventsExist(ev *replication.BinlogEvent) (Result, error) { +func (w *FileWriter) handleDuplicateEventsExist(ev *replication.BinlogEvent) (WResult, error) { filename := filepath.Join(w.cfg.RelayDir, w.filename.Load()) duplicate, err := checkIsDuplicateEvent(filename, ev) if err != nil { - return Result{}, terror.Annotatef(err, "check event %+v whether duplicate in %s", ev.Header, filename) + return WResult{}, terror.Annotatef(err, "check event %+v whether duplicate in %s", ev.Header, filename) } else if duplicate { w.logger.Info("event is duplicate", zap.Reflect("header", ev.Header), zap.String("file", w.filename.Load())) } @@ -381,7 +431,7 @@ func (w *FileWriter) handleDuplicateEventsExist(ev *replication.BinlogEvent) (Re reason = ignoreReasonAlreadyExists } - return Result{ + return WResult{ Ignore: duplicate, IgnoreReason: reason, }, nil diff --git a/dm/relay/writer/file_test.go b/dm/relay/relay_writer_test.go similarity index 99% rename from dm/relay/writer/file_test.go rename to dm/relay/relay_writer_test.go index 685672facb8..3779afd420f 100644 --- a/dm/relay/writer/file_test.go +++ b/dm/relay/relay_writer_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package writer +package relay import ( "bytes" @@ -35,7 +35,7 @@ import ( var _ = check.Suite(&testFileWriterSuite{}) -func TestSuite(t *testing.T) { +func TestFileWriterSuite(t *testing.T) { check.TestingT(t) } diff --git a/dm/relay/retry/reader.go b/dm/relay/remote_retry.go similarity index 99% rename from dm/relay/retry/reader.go rename to dm/relay/remote_retry.go index c9e37e00c95..8b93c6b91c8 100644 --- a/dm/relay/retry/reader.go +++ b/dm/relay/remote_retry.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package retry +package relay import ( "context" diff --git a/dm/relay/retry/reader_test.go b/dm/relay/remote_retry_test.go similarity index 97% rename from dm/relay/retry/reader_test.go rename to dm/relay/remote_retry_test.go index 4f03db06314..20754274d09 100644 --- a/dm/relay/retry/reader_test.go +++ b/dm/relay/remote_retry_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package retry +package relay import ( "context" @@ -23,7 +23,7 @@ import ( . "github.com/pingcap/check" ) -func TestSuite(t *testing.T) { +func TestReaderRetrySuite(t *testing.T) { TestingT(t) } diff --git a/dm/pkg/streamer/streamer.go b/dm/relay/streamer.go similarity index 94% rename from dm/pkg/streamer/streamer.go rename to dm/relay/streamer.go index a7e4b67db06..891dbda1d3f 100644 --- a/dm/pkg/streamer/streamer.go +++ b/dm/relay/streamer.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package streamer +package relay import ( "bytes" @@ -20,7 +20,6 @@ import ( "github.com/pingcap/ticdc/dm/pkg/binlog/common" "github.com/pingcap/ticdc/dm/pkg/binlog/event" - "github.com/pingcap/ticdc/dm/pkg/binlog/reader" "github.com/pingcap/ticdc/dm/pkg/log" "github.com/pingcap/ticdc/dm/pkg/terror" @@ -33,9 +32,6 @@ var heartbeatInterval = common.MasterHeartbeatPeriod // TODO: maybe one day we can make a pull request to go-mysql to support LocalStreamer. -// Streamer provides the ability to get binlog event from remote server or local file. -type Streamer reader.Streamer - // LocalStreamer reads and parses binlog events from local binlog file. 
type LocalStreamer struct { ch chan *replication.BinlogEvent diff --git a/dm/pkg/streamer/streamer_test.go b/dm/relay/streamer_test.go similarity index 86% rename from dm/pkg/streamer/streamer_test.go rename to dm/relay/streamer_test.go index 279d874fb92..a3c59e33cd8 100644 --- a/dm/pkg/streamer/streamer_test.go +++ b/dm/relay/streamer_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package streamer +package relay import ( "context" @@ -31,9 +31,9 @@ var _ = Suite(&testStreamerSuite{}) type testStreamerSuite struct{} func (t *testStreamerSuite) TestStreamer(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/ticdc/dm/pkg/streamer/SetHeartbeatInterval", "return(10000)"), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/ticdc/dm/relay/SetHeartbeatInterval", "return(10000)"), IsNil) defer func() { - c.Assert(failpoint.Disable("github.com/pingcap/ticdc/dm/pkg/streamer/SetHeartbeatInterval"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/ticdc/dm/relay/SetHeartbeatInterval"), IsNil) }() ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -105,9 +105,9 @@ func (t *testStreamerSuite) TestStreamer(c *C) { } func (t *testStreamerSuite) TestHeartbeat(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/ticdc/dm/pkg/streamer/SetHeartbeatInterval", "return(1)"), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/ticdc/dm/relay/SetHeartbeatInterval", "return(1)"), IsNil) defer func() { - c.Assert(failpoint.Disable("github.com/pingcap/ticdc/dm/pkg/streamer/SetHeartbeatInterval"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/ticdc/dm/relay/SetHeartbeatInterval"), IsNil) }() ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) diff --git a/dm/relay/reader/reader.go b/dm/relay/upstream_reader.go similarity index 85% rename from dm/relay/reader/reader.go rename to dm/relay/upstream_reader.go index e928ccdd823..240cd26ca11 
100644 --- a/dm/relay/reader/reader.go +++ b/dm/relay/upstream_reader.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package reader +package relay import ( "context" @@ -28,8 +28,8 @@ import ( "github.com/pingcap/ticdc/dm/pkg/terror" ) -// Result represents a read operation result. -type Result struct { +// RResult represents a read operation result. +type RResult struct { Event *replication.BinlogEvent } @@ -48,11 +48,11 @@ type Reader interface { // GetEvent gets the binlog event one by one, it will block if no event can be read. // You can pass a context (like Cancel) to break the block. - GetEvent(ctx context.Context) (Result, error) + GetEvent(ctx context.Context) (RResult, error) } -// Config is the configuration used by the Reader. -type Config struct { +// RConfig is the configuration used by the Reader. +type RConfig struct { SyncConfig replication.BinlogSyncerConfig Pos mysql.Position GTIDs gtid.Set @@ -61,8 +61,8 @@ type Config struct { } // reader implements Reader interface. -type reader struct { - cfg *Config +type upstreamReader struct { + cfg *RConfig mu sync.RWMutex stage common.Stage @@ -73,9 +73,9 @@ type reader struct { logger log.Logger } -// NewReader creates a Reader instance. -func NewReader(cfg *Config) Reader { - return &reader{ +// NewUpstreamReader creates a Reader instance. +func NewUpstreamReader(cfg *RConfig) Reader { + return &upstreamReader{ cfg: cfg, in: br.NewTCPReader(cfg.SyncConfig), out: make(chan *replication.BinlogEvent), @@ -84,7 +84,7 @@ func NewReader(cfg *Config) Reader { } // Start implements Reader.Start. -func (r *reader) Start() error { +func (r *upstreamReader) Start() error { r.mu.Lock() defer r.mu.Unlock() @@ -109,7 +109,7 @@ func (r *reader) Start() error { } // Close implements Reader.Close. 
-func (r *reader) Close() error { +func (r *upstreamReader) Close() error { r.mu.Lock() defer r.mu.Unlock() @@ -124,11 +124,11 @@ func (r *reader) Close() error { // GetEvent implements Reader.GetEvent. // NOTE: can only close the reader after this returned. -func (r *reader) GetEvent(ctx context.Context) (Result, error) { +func (r *upstreamReader) GetEvent(ctx context.Context) (RResult, error) { r.mu.RLock() defer r.mu.RUnlock() - var result Result + var result RResult if r.stage != common.StagePrepared { return result, terror.ErrRelayReaderNeedStart.Generate(r.stage, common.StagePrepared) } @@ -148,13 +148,13 @@ func (r *reader) GetEvent(ctx context.Context) (Result, error) { } } -func (r *reader) setUpReaderByGTID() error { +func (r *upstreamReader) setUpReaderByGTID() error { gs := r.cfg.GTIDs r.logger.Info("start sync", zap.String("master", r.cfg.MasterID), zap.Stringer("from GTID set", gs)) return r.in.StartSyncByGTID(gs) } -func (r *reader) setUpReaderByPos() error { +func (r *upstreamReader) setUpReaderByPos() error { pos := r.cfg.Pos r.logger.Info("start sync", zap.String("master", r.cfg.MasterID), zap.Stringer("from position", pos)) return r.in.StartSyncByPos(pos) diff --git a/dm/relay/reader/reader_test.go b/dm/relay/upstream_reader_test.go similarity index 84% rename from dm/relay/reader/reader_test.go rename to dm/relay/upstream_reader_test.go index e45f1d4eb76..81cd77232c7 100644 --- a/dm/relay/reader/reader_test.go +++ b/dm/relay/upstream_reader_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package reader +package relay import ( "context" @@ -25,22 +25,22 @@ import ( br "github.com/pingcap/ticdc/dm/pkg/binlog/reader" ) -var _ = check.Suite(&testReaderSuite{}) +var _ = check.Suite(&testRemoteReaderSuite{}) -func TestSuite(t *testing.T) { +func TestRemoteReaderSuite(t *testing.T) { check.TestingT(t) } -type testReaderSuite struct{} +type testRemoteReaderSuite struct{} -func (t *testReaderSuite) TestInterface(c *check.C) { +func (t *testRemoteReaderSuite) TestInterface(c *check.C) { cases := []*replication.BinlogEvent{ {RawData: []byte{1}}, {RawData: []byte{2}}, {RawData: []byte{3}}, } - cfg := &Config{ + cfg := &RConfig{ SyncConfig: replication.BinlogSyncerConfig{ ServerID: 101, }, @@ -48,18 +48,18 @@ func (t *testReaderSuite) TestInterface(c *check.C) { } // test with position - r := NewReader(cfg) + r := NewUpstreamReader(cfg) t.testInterfaceWithReader(c, r, cases) // test with GTID cfg.EnableGTID = true - r = NewReader(cfg) + r = NewUpstreamReader(cfg) t.testInterfaceWithReader(c, r, cases) } -func (t *testReaderSuite) testInterfaceWithReader(c *check.C, r Reader, cases []*replication.BinlogEvent) { +func (t *testRemoteReaderSuite) testInterfaceWithReader(c *check.C, r Reader, cases []*replication.BinlogEvent) { // replace underlying reader with a mock reader for testing - concreteR := r.(*reader) + concreteR := r.(*upstreamReader) c.Assert(concreteR, check.NotNil) mockR := br.NewMockReader() concreteR.in = mockR @@ -102,17 +102,17 @@ func (t *testReaderSuite) testInterfaceWithReader(c *check.C, r Reader, cases [] c.Assert(result.Event, check.IsNil) } -func (t *testReaderSuite) TestGetEventWithError(c *check.C) { - cfg := &Config{ +func (t *testRemoteReaderSuite) TestGetEventWithError(c *check.C) { + cfg := &RConfig{ SyncConfig: replication.BinlogSyncerConfig{ ServerID: 101, }, MasterID: "test-master", } - r := NewReader(cfg) + r := NewUpstreamReader(cfg) // replace underlying reader with a mock reader for testing - concreteR := r.(*reader) + 
concreteR := r.(*upstreamReader) c.Assert(concreteR, check.NotNil) mockR := br.NewMockReader() concreteR.in = mockR diff --git a/dm/relay/util.go b/dm/relay/util.go index 0f1b5e67906..dddb75d13d4 100644 --- a/dm/relay/util.go +++ b/dm/relay/util.go @@ -16,8 +16,12 @@ package relay import ( "context" "database/sql" + "io" "strings" + + "github.com/pingcap/errors" + + "github.com/pingcap/ticdc/dm/pkg/terror" "github.com/pingcap/ticdc/dm/pkg/utils" ) @@ -37,3 +41,34 @@ func isNewServer(ctx context.Context, prevUUID string, db *sql.DB, flavor string } return true, nil } + +// getNextUUID gets (the nextUUID and its suffix) after the current UUID. +func getNextUUID(currUUID string, uuids []string) (string, string, error) { + for i := len(uuids) - 2; i >= 0; i-- { + if uuids[i] == currUUID { + nextUUID := uuids[i+1] + _, suffixInt, err := utils.ParseSuffixForUUID(nextUUID) + if err != nil { + return "", "", terror.Annotatef(err, "UUID %s", nextUUID) + } + return nextUUID, utils.SuffixIntToStr(suffixInt), nil + } + } + return "", "", nil +} + +// isIgnorableParseError checks whether the error is an ignorable error for `BinlogParser.ParseFile`. 
+func isIgnorableParseError(err error) bool { + if err == nil { + return false + } + + if strings.Contains(err.Error(), "err EOF") { + // NOTE: the error returned by go-mysql does not wrap the causing error; it only appears in the message, ref: parser.go `parseSingleEvent` + return true + } else if errors.Cause(err) == io.EOF { + return true + } + + return false +} diff --git a/dm/relay/util_test.go b/dm/relay/util_test.go index cef391a13b1..834051054fd 100644 --- a/dm/relay/util_test.go +++ b/dm/relay/util_test.go @@ -16,6 +16,9 @@ package relay import ( "context" "fmt" + "io" + + "github.com/pingcap/errors" "github.com/DATA-DOG/go-sqlmock" gmysql "github.com/go-mysql-org/go-mysql/mysql" @@ -74,3 +77,97 @@ func mockGetRandomServerID(mockDB sqlmock.Sqlmock) { mockDB.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'server_id'").WillReturnRows( sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("server_id", "1")) } + +func (t *testUtilSuite) TestGetNextUUID(c *C) { + UUIDs := []string{ + "b60868af-5a6f-11e9-9ea3-0242ac160006.000001", + "7acfedb5-3008-4fa2-9776-6bac42b025fe.000002", + "92ffd03b-813e-4391-b16a-177524e8d531.000003", + "338513ce-b24e-4ff8-9ded-9ac5aa8f4d74.000004", + } + cases := []struct { + currUUID string + UUIDs []string + nextUUID string + nextUUIDSuffix string + errMsgReg string + }{ + { + // empty current and UUID list + }, + { + // non-empty current UUID, but empty UUID list + currUUID: "b60868af-5a6f-11e9-9ea3-0242ac160006.000001", + }, + { + // empty current UUID, but non-empty UUID list + UUIDs: UUIDs, + }, + { + // current UUID in UUID list, has next UUID + currUUID: UUIDs[0], + UUIDs: UUIDs, + nextUUID: UUIDs[1], + nextUUIDSuffix: UUIDs[1][len(UUIDs[1])-6:], + }, + { + // current UUID in UUID list, but has no next UUID + currUUID: UUIDs[len(UUIDs)-1], + UUIDs: UUIDs, + }, + { + // current UUID not in UUID list + currUUID: "40ed16c1-f6f7-4012-aa9b-d360261d2b22.666666", + UUIDs: UUIDs, + }, + { + // invalid next UUID in UUID list + currUUID: UUIDs[len(UUIDs)-1], + UUIDs: 
append(UUIDs, "invalid-uuid"), + errMsgReg: ".*invalid-uuid.*", + }, + } + + for _, cs := range cases { + nu, nus, err := getNextUUID(cs.currUUID, cs.UUIDs) + if len(cs.errMsgReg) > 0 { + c.Assert(err, ErrorMatches, cs.errMsgReg) + } else { + c.Assert(err, IsNil) + } + c.Assert(nu, Equals, cs.nextUUID) + c.Assert(nus, Equals, cs.nextUUIDSuffix) + } +} + +func (t *testUtilSuite) TestIsIgnorableParseError(c *C) { + cases := []struct { + err error + ignorable bool + }{ + { + err: nil, + ignorable: false, + }, + { + err: io.EOF, + ignorable: true, + }, + { + err: errors.Annotate(io.EOF, "annotated end of file"), + ignorable: true, + }, + { + err: errors.New("get event header err EOF xxxx"), + ignorable: true, + }, + { + err: errors.New("some other error"), + ignorable: false, + }, + } + + for _, cs := range cases { + c.Assert(isIgnorableParseError(cs.err), Equals, cs.ignorable) + } +} diff --git a/dm/relay/writer/writer.go b/dm/relay/writer/writer.go deleted file mode 100644 index c06939f92ae..00000000000 --- a/dm/relay/writer/writer.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package writer - -import ( - "context" - - gmysql "github.com/go-mysql-org/go-mysql/mysql" - "github.com/go-mysql-org/go-mysql/replication" - - "github.com/pingcap/ticdc/dm/pkg/gtid" -) - -const ( - ignoreReasonAlreadyExists = "already exists" - ignoreReasonFakeRotate = "fake rotate event" -) - -// Result represents a write result. 
-type Result struct { - Ignore bool // whether the event ignored by the writer - IgnoreReason string // why the writer ignore the event -} - -// RecoverResult represents a result for a binlog recover operation. -type RecoverResult struct { - // if truncate trailing incomplete events during recovering in relay log - Truncated bool - // the latest binlog position after recover operation has done. - LatestPos gmysql.Position - // the latest binlog GTID set after recover operation has done. - LatestGTIDs gtid.Set -} - -// Writer writes binlog events into disk or any other memory structure. -// The writer should support: -// 1. write binlog events and report the operation result -// 2. skip any obsolete binlog events -// 3. generate dummy events to fill the gap if needed -// 4. rotate binlog(relay) file if needed -// 5. rollback/discard unfinished binlog entries(events or transactions) -type Writer interface { - // Start prepares the writer for writing binlog events. - Start() error - - // Close closes the writer and release the resource. - Close() error - - // Recover tries to recover the binlog file or any other memory structure associate with this writer. - // It is often used to recover a binlog file with some corrupt/incomplete binlog events/transactions at the end of the file. - // It is not safe for concurrent use by multiple goroutines. - // It should be called before writing to the file. - Recover(ctx context.Context) (RecoverResult, error) - - // WriteEvent writes an binlog event's data into disk or any other places. - // It is not safe for concurrent use by multiple goroutines. - WriteEvent(ev *replication.BinlogEvent) (Result, error) - - // Flush flushes the buffered data to a stable storage or sends through the network. - // It is not safe for concurrent use by multiple goroutines. 
- Flush() error -} diff --git a/dm/syncer/streamer_controller.go b/dm/syncer/streamer_controller.go index 063b8950903..a4c3db51360 100644 --- a/dm/syncer/streamer_controller.go +++ b/dm/syncer/streamer_controller.go @@ -27,12 +27,13 @@ import ( "github.com/pingcap/ticdc/dm/pkg/binlog" "github.com/pingcap/ticdc/dm/pkg/binlog/common" + "github.com/pingcap/ticdc/dm/pkg/binlog/reader" tcontext "github.com/pingcap/ticdc/dm/pkg/context" "github.com/pingcap/ticdc/dm/pkg/log" "github.com/pingcap/ticdc/dm/pkg/retry" - "github.com/pingcap/ticdc/dm/pkg/streamer" "github.com/pingcap/ticdc/dm/pkg/terror" "github.com/pingcap/ticdc/dm/pkg/utils" + "github.com/pingcap/ticdc/dm/relay" "github.com/pingcap/ticdc/dm/syncer/dbconn" ) @@ -46,16 +47,16 @@ var minErrorRetryInterval = 1 * time.Minute // For other implementations who implement StreamerProducer and Streamer can easily take place of Syncer.streamProducer // For test is easy to mock. type StreamerProducer interface { - generateStreamer(location binlog.Location) (streamer.Streamer, error) + generateStreamer(location binlog.Location) (reader.Streamer, error) } // Read local relay log. 
type localBinlogReader struct { - reader *streamer.BinlogReader + reader *relay.BinlogReader EnableGTID bool } -func (l *localBinlogReader) generateStreamer(location binlog.Location) (streamer.Streamer, error) { +func (l *localBinlogReader) generateStreamer(location binlog.Location) (reader.Streamer, error) { if l.EnableGTID { return l.reader.StartSyncByGTID(location.GetGTID().Origin().Clone()) } @@ -70,7 +71,7 @@ type remoteBinlogReader struct { EnableGTID bool } -func (r *remoteBinlogReader) generateStreamer(location binlog.Location) (streamer.Streamer, error) { +func (r *remoteBinlogReader) generateStreamer(location binlog.Location) (reader.Streamer, error) { defer func() { lastSlaveConnectionID := r.reader.LastConnectionID() r.tctx.L().Info("last slave connection", zap.Uint32("connection ID", lastSlaveConnectionID)) @@ -106,7 +107,7 @@ type StreamerController struct { localBinlogDir string timezone *time.Location - streamer streamer.Streamer + streamer reader.Streamer streamerProducer StreamerProducer // meetError means meeting error when get binlog event @@ -121,11 +122,11 @@ type StreamerController struct { // whether the server id is updated serverIDUpdated bool - notifier streamer.EventNotifier + notifier relay.EventNotifier } // NewStreamerController creates a new streamer controller. 
-func NewStreamerController(notifier streamer.EventNotifier, +func NewStreamerController(notifier relay.EventNotifier, syncCfg replication.BinlogSyncerConfig, enableGTID bool, fromDB *dbconn.UpStreamConn, @@ -234,7 +235,7 @@ func (c *StreamerController) resetReplicationSyncer(tctx *tcontext.Context, loca if c.currentBinlogType == RemoteBinlog { c.streamerProducer = &remoteBinlogReader{replication.NewBinlogSyncer(c.syncCfg), tctx, c.syncCfg.Flavor, c.enableGTID} } else { - c.streamerProducer = &localBinlogReader{streamer.NewBinlogReader(c.notifier, tctx.L(), &streamer.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor}), c.enableGTID} + c.streamerProducer = &localBinlogReader{relay.NewBinlogReader(c.notifier, tctx.L(), &relay.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor}), c.enableGTID} } c.streamer, err = c.streamerProducer.generateStreamer(location) diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 1351284930b..e68d5e9b12f 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -63,6 +63,7 @@ import ( "github.com/pingcap/ticdc/dm/pkg/streamer" "github.com/pingcap/ticdc/dm/pkg/terror" "github.com/pingcap/ticdc/dm/pkg/utils" + "github.com/pingcap/ticdc/dm/relay" "github.com/pingcap/ticdc/dm/syncer/dbconn" operator "github.com/pingcap/ticdc/dm/syncer/err-operator" "github.com/pingcap/ticdc/dm/syncer/metrics" @@ -223,11 +224,11 @@ type Syncer struct { workerJobTSArray []*atomic.Int64 // worker's sync job TS array, note that idx=0 is skip idx and idx=1 is ddl idx,sql worker job idx=(queue id + 2) lastCheckpointFlushedTime time.Time - notifier streamer.EventNotifier + notifier relay.EventNotifier } // NewSyncer creates a new Syncer. 
-func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, notifier streamer.EventNotifier) *Syncer { +func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, notifier relay.EventNotifier) *Syncer { logger := log.With(zap.String("task", cfg.Name), zap.String("unit", "binlog replication")) syncer := &Syncer{ pessimist: shardddl.NewPessimist(&logger, etcdClient, cfg.Name, cfg.SourceID), @@ -1627,7 +1628,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { return err1 } continue - case err == streamer.ErrorMaybeDuplicateEvent: + case err == relay.ErrorMaybeDuplicateEvent: tctx.L().Warn("read binlog met a truncated file, need to open safe-mode until the next transaction") err = maybeSkipNRowsEvent(eventIndex) if err == nil { diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index 4d1dffc8af0..65adcd893bd 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/ticdc/dm/dm/pb" "github.com/pingcap/ticdc/dm/pkg/binlog" "github.com/pingcap/ticdc/dm/pkg/binlog/event" + "github.com/pingcap/ticdc/dm/pkg/binlog/reader" "github.com/pingcap/ticdc/dm/pkg/conn" tcontext "github.com/pingcap/ticdc/dm/pkg/context" "github.com/pingcap/ticdc/dm/pkg/cputil" @@ -37,7 +38,6 @@ import ( parserpkg "github.com/pingcap/ticdc/dm/pkg/parser" "github.com/pingcap/ticdc/dm/pkg/retry" "github.com/pingcap/ticdc/dm/pkg/schema" - streamer2 "github.com/pingcap/ticdc/dm/pkg/streamer" "github.com/pingcap/ticdc/dm/syncer/dbconn" sqlmock "github.com/DATA-DOG/go-sqlmock" @@ -112,7 +112,7 @@ type MockStreamProducer struct { events []*replication.BinlogEvent } -func (mp *MockStreamProducer) generateStreamer(location binlog.Location) (streamer2.Streamer, error) { +func (mp *MockStreamProducer) generateStreamer(location binlog.Location) (reader.Streamer, error) { if location.Position.Pos == 4 { return &MockStreamer{mp.events, 0}, nil } diff --git a/dm/tests/dmctl_basic/run.sh 
b/dm/tests/dmctl_basic/run.sh index 7af71ca292f..eebced09cf3 100755 --- a/dm/tests/dmctl_basic/run.sh +++ b/dm/tests/dmctl_basic/run.sh @@ -162,7 +162,7 @@ function test_operate_task_bound_to_a_source() { function run() { inject_points=( - "github.com/pingcap/ticdc/dm/pkg/streamer/SetHeartbeatInterval=return(1)" + "github.com/pingcap/ticdc/dm/relay/SetHeartbeatInterval=return(1)" ) export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})" diff --git a/dm/tests/duplicate_event/run.sh b/dm/tests/duplicate_event/run.sh index f70b80c5805..cc8773bdf08 100644 --- a/dm/tests/duplicate_event/run.sh +++ b/dm/tests/duplicate_event/run.sh @@ -41,7 +41,7 @@ function run() { # with a 5 rows insert txn: 1 * FormatDesc + 1 * PreviousGTID + 1 * GTID + 1 * BEGIN + 5 * (Table_map + Write_rows) + 1 * XID # here we fail at the third write rows event, sync should retry and auto recover without any duplicate event - export GO_FAILPOINTS="github.com/pingcap/ticdc/dm/relay/RelayGetEventFailed=15*return(3);github.com/pingcap/ticdc/dm/relay/retry/RelayAllowRetry=return" + export GO_FAILPOINTS="github.com/pingcap/ticdc/dm/relay/RelayGetEventFailed=15*return(3);github.com/pingcap/ticdc/dm/relay/RelayAllowRetry=return" run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT diff --git a/dm/tests/only_dml/run.sh b/dm/tests/only_dml/run.sh index 52c666b7ad0..b0e3412dab5 100755 --- a/dm/tests/only_dml/run.sh +++ b/dm/tests/only_dml/run.sh @@ -42,7 +42,7 @@ function insert_data() { } function run() { - export GO_FAILPOINTS="github.com/pingcap/ticdc/dm/pkg/streamer/SetHeartbeatInterval=return(1);github.com/pingcap/ticdc/dm/syncer/syncDMLBatchNotFull=return(true)" + export GO_FAILPOINTS="github.com/pingcap/ticdc/dm/relay/SetHeartbeatInterval=return(1);github.com/pingcap/ticdc/dm/syncer/syncDMLBatchNotFull=return(true)" run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 
check_contains 'Query OK, 1 row affected' diff --git a/dm/tests/relay_interrupt/run.sh b/dm/tests/relay_interrupt/run.sh index 6d9abd39c8b..bda66dc71f5 100644 --- a/dm/tests/relay_interrupt/run.sh +++ b/dm/tests/relay_interrupt/run.sh @@ -103,7 +103,7 @@ function run() { echo "read binlog from relay log failed, and will use remote binlog" kill_dm_worker - export GO_FAILPOINTS="github.com/pingcap/ticdc/dm/pkg/streamer/GetEventFromLocalFailed=return()" + export GO_FAILPOINTS="github.com/pingcap/ticdc/dm/relay/GetEventFromLocalFailed=return()" run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT prepare_data2 $i diff --git a/dm/tests/sharding2/run.sh b/dm/tests/sharding2/run.sh index cf8c6530add..da4ab00a351 100755 --- a/dm/tests/sharding2/run.sh +++ b/dm/tests/sharding2/run.sh @@ -75,7 +75,7 @@ function run() { check_port_offline $WORKER2_PORT 20 # mock recover relay writer - export GO_FAILPOINTS='github.com/pingcap/ticdc/dm/relay/writer/MockRecoverRelayWriter=return(true)' + export GO_FAILPOINTS='github.com/pingcap/ticdc/dm/relay/MockRecoverRelayWriter=return(true)' run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT