Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mounter(ticdc): calculate raw bytes checksum by using handle (#11720) #11731

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 41 additions & 39 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,20 @@
return nil, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(physicalTableID)
}
if bytes.HasPrefix(key, recordPrefix) {
rowKV, err := m.unmarshalRowKVEntry(tableInfo, raw.Key, raw.Value, raw.OldValue, baseInfo)
recordID, err := tablecodec.DecodeRowKey(raw.Key)
if err != nil {
return nil, errors.Trace(err)
}

Check warning on line 202 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L201-L202

Added lines #L201 - L202 were not covered by tests
baseInfo.RecordID = recordID

rowKV, err := m.unmarshalRowKVEntry(tableInfo, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
}
if rowKV == nil {
return nil, nil
}
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, checksumKey, raw.ApproximateDataSize())
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, recordID, checksumKey, raw.ApproximateDataSize())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -231,28 +237,21 @@

func (m *mounter) unmarshalRowKVEntry(
tableInfo *model.TableInfo,
rawKey []byte,
rawValue []byte,
rawOldValue []byte,
base baseKVEntry,
) (*rowKVEntry, error) {
recordID, err := tablecodec.DecodeRowKey(rawKey)
if err != nil {
return nil, errors.Trace(err)
}
base.RecordID = recordID

var (
row, preRow map[int64]types.Datum
rowExist, preRowExist bool
)

row, rowExist, err = m.decodeRow(rawValue, recordID, tableInfo, false)
row, rowExist, err := m.decodeRow(rawValue, base.RecordID, tableInfo, false)
if err != nil {
return nil, errors.Trace(err)
}

preRow, preRowExist, err = m.decodeRow(rawOldValue, recordID, tableInfo, true)
preRow, preRowExist, err = m.decodeRow(rawOldValue, base.RecordID, tableInfo, true)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -490,33 +489,34 @@

checksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
if err != nil {
log.Error("failed to calculate the checksum", zap.Uint32("first", first), zap.Error(err))
log.Error("failed to calculate the checksum",
zap.Uint32("first", first), zap.Any("columnInfos", columnInfos),
zap.Any("rawColumns", rawColumns), zap.Error(err))

Check warning on line 494 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L492-L494

Added lines #L492 - L494 were not covered by tests
return 0, false, err
}

// the first checksum matched; this is the most common case.
if checksum == first {
log.Debug("checksum matched", zap.Uint32("checksum", checksum), zap.Uint32("first", first))
return checksum, true, nil
}

extra, ok := decoder.GetExtraChecksum()
if ok && checksum == extra {
log.Debug("extra checksum matched, this may happen the upstream TiDB is during the DDL execution phase",
zap.Uint32("checksum", checksum), zap.Uint32("extra", extra))
return checksum, true, nil
}

if !skipFail {
log.Error("cannot found the extra checksum, the first checksum mismatched",
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra))
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra),
zap.Any("columnInfos", columnInfos), zap.Any("rawColumns", rawColumns), zap.Any("tz", m.tz))

Check warning on line 511 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L510-L511

Added lines #L510 - L511 were not covered by tests
return checksum, false, nil
}

if time.Since(m.lastSkipOldValueTime) > time.Minute {
log.Warn("checksum mismatch on the old value, "+
"this may caused by Add Column / Drop Column executed, skip verification",
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra))
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra),
zap.Any("columnInfos", columnInfos), zap.Any("rawColumns", rawColumns), zap.Any("tz", m.tz))

Check warning on line 519 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L518-L519

Added lines #L518 - L519 were not covered by tests
m.lastSkipOldValueTime = time.Now()
}
return checksum, true, nil
Expand Down Expand Up @@ -602,7 +602,7 @@

