From 7ed4d38d3f4707bddf4fa6287e40c3656380833b Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Wed, 27 Nov 2024 13:23:56 +0800 Subject: [PATCH 1/3] tests(ticdc): fix bank test (#11407) close pingcap/tiflow#11806 --- cdc/owner/ddl_manager.go | 2 + tests/integration_tests/bank/case.go | 80 +++++++++++++++++++++++++++- tests/integration_tests/bank/run.sh | 4 +- 3 files changed, 83 insertions(+), 3 deletions(-) diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go index 83b2347bc97..5cc9320a12c 100644 --- a/cdc/owner/ddl_manager.go +++ b/cdc/owner/ddl_manager.go @@ -210,6 +210,8 @@ func (m *ddlManager) tick( } if job != nil && job.BinlogInfo != nil { + // Note: do not change the key words in the log, it is used to search the + // FinishTS of the DDL job. Some integration tests and users depend on it. log.Info("handle a ddl job", zap.String("namespace", m.changfeedID.Namespace), zap.String("ID", m.changfeedID.ID), diff --git a/tests/integration_tests/bank/case.go b/tests/integration_tests/bank/case.go index bd1a5794d1b..0219eb83ba5 100644 --- a/tests/integration_tests/bank/case.go +++ b/tests/integration_tests/bank/case.go @@ -14,10 +14,15 @@ package main import ( + "bufio" "context" "database/sql" "fmt" "math/rand" + "os" + "path/filepath" + "regexp" + "strconv" "strings" "sync/atomic" "time" @@ -620,9 +625,10 @@ func getDownStreamSyncedEndTs(ctx context.Context, db *sql.DB, tableName string) log.Error("get downstream sync end ts failed due to timeout", zap.String("table", tableName), zap.Error(ctx.Err())) return "", ctx.Err() case <-time.After(2 * time.Second): - result, ok := tryGetEndTs(db, tableName) + // result, ok := tryGetEndTs(db, tidbAPIEndpoint, tableName) + result, ok := tryGetEndTsFromLog(db, tableName) if ok { - return result, nil + return strconv.Itoa(int(result)), nil } } } @@ -642,3 +648,73 @@ func tryGetEndTs(db *sql.DB, tableName string) (result string, ok bool) { return endTime, true } + +func tryGetEndTsFromLog(db *sql.DB, tableName string) (result uint64, ok bool) { + query := "SELECT JOB_ID FROM information_schema.ddl_jobs WHERE table_name = ?" + log.Info("try get end ts", zap.String("query", query), zap.String("tableName", tableName)) + var jobID uint64 + row := db.QueryRow(query, tableName) + if err := row.Scan(&jobID); err != nil { + if err != sql.ErrNoRows { + log.Info("rows scan failed", zap.Error(err)) + } + return 0, false + } + + log.Info("try parse finishedTs from ticdc log", zap.String("tableName", tableName)) + + logFilePath := "/tmp/tidb_cdc_test/bank" + cdcLogFiles := make([]string, 0) + // walk all file with cdc prefix + err := filepath.WalkDir(logFilePath, func(path string, d os.DirEntry, err error) error { + if err != nil { + return err + } + if !d.IsDir() { + if strings.Contains(d.Name(), "down") && strings.Contains(d.Name(), "cdc") && strings.Contains(d.Name(), "log") { + cdcLogFiles = append(cdcLogFiles, path) + fmt.Println(path) + } + } + return nil + }) + if err != nil { + log.Error("Failed to walk dir: %v", zap.Error(err)) + } + log.Info("total files", zap.Any("file", cdcLogFiles)) + + logRegex := regexp.MustCompile(`handle a ddl job`) + tableNameRegex := regexp.MustCompile(tableName + "`") + timeStampRegex := regexp.MustCompile(`finishedTs=([0-9]+)`) + for _, f := range cdcLogFiles { + file, err := os.Open(f) + if err != nil { + log.Error("Failed to open file: %v", zap.Error(err)) + } + defer file.Close() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + if !logRegex.MatchString(line) || !tableNameRegex.MatchString(line) { + continue + } + + matches := timeStampRegex.FindStringSubmatch(line) + if len(matches) > 1 { + fmt.Println("found first match line: ", matches[1], ": ", line) + // convert to uint64 + result, err := strconv.ParseUint(matches[1], 10, 64) + if err != nil { + log.Error("Failed to parse uint64: %v", zap.Error(err)) + } + return result, true + } + } + + if err := scanner.Err(); err != nil { + log.Error("Error scanning file: %v", zap.Error(err)) + } + } + return 0, false +} diff --git a/tests/integration_tests/bank/run.sh b/tests/integration_tests/bank/run.sh index d883e16dc01..069d5946a9c 100644 --- a/tests/integration_tests/bank/run.sh +++ b/tests/integration_tests/bank/run.sh @@ -24,8 +24,10 @@ function prepare() { run_sql "CREATE DATABASE bank" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - run_cdc_cli changefeed create --sink-uri="mysql://root@${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT}/" + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8400" --pd "http://${DOWN_PD_HOST}:${DOWN_PD_PORT}" --logsuffix "down" + run_cdc_cli changefeed create --sink-uri="blackhole://" -c "changefeed-for-find-finished-ts" --server "http://127.0.0.1:8400" } trap stop_tidb_cluster EXIT From 119e7f192e784cab1852340e6963fb0ecc313aab Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Tue, 3 Dec 2024 18:33:07 +0800 Subject: [PATCH 2/3] fix timeout --- tests/integration_tests/bank/case.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/tests/integration_tests/bank/case.go b/tests/integration_tests/bank/case.go index 0219eb83ba5..722701132b0 100644 --- a/tests/integration_tests/bank/case.go +++ b/tests/integration_tests/bank/case.go @@ -624,7 +624,7 @@ func getDownStreamSyncedEndTs(ctx context.Context, db *sql.DB, tableName string) case <-ctx.Done(): log.Error("get downstream sync end ts failed due to timeout", zap.String("table", tableName), zap.Error(ctx.Err())) return "", ctx.Err() - case <-time.After(2 * time.Second): + case <-time.After(15 * time.Second): // result, ok := tryGetEndTs(db, tidbAPIEndpoint, tableName) result, ok := tryGetEndTsFromLog(db, tableName) if ok { @@ -649,18 +649,7 @@ func tryGetEndTs(db *sql.DB, tableName string) (result string, ok bool) { return endTime, true } -func tryGetEndTsFromLog(db *sql.DB, tableName string) (result uint64, ok bool) { - query := "SELECT JOB_ID FROM information_schema.ddl_jobs WHERE table_name = ?" - log.Info("try get end ts", zap.String("query", query), zap.String("tableName", tableName)) - var jobID uint64 - row := db.QueryRow(query, tableName) - if err := row.Scan(&jobID); err != nil { - if err != sql.ErrNoRows { - log.Info("rows scan failed", zap.Error(err)) - } - return 0, false - } - +func tryGetEndTsFromLog(_ *sql.DB, tableName string) (result uint64, ok bool) { log.Info("try parse finishedTs from ticdc log", zap.String("tableName", tableName)) logFilePath := "/tmp/tidb_cdc_test/bank" From cb502d08c3c7b93029390f06c760a63820d15aae Mon Sep 17 00:00:00 2001 From: CharlesCheung96 Date: Tue, 3 Dec 2024 19:44:10 +0800 Subject: [PATCH 3/3] fix --- tests/integration_tests/bank/case.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/tests/integration_tests/bank/case.go b/tests/integration_tests/bank/case.go index 722701132b0..72adf2845ed 100644 --- a/tests/integration_tests/bank/case.go +++ b/tests/integration_tests/bank/case.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "fmt" + "io" "math/rand" "os" "path/filepath" @@ -682,16 +683,23 @@ func tryGetEndTsFromLog(_ *sql.DB, tableName string) (result uint64, ok bool) { } defer file.Close() - scanner := bufio.NewScanner(file) - for scanner.Scan() { - line := scanner.Text() + reader := bufio.NewReader(file) + for { + bs, _, err := reader.ReadLine() + if err != nil { + if err != io.EOF { + fmt.Printf("Error reading file: %v\n", err) + } + return 0, false + } + line := string(bs) if !logRegex.MatchString(line) || !tableNameRegex.MatchString(line) { continue } matches := timeStampRegex.FindStringSubmatch(line) if len(matches) > 1 { - fmt.Println("found first match line: ", matches[1], ": ", line) + fmt.Println("found first match line, Match Result: ", matches[1], ", line: ", line) // convert to uint64 result, err := strconv.ParseUint(matches[1], 10, 64) if err != nil { @@ -700,10 +708,6 @@ func tryGetEndTsFromLog(_ *sql.DB, tableName string) (result uint64, ok bool) { return result, true } } - - if err := scanner.Err(); err != nil { - log.Error("Error scanning file: %v", zap.Error(err)) - } } return 0, false }