Skip to content

Commit

Permalink
dm-relay: optimize cpu usage when enabling relay (#3469)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter committed Nov 22, 2021
1 parent 563bcf9 commit 0076d09
Show file tree
Hide file tree
Showing 20 changed files with 999 additions and 1,040 deletions.
4 changes: 4 additions & 0 deletions dm/dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type DummyRelay struct {
reloadErr error
}

func (d *DummyRelay) IsActive(uuid, filename string) (bool, int64) {
return false, 0
}

func (d *DummyRelay) NewReader(logger log.Logger, cfg *relay.BinlogReaderConfig) *relay.BinlogReader {
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions dm/pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
posUUIDSuffixSeparator = "|"
// MinUUIDSuffix is same as relay.MinUUIDSuffix.
MinUUIDSuffix = 1
// FileHeaderLen is the length of binlog file header.
FileHeaderLen = 4
)

// MinPosition is the min binlog position.
Expand Down
9 changes: 9 additions & 0 deletions dm/pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,12 @@ func proxyFields() []zap.Field {
}
return fields
}

func NewStoppedTimer() *time.Timer {
// stopped timer should be Reset with correct duration, so use 0 here
t := time.NewTimer(0)
if !t.Stop() {
<-t.C
}
return t
}
21 changes: 17 additions & 4 deletions dm/relay/binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"

