dm-relay: create binlog reader from relay, refactor relay #3327

Merged: 37 commits, merged into master from D3Hunter:refactor-relay on Nov 15, 2021
Changes from 27 commits
Commits (37)

907fc27  binlog writer should not be used concurrently (D3Hunter, Nov 1, 2021)
2d8008b  reuse same binlog writer to make using state easier (D3Hunter, Nov 1, 2021)
b8db477  make writer a field of relay and reuse (D3Hunter, Nov 1, 2021)
6d545ea  add offset (D3Hunter, Nov 2, 2021)
84f41fc  move recover into relay (D3Hunter, Nov 2, 2021)
efc238a  remove unnecessiary code (D3Hunter, Nov 2, 2021)
d37ca02  binlog reader created using relay; register listeners implicitly (D3Hunter, Nov 2, 2021)
6326f0b  remove the transformer entity, make it a method in relay (D3Hunter, Nov 2, 2021)
d4df262  fix unit test, make code compile (D3Hunter, Nov 3, 2021)
9e9aa58  make test suite can runs in single test (D3Hunter, Nov 4, 2021)
33e9acd  make test suite can runs in single test (D3Hunter, Nov 4, 2021)
adb841c  fix make check (D3Hunter, Nov 5, 2021)
7583627  fix unit test (D3Hunter, Nov 4, 2021)
dc6dd70  fix unit test (D3Hunter, Nov 3, 2021)
08ee9c8  fix unit test (D3Hunter, Nov 4, 2021)
1015dff  fix unstable case(may fail when ut runs over 5min) (D3Hunter, Nov 4, 2021)
9f7ce1d  fix make check (D3Hunter, Nov 5, 2021)
e8dede1  Merge remote-tracking branch 'upstream/master' into refactor-relay (D3Hunter, Nov 8, 2021)
fab993e  Merge remote-tracking branch 'upstream/master' into refactor-relay (D3Hunter, Nov 8, 2021)
9505845  make test suite can runs in single test (D3Hunter, Nov 11, 2021)
70662cb  make test suite can runs in single test (D3Hunter, Nov 11, 2021)
77c7582  Merge remote-tracking branch 'upstream/master' into refactor-relay (D3Hunter, Nov 11, 2021)
6c8277f  Revert "make test suite can runs in single test" (D3Hunter, Nov 11, 2021)
e4aa5b7  Revert "make test suite can runs in single test" (D3Hunter, Nov 11, 2021)
638a3d4  Revert "make test suite can runs in single test" (D3Hunter, Nov 11, 2021)
63e5c02  Revert "make test suite can runs in single test" (D3Hunter, Nov 11, 2021)
d4b6c0a  only one TestXXX func is needed, remove others (D3Hunter, Nov 11, 2021)
9e0b0d9  Update dm/relay/relay.go (D3Hunter, Nov 15, 2021)
6208de6  remove useless comment (D3Hunter, Nov 15, 2021)
3262bec  Merge branch 'refactor-relay' of github.com:D3Hunter/ticdc into refac… (D3Hunter, Nov 15, 2021)
89a1bbb  Merge branch 'master' into refactor-relay (D3Hunter, Nov 15, 2021)
c617888  change test case wait time (D3Hunter, Nov 15, 2021)
572dd55  Merge branch 'master' into refactor-relay (ti-chi-bot, Nov 15, 2021)
6658f8d  Merge branch 'master' into refactor-relay (ti-chi-bot, Nov 15, 2021)
d442b2d  Merge branch 'master' into refactor-relay (ti-chi-bot, Nov 15, 2021)
31a5970  Merge branch 'master' into refactor-relay (ti-chi-bot, Nov 15, 2021)
5e54edc  Merge branch 'master' into refactor-relay (ti-chi-bot, Nov 15, 2021)
28 changes: 11 additions & 17 deletions dm/dm/worker/relay.go
@@ -50,10 +50,8 @@ type RelayHolder interface {
Result() *pb.ProcessResult
// Update updates relay config online
Update(ctx context.Context, cfg *config.SourceConfig) error
- // RegisterListener registers a relay listener
- RegisterListener(el relay.Listener)
- // UnRegisterListener unregisters a relay listener
- UnRegisterListener(el relay.Listener)
+ // Relay returns relay object
+ Relay() relay.Process
}

// NewRelayHolder is relay holder initializer
@@ -310,12 +308,8 @@ func (h *realRelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo {
return h.relay.ActiveRelayLog()
}

- func (h *realRelayHolder) RegisterListener(el relay.Listener) {
- h.relay.RegisterListener(el)
- }
-
- func (h *realRelayHolder) UnRegisterListener(el relay.Listener) {
- h.relay.UnRegisterListener(el)
+ func (h *realRelayHolder) Relay() relay.Process {
+ return h.relay
}

/******************** dummy relay holder ********************/
@@ -326,14 +320,16 @@ type dummyRelayHolder struct {
stage pb.Stage
relayBinlog string

- cfg *config.SourceConfig
+ cfg    *config.SourceConfig
+ relay2 relay.Process
}

// NewDummyRelayHolder creates a new RelayHolder.
func NewDummyRelayHolder(cfg *config.SourceConfig) RelayHolder {
return &dummyRelayHolder{
- cfg:   cfg,
- stage: pb.Stage_New,
+ cfg:    cfg,
+ stage:  pb.Stage_New,
+ relay2: &relay.Relay{},
}
}

@@ -436,8 +432,6 @@ func (d *dummyRelayHolder) Stage() pb.Stage {
return d.stage
}

- func (d *dummyRelayHolder) RegisterListener(el relay.Listener) {
- }
-
- func (d *dummyRelayHolder) UnRegisterListener(el relay.Listener) {
+ func (d *dummyRelayHolder) Relay() relay.Process {
+ return d.relay2
}
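
Note: the net effect of this file's diff is that RelayHolder stops brokering listener registration and instead exposes the relay process itself. A minimal runnable sketch of the new call-site shape, using simplified stand-ins (Process, RelayHolder, and the fakes below are illustrative, not the real dm/dm/worker definitions):

```go
package main

import "fmt"

// Process is a trimmed stand-in for relay.Process.
type Process interface{ IsRunning() bool }

// RelayHolder mirrors the trimmed interface from this diff: the
// Register/UnRegisterListener pair is gone, replaced by an accessor.
type RelayHolder interface {
	Relay() Process
}

type fakeRelay struct{}

func (fakeRelay) IsRunning() bool { return true }

type fakeHolder struct{ r Process }

func (h fakeHolder) Relay() Process { return h.r }

func main() {
	var h RelayHolder = fakeHolder{r: fakeRelay{}}
	// Callers now pull the relay out of the holder and pass it down
	// explicitly; a nil result simply means "run without relay".
	if r := h.Relay(); r != nil {
		fmt.Println("relay running:", r.IsRunning())
	}
}
```

The trade-off: instead of broadcasting binlog events to registered listeners, the worker hands the relay object to whoever needs it, so the dependency is explicit and there is no unregister step to forget.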
5 changes: 5 additions & 0 deletions dm/dm/worker/relay_test.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/ticdc/dm/dm/unit"
"github.com/pingcap/ticdc/dm/pkg/binlog"
"github.com/pingcap/ticdc/dm/pkg/gtid"
"github.com/pingcap/ticdc/dm/pkg/log"
pkgstreamer "github.com/pingcap/ticdc/dm/pkg/streamer"
"github.com/pingcap/ticdc/dm/pkg/utils"
"github.com/pingcap/ticdc/dm/relay"
@@ -46,6 +47,10 @@ type DummyRelay struct {
reloadErr error
}

+ func (d *DummyRelay) NewReader(logger log.Logger, cfg *relay.BinlogReaderConfig) *relay.BinlogReader {
+ return nil
+ }
+
func (d *DummyRelay) RegisterListener(el relay.Listener) {
}

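Note: DummyRelay.NewReader above is a test stub for the factory this PR adds to relay.Process (per the title, the binlog reader is now created from the relay). A sketch of that constructor-on-interface shape; Logger, BinlogReaderConfig.RelayDir, and the returned reader are invented stand-ins, not the real DM API:

```go
package main

import "fmt"

// Invented stand-ins for log.Logger, relay.BinlogReaderConfig and
// relay.BinlogReader; field names here are assumptions for illustration.
type Logger struct{ name string }
type BinlogReaderConfig struct{ RelayDir string }
type BinlogReader struct{ dir string }

// Process is a trimmed stand-in carrying only the new factory method.
type Process interface {
	NewReader(logger Logger, cfg *BinlogReaderConfig) *BinlogReader
}

type dummyRelay struct{}

// A test double can return a canned reader (or nil, as the stub above
// does), because callers depend only on the interface shape.
func (dummyRelay) NewReader(logger Logger, cfg *BinlogReaderConfig) *BinlogReader {
	return &BinlogReader{dir: cfg.RelayDir}
}

func main() {
	var p Process = dummyRelay{}
	r := p.NewReader(Logger{name: "test"}, &BinlogReaderConfig{RelayDir: "/tmp/relay"})
	fmt.Println("reader dir:", r.dir)
}
```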
6 changes: 4 additions & 2 deletions dm/dm/worker/server_test.go
@@ -121,7 +121,7 @@ func (t *testServer) TestServer(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
- createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
+ createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, relay relay.Process) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
@@ -551,7 +551,9 @@ func (t *testServer) testSubTaskRecover(c *C, s *Server, dir string) {

func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, etcd *embed.Etcd) {
etcd.Close()
- time.Sleep(retryConnectSleepTime + time.Duration(defaultKeepAliveTTL+3)*time.Second)
+ c.Assert(utils.WaitSomething(60, time.Second, func() bool {
+ return s.getWorker(true) == nil
+ }), IsTrue)
c.Assert(s.getWorker(true), IsNil)
}

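Note: the last hunk swaps a fixed sleep for bounded polling, which passes as soon as the condition holds and tolerates slow teardown. A self-contained sketch of that wait-until idiom; waitSomething below is a local re-implementation for illustration only, the real helper being utils.WaitSomething from dm/pkg/utils:

```go
package main

import (
	"fmt"
	"time"
)

// waitSomething polls fn up to `times` attempts, sleeping `interval`
// between attempts, mirroring the utils.WaitSomething(60, time.Second, ...)
// call in the hunk above.
func waitSomething(times int, interval time.Duration, fn func() bool) bool {
	for i := 0; i < times; i++ {
		if fn() {
			return true
		}
		time.Sleep(interval)
	}
	return false
}

func main() {
	deadline := time.Now().Add(30 * time.Millisecond)
	ok := waitSomething(10, 10*time.Millisecond, func() bool {
		// Stand-in for the real condition s.getWorker(true) == nil.
		return time.Now().After(deadline)
	})
	fmt.Println("condition reached:", ok)
}
```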
23 changes: 14 additions & 9 deletions dm/dm/worker/source_worker.go
@@ -353,11 +353,9 @@ func (w *SourceWorker) EnableRelay() (err error) {
w.observeRelayStage(w.relayCtx, w.etcdClient, revRelay)
}()

- w.relayHolder.RegisterListener(w.subTaskHolder)
-
w.relayEnabled.Store(true)
w.l.Info("relay enabled")
- w.subTaskHolder.resetAllSubTasks(true)
+ w.subTaskHolder.resetAllSubTasks(w.getRelayWithoutLock())
return nil
}

@@ -382,12 +380,11 @@ func (w *SourceWorker) DisableRelay() {
w.l.Info("finish refreshing task checker")
}

- w.subTaskHolder.resetAllSubTasks(false)
+ w.subTaskHolder.resetAllSubTasks(nil)

if w.relayHolder != nil {
r := w.relayHolder
w.relayHolder = nil
- r.UnRegisterListener(w.subTaskHolder)
r.Close()
}
if w.relayPurger != nil {
@@ -519,7 +516,15 @@ func (w *SourceWorker) StartSubTask(cfg *config.SubTaskConfig, expectStage pb.St
}

w.l.Info("subtask created", zap.Stringer("config", cfg2))
- st.Run(expectStage)
+ st.Run(expectStage, w.getRelayWithoutLock())
return nil
}

+ // caller should make sure w.Lock is locked before calling this method.
+ func (w *SourceWorker) getRelayWithoutLock() relay.Process {
+ if w.relayHolder != nil {
+ return w.relayHolder.Relay()
+ }
+ return nil
+ }

@@ -566,10 +571,10 @@ func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error {
err = st.Pause()
case pb.TaskOp_Resume:
w.l.Info("resume sub task", zap.String("task", name))
- err = st.Resume()
+ err = st.Resume(w.getRelayWithoutLock())
case pb.TaskOp_AutoResume:
w.l.Info("auto_resume sub task", zap.String("task", name))
- err = st.Resume()
+ err = st.Resume(w.getRelayWithoutLock())
default:
err = terror.ErrWorkerUpdateTaskStage.Generatef("invalid operate %s on subtask %v", op, name)
}
@@ -1039,5 +1044,5 @@ func (w *SourceWorker) HandleError(ctx context.Context, req *pb.HandleWorkerErro
return terror.ErrWorkerSubTaskNotFound.Generate(req.Task)
}

- return st.HandleError(ctx, req)
+ return st.HandleError(ctx, req, w.getRelayWithoutLock())
}
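
Note: getRelayWithoutLock, added above, is the pivot of this refactor: the worker snapshots its relay (nil when relay is disabled) and threads it through Run, Resume, and HandleError. A runnable sketch of that nil-as-disabled contract, with simplified stand-in types rather than the real SourceWorker:

```go
package main

import "fmt"

// Stand-ins: Process plays relay.Process, holder plays the RelayHolder.
type Process interface{}

type fakeProcess struct{}

type holder struct{ p Process }

func (h *holder) Relay() Process { return h.p }

type worker struct{ relayHolder *holder }

// getRelayWithoutLock mirrors the accessor in the diff: nil means
// "relay disabled". As the diff's comment notes, the caller is expected
// to hold the worker lock.
func (w *worker) getRelayWithoutLock() Process {
	if w.relayHolder != nil {
		return w.relayHolder.Relay()
	}
	return nil
}

func main() {
	w := &worker{}
	fmt.Println("relay enabled:", w.getRelayWithoutLock() != nil) // false: no holder

	w.relayHolder = &holder{p: fakeProcess{}}
	fmt.Println("relay enabled:", w.getRelayWithoutLock() != nil) // true
}
```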
4 changes: 2 additions & 2 deletions dm/dm/worker/source_worker_test.go
@@ -242,7 +242,7 @@ var _ = Suite(&testWorkerFunctionalities{})
func (t *testWorkerFunctionalities) SetUpSuite(c *C) {
NewRelayHolder = NewDummyRelayHolder
NewSubTask = NewRealSubTask
- createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
+ createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, relay relay.Process) []unit.Unit {
atomic.AddInt32(&t.createUnitCount, 1)
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
@@ -417,7 +417,7 @@ func (t *testWorkerEtcdCompact) SetUpSuite(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
- createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier relay.EventNotifier) []unit.Unit {
+ createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, relay relay.Process) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
43 changes: 11 additions & 32 deletions dm/dm/worker/subtask.go
@@ -45,22 +45,12 @@ const (
waitRelayCatchupTimeout = 30 * time.Second
)

- type relayNotifier struct {
- // ch with size = 1, we only need to be notified whether binlog file of relay changed, not how many times
- ch chan interface{}
- }
-
- // Notified implements streamer.EventNotifier.
- func (r relayNotifier) Notified() chan interface{} {
- return r.ch
- }
-
// createRealUnits is subtask units initializer
// it can be used for testing.
var createUnits = createRealUnits

// createRealUnits creates process units base on task mode.
- func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier relay.EventNotifier) []unit.Unit {
+ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, relay relay.Process) []unit.Unit {
failpoint.Inject("mockCreateUnitsDumpOnly", func(_ failpoint.Value) {
log.L().Info("create mock worker units with dump unit only", zap.String("failpoint", "mockCreateUnitsDumpOnly"))
failpoint.Return([]unit.Unit{dumpling.NewDumpling(cfg)})
@@ -75,7 +65,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor
} else {
us = append(us, loader.NewLightning(cfg, etcdClient, workerName))
}
- us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier))
+ us = append(us, syncer.NewSyncer(cfg, etcdClient, relay))
case config.ModeFull:
// NOTE: maybe need another checker in the future?
us = append(us, dumpling.NewDumpling(cfg))
@@ -85,7 +75,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor
us = append(us, loader.NewLightning(cfg, etcdClient, workerName))
}
case config.ModeIncrement:
- us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier))
+ us = append(us, syncer.NewSyncer(cfg, etcdClient, relay))
default:
log.L().Error("unsupported task mode", zap.String("subtask", cfg.Name), zap.String("task mode", cfg.Mode))
}
@@ -119,8 +109,6 @@ type SubTask struct {
etcdClient *clientv3.Client

workerName string

- notifier relay.EventNotifier
}

// NewSubTask is subtask initializer
@@ -143,15 +131,14 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient *
cancel: cancel,
etcdClient: etcdClient,
workerName: workerName,
- notifier: &relayNotifier{ch: make(chan interface{}, 1)},
}
updateTaskMetric(st.cfg.Name, st.cfg.SourceID, st.stage, st.workerName)
return &st
}

// initUnits initializes the sub task processing units.
- func (st *SubTask) initUnits() error {
- st.units = createUnits(st.cfg, st.etcdClient, st.workerName, st.notifier)
+ func (st *SubTask) initUnits(relay relay.Process) error {
+ st.units = createUnits(st.cfg, st.etcdClient, st.workerName, relay)
if len(st.units) < 1 {
return terror.ErrWorkerNoAvailUnits.Generate(st.cfg.Name, st.cfg.Mode)
}
@@ -211,15 +198,15 @@ func (st *SubTask) initUnits() error {

// Run runs the sub task.
// TODO: check concurrent problems.
- func (st *SubTask) Run(expectStage pb.Stage) {
+ func (st *SubTask) Run(expectStage pb.Stage, relay relay.Process) {
if st.Stage() == pb.Stage_Finished || st.Stage() == pb.Stage_Running {
st.l.Warn("prepare to run a subtask with invalid stage",
zap.Stringer("current stage", st.Stage()),
zap.Stringer("expected stage", expectStage))
return
}

- if err := st.initUnits(); err != nil {
+ if err := st.initUnits(relay); err != nil {
st.l.Error("fail to initial subtask", log.ShortError(err))
st.fail(err)
return
@@ -511,9 +498,9 @@ func (st *SubTask) Pause() error {

// Resume resumes the paused sub task
// TODO: similar to Run, refactor later.
- func (st *SubTask) Resume() error {
+ func (st *SubTask) Resume(relay relay.Process) error {
if !st.initialized.Load() {
- st.Run(pb.Stage_Running)
+ st.Run(pb.Stage_Running, relay)
return nil
}

@@ -714,7 +701,7 @@ func (st *SubTask) fail(err error) {
}

// HandleError handle error for syncer unit.
- func (st *SubTask) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest) error {
+ func (st *SubTask) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest, relay relay.Process) error {
syncUnit, ok := st.currUnit.(*syncer.Syncer)
if !ok {
return terror.ErrWorkerOperSyncUnitOnly.Generate(st.currUnit.Type())
@@ -726,7 +713,7 @@ func (st *SubTask) HandleError(ctx context.Context, req *pb.HandleWorkerErrorReq
}

if st.Stage() == pb.Stage_Paused {
- err = st.Resume()
+ err = st.Resume(relay)
}
return err
}
@@ -738,11 +725,3 @@ func updateTaskMetric(task, sourceID string, stage pb.Stage, workerName string)
taskState.WithLabelValues(task, sourceID, workerName).Set(float64(stage))
}
}

- func (st *SubTask) relayNotify() {
- // skip if there's pending notify
- select {
- case st.notifier.Notified() <- struct{}{}:
- default:
- }
- }
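
Note: the deleted relayNotify used a classic Go idiom: a buffered channel of capacity 1 plus a non-blocking send, so any number of notifications coalesce into at most one pending signal. For reference, a self-contained reconstruction of that idiom (names simplified; the original lived on SubTask and is no longer needed because the relay is now passed to the syncer directly):

```go
package main

import "fmt"

// notifier coalesces signals: a capacity-1 channel plus a non-blocking
// send means "at most one pending notification", the property the
// removed relayNotifier/relayNotify code relied on.
type notifier struct{ ch chan struct{} }

func newNotifier() *notifier { return &notifier{ch: make(chan struct{}, 1)} }

func (n *notifier) notify() {
	select {
	case n.ch <- struct{}{}: // becomes the single pending notification
	default: // one is already pending; drop this one
	}
}

func main() {
	n := newNotifier()
	n.notify()
	n.notify() // coalesced with the first
	<-n.ch
	select {
	case <-n.ch:
		fmt.Println("unexpected second notification")
	default:
		fmt.Println("notifications were coalesced")
	}
}
```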
19 changes: 4 additions & 15 deletions dm/dm/worker/subtask_holder.go
@@ -17,7 +17,7 @@ import (
"context"
"sync"

"github.com/go-mysql-org/go-mysql/replication"
"github.com/pingcap/ticdc/dm/relay"
)

// subTaskHolder holds subtask instances.
@@ -50,16 +50,16 @@ func (h *subTaskHolder) removeSubTask(name string) {
}

// resetAllSubTasks does Close, change cfg.UseRelay then Init the subtasks.
- func (h *subTaskHolder) resetAllSubTasks(useRelay bool) {
+ func (h *subTaskHolder) resetAllSubTasks(relay relay.Process) {
h.mu.Lock()
defer h.mu.Unlock()
for _, st := range h.subTasks {
stage := st.Stage()
st.Close()
// TODO: make a st.reset
st.ctx, st.cancel = context.WithCancel(context.Background())
- st.cfg.UseRelay = useRelay
- st.Run(stage)
+ st.cfg.UseRelay = relay != nil
+ st.Run(stage, relay)
}
}

@@ -90,14 +90,3 @@ func (h *subTaskHolder) getAllSubTasks() map[string]*SubTask {
}
return result
}

- // OnEvent implements relay.Listener
- // only syncer unit of subtask need to be notified, but it's much simpler and less error-prone to manage it here
- // as relay event need to broadcast to every syncer(most subtask have a syncer).
- func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) {
- h.mu.RLock()
- defer h.mu.RUnlock()
- for _, s := range h.subTasks {
- s.relayNotify()
- }
- }
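
Note: resetAllSubTasks now receives the relay process itself and derives cfg.UseRelay from it, so the flag can never disagree with what the subtask is actually handed. A simplified sketch of that signature change (locking, Close, and stage handling omitted; types are stand-ins):

```go
package main

import "fmt"

// Process stands in for relay.Process.
type Process interface{}

type fakeProcess struct{}

type subTask struct{ useRelay bool }

// resetAllSubTasks mirrors the new shape from the diff: the relay
// argument doubles as the on/off switch.
func resetAllSubTasks(tasks []*subTask, relay Process) {
	for _, st := range tasks {
		st.useRelay = relay != nil
		// the real code then calls st.Run(stage, relay)
	}
}

func main() {
	tasks := []*subTask{{}, {}}
	resetAllSubTasks(tasks, nil)
	fmt.Println("relay disabled:", !tasks[0].useRelay)
	resetAllSubTasks(tasks, fakeProcess{})
	fmt.Println("relay enabled:", tasks[1].useRelay)
}
```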