Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm-relay: optimize cpu usage when enabling relay #3469

Merged
merged 30 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e2e95e2
open binlog file once, read first event once
D3Hunter Nov 1, 2021
c11f3be
move file change logic into binlog reader, reduce number of out param…
D3Hunter Nov 4, 2021
e1eb3dc
check active binlog file directly
D3Hunter Nov 5, 2021
fd4c200
use optimized go-mysql
D3Hunter Nov 8, 2021
bd838a3
change to timer, reuse timer in local streamer
D3Hunter Nov 8, 2021
61c52fa
parse uuid as needed
D3Hunter Nov 8, 2021
7d27807
we're reading from channel, no need to make a timeout context
D3Hunter Nov 9, 2021
117bddf
log debug events as needed in relay
D3Hunter Nov 9, 2021
ff7ed77
update go-mysql(using official repo)
D3Hunter Nov 11, 2021
17c02f0
add some check
D3Hunter Nov 12, 2021
3a0b1df
fix unit test
D3Hunter Nov 12, 2021
47ae8ec
fix make check
D3Hunter Nov 12, 2021
829ab18
fix unit test
D3Hunter Nov 15, 2021
d2e61f7
fix unit test
D3Hunter Nov 15, 2021
ca404d7
add unit test
D3Hunter Nov 15, 2021
6da6a52
read FMT DESC event separately only if we start parsing from the middle
D3Hunter Nov 16, 2021
7abbc7a
fix unit test
D3Hunter Nov 16, 2021
b7221d6
remove replace in go.mod
D3Hunter Nov 16, 2021
7bf4c84
Merge branch 'master' into opt-relay-cpu
D3Hunter Nov 16, 2021
3a0a522
fix comments
D3Hunter Nov 18, 2021
3ce7506
use defined constant for binlog file header len
D3Hunter Nov 18, 2021
3ff1c61
change check logic
D3Hunter Nov 19, 2021
52247f6
check error with type
D3Hunter Nov 22, 2021
ec516aa
Merge remote-tracking branch 'upstream/master' into opt-relay-cpu
D3Hunter Nov 22, 2021
0b64bcd
fix import
D3Hunter Nov 22, 2021
83c7264
remove useless code
D3Hunter Nov 22, 2021
67ca3d9
Update dm/relay/local_reader.go
D3Hunter Nov 22, 2021
60c79c7
Merge branch 'master' into opt-relay-cpu
D3Hunter Nov 22, 2021
84c5ae9
Merge branch 'master' into opt-relay-cpu
D3Hunter Nov 22, 2021
209ec02
Merge branch 'master' into opt-relay-cpu
ti-chi-bot Nov 22, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dm/dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type DummyRelay struct {
reloadErr error
}

// IsActive is a stub: whatever uuid and filename are passed, it reports
// "not active" with an offset of 0.
func (d *DummyRelay) IsActive(uuid, filename string) (bool, int64) {
return false, 0
}

// NewReader is a stub: it ignores logger and cfg and always returns nil.
func (d *DummyRelay) NewReader(logger log.Logger, cfg *relay.BinlogReaderConfig) *relay.BinlogReader {
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions dm/pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
posUUIDSuffixSeparator = "|"
// MinUUIDSuffix is same as relay.MinUUIDSuffix.
MinUUIDSuffix = 1
// FileHeaderLen is the length of binlog file header.
FileHeaderLen = 4
)

// MinPosition is the min binlog position.
Expand Down
9 changes: 9 additions & 0 deletions dm/pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,12 @@ func proxyFields() []zap.Field {
}
return fields
}

// NewStoppedTimer returns a timer that is already stopped and whose channel
// holds no pending value. The caller is expected to arm it with Reset and the
// duration it actually needs before waiting on the channel.
func NewStoppedTimer() *time.Timer {
	// The initial duration is irrelevant because the timer is stopped right
	// away, so 0 is used.
	timer := time.NewTimer(0)
	if timer.Stop() {
		// Stopped before it fired: nothing was sent on the channel.
		return timer
	}
	// The timer had already fired; consume its value so that a later Reset
	// does not observe a stale tick.
	<-timer.C
	return timer
}
21 changes: 17 additions & 4 deletions dm/relay/binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"