"github.com/pingcap/errors"
Expand All @@ -33,6 +34,8 @@ type BinlogWriter struct {

offset atomic.Int64
file *os.File
relayDir string
uuid string
filename string

logger log.Logger
Expand All @@ -55,14 +58,16 @@ func (s *BinlogWriterStatus) String() string {
}

// NewBinlogWriter creates a BinlogWriter instance.
func NewBinlogWriter(logger log.Logger) *BinlogWriter {
func NewBinlogWriter(logger log.Logger, relayDir string) *BinlogWriter {
return &BinlogWriter{
logger: logger,
logger: logger,
relayDir: relayDir,
}
}

func (w *BinlogWriter) Open(filename string) error {
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644)
func (w *BinlogWriter) Open(uuid, filename string) error {
fullName := filepath.Join(w.relayDir, uuid, filename)
f, err := os.OpenFile(fullName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o644)
if err != nil {
return terror.ErrBinlogWriterOpenFile.Delegate(err)
}
Expand All @@ -80,6 +85,7 @@ func (w *BinlogWriter) Open(filename string) error {

w.offset.Store(fs.Size())
w.file = f
w.uuid = uuid
w.filename = filename

return nil
Expand All @@ -100,6 +106,7 @@ func (w *BinlogWriter) Close() error {

w.file = nil
w.offset.Store(0)
w.uuid = ""
w.filename = ""

return err
Expand Down Expand Up @@ -132,3 +139,9 @@ func (w *BinlogWriter) Status() *BinlogWriterStatus {
func (w *BinlogWriter) Offset() int64 {
return w.offset.Load()
}

func (w *BinlogWriter) isActive(uuid, filename string) (bool, int64) {
w.mu.RLock()
defer w.mu.RUnlock()
return uuid == w.uuid && filename == w.filename, w.offset.Load()
}
21 changes: 13 additions & 8 deletions dm/relay/binlog_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,20 @@ type testBinlogWriterSuite struct{}

func (t *testBinlogWriterSuite) TestWrite(c *C) {
dir := c.MkDir()
filename := filepath.Join(dir, "test-mysql-bin.000001")
uuid := "3ccc475b-2343-11e7-be21-6c0b84d59f30.000001"
binlogDir := filepath.Join(dir, uuid)
c.Assert(os.Mkdir(binlogDir, 0o755), IsNil)

filename := "test-mysql-bin.000001"
var (
allData bytes.Buffer
data1 = []byte("test-data")
)

{
w := NewBinlogWriter(log.L())
w := NewBinlogWriter(log.L(), dir)
c.Assert(w, NotNil)
c.Assert(w.Open(filename), IsNil)
c.Assert(w.Open(uuid, filename), IsNil)
fwStatus := w.Status()
c.Assert(fwStatus.Filename, Equals, filename)
c.Assert(fwStatus.Offset, Equals, int64(allData.Len()))
Expand All @@ -50,19 +54,19 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) {

{
// not opened
w := NewBinlogWriter(log.L())
w := NewBinlogWriter(log.L(), dir)
err := w.Write(data1)
c.Assert(err, ErrorMatches, "*not opened")

// open non exist dir
err = w.Open(filepath.Join(dir, "not-exist", "bin.000001"))
err = w.Open("not-exist-uuid", "bin.000001")
c.Assert(err, ErrorMatches, "*no such file or directory")
}

{
// normal call flow
w := NewBinlogWriter(log.L())
err := w.Open(filename)
w := NewBinlogWriter(log.L(), dir)
err := w.Open(uuid, filename)
c.Assert(err, IsNil)
c.Assert(w.file, NotNil)
c.Assert(w.filename, Equals, filename)
Expand Down Expand Up @@ -93,7 +97,8 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) {
c.Assert(w.Close(), IsNil) // noop

// try to read the data back
dataInFile, err := os.ReadFile(filename)
fullName := filepath.Join(binlogDir, filename)
dataInFile, err := os.ReadFile(fullName)
c.Assert(err, IsNil)
c.Assert(dataInFile, DeepEquals, allData.Bytes())
}
Expand Down
28 changes: 0 additions & 28 deletions dm/relay/error.go

This file was deleted.

38 changes: 0 additions & 38 deletions dm/relay/error_test.go

This file was deleted.

129 changes: 0 additions & 129 deletions dm/relay/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,10 @@
package relay

import (
"context"
"os"
"path"
"path/filepath"
"sort"
"time"

"github.com/BurntSushi/toml"
"go.uber.org/zap"

"github.com/pingcap/ticdc/dm/pkg/binlog"
Expand All @@ -42,12 +38,6 @@ const (
FileCmpBigger
)

// SwitchPath represents next binlog file path which should be switched.
type SwitchPath struct {
nextUUID string
nextBinlogName string
}

// EventNotifier notifies whether there is new binlog event written to the file.
type EventNotifier interface {
// Notified returns a channel used to check whether there is new binlog event written to the file
Expand Down Expand Up @@ -206,122 +196,3 @@ func fileSizeUpdated(path string, latestSize int64) (int, error) {
return -1, nil
}
}

type relayLogFileChecker struct {
notifier EventNotifier
relayDir, currentUUID string
latestRelayLogDir, latestFilePath, latestFile string
beginOffset, endOffset int64
}

// relayLogUpdatedOrNewCreated checks whether current relay log file is updated or new relay log is created.
func (r *relayLogFileChecker) relayLogUpdatedOrNewCreated(ctx context.Context, updatePathCh chan string, switchCh chan SwitchPath, errCh chan error) {
// binlog file may have rotated if we read nothing last time(either it's the first read or after notified)
lastReadCnt := r.endOffset - r.beginOffset
if lastReadCnt == 0 {
meta := &LocalMeta{}
_, err := toml.DecodeFile(filepath.Join(r.latestRelayLogDir, utils.MetaFilename), meta)
if err != nil {
errCh <- terror.Annotate(err, "decode relay meta toml file failed")
return
}
// current watched file size have no change means that no new writes have been made
// our relay meta file will be updated immediately after receive the rotate event,
// although we cannot ensure that the binlog filename in the meta is the next file after latestFile
// but if we return a different filename with latestFile, the outer logic (parseDirAsPossible)
// will find the right one
if meta.BinLogName != r.latestFile {
// we need check file size again, as the file may have been changed during our metafile check
cmp, err2 := fileSizeUpdated(r.latestFilePath, r.endOffset)
if err2 != nil {
errCh <- terror.Annotatef(err2, "latestFilePath=%s endOffset=%d", r.latestFilePath, r.endOffset)
return
}
switch {
case cmp < 0:
errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(r.latestFilePath)
case cmp > 0:
updatePathCh <- r.latestFilePath
default:
nextFilePath := filepath.Join(r.latestRelayLogDir, meta.BinLogName)
log.L().Info("newer relay log file is already generated",
zap.String("now file path", r.latestFilePath),
zap.String("new file path", nextFilePath))
updatePathCh <- nextFilePath
}
return
}

// maybe UUID index file changed
switchPath, err := r.getSwitchPath()
if err != nil {
errCh <- err
return
}
if switchPath != nil {
// we need check file size again, as the file may have been changed during path check
cmp, err := fileSizeUpdated(r.latestFilePath, r.endOffset)
if err != nil {
errCh <- terror.Annotatef(err, "latestFilePath=%s endOffset=%d", r.latestFilePath, r.endOffset)
return
}
switch {
case cmp < 0:
errCh <- terror.ErrRelayLogFileSizeSmaller.Generate(r.latestFilePath)
case cmp > 0:
updatePathCh <- r.latestFilePath
default:
log.L().Info("newer relay uuid path is already generated",
zap.String("current path", r.latestRelayLogDir),
zap.Any("new path", switchPath))
switchCh <- *switchPath
}
return
}
}

timer := time.NewTimer(watcherInterval)
defer timer.Stop()
select {
case <-ctx.Done():
errCh <- terror.Annotate(ctx.Err(), "context meet error")
case <-r.notifier.Notified():
// the notified event may not be the current relay file
// in that case we may read 0 bytes and check again
updatePathCh <- r.latestFilePath
case <-timer.C:
// for a task start after source shutdown or there's no new write, it'll not be notified,
// and if it's reading from dir 000001 and there's need to switch dir to 000002,
// after the task read files in dir 000001, the read size > 0, so it goes to the select directly,
// since there is no notify, it blocks, that'll leave dir 000002 un-synced.
// so we stop waiting after watcherInterval to give it a chance to check again
updatePathCh <- r.latestFilePath
}
}

func (r *relayLogFileChecker) getSwitchPath() (*SwitchPath, error) {
// reload uuid
uuids, err := utils.ParseUUIDIndex(path.Join(r.relayDir, utils.UUIDIndexFilename))
if err != nil {
return nil, err
}
nextUUID, _, err := getNextUUID(r.currentUUID, uuids)
if err != nil {
return nil, err
}
if len(nextUUID) == 0 {
return nil, nil
}

// try to get the first binlog file in next subdirectory
nextBinlogName, err := getFirstBinlogName(r.relayDir, nextUUID)
if err != nil {
// because creating subdirectory and writing relay log file are not atomic
if terror.ErrBinlogFilesNotFound.Equal(err) {
return nil, nil
}
return nil, err
}

return &SwitchPath{nextUUID, nextBinlogName}, nil
}
Loading

0 comments on commit 0076d09

Please sign in to comment.