diff --git a/go/test/endtoend/vreplication/fk_ext_test.go b/go/test/endtoend/vreplication/fk_ext_test.go index e17247ab46b..6b3cc17ecc9 100644 --- a/go/test/endtoend/vreplication/fk_ext_test.go +++ b/go/test/endtoend/vreplication/fk_ext_test.go @@ -86,7 +86,10 @@ func TestFKExt(t *testing.T) { setSidecarDBName("_vt") // Ensure that there are multiple copy phase cycles per table. - extraVTTabletArgs = append(extraVTTabletArgs, "--vstream_packet_size=256", "--queryserver-config-schema-change-signal") + extraVTTabletArgs = append(extraVTTabletArgs, + "--vstream_packet_size=256", + "--queryserver-config-schema-change-signal", + parallelInsertWorkers) extraVTGateArgs = append(extraVTGateArgs, "--schema_change_signal=true", "--planner-version", "Gen4") defer func() { extraVTTabletArgs = nil }() initFKExtConfig(t) diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index 72cd278002f..f56bb0c290c 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -40,6 +40,7 @@ const testWorkflowFlavor = workflowFlavorVtctld // It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without, // i.e. with foreign_key_checks=0. func TestFKWorkflow(t *testing.T) { + setSidecarDBName("_vt") extraVTTabletArgs = []string{ // Ensure that there are multiple copy phase cycles per table. "--vstream_packet_size=256", @@ -132,11 +133,14 @@ func TestFKWorkflow(t *testing.T) { vtgateConn, closeConn := getVTGateConn() defer closeConn() - t11Count := getRowCount(t, vtgateConn, "t11") - t12Count := getRowCount(t, vtgateConn, "t12") - require.Greater(t, t11Count, 1) - require.Greater(t, t12Count, 1) - require.Equal(t, t11Count, t12Count) + if withLoad { + t11Count := getRowCount(t, vtgateConn, "t11") + t12Count := getRowCount(t, vtgateConn, "t12") + require.Greater(t, t11Count, 1) + require.Greater(t, t12Count, 1) + require.Equal(t, t11Count, t12Count) + } + } func insertInitialFKData(t *testing.T) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go index 6e4204c46a5..424996255e2 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go @@ -86,11 +86,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings defer rowsCopiedTicker.Stop() parallelism := getInsertParallelism() - // For now do not support concurrent inserts for atomic copies. - if parallelism > 1 { - parallelism = 1 - log.Infof("Disabling concurrent inserts for atomic copies") - } + copyWorkerFactory := vc.newCopyWorkerFactory(parallelism) var copyWorkQueue *vcopierCopyWorkQueue @@ -114,7 +110,6 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings resp.TableName, len(resp.Fields), len(resp.Rows), resp.Gtid, resp.Lastpk) tableName := resp.TableName gtid = resp.Gtid - updateRowsCopied := func() error { updateRowsQuery := binlogplayer.GenerateUpdateRowsCopied(vc.vr.id, vc.vr.stats.CopyRowCount.Get()) _, err := vc.vr.dbClient.Execute(updateRowsQuery) @@ -204,6 +199,10 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings log.Infof("copying table %s with lastpk %v", tableName, lastpkbv) // Prepare a vcopierCopyTask for the current batch of work. currCh := make(chan *vcopierCopyTaskResult, 1) + + if parallelism > 1 { + resp = resp.CloneVT() + } currT := newVCopierCopyTask(newVCopierCopyTaskArgs(resp.Rows, resp.Lastpk)) // Send result to the global resultCh and currCh. resultCh is used by @@ -291,12 +290,12 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings log.Infof("Copy of %v stopped", state.currentTableName) return fmt.Errorf("CopyAll was interrupted due to context expiration") default: - if err := vc.deleteCopyState(state.currentTableName); err != nil { - return err - } if copyWorkQueue != nil { copyWorkQueue.close() } + if err := vc.deleteCopyState(state.currentTableName); err != nil { + return err + } if err := vc.updatePos(ctx, gtid); err != nil { return err }