Skip to content

Commit

Permalink
Cherry-pick 420342f with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] authored and vitess-bot committed Feb 14, 2025
1 parent 57a3391 commit 2116cf7
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
5 changes: 4 additions & 1 deletion go/test/endtoend/vreplication/fk_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func TestFKExt(t *testing.T) {
setSidecarDBName("_vt")

// Ensure that there are multiple copy phase cycles per table.
extraVTTabletArgs = append(extraVTTabletArgs, "--vstream_packet_size=256", "--queryserver-config-schema-change-signal")
extraVTTabletArgs = append(extraVTTabletArgs,
"--vstream_packet_size=256",
"--queryserver-config-schema-change-signal",
parallelInsertWorkers)
extraVTGateArgs = append(extraVTGateArgs, "--schema_change_signal=true", "--planner-version", "Gen4")
defer func() { extraVTTabletArgs = nil }()
initFKExtConfig(t)
Expand Down
15 changes: 15 additions & 0 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const testWorkflowFlavor = workflowFlavorRandom
// It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without,
// i.e. with foreign_key_checks=0.
func TestFKWorkflow(t *testing.T) {
setSidecarDBName("_vt")
extraVTTabletArgs = []string{
// Ensure that there are multiple copy phase cycles per table.
"--vstream_packet_size=256",
Expand Down Expand Up @@ -129,6 +130,20 @@ func TestFKWorkflow(t *testing.T) {
<-ch
}
mt.Complete()
<<<<<<< HEAD
=======
vtgateConn, closeConn := getVTGateConn()
defer closeConn()

if withLoad {
t11Count := getRowCount(t, vtgateConn, "t11")
t12Count := getRowCount(t, vtgateConn, "t12")
require.Greater(t, t11Count, 1)
require.Greater(t, t12Count, 1)
require.Equal(t, t11Count, t12Count)
}

>>>>>>> 420342fddb (VReplication Atomic Copy Workflows: fix bugs around concurrent inserts (#17772))
}

func insertInitialFKData(t *testing.T) {
Expand Down
16 changes: 12 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,17 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
rowsCopiedTicker := time.NewTicker(rowsCopiedUpdateInterval)
defer rowsCopiedTicker.Stop()

<<<<<<< HEAD

Check failure on line 88 in go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected <<, expected }

Check failure on line 88 in go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected <<, expected }
parallelism := getInsertParallelism()
// For now do not support concurrent inserts for atomic copies.
if parallelism > 1 {
parallelism = 1
log.Infof("Disabling concurrent inserts for atomic copies")
}
=======
parallelism := int(math.Max(1, float64(vc.vr.workflowConfig.ParallelInsertWorkers)))

>>>>>>> 420342fddb (VReplication Atomic Copy Workflows: fix bugs around concurrent inserts (#17772))

Check failure on line 98 in go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go

View workflow job for this annotation

GitHub Actions / Code Coverage

invalid character U+0023 '#'

Check failure on line 98 in go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go

View workflow job for this annotation

GitHub Actions / Code Coverage

invalid character U+0023 '#'
copyWorkerFactory := vc.newCopyWorkerFactory(parallelism)
var copyWorkQueue *vcopierCopyWorkQueue

Expand All @@ -114,7 +119,6 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
resp.TableName, len(resp.Fields), len(resp.Rows), resp.Gtid, resp.Lastpk)
tableName := resp.TableName
gtid = resp.Gtid

updateRowsCopied := func() error {

Check failure on line 122 in go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go

View workflow job for this annotation

GitHub Actions / Code Coverage

method has no receiver

Check failure on line 122 in go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected {, expected (

Check failure on line 122 in go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go

View workflow job for this annotation

GitHub Actions / Code Coverage

method has no receiver

Check failure on line 122 in go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected {, expected (
updateRowsQuery := binlogplayer.GenerateUpdateRowsCopied(vc.vr.id, vc.vr.stats.CopyRowCount.Get())

Check failure on line 123 in go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected := in parameter list; possibly missing comma or )

Check failure on line 123 in go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go

View workflow job for this annotation

GitHub Actions / Code Coverage

syntax error: unexpected := in parameter list; possibly missing comma or )
_, err := vc.vr.dbClient.Execute(updateRowsQuery)
Expand Down Expand Up @@ -204,6 +208,10 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
log.Infof("copying table %s with lastpk %v", tableName, lastpkbv)
// Prepare a vcopierCopyTask for the current batch of work.
currCh := make(chan *vcopierCopyTaskResult, 1)

if parallelism > 1 {
resp = resp.CloneVT()
}
currT := newVCopierCopyTask(newVCopierCopyTaskArgs(resp.Rows, resp.Lastpk))

// Send result to the global resultCh and currCh. resultCh is used by
Expand Down Expand Up @@ -291,12 +299,12 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
log.Infof("Copy of %v stopped", state.currentTableName)
return fmt.Errorf("CopyAll was interrupted due to context expiration")
default:
if err := vc.deleteCopyState(state.currentTableName); err != nil {
return err
}
if copyWorkQueue != nil {
copyWorkQueue.close()
}
if err := vc.deleteCopyState(state.currentTableName); err != nil {
return err
}
if err := vc.updatePos(ctx, gtid); err != nil {
return err
}
Expand Down

0 comments on commit 2116cf7

Please sign in to comment.