Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay/syncer: relay notifies syncer of new write to reduce sync latency #2225

Merged
merged 24 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
838d439
notify syncer on new binlog event
D3Hunter Oct 9, 2021
7a14bef
reader notify streamer reader
D3Hunter Oct 12, 2021
e4f2c7a
move switch path check into binlog file change check
D3Hunter Oct 13, 2021
b067a07
add timer to relayLogUpdatedOrNewCreated to handle special case
D3Hunter Oct 14, 2021
41568ea
ignore duplicate FORMAT event
D3Hunter Oct 14, 2021
29c9fd4
add/fix unit test
D3Hunter Oct 14, 2021
9df7eed
make lint happy
D3Hunter Oct 14, 2021
4b2aa47
add comments to public if
D3Hunter Oct 15, 2021
b865044
fix comments
D3Hunter Oct 15, 2021
3ab6475
fix lint
D3Hunter Oct 18, 2021
ea8fd6e
abstruct notifier chan in syncer into a struct and pass in from subtask
D3Hunter Oct 18, 2021
64d388f
rename fields to make code more verbose
D3Hunter Oct 18, 2021
ac2f391
add comment
D3Hunter Oct 18, 2021
af5f7e2
add comments
D3Hunter Oct 18, 2021
b340a30
Merge branch 'master' into relay-notify
D3Hunter Oct 19, 2021
5afad9b
Merge branch 'master' into relay-notify
D3Hunter Oct 20, 2021
100788c
remove unnecessiary lock operation
D3Hunter Oct 25, 2021
7434c89
Merge remote-tracking branch 'upstream/master' into relay-notify
D3Hunter Oct 25, 2021
026d9fb
fix unstable integration test case ha_cases3
D3Hunter Oct 26, 2021
2213223
Merge remote-tracking branch 'upstream/master' into relay-notify
D3Hunter Oct 26, 2021
11511a3
fix comments
D3Hunter Oct 26, 2021
9e93be1
undo redirect stderr in run_sql, some case depends on the output
D3Hunter Oct 26, 2021
ebd1d2b
add more unit test
D3Hunter Oct 27, 2021
99b2ed7
fix import groups
D3Hunter Oct 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type RelayHolder interface {
Result() *pb.ProcessResult
// Update updates relay config online
Update(ctx context.Context, cfg *config.SourceConfig) error
// RegisterListener registers a relay listener
RegisterListener(el relay.Listener)
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
// UnRegisterListener unregisters a relay listener
UnRegisterListener(el relay.Listener)
}

