Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay/syncer: relay notifies syncer of new write to reduce sync latency (#2225) #2269

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/dm-portal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
"net/http"
"os"

"github.com/rakyll/statik/fs"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/portal"
_ "github.com/pingcap/dm/dm/portal/statik"
"github.com/pingcap/dm/pkg/log"
"github.com/rakyll/statik/fs"
"go.uber.org/zap"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/dm-syncer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func main() {
log.L().Info("", zap.Stringer("dm-syncer conf", conf))
})

sync := syncer.NewSyncer(conf, nil) // do not support shard DDL for singleton syncer.
sync := syncer.NewSyncer(conf, nil, nil) // do not support shard DDL for singleton syncer.
sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

Expand Down
3 changes: 2 additions & 1 deletion cmd/dm-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ import (
globalLog "github.com/pingcap/log"
"go.uber.org/zap"

lightningLog "github.com/pingcap/tidb/br/pkg/lightning/log"

"github.com/pingcap/dm/dm/ctl/common"
"github.com/pingcap/dm/dm/worker"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
lightningLog "github.com/pingcap/tidb/br/pkg/lightning/log"
)

func main() {
Expand Down
3 changes: 2 additions & 1 deletion dm/pbmock/dmmaster.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion dm/pbmock/dmworker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type RelayHolder interface {
Result() *pb.ProcessResult
// Update updates relay config online
Update(ctx context.Context, cfg *config.SourceConfig) error
// RegisterListener registers a relay listener
RegisterListener(el relay.Listener)
// UnRegisterListener unregisters a relay listener
UnRegisterListener(el relay.Listener)
}

// NewRelayHolder is relay holder initializer
Expand Down Expand Up @@ -307,6 +311,14 @@ func (h *realRelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo {
return h.relay.ActiveRelayLog()
}

func (h *realRelayHolder) RegisterListener(el relay.Listener) {
h.relay.RegisterListener(el)
}

func (h *realRelayHolder) UnRegisterListener(el relay.Listener) {
h.relay.UnRegisterListener(el)
}

/******************** dummy relay holder ********************/

type dummyRelayHolder struct {
Expand Down Expand Up @@ -424,3 +436,9 @@ func (d *dummyRelayHolder) Stage() pb.Stage {
defer d.Unlock()
return d.stage
}

func (d *dummyRelayHolder) RegisterListener(el relay.Listener) {
}

func (d *dummyRelayHolder) UnRegisterListener(el relay.Listener) {
}
6 changes: 6 additions & 0 deletions dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type DummyRelay struct {
reloadErr error
}

func (d *DummyRelay) RegisterListener(el relay.Listener) {
}

func (d *DummyRelay) UnRegisterListener(el relay.Listener) {
}

// NewDummyRelay creates an instance of dummy Relay.
func NewDummyRelay(cfg *relay.Config) relay.Process {
return &DummyRelay{}
Expand Down
3 changes: 2 additions & 1 deletion dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/ha"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)
Expand Down Expand Up @@ -120,7 +121,7 @@ func (t *testServer) TestServer(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
Expand Down
3 changes: 3 additions & 0 deletions dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ func (w *SourceWorker) EnableRelay() (err error) {
w.observeRelayStage(w.relayCtx, w.etcdClient, revRelay)
}()

w.relayHolder.RegisterListener(w.subTaskHolder)

w.relayEnabled.Store(true)
w.l.Info("relay enabled")
w.subTaskHolder.resetAllSubTasks(true)
Expand Down Expand Up @@ -385,6 +387,7 @@ func (w *SourceWorker) DisableRelay() {
if w.relayHolder != nil {
r := w.relayHolder
w.relayHolder = nil
r.UnRegisterListener(w.subTaskHolder)
r.Close()
}
if w.relayPurger != nil {
Expand Down
5 changes: 3 additions & 2 deletions dm/worker/source_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/dm/pkg/conn"
"github.com/pingcap/dm/pkg/ha"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/utils"
)

Expand Down Expand Up @@ -241,7 +242,7 @@ var _ = Suite(&testWorkerFunctionalities{})
func (t *testWorkerFunctionalities) SetUpSuite(c *C) {
NewRelayHolder = NewDummyRelayHolder
NewSubTask = NewRealSubTask
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
atomic.AddInt32(&t.createUnitCount, 1)
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
Expand Down Expand Up @@ -416,7 +417,7 @@ func (t *testWorkerEtcdCompact) SetUpSuite(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
Expand Down
30 changes: 26 additions & 4 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/shardddl/pessimism"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/syncer"
Expand All @@ -44,12 +45,22 @@ const (
waitRelayCatchupTimeout = 30 * time.Second
)

type relayNotifier struct {
// ch with size = 1, we only need to be notified whether binlog file of relay changed, not how many times
ch chan interface{}
}

// Notified implements streamer.EventNotifier.
func (r relayNotifier) Notified() chan interface{} {
return r.ch
}

// createRealUnits is subtask units initializer
// it can be used for testing.
var createUnits = createRealUnits

// createRealUnits creates process units base on task mode.
func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string) []unit.Unit {
func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier streamer.EventNotifier) []unit.Unit {
failpoint.Inject("mockCreateUnitsDumpOnly", func(_ failpoint.Value) {
log.L().Info("create mock worker units with dump unit only", zap.String("failpoint", "mockCreateUnitsDumpOnly"))
failpoint.Return([]unit.Unit{dumpling.NewDumpling(cfg)})
Expand All @@ -64,7 +75,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor
} else {
us = append(us, loader.NewLightning(cfg, etcdClient, workerName))
}
us = append(us, syncer.NewSyncer(cfg, etcdClient))
us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier))
case config.ModeFull:
// NOTE: maybe need another checker in the future?
us = append(us, dumpling.NewDumpling(cfg))
Expand All @@ -74,7 +85,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor
us = append(us, loader.NewLightning(cfg, etcdClient, workerName))
}
case config.ModeIncrement:
us = append(us, syncer.NewSyncer(cfg, etcdClient))
us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier))
default:
log.L().Error("unsupported task mode", zap.String("subtask", cfg.Name), zap.String("task mode", cfg.Mode))
}
Expand Down Expand Up @@ -108,6 +119,8 @@ type SubTask struct {
etcdClient *clientv3.Client

workerName string

notifier streamer.EventNotifier
}

// NewSubTask is subtask initializer
Expand All @@ -130,14 +143,15 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient *
cancel: cancel,
etcdClient: etcdClient,
workerName: workerName,
notifier: &relayNotifier{ch: make(chan interface{}, 1)},
}
updateTaskMetric(st.cfg.Name, st.cfg.SourceID, st.stage, st.workerName)
return &st
}

// initUnits initializes the sub task processing units.
func (st *SubTask) initUnits() error {
st.units = createUnits(st.cfg, st.etcdClient, st.workerName)
st.units = createUnits(st.cfg, st.etcdClient, st.workerName, st.notifier)
if len(st.units) < 1 {
return terror.ErrWorkerNoAvailUnits.Generate(st.cfg.Name, st.cfg.Mode)
}
Expand Down Expand Up @@ -724,3 +738,11 @@ func updateTaskMetric(task, sourceID string, stage pb.Stage, workerName string)
taskState.WithLabelValues(task, sourceID, workerName).Set(float64(stage))
}
}

