Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay/syncer: relay notifies syncer of new write to reduce sync latency #2225

Merged
merged 24 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
838d439
notify syncer on new binlog event
D3Hunter Oct 9, 2021
7a14bef
reader notify streamer reader
D3Hunter Oct 12, 2021
e4f2c7a
move switch path check into binlog file change check
D3Hunter Oct 13, 2021
b067a07
add timer to relayLogUpdatedOrNewCreated to handle special case
D3Hunter Oct 14, 2021
41568ea
ignore duplicate FORMAT event
D3Hunter Oct 14, 2021
29c9fd4
add/fix unit test
D3Hunter Oct 14, 2021
9df7eed
make lint happy
D3Hunter Oct 14, 2021
4b2aa47
add comments to public if
D3Hunter Oct 15, 2021
b865044
fix comments
D3Hunter Oct 15, 2021
3ab6475
fix lint
D3Hunter Oct 18, 2021
ea8fd6e
abstruct notifier chan in syncer into a struct and pass in from subtask
D3Hunter Oct 18, 2021
64d388f
rename fields to make code more verbose
D3Hunter Oct 18, 2021
ac2f391
add comment
D3Hunter Oct 18, 2021
af5f7e2
add comments
D3Hunter Oct 18, 2021
b340a30
Merge branch 'master' into relay-notify
D3Hunter Oct 19, 2021
5afad9b
Merge branch 'master' into relay-notify
D3Hunter Oct 20, 2021
100788c
remove unnecessiary lock operation
D3Hunter Oct 25, 2021
7434c89
Merge remote-tracking branch 'upstream/master' into relay-notify
D3Hunter Oct 25, 2021
026d9fb
fix unstable integration test case ha_cases3
D3Hunter Oct 26, 2021
2213223
Merge remote-tracking branch 'upstream/master' into relay-notify
D3Hunter Oct 26, 2021
11511a3
fix comments
D3Hunter Oct 26, 2021
9e93be1
undo redirect stderr in run_sql, some case depends on the output
D3Hunter Oct 26, 2021
ebd1d2b
add more unit test
D3Hunter Oct 27, 2021
99b2ed7
fix import groups
D3Hunter Oct 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/dm-syncer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func main() {
log.L().Info("", zap.Stringer("dm-syncer conf", conf))
})

sync := syncer.NewSyncer(conf, nil) // do not support shard DDL for singleton syncer.
sync := syncer.NewSyncer(conf, nil, nil) // do not support shard DDL for singleton syncer.
sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

Expand Down
18 changes: 18 additions & 0 deletions dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type RelayHolder interface {
Result() *pb.ProcessResult
// Update updates relay config online
Update(ctx context.Context, cfg *config.SourceConfig) error
// RegisterListener registers a relay listener
RegisterListener(el relay.Listener)
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
// UnRegisterListener unregisters a relay listener
UnRegisterListener(el relay.Listener)
}