"github.com/pingcap/errors"
Expand All @@ -33,6 +34,8 @@ type BinlogWriter struct {

offset atomic.Int64
file *os.File
relayDir string
uuid string
filename string

logger log.Logger
Expand All @@ -55,14 +58,16 @@ func (s *BinlogWriterStatus) String() string {
}

// NewBinlogWriter creates a BinlogWriter instance.
func NewBinlogWriter(logger log.Logger) *BinlogWriter {
func NewBinlogWriter(logger log.Logger, relayDir string) *BinlogWriter {
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
return &BinlogWriter{
logger: logger,
logger: logger,
relayDir: relayDir,
}
}

func (w *BinlogWriter) Open(filename string) error {
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644)
func (w *BinlogWriter) Open(uuid, filename string) error {
fullName := filepath.Join(w.relayDir, uuid, filename)
f, err := os.OpenFile(fullName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644)
if err != nil {
return terror.ErrBinlogWriterOpenFile.Delegate(err)
}
Expand All @@ -80,6 +85,7 @@ func (w *BinlogWriter) Open(filename string) error {

w.offset.Store(fs.Size())
w.file = f
w.uuid = uuid
w.filename = filename

return nil
Expand All @@ -100,6 +106,7 @@ func (w *BinlogWriter) Close() error {

w.file = nil
w.offset.Store(0)
w.uuid = ""
w.filename = ""

return err
Expand Down Expand Up @@ -132,3 +139,9 @@ func (w *BinlogWriter) Status() *BinlogWriterStatus {
// Offset returns the current value of the writer's atomic offset counter.
// Open stores the size of the opened file into it and Close resets it to 0.
func (w *BinlogWriter) Offset() int64 {
return w.offset.Load()
}

// isActive reports whether the given uuid and filename are the ones recorded
// by the writer (set in Open, cleared in Close), and returns the writer's
// current offset alongside. Both values are read while holding the read lock.
func (w *BinlogWriter) isActive(uuid, filename string) (bool, int64) {
	w.mu.RLock()
	defer w.mu.RUnlock()

	sameFile := w.uuid == uuid && w.filename == filename
	return sameFile, w.offset.Load()
}
21 changes: 13 additions & 8 deletions dm/relay/binlog_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,20 @@ type testBinlogWriterSuite struct{}

func (t *testBinlogWriterSuite) TestWrite(c *C) {
dir := c.MkDir()
filename := filepath.Join(dir, "test-mysql-bin.000001")
uuid := "3ccc475b-2343-11e7-be21-6c0b84d59f30.000001"
binlogDir := filepath.Join(dir, uuid)
c.Assert(os.Mkdir(binlogDir, 0o755), IsNil)

filename := "test-mysql-bin.000001"
var (
allData bytes.Buffer
data1 = []byte("test-data")
)

{
w := NewBinlogWriter(log.L())
w := NewBinlogWriter(log.L(), dir)
c.Assert(w, NotNil)
c.Assert(w.Open(filename), IsNil)
c.Assert(w.Open(uuid, filename), IsNil)
fwStatus := w.Status()
c.Assert(fwStatus.Filename, Equals, filename)
c.Assert(fwStatus.Offset, Equals, int64(allData.Len()))
Expand All @@ -50,19 +54,19 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) {

{
// not opened
w := NewBinlogWriter(log.L())
w := NewBinlogWriter(log.L(), dir)
err := w.Write(data1)
c.Assert(err, ErrorMatches, "*not opened")

// open non exist dir
err = w.Open(filepath.Join(dir, "not-exist", "bin.000001"))
err = w.Open("not-exist-uuid", "bin.000001")
c.Assert(err, ErrorMatches, "*no such file or directory")
}

{
// normal call flow
w := NewBinlogWriter(log.L())
err := w.Open(filename)
w := NewBinlogWriter(log.L(), dir)
err := w.Open(uuid, filename)
c.Assert(err, IsNil)
c.Assert(w.file, NotNil)
c.Assert(w.filename, Equals, filename)
Expand Down Expand Up @@ -93,7 +97,8 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) {
c.Assert(w.Close(), IsNil) // noop

// try to read the data back
dataInFile, err := os.ReadFile(filename)
fullName := filepath.Join(binlogDir, filename)
dataInFile, err := os.ReadFile(fullName)
c.Assert(err, IsNil)
c.Assert(dataInFile, DeepEquals, allData.Bytes())
}
Expand Down
129 changes: 0 additions & 129 deletions dm/relay/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,10 @@
package relay

import (
"context"
"os"
"path"
"path/filepath"
"sort"
"time"

"github.com/BurntSushi/toml"
"go.uber.org/zap"

"github.com/pingcap/ticdc/dm/pkg/binlog"
Expand All @@ -42,12 +38,6 @@ const (
FileCmpBigger
)

// SwitchPath represents next binlog file path which should be switched.
type SwitchPath struct {
// nextUUID is the UUID sub directory that follows the current one in the UUID index.
nextUUID string
// nextBinlogName is the first binlog file found in the nextUUID sub directory.
nextBinlogName string
}

// EventNotifier notifies whether there is new binlog event written to the file.
type EventNotifier interface {
// Notified returns a channel used to check whether there is new binlog event written to the file
Expand Down Expand Up @@ -206,122 +196,3 @@ func fileSizeUpdated(path string, latestSize int64) (int, error) {
return -1, nil
}
}

// relayLogFileChecker holds the state needed to decide whether the relay log
// file currently being read has new data, or whether reading should move on to
// a newer file or a newer UUID sub directory.
type relayLogFileChecker struct {
// notifier provides the channel that is waited on for new-event notifications.
notifier EventNotifier
// relayDir is the directory containing the UUID index file;
// currentUUID is the UUID whose successor is looked up in that index.
relayDir, currentUUID string
// latestRelayLogDir is the directory whose relay meta file is decoded;
// latestFilePath is the file whose size is compared against endOffset;
// latestFile is the file name compared with the binlog name stored in the meta.
latestRelayLogDir, latestFilePath, latestFile string
// endOffset - beginOffset is treated as the number of bytes read last time.
beginOffset, endOffset int64
}

// relayLogUpdatedOrNewCreated checks whether current relay log file is updated or new relay log is created.
//
// Every code path sends exactly one value on exactly one of the channels and then returns:
//   - updatePathCh receives the path of the relay log file to read next
//     (the current file, or a newer file in the same directory);
//   - switchCh receives the next UUID sub directory together with its first binlog file;
//   - errCh receives any error met while checking, including context cancellation.
//
// NOTE(review): the sends are plain channel sends, so they block until the
// caller receives unless the channels are buffered — confirm against the caller.
func (r *relayLogFileChecker) relayLogUpdatedOrNewCreated(ctx context.Context, updatePathCh chan string, switchCh chan SwitchPath, errCh chan error) {
// binlog file may have rotated if we read nothing last time(either it's the first read or after notified)
lastReadCnt := r.endOffset - r.beginOffset
if lastReadCnt == 0 {
// read the relay meta of the directory being watched to learn which binlog file it records
meta := &LocalMeta{}
_, err := toml.DecodeFile(filepath.Join(r.latestRelayLogDir, utils.MetaFilename), meta)
if err != nil {
errCh <- terror.Annotate(err, "decode relay meta toml file failed")
return
}
// the size of the currently watched file has not changed, which means no new writes have been made.
// our relay meta file will be updated immediately after receiving the rotate event,
// although we cannot ensure that the binlog filename in the meta is the next file after latestFile,
// but if we return a different filename from latestFile, the outer logic (parseDirAsPossible)
// will find the right one
if meta.BinLogName != r.latestFile {
// we need check file size again, as the file may have been changed during our metafile check
cmp, err2 := fileSizeUpdated(r.latestFilePath, r.endOffset)
if err2 != nil {
errCh <- terror.Annotatef(err2, "latestFilePath=%s endOffset=%d", r.latestFilePath, r.endOffset)
return
}
switch {
case cmp < 0:
// reported as "file size smaller": the file is treated as invalid
errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(r.latestFilePath)
case cmp > 0:
// presumably the current file grew in the meantime; keep reading it first
updatePathCh <- r.latestFilePath
default:
// current file unchanged: move on to the file recorded in the meta
nextFilePath := filepath.Join(r.latestRelayLogDir, meta.BinLogName)
log.L().Info("newer relay log file is already generated",
zap.String("now file path", r.latestFilePath),
zap.String("new file path", nextFilePath))
updatePathCh <- nextFilePath
}
return
}

// maybe UUID index file changed
switchPath, err := r.getSwitchPath()
if err != nil {
errCh <- err
return
}
if switchPath != nil {
// we need check file size again, as the file may have been changed during path check
cmp, err := fileSizeUpdated(r.latestFilePath, r.endOffset)
if err != nil {
errCh <- terror.Annotatef(err, "latestFilePath=%s endOffset=%d", r.latestFilePath, r.endOffset)
return
}
switch {
case cmp < 0:
errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(r.latestFilePath)
case cmp > 0:
// presumably the current file grew in the meantime; finish it before switching directory
updatePathCh <- r.latestFilePath
default:
// current file unchanged: switch to the next UUID sub directory
log.L().Info("newer relay uuid path is already generated",
zap.String("current path", r.latestRelayLogDir),
zap.Any("new path", switchPath))
switchCh <- *switchPath
}
return
}
}

// nothing decided above: wait for a notification, a timeout, or cancellation
timer := time.NewTimer(watcherInterval)
defer timer.Stop()
select {
case <-ctx.Done():
errCh <- terror.Annotate(ctx.Err(), "context meet error")
case <-r.notifier.Notified():
// the notified event may not be the current relay file
// in that case we may read 0 bytes and check again
updatePathCh <- r.latestFilePath
case <-timer.C:
// for a task start after source shutdown or there's no new write, it'll not be notified,
// and if it's reading from dir 000001 and there's need to switch dir to 000002,
// after the task read files in dir 000001, the read size > 0, so it goes to the select directly,
// since there is no notify, it blocks, that'll leave dir 000002 un-synced.
// so we stop waiting after watcherInterval to give it a chance to check again
updatePathCh <- r.latestFilePath
}
}

// getSwitchPath looks up the UUID sub directory that follows r.currentUUID in
// the UUID index file and the first binlog file inside it.
//
// It returns (nil, nil) when there is nothing to switch to yet: either no next
// UUID exists, or the next sub directory does not contain a binlog file yet.
// Any other failure is returned as an error.
func (r *relayLogFileChecker) getSwitchPath() (*SwitchPath, error) {
	// reload the UUID index on every call so newly added UUIDs are seen;
	// filepath.Join is used because this is a filesystem path, not a slash-separated one
	uuids, err := utils.ParseUUIDIndex(filepath.Join(r.relayDir, utils.UUIDIndexFilename))
	if err != nil {
		return nil, err
	}
	nextUUID, _, err := getNextUUID(r.currentUUID, uuids)
	if err != nil {
		return nil, err
	}
	if len(nextUUID) == 0 {
		// no next UUID: nothing to switch to
		return nil, nil
	}

	// try to get the first binlog file in next subdirectory
	nextBinlogName, err := getFirstBinlogName(r.relayDir, nextUUID)
	if err != nil {
		// because creating subdirectory and writing relay log file are not atomic
		if terror.ErrBinlogFilesNotFound.Equal(err) {
			return nil, nil
		}
		return nil, err
	}

	return &SwitchPath{nextUUID, nextBinlogName}, nil
}
Loading