Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

puller: fix a bug that may cause error when replicate truncate table ddl #11772

Merged
merged 4 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,13 @@ func (m *ddlManager) tick(
}

for _, event := range events {
snap := m.schema.GetLastSnapshot()
if event.Type == timodel.ActionCreateTable ||
event.Type == timodel.ActionCreateTables {
if snap.IsIneligibleTableID(event.TableInfo.ID) {
continue
}
}
tableName := event.TableInfo.TableName
m.pendingDDLs[tableName] = append(m.pendingDDLs[tableName], event)
}
Expand Down
43 changes: 33 additions & 10 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,27 +489,50 @@ func (p *ddlJobPullerImpl) checkIneligibleTableDDL(snapBefore *schema.Snapshot,
return false, nil
}

ineligible := p.schemaStorage.GetLastSnapshot().IsIneligibleTableID(job.TableID)
if !ineligible {
snapAfter := p.schemaStorage.GetLastSnapshot()

if job.Type == timodel.ActionCreateTable {
// For create table, oldTableID is the new table ID.
isEligibleAfter := !snapAfter.IsIneligibleTableID(job.BinlogInfo.TableInfo.ID)
if isEligibleAfter {
return false, nil
}
}

// For create tables, we always apply the DDL here.
if job.Type == timodel.ActionCreateTables {
return false, nil
}

oldTableID := job.TableID
newTableID := job.BinlogInfo.TableInfo.ID

// If the table is eligible after the DDL, we should apply the DDL.
// No matter its status before the DDL.
isEligibleAfter := !p.schemaStorage.GetLastSnapshot().IsIneligibleTableID(newTableID)
if isEligibleAfter {
return false, nil
}

// If the table is not in the snapshot before the DDL,
// Steps here means this table is ineligible after the DDL.
// We need to check if its status before the DDL.

// 1. If the table is not in the snapshot before the DDL,
// we should ignore the DDL.
_, exist := snapBefore.PhysicalTableByID(job.TableID)
_, exist := snapBefore.PhysicalTableByID(oldTableID)
if !exist {
return true, nil
}

// If the table after the DDL is ineligible, we should check if it is not ineligible before the DDL.
// If so, we should return an error to inform the user that it is a
// dangerous operation and should be handled manually.
isBeforeineligible := snapBefore.IsIneligibleTableID(job.TableID)
if isBeforeineligible {
log.Warn("ignore the DDL event of ineligible table",
// 2. If the table is ineligible before the DDL, we should ignore the DDL.
isIneligibleBefore := snapBefore.IsIneligibleTableID(oldTableID)
if isIneligibleBefore {
log.Warn("Ignore the DDL event of ineligible table",
zap.String("changefeed", p.changefeedID.ID), zap.Any("ddl", job))
return true, nil
}

// 3. If the table is eligible before the DDL, we should return an error.
return false, cerror.New(fmt.Sprintf("An eligible table become ineligible after DDL: [%s] "+
"it is a dangerous operation and may cause data loss. If you want to replicate this ddl safely, "+
"pelase pause the changefeed and update the `force-replicate=true` "+
Expand Down
34 changes: 33 additions & 1 deletion cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ func waitResolvedTsGrowing(t *testing.T, p DDLPuller, targetTs model.Ts) {
require.Nil(t, err)
}

func TestCcheckIneligibleTableDDL(t *testing.T) {
func TestCheckIneligibleTableDDL(t *testing.T) {
ddlJobPuller, helper := newMockDDLJobPuller(t, true)
defer helper.Close()

Expand Down Expand Up @@ -800,4 +800,36 @@ func TestCcheckIneligibleTableDDL(t *testing.T) {
require.Error(t, err)
require.False(t, skip)
require.Contains(t, err.Error(), "An eligible table become ineligible after DDL")

// case 4: create a ineligible table and truncate it, expect no error.
// It is because the table is ineligible before the DDL.
ddl = helper.DDL2Job("CREATE TABLE test1.t3 (id INT);")
skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.True(t, skip)

ddl = helper.DDL2Job("TRUNCATE TABLE test1.t3;")
skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
// Skip because the table is ineligible before the DDL.
require.True(t, skip)

// case 5: create a ineligible table and alter it to eligible, expect no error.
ddl = helper.DDL2Job("CREATE TABLE test1.t4 (id INT not null);")
skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.True(t, skip)

// Add a unique key to the table, make it eligible.
ddl = helper.DDL2Job("ALTER TABLE test1.t4 ADD UNIQUE KEY cdc_valid_index (id);")
skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.False(t, skip)

// case 6: Batch create tables, including a ineligible table and a eligible table, expect no error.
ddl = helper.DDL2Job(`CREATE TABLE test1.t5 (id INT);
CREATE TABLE test1.t6 (id INT PRIMARY KEY);`)
skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.False(t, skip)
}
Loading