Skip to content

Commit

Permalink
mysql(ticdc): remove returned error and modify the log in `CheckIsTiD…
Browse files Browse the repository at this point in the history
…B ` (#11506)

close #11505
  • Loading branch information
wlwilliamx authored Sep 10, 2024
1 parent 2e3aade commit 0848c61
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 25 deletions.
5 changes: 1 addition & 4 deletions cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,7 @@ func NewDDLSink(
return nil, err
}

cfg.IsTiDB, err = pmysql.CheckIsTiDB(ctx, db)
if err != nil {
return nil, err
}
cfg.IsTiDB = pmysql.CheckIsTiDB(ctx, db)

cfg.IsWriteSourceExisted, err = pmysql.CheckIfBDRModeIsSupported(ctx, db)
if err != nil {
Expand Down
5 changes: 1 addition & 4 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,7 @@ func NewMySQLBackends(
return nil, err
}

cfg.IsTiDB, err = pmysql.CheckIsTiDB(ctx, db)
if err != nil {
return nil, err
}
cfg.IsTiDB = pmysql.CheckIsTiDB(ctx, db)

cfg.IsWriteSourceExisted, err = pmysql.CheckIfBDRModeIsSupported(ctx, db)
if err != nil {
Expand Down
5 changes: 1 addition & 4 deletions cdc/syncpointstore/mysql_syncpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ func newMySQLSyncPointStore(
return nil, err
}

cfg.IsTiDB, err = pmysql.CheckIsTiDB(ctx, syncDB)
if err != nil {
return nil, err
}
cfg.IsTiDB = pmysql.CheckIsTiDB(ctx, syncDB)

cfg.IsWriteSourceExisted, err = pmysql.CheckIfBDRModeIsSupported(ctx, syncDB)
if err != nil {
Expand Down
32 changes: 23 additions & 9 deletions pkg/sink/mysql/db_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,16 +263,16 @@ func GenBasicDSN(sinkURI *url.URL, cfg *Config) (*dmysql.Config, error) {

// CheckIfBDRModeIsSupported checks if the downstream supports BDR mode.
func CheckIfBDRModeIsSupported(ctx context.Context, db *sql.DB) (bool, error) {
isTiDB, err := CheckIsTiDB(ctx, db)
if err != nil || !isTiDB {
return false, err
isTiDB := CheckIsTiDB(ctx, db)
if !isTiDB {
return false, nil
}
testSourceID := 1
// downstream is TiDB, set system variables.
// We should always try to set this variable, and ignore the error if
// downstream does not support this variable, it is by design.
query := fmt.Sprintf("SET SESSION %s = %d", "tidb_cdc_write_source", testSourceID)
_, err = db.ExecContext(ctx, query)
_, err := db.ExecContext(ctx, query)
if err != nil {
if mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError); ok &&
mysqlErr.Number == tmysql.ErrUnknownSystemVariable {
Expand All @@ -284,16 +284,30 @@ func CheckIfBDRModeIsSupported(ctx context.Context, db *sql.DB) (bool, error) {
}

// CheckIsTiDB checks if the downstream is TiDB.
func CheckIsTiDB(ctx context.Context, db *sql.DB) (bool, error) {
func CheckIsTiDB(ctx context.Context, db *sql.DB) bool {
var tidbVer string
// check if downstream is TiDB
row := db.QueryRowContext(ctx, "select tidb_version()")
err := row.Scan(&tidbVer)
if err != nil {
log.Error("check tidb version error", zap.Error(err))
return false, nil
}
return true, nil
log.Warn("check tidb version error, the downstream db is not tidb?", zap.Error(err))
// In earlier versions, this function returned an `error` along with a boolean value,
// which allowed callers to differentiate between network-related issues and
// the absence of TiDB. However, since the specific error content wasn't critical to
// the logic—external callers only needed to know whether the downstream was TiDB—the
// decision was made to simplify the function. External callers should not handle the
// error returned here because even if the downstream is not TiDB, TiCDC should still
// function properly, for more details: https://github.com/pingcap/tiflow/pull/11214
//
// So instead of returning an `error`, we now log a warning if the
// query fails. This keeps the external interface clean while still
// providing observability into network or query issues through logs.
//
// Note: The function returns `false` and logs a warning if the
// query to retrieve the TiDB version fails.
return false
}
return true
}

// QueryMaxPreparedStmtCount gets the value of max_prepared_stmt_count
Expand Down
5 changes: 1 addition & 4 deletions pkg/sink/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,7 @@ func NewObserver(
db.SetMaxIdleConns(2)
db.SetMaxOpenConns(2)

isTiDB, err := pmysql.CheckIsTiDB(ctx, db)
if err != nil {
return nil, err
}
isTiDB := pmysql.CheckIsTiDB(ctx, db)
if isTiDB {
return NewTiDBObserver(db), nil
}
Expand Down

0 comments on commit 0848c61

Please sign in to comment.