Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests(ticdc): fix bank test (#11407) #11807

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func (m *ddlManager) tick(
}

if job != nil && job.BinlogInfo != nil {
// Note: do not change the key words in the log, it is used to search the
// FinishTS of the DDL job. Some integration tests and users depend on it.
log.Info("handle a ddl job",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("ID", m.changfeedID.ID),
Expand Down
75 changes: 72 additions & 3 deletions tests/integration_tests/bank/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@
package main

import (
"bufio"
"context"
"database/sql"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -619,10 +625,11 @@ func getDownStreamSyncedEndTs(ctx context.Context, db *sql.DB, tableName string)
case <-ctx.Done():
log.Error("get downstream sync end ts failed due to timeout", zap.String("table", tableName), zap.Error(ctx.Err()))
return "", ctx.Err()
case <-time.After(2 * time.Second):
result, ok := tryGetEndTs(db, tableName)
case <-time.After(15 * time.Second):
// result, ok := tryGetEndTs(db, tidbAPIEndpoint, tableName)
result, ok := tryGetEndTsFromLog(db, tableName)
if ok {
return result, nil
return strconv.Itoa(int(result)), nil
}
}
}
Expand All @@ -642,3 +649,65 @@ func tryGetEndTs(db *sql.DB, tableName string) (result string, ok bool) {

return endTime, true
}

func tryGetEndTsFromLog(_ *sql.DB, tableName string) (result uint64, ok bool) {
log.Info("try parse finishedTs from ticdc log", zap.String("tableName", tableName))

logFilePath := "/tmp/tidb_cdc_test/bank"
cdcLogFiles := make([]string, 0)
// walk all file with cdc prefix
err := filepath.WalkDir(logFilePath, func(path string, d os.DirEntry, err error) error {
if err != nil {
return err
}
if !d.IsDir() {
if strings.Contains(d.Name(), "down") && strings.Contains(d.Name(), "cdc") && strings.Contains(d.Name(), "log") {
cdcLogFiles = append(cdcLogFiles, path)
fmt.Println(path)
}
}
return nil
})
if err != nil {
log.Error("Failed to walk dir: %v", zap.Error(err))
}
log.Info("total files", zap.Any("file", cdcLogFiles))

logRegex := regexp.MustCompile(`handle a ddl job`)
tableNameRegex := regexp.MustCompile(tableName + "`")
timeStampRegex := regexp.MustCompile(`finishedTs=([0-9]+)`)
for _, f := range cdcLogFiles {
file, err := os.Open(f)
if err != nil {
log.Error("Failed to open file: %v", zap.Error(err))
}
defer file.Close()

reader := bufio.NewReader(file)
for {
bs, _, err := reader.ReadLine()
if err != nil {
if err != io.EOF {
fmt.Printf("Error reading file: %v\n", err)
}
return 0, false
}
line := string(bs)
if !logRegex.MatchString(line) || !tableNameRegex.MatchString(line) {
continue
}

matches := timeStampRegex.FindStringSubmatch(line)
if len(matches) > 1 {
fmt.Println("found first match line, Match Result: ", matches[1], ", line: ", line)
// convert to uint64
result, err := strconv.ParseUint(matches[1], 10, 64)
if err != nil {
log.Error("Failed to parse uint64: %v", zap.Error(err))
}
return result, true
}
}
}
return 0, false
}
4 changes: 3 additions & 1 deletion tests/integration_tests/bank/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ function prepare() {
run_sql "CREATE DATABASE bank" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

run_cdc_cli changefeed create --sink-uri="mysql://root@${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT}/"

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8400" --pd "http://${DOWN_PD_HOST}:${DOWN_PD_PORT}" --logsuffix "down"
run_cdc_cli changefeed create --sink-uri="blackhole://" -c "changefeed-for-find-finished-ts" --server "http://127.0.0.1:8400"
}

trap stop_tidb_cluster EXIT
Expand Down
Loading