Skip to content

Commit

Permalink
dm-relay/streamer: move binlog log reader into relay (#3243)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter committed Nov 5, 2021
1 parent df6eefc commit 9a6bd36
Show file tree
Hide file tree
Showing 52 changed files with 771 additions and 1,057 deletions.
17 changes: 8 additions & 9 deletions dm/dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ import (
"github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/terror"
"github.com/pingcap/ticdc/dm/relay"
"github.com/pingcap/ticdc/dm/relay/purger"
)

// RelayHolder for relay unit.
type RelayHolder interface {
// Init initializes the holder
Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error)
Init(ctx context.Context, interceptors []relay.PurgeInterceptor) (relay.Purger, error)
// Start starts run the relay
Start()
// Close closes the holder
Expand Down Expand Up @@ -94,11 +93,11 @@ func NewRealRelayHolder(sourceCfg *config.SourceConfig) RelayHolder {
}

// Init initializes the holder.
func (h *realRelayHolder) Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error) {
func (h *realRelayHolder) Init(ctx context.Context, interceptors []relay.PurgeInterceptor) (relay.Purger, error) {
h.closed.Store(false)

// initial relay purger
operators := []purger.RelayOperator{
operators := []relay.Operator{
h,
streamer.GetReaderHub(),
}
Expand All @@ -107,7 +106,7 @@ func (h *realRelayHolder) Init(ctx context.Context, interceptors []purger.PurgeI
return nil, terror.Annotate(err, "initial relay unit")
}

return purger.NewPurger(h.cfg.Purge, h.cfg.RelayDir, operators, interceptors), nil
return relay.NewPurger(h.cfg.Purge, h.cfg.RelayDir, operators, interceptors), nil
}

// Start starts run the relay.
Expand Down Expand Up @@ -306,7 +305,7 @@ func (h *realRelayHolder) Update(ctx context.Context, sourceCfg *config.SourceCo
return nil
}

// EarliestActiveRelayLog implements RelayOperator.EarliestActiveRelayLog.
// EarliestActiveRelayLog implements Operator.EarliestActiveRelayLog.
func (h *realRelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo {
return h.relay.ActiveRelayLog()
}
Expand Down Expand Up @@ -355,13 +354,13 @@ func NewDummyRelayHolderWithInitError(cfg *config.SourceConfig) RelayHolder {
}

// Init implements interface of RelayHolder.
func (d *dummyRelayHolder) Init(ctx context.Context, interceptors []purger.PurgeInterceptor) (purger.Purger, error) {
func (d *dummyRelayHolder) Init(ctx context.Context, interceptors []relay.PurgeInterceptor) (relay.Purger, error) {
// initial relay purger
operators := []purger.RelayOperator{
operators := []relay.Operator{
d,
}

return purger.NewDummyPurger(d.cfg.Purge, d.cfg.RelayDir, operators, interceptors), d.initError
return relay.NewDummyPurger(d.cfg.Purge, d.cfg.RelayDir, operators, interceptors), d.initError
}

// Start implements interface of RelayHolder.
Expand Down
7 changes: 3 additions & 4 deletions dm/dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
pkgstreamer "github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/dm/relay"
"github.com/pingcap/ticdc/dm/relay/purger"
)

type testRelay struct{}
Expand Down Expand Up @@ -139,11 +138,11 @@ func (d *DummyRelay) PurgeRelayDir() error {
func (t *testRelay) TestRelay(c *C) {
originNewRelay := relay.NewRelay
relay.NewRelay = NewDummyRelay
originNewPurger := purger.NewPurger
purger.NewPurger = purger.NewDummyPurger
originNewPurger := relay.NewPurger
relay.NewPurger = relay.NewDummyPurger
defer func() {
relay.NewRelay = originNewRelay
purger.NewPurger = originNewPurger
relay.NewPurger = originNewPurger
}()

cfg := loadSourceConfigWithoutPassword(c)
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ import (
"github.com/pingcap/ticdc/dm/pkg/gtid"
"github.com/pingcap/ticdc/dm/pkg/ha"
"github.com/pingcap/ticdc/dm/pkg/log"
"github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/terror"
"github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/dm/relay"
)

// do not forget to update this path if the file removed/renamed.
Expand Down Expand Up @@ -121,7 +121,7 @@ func (t *testServer) TestServer(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
Expand Down
6 changes: 3 additions & 3 deletions dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/terror"
"github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/dm/relay/purger"
"github.com/pingcap/ticdc/dm/relay"
)

// SourceWorker manages a source(upstream) which is mainly related to subtasks and relay.
Expand Down Expand Up @@ -76,7 +76,7 @@ type SourceWorker struct {
relayCancel context.CancelFunc
relayWg sync.WaitGroup
relayHolder RelayHolder
relayPurger purger.Purger
relayPurger relay.Purger

taskStatusChecker TaskStatusChecker

Expand Down Expand Up @@ -321,7 +321,7 @@ func (w *SourceWorker) EnableRelay() (err error) {

// 2. initial relay holder, the cfg's password need decrypt
w.relayHolder = NewRelayHolder(w.cfg)
relayPurger, err := w.relayHolder.Init(w.relayCtx, []purger.PurgeInterceptor{
relayPurger, err := w.relayHolder.Init(w.relayCtx, []relay.PurgeInterceptor{
w,
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions dm/dm/worker/source_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
"github.com/pingcap/ticdc/dm/pkg/conn"
"github.com/pingcap/ticdc/dm/pkg/ha"
"github.com/pingcap/ticdc/dm/pkg/log"
"github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/dm/relay"
)

var emptyWorkerStatusInfoJSONLength = 25
Expand Down Expand Up @@ -242,7 +242,7 @@ var _ = Suite(&testWorkerFunctionalities{})
func (t *testWorkerFunctionalities) SetUpSuite(c *C) {
NewRelayHolder = NewDummyRelayHolder
NewSubTask = NewRealSubTask
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
atomic.AddInt32(&t.createUnitCount, 1)
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
Expand Down Expand Up @@ -417,7 +417,7 @@ func (t *testWorkerEtcdCompact) SetUpSuite(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
Expand Down
6 changes: 3 additions & 3 deletions dm/dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (
"github.com/pingcap/ticdc/dm/pkg/gtid"
"github.com/pingcap/ticdc/dm/pkg/log"
"github.com/pingcap/ticdc/dm/pkg/shardddl/pessimism"
"github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/terror"
"github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/dm/relay"
"github.com/pingcap/ticdc/dm/syncer"
)

Expand All @@ -60,7 +60,7 @@ func (r relayNotifier) Notified() chan interface{} {
var createUnits = createRealUnits

// createRealUnits creates process units base on task mode.
func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier streamer.EventNotifier) []unit.Unit {
func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier relay.EventNotifier) []unit.Unit {
failpoint.Inject("mockCreateUnitsDumpOnly", func(_ failpoint.Value) {
log.L().Info("create mock worker units with dump unit only", zap.String("failpoint", "mockCreateUnitsDumpOnly"))
failpoint.Return([]unit.Unit{dumpling.NewDumpling(cfg)})
Expand Down Expand Up @@ -120,7 +120,7 @@ type SubTask struct {

workerName string

notifier streamer.EventNotifier
notifier relay.EventNotifier
}

// NewSubTask is subtask initializer
Expand Down
12 changes: 6 additions & 6 deletions dm/dm/worker/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/pingcap/ticdc/dm/dumpling"
"github.com/pingcap/ticdc/dm/loader"
"github.com/pingcap/ticdc/dm/pkg/binlog"
"github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/dm/relay"
"github.com/pingcap/ticdc/dm/syncer"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -177,7 +177,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return nil
}
st.Run(pb.Stage_Running)
Expand All @@ -186,7 +186,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {

mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down Expand Up @@ -297,7 +297,7 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down Expand Up @@ -421,7 +421,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand All @@ -446,7 +446,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) {

st = NewSubTaskWithStage(cfg, pb.Stage_Finished, nil, "worker")
c.Assert(st.Stage(), DeepEquals, pb.Stage_Finished)
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down
32 changes: 0 additions & 32 deletions dm/pkg/binlog/writer/writer.go

This file was deleted.

55 changes: 0 additions & 55 deletions dm/pkg/streamer/util.go

This file was deleted.

Loading

0 comments on commit 9a6bd36

Please sign in to comment.