From e3d7d5c2a8d0c264a27cbe113be3f35658a23ee1 Mon Sep 17 00:00:00 2001 From: erwadba Date: Sat, 25 Feb 2023 09:08:33 +0800 Subject: [PATCH 1/3] replace crc32 to md5 checksum --- sync_diff_inspector/diff.go | 6 ++-- sync_diff_inspector/source/mysql_shard.go | 41 ++++++++++++++--------- sync_diff_inspector/source/source.go | 6 ++-- sync_diff_inspector/source/tidb.go | 8 +++-- sync_diff_inspector/utils/utils.go | 41 +++++++++++------------ 5 files changed, 55 insertions(+), 47 deletions(-) diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index e5b5e9f12..553f7f2b1 100644 --- a/sync_diff_inspector/diff.go +++ b/sync_diff_inspector/diff.go @@ -583,9 +583,9 @@ func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *spli wg.Add(1) go func() { defer wg.Done() - upstreamInfo = df.upstream.GetCountAndCrc32(ctx, tableRange) + upstreamInfo = df.upstream.GetCountAndMD5(ctx, tableRange) }() - downstreamInfo = df.downstream.GetCountAndCrc32(ctx, tableRange) + downstreamInfo = df.downstream.GetCountAndMD5(ctx, tableRange) wg.Wait() if upstreamInfo.Err != nil { @@ -601,7 +601,7 @@ func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *spli if upstreamInfo.Count == downstreamInfo.Count && upstreamInfo.Checksum == downstreamInfo.Checksum { return true, upstreamInfo.Count, downstreamInfo.Count, nil } - log.Debug("checksum failed", zap.Any("chunk id", tableRange.ChunkRange.Index), zap.String("table", df.workSource.GetTables()[tableRange.GetTableIndex()].Table), zap.Int64("upstream chunk size", upstreamInfo.Count), zap.Int64("downstream chunk size", downstreamInfo.Count), zap.Int64("upstream checksum", upstreamInfo.Checksum), zap.Int64("downstream checksum", downstreamInfo.Checksum)) + log.Debug("checksum failed", zap.Any("chunk id", tableRange.ChunkRange.Index), zap.String("table", df.workSource.GetTables()[tableRange.GetTableIndex()].Table), zap.Int64("upstream chunk size", upstreamInfo.Count), zap.Int64("downstream chunk size", downstreamInfo.Count), zap.String("upstream checksum", upstreamInfo.Checksum), zap.String("downstream checksum", downstreamInfo.Checksum)) return false, upstreamInfo.Count, downstreamInfo.Count, nil } diff --git a/sync_diff_inspector/source/mysql_shard.go b/sync_diff_inspector/source/mysql_shard.go index 26789f289..3c1aae55c 100644 --- a/sync_diff_inspector/source/mysql_shard.go +++ b/sync_diff_inspector/source/mysql_shard.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "fmt" + "strconv" "time" tableFilter "github.com/pingcap/tidb-tools/pkg/table-filter" @@ -93,42 +94,50 @@ func (s *MySQLSources) Close() { } } -func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo { +func (s *MySQLSources) GetCountAndMD5(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo { beginTime := time.Now() table := s.tableDiffs[tableRange.GetTableIndex()] chunk := tableRange.GetChunk() matchSources := getMatchedSourcesForTable(s.sourceTablesMap, table) - infoCh := make(chan *ChecksumInfo, len(s.sourceTablesMap)) - + type checksumInfo struct { + lMD5 uint64 + rMD5 uint64 + count int64 + err error + } + infoCh := make(chan *checksumInfo, len(s.sourceTablesMap)) for _, ms := range matchSources { go func(ms *common.TableShardSource) { - count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args) - infoCh <- &ChecksumInfo{ - Checksum: checksum, - Count: count, - Err: err, + count, lmd5, rmd5, err := utils.GetCountAndMD5Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args) + infoCh <- &checksumInfo{ + lMD5: lmd5, + rMD5: rmd5, + count: count, + err: err, } }(ms) } defer close(infoCh) var ( - err error - totalCount int64 - totalChecksum int64 + err error + totalCount int64 + totalLMD5 uint64 + totalRMD5 uint64 ) for range matchSources { info := <-infoCh // catch the first error - if err == nil && info.Err != nil { - err = info.Err + if err == nil && info.err != nil { + err = info.err } - totalCount += info.Count - totalChecksum ^= info.Checksum + totalCount += info.count + totalLMD5 ^= info.lMD5 + totalRMD5 ^= info.rMD5 } - + totalChecksum := strconv.FormatUint(totalLMD5, 16) + strconv.FormatUint(totalRMD5, 16) cost := time.Since(beginTime) return &ChecksumInfo{ Checksum: totalChecksum, diff --git a/sync_diff_inspector/source/source.go b/sync_diff_inspector/source/source.go index baa21e85d..6bb791863 100644 --- a/sync_diff_inspector/source/source.go +++ b/sync_diff_inspector/source/source.go @@ -50,7 +50,7 @@ const ( ) type ChecksumInfo struct { - Checksum int64 + Checksum string Count int64 Err error Cost time.Duration @@ -82,8 +82,8 @@ type Source interface { // there are many workers consume the range from the channel to compare. GetRangeIterator(context.Context, *splitter.RangeInfo, TableAnalyzer, int) (RangeIterator, error) - // GetCountAndCrc32 gets the crc32 result and the count from given range. - GetCountAndCrc32(context.Context, *splitter.RangeInfo) *ChecksumInfo + // GetCountAndMD5 gets the crc32 result and the count from given range. + GetCountAndMD5(context.Context, *splitter.RangeInfo) *ChecksumInfo // GetCountForLackTable gets the count for tables that don't exist upstream or downstream. GetCountForLackTable(context.Context, *splitter.RangeInfo) int64 diff --git a/sync_diff_inspector/source/tidb.go b/sync_diff_inspector/source/tidb.go index 8c47bf060..ac4d0d948 100644 --- a/sync_diff_inspector/source/tidb.go +++ b/sync_diff_inspector/source/tidb.go @@ -17,6 +17,7 @@ import ( "context" "database/sql" "fmt" + "strconv" "time" "github.com/coreos/go-semver/semver" @@ -120,14 +121,15 @@ func (s *TiDBSource) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo func (s *TiDBSource) Close() { s.dbConn.Close() } -func (s *TiDBSource) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo { + +func (s *TiDBSource) GetCountAndMD5(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo { beginTime := time.Now() table := s.tableDiffs[tableRange.GetTableIndex()] chunk := tableRange.GetChunk() matchSource := getMatchSource(s.sourceTableMap, table) - count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args) - + count, lmd5, rmd5, err := utils.GetCountAndMD5Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args) + checksum := strconv.FormatUint(lmd5, 16) + strconv.FormatUint(rmd5, 16) cost := time.Since(beginTime) return &ChecksumInfo{ Checksum: checksum, diff --git a/sync_diff_inspector/utils/utils.go b/sync_diff_inspector/utils/utils.go index 68e0ad6de..cffe947fe 100644 --- a/sync_diff_inspector/utils/utils.go +++ b/sync_diff_inspector/utils/utils.go @@ -743,16 +743,16 @@ func GetTableSize(ctx context.Context, db *sql.DB, schemaName, tableName string) return dataSize.Int64, nil } -// GetCountAndCRC32Checksum returns checksum code and count of some data by given condition -func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}) (int64, int64, error) { +// GetCountAndMD5Checksum returns checksum code and count of some data by given condition +func GetCountAndMD5Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}) (int64, uint64, uint64, error) { /* - calculate CRC32 checksum and count example: - mysql> select count(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) as CHECKSUM from test.test where id > 0; - +--------+------------+ - | CNT | CHECKSUM | - +--------+------------+ - | 100000 | 1128664311 | - +--------+------------+ + calculate MD5 checksum and count example: + mysql> SELECT COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`)))), 1, 16), 16, 10) AS UNSIGNED)) LMD5, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`)))), 17, 16), 16, 10) AS UNSIGNED)) RMD5 FROM `a`.`t`; + +--------+--------------------------------------------+ + | CNT | LMD5 | RMD5 | + +--------+--------------------------------------------+ + | 100000 | 3462532621352132810 | 17515372630935707780 | + +--------+--------------------------------------------+ 1 row in set (0.46 sec) */ columnNames := make([]string, 0, len(tbInfo.Columns)) @@ -770,24 +770,21 @@ func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, table columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", name)) } - query := fmt.Sprintf("SELECT COUNT(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;", - strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange) + query := fmt.Sprintf("SELECT COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 1, 16), 16, 10) AS UNSIGNED)) LMD5, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 17, 16), 16, 10) AS UNSIGNED)) RMD5 FROM %s WHERE %s;", + strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange) log.Debug("count and checksum", zap.String("sql", query), zap.Reflect("args", args)) - var count sql.NullInt64 - var checksum sql.NullInt64 - err := db.QueryRowContext(ctx, query, args...).Scan(&count, &checksum) + var ( + count int64 + lmd5 uint64 + rmd5 uint64 + ) + err := db.QueryRowContext(ctx, query, args...).Scan(&count, &lmd5, &rmd5) if err != nil { log.Warn("execute checksum query fail", zap.String("query", query), zap.Reflect("args", args), zap.Error(err)) - return -1, -1, errors.Trace(err) + return -1, 0, 0, errors.Trace(err) } - if !count.Valid || !checksum.Valid { - // if don't have any data, the checksum will be `NULL` - log.Warn("get empty count or checksum", zap.String("sql", query), zap.Reflect("args", args)) - return 0, 0, nil - } - - return count.Int64, checksum.Int64, nil + return count, lmd5, rmd5, nil } // GetRandomValues returns some random values. Different from /pkg/dbutil.GetRandomValues, it returns multi-columns at the same time. From d6ee7ea576703ac688b1c8886c860df59edd9ab8 Mon Sep 17 00:00:00 2001 From: erwadba Date: Sat, 25 Feb 2023 10:02:37 +0800 Subject: [PATCH 2/3] change test case about replace crc32 to md5 --- sync_diff_inspector/source/source_test.go | 14 +++++++------- sync_diff_inspector/utils/utils_test.go | 9 +++++---- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/sync_diff_inspector/source/source_test.go b/sync_diff_inspector/source/source_test.go index 0a7a1964d..4d860a3c1 100644 --- a/sync_diff_inspector/source/source_test.go +++ b/sync_diff_inspector/source/source_test.go @@ -182,12 +182,12 @@ func TestTiDBSource(t *testing.T) { require.NoError(t, err) require.True(t, check) require.Equal(t, n, tableCase.rangeInfo.GetTableIndex()) - countRows := sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456) + countRows := sqlmock.NewRows([]string{"CNT", "LMD5", "RMD5"}).AddRow(123, 456, 789) mock.ExpectQuery("SELECT COUNT.*").WillReturnRows(countRows) - checksum := tidb.GetCountAndCrc32(ctx, tableCase.rangeInfo) + checksum := tidb.GetCountAndMD5(ctx, tableCase.rangeInfo) require.NoError(t, checksum.Err) require.Equal(t, checksum.Count, int64(123)) - require.Equal(t, checksum.Checksum, int64(456)) + require.Equal(t, checksum.Checksum, "1c8315") } // Test ChunkIterator @@ -390,17 +390,17 @@ func TestMysqlShardSources(t *testing.T) { for n, tableCase := range tableCases { require.Equal(t, n, tableCase.rangeInfo.GetTableIndex()) - var resChecksum int64 = 0 + var resChecksum uint64 = 0 for i := 0; i < len(dbs); i++ { resChecksum = resChecksum + 1< Date: Sat, 25 Feb 2023 10:20:06 +0800 Subject: [PATCH 3/3] fix the wrong annotation --- sync_diff_inspector/source/source.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sync_diff_inspector/source/source.go b/sync_diff_inspector/source/source.go index 6bb791863..fbd78c1a5 100644 --- a/sync_diff_inspector/source/source.go +++ b/sync_diff_inspector/source/source.go @@ -82,7 +82,7 @@ type Source interface { // there are many workers consume the range from the channel to compare. GetRangeIterator(context.Context, *splitter.RangeInfo, TableAnalyzer, int) (RangeIterator, error) - // GetCountAndMD5 gets the crc32 result and the count from given range. + // GetCountAndMD5 gets the md5 result and the count from given range. GetCountAndMD5(context.Context, *splitter.RangeInfo) *ChecksumInfo // GetCountForLackTable gets the count for tables that don't exist upstream or downstream.