dm-relay: create binlog reader from relay, refactor relay (#3327)
D3Hunter committed Nov 15, 2021
1 parent b15022c commit c1a8fa2
Showing 34 changed files with 874 additions and 1,346 deletions.
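At a high level, this commit drops the listener/notifier indirection (RegisterListener / UnRegisterListener / relay.EventNotifier) and instead threads a relay.Process handle down to the units that need it, so a syncer can build its binlog reader from the relay directly. A minimal sketch of that shape, using hypothetical stand-in names (Process, Holder, newSyncUnit) rather than the real DM APIs:

package main

import "fmt"

// Process stands in for relay.Process; a minimal illustrative interface,
// not the real DM definition.
type Process interface {
    Name() string
}

type fakeRelay struct{}

func (fakeRelay) Name() string { return "relay-1" }

// Holder mirrors the reshaped RelayHolder: it hands out the relay itself
// instead of accepting listeners.
type Holder struct {
    relay Process
}

// Relay returns the underlying relay process; nil means relay is disabled.
func (h *Holder) Relay() Process { return h.relay }

// newSyncUnit receives the relay handle at construction time, replacing the
// old register-a-listener-and-wait-for-notifications flow.
func newSyncUnit(relay Process) {
    if relay == nil {
        fmt.Println("sync unit created without relay")
        return
    }
    fmt.Println("sync unit created with relay:", relay.Name())
}

func main() {
    h := &Holder{relay: fakeRelay{}}
    newSyncUnit(h.Relay())
}

The key point of the pattern: when relay is disabled the handle is simply nil, and callers branch on that instead of registering and unregistering listeners.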
28 changes: 11 additions & 17 deletions dm/dm/worker/relay.go
@@ -50,10 +50,8 @@ type RelayHolder interface {
Result() *pb.ProcessResult
// Update updates relay config online
Update(ctx context.Context, cfg *config.SourceConfig) error
- // RegisterListener registers a relay listener
- RegisterListener(el relay.Listener)
- // UnRegisterListener unregisters a relay listener
- UnRegisterListener(el relay.Listener)
+ // Relay returns relay object
+ Relay() relay.Process
}

// NewRelayHolder is relay holder initializer
@@ -310,12 +308,8 @@ func (h *realRelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo {
return h.relay.ActiveRelayLog()
}

- func (h *realRelayHolder) RegisterListener(el relay.Listener) {
- h.relay.RegisterListener(el)
- }
-
- func (h *realRelayHolder) UnRegisterListener(el relay.Listener) {
- h.relay.UnRegisterListener(el)
+ func (h *realRelayHolder) Relay() relay.Process {
+ return h.relay
}

/******************** dummy relay holder ********************/
@@ -326,14 +320,16 @@ type dummyRelayHolder struct {
stage pb.Stage
relayBinlog string

- cfg *config.SourceConfig
+ cfg    *config.SourceConfig
+ relay2 relay.Process
}

// NewDummyRelayHolder creates a new RelayHolder.
func NewDummyRelayHolder(cfg *config.SourceConfig) RelayHolder {
return &dummyRelayHolder{
- cfg: cfg,
- stage: pb.Stage_New,
+ cfg:    cfg,
+ stage:  pb.Stage_New,
+ relay2: &relay.Relay{},
}
}

@@ -436,8 +432,6 @@ func (d *dummyRelayHolder) Stage() pb.Stage {
return d.stage
}

- func (d *dummyRelayHolder) RegisterListener(el relay.Listener) {
- }
-
- func (d *dummyRelayHolder) UnRegisterListener(el relay.Listener) {
+ func (d *dummyRelayHolder) Relay() relay.Process {
+ return d.relay2
}
5 changes: 5 additions & 0 deletions dm/dm/worker/relay_test.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/ticdc/dm/dm/unit"
"github.com/pingcap/ticdc/dm/pkg/binlog"
"github.com/pingcap/ticdc/dm/pkg/gtid"
"github.com/pingcap/ticdc/dm/pkg/log"
pkgstreamer "github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/dm/relay"
@@ -46,6 +47,10 @@ type DummyRelay struct {
reloadErr error
}

+ func (d *DummyRelay) NewReader(logger log.Logger, cfg *relay.BinlogReaderConfig) *relay.BinlogReader {
+ return nil
+ }

func (d *DummyRelay) RegisterListener(el relay.Listener) {
}

6 changes: 4 additions & 2 deletions dm/dm/worker/server_test.go
@@ -121,7 +121,7 @@ func (t *testServer) TestServer(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
- createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
+ createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, relay relay.Process) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
@@ -551,7 +551,9 @@ func (t *testServer) testSubTaskRecover(c *C, s *Server, dir string) {

func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, etcd *embed.Etcd) {
etcd.Close()
- time.Sleep(retryConnectSleepTime + time.Duration(defaultKeepAliveTTL+3)*time.Second)
+ c.Assert(utils.WaitSomething(int(defaultKeepAliveTTL+3), time.Second, func() bool {
+ return s.getWorker(true) == nil
+ }), IsTrue)
c.Assert(s.getWorker(true), IsNil)
}
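The change above swaps a fixed sleep for bounded polling: utils.WaitSomething appears to retry the condition up to a given number of attempts and return early once it holds, so the test no longer always pays the worst-case wait. A self-contained sketch of the same pattern using only the standard library; waitSomething here is a hypothetical re-implementation, not the real utils helper:

package main

import (
    "fmt"
    "time"
)

// waitSomething polls fn up to attempts times, sleeping interval between
// tries, and reports whether fn ever returned true.
func waitSomething(attempts int, interval time.Duration, fn func() bool) bool {
    for i := 0; i < attempts; i++ {
        if fn() {
            return true
        }
        time.Sleep(interval)
    }
    return false
}

func main() {
    deadline := time.Now().Add(250 * time.Millisecond)
    ok := waitSomething(10, 100*time.Millisecond, func() bool {
        // Stands in for "the worker has been torn down after etcd closed".
        return time.Now().After(deadline)
    })
    fmt.Println("condition met:", ok)
}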

23 changes: 14 additions & 9 deletions dm/dm/worker/source_worker.go
@@ -357,11 +357,9 @@ func (w *SourceWorker) EnableRelay() (err error) {
w.observeRelayStage(w.relayCtx, w.etcdClient, revRelay)
}()

- w.relayHolder.RegisterListener(w.subTaskHolder)
-
w.relayEnabled.Store(true)
w.l.Info("relay enabled")
- w.subTaskHolder.resetAllSubTasks(true)
+ w.subTaskHolder.resetAllSubTasks(w.getRelayWithoutLock())
return nil
}

@@ -386,12 +384,11 @@ func (w *SourceWorker) DisableRelay() {
w.l.Info("finish refreshing task checker")
}

- w.subTaskHolder.resetAllSubTasks(false)
+ w.subTaskHolder.resetAllSubTasks(nil)

if w.relayHolder != nil {
r := w.relayHolder
w.relayHolder = nil
- r.UnRegisterListener(w.subTaskHolder)
r.Close()
}
if w.relayPurger != nil {
@@ -523,7 +520,15 @@ func (w *SourceWorker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.St
}

w.l.Info("subtask created", zap.Stringer("config", cfg2))
- st.Run(expectStage)
+ st.Run(expectStage, w.getRelayWithoutLock())
return nil
}

+ // caller should make sure w.Lock is locked before calling this method.
+ func (w *SourceWorker) getRelayWithoutLock() relay.Process {
+ if w.relayHolder != nil {
+ return w.relayHolder.Relay()
+ }
+ return nil
+ }

@@ -570,10 +575,10 @@ func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error {
err = st.Pause()
case pb.TaskOp_Resume:
w.l.Info("resume sub task", zap.String("task", name))
- err = st.Resume()
+ err = st.Resume(w.getRelayWithoutLock())
case pb.TaskOp_AutoResume:
w.l.Info("auto_resume sub task", zap.String("task", name))
- err = st.Resume()
+ err = st.Resume(w.getRelayWithoutLock())
default:
err = terror.ErrWorkerUpdateTaskStage.Generatef("invalid operate %s on subtask %v", op, name)
}
@@ -1043,5 +1048,5 @@ func (w *SourceWorker) HandleError(ctx context.Context, req *pb.HandleWorkerErro
return terror.ErrWorkerSubTaskNotFound.Generate(req.Task)
}

- return st.HandleError(ctx, req)
+ return st.HandleError(ctx, req, w.getRelayWithoutLock())
}
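getRelayWithoutLock reads w.relayHolder without taking w.Lock itself; the WithoutLock suffix and its comment document that the caller must already hold the lock, which lets StartSubTask, OperateSubTask and HandleError reuse it from paths that are already serialized. A small sketch of that convention with hypothetical names (worker, relayWithoutLock), not the real SourceWorker:

package main

import (
    "fmt"
    "sync"
)

type worker struct {
    mu    sync.Mutex
    relay string // stands in for the relay handle; "" means relay disabled
}

// relayWithoutLock follows the *WithoutLock naming convention: it reads
// shared state but leaves locking to the caller, so methods that already
// hold mu can call it without deadlocking.
func (w *worker) relayWithoutLock() string {
    return w.relay
}

func (w *worker) operate() {
    w.mu.Lock()
    defer w.mu.Unlock()
    fmt.Println("operating with relay:", w.relayWithoutLock())
}

func main() {
    w := &worker{relay: "relay-1"}
    w.operate()
}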
4 changes: 2 additions & 2 deletions dm/dm/worker/source_worker_test.go
@@ -242,7 +242,7 @@ var _ = Suite(&testWorkerFunctionalities{})
func (t *testWorkerFunctionalities) SetUpSuite(c *C) {
NewRelayHolder = NewDummyRelayHolder
NewSubTask = NewRealSubTask
- createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
+ createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, relay relay.Process) []unit.Unit {
atomic.AddInt32(&t.createUnitCount, 1)
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
@@ -417,7 +417,7 @@ func (t *testWorkerEtcdCompact) SetUpSuite(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
- createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
+ createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, relay relay.Process) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
43 changes: 11 additions & 32 deletions dm/dm/worker/subtask.go
@@ -45,22 +45,12 @@ const (
waitRelayCatchupTimeout = 30 * time.Second
)

- type relayNotifier struct {
- // ch with size = 1, we only need to be notified whether binlog file of relay changed, not how many times
- ch chan interface{}
- }
-
- // Notified implements streamer.EventNotifier.
- func (r relayNotifier) Notified() chan interface{} {
- return r.ch
- }
-
// createRealUnits is subtask units initializer
// it can be used for testing.
var createUnits = createRealUnits

// createRealUnits creates process units base on task mode.
- func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier relay.EventNotifier) []unit.Unit {
+ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, relay relay.Process) []unit.Unit {
failpoint.Inject("mockCreateUnitsDumpOnly", func(_ failpoint.Value) {
log.L().Info("create mock worker units with dump unit only", zap.String("failpoint", "mockCreateUnitsDumpOnly"))
failpoint.Return([]unit.Unit{dumpling.NewDumpling(cfg)})
@@ -75,7 +65,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor
} else {
us = append(us, loader.NewLightning(cfg, etcdClient, workerName))
}
- us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier))
+ us = append(us, syncer.NewSyncer(cfg, etcdClient, relay))
case config.ModeFull:
// NOTE: maybe need another checker in the future?
us = append(us, dumpling.NewDumpling(cfg))
@@ -85,7 +75,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor
us = append(us, loader.NewLightning(cfg, etcdClient, workerName))
}
case config.ModeIncrement:
- us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier))
+ us = append(us, syncer.NewSyncer(cfg, etcdClient, relay))
default:
log.L().Error("unsupported task mode", zap.String("subtask", cfg.Name), zap.String("task mode", cfg.Mode))
}
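createRealUnits assembles the processing pipeline from the task mode: dump, load and sync units for the all mode, dump and load for full, sync alone for incremental, with the relay handle now passed straight into the syncer. A reduced sketch of that dispatch; the mode strings and buildUnits are illustrative placeholders, not the real DM constructors:

package main

import "fmt"

// buildUnits mirrors the shape of createRealUnits: choose units by task mode
// and hand the (possibly nil) relay handle to the sync unit only.
func buildUnits(mode string, relay interface{}) []string {
    syncUnit := fmt.Sprintf("sync(withRelay=%v)", relay != nil)
    switch mode {
    case "all":
        return []string{"dump", "load", syncUnit}
    case "full":
        return []string{"dump", "load"}
    case "incremental":
        return []string{syncUnit}
    default:
        fmt.Println("unsupported task mode:", mode)
        return nil
    }
}

func main() {
    fmt.Println(buildUnits("all", struct{}{})) // [dump load sync(withRelay=true)]
    fmt.Println(buildUnits("incremental", nil))
}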
@@ -119,8 +109,6 @@ type SubTask struct {
etcdClient *clientv3.Client

workerName string

- notifier relay.EventNotifier
}

// NewSubTask is subtask initializer
Expand All @@ -143,15 +131,14 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient *
cancel: cancel,
etcdClient: etcdClient,
workerName: workerName,
- notifier: &relayNotifier{ch: make(chan interface{}, 1)},
}
updateTaskMetric(st.cfg.Name, st.cfg.SourceID, st.stage, st.workerName)
return &st
}

// initUnits initializes the sub task processing units.
- func (st *SubTask) initUnits() error {
- st.units = createUnits(st.cfg, st.etcdClient, st.workerName, st.notifier)
+ func (st *SubTask) initUnits(relay relay.Process) error {
+ st.units = createUnits(st.cfg, st.etcdClient, st.workerName, relay)
if len(st.units) < 1 {
return terror.ErrWorkerNoAvailUnits.Generate(st.cfg.Name, st.cfg.Mode)
}
@@ -211,15 +198,15 @@ func (st *SubTask) initUnits() error {

// Run runs the sub task.
// TODO: check concurrent problems.
- func (st *SubTask) Run(expectStage pb.Stage) {
+ func (st *SubTask) Run(expectStage pb.Stage, relay relay.Process) {
if st.Stage() == pb.Stage_Finished || st.Stage() == pb.Stage_Running {
st.l.Warn("prepare to run a subtask with invalid stage",
zap.Stringer("current stage", st.Stage()),
zap.Stringer("expected stage", expectStage))
return
}

- if err := st.initUnits(); err != nil {
+ if err := st.initUnits(relay); err != nil {
st.l.Error("fail to initial subtask", log.ShortError(err))
st.fail(err)
return
@@ -511,9 +498,9 @@ func (st *SubTask) Pause() error {

// Resume resumes the paused sub task
// TODO: similar to Run, refactor later.
- func (st *SubTask) Resume() error {
+ func (st *SubTask) Resume(relay relay.Process) error {
if !st.initialized.Load() {
- st.Run(pb.Stage_Running)
+ st.Run(pb.Stage_Running, relay)
return nil
}

@@ -714,7 +701,7 @@ func (st *SubTask) fail(err error) {
}

// HandleError handle error for syncer unit.
- func (st *SubTask) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest) error {
+ func (st *SubTask) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest, relay relay.Process) error {
syncUnit, ok := st.currUnit.(*syncer.Syncer)
if !ok {
return terror.ErrWorkerOperSyncUnitOnly.Generate(st.currUnit.Type())
@@ -726,7 +713,7 @@ func (st *SubTask) HandleError(ctx context.Context, req *pb.HandleWorkerErrorReq
}

if st.Stage() == pb.Stage_Paused {
- err = st.Resume()
+ err = st.Resume(relay)
}
return err
}
@@ -738,11 +725,3 @@ func updateTaskMetric(task, sourceID string, stage pb.Stage, workerName string)
taskState.WithLabelValues(task, sourceID, workerName).Set(float64(stage))
}
}

- func (st *SubTask) relayNotify() {
- // skip if there's pending notify
- select {
- case st.notifier.Notified() <- struct{}{}:
- default:
- }
- }
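For context on the code being deleted: relayNotifier used the standard coalescing-notification idiom, a channel of capacity 1 plus a non-blocking send, so any number of relay events collapse into at most one pending signal. A self-contained sketch of that idiom (names are illustrative, not the DM types):

package main

import "fmt"

type notifier struct {
    // Capacity 1: consumers only need to know that something changed,
    // not how many times it changed.
    ch chan struct{}
}

func newNotifier() *notifier { return &notifier{ch: make(chan struct{}, 1)} }

// notify coalesces signals: if one is already pending, the new one is dropped.
func (n *notifier) notify() {
    select {
    case n.ch <- struct{}{}:
    default: // skip if there's a pending notification
    }
}

func main() {
    n := newNotifier()
    for i := 0; i < 5; i++ {
        n.notify() // five events collapse into a single pending signal
    }
    <-n.ch
    fmt.Println("pending after drain:", len(n.ch)) // 0
}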
19 changes: 4 additions & 15 deletions dm/dm/worker/subtask_holder.go
@@ -17,7 +17,7 @@ import (
"context"
"sync"

"github.com/go-mysql-org/go-mysql/replication"
"github.com/pingcap/ticdc/dm/relay"
)

// subTaskHolder holds subtask instances.
@@ -50,16 +50,16 @@ func (h *subTaskHolder) removeSubTask(name string) {
}

// resetAllSubTasks does Close, change cfg.UseRelay then Init the subtasks.
- func (h *subTaskHolder) resetAllSubTasks(useRelay bool) {
+ func (h *subTaskHolder) resetAllSubTasks(relay relay.Process) {
h.mu.Lock()
defer h.mu.Unlock()
for _, st := range h.subTasks {
stage := st.Stage()
st.Close()
// TODO: make a st.reset
st.ctx, st.cancel = context.WithCancel(context.Background())
- st.cfg.UseRelay = useRelay
- st.Run(stage)
+ st.cfg.UseRelay = relay != nil
+ st.Run(stage, relay)
}
}
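resetAllSubTasks now derives cfg.UseRelay from whether a relay handle was passed, then closes each subtask, gives it a fresh context, and reruns it at its previous stage. A reduced sketch of that close-reset-rerun loop with hypothetical stand-in types (task, resetAll), not the real SubTask:

package main

import (
    "context"
    "fmt"
)

// task is a hypothetical stand-in for SubTask, trimmed to what the loop touches.
type task struct {
    ctx      context.Context
    cancel   context.CancelFunc
    useRelay bool
    stage    string
}

func (t *task) close() { t.cancel() }

func (t *task) run(stage string, relay interface{}) {
    t.stage = stage
    fmt.Printf("restarted at stage %q, useRelay=%v\n", stage, t.useRelay)
}

// resetAll mirrors the shape of resetAllSubTasks: close the task, hand it a
// fresh context, derive useRelay from the relay handle, and rerun it at the
// stage it was in before the reset.
func resetAll(tasks []*task, relay interface{}) {
    for _, t := range tasks {
        stage := t.stage
        t.close()
        t.ctx, t.cancel = context.WithCancel(context.Background())
        t.useRelay = relay != nil
        t.run(stage, relay)
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    ts := []*task{{ctx: ctx, cancel: cancel, stage: "Running"}}
    resetAll(ts, nil) // relay disabled: useRelay becomes false
}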

@@ -90,14 +90,3 @@ func (h *subTaskHolder) getAllSubTasks() map[string]*SubTask {
}
return result
}

- // OnEvent implements relay.Listener
- // only syncer unit of subtask need to be notified, but it's much simpler and less error-prone to manage it here
- // as relay event need to broadcast to every syncer(most subtask have a syncer).
- func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) {
- h.mu.RLock()
- defer h.mu.RUnlock()
- for _, s := range h.subTasks {
- s.relayNotify()
- }
- }
