Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay/streamer: move binlog log reader into relay #2270

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ import (
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/ha"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/relay"
)

// do not forget to update this path if the file removed/renamed.
Expand Down Expand Up @@ -121,7 +121,7 @@ func (t *testServer) TestServer(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
Expand Down
6 changes: 3 additions & 3 deletions dm/worker/source_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
"github.com/pingcap/dm/pkg/conn"
"github.com/pingcap/dm/pkg/ha"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/relay"
)

var emptyWorkerStatusInfoJSONLength = 25
Expand Down Expand Up @@ -242,7 +242,7 @@ var _ = Suite(&testWorkerFunctionalities{})
func (t *testWorkerFunctionalities) SetUpSuite(c *C) {
NewRelayHolder = NewDummyRelayHolder
NewSubTask = NewRealSubTask
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
atomic.AddInt32(&t.createUnitCount, 1)
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
Expand Down Expand Up @@ -417,7 +417,7 @@ func (t *testWorkerEtcdCompact) SetUpSuite(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
Expand Down
6 changes: 3 additions & 3 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/shardddl/pessimism"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/relay"
"github.com/pingcap/dm/syncer"
)

Expand All @@ -60,7 +60,7 @@ func (r relayNotifier) Notified() chan interface{} {
var createUnits = createRealUnits

// createRealUnits creates process units base on task mode.
func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier streamer.EventNotifier) []unit.Unit {
func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier relay.EventNotifier) []unit.Unit {
failpoint.Inject("mockCreateUnitsDumpOnly", func(_ failpoint.Value) {
log.L().Info("create mock worker units with dump unit only", zap.String("failpoint", "mockCreateUnitsDumpOnly"))
failpoint.Return([]unit.Unit{dumpling.NewDumpling(cfg)})
Expand Down Expand Up @@ -120,7 +120,7 @@ type SubTask struct {

workerName string

notifier streamer.EventNotifier
notifier relay.EventNotifier
}

// NewSubTask is subtask initializer
Expand Down
12 changes: 6 additions & 6 deletions dm/worker/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/pingcap/dm/dumpling"
"github.com/pingcap/dm/loader"
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/relay"
"github.com/pingcap/dm/syncer"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -177,7 +177,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return nil
}
st.Run(pb.Stage_Running)
Expand All @@ -186,7 +186,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {

mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down Expand Up @@ -297,7 +297,7 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down Expand Up @@ -421,7 +421,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand All @@ -446,7 +446,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) {

st = NewSubTaskWithStage(cfg, pb.Stage_Finished, nil, "worker")
c.Assert(st.Stage(), DeepEquals, pb.Stage_Finished)
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down
55 changes: 0 additions & 55 deletions pkg/streamer/util.go

This file was deleted.

124 changes: 0 additions & 124 deletions pkg/streamer/util_test.go

This file was deleted.

5 changes: 2 additions & 3 deletions relay/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/relay/retry"
)

// Config is the configuration for Relay.
Expand All @@ -38,7 +37,7 @@ type Config struct {
UUIDSuffix int `toml:"-" json:"-"`

// for binlog reader retry
ReaderRetry retry.ReaderRetryConfig `toml:"reader-retry" json:"reader-retry"`
ReaderRetry ReaderRetryConfig `toml:"reader-retry" json:"reader-retry"`
}

func (c *Config) String() string {
Expand All @@ -63,7 +62,7 @@ func FromSourceCfg(sourceCfg *config.SourceConfig) *Config {
BinLogName: clone.RelayBinLogName,
BinlogGTID: clone.RelayBinlogGTID,
UUIDSuffix: clone.UUIDSuffix,
ReaderRetry: retry.ReaderRetryConfig{ // we use config from TaskChecker now
ReaderRetry: ReaderRetryConfig{ // we use config from TaskChecker now
BackoffRollback: clone.Checker.BackoffRollback.Duration,
BackoffMax: clone.Checker.BackoffMax.Duration,
BackoffMin: clone.Checker.BackoffMin.Duration,
Expand Down
2 changes: 1 addition & 1 deletion relay/reader/error.go → relay/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package reader
package relay

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion relay/reader/error_test.go → relay/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package reader
package relay

import (
"context"
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamer/file.go → relay/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package streamer
package relay

import (
"context"
Expand Down Expand Up @@ -219,7 +219,7 @@ func (r *relayLogFileChecker) relayLogUpdatedOrNewCreated(ctx context.Context, u
// binlog file may have rotated if we read nothing last time(either it's the first read or after notified)
lastReadCnt := r.endOffset - r.beginOffset
if lastReadCnt == 0 {
meta := &Meta{}
meta := &LocalMeta{}
_, err := toml.DecodeFile(filepath.Join(r.latestRelayLogDir, utils.MetaFilename), meta)
if err != nil {
errCh <- terror.Annotate(err, "decode relay meta toml file failed")
Expand Down
6 changes: 3 additions & 3 deletions pkg/streamer/file_test.go → relay/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package streamer
package relay

import (
"bytes"
Expand Down Expand Up @@ -358,10 +358,10 @@ func (t *testFileSuite) TestrelayLogUpdatedOrNewCreated(c *C) {
}

rotateRelayFile := func(filename string) {
meta := Meta{BinLogName: filename, BinLogPos: binlogPos, BinlogGTID: binlogGTID}
meta := LocalMeta{BinLogName: filename, BinLogPos: binlogPos, BinlogGTID: binlogGTID}
metaFile, err2 := os.Create(path.Join(subDir, utils.MetaFilename))
c.Assert(err2, IsNil)
err = toml.NewEncoder(metaFile).Encode(meta)
err = toml.NewEncoder(metaFile).Encode(&meta)
c.Assert(err, IsNil)
_ = metaFile.Close()
}
Expand Down
Loading