sink(ticdc): Add Support for Multiple MySQL-Compatible Downstream Addresses in TiCDC for High Availability #11527

Open
wants to merge 52 commits into
base: master
52 commits
0f06a11
sync to office
wlwilliamx Aug 7, 2024
7c8890f
feat(retry): add `WithPreExecutionWhenRetry` option to execute action…
wlwilliamx Aug 12, 2024
d856ea8
sync
wlwilliamx Aug 12, 2024
120561f
feat(DDLSink): support multi mysql downstream addresses for DDL Sink
wlwilliamx Aug 13, 2024
6b1ca91
feat(mysqlBackend): support multi mysql downstream addresses for DML …
wlwilliamx Aug 13, 2024
ccb58ff
feat(mysqlSyncPointStore): support multi mysql downstream addresses f…
wlwilliamx Aug 13, 2024
735b089
feat(Observer): support multi mysql downstream addresses for Observer
wlwilliamx Aug 13, 2024
4e826ce
refactor(Observer): remove a duplicate import
wlwilliamx Aug 13, 2024
b4d9191
WIP
wlwilliamx Aug 14, 2024
e4bacc0
Merge remote-tracking branch 'upstream/master' into feature/multi-mys…
wlwilliamx Aug 20, 2024
ec856f9
test(MySQLDBConnector): add unit tests for `MySQLDBConnector`
wlwilliamx Aug 20, 2024
6a797aa
fix(TestGenerateDSNByConfig): fix missed `character_set_name` query
wlwilliamx Aug 21, 2024
93aee29
fix(TestNewMySQLBackend): fix the idempotency of close mysql connector
wlwilliamx Aug 21, 2024
d0c844a
test(mysql connector): add TestConfigureDBWhenSwitch_NilConfigureFunc…
wlwilliamx Aug 21, 2024
54addd5
feat(start_tidb_cluster): support launching multiple downstream TiDB …
wlwilliamx Aug 23, 2024
ea7a1a9
fix(start_tidb_cluster): fix the error in shell parsing of the port
wlwilliamx Aug 23, 2024
9cc3c49
fix(test_prepare): fix the error of parsing DOWN_TIDB_STATUS by modif…
wlwilliamx Aug 23, 2024
a772191
feat(start_tidb_cluster): add `--downstream_db` option to the `start_…
wlwilliamx Aug 23, 2024
199b14c
feat(start tidb): extract the code for starting the downstream tidb i…
wlwilliamx Aug 23, 2024
2bf78b4
chore(start tidb): modify `start_downstream_tidb_instances` to execut…
wlwilliamx Aug 23, 2024
829c6f3
fix(start tidb): fix the typo "workdir"
wlwilliamx Aug 23, 2024
e1967b0
fix(start tidb): add `source test_prepare` to `start_downstream_tidb_…
wlwilliamx Aug 23, 2024
36770a0
feat(stop_tidb_cluster): add the operation to stop downstream tidb 2 …
wlwilliamx Aug 23, 2024
b2c11e4
feat(start tidb): add a check for the value of the `--db` parameter
wlwilliamx Aug 23, 2024
207f395
feat(start tidb): support start tidb instances with the specific port
wlwilliamx Aug 26, 2024
8867452
feat(start tidb): capture the PIDs of TiDB instances started by start…
wlwilliamx Aug 26, 2024
fb712a5
test(multi addresses): add a integration test for the multi downstrea…
wlwilliamx Aug 27, 2024
e7808a6
fix(connector): fix a data race in `(*mysqlBackend).Close()`
wlwilliamx Aug 27, 2024
b8b6c49
fix(multi_down_addresses): fix the wrong SQL in integration test mult…
wlwilliamx Aug 27, 2024
3304d21
fix(multi_down_addresses): fix the wrong config path for check_sync_diff
wlwilliamx Aug 27, 2024
fd7d71c
fix(multi_down_addresses): fix the wrong SQL of duplicate entry for a…
wlwilliamx Aug 27, 2024
dd4f074
fix(start_downstream_tidb_instances): fix the idempotence of `CREATE …
wlwilliamx Aug 27, 2024
970dc3d
Merge remote-tracking branch 'upstream/master' into feature/multi-mys…
wlwilliamx Aug 27, 2024
50573e9
chore(connector): add the copyright information for the new files
wlwilliamx Aug 27, 2024
b7c112a
chore(integration test): add the `multi_down_addresses` integration t…
wlwilliamx Aug 28, 2024
72e6e31
chore(integration test): format the new files by `make check`
wlwilliamx Aug 28, 2024
4d8c0d4
docs(connector): add and modify some comments by following the rule o…
wlwilliamx Aug 28, 2024
44a3c9b
refactor(connector): rename `MySQLDBConnector` to `DBConnector` by th…
wlwilliamx Aug 28, 2024
da73ffa
refactor(connector): rename the function `SwitchToAvailableMySQLDB` t…
wlwilliamx Aug 28, 2024
78a80a4
test(multi_down_addresses): add `cdc cli changefeed update` integrati…
wlwilliamx Aug 28, 2024
aec17b3
fix(multi_down_addresses): modify the `changefeed-id` according to th…
wlwilliamx Aug 28, 2024
97db8dd
fix(multi_down_addresses): fix a naming error of tables
wlwilliamx Aug 28, 2024
3673ef8
test(multi_down_addresses): add DDL SQL into `multi_down_addresses` i…
wlwilliamx Aug 28, 2024
e1c6a43
fix(multi_down_addresses): fix a wrong SQL by duplicate entry for one…
wlwilliamx Aug 28, 2024
13edec0
fix(DDL Sink): fix `IsRetryableDDLError` function to allow DDL to be …
wlwilliamx Aug 29, 2024
bac745d
feat(connector): add log output
wlwilliamx Aug 29, 2024
fc445c0
refactor(connector): modify the log output of `DBConnctor`
wlwilliamx Aug 29, 2024
9aeeeca
refactor(multi_down_addresses): modify `multi_down_addresses` integra…
wlwilliamx Aug 29, 2024
757d8d0
refactor(connector): modify the log output of `DBConnctor`
wlwilliamx Aug 30, 2024
d5f4fa7
fix(DDL Sink): fix the unit test failure of `TestIsRetryableDDLError`
wlwilliamx Aug 30, 2024
3dbfef1
refactor(test_prepare): modify `TLS_TIDB_PORT` from 3307 to 3406
wlwilliamx Aug 30, 2024
d804633
refactor(DDL Sink): remove leftover TODO tag
wlwilliamx Aug 30, 2024
2 changes: 1 addition & 1 deletion cdc/sink/ddlsink/mysql/async_ddl.go
@@ -152,7 +152,7 @@ func (m *DDLSink) doCheck(ctx context.Context, table model.TableName) (done bool
return true
}

- ret := m.db.QueryRowContext(ctx, fmt.Sprintf(checkRunningAddIndexSQL, table.Schema, table.Table))
+ ret := m.connector.CurrentDB.QueryRowContext(ctx, fmt.Sprintf(checkRunningAddIndexSQL, table.Schema, table.Table))
if ret.Err() != nil {
log.Error("check async exec ddl failed",
zap.String("namespace", m.id.Namespace),
34 changes: 15 additions & 19 deletions cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
@@ -15,12 +15,12 @@

import (
"context"
- "database/sql"
"fmt"
"net/url"
"time"

lru "github.com/hashicorp/golang-lru"
+ perrors "github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/pkg/parser/model"
@@ -54,10 +54,9 @@
// DDLSink is a sink that writes DDL events to MySQL.
type DDLSink struct {
// id indicates which processor (changefeed) this sink belongs to.
- id model.ChangeFeedID
- // db is the database connection.
- db *sql.DB
- cfg *pmysql.Config
+ id model.ChangeFeedID
+ connector *pmysql.DBConnector
+ cfg *pmysql.Config
// statistics is the statistics of this sink.
// We use it to record the DDL count.
statistics *metrics.Statistics
@@ -81,22 +80,17 @@
return nil, err
}

- dsnStr, err := pmysql.GenerateDSN(ctx, sinkURI, cfg, GetDBConnImpl.CreateTemporaryConnection)
+ connector, err := pmysql.NewDBConnectorWithFactory(ctx, cfg, sinkURI, GetDBConnImpl)
if err != nil {
return nil, err
}

- db, err := GetDBConnImpl.CreateStandardConnection(ctx, dsnStr)
+ cfg.IsTiDB, err = pmysql.CheckIsTiDB(ctx, connector.CurrentDB)
if err != nil {
return nil, err
}

- cfg.IsTiDB, err = pmysql.CheckIsTiDB(ctx, db)
- if err != nil {
- return nil, err
- }
-
- cfg.IsWriteSourceExisted, err = pmysql.CheckIfBDRModeIsSupported(ctx, db)
+ cfg.IsWriteSourceExisted, err = pmysql.CheckIfBDRModeIsSupported(ctx, connector.CurrentDB)
if err != nil {
return nil, err
}
@@ -107,7 +101,7 @@
}
m := &DDLSink{
id: changefeedID,
- db: db,
+ connector: connector,
cfg: cfg,
statistics: metrics.NewStatistics(ctx, changefeedID, sink.TxnSink),
lastExecutedNormalDDLCache: lruCache,
@@ -157,7 +151,9 @@
return err
}
return nil
- }, retry.WithBackoffBaseDelay(pmysql.BackoffBaseDelay.Milliseconds()),
+ }, retry.WithPreExecutionWhenRetry(func() error {
+ return m.connector.SwitchToAnAvailableDB(ctx)
+ }), retry.WithBackoffBaseDelay(pmysql.BackoffBaseDelay.Milliseconds()),
[Codecov / codecov/patch] Check warning on line 156 in cdc/sink/ddlsink/mysql/mysql_ddl_sink.go: added lines #L154 - L156 were not covered by tests
retry.WithBackoffMaxDelay(pmysql.BackoffMaxDelay.Milliseconds()),
retry.WithMaxTries(defaultDDLMaxRetry),
retry.WithIsRetryableErr(errorutil.IsRetryableDDLError))
@@ -210,7 +206,7 @@
start := time.Now()
log.Info("Start exec DDL", zap.String("namespace", m.id.Namespace), zap.String("changefeed", m.id.ID),
zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))
- tx, err := m.db.BeginTx(ctx, nil)
+ tx, err := m.connector.CurrentDB.BeginTx(ctx, nil)
if err != nil {
return err
}
@@ -229,7 +225,7 @@
// we try to set cdc write source for the ddl
if err = pmysql.SetWriteSource(pctx, m.cfg, tx); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
- if errors.Cause(rbErr) != context.Canceled {
+ if perrors.Cause(rbErr) != context.Canceled {
[Codecov / codecov/patch] Check warning on line 228 in cdc/sink/ddlsink/mysql/mysql_ddl_sink.go: added line #L228 was not covered by tests
log.Error("Failed to rollback",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID), zap.Error(err))
@@ -282,8 +278,8 @@
if m.statistics != nil {
m.statistics.Close()
}
- if m.db != nil {
- if err := m.db.Close(); err != nil {
+ if m.connector.CurrentDB != nil {
+ if err := m.connector.CurrentDB.Close(); err != nil {
log.Warn("MySQL ddl sink close db wit error",
zap.String("namespace", m.id.Namespace),
zap.String("changefeed", m.id.ID),
65 changes: 32 additions & 33 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
@@ -61,7 +61,7 @@
type mysqlBackend struct {
workerID int
changefeed string
- db *sql.DB
+ connector pmysql.DBConnector
cfg *pmysql.Config
dmlMaxRetry uint64

@@ -97,47 +97,44 @@
return nil, err
}

- dsnStr, err := pmysql.GenerateDSN(ctx, sinkURI, cfg, dbConnFactory.CreateTemporaryConnection)
+ connector, err := pmysql.NewDBConnectorWithFactory(ctx, cfg, sinkURI, dbConnFactory)
if err != nil {
return nil, err
}

- db, err := dbConnFactory.CreateStandardConnection(ctx, dsnStr)
+ cfg.IsTiDB, err = pmysql.CheckIsTiDB(ctx, connector.CurrentDB)
if err != nil {
return nil, err
}

- cfg.IsTiDB, err = pmysql.CheckIsTiDB(ctx, db)
+ cfg.IsWriteSourceExisted, err = pmysql.CheckIfBDRModeIsSupported(ctx, connector.CurrentDB)
if err != nil {
return nil, err
}

- cfg.IsWriteSourceExisted, err = pmysql.CheckIfBDRModeIsSupported(ctx, db)
- if err != nil {
- return nil, err
- }
-
- // By default, cache-prep-stmts=true, an LRU cache is used for prepared statements,
- // two connections are required to process a transaction.
- // The first connection is held in the tx variable, which is used to manage the transaction.
- // The second connection is requested through a call to s.db.Prepare
- // in case of a cache miss for the statement query.
- // The connection pool for CDC is configured with a static size, equal to the number of workers.
- // CDC may hang at the "Get Connection" call is due to the limited size of the connection pool.
- // When the connection pool is small,
- // the chance of all connections being active at the same time increases,
- // leading to exhaustion of available connections and a hang at the "Get Connection" call.
- // This issue is less likely to occur when the connection pool is larger,
- // as there are more connections available for use.
- // Adding an extra connection to the connection pool solves the connection exhaustion issue.
- db.SetMaxIdleConns(cfg.WorkerCount + 1)
- db.SetMaxOpenConns(cfg.WorkerCount + 1)
+ connector.ConfigureDBWhenSwitch(func() {
+ // By default, cache-prep-stmts=true, an LRU cache is used for prepared statements,
+ // two connections are required to process a transaction.
+ // The first connection is held in the tx variable, which is used to manage the transaction.
+ // The second connection is requested through a call to s.db.Prepare
+ // in case of a cache miss for the statement query.
+ // The connection pool for CDC is configured with a static size, equal to the number of workers.
+ // CDC may hang at the "Get Connection" call is due to the limited size of the connection pool.
+ // When the connection pool is small,
+ // the chance of all connections being active at the same time increases,
+ // leading to exhaustion of available connections and a hang at the "Get Connection" call.
+ // This issue is less likely to occur when the connection pool is larger,
+ // as there are more connections available for use.
+ // Adding an extra connection to the connection pool solves the connection exhaustion issue.
+ connector.CurrentDB.SetMaxIdleConns(cfg.WorkerCount + 1)
+ connector.CurrentDB.SetMaxOpenConns(cfg.WorkerCount + 1)
+ }, true)

// Inherit the default value of the prepared statement cache from the SinkURI Options
cachePrepStmts := cfg.CachePrepStmts
if cachePrepStmts {
// query the size of the prepared statement cache on serverside
- maxPreparedStmtCount, err := pmysql.QueryMaxPreparedStmtCount(ctx, db)
+ maxPreparedStmtCount, err := pmysql.QueryMaxPreparedStmtCount(ctx, connector.CurrentDB)
[Codecov / codecov/patch] Check warning on line 137 in cdc/sink/dmlsink/txn/mysql/mysql.go: added line #L137 was not covered by tests
if err != nil {
return nil, err
}
@@ -167,7 +164,7 @@
}

var maxAllowedPacket int64
- maxAllowedPacket, err = pmysql.QueryMaxAllowedPacket(ctx, db)
+ maxAllowedPacket, err = pmysql.QueryMaxAllowedPacket(ctx, connector.CurrentDB)
if err != nil {
log.Warn("failed to query max_allowed_packet, use default value",
zap.String("changefeed", changefeed),
@@ -180,7 +177,7 @@
backends = append(backends, &mysqlBackend{
workerID: i,
changefeed: changefeed,
- db: db,
+ connector: *connector,
cfg: cfg,
dmlMaxRetry: defaultDMLMaxRetry,
statistics: statistics,
@@ -261,9 +258,9 @@
if s.stmtCache != nil {
s.stmtCache.Purge()
}
- if s.db != nil {
- err = s.db.Close()
- s.db = nil
+ if s.connector.CurrentDB != nil {
+ err = s.connector.CurrentDB.Close()
+ s.connector.CurrentDB = nil
}
return
}
@@ -673,7 +670,7 @@
if s.cachePrepStmts {
if stmt, ok := s.stmtCache.Get(query); ok {
prepStmt = stmt.(*sql.Stmt)
- } else if stmt, err := s.db.Prepare(query); err == nil {
+ } else if stmt, err := s.connector.CurrentDB.Prepare(query); err == nil {
[Codecov / codecov/patch] Check warning on line 673 in cdc/sink/dmlsink/txn/mysql/mysql.go: added line #L673 was not covered by tests
prepStmt = stmt
s.stmtCache.Add(query, stmt)
} else {
@@ -741,7 +738,7 @@
})

err := s.statistics.RecordBatchExecution(func() (int, int64, error) {
- tx, err := s.db.BeginTx(pctx, nil)
+ tx, err := s.connector.CurrentDB.BeginTx(pctx, nil)
if err != nil {
return 0, 0, logDMLTxnErr(
wrapMysqlTxnError(err),
@@ -799,7 +796,9 @@
zap.Int("workerID", s.workerID),
zap.Int("numOfRows", dmls.rowCount))
return nil
- }, retry.WithBackoffBaseDelay(pmysql.BackoffBaseDelay.Milliseconds()),
+ }, retry.WithPreExecutionWhenRetry(func() error {
+ return s.connector.SwitchToAnAvailableDB(pctx)
+ }), retry.WithBackoffBaseDelay(pmysql.BackoffBaseDelay.Milliseconds()),
retry.WithBackoffMaxDelay(pmysql.BackoffMaxDelay.Milliseconds()),
retry.WithMaxTries(s.dmlMaxRetry),
retry.WithIsRetryableErr(isRetryableDMLError))
4 changes: 2 additions & 2 deletions cdc/sink/validator/validator.go
@@ -122,11 +122,11 @@
if err != nil {
return err
}
- dsn, err := pmysql.GenBasicDSN(sinkURI, cfg)
+ dsnList, err := pmysql.GetDSNCfgs(sinkURI, cfg)
[Codecov / codecov/patch] Check warning on line 125 in cdc/sink/validator/validator.go: added line #L125 was not covered by tests
if err != nil {
return err
}
- testDB, err := pmysql.GetTestDB(ctx, dsn, pmysql.CreateMySQLDBConn)
+ testDB, err := pmysql.GetTestDB(ctx, dsnList[0], pmysql.CreateMySQLDBConn)
[Codecov / codecov/patch] Check warning on line 129 in cdc/sink/validator/validator.go: added line #L129 was not covered by tests
if err != nil {
return err
}