func (st *SubTask) relayNotify() {
// skip if there's pending notify
select {
case st.notifier.Notified() <- struct{}{}:
default:
}
}
13 changes: 13 additions & 0 deletions dm/worker/subtask_holder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package worker
import (
"context"
"sync"

"github.com/go-mysql-org/go-mysql/replication"
)

// subTaskHolder holds subtask instances.
Expand Down Expand Up @@ -88,3 +90,14 @@ func (h *subTaskHolder) getAllSubTasks() map[string]*SubTask {
}
return result
}

// OnEvent implements relay.Listener
// only syncer unit of subtask need to be notified, but it's much simpler and less error-prone to manage it here
// as relay event need to broadcast to every syncer(most subtask have a syncer).
func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) {
h.mu.RLock()
defer h.mu.RUnlock()
for _, s := range h.subTasks {
s.relayNotify()
}
}
19 changes: 10 additions & 9 deletions dm/worker/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/dm/dumpling"
"github.com/pingcap/dm/loader"
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/syncer"

Expand All @@ -47,24 +48,24 @@ func (t *testSubTask) TestCreateUnits(c *C) {
Mode: "xxx",
}
worker := "worker"
c.Assert(createUnits(cfg, nil, worker), HasLen, 0)
c.Assert(createUnits(cfg, nil, worker, nil), HasLen, 0)

cfg.Mode = config.ModeFull
unitsFull := createUnits(cfg, nil, worker)
unitsFull := createUnits(cfg, nil, worker, nil)
c.Assert(unitsFull, HasLen, 2)
_, ok := unitsFull[0].(*dumpling.Dumpling)
c.Assert(ok, IsTrue)
_, ok = unitsFull[1].(*loader.Loader)
c.Assert(ok, IsTrue)

cfg.Mode = config.ModeIncrement
unitsIncr := createUnits(cfg, nil, worker)
unitsIncr := createUnits(cfg, nil, worker, nil)
c.Assert(unitsIncr, HasLen, 1)
_, ok = unitsIncr[0].(*syncer.Syncer)
c.Assert(ok, IsTrue)

cfg.Mode = config.ModeAll
unitsAll := createUnits(cfg, nil, worker)
unitsAll := createUnits(cfg, nil, worker, nil)
c.Assert(unitsAll, HasLen, 3)
_, ok = unitsAll[0].(*dumpling.Dumpling)
c.Assert(ok, IsTrue)
Expand Down Expand Up @@ -176,7 +177,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return nil
}
st.Run(pb.Stage_Running)
Expand All @@ -185,7 +186,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {

mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down Expand Up @@ -296,7 +297,7 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down Expand Up @@ -420,7 +421,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand All @@ -445,7 +446,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) {

st = NewSubTaskWithStage(cfg, pb.Stage_Finished, nil, "worker")
c.Assert(st.Stage(), DeepEquals, pb.Stage_Finished)
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down
Loading