Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#11772
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
asddongmen authored and ti-chi-bot committed Dec 9, 2024
1 parent 46a22f7 commit 5c4a2ba
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 10 deletions.
37 changes: 37 additions & 0 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,43 @@ func (m *ddlManager) tick(
return nil, nil, err
}

<<<<<<< HEAD
=======
// Note: do not change the key words in the log, it is used to search the
// FinishTS of the DDL job. Some integration tests and users depend on it.
log.Info("handle a ddl job",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("changefeed", m.changfeedID.ID),
zap.Int64("tableID", job.TableID),
zap.Int64("jobID", job.ID),
zap.String("query", job.Query),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
)
events, err := m.schema.BuildDDLEvents(ctx, job)
if err != nil {
return nil, nil, err
}

for _, event := range events {
snap := m.schema.GetLastSnapshot()
if event.Type == timodel.ActionCreateTable ||
event.Type == timodel.ActionCreateTables {
if snap.IsIneligibleTableID(event.TableInfo.ID) {
log.Info("table is ineligible, skip the ddl",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("changefeed", m.changfeedID.ID),
zap.String("query", job.Query),
zap.Any("table", event.TableInfo))
continue
}
}
tableName := event.TableInfo.TableName
m.pendingDDLs[tableName] = append(m.pendingDDLs[tableName], event)
}

// Send DDL events to redo log.
if m.redoDDLManager.Enabled() {
>>>>>>> 84391225e9 (puller: fix a bug that may cause error when replicate truncate table ddl (#11772))
for _, event := range events {
tableName := event.TableInfo.TableName
m.pendingDDLs[tableName] = append(m.pendingDDLs[tableName], event)
Expand Down
43 changes: 33 additions & 10 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,27 +467,50 @@ func (p *ddlJobPullerImpl) checkIneligibleTableDDL(snapBefore *schema.Snapshot,
return false, nil
}

ineligible := p.schemaStorage.GetLastSnapshot().IsIneligibleTableID(job.TableID)
if !ineligible {
snapAfter := p.schemaStorage.GetLastSnapshot()

if job.Type == timodel.ActionCreateTable {
// For create table, oldTableID is the new table ID.
isEligibleAfter := !snapAfter.IsIneligibleTableID(job.BinlogInfo.TableInfo.ID)
if isEligibleAfter {
return false, nil
}
}

// For create tables, we always apply the DDL here.
if job.Type == timodel.ActionCreateTables {
return false, nil
}

oldTableID := job.TableID
newTableID := job.BinlogInfo.TableInfo.ID

// If the table is eligible after the DDL, we should apply the DDL.
// No matter its status before the DDL.
isEligibleAfter := !p.schemaStorage.GetLastSnapshot().IsIneligibleTableID(newTableID)
if isEligibleAfter {
return false, nil
}

// If the table is not in the snapshot before the DDL,
// Steps here means this table is ineligible after the DDL.
// We need to check if its status before the DDL.

// 1. If the table is not in the snapshot before the DDL,
// we should ignore the DDL.
_, exist := snapBefore.PhysicalTableByID(job.TableID)
_, exist := snapBefore.PhysicalTableByID(oldTableID)
if !exist {
return true, nil
}

// If the table after the DDL is ineligible, we should check if it is not ineligible before the DDL.
// If so, we should return an error to inform the user that it is a
// dangerous operation and should be handled manually.
isBeforeineligible := snapBefore.IsIneligibleTableID(job.TableID)
if isBeforeineligible {
log.Warn("ignore the DDL event of ineligible table",
// 2. If the table is ineligible before the DDL, we should ignore the DDL.
isIneligibleBefore := snapBefore.IsIneligibleTableID(oldTableID)
if isIneligibleBefore {
log.Warn("Ignore the DDL event of ineligible table",
zap.String("changefeed", p.changefeedID.ID), zap.Any("ddl", job))
return true, nil
}

// 3. If the table is eligible before the DDL, we should return an error.
return false, cerror.New(fmt.Sprintf("An eligible table become ineligible after DDL: [%s] "+
"it is a dangerous operation and may cause data loss. If you want to replicate this ddl safely, "+
"pelase pause the changefeed and update the `force-replicate=true` "+
Expand Down
37 changes: 37 additions & 0 deletions cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,10 +802,15 @@ func waitResolvedTsGrowing(t *testing.T, p DDLPuller, targetTs model.Ts) {
require.Nil(t, err)
}

<<<<<<< HEAD
func TestCcheckIneligibleTableDDL(t *testing.T) {
startTs := uint64(10)
mockPuller := newMockPuller(t, startTs)
ddlJobPuller, helper := newMockDDLJobPuller(t, mockPuller, true)
=======
func TestCheckIneligibleTableDDL(t *testing.T) {
ddlJobPuller, helper := newMockDDLJobPuller(t, true)
>>>>>>> 84391225e9 (puller: fix a bug that may cause error when replicate truncate table ddl (#11772))
defer helper.Close()

ddlJobPullerImpl := ddlJobPuller.(*ddlJobPullerImpl)
Expand Down Expand Up @@ -862,4 +867,36 @@ func TestCcheckIneligibleTableDDL(t *testing.T) {
require.Error(t, err)
require.False(t, skip)
require.Contains(t, err.Error(), "An eligible table become ineligible after DDL")

// case 4: create a ineligible table and truncate it, expect no error.
// It is because the table is ineligible before the DDL.
ddl = helper.DDL2Job("CREATE TABLE test1.t3 (id INT);")
skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.True(t, skip)

ddl = helper.DDL2Job("TRUNCATE TABLE test1.t3;")
skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
// Skip because the table is ineligible before the DDL.
require.True(t, skip)

// case 5: create a ineligible table and alter it to eligible, expect no error.
ddl = helper.DDL2Job("CREATE TABLE test1.t4 (id INT not null);")
skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.True(t, skip)

// Add a unique key to the table, make it eligible.
ddl = helper.DDL2Job("ALTER TABLE test1.t4 ADD UNIQUE KEY cdc_valid_index (id);")
skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.False(t, skip)

// case 6: Batch create tables, including a ineligible table and a eligible table, expect no error.
ddl = helper.DDL2Job(`CREATE TABLE test1.t5 (id INT);
CREATE TABLE test1.t6 (id INT PRIMARY KEY);`)
skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.False(t, skip)
}

0 comments on commit 5c4a2ba

Please sign in to comment.