Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm-relay/streamer: move binlog log reader into relay #3243

Merged
merged 11 commits into from
Nov 5, 2021
17 changes: 8 additions & 9 deletions dm/dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ import (
"github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/terror"
"github.com/pingcap/ticdc/dm/relay"
"github.com/pingcap/ticdc/dm/relay/purger"
)

// RelayHolder for relay unit.
type RelayHolder interface {
// Init initializes the holder
Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error)
Init(ctx context.Context, interceptors []relay.PurgeInterceptor) (relay.Purger, error)
// Start starts run the relay
Start()
// Close closes the holder
Expand Down Expand Up @@ -94,11 +93,11 @@ func NewRealRelayHolder(sourceCfg *config.SourceConfig) RelayHolder {
}

// Init initializes the holder.
func (h *realRelayHolder) Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error) {
func (h *realRelayHolder) Init(ctx context.Context, interceptors []relay.PurgeInterceptor) (relay.Purger, error) {
h.closed.Store(false)

// initial relay purger
operators := []purger.RelayOperator{
operators := []relay.Operator{
h,
streamer.GetReaderHub(),
}
Expand All @@ -107,7 +106,7 @@ func (h *realRelayHolder) Init(ctx context.Context, interceptors []purger.PurgeI
return nil, terror.Annotate(err, "initial relay unit")
}

return purger.NewPurger(h.cfg.Purge, h.cfg.RelayDir, operators, interceptors), nil
return relay.NewPurger(h.cfg.Purge, h.cfg.RelayDir, operators, interceptors), nil
}

// Start starts run the relay.
Expand Down Expand Up @@ -306,7 +305,7 @@ func (h *realRelayHolder) Update(ctx context.Context, sourceCfg *config.SourceCo
return nil
}

// EarliestActiveRelayLog implements RelayOperator.EarliestActiveRelayLog.
// EarliestActiveRelayLog implements Operator.EarliestActiveRelayLog.
func (h *realRelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo {
return h.relay.ActiveRelayLog()
}
Expand Down Expand Up @@ -355,13 +354,13 @@ func NewDummyRelayHolderWithInitError(cfg *config.SourceConfig) RelayHolder {
}

// Init implements interface of RelayHolder.
func (d *dummyRelayHolder) Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error) {
func (d *dummyRelayHolder) Init(ctx context.Context, interceptors []relay.PurgeInterceptor) (relay.Purger, error) {
// initial relay purger
operators := []purger.RelayOperator{
operators := []relay.Operator{
d,
}

return purger.NewDummyPurger(d.cfg.Purge, d.cfg.RelayDir, operators, interceptors), d.initError
return relay.NewDummyPurger(d.cfg.Purge, d.cfg.RelayDir, operators, interceptors), d.initError
}

// Start implements interface of RelayHolder.
Expand Down
7 changes: 3 additions & 4 deletions dm/dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
pkgstreamer "github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/dm/relay"
"github.com/pingcap/ticdc/dm/relay/purger"
)

type testRelay struct{}
Expand Down Expand Up @@ -139,11 +138,11 @@ func (d *DummyRelay) PurgeRelayDir() error {
func (t *testRelay) TestRelay(c *C) {
originNewRelay := relay.NewRelay
relay.NewRelay = NewDummyRelay
originNewPurger := purger.NewPurger
purger.NewPurger = purger.NewDummyPurger
originNewPurger := relay.NewPurger
relay.NewPurger = relay.NewDummyPurger
defer func() {
relay.NewRelay = originNewRelay
purger.NewPurger = originNewPurger
relay.NewPurger = originNewPurger
}()

cfg := loadSourceConfigWithoutPassword(c)
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ import (
"github.com/pingcap/ticdc/dm/pkg/gtid"
"github.com/pingcap/ticdc/dm/pkg/ha"
"github.com/pingcap/ticdc/dm/pkg/log"
"github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/terror"
"github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/dm/relay"
)

// do not forget to update this path if the file removed/renamed.
Expand Down Expand Up @@ -121,7 +121,7 @@ func (t *testServer) TestServer(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
Expand Down
6 changes: 3 additions & 3 deletions dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/terror"
"github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/dm/relay/purger"
"github.com/pingcap/ticdc/dm/relay"
)

// SourceWorker manages a source(upstream) which is mainly related to subtasks and relay.
Expand Down Expand Up @@ -76,7 +76,7 @@ type SourceWorker struct {
relayCancel context.CancelFunc
relayWg sync.WaitGroup
relayHolder RelayHolder
relayPurger purger.Purger
relayPurger relay.Purger

taskStatusChecker TaskStatusChecker

Expand Down Expand Up @@ -321,7 +321,7 @@ func (w *SourceWorker) EnableRelay() (err error) {

// 2. initial relay holder, the cfg's password need decrypt
w.relayHolder = NewRelayHolder(w.cfg)
relayPurger, err := w.relayHolder.Init(w.relayCtx, []purger.PurgeInterceptor{
relayPurger, err := w.relayHolder.Init(w.relayCtx, []relay.PurgeInterceptor{
w,
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions dm/dm/worker/source_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
"github.com/pingcap/ticdc/dm/pkg/conn"
"github.com/pingcap/ticdc/dm/pkg/ha"
"github.com/pingcap/ticdc/dm/pkg/log"
"github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/dm/relay"
)

var emptyWorkerStatusInfoJSONLength = 25
Expand Down Expand Up @@ -242,7 +242,7 @@ var _ = Suite(&testWorkerFunctionalities{})
func (t *testWorkerFunctionalities) SetUpSuite(c *C) {
NewRelayHolder = NewDummyRelayHolder
NewSubTask = NewRealSubTask
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
atomic.AddInt32(&t.createUnitCount, 1)
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
Expand Down Expand Up @@ -417,7 +417,7 @@ func (t *testWorkerEtcdCompact) SetUpSuite(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
Expand Down
6 changes: 3 additions & 3 deletions dm/dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (
"github.com/pingcap/ticdc/dm/pkg/gtid"
"github.com/pingcap/ticdc/dm/pkg/log"
"github.com/pingcap/ticdc/dm/pkg/shardddl/pessimism"
"github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/terror"
"github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/dm/relay"
"github.com/pingcap/ticdc/dm/syncer"
)

Expand All @@ -60,7 +60,7 @@ func (r relayNotifier) Notified() chan interface{} {
var createUnits = createRealUnits

// createRealUnits creates process units base on task mode.
func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier streamer.EventNotifier) []unit.Unit {
func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier relay.EventNotifier) []unit.Unit {
failpoint.Inject("mockCreateUnitsDumpOnly", func(_ failpoint.Value) {
log.L().Info("create mock worker units with dump unit only", zap.String("failpoint", "mockCreateUnitsDumpOnly"))
failpoint.Return([]unit.Unit{dumpling.NewDumpling(cfg)})
Expand Down Expand Up @@ -120,7 +120,7 @@ type SubTask struct {

workerName string

notifier streamer.EventNotifier
notifier relay.EventNotifier
}

// NewSubTask is subtask initializer
Expand Down
12 changes: 6 additions & 6 deletions dm/dm/worker/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/pingcap/ticdc/dm/dumpling"
"github.com/pingcap/ticdc/dm/loader"
"github.com/pingcap/ticdc/dm/pkg/binlog"
"github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/dm/relay"
"github.com/pingcap/ticdc/dm/syncer"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -177,7 +177,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return nil
}
st.Run(pb.Stage_Running)
Expand All @@ -186,7 +186,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {

mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down Expand Up @@ -297,7 +297,7 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down Expand Up @@ -421,7 +421,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand All @@ -446,7 +446,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) {

st = NewSubTaskWithStage(cfg, pb.Stage_Finished, nil, "worker")
c.Assert(st.Stage(), DeepEquals, pb.Stage_Finished)
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down
32 changes: 0 additions & 32 deletions dm/pkg/binlog/writer/writer.go

This file was deleted.

55 changes: 0 additions & 55 deletions dm/pkg/streamer/util.go

This file was deleted.

Loading