// NewRelayHolder is relay holder initializer
Expand Down Expand Up @@ -307,6 +311,14 @@ func (h *realRelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo {
return h.relay.ActiveRelayLog()
}

func (h *realRelayHolder) RegisterListener(el relay.Listener) {
h.relay.RegisterListener(el)
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
}

func (h *realRelayHolder) UnRegisterListener(el relay.Listener) {
h.relay.UnRegisterListener(el)
}

/******************** dummy relay holder ********************/

type dummyRelayHolder struct {
Expand Down Expand Up @@ -424,3 +436,9 @@ func (d *dummyRelayHolder) Stage() pb.Stage {
defer d.Unlock()
return d.stage
}

func (d *dummyRelayHolder) RegisterListener(el relay.Listener) {
}

func (d *dummyRelayHolder) UnRegisterListener(el relay.Listener) {
}
6 changes: 6 additions & 0 deletions dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type DummyRelay struct {
reloadErr error
}

func (d *DummyRelay) RegisterListener(el relay.Listener) {
}

func (d *DummyRelay) UnRegisterListener(el relay.Listener) {
}

// NewDummyRelay creates an instance of dummy Relay.
func NewDummyRelay(cfg *relay.Config) relay.Process {
return &DummyRelay{}
Expand Down
4 changes: 3 additions & 1 deletion dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/pingcap/dm/pkg/streamer"
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved

"github.com/go-mysql-org/go-mysql/mysql"
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -120,7 +122,7 @@ func (t *testServer) TestServer(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
Expand Down
3 changes: 3 additions & 0 deletions dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ func (w *SourceWorker) EnableRelay() (err error) {
w.observeRelayStage(w.relayCtx, w.etcdClient, revRelay)
}()

w.relayHolder.RegisterListener(w.subTaskHolder)

w.relayEnabled.Store(true)
w.l.Info("relay enabled")
w.subTaskHolder.resetAllSubTasks(true)
Expand Down Expand Up @@ -385,6 +387,7 @@ func (w *SourceWorker) DisableRelay() {
if w.relayHolder != nil {
r := w.relayHolder
w.relayHolder = nil
r.UnRegisterListener(w.subTaskHolder)
r.Close()
}
if w.relayPurger != nil {
Expand Down
6 changes: 4 additions & 2 deletions dm/worker/source_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/dm/pkg/streamer"
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -241,7 +243,7 @@ var _ = Suite(&testWorkerFunctionalities{})
func (t *testWorkerFunctionalities) SetUpSuite(c *C) {
NewRelayHolder = NewDummyRelayHolder
NewSubTask = NewRealSubTask
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
atomic.AddInt32(&t.createUnitCount, 1)
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
Expand Down Expand Up @@ -416,7 +418,7 @@ func (t *testWorkerEtcdCompact) SetUpSuite(c *C) {
cfg.UseRelay = false
return NewRealSubTask(cfg, etcdClient, worker)
}
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
mockSync := NewMockUnit(pb.UnitType_Sync)
Expand Down
31 changes: 27 additions & 4 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"sync"
"time"

"github.com/pingcap/dm/pkg/streamer"
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/failpoint"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -44,12 +46,22 @@ const (
waitRelayCatchupTimeout = 30 * time.Second
)

type relayNotifier struct {
// ch with size = 1, we only need to be notified whether binlog file of relay changed, not how many times
ch chan interface{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ch chan interface{}
ch chan struct{}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no gains

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has some cost to convert struct{}{} to interface{}

}

// Notified implements streamer.EventNotifier.
func (r relayNotifier) Notified() chan interface{} {
return r.ch
}
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved

// createRealUnits is subtask units initializer
// it can be used for testing.
var createUnits = createRealUnits

// createRealUnits creates process units base on task mode.
func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string) []unit.Unit {
func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, workerName string, notifier streamer.EventNotifier) []unit.Unit {
failpoint.Inject("mockCreateUnitsDumpOnly", func(_ failpoint.Value) {
log.L().Info("create mock worker units with dump unit only", zap.String("failpoint", "mockCreateUnitsDumpOnly"))
failpoint.Return([]unit.Unit{dumpling.NewDumpling(cfg)})
Expand All @@ -64,7 +76,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor
} else {
us = append(us, loader.NewLightning(cfg, etcdClient, workerName))
}
us = append(us, syncer.NewSyncer(cfg, etcdClient))
us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier))
case config.ModeFull:
// NOTE: maybe need another checker in the future?
us = append(us, dumpling.NewDumpling(cfg))
Expand All @@ -74,7 +86,7 @@ func createRealUnits(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, wor
us = append(us, loader.NewLightning(cfg, etcdClient, workerName))
}
case config.ModeIncrement:
us = append(us, syncer.NewSyncer(cfg, etcdClient))
us = append(us, syncer.NewSyncer(cfg, etcdClient, notifier))
default:
log.L().Error("unsupported task mode", zap.String("subtask", cfg.Name), zap.String("task mode", cfg.Mode))
}
Expand Down Expand Up @@ -108,6 +120,8 @@ type SubTask struct {
etcdClient *clientv3.Client

workerName string

notifier streamer.EventNotifier
}

// NewSubTask is subtask initializer
Expand All @@ -130,14 +144,15 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient *
cancel: cancel,
etcdClient: etcdClient,
workerName: workerName,
notifier: &relayNotifier{ch: make(chan interface{}, 1)},
}
updateTaskMetric(st.cfg.Name, st.cfg.SourceID, st.stage, st.workerName)
return &st
}

// initUnits initializes the sub task processing units.
func (st *SubTask) initUnits() error {
st.units = createUnits(st.cfg, st.etcdClient, st.workerName)
st.units = createUnits(st.cfg, st.etcdClient, st.workerName, st.notifier)
if len(st.units) < 1 {
return terror.ErrWorkerNoAvailUnits.Generate(st.cfg.Name, st.cfg.Mode)
}
Expand Down Expand Up @@ -724,3 +739,11 @@ func updateTaskMetric(task, sourceID string, stage pb.Stage, workerName string)
taskState.WithLabelValues(task, sourceID, workerName).Set(float64(stage))
}
}

func (st *SubTask) relayNotify() {
// skip if there's pending notify
select {
case st.notifier.Notified() <- struct{}{}:
default:
}
}
13 changes: 13 additions & 0 deletions dm/worker/subtask_holder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package worker
import (
"context"
"sync"

"github.com/go-mysql-org/go-mysql/replication"
)

// subTaskHolder holds subtask instances.
Expand Down Expand Up @@ -88,3 +90,14 @@ func (h *subTaskHolder) getAllSubTasks() map[string]*SubTask {
}
return result
}

// OnEvent implements relay.Listener
// only syncer unit of subtask need to be notified, but it's much simpler and less error-prone to manage it here
// as relay event need to broadcast to every syncer(most subtask have a syncer).
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems e *replication.BinlogEvent is not used here, why we need this param ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a event listener
the parameter is not used now, but it's added for completeness(observer pattern), may used later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) {
func (h *subTaskHolder) OnEvent(_ *replication.BinlogEvent) {

h.mu.RLock()
defer h.mu.RUnlock()
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
for _, s := range h.subTasks {
s.relayNotify()
}
}
20 changes: 11 additions & 9 deletions dm/worker/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"strings"
"time"

"github.com/pingcap/dm/pkg/streamer"
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
Expand Down Expand Up @@ -47,24 +49,24 @@ func (t *testSubTask) TestCreateUnits(c *C) {
Mode: "xxx",
}
worker := "worker"
c.Assert(createUnits(cfg, nil, worker), HasLen, 0)
c.Assert(createUnits(cfg, nil, worker, nil), HasLen, 0)

cfg.Mode = config.ModeFull
unitsFull := createUnits(cfg, nil, worker)
unitsFull := createUnits(cfg, nil, worker, nil)
c.Assert(unitsFull, HasLen, 2)
_, ok := unitsFull[0].(*dumpling.Dumpling)
c.Assert(ok, IsTrue)
_, ok = unitsFull[1].(*loader.Loader)
c.Assert(ok, IsTrue)

cfg.Mode = config.ModeIncrement
unitsIncr := createUnits(cfg, nil, worker)
unitsIncr := createUnits(cfg, nil, worker, nil)
c.Assert(unitsIncr, HasLen, 1)
_, ok = unitsIncr[0].(*syncer.Syncer)
c.Assert(ok, IsTrue)

cfg.Mode = config.ModeAll
unitsAll := createUnits(cfg, nil, worker)
unitsAll := createUnits(cfg, nil, worker, nil)
c.Assert(unitsAll, HasLen, 3)
_, ok = unitsAll[0].(*dumpling.Dumpling)
c.Assert(ok, IsTrue)
Expand Down Expand Up @@ -176,7 +178,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return nil
}
st.Run(pb.Stage_Running)
Expand All @@ -185,7 +187,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {

mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down Expand Up @@ -296,7 +298,7 @@ func (t *testSubTask) TestPauseAndResumeSubtask(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down Expand Up @@ -420,7 +422,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) {
defer func() {
createUnits = createRealUnits
}()
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand All @@ -445,7 +447,7 @@ func (t *testSubTask) TestSubtaskWithStage(c *C) {

st = NewSubTaskWithStage(cfg, pb.Stage_Finished, nil, "worker")
c.Assert(st.Stage(), DeepEquals, pb.Stage_Finished)
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string) []unit.Unit {
createUnits = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, worker string, notifier streamer.EventNotifier) []unit.Unit {
return []unit.Unit{mockDumper, mockLoader}
}

Expand Down
Loading