func verifyRawBytesChecksum(
tableInfo *model.TableInfo, columns []*model.ColumnData, decoder *rowcodec.DatumMapDecoder,
key kv.Key, tz *time.Location,
handle kv.Handle, key kv.Key, tz *time.Location,
) (uint32, bool, error) {
expected, ok := decoder.GetChecksum()
if !ok {
Expand All @@ -621,12 +621,14 @@
columnInfo := tableInfo.ForceGetColumnInfo(columnID)
datum, err := newDatum(col.Value, columnInfo.FieldType)
if err != nil {
log.Error("build datum for raw checksum calculation failed",
zap.Any("col", col), zap.Any("columnInfo", columnInfo), zap.Error(err))

Check warning on line 625 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L624-L625

Added lines #L624 - L625 were not covered by tests
return 0, false, errors.Trace(err)
}
datums = append(datums, &datum)
columnIDs = append(columnIDs, columnID)
}
obtained, err := decoder.CalculateRawChecksum(tz, columnIDs, datums, key, nil)
obtained, err := decoder.CalculateRawChecksum(tz, columnIDs, datums, key, handle, nil)
if err != nil {
return 0, false, errors.Trace(err)
}
Expand All @@ -635,7 +637,10 @@
}

log.Error("raw bytes checksum mismatch",
zap.Uint32("expected", expected), zap.Uint32("obtained", obtained))
zap.Int("version", decoder.ChecksumVersion()),
zap.Uint32("expected", expected), zap.Uint32("obtained", obtained),
zap.Any("tableInfo", tableInfo), zap.Any("columns", columns),
zap.Any("handle", handle.String()), zap.Any("tz", tz))

Check warning on line 643 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L640-L643

Added lines #L640 - L643 were not covered by tests

return expected, false, nil
}
Expand All @@ -645,7 +650,7 @@
func (m *mounter) verifyChecksum(
tableInfo *model.TableInfo, columnInfos []*timodel.ColumnInfo,
columns []*model.ColumnData, rawColumns []types.Datum,
key kv.Key, isPreRow bool,
handle kv.Handle, key kv.Key, isPreRow bool,
) (uint32, bool, error) {
if !m.integrity.Enabled() {
return 0, true, nil
Expand All @@ -665,17 +670,22 @@
// Update / Delete event correctly, after Add Column / Drop column DDL,
// since the table schema does not contain complete column information.
return m.verifyColumnChecksum(columnInfos, rawColumns, decoder, isPreRow)
case 1:
expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, key, m.tz)
case 1, 2:
expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, handle, key, m.tz)
if err != nil {
log.Error("calculate raw checksum failed",
zap.Int("version", version), zap.Any("tz", m.tz), zap.Any("handle", handle.String()),
zap.Any("key", key), zap.Any("columns", columns), zap.Error(err))

Check warning on line 678 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L676-L678

Added lines #L676 - L678 were not covered by tests
return 0, false, errors.Trace(err)
}
if !matched {
return expected, matched, err
}
columnChecksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
if err != nil {
log.Error("failed to calculate column-level checksum, after raw checksum verification passed", zap.Error(err))
log.Error("failed to calculate column-level checksum, after raw checksum verification passed",
zap.Any("columnsInfo", columnInfos), zap.Any("rawColumns", rawColumns),
zap.Any("tz", m.tz), zap.Error(err))

Check warning on line 688 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L686-L688

Added lines #L686 - L688 were not covered by tests
return 0, false, errors.Trace(err)
}
return columnChecksum, true, nil
Expand All @@ -685,7 +695,7 @@
}

