From d467a793fdcd037993d1aa8ec8c372b028a89e76 Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Fri, 14 Feb 2025 16:31:20 +0100 Subject: [PATCH] VReplication Atomic Copy Workflows: fix bugs around concurrent inserts (#17772) Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/fk_ext_test.go | 5 ++++- go/test/endtoend/vreplication/fk_test.go | 14 +++++++++----- .../vreplication/vcopier_atomic.go | 17 ++++++++--------- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/go/test/endtoend/vreplication/fk_ext_test.go b/go/test/endtoend/vreplication/fk_ext_test.go index e17247ab46b..6b3cc17ecc9 100644 --- a/go/test/endtoend/vreplication/fk_ext_test.go +++ b/go/test/endtoend/vreplication/fk_ext_test.go @@ -86,7 +86,10 @@ func TestFKExt(t *testing.T) { setSidecarDBName("_vt") // Ensure that there are multiple copy phase cycles per table. - extraVTTabletArgs = append(extraVTTabletArgs, "--vstream_packet_size=256", "--queryserver-config-schema-change-signal") + extraVTTabletArgs = append(extraVTTabletArgs, + "--vstream_packet_size=256", + "--queryserver-config-schema-change-signal", + parallelInsertWorkers) extraVTGateArgs = append(extraVTGateArgs, "--schema_change_signal=true", "--planner-version", "Gen4") defer func() { extraVTTabletArgs = nil }() initFKExtConfig(t) diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index 34881cbcd1a..b7ce5de1331 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -40,6 +40,7 @@ const testWorkflowFlavor = workflowFlavorVtctld // It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without, // i.e. with foreign_key_checks=0. func TestFKWorkflow(t *testing.T) { + setSidecarDBName("_vt") extraVTTabletArgs = []string{ // Ensure that there are multiple copy phase cycles per table. "--vstream_packet_size=256", @@ -132,11 +133,14 @@ func TestFKWorkflow(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() - t11Count := getRowCount(t, vtgateConn, "t11") - t12Count := getRowCount(t, vtgateConn, "t12") - require.Greater(t, t11Count, 1) - require.Greater(t, t12Count, 1) - require.Equal(t, t11Count, t12Count) + if withLoad { + t11Count := getRowCount(t, vtgateConn, "t11") + t12Count := getRowCount(t, vtgateConn, "t12") + require.Greater(t, t11Count, 1) + require.Greater(t, t12Count, 1) + require.Equal(t, t11Count, t12Count) + } + } func insertInitialFKData(t *testing.T) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go index 1e3892c0f05..aae17027bc4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go @@ -84,11 +84,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings defer rowsCopiedTicker.Stop() parallelism := int(math.Max(1, float64(vc.vr.workflowConfig.ParallelInsertWorkers))) - // For now do not support concurrent inserts for atomic copies. - if parallelism > 1 { - parallelism = 1 - log.Infof("Disabling concurrent inserts for atomic copies") - } + copyWorkerFactory := vc.newCopyWorkerFactory(parallelism) var copyWorkQueue *vcopierCopyWorkQueue @@ -115,7 +111,6 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings resp.TableName, len(resp.Fields), len(resp.Rows), resp.Gtid, resp.Lastpk) tableName := resp.TableName gtid = resp.Gtid - updateRowsCopied := func() error { updateRowsQuery := binlogplayer.GenerateUpdateRowsCopied(vc.vr.id, vc.vr.stats.CopyRowCount.Get()) _, err := vc.vr.dbClient.Execute(updateRowsQuery) @@ -205,6 +200,10 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings log.Infof("copying table %s with lastpk %v", tableName, lastpkbv) // Prepare a vcopierCopyTask for the current batch of work. currCh := make(chan *vcopierCopyTaskResult, 1) + + if parallelism > 1 { + resp = resp.CloneVT() + } currT := newVCopierCopyTask(newVCopierCopyTaskArgs(resp.Rows, resp.Lastpk)) // Send result to the global resultCh and currCh. resultCh is used by @@ -292,12 +291,12 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings log.Infof("Copy of %v stopped", state.currentTableName) return fmt.Errorf("CopyAll was interrupted due to context expiration") default: - if err := vc.deleteCopyState(state.currentTableName); err != nil { - return err - } if copyWorkQueue != nil { copyWorkQueue.close() } + if err := vc.deleteCopyState(state.currentTableName); err != nil { + return err + } if err := vc.updatePos(ctx, gtid); err != nil { return err }