// NewRelayHolder is relay holder initializer
Expand Down Expand Up @@ -315,6 +319,18 @@ func (h *realRelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo {
return h.relay.ActiveRelayLog()
}

func (h *realRelayHolder) RegisterListener(el relay.Listener) {
h.Lock()
defer h.Unlock()
h.relay.RegisterListener(el)
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
}

func (h *realRelayHolder) UnRegisterListener(el relay.Listener) {
h.Lock()
defer h.Unlock()
h.relay.UnRegisterListener(el)
}

/******************** dummy relay holder ********************/

type dummyRelayHolder struct {
Expand Down Expand Up @@ -432,3 +448,9 @@ func (d *dummyRelayHolder) Stage() pb.Stage {
defer d.Unlock()
return d.stage
}

func (d *dummyRelayHolder) RegisterListener(el relay.Listener) {
}

func (d *dummyRelayHolder) UnRegisterListener(el relay.Listener) {
}
6 changes: 6 additions & 0 deletions dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type DummyRelay struct {
reloadErr error
}

func (d *DummyRelay) RegisterListener(el relay.Listener) {
}

func (d *DummyRelay) UnRegisterListener(el relay.Listener) {
}

// NewDummyRelay creates an instance of dummy Relay.
func NewDummyRelay(cfg *relay.Config) relay.Process {
return &DummyRelay{}
Expand Down
3 changes: 3 additions & 0 deletions dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ func (w *SourceWorker) EnableRelay() (err error) {
w.observeRelayStage(w.relayCtx, w.etcdClient, revRelay)
}()

w.relayHolder.RegisterListener(w.subTaskHolder)

w.relayEnabled.Store(true)
w.l.Info("relay enabled")
w.subTaskHolder.resetAllSubTasks(true)
Expand Down Expand Up @@ -385,6 +387,7 @@ func (w *SourceWorker) DisableRelay() {
if w.relayHolder != nil {
r := w.relayHolder
w.relayHolder = nil
r.UnRegisterListener(w.subTaskHolder)
r.Close()
}
if w.relayPurger != nil {
Expand Down
23 changes: 23 additions & 0 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type SubTask struct {
units []unit.Unit // units do job one by one
currUnit unit.Unit
prevUnit unit.Unit
syncer *syncer.Syncer
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
resultWg sync.WaitGroup

stage pb.Stage // stage of current sub task
Expand Down Expand Up @@ -190,11 +191,23 @@ func (st *SubTask) initUnits() error {

needCloseUnits = st.units[:skipIdx]
st.units = st.units[skipIdx:]
st.postInitSyncer()

st.setCurrUnit(st.units[0])
return nil
}

func (st *SubTask) postInitSyncer() {
// TODO, right now initUnits create units first and then remove unnecessary units(before first non fresh unit)
// maybe can be refactored into check first, then create, so we don't need to loop all units to get syncer here
for _, u := range st.units {
if s, ok := u.(*syncer.Syncer); ok {
st.syncer = s
break
}
}
}

// Run runs the sub task.
// TODO: check concurrent problems.
func (st *SubTask) Run(expectStage pb.Stage) {
Expand Down Expand Up @@ -724,3 +737,13 @@ func updateTaskMetric(task, sourceID string, stage pb.Stage, workerName string)
taskState.WithLabelValues(task, sourceID, workerName).Set(float64(stage))
}
}

func (st *SubTask) relayNotify() {
if st.syncer != nil {
// skip if there's pending notify
select {
case st.syncer.Notified() <- struct{}{}:
default:
}
}
}
13 changes: 13 additions & 0 deletions dm/worker/subtask_holder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package worker
import (
"context"
"sync"

"github.com/go-mysql-org/go-mysql/replication"
)

// subTaskHolder holds subtask instances.
Expand Down Expand Up @@ -88,3 +90,14 @@ func (h *subTaskHolder) getAllSubTasks() map[string]*SubTask {
}
return result
}

// OnEvent implements relay.Listener
// only syncer unit of subtask need to be notified, but it's much simpler and less error-prone to manage it here
// as relay event need to broadcast to every syncer(most subtask have a syncer).
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems e *replication.BinlogEvent is not used here, why we need this param ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a event listener
the parameter is not used now, but it's added for completeness(observer pattern), may used later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (h *subTaskHolder) OnEvent(e *replication.BinlogEvent) {
func (h *subTaskHolder) OnEvent(_ *replication.BinlogEvent) {

h.mu.RLock()
defer h.mu.RUnlock()
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
for _, s := range h.subTasks {
s.relayNotify()
}
}
19 changes: 10 additions & 9 deletions loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,7 @@ import (
"path/filepath"
"sync"

"github.com/docker/go-units"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb/br/pkg/lightning"
"github.com/pingcap/tidb/br/pkg/lightning/common"
lcfg "github.com/pingcap/tidb/br/pkg/lightning/config"
"go.etcd.io/etcd/clientv3"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
Expand All @@ -37,6 +28,16 @@ import (
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"

"github.com/docker/go-units"
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/br/pkg/lightning"
"github.com/pingcap/tidb/br/pkg/lightning/common"
lcfg "github.com/pingcap/tidb/br/pkg/lightning/config"
"go.etcd.io/etcd/clientv3"
"go.uber.org/atomic"
"go.uber.org/zap"
)

const (
Expand Down
Loading