func (m *mounter) mountRowKVEntry(
tableInfo *model.TableInfo, row *rowKVEntry, key kv.Key, dataSize int64,
tableInfo *model.TableInfo, row *rowKVEntry, handle kv.Handle, key kv.Key, dataSize int64,
) (*model.RowChangedEvent, model.RowChangedDatums, error) {
var (
rawRow model.RowChangedDatums
Expand Down Expand Up @@ -719,19 +729,15 @@
return nil, rawRow, errors.Trace(err)
}

preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, key, true)
preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, handle, key, true)
if err != nil {
log.Error("calculate the previous columns checksum failed",
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", preRawCols))
return nil, rawRow, errors.Trace(err)
}

if !matched {
log.Error("previous columns checksum mismatch",
zap.Uint32("checksum", preChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", preRawCols))
zap.Uint32("checksum", preChecksum), zap.Any("tableInfo", tableInfo),
zap.Any("preCols", preCols), zap.Any("rawCols", preRawCols))

Check warning on line 740 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L739-L740

Added lines #L739 - L740 were not covered by tests
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
Expand All @@ -751,18 +757,14 @@
return nil, rawRow, errors.Trace(err)
}

currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, key, false)
currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, handle, key, false)
if err != nil {
log.Error("calculate the current columns checksum failed",
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
return nil, rawRow, errors.Trace(err)
}
if !matched {
log.Error("current columns checksum mismatch",
zap.Uint32("checksum", currentChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
zap.Uint32("checksum", currentChecksum), zap.Any("tableInfo", tableInfo),
zap.Any("cols", cols), zap.Any("rawCols", rawCols))

Check warning on line 767 in cdc/entry/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/mounter.go#L766-L767

Added lines #L766 - L767 were not covered by tests
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
Expand Down
5 changes: 3 additions & 2 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/executor"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/meta/metabuild"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand Down Expand Up @@ -1553,7 +1554,7 @@ func TestBuildTableInfo(t *testing.T) {
for i, c := range cases {
stmt, err := p.ParseOneStmt(c.origin, "", "")
require.NoError(t, err)
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
originTI, err := ddl.BuildTableInfoFromAST(metabuild.NewContext(), stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
colDatas, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, tz)
Expand Down Expand Up @@ -1623,7 +1624,7 @@ func TestNewDMRowChange(t *testing.T) {
for _, c := range cases {
stmt, err := p.ParseOneStmt(c.origin, "", "")
require.NoError(t, err)
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
originTI, err := ddl.BuildTableInfoFromAST(metabuild.NewContext(), stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols := []*model.Column{
Expand Down
4 changes: 2 additions & 2 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
}

// GetSchemaVersion returns the schema version of the meta.
func GetSchemaVersion(meta *timeta.Meta) (int64, error) {
func GetSchemaVersion(meta timeta.Reader) (int64, error) {

Check warning on line 113 in cdc/entry/schema/snapshot.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/schema/snapshot.go#L113

Added line #L113 was not covered by tests
// After we get the schema version at startTs, if the diff corresponding to that version does not exist,
// it means that the job is not committed yet, so we should subtract one from the version, i.e., version--.
version, err := meta.GetSchemaVersion()
Expand All @@ -130,7 +130,7 @@
// NewSnapshotFromMeta creates a schema snapshot from meta.
func NewSnapshotFromMeta(
id model.ChangeFeedID,
meta *timeta.Meta,
meta timeta.Reader,
currentTs uint64,
forceReplicate bool,
filter filter.Filter,
Expand Down
2 changes: 1 addition & 1 deletion cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ func getAllHistoryDDLJob(storage tidbkv.Storage, f filter.Filter) ([]*timodel.Jo
return nil, errors.Trace(err)
}
defer txn.Rollback() //nolint:errcheck
txnMeta := timeta.NewMeta(txn)
txnMeta := timeta.NewReader(txn)

jobs, err := ddl.GetAllHistoryDDLJobs(txnMeta)
res := make([]*timodel.Job, 0)
Expand Down
10 changes: 5 additions & 5 deletions cdc/entry/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ func TestAllTables(t *testing.T) {
require.Len(t, tableInfos, 2)
tableName := tableInfos[1].TableName
require.Equal(t, model.TableName{
Schema: "test",
Table: "t1",
TableID: 108,
Schema: job.SchemaName,
Table: job.TableName,
TableID: job.TableID,
}, tableName)
// add ineligible table
job = helper.DDL2Job("create table test.t2(id int)")
Expand All @@ -148,9 +148,9 @@ func TestAllTables(t *testing.T) {
require.Len(t, tableInfos, 2)
tableName = tableInfos[1].TableName
require.Equal(t, model.TableName{
Schema: "test",
Schema: job.SchemaName,
Table: "t1",
TableID: 108,
TableID: 112,
}, tableName)
}

Expand Down
13 changes: 8 additions & 5 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ func NewSchemaTestHelper(t testing.TB) *SchemaTestHelper {
// DDL2Job executes the DDL stmt and returns the DDL job
func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
s.tk.MustExec(ddl)
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), 1)
reader := s.GetCurrentMeta()
jobs, err := tiddl.GetLastNHistoryDDLJobs(reader.(*timeta.Mutator), 1)
require.Nil(s.t, err)
require.Len(s.t, jobs, 1)
// Set State from Synced to Done.
Expand Down Expand Up @@ -151,7 +152,8 @@ func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
// DDL statements.
func (s *SchemaTestHelper) DDL2Jobs(ddl string, jobCnt int) []*timodel.Job {
s.tk.MustExec(ddl)
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), jobCnt)
reader := s.GetCurrentMeta()
jobs, err := tiddl.GetLastNHistoryDDLJobs(reader.(*timeta.Mutator), jobCnt)
require.Nil(s.t, err)
require.Len(s.t, jobs, jobCnt)
// Set State from Synced to Done.
Expand Down Expand Up @@ -209,7 +211,8 @@ func (s *SchemaTestHelper) getLastKeyValue(tableID int64) (key, value []byte) {
// DDL2Event executes the DDL and returns the corresponding event.
func (s *SchemaTestHelper) DDL2Event(ddl string) *model.DDLEvent {
s.tk.MustExec(ddl)
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), 1)
meta := s.GetCurrentMeta()
jobs, err := tiddl.GetLastNHistoryDDLJobs(meta.(*timeta.Mutator), 1)
require.NoError(s.t, err)
require.Len(s.t, jobs, 1)
// Set State from Synced to Done.
Expand Down Expand Up @@ -267,10 +270,10 @@ func (s *SchemaTestHelper) Tk() *testkit.TestKit {
}

// GetCurrentMeta returns the current meta snapshot
func (s *SchemaTestHelper) GetCurrentMeta() *timeta.Meta {
func (s *SchemaTestHelper) GetCurrentMeta() timeta.Reader {
ver, err := s.storage.CurrentVersion(oracle.GlobalTxnScope)
require.Nil(s.t, err)
return timeta.NewSnapshotMeta(s.storage.GetSnapshot(ver))
return timeta.NewReader(s.storage.GetSnapshot(ver))
}

// Close closes the helper
Expand Down
4 changes: 2 additions & 2 deletions cdc/kv/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
)

// GetSnapshotMeta returns tidb meta information
func GetSnapshotMeta(tiStore tidbkv.Storage, ts uint64) *meta.Meta {
func GetSnapshotMeta(tiStore tidbkv.Storage, ts uint64) meta.Reader {

Check warning on line 30 in cdc/kv/utils.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/utils.go#L30

Added line #L30 was not covered by tests
snapshot := tiStore.GetSnapshot(tidbkv.NewVersion(ts))
return meta.NewSnapshotMeta(snapshot)
return meta.NewReader(snapshot)

Check warning on line 32 in cdc/kv/utils.go

View check run for this annotation

Codecov / codecov/patch

cdc/kv/utils.go#L32

Added line #L32 was not covered by tests
}

// CreateTiStore creates a tikv storage client
Expand Down
14 changes: 2 additions & 12 deletions cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/meta"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -99,7 +99,7 @@ func newMockDDLJobPuller(
if needSchemaStorage {
helper = entry.NewSchemaTestHelper(t)
kvStorage := helper.Storage()
ts := helper.GetCurrentMeta().StartTS
ts := helper.GetCurrentMeta().(*meta.Mutator).StartTS
f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "")
require.Nil(t, err)
schemaStorage, err := entry.NewSchemaStorage(
Expand Down Expand Up @@ -532,16 +532,6 @@ func TestHandleJob(t *testing.T) {
job := &timodel.Job{
Type: timodel.ActionFlashbackCluster,
BinlogInfo: &timodel.HistoryInfo{},
Args: []interface{}{
998,
map[string]interface{}{},
true, /* tidb_gc_enable */
variable.On, /* tidb_enable_auto_analyze */
variable.Off, /* tidb_super_read_only */
0, /* totalRegions */
0, /* startTS */
0, /* commitTS */
},
}
skip, err := ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
Expand Down
Loading
Loading