From f7b929519974c298a5cfed62eb657c7b328e5e3f Mon Sep 17 00:00:00 2001 From: Florent Poinsard <35779988+frouioui@users.noreply.github.com> Date: Thu, 5 Dec 2024 10:44:03 -0600 Subject: [PATCH 1/8] [main] Bump go version to 1.23.4 (#17335) Signed-off-by: Florent Poinsard --- .github/workflows/update_golang_version.yml | 2 +- Makefile | 2 +- build.env | 2 +- docker/bootstrap/CHANGELOG.md | 6 +++++- docker/bootstrap/Dockerfile.common | 2 +- docker/lite/Dockerfile | 2 +- docker/lite/Dockerfile.percona80 | 2 +- docker/vttestserver/Dockerfile.mysql80 | 2 +- go.mod | 2 +- test.go | 2 +- test/templates/dockerfile.tpl | 2 +- 11 files changed, 15 insertions(+), 11 deletions(-) diff --git a/.github/workflows/update_golang_version.yml b/.github/workflows/update_golang_version.yml index 09bbf451451..ad5dceeafa5 100644 --- a/.github/workflows/update_golang_version.yml +++ b/.github/workflows/update_golang_version.yml @@ -39,7 +39,7 @@ jobs: if [ ${{ matrix.branch }} == "main" ]; then go run ./go/tools/go-upgrade/go-upgrade.go upgrade --main --allow-major-upgrade - else if [ ${{ matrix.branch }} == "release-21.0" ]; then + elif [ ${{ matrix.branch }} == "release-21.0" ]; then go run ./go/tools/go-upgrade/go-upgrade.go upgrade else go run ./go/tools/go-upgrade/go-upgrade.go upgrade --workflow-update=false diff --git a/Makefile b/Makefile index 385a3238d47..13f7fde28e0 100644 --- a/Makefile +++ b/Makefile @@ -286,7 +286,7 @@ $(PROTO_GO_OUTS): minimaltools install_protoc-gen-go proto/*.proto # This rule builds the bootstrap images for all flavors. DOCKER_IMAGES_FOR_TEST = mysql80 percona80 DOCKER_IMAGES = common $(DOCKER_IMAGES_FOR_TEST) -BOOTSTRAP_VERSION=38 +BOOTSTRAP_VERSION=39 ensure_bootstrap_version: find docker/ -type f -exec sed -i "s/^\(ARG bootstrap_version\)=.*/\1=${BOOTSTRAP_VERSION}/" {} \; sed -i 's/\(^.*flag.String(\"bootstrap-version\",\) *\"[^\"]\+\"/\1 \"${BOOTSTRAP_VERSION}\"/' test.go diff --git a/build.env b/build.env index 5c51b4dc660..58abbf41d1b 100755 --- a/build.env +++ b/build.env @@ -17,7 +17,7 @@ source ./tools/shell_functions.inc go version >/dev/null 2>&1 || fail "Go is not installed or is not in \$PATH. See https://vitess.io/contributing/build-from-source for install instructions." -goversion_min 1.23.3 || echo "Go version reported: `go version`. Version 1.23.3+ recommended. See https://vitess.io/contributing/build-from-source for install instructions." +goversion_min 1.23.4 || echo "Go version reported: `go version`. Version 1.23.4+ recommended. See https://vitess.io/contributing/build-from-source for install instructions." mkdir -p dist mkdir -p bin diff --git a/docker/bootstrap/CHANGELOG.md b/docker/bootstrap/CHANGELOG.md index 24ed5261df9..b52783d0c30 100644 --- a/docker/bootstrap/CHANGELOG.md +++ b/docker/bootstrap/CHANGELOG.md @@ -149,4 +149,8 @@ List of changes between bootstrap image versions. ## [38] - 2024-11-10 ### Changes -- Update build to golang 1.23.3 \ No newline at end of file +- Update build to golang 1.23.3 + +## [39] - 2024-12-04 +### Changes +- Update build to golang 1.23.4 \ No newline at end of file diff --git a/docker/bootstrap/Dockerfile.common b/docker/bootstrap/Dockerfile.common index f9654cc22d8..3ace19543f1 100644 --- a/docker/bootstrap/Dockerfile.common +++ b/docker/bootstrap/Dockerfile.common @@ -1,4 +1,4 @@ -FROM --platform=linux/amd64 golang:1.23.3-bullseye +FROM --platform=linux/amd64 golang:1.23.4-bullseye # Install Vitess build dependencies RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \ diff --git a/docker/lite/Dockerfile b/docker/lite/Dockerfile index 59fd9297bbf..fff1414fab6 100644 --- a/docker/lite/Dockerfile +++ b/docker/lite/Dockerfile @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM --platform=linux/amd64 golang:1.23.3-bullseye AS builder +FROM --platform=linux/amd64 golang:1.23.4-bullseye AS builder # Allows docker builds to set the BUILD_NUMBER ARG BUILD_NUMBER diff --git a/docker/lite/Dockerfile.percona80 b/docker/lite/Dockerfile.percona80 index 953a42c9fea..a839fb35514 100644 --- a/docker/lite/Dockerfile.percona80 +++ b/docker/lite/Dockerfile.percona80 @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM --platform=linux/amd64 golang:1.23.3-bullseye AS builder +FROM --platform=linux/amd64 golang:1.23.4-bullseye AS builder # Allows docker builds to set the BUILD_NUMBER ARG BUILD_NUMBER diff --git a/docker/vttestserver/Dockerfile.mysql80 b/docker/vttestserver/Dockerfile.mysql80 index ed5d0b78acb..74b9564ef48 100644 --- a/docker/vttestserver/Dockerfile.mysql80 +++ b/docker/vttestserver/Dockerfile.mysql80 @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM --platform=linux/amd64 golang:1.23.3-bullseye AS builder +FROM --platform=linux/amd64 golang:1.23.4-bullseye AS builder # Allows docker builds to set the BUILD_NUMBER ARG BUILD_NUMBER diff --git a/go.mod b/go.mod index ccc4cc32b3b..a1ac96f5e34 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module vitess.io/vitess -go 1.23.3 +go 1.23.4 require ( cloud.google.com/go/storage v1.46.0 diff --git a/test.go b/test.go index 14f51a06e9d..95d62af892f 100755 --- a/test.go +++ b/test.go @@ -77,7 +77,7 @@ For example: // Flags var ( flavor = flag.String("flavor", "mysql80", "comma-separated bootstrap flavor(s) to run against (when using Docker mode). Available flavors: all,"+flavors) - bootstrapVersion = flag.String("bootstrap-version", "38", "the version identifier to use for the docker images") + bootstrapVersion = flag.String("bootstrap-version", "39", "the version identifier to use for the docker images") runCount = flag.Int("runs", 1, "run each test this many times") retryMax = flag.Int("retry", 3, "max number of retries, to detect flaky tests") logPass = flag.Bool("log-pass", false, "log test output even if it passes") diff --git a/test/templates/dockerfile.tpl b/test/templates/dockerfile.tpl index 82388850947..af4376d3ca9 100644 --- a/test/templates/dockerfile.tpl +++ b/test/templates/dockerfile.tpl @@ -1,4 +1,4 @@ -ARG bootstrap_version=38 +ARG bootstrap_version=39 ARG image="vitess/bootstrap:${bootstrap_version}-{{.Platform}}" FROM "${image}" From 5ace629a06b5a443fb16ec25ba6cc0c9cacfafc3 Mon Sep 17 00:00:00 2001 From: Noble Mittal <62551163+beingnoble03@users.noreply.github.com> Date: Fri, 6 Dec 2024 19:35:18 +0530 Subject: [PATCH 2/8] test: Add missing unit tests in `vtctl/workflow` (#17304) Signed-off-by: Noble Mittal --- go/vt/vtctl/workflow/framework_test.go | 21 ++ go/vt/vtctl/workflow/resharder_test.go | 311 +++++++++++++++++++++++++ 2 files changed, 332 insertions(+) diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index 249ff07cf41..d84d1236075 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -271,6 +271,7 @@ type testTMClient struct { vrQueries map[int][]*queryResult createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse readVReplicationWorkflowRequests map[uint32]*readVReplicationWorkflowRequestResponse + updateVReplicationWorklowsRequests map[uint32]*tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest applySchemaRequests map[uint32]*applySchemaRequestResponse primaryPositions map[uint32]string vdiffRequests map[uint32]*vdiffRequestResponse @@ -294,6 +295,7 @@ func newTestTMClient(env *testEnv) *testTMClient { vrQueries: make(map[int][]*queryResult), createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse), readVReplicationWorkflowRequests: make(map[uint32]*readVReplicationWorkflowRequestResponse), + updateVReplicationWorklowsRequests: make(map[uint32]*tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest), applySchemaRequests: make(map[uint32]*applySchemaRequestResponse), readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse), primaryPositions: make(map[uint32]string), @@ -677,6 +679,19 @@ func (tmc *testTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet }, nil } +func (tmc *testTMClient) UpdateVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowsResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + if expect := tmc.updateVReplicationWorklowsRequests[tablet.Alias.Uid]; expect != nil { + if !proto.Equal(expect, req) { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected ReadVReplicationWorkflow request on tablet %s: got %+v, want %+v", + topoproto.TabletAliasString(tablet.Alias), req, expect) + } + } + delete(tmc.updateVReplicationWorklowsRequests, tablet.Alias.Uid) + return nil, nil +} + func (tmc *testTMClient) ValidateVReplicationPermissions(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ValidateVReplicationPermissionsRequest) (*tabletmanagerdatapb.ValidateVReplicationPermissionsResponse, error) { return &tabletmanagerdatapb.ValidateVReplicationPermissionsResponse{ User: "vt_filtered", @@ -736,6 +751,12 @@ func (tmc *testTMClient) AddVReplicationWorkflowsResponse(key string, resp *tabl tmc.readVReplicationWorkflowsResponses[key] = append(tmc.readVReplicationWorkflowsResponses[key], resp) } +func (tmc *testTMClient) AddUpdateVReplicationRequests(tabletUID uint32, req *tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + tmc.updateVReplicationWorklowsRequests[tabletUID] = req +} + func (tmc *testTMClient) getVReplicationWorkflowsResponse(key string) *tabletmanagerdatapb.ReadVReplicationWorkflowsResponse { if len(tmc.readVReplicationWorkflowsResponses) == 0 { return nil diff --git a/go/vt/vtctl/workflow/resharder_test.go b/go/vt/vtctl/workflow/resharder_test.go index 6353f36db9f..6fe1afb0c70 100644 --- a/go/vt/vtctl/workflow/resharder_test.go +++ b/go/vt/vtctl/workflow/resharder_test.go @@ -22,14 +22,20 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" + "vitess.io/vitess/go/ptr" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vtgate/vindexes" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) @@ -65,6 +71,8 @@ func TestReshardCreate(t *testing.T) { sourceKeyspace, targetKeyspace *testKeyspace preFunc func(env *testEnv) want *vtctldatapb.WorkflowStatusResponse + updateVReplicationRequest *tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest + autoStart bool wantErr string }{ { @@ -77,6 +85,11 @@ func TestReshardCreate(t *testing.T) { KeyspaceName: targetKeyspaceName, ShardNames: []string{"-80", "80-"}, }, + autoStart: true, + updateVReplicationRequest: &tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest{ + AllWorkflows: true, + State: ptr.Of(binlogdatapb.VReplicationWorkflowState_Running), + }, want: &vtctldatapb.WorkflowStatusResponse{ ShardStreams: map[string]*vtctldatapb.WorkflowStatusResponse_ShardStreams{ "targetks/-80": { @@ -137,6 +150,7 @@ func TestReshardCreate(t *testing.T) { SourceShards: tc.sourceKeyspace.ShardNames, TargetShards: tc.targetKeyspace.ShardNames, Cells: []string{env.cell}, + AutoStart: tc.autoStart, } for i := range tc.sourceKeyspace.ShardNames { @@ -172,6 +186,9 @@ func TestReshardCreate(t *testing.T) { "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", &sqltypes.Result{}, ) + if tc.updateVReplicationRequest != nil { + env.tmc.AddUpdateVReplicationRequests(uint32(tabletUID), tc.updateVReplicationRequest) + } } if tc.preFunc != nil { @@ -187,6 +204,300 @@ func TestReshardCreate(t *testing.T) { if tc.want != nil { require.Equal(t, tc.want, res) } + + // Expect updateVReplicationWorklowsRequests to be empty, + // if AutoStart is enabled. This is because we delete the specific + // key from the map in the testTMC, once updateVReplicationWorklows() + // with the expected request is called. + if tc.autoStart { + assert.Len(t, env.tmc.updateVReplicationWorklowsRequests, 0) + } + }) + } +} + +func TestReadRefStreams(t *testing.T) { + ctx := context.Background() + + sourceKeyspace := &testKeyspace{ + KeyspaceName: "sourceKeyspace", + ShardNames: []string{"-"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "targetKeyspace", + ShardNames: []string{"-"}, + } + + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + + s1, err := env.ts.UpdateShardFields(ctx, targetKeyspace.KeyspaceName, "-", func(si *topo.ShardInfo) error { + return nil + }) + require.NoError(t, err) + + sourceTablet, ok := env.tablets[sourceKeyspace.KeyspaceName][100] + require.True(t, ok) + + env.tmc.schema = map[string]*tabletmanagerdatapb.SchemaDefinition{ + "t1": {}, + } + + rules := make([]*binlogdatapb.Rule, len(env.tmc.schema)) + for i, table := range maps.Keys(env.tmc.schema) { + rules[i] = &binlogdatapb.Rule{ + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + } + } + + refKey := fmt.Sprintf("wf:%s:-", sourceKeyspace.KeyspaceName) + + testCases := []struct { + name string + addVReplicationWorkflowsResponse *tabletmanagerdatapb.ReadVReplicationWorkflowsResponse + preRefStreams map[string]*refStream + wantRefStreamKeys []string + wantErr bool + errContains string + }{ + { + name: "error for unnamed workflow", + addVReplicationWorkflowsResponse: &tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{ + Workflows: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + { + Workflow: "", + WorkflowType: binlogdatapb.VReplicationWorkflowType_Reshard, + }, + }, + }, + wantErr: true, + }, + { + name: "populate ref streams", + addVReplicationWorkflowsResponse: &tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{ + Workflows: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + { + Workflow: "wf", + WorkflowType: binlogdatapb.VReplicationWorkflowType_Reshard, + Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + { + + Bls: &binlogdatapb.BinlogSource{ + Keyspace: sourceKeyspace.KeyspaceName, + Shard: "-", + Tables: maps.Keys(env.tmc.schema), + Filter: &binlogdatapb.Filter{ + Rules: rules, + }, + }, + }, + }, + }, + }, + }, + wantRefStreamKeys: []string{refKey}, + }, + { + name: "mismatched streams with empty map", + preRefStreams: map[string]*refStream{}, + addVReplicationWorkflowsResponse: &tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{ + Workflows: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + { + Workflow: "wf", + WorkflowType: binlogdatapb.VReplicationWorkflowType_Reshard, + Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + { + + Bls: &binlogdatapb.BinlogSource{ + Keyspace: sourceKeyspace.KeyspaceName, + Shard: "-", + Tables: maps.Keys(env.tmc.schema), + Filter: &binlogdatapb.Filter{ + Rules: rules, + }, + }, + }, + }, + }, + }, + }, + wantErr: true, + errContains: "mismatch", + }, + { + name: "mismatched streams", + preRefStreams: map[string]*refStream{ + refKey: nil, + "nonexisting": nil, + }, + addVReplicationWorkflowsResponse: &tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{ + Workflows: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + { + Workflow: "wf", + WorkflowType: binlogdatapb.VReplicationWorkflowType_Reshard, + Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + { + + Bls: &binlogdatapb.BinlogSource{ + Keyspace: sourceKeyspace.KeyspaceName, + Shard: "-", + Tables: maps.Keys(env.tmc.schema), + Filter: &binlogdatapb.Filter{ + Rules: rules, + }, + }, + }, + }, + }, + }, + }, + wantErr: true, + errContains: "mismatch", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + rs := &resharder{ + s: env.ws, + keyspace: targetKeyspace.KeyspaceName, + sourceShards: []*topo.ShardInfo{s1}, + sourcePrimaries: map[string]*topo.TabletInfo{ + "-": { + Tablet: sourceTablet, + }, + }, + workflow: "wf", + vschema: &vschemapb.Keyspace{ + Tables: map[string]*vschemapb.Table{ + "t1": { + Type: vindexes.TypeReference, + }, + }, + }, + refStreams: tc.preRefStreams, + } + + workflowKey := env.tmc.GetWorkflowKey(sourceKeyspace.KeyspaceName, "-") + + env.tmc.AddVReplicationWorkflowsResponse(workflowKey, tc.addVReplicationWorkflowsResponse) + + err := rs.readRefStreams(ctx) + if !tc.wantErr { + assert.NoError(t, err) + for _, rk := range tc.wantRefStreamKeys { + assert.Contains(t, rs.refStreams, rk) + } + return + } + + assert.Error(t, err) + assert.ErrorContains(t, err, tc.errContains) + }) + } +} + +func TestBlsIsReference(t *testing.T) { + testCases := []struct { + name string + bls *binlogdatapb.BinlogSource + tables map[string]*vschemapb.Table + expected bool + wantErr bool + errContains string + }{ + { + name: "all references", + bls: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + {Match: "ref_table1"}, + {Match: "ref_table2"}, + }, + }, + }, + tables: map[string]*vschemapb.Table{ + "ref_table1": {Type: vindexes.TypeReference}, + "ref_table2": {Type: vindexes.TypeReference}, + }, + expected: true, + }, + { + name: "all sharded", + bls: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + {Match: "sharded_table1"}, + {Match: "sharded_table2"}, + }, + }, + }, + tables: map[string]*vschemapb.Table{ + "sharded_table1": {Type: vindexes.TypeTable}, + "sharded_table2": {Type: vindexes.TypeTable}, + }, + expected: false, + }, + { + name: "mixed reference and sharded tables", + bls: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + {Match: "ref_table"}, + {Match: "sharded_table"}, + }, + }, + }, + tables: map[string]*vschemapb.Table{ + "ref_table": {Type: vindexes.TypeReference}, + "sharded_table": {Type: vindexes.TypeTable}, + }, + wantErr: true, + }, + { + name: "rule table not found in vschema", + bls: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + {Match: "unknown_table"}, + }, + }, + }, + tables: map[string]*vschemapb.Table{}, + wantErr: true, + errContains: "unknown_table", + }, + { + name: "internal operation table ignored", + bls: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + {Match: "_vt_hld_6ace8bcef73211ea87e9f875a4d24e90_20200915120410_"}, + }, + }, + }, + tables: map[string]*vschemapb.Table{}, + expected: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + rs := &resharder{ + vschema: &vschemapb.Keyspace{ + Tables: tc.tables, + }, + } + + result, err := rs.blsIsReference(tc.bls) + + if tc.wantErr { + assert.ErrorContains(t, err, tc.errContains) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expected, result) + } }) } } From 46752440821c2c70526e5ea20084730f3f9e8e72 Mon Sep 17 00:00:00 2001 From: Noble Mittal <62551163+beingnoble03@users.noreply.github.com> Date: Fri, 6 Dec 2024 19:36:44 +0530 Subject: [PATCH 3/8] refac: Refactor `Server.GetWorkflows()` (#17092) Signed-off-by: Noble Mittal Signed-off-by: Rohit Nayak Co-authored-by: Rohit Nayak --- go/vt/vtctl/workflow/server.go | 597 +--------------------- go/vt/vtctl/workflow/server_test.go | 62 ++- go/vt/vtctl/workflow/workflows.go | 672 +++++++++++++++++++++++++ go/vt/vtctl/workflow/workflows_test.go | 260 ++++++++++ 4 files changed, 1008 insertions(+), 583 deletions(-) create mode 100644 go/vt/vtctl/workflow/workflows.go create mode 100644 go/vt/vtctl/workflow/workflows_test.go diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 7c49de58c9b..6c538a48f75 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -18,7 +18,6 @@ package workflow import ( "context" - "encoding/json" "errors" "fmt" "math" @@ -41,11 +40,9 @@ import ( "vitess.io/vitess/go/constants/sidecar" "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/ptr" - "vitess.io/vitess/go/sets" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/trace" - "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" @@ -58,7 +55,6 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtctl/schematools" - "vitess.io/vitess/go/vt/vtctl/workflow/common" "vitess.io/vitess/go/vt/vtctl/workflow/vexec" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vterrors" @@ -406,546 +402,28 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows span.Annotate("include_logs", req.IncludeLogs) span.Annotate("shards", req.Shards) - readReq := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{} - if req.Workflow != "" { - readReq.IncludeWorkflows = []string{req.Workflow} + w := &workflowFetcher{ + ts: s.ts, + tmc: s.tmc, + parser: s.SQLParser(), + logger: s.Logger(), } - if req.ActiveOnly { - readReq.ExcludeStates = []binlogdatapb.VReplicationWorkflowState{binlogdatapb.VReplicationWorkflowState_Stopped} - } - - // Guards access to the maps used throughout. - m := sync.Mutex{} - shards, err := common.GetShards(ctx, s.ts, req.Keyspace, req.Shards) + workflowsByShard, err := w.fetchWorkflowsByShard(ctx, req) if err != nil { return nil, err } - results := make(map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, len(shards)) - readWorkflowsEg, readWorkflowsCtx := errgroup.WithContext(ctx) - for _, shard := range shards { - readWorkflowsEg.Go(func() error { - si, err := s.ts.GetShard(readWorkflowsCtx, req.Keyspace, shard) - if err != nil { - return err - } - if si.PrimaryAlias == nil { - return fmt.Errorf("%w %s/%s", vexec.ErrNoShardPrimary, req.Keyspace, shard) - } - primary, err := s.ts.GetTablet(readWorkflowsCtx, si.PrimaryAlias) - if err != nil { - return err - } - if primary == nil { - return fmt.Errorf("%w %s/%s: tablet %v not found", vexec.ErrNoShardPrimary, req.Keyspace, shard, topoproto.TabletAliasString(si.PrimaryAlias)) - } - // Clone the request so that we can set the correct DB name for tablet. - req := readReq.CloneVT() - wres, err := s.tmc.ReadVReplicationWorkflows(readWorkflowsCtx, primary.Tablet, req) - if err != nil { - return err - } - m.Lock() - defer m.Unlock() - results[primary] = wres - return nil - }) - } - if readWorkflowsEg.Wait() != nil { - return nil, err - } - - copyStatesByShardStreamId := make(map[string][]*vtctldatapb.Workflow_Stream_CopyState, len(results)) - - fetchCopyStates := func(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) error { - span, ctx := trace.NewSpan(ctx, "workflow.Server.fetchCopyStates") - defer span.Finish() - - span.Annotate("keyspace", req.Keyspace) - span.Annotate("shard", tablet.Shard) - span.Annotate("tablet_alias", tablet.AliasString()) - - copyStates, err := s.getWorkflowCopyStates(ctx, tablet, streamIds) - if err != nil { - return err - } - m.Lock() - defer m.Unlock() - - for _, copyState := range copyStates { - shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, copyState.StreamId) - copyStatesByShardStreamId[shardStreamId] = append( - copyStatesByShardStreamId[shardStreamId], - copyState, - ) - } - - return nil - } - - fetchCopyStatesEg, fetchCopyStatesCtx := errgroup.WithContext(ctx) - for tablet, result := range results { - tablet := tablet // loop closure - - streamIds := make([]int32, 0, len(result.Workflows)) - for _, wf := range result.Workflows { - for _, stream := range wf.Streams { - streamIds = append(streamIds, stream.Id) - } - } - - if len(streamIds) == 0 { - continue - } - - fetchCopyStatesEg.Go(func() error { - return fetchCopyStates(fetchCopyStatesCtx, tablet, streamIds) - }) - } - - if err := fetchCopyStatesEg.Wait(); err != nil { + copyStatesByShardStreamId, err := w.fetchCopyStatesByShardStream(ctx, workflowsByShard) + if err != nil { return nil, err } - workflowsMap := make(map[string]*vtctldatapb.Workflow, len(results)) - sourceKeyspaceByWorkflow := make(map[string]string, len(results)) - sourceShardsByWorkflow := make(map[string]sets.Set[string], len(results)) - targetKeyspaceByWorkflow := make(map[string]string, len(results)) - targetShardsByWorkflow := make(map[string]sets.Set[string], len(results)) - maxVReplicationLagByWorkflow := make(map[string]float64, len(results)) - maxVReplicationTransactionLagByWorkflow := make(map[string]float64, len(results)) - - // We guarantee the following invariants when this function is called for a - // given workflow: - // - workflow.Name != "" (more precisely, ".Name is set 'properly'") - // - workflowsMap[workflow.Name] == workflow - // - sourceShardsByWorkflow[workflow.Name] != nil - // - targetShardsByWorkflow[workflow.Name] != nil - // - workflow.ShardStatuses != nil - scanWorkflow := func(ctx context.Context, workflow *vtctldatapb.Workflow, res *tabletmanagerdatapb.ReadVReplicationWorkflowResponse, tablet *topo.TabletInfo) error { - // This is not called concurrently, but we still protect the maps to ensure - // that we're concurrency-safe in the face of future changes (e.g. where other - // things are running concurrently with this which also access these maps). - m.Lock() - defer m.Unlock() - for _, rstream := range res.Streams { - // The value in the pos column can be compressed and thus not - // have a valid GTID consisting of valid UTF-8 characters so we - // have to decode it so that it's properly decompressed first - // when needed. - pos := rstream.Pos - if pos != "" { - mpos, err := binlogplayer.DecodePosition(pos) - if err != nil { - return err - } - pos = mpos.String() - } - - cells := strings.Split(res.Cells, ",") - for i := range cells { - cells[i] = strings.TrimSpace(cells[i]) - } - options := res.Options - if options != "" { - if err := json.Unmarshal([]byte(options), &workflow.Options); err != nil { - return err - } - } - stream := &vtctldatapb.Workflow_Stream{ - Id: int64(rstream.Id), - Shard: tablet.Shard, - Tablet: tablet.Alias, - BinlogSource: rstream.Bls, - Position: pos, - StopPosition: rstream.StopPos, - State: rstream.State.String(), - DbName: tablet.DbName(), - TabletTypes: res.TabletTypes, - TabletSelectionPreference: res.TabletSelectionPreference, - Cells: cells, - TransactionTimestamp: rstream.TransactionTimestamp, - TimeUpdated: rstream.TimeUpdated, - Message: rstream.Message, - Tags: strings.Split(res.Tags, ","), - RowsCopied: rstream.RowsCopied, - ThrottlerStatus: &vtctldatapb.Workflow_Stream_ThrottlerStatus{ - ComponentThrottled: rstream.ComponentThrottled, - TimeThrottled: rstream.TimeThrottled, - }, - } - - // Merge in copy states, which we've already fetched. - shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, stream.Id) - if copyState, ok := copyStatesByShardStreamId[shardStreamId]; ok { - stream.CopyStates = copyState - } - - if rstream.TimeUpdated == nil { - rstream.TimeUpdated = &vttimepb.Time{} - } - - switch { - case strings.Contains(strings.ToLower(stream.Message), "error"): - stream.State = binlogdatapb.VReplicationWorkflowState_Error.String() - case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && len(stream.CopyStates) > 0: - stream.State = binlogdatapb.VReplicationWorkflowState_Copying.String() - case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && int64(time.Now().Second())-rstream.TimeUpdated.Seconds > 10: - stream.State = binlogdatapb.VReplicationWorkflowState_Lagging.String() - } - - shardStreamKey := fmt.Sprintf("%s/%s", tablet.Shard, tablet.AliasString()) - shardStream, ok := workflow.ShardStreams[shardStreamKey] - if !ok { - ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) - defer cancel() - - si, err := s.ts.GetShard(ctx, req.Keyspace, tablet.Shard) - if err != nil { - return err - } - - shardStream = &vtctldatapb.Workflow_ShardStream{ - Streams: nil, - TabletControls: si.TabletControls, - IsPrimaryServing: si.IsPrimaryServing, - } - - workflow.ShardStreams[shardStreamKey] = shardStream - } - - shardStream.Streams = append(shardStream.Streams, stream) - sourceShardsByWorkflow[workflow.Name].Insert(stream.BinlogSource.Shard) - targetShardsByWorkflow[workflow.Name].Insert(tablet.Shard) - - if ks, ok := sourceKeyspaceByWorkflow[workflow.Name]; ok && ks != stream.BinlogSource.Keyspace { - return vterrors.Wrapf(ErrMultipleSourceKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, ks, stream.BinlogSource.Keyspace) - } - - sourceKeyspaceByWorkflow[workflow.Name] = stream.BinlogSource.Keyspace - - if ks, ok := targetKeyspaceByWorkflow[workflow.Name]; ok && ks != tablet.Keyspace { - return vterrors.Wrapf(ErrMultipleTargetKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, ks, tablet.Keyspace) - } - - targetKeyspaceByWorkflow[workflow.Name] = tablet.Keyspace - - if stream.TimeUpdated == nil { - stream.TimeUpdated = &vttimepb.Time{} - } - timeUpdated := time.Unix(stream.TimeUpdated.Seconds, 0) - vreplicationLag := time.Since(timeUpdated) - - // MaxVReplicationLag represents the time since we last processed any event - // in the workflow. - if currentMaxLag, ok := maxVReplicationLagByWorkflow[workflow.Name]; ok { - if vreplicationLag.Seconds() > currentMaxLag { - maxVReplicationLagByWorkflow[workflow.Name] = vreplicationLag.Seconds() - } - } else { - maxVReplicationLagByWorkflow[workflow.Name] = vreplicationLag.Seconds() - } - - workflow.WorkflowType = res.WorkflowType.String() - workflow.WorkflowSubType = res.WorkflowSubType.String() - workflow.DeferSecondaryKeys = res.DeferSecondaryKeys - - // MaxVReplicationTransactionLag estimates the actual statement processing lag - // between the source and the target. If we are still processing source events it - // is the difference b/w current time and the timestamp of the last event. If - // heartbeats are more recent than the last event, then the lag is the time since - // the last heartbeat as there can be an actual event immediately after the - // heartbeat, but which has not yet been processed on the target. - // We don't allow switching during the copy phase, so in that case we just return - // a large lag. All timestamps are in seconds since epoch. - if _, ok := maxVReplicationTransactionLagByWorkflow[workflow.Name]; !ok { - maxVReplicationTransactionLagByWorkflow[workflow.Name] = 0 - } - if rstream.TransactionTimestamp == nil { - rstream.TransactionTimestamp = &vttimepb.Time{} - } - lastTransactionTime := rstream.TransactionTimestamp.Seconds - if rstream.TimeHeartbeat == nil { - rstream.TimeHeartbeat = &vttimepb.Time{} - } - lastHeartbeatTime := rstream.TimeHeartbeat.Seconds - if stream.State == binlogdatapb.VReplicationWorkflowState_Copying.String() { - maxVReplicationTransactionLagByWorkflow[workflow.Name] = math.MaxInt64 - } else { - if lastTransactionTime == 0 /* no new events after copy */ || - lastHeartbeatTime > lastTransactionTime /* no recent transactions, so all caught up */ { - - lastTransactionTime = lastHeartbeatTime - } - now := time.Now().Unix() /* seconds since epoch */ - transactionReplicationLag := float64(now - lastTransactionTime) - if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow[workflow.Name] { - maxVReplicationTransactionLagByWorkflow[workflow.Name] = transactionReplicationLag - } - } - } - - return nil - } - - for tablet, result := range results { - // In the old implementation, we knew we had at most one (0 <= N <= 1) - // workflow for each shard primary we queried. There might be multiple - // rows (streams) comprising that workflow, so we would aggregate the - // rows for a given primary into a single value ("the workflow", - // ReplicationStatusResult in the old types). - // - // In this version, we have many (N >= 0) workflows for each shard - // primary we queried, so we need to determine if each row corresponds - // to a workflow we're already aggregating, or if it's a workflow we - // haven't seen yet for that shard primary. We use the workflow name to - // dedupe for this. - for _, wfres := range result.Workflows { - workflowName := wfres.Workflow - workflow, ok := workflowsMap[workflowName] - if !ok { - workflow = &vtctldatapb.Workflow{ - Name: workflowName, - ShardStreams: map[string]*vtctldatapb.Workflow_ShardStream{}, - } - - workflowsMap[workflowName] = workflow - sourceShardsByWorkflow[workflowName] = sets.New[string]() - targetShardsByWorkflow[workflowName] = sets.New[string]() - } - - if err := scanWorkflow(ctx, workflow, wfres, tablet); err != nil { - return nil, err - } - } - } - - var ( - fetchLogsWG sync.WaitGroup - vrepLogQuery = strings.TrimSpace(` -SELECT - id, - vrepl_id, - type, - state, - message, - created_at, - updated_at, - count -FROM - _vt.vreplication_log -WHERE vrepl_id IN %a -ORDER BY - vrepl_id ASC, - id ASC -`) - ) - - fetchStreamLogs := func(ctx context.Context, workflow *vtctldatapb.Workflow) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.fetchStreamLogs") - defer span.Finish() - - span.Annotate("keyspace", req.Keyspace) - span.Annotate("workflow", workflow.Name) - - vreplIDs := make([]int64, 0, len(workflow.ShardStreams)) - for _, shardStream := range maps.Values(workflow.ShardStreams) { - for _, stream := range shardStream.Streams { - vreplIDs = append(vreplIDs, stream.Id) - } - } - idsBV, err := sqltypes.BuildBindVariable(vreplIDs) - if err != nil { - return - } - - query, err := sqlparser.ParseAndBind(vrepLogQuery, idsBV) - if err != nil { - return - } - - vx := vexec.NewVExec(req.Keyspace, workflow.Name, s.ts, s.tmc, s.SQLParser()) - results, err := vx.QueryContext(ctx, query) - if err != nil { - // Note that we do not return here. If there are any query results - // in the map (i.e. some tablets returned successfully), we will - // still try to read log rows from them on a best-effort basis. But, - // we will also pre-emptively record the top-level fetch error on - // every stream in every shard in the workflow. Further processing - // below may override the error message for certain streams. - for _, streams := range workflow.ShardStreams { - for _, stream := range streams.Streams { - stream.LogFetchError = err.Error() - } - } - } - - for target, p3qr := range results { - qr := sqltypes.Proto3ToResult(p3qr) - shardStreamKey := fmt.Sprintf("%s/%s", target.Shard, target.AliasString()) - - ss, ok := workflow.ShardStreams[shardStreamKey] - if !ok || ss == nil { - continue - } - - streams := ss.Streams - streamIdx := 0 - markErrors := func(err error) { - if streamIdx >= len(streams) { - return - } - - streams[streamIdx].LogFetchError = err.Error() - } - - for _, row := range qr.Rows { - id, err := row[0].ToCastInt64() - if err != nil { - markErrors(err) - continue - } - - streamID, err := row[1].ToCastInt64() - if err != nil { - markErrors(err) - continue - } - - typ := row[2].ToString() - state := row[3].ToString() - message := row[4].ToString() - - createdAt, err := time.Parse("2006-01-02 15:04:05", row[5].ToString()) - if err != nil { - markErrors(err) - continue - } - - updatedAt, err := time.Parse("2006-01-02 15:04:05", row[6].ToString()) - if err != nil { - markErrors(err) - continue - } - - count, err := row[7].ToCastInt64() - if err != nil { - markErrors(err) - continue - } - - streamLog := &vtctldatapb.Workflow_Stream_Log{ - Id: id, - StreamId: streamID, - Type: typ, - State: state, - CreatedAt: &vttimepb.Time{ - Seconds: createdAt.Unix(), - }, - UpdatedAt: &vttimepb.Time{ - Seconds: updatedAt.Unix(), - }, - Message: message, - Count: count, - } - - // Earlier, in the main loop where we called scanWorkflow for - // each _vt.vreplication row, we also sorted each ShardStreams - // slice by ascending id, and our _vt.vreplication_log query - // ordered by (stream_id ASC, id ASC), so we can walk the - // streams in index order in O(n) amortized over all the rows - // for this tablet. - for streamIdx < len(streams) { - stream := streams[streamIdx] - if stream.Id < streamLog.StreamId { - streamIdx++ - continue - } - - if stream.Id > streamLog.StreamId { - s.Logger().Warningf("Found stream log for nonexistent stream: %+v", streamLog) - // This can happen on manual/failed workflow cleanup so move to the next log. - break - } - - // stream.Id == streamLog.StreamId - stream.Logs = append(stream.Logs, streamLog) - break - } - } - } - } - - workflows := make([]*vtctldatapb.Workflow, 0, len(workflowsMap)) - - for name, workflow := range workflowsMap { - sourceShards, ok := sourceShardsByWorkflow[name] - if !ok { - return nil, vterrors.Wrapf(ErrInvalidWorkflow, "%s has no source shards", name) - } - - sourceKeyspace, ok := sourceKeyspaceByWorkflow[name] - if !ok { - return nil, vterrors.Wrapf(ErrInvalidWorkflow, "%s has no source keyspace", name) - } - - targetShards, ok := targetShardsByWorkflow[name] - if !ok { - return nil, vterrors.Wrapf(ErrInvalidWorkflow, "%s has no target shards", name) - } - - targetKeyspace, ok := targetKeyspaceByWorkflow[name] - if !ok { - return nil, vterrors.Wrapf(ErrInvalidWorkflow, "%s has no target keyspace", name) - } - - maxVReplicationLag, ok := maxVReplicationLagByWorkflow[name] - if !ok { - return nil, vterrors.Wrapf(ErrInvalidWorkflow, "%s has no tracked vreplication lag", name) - } - - maxVReplicationTransactionLag, ok := maxVReplicationTransactionLagByWorkflow[name] - if !ok { - return nil, vterrors.Wrapf(ErrInvalidWorkflow, "%s has no tracked vreplication transaction lag", name) - } - - workflow.Source = &vtctldatapb.Workflow_ReplicationLocation{ - Keyspace: sourceKeyspace, - Shards: sets.List(sourceShards), - } - - workflow.Target = &vtctldatapb.Workflow_ReplicationLocation{ - Keyspace: targetKeyspace, - Shards: sets.List(targetShards), - } - - workflow.MaxVReplicationLag = int64(maxVReplicationLag) - workflow.MaxVReplicationTransactionLag = int64(maxVReplicationTransactionLag) - - // Sort shard streams by stream_id ASC, to support an optimization - // in fetchStreamLogs below. - for _, shardStreams := range workflow.ShardStreams { - sort.Slice(shardStreams.Streams, func(i, j int) bool { - return shardStreams.Streams[i].Id < shardStreams.Streams[j].Id - }) - } - - workflows = append(workflows, workflow) - - if req.IncludeLogs { - // Fetch logs for all streams associated with this workflow in the background. - fetchLogsWG.Add(1) - go func(ctx context.Context, workflow *vtctldatapb.Workflow) { - defer fetchLogsWG.Done() - fetchStreamLogs(ctx, workflow) - }(ctx, workflow) - } + workflows, err := w.buildWorkflows(ctx, workflowsByShard, copyStatesByShardStreamId, req) + if err != nil { + return nil, err } - // Wait for all the log fetchers to finish. - fetchLogsWG.Wait() - return &vtctldatapb.GetWorkflowsResponse{ Workflows: workflows, }, nil @@ -1080,51 +558,6 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN return ts, state, nil } -func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) ([]*vtctldatapb.Workflow_Stream_CopyState, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.getWorkflowCopyStates") - defer span.Finish() - - span.Annotate("keyspace", tablet.Keyspace) - span.Annotate("shard", tablet.Shard) - span.Annotate("tablet_alias", tablet.AliasString()) - span.Annotate("stream_ids", fmt.Sprintf("%#v", streamIds)) - - idsBV, err := sqltypes.BuildBindVariable(streamIds) - if err != nil { - return nil, err - } - query, err := sqlparser.ParseAndBind("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in %a and id in (select max(id) from _vt.copy_state where vrepl_id in %a group by vrepl_id, table_name)", - idsBV, idsBV) - if err != nil { - return nil, err - } - qr, err := s.tmc.VReplicationExec(ctx, tablet.Tablet, query) - if err != nil { - return nil, err - } - - result := sqltypes.Proto3ToResult(qr) - if result == nil { - return nil, nil - } - - copyStates := make([]*vtctldatapb.Workflow_Stream_CopyState, len(result.Rows)) - for i, row := range result.Rows { - streamId, err := row[0].ToInt64() - if err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to cast vrepl_id to int64: %v", err) - } - // These string fields are technically varbinary, but this is close enough. - copyStates[i] = &vtctldatapb.Workflow_Stream_CopyState{ - StreamId: streamId, - Table: row[1].ToString(), - LastPk: row[2].ToString(), - } - } - - return copyStates, nil -} - // LookupVindexCreate creates the lookup vindex in the specified // keyspace and creates a VReplication workflow to backfill that // vindex from the keyspace to the target/lookup table specified. @@ -1545,7 +978,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl } } if isStandardMoveTables() { // Non-standard ones do not use shard scoped mechanisms - if err := s.setupInitialDeniedTables(ctx, ts); err != nil { + if err := setupInitialDeniedTables(ctx, ts); err != nil { return nil, vterrors.Wrapf(err, "failed to put initial denied tables entries in place on the target shards") } } @@ -1600,7 +1033,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl }) } -func (s *Server) validateRoutingRuleFlags(req *vtctldatapb.MoveTablesCreateRequest, mz *materializer) error { +func validateRoutingRuleFlags(req *vtctldatapb.MoveTablesCreateRequest, mz *materializer) error { if mz.IsMultiTenantMigration() { switch { case req.NoRoutingRules: @@ -1612,7 +1045,7 @@ func (s *Server) validateRoutingRuleFlags(req *vtctldatapb.MoveTablesCreateReque return nil } -func (s *Server) setupInitialDeniedTables(ctx context.Context, ts *trafficSwitcher) error { +func setupInitialDeniedTables(ctx context.Context, ts *trafficSwitcher) error { if ts.MigrationType() != binlogdatapb.MigrationType_TABLES { return nil } @@ -1630,7 +1063,7 @@ func (s *Server) setupInitialDeniedTables(ctx context.Context, ts *trafficSwitch } func (s *Server) setupInitialRoutingRules(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest, mz *materializer, tables []string) error { - if err := s.validateRoutingRuleFlags(req, mz); err != nil { + if err := validateRoutingRuleFlags(req, mz); err != nil { return err } diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index dbe06ab1a47..26d722f1de0 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -2470,7 +2470,7 @@ func TestGetWorkflowsStreamLogs(t *testing.T) { }, sourceShards, targetShards) logResult := sqltypes.MakeTestResult( - sqltypes.MakeTestFields("id|vrepl_id|type|state|message|created_at|updated_at|`count`", "int64|int64|varchar|varchar|varchar|varchar|varchar|int64"), + sqltypes.MakeTestFields("id|vrepl_id|type|state|message|created_at|updated_at|count", "int64|int64|varchar|varchar|varchar|varchar|varchar|int64"), "1|0|State Change|Running|test message for non-existent 1|2006-01-02 15:04:05|2006-01-02 15:04:05|1", "2|0|State Change|Stopped|test message for non-existent 2|2006-01-02 15:04:06|2006-01-02 15:04:06|1", "3|1|State Change|Running|log message|2006-01-02 15:04:07|2006-01-02 15:04:07|1", @@ -2499,3 +2499,63 @@ func TestGetWorkflowsStreamLogs(t *testing.T) { assert.Equal(t, gotLogs[0].State, "Running") assert.Equal(t, gotLogs[0].Id, int64(3)) } + +func TestWorkflowStatus(t *testing.T) { + ctx := context.Background() + + sourceKeyspace := "source_keyspace" + targetKeyspace := "target_keyspace" + workflow := "test_workflow" + + sourceShards := []string{"-"} + targetShards := []string{"-"} + + te := newTestMaterializerEnv(t, ctx, &vtctldatapb.MaterializeSettings{ + SourceKeyspace: sourceKeyspace, + TargetKeyspace: targetKeyspace, + Workflow: workflow, + TableSettings: []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: "table1", + SourceExpression: fmt.Sprintf("select * from %s", "table1"), + }, + { + TargetTable: "table2", + SourceExpression: fmt.Sprintf("select * from %s", "table2"), + }, + }, + }, sourceShards, targetShards) + + tablesResult := sqltypes.MakeTestResult(sqltypes.MakeTestFields("table_name", "varchar"), "table1", "table2") + te.tmc.expectVRQuery(200, "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1", tablesResult) + + tablesTargetCopyResult := sqltypes.MakeTestResult(sqltypes.MakeTestFields("table_name|table_rows|data_length", "varchar|int64|int64"), "table1|50|500", "table2|100|250") + te.tmc.expectVRQuery(200, "select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_target_keyspace' and table_name in ('table1','table2')", tablesTargetCopyResult) + + tablesSourceCopyResult := sqltypes.MakeTestResult(sqltypes.MakeTestFields("table_name|table_rows|data_length", "varchar|int64|int64"), "table1|100|1000", "table2|200|500") + te.tmc.expectVRQuery(100, "select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_source_keyspace' and table_name in ('table1','table2')", tablesSourceCopyResult) + + te.tmc.expectVRQuery(200, "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", &sqltypes.Result{}) + + res, err := te.ws.WorkflowStatus(ctx, &vtctldatapb.WorkflowStatusRequest{ + Keyspace: targetKeyspace, + Workflow: workflow, + Shards: targetShards, + }) + + assert.NoError(t, err) + + require.NotNil(t, res.TableCopyState) + + stateTable1 := res.TableCopyState["table1"] + stateTable2 := res.TableCopyState["table2"] + require.NotNil(t, stateTable1) + require.NotNil(t, stateTable2) + + assert.Equal(t, int64(100), stateTable1.RowsTotal) + assert.Equal(t, int64(200), stateTable2.RowsTotal) + assert.Equal(t, int64(50), stateTable1.RowsCopied) + assert.Equal(t, int64(100), stateTable2.RowsCopied) + assert.Equal(t, float32(50), stateTable1.RowsPercentage) + assert.Equal(t, float32(50), stateTable2.RowsPercentage) +} diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go new file mode 100644 index 00000000000..da0ee5dfec7 --- /dev/null +++ b/go/vt/vtctl/workflow/workflows.go @@ -0,0 +1,672 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +This file provides functions for fetching and retrieving information about VReplication workflows + +At the moment it is used by the `GetWorkflows` function in `server.go and includes functionality to +get the following: +- Fetch workflows by shard +- Fetch copy states by shard stream +- Build workflows with metadata +- Fetch stream logs +*/ + +package workflow + +import ( + "context" + "encoding/json" + "fmt" + "math" + "sort" + "strings" + "sync" + "time" + + "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" + + "vitess.io/vitess/go/sets" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/trace" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vtctl/workflow/common" + "vitess.io/vitess/go/vt/vtctl/workflow/vexec" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + vttimepb "vitess.io/vitess/go/vt/proto/vttime" +) + +// workflowFetcher is responsible for fetching and retrieving information +// about VReplication workflows. +type workflowFetcher struct { + ts *topo.Server + tmc tmclient.TabletManagerClient + + logger logutil.Logger + parser *sqlparser.Parser +} + +type workflowMetadata struct { + sourceKeyspace string + sourceShards sets.Set[string] + targetKeyspace string + targetShards sets.Set[string] + maxVReplicationLag float64 + maxVReplicationTransactionLag float64 +} + +var vrepLogQuery = strings.TrimSpace(` +SELECT + id, + vrepl_id, + type, + state, + message, + created_at, + updated_at, + count +FROM + _vt.vreplication_log +WHERE vrepl_id IN %a +ORDER BY + vrepl_id ASC, + id ASC +`) + +func (wf *workflowFetcher) fetchWorkflowsByShard( + ctx context.Context, + req *vtctldatapb.GetWorkflowsRequest, +) (map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) { + readReq := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{} + if req.Workflow != "" { + readReq.IncludeWorkflows = []string{req.Workflow} + } + if req.ActiveOnly { + readReq.ExcludeStates = []binlogdatapb.VReplicationWorkflowState{binlogdatapb.VReplicationWorkflowState_Stopped} + } + + m := sync.Mutex{} + + shards, err := common.GetShards(ctx, wf.ts, req.Keyspace, req.Shards) + if err != nil { + return nil, err + } + + results := make(map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, len(shards)) + + err = wf.forAllShards(ctx, req.Keyspace, shards, func(ctx context.Context, si *topo.ShardInfo) error { + primary, err := wf.ts.GetTablet(ctx, si.PrimaryAlias) + if err != nil { + return err + } + if primary == nil { + return fmt.Errorf("%w %s/%s: tablet %v not found", vexec.ErrNoShardPrimary, req.Keyspace, si.ShardName(), topoproto.TabletAliasString(si.PrimaryAlias)) + } + // Clone the request so that we can set the correct DB name for tablet. + req := readReq.CloneVT() + wres, err := wf.tmc.ReadVReplicationWorkflows(ctx, primary.Tablet, req) + if err != nil { + return err + } + m.Lock() + defer m.Unlock() + results[primary] = wres + return nil + }) + if err != nil { + return nil, err + } + + return results, nil +} + +func (wf *workflowFetcher) fetchCopyStatesByShardStream( + ctx context.Context, + workflowsByShard map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, +) (map[string][]*vtctldatapb.Workflow_Stream_CopyState, error) { + m := sync.Mutex{} + + copyStatesByShardStreamId := make(map[string][]*vtctldatapb.Workflow_Stream_CopyState, len(workflowsByShard)) + + fetchCopyStates := func(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) error { + span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.fetchCopyStates") + defer span.Finish() + + span.Annotate("shard", tablet.Shard) + span.Annotate("tablet_alias", tablet.AliasString()) + + copyStates, err := wf.getWorkflowCopyStates(ctx, tablet, streamIds) + if err != nil { + return err + } + + m.Lock() + defer m.Unlock() + + for _, copyState := range copyStates { + shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, copyState.StreamId) + copyStatesByShardStreamId[shardStreamId] = append( + copyStatesByShardStreamId[shardStreamId], + copyState, + ) + } + + return nil + } + + fetchCopyStatesEg, fetchCopyStatesCtx := errgroup.WithContext(ctx) + for tablet, result := range workflowsByShard { + streamIds := make([]int32, 0, len(result.Workflows)) + for _, wf := range result.Workflows { + for _, stream := range wf.Streams { + streamIds = append(streamIds, stream.Id) + } + } + + if len(streamIds) == 0 { + continue + } + + fetchCopyStatesEg.Go(func() error { + return fetchCopyStates(fetchCopyStatesCtx, tablet, streamIds) + }) + } + if err := fetchCopyStatesEg.Wait(); err != nil { + return nil, err + } + + return copyStatesByShardStreamId, nil +} + +func (wf *workflowFetcher) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) ([]*vtctldatapb.Workflow_Stream_CopyState, error) { + span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.getWorkflowCopyStates") + defer span.Finish() + + span.Annotate("keyspace", tablet.Keyspace) + span.Annotate("shard", tablet.Shard) + span.Annotate("tablet_alias", tablet.AliasString()) + span.Annotate("stream_ids", fmt.Sprintf("%#v", streamIds)) + + idsBV, err := sqltypes.BuildBindVariable(streamIds) + if err != nil { + return nil, err + } + query, err := sqlparser.ParseAndBind("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in %a and id in (select max(id) from _vt.copy_state where vrepl_id in %a group by vrepl_id, table_name)", + idsBV, idsBV) + if err != nil { + return nil, err + } + qr, err := wf.tmc.VReplicationExec(ctx, tablet.Tablet, query) + if err != nil { + return nil, err + } + + result := sqltypes.Proto3ToResult(qr) + if result == nil { + return nil, nil + } + + copyStates := make([]*vtctldatapb.Workflow_Stream_CopyState, len(result.Rows)) + for i, row := range result.Named().Rows { + streamId, err := row["vrepl_id"].ToInt64() + if err != nil { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to cast vrepl_id to int64: %v", err) + } + // These string fields are technically varbinary, but this is close enough. + copyStates[i] = &vtctldatapb.Workflow_Stream_CopyState{ + StreamId: streamId, + Table: row["table_name"].ToString(), + LastPk: row["lastpk"].ToString(), + } + } + + return copyStates, nil +} + +func (wf *workflowFetcher) buildWorkflows( + ctx context.Context, + results map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, + copyStatesByShardStreamId map[string][]*vtctldatapb.Workflow_Stream_CopyState, + req *vtctldatapb.GetWorkflowsRequest, +) ([]*vtctldatapb.Workflow, error) { + workflowsMap := make(map[string]*vtctldatapb.Workflow, len(results)) + workflowMetadataMap := make(map[string]*workflowMetadata, len(results)) + + for tablet, result := range results { + // In the old implementation, we knew we had at most one (0 <= N <= 1) + // workflow for each shard primary we queried. There might be multiple + // rows (streams) comprising that workflow, so we would aggregate the + // rows for a given primary into a single value ("the workflow", + // ReplicationStatusResult in the old types). + // + // In this version, we have many (N >= 0) workflows for each shard + // primary we queried, so we need to determine if each row corresponds + // to a workflow we're already aggregating, or if it's a workflow we + // haven't seen yet for that shard primary. We use the workflow name to + // dedupe for this. + for _, wfres := range result.Workflows { + workflowName := wfres.Workflow + workflow, ok := workflowsMap[workflowName] + if !ok { + workflow = &vtctldatapb.Workflow{ + Name: workflowName, + ShardStreams: map[string]*vtctldatapb.Workflow_ShardStream{}, + } + + workflowsMap[workflowName] = workflow + workflowMetadataMap[workflowName] = &workflowMetadata{ + sourceShards: sets.New[string](), + targetShards: sets.New[string](), + } + } + + metadata := workflowMetadataMap[workflowName] + err := wf.scanWorkflow(ctx, workflow, wfres, tablet, metadata, copyStatesByShardStreamId, req.Keyspace) + if err != nil { + return nil, err + } + } + } + + for name, workflow := range workflowsMap { + meta := workflowMetadataMap[name] + updateWorkflowWithMetadata(workflow, meta) + + // Sort shard streams by stream_id ASC, to support an optimization + // in fetchStreamLogs below. + for _, shardStreams := range workflow.ShardStreams { + sort.Slice(shardStreams.Streams, func(i, j int) bool { + return shardStreams.Streams[i].Id < shardStreams.Streams[j].Id + }) + } + } + + if req.IncludeLogs { + var fetchLogsWG sync.WaitGroup + + for _, workflow := range workflowsMap { + // Fetch logs for all streams associated with this workflow in the background. + fetchLogsWG.Add(1) + go func(ctx context.Context, workflow *vtctldatapb.Workflow) { + defer fetchLogsWG.Done() + wf.fetchStreamLogs(ctx, req.Keyspace, workflow) + }(ctx, workflow) + } + + // Wait for all the log fetchers to finish. + fetchLogsWG.Wait() + } + + return maps.Values(workflowsMap), nil +} + +func (wf *workflowFetcher) scanWorkflow( + ctx context.Context, + workflow *vtctldatapb.Workflow, + res *tabletmanagerdatapb.ReadVReplicationWorkflowResponse, + tablet *topo.TabletInfo, + meta *workflowMetadata, + copyStatesByShardStreamId map[string][]*vtctldatapb.Workflow_Stream_CopyState, + keyspace string, +) error { + shardStreamKey := fmt.Sprintf("%s/%s", tablet.Shard, tablet.AliasString()) + shardStream, ok := workflow.ShardStreams[shardStreamKey] + if !ok { + ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cancel() + + si, err := wf.ts.GetShard(ctx, keyspace, tablet.Shard) + if err != nil { + return err + } + + shardStream = &vtctldatapb.Workflow_ShardStream{ + Streams: nil, + TabletControls: si.TabletControls, + IsPrimaryServing: si.IsPrimaryServing, + } + + workflow.ShardStreams[shardStreamKey] = shardStream + } + + for _, rstream := range res.Streams { + // The value in the pos column can be compressed and thus not + // have a valid GTID consisting of valid UTF-8 characters so we + // have to decode it so that it's properly decompressed first + // when needed. + pos := rstream.Pos + if pos != "" { + mpos, err := binlogplayer.DecodePosition(pos) + if err != nil { + return err + } + pos = mpos.String() + } + + cells := strings.Split(res.Cells, ",") + for i := range cells { + cells[i] = strings.TrimSpace(cells[i]) + } + options := res.Options + if options != "" { + if err := json.Unmarshal([]byte(options), &workflow.Options); err != nil { + return err + } + } + + stream := &vtctldatapb.Workflow_Stream{ + Id: int64(rstream.Id), + Shard: tablet.Shard, + Tablet: tablet.Alias, + BinlogSource: rstream.Bls, + Position: pos, + StopPosition: rstream.StopPos, + State: rstream.State.String(), + DbName: tablet.DbName(), + TabletTypes: res.TabletTypes, + TabletSelectionPreference: res.TabletSelectionPreference, + Cells: cells, + TransactionTimestamp: rstream.TransactionTimestamp, + TimeUpdated: rstream.TimeUpdated, + Message: rstream.Message, + Tags: strings.Split(res.Tags, ","), + RowsCopied: rstream.RowsCopied, + ThrottlerStatus: &vtctldatapb.Workflow_Stream_ThrottlerStatus{ + ComponentThrottled: rstream.ComponentThrottled, + TimeThrottled: rstream.TimeThrottled, + }, + } + + // Merge in copy states, which we've already fetched. + shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, stream.Id) + if copyStates, ok := copyStatesByShardStreamId[shardStreamId]; ok { + stream.CopyStates = copyStates + } + + if rstream.TimeUpdated == nil { + rstream.TimeUpdated = &vttimepb.Time{} + } + + stream.State = getStreamState(stream, rstream) + + shardStream.Streams = append(shardStream.Streams, stream) + + meta.sourceShards.Insert(stream.BinlogSource.Shard) + meta.targetShards.Insert(tablet.Shard) + + if meta.sourceKeyspace != "" && meta.sourceKeyspace != stream.BinlogSource.Keyspace { + return vterrors.Wrapf(ErrMultipleSourceKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.sourceKeyspace, stream.BinlogSource.Keyspace) + } + + meta.sourceKeyspace = stream.BinlogSource.Keyspace + + if meta.targetKeyspace != "" && meta.targetKeyspace != tablet.Keyspace { + return vterrors.Wrapf(ErrMultipleTargetKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.targetKeyspace, tablet.Keyspace) + } + + meta.targetKeyspace = tablet.Keyspace + + if stream.TimeUpdated == nil { + stream.TimeUpdated = &vttimepb.Time{} + } + timeUpdated := time.Unix(stream.TimeUpdated.Seconds, 0) + vreplicationLag := time.Since(timeUpdated) + + // MaxVReplicationLag represents the time since we last processed any event + // in the workflow. + if vreplicationLag.Seconds() > meta.maxVReplicationLag { + meta.maxVReplicationLag = vreplicationLag.Seconds() + } + + workflow.WorkflowType = res.WorkflowType.String() + workflow.WorkflowSubType = res.WorkflowSubType.String() + workflow.DeferSecondaryKeys = res.DeferSecondaryKeys + + // MaxVReplicationTransactionLag estimates the actual statement processing lag + // between the source and the target. If we are still processing source events it + // is the difference b/w current time and the timestamp of the last event. If + // heartbeats are more recent than the last event, then the lag is the time since + // the last heartbeat as there can be an actual event immediately after the + // heartbeat, but which has not yet been processed on the target. + // We don't allow switching during the copy phase, so in that case we just return + // a large lag. All timestamps are in seconds since epoch. + if rstream.TransactionTimestamp == nil { + rstream.TransactionTimestamp = &vttimepb.Time{} + } + lastTransactionTime := rstream.TransactionTimestamp.Seconds + if rstream.TimeHeartbeat == nil { + rstream.TimeHeartbeat = &vttimepb.Time{} + } + lastHeartbeatTime := rstream.TimeHeartbeat.Seconds + if stream.State == binlogdatapb.VReplicationWorkflowState_Copying.String() { + meta.maxVReplicationTransactionLag = math.MaxInt64 + } else { + if lastTransactionTime == 0 /* no new events after copy */ || + lastHeartbeatTime > lastTransactionTime /* no recent transactions, so all caught up */ { + + lastTransactionTime = lastHeartbeatTime + } + now := time.Now().Unix() /* seconds since epoch */ + transactionReplicationLag := float64(now - lastTransactionTime) + if transactionReplicationLag > meta.maxVReplicationTransactionLag { + meta.maxVReplicationTransactionLag = transactionReplicationLag + } + } + } + + return nil +} + +func updateWorkflowWithMetadata(workflow *vtctldatapb.Workflow, meta *workflowMetadata) { + workflow.Source = &vtctldatapb.Workflow_ReplicationLocation{ + Keyspace: meta.sourceKeyspace, + Shards: sets.List(meta.sourceShards), + } + + workflow.Target = &vtctldatapb.Workflow_ReplicationLocation{ + Keyspace: meta.targetKeyspace, + Shards: sets.List(meta.targetShards), + } + + workflow.MaxVReplicationLag = int64(meta.maxVReplicationLag) + workflow.MaxVReplicationTransactionLag = int64(meta.maxVReplicationTransactionLag) +} + +func (wf *workflowFetcher) fetchStreamLogs(ctx context.Context, keyspace string, workflow *vtctldatapb.Workflow) { + span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.fetchStreamLogs") + defer span.Finish() + + span.Annotate("keyspace", keyspace) + span.Annotate("workflow", workflow.Name) + + vreplIDs := make([]int64, 0, len(workflow.ShardStreams)) + for _, shardStream := range maps.Values(workflow.ShardStreams) { + for _, stream := range shardStream.Streams { + vreplIDs = append(vreplIDs, stream.Id) + } + } + idsBV, err := sqltypes.BuildBindVariable(vreplIDs) + if err != nil { + return + } + + query, err := sqlparser.ParseAndBind(vrepLogQuery, idsBV) + if err != nil { + return + } + + vx := vexec.NewVExec(keyspace, workflow.Name, wf.ts, wf.tmc, wf.parser) + results, err := vx.QueryContext(ctx, query) + if err != nil { + // Note that we do not return here. If there are any query results + // in the map (i.e. some tablets returned successfully), we will + // still try to read log rows from them on a best-effort basis. But, + // we will also pre-emptively record the top-level fetch error on + // every stream in every shard in the workflow. Further processing + // below may override the error message for certain streams. + for _, streams := range workflow.ShardStreams { + for _, stream := range streams.Streams { + stream.LogFetchError = err.Error() + } + } + } + + for target, p3qr := range results { + qr := sqltypes.Proto3ToResult(p3qr) + shardStreamKey := fmt.Sprintf("%s/%s", target.Shard, target.AliasString()) + + ss, ok := workflow.ShardStreams[shardStreamKey] + if !ok || ss == nil { + continue + } + + streams := ss.Streams + streamIdx := 0 + markErrors := func(err error) { + if streamIdx >= len(streams) { + return + } + + streams[streamIdx].LogFetchError = err.Error() + } + + for _, row := range qr.Named().Rows { + id, err := row["id"].ToCastInt64() + if err != nil { + markErrors(err) + continue + } + + streamID, err := row["vrepl_id"].ToCastInt64() + if err != nil { + markErrors(err) + continue + } + + typ := row["type"].ToString() + state := row["state"].ToString() + message := row["message"].ToString() + + createdAt, err := time.Parse("2006-01-02 15:04:05", row["created_at"].ToString()) + if err != nil { + markErrors(err) + continue + } + + updatedAt, err := time.Parse("2006-01-02 15:04:05", row["updated_at"].ToString()) + if err != nil { + markErrors(err) + continue + } + + count, err := row["count"].ToCastInt64() + if err != nil { + markErrors(err) + continue + } + + streamLog := &vtctldatapb.Workflow_Stream_Log{ + Id: id, + StreamId: streamID, + Type: typ, + State: state, + CreatedAt: &vttimepb.Time{ + Seconds: createdAt.Unix(), + }, + UpdatedAt: &vttimepb.Time{ + Seconds: updatedAt.Unix(), + }, + Message: message, + Count: count, + } + + // Earlier, in buildWorkflows, we sorted each ShardStreams + // slice by ascending id, and our _vt.vreplication_log query + // ordered by (stream_id ASC, id ASC), so we can walk the + // streams in index order in O(n) amortized over all the rows + // for this tablet. + for streamIdx < len(streams) { + stream := streams[streamIdx] + if stream.Id < streamLog.StreamId { + streamIdx++ + continue + } + + if stream.Id > streamLog.StreamId { + wf.logger.Warningf("Found stream log for nonexistent stream: %+v", streamLog) + // This can happen on manual/failed workflow cleanup so move to the next log. + break + } + + // stream.Id == streamLog.StreamId + stream.Logs = append(stream.Logs, streamLog) + break + } + } + } +} + +func (wf *workflowFetcher) forAllShards( + ctx context.Context, + keyspace string, + shards []string, + f func(ctx context.Context, shard *topo.ShardInfo) error, +) error { + eg, egCtx := errgroup.WithContext(ctx) + for _, shard := range shards { + eg.Go(func() error { + si, err := wf.ts.GetShard(ctx, keyspace, shard) + if err != nil { + return err + } + if si.PrimaryAlias == nil { + return fmt.Errorf("%w %s/%s", vexec.ErrNoShardPrimary, keyspace, shard) + } + + if err := f(egCtx, si); err != nil { + return err + } + return nil + }) + } + if err := eg.Wait(); err != nil { + return err + } + return nil +} + +func getStreamState(stream *vtctldatapb.Workflow_Stream, rstream *tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream) string { + switch { + case strings.Contains(strings.ToLower(stream.Message), "error"): + return binlogdatapb.VReplicationWorkflowState_Error.String() + case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && len(stream.CopyStates) > 0: + return binlogdatapb.VReplicationWorkflowState_Copying.String() + case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && int64(time.Now().Second())-rstream.TimeUpdated.Seconds > 10: + return binlogdatapb.VReplicationWorkflowState_Lagging.String() + } + return rstream.State.String() +} diff --git a/go/vt/vtctl/workflow/workflows_test.go b/go/vt/vtctl/workflow/workflows_test.go new file mode 100644 index 00000000000..2015c8d1b7c --- /dev/null +++ b/go/vt/vtctl/workflow/workflows_test.go @@ -0,0 +1,260 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/proto/vttime" + "vitess.io/vitess/go/vt/topo" + + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" +) + +func TestGetStreamState(t *testing.T) { + testCases := []struct { + name string + stream *vtctldatapb.Workflow_Stream + rstream *tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream + want string + }{ + { + name: "error state", + stream: &vtctldatapb.Workflow_Stream{ + Message: "test error", + }, + want: "Error", + }, + { + name: "copying state", + stream: &vtctldatapb.Workflow_Stream{ + State: "Running", + CopyStates: []*vtctldatapb.Workflow_Stream_CopyState{ + { + Table: "table1", + }, + }, + }, + want: "Copying", + }, + { + name: "lagging state", + stream: &vtctldatapb.Workflow_Stream{ + State: "Running", + }, + rstream: &tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + TimeUpdated: &vttime.Time{ + Seconds: int64(time.Now().Second()) - 11, + }, + }, + want: "Lagging", + }, + { + name: "non-running and error free", + stream: &vtctldatapb.Workflow_Stream{ + State: "Stopped", + }, + rstream: &tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + State: binlogdata.VReplicationWorkflowState_Stopped, + }, + want: "Stopped", + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + state := getStreamState(tt.stream, tt.rstream) + assert.Equal(t, tt.want, state) + }) + } +} + +func TestGetWorkflowCopyStates(t *testing.T) { + ctx := context.Background() + + sourceShards := []string{"-"} + targetShards := []string{"-"} + + te := newTestMaterializerEnv(t, ctx, &vtctldatapb.MaterializeSettings{ + SourceKeyspace: "source_keyspace", + TargetKeyspace: "target_keyspace", + Workflow: "test_workflow", + TableSettings: []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: "table1", + SourceExpression: fmt.Sprintf("select * from %s", "table1"), + }, + { + TargetTable: "table2", + SourceExpression: fmt.Sprintf("select * from %s", "table2"), + }, + }, + }, sourceShards, targetShards) + + wf := workflowFetcher{ + ts: te.ws.ts, + tmc: te.tmc, + } + + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + } + + query := "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)" + te.tmc.expectVRQuery(100, query, sqltypes.MakeTestResult( + sqltypes.MakeTestFields("vrepl_id|table_name|lastpk", "int64|varchar|varchar"), + "1|table1|2", "1|table2|1", + )) + + copyStates, err := wf.getWorkflowCopyStates(ctx, &topo.TabletInfo{ + Tablet: tablet, + }, []int32{1}) + assert.NoError(t, err) + assert.Len(t, copyStates, 2) + + state1 := &vtctldatapb.Workflow_Stream_CopyState{ + Table: "table1", + LastPk: "2", + StreamId: 1, + } + state2 := &vtctldatapb.Workflow_Stream_CopyState{ + Table: "table2", + LastPk: "1", + StreamId: 1, + } + assert.Contains(t, copyStates, state1) + assert.Contains(t, copyStates, state2) +} + +func TestFetchCopyStatesByShardStream(t *testing.T) { + ctx := context.Background() + + sourceShards := []string{"-"} + targetShards := []string{"-"} + + te := newTestMaterializerEnv(t, ctx, &vtctldatapb.MaterializeSettings{ + SourceKeyspace: "source_keyspace", + TargetKeyspace: "target_keyspace", + Workflow: "test_workflow", + TableSettings: []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: "table1", + SourceExpression: fmt.Sprintf("select * from %s", "table1"), + }, + { + TargetTable: "table2", + SourceExpression: fmt.Sprintf("select * from %s", "table2"), + }, + }, + }, sourceShards, targetShards) + + wf := workflowFetcher{ + ts: te.ws.ts, + tmc: te.tmc, + } + + tablet := &topodatapb.Tablet{ + Shard: "-80", + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + } + tablet2 := &topodatapb.Tablet{ + Shard: "80-", + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + } + + query := "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1, 2) and id in (select max(id) from _vt.copy_state where vrepl_id in (1, 2) group by vrepl_id, table_name)" + te.tmc.expectVRQuery(100, query, sqltypes.MakeTestResult( + sqltypes.MakeTestFields("vrepl_id|table_name|lastpk", "int64|varchar|varchar"), + "1|table1|2", "2|table2|1", "2|table1|1", + )) + + te.tmc.expectVRQuery(101, query, sqltypes.MakeTestResult( + sqltypes.MakeTestFields("vrepl_id|table_name|lastpk", "int64|varchar|varchar"), + "1|table1|2", "1|table2|1", + )) + + ti := &topo.TabletInfo{ + Tablet: tablet, + } + ti2 := &topo.TabletInfo{ + Tablet: tablet2, + } + + readVReplicationResponse := map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{ + ti: { + Workflows: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + { + Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + { + Id: 1, + }, { + Id: 2, + }, + }, + }, + }, + }, + ti2: { + Workflows: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + { + Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + { + Id: 1, + }, { + Id: 2, + }, + }, + }, + }, + }, + } + copyStatesByStreamId, err := wf.fetchCopyStatesByShardStream(ctx, readVReplicationResponse) + assert.NoError(t, err) + + copyStates1 := copyStatesByStreamId["-80/1"] + copyStates2 := copyStatesByStreamId["-80/2"] + copyStates3 := copyStatesByStreamId["80-/1"] + + require.NotNil(t, copyStates1) + require.NotNil(t, copyStates2) + require.NotNil(t, copyStates3) + + assert.Len(t, copyStates1, 1) + assert.Len(t, copyStates2, 2) + assert.Len(t, copyStates3, 2) + + assert.Nil(t, copyStatesByStreamId["80-/2"]) +} From f9acb7722bd3efea6f3ea2074092050e7a6581ad Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 6 Dec 2024 09:27:34 -0500 Subject: [PATCH 4/8] VReplication: Enable VPlayerBatching in unit tests (#17339) Signed-off-by: Matt Lord --- .../vreplication/external_connector_test.go | 3 +- .../vreplication/framework_test.go | 1 - .../vreplication/vcopier_test.go | 16 ++++++++++ .../tabletmanager/vreplication/vdbclient.go | 23 ++++++++------ .../tabletmanager/vreplication/vplayer.go | 3 +- .../vreplication/vplayer_flaky_test.go | 31 +++++++++++++------ .../tabletmanager/vreplication/vreplicator.go | 10 ++++-- 7 files changed, 62 insertions(+), 25 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go b/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go index e00a5578171..c671d2a086d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go @@ -163,8 +163,7 @@ func TestExternalConnectorPlay(t *testing.T) { expectDBClientAndVreplicationQueries(t, []string{ "begin", - "insert into tab1(id,val) values (1,_binary'a')", - "insert into tab1(id,val) values (2,_binary'b')", + "insert into tab1(id,val) values (1,_binary'a'), (2,_binary'b')", "/update _vt.vreplication set pos=", "commit", }, pos) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index fe8b62d3cef..12a05a69dbc 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -141,7 +141,6 @@ func setup(ctx context.Context) (func(), int) { resetBinlogClient() vttablet.InitVReplicationConfigDefaults() - vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 // Engines cannot be initialized in testenv because it introduces circular dependencies. streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index a95e0bf17c5..a7e4794ba76 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -684,6 +684,14 @@ func testPlayerCopyBigTable(t *testing.T) { reset := vstreamer.AdjustPacketSize(1) defer reset() + // The test is written to match the behavior w/o + // VReplicationExperimentalFlagOptimizeInserts enabled. + origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags + vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 + defer func() { + vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags + }() + savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration // copyPhaseDuration should be low enough to have time to send one row. vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond @@ -814,6 +822,14 @@ func testPlayerCopyWildcardRule(t *testing.T) { reset := vstreamer.AdjustPacketSize(1) defer reset() + // The test is written to match the behavior w/o + // VReplicationExperimentalFlagOptimizeInserts enabled. + origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags + vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 + defer func() { + vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags + }() + savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration // copyPhaseDuration should be low enough to have time to send one row. vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go index 8a4409db06c..63538be881d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go @@ -30,6 +30,8 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) +const beginStmtLen = int64(len("begin;")) + // vdbClient is a wrapper on binlogplayer.DBClient. // It allows us to retry a failed transactions on lock errors. type vdbClient struct { @@ -56,16 +58,19 @@ func (vc *vdbClient) Begin() error { if vc.InTransaction { return nil } - if err := vc.DBClient.Begin(); err != nil { - return err + if vc.maxBatchSize > 0 { + // We are batching the contents of the transaction, which + // starts with the BEGIN and ends with the COMMIT, so we + // do not send a BEGIN down the wire ahead of time. + vc.queriesPos = int64(len(vc.queries)) + vc.batchSize = beginStmtLen + } else { + // We're not batching so we start the transaction here + // by sending the BEGIN down the wire. + if err := vc.DBClient.Begin(); err != nil { + return err + } } - - // If we're batching, we only batch the contents of the - // transaction, which starts with the begin and ends with - // the commit. - vc.queriesPos = int64(len(vc.queries)) - vc.batchSize = 6 // begin and semicolon - vc.queries = append(vc.queries, "begin") vc.InTransaction = true vc.startTime = time.Now() diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index db2f3f341ac..98e36119622 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -133,7 +133,8 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map return vr.dbClient.Commit() } batchMode := false - if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { + // We only do batching in the running/replicating phase. + if len(copyState) == 0 && vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { batchMode = true } if batchMode { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 0cc568c1cf1..50d93e60e5a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -628,7 +628,6 @@ func TestPlayerStatementModeWithFilterAndErrorHandling(t *testing.T) { // It does not work when filter is enabled output := qh.Expect( - "begin", "rollback", fmt.Sprintf("/update _vt.vreplication set message='%s", expectedMsg), ) @@ -975,8 +974,7 @@ func TestPlayerFilters(t *testing.T) { input: "insert into src4 values (1,100,'aaa'),(2,200,'bbb'),(3,100,'ccc')", output: qh.Expect( "begin", - "insert into dst4(id1,val) values (1,_binary'aaa')", - "insert into dst4(id1,val) values (3,_binary'ccc')", + "insert into dst4(id1,val) values (1,_binary'aaa'), (3,_binary'ccc')", "/update _vt.vreplication set pos=", "commit", ), @@ -987,8 +985,7 @@ func TestPlayerFilters(t *testing.T) { input: "insert into src5 values (1,100,'abc'),(2,200,'xyz'),(3,100,'xyz'),(4,300,'abc'),(5,200,'xyz')", output: qh.Expect( "begin", - "insert into dst5(id1,val) values (1,_binary'abc')", - "insert into dst5(id1,val) values (4,_binary'abc')", + "insert into dst5(id1,val) values (1,_binary'abc'), (4,_binary'abc')", "/update _vt.vreplication set pos=", "commit", ), @@ -1495,9 +1492,7 @@ func TestPlayerRowMove(t *testing.T) { }) expectDBClientQueries(t, qh.Expect( "begin", - "insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", - "insert into dst(val1,sval2,rcount) values (2,ifnull(2, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", - "insert into dst(val1,sval2,rcount) values (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", + "insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1), (2,ifnull(2, 0),1), (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", "/update _vt.vreplication set pos=", "commit", )) @@ -1505,7 +1500,7 @@ func TestPlayerRowMove(t *testing.T) { {"1", "1", "1"}, {"2", "5", "2"}, }) - validateQueryCountStat(t, "replicate", 3) + validateQueryCountStat(t, "replicate", 1) execStatements(t, []string{ "update src set val1=1, val2=4 where id=3", @@ -1521,7 +1516,7 @@ func TestPlayerRowMove(t *testing.T) { {"1", "5", "2"}, {"2", "2", "1"}, }) - validateQueryCountStat(t, "replicate", 5) + validateQueryCountStat(t, "replicate", 3) } func TestPlayerTypes(t *testing.T) { @@ -2179,6 +2174,14 @@ func TestPlayerSplitTransaction(t *testing.T) { func TestPlayerLockErrors(t *testing.T) { defer deleteTablet(addTablet(100)) + // The immediate retry behavior does not apply when doing + // VPlayer Batching. + origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags + vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 + defer func() { + vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags + }() + execStatements(t, []string{ "create table t1(id int, val varchar(128), primary key(id))", fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb), @@ -2258,6 +2261,14 @@ func TestPlayerLockErrors(t *testing.T) { func TestPlayerCancelOnLock(t *testing.T) { defer deleteTablet(addTablet(100)) + // The immediate retry behavior does not apply when doing + // VPlayer Batching. + origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags + vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 + defer func() { + vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags + }() + execStatements(t, []string{ "create table t1(id int, val varchar(128), primary key(id))", fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb), diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 9ec274ab0ea..42701288a44 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -508,8 +508,14 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me } vr.stats.State.Store(state.String()) query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(binlogplayer.MessageTruncate(message)), vr.id) - if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil { - return fmt.Errorf("could not set state: %v: %v", query, err) + // If we're batching a transaction, then include the state update + // in the current transaction batch. + if vr.dbClient.InTransaction && vr.dbClient.maxBatchSize > 0 { + vr.dbClient.AddQueryToTrxBatch(query) + } else { // Otherwise, send it down the wire + if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil { + return fmt.Errorf("could not set state: %v: %v", query, err) + } } if state == vr.state { return nil From 747a61c1434b4dc1a8c76d5c6f64e66dd0e72347 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 6 Dec 2024 10:46:22 -0500 Subject: [PATCH 5/8] VStreamer: For larger compressed transaction payloads, stream the internal contents (#17239) Signed-off-by: Matt Lord --- go/mysql/binlog_event_compression.go | 7 ++++ go/mysql/binlog_event_mysql56_test.go | 2 ++ .../tabletserver/vstreamer/vstreamer.go | 32 ++++++++++++++++--- 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go index 7455218d4b5..37d28431087 100644 --- a/go/mysql/binlog_event_compression.go +++ b/go/mysql/binlog_event_compression.go @@ -98,6 +98,11 @@ type TransactionPayload struct { payload []byte reader io.Reader iterator func() (BinlogEvent, error) + // StreamingContents tells the consumer that we are streaming the + // decompressed payload and they should also stream the events. + // This ensures that neither the producer nor the consumer are + // holding the entire payload's contents in memory. + StreamingContents bool } // IsTransactionPayload returns true if a compressed transaction @@ -292,6 +297,8 @@ func (tp *TransactionPayload) decompress() error { } compressedTrxPayloadsUsingStream.Add(1) tp.reader = streamDecoder + // Signal the consumer to also stream the contents. + tp.StreamingContents = true return nil } diff --git a/go/mysql/binlog_event_mysql56_test.go b/go/mysql/binlog_event_mysql56_test.go index 861d98c6e4f..5844779de63 100644 --- a/go/mysql/binlog_event_mysql56_test.go +++ b/go/mysql/binlog_event_mysql56_test.go @@ -186,9 +186,11 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) { } } if tc.inMemory { + require.False(t, tp.StreamingContents) require.Equal(t, memDecodingCnt+1, compressedTrxPayloadsInMem.Get()) require.Equal(t, tc.want, eventStrs) } else { + require.True(t, tp.StreamingContents) require.Equal(t, streamDecodingCnt+1, compressedTrxPayloadsUsingStream.Get()) require.Len(t, eventStrs, len(tc.want)) totalSize := 0 diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index ea7f75cdc38..59db723ff2b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -375,7 +375,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } return fmt.Errorf("unexpected server EOF") } - vevents, err := vs.parseEvent(ev) + vevents, err := vs.parseEvent(ev, bufferAndTransmit) if err != nil { vs.vse.errorCounts.Add("ParseEvent", 1) return err @@ -416,7 +416,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } // parseEvent parses an event from the binlog and converts it to a list of VEvents. -func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) { +// The bufferAndTransmit function must be passed if the event is a TransactionPayloadEvent +// as for larger payloads (> ZstdInMemoryDecompressorMaxSize) the internal events need +// to be streamed directly here in order to avoid holding the entire payload's contents, +// which can be 10s or even 100s of GiBs, all in memory. +func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vevent *binlogdatapb.VEvent) error) ([]*binlogdatapb.VEvent, error) { if !ev.IsValid() { return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev) } @@ -672,11 +676,31 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e } return nil, err } - tpvevents, err := vs.parseEvent(tpevent) + tpvevents, err := vs.parseEvent(tpevent, nil) // Parse the internal event if err != nil { return nil, vterrors.Wrap(err, "failed to parse transaction payload's internal event") } - vevents = append(vevents, tpvevents...) + if tp.StreamingContents { + // Transmit each internal event individually to avoid buffering + // the large transaction's entire payload of events in memory, as + // the uncompressed size can be 10s or even 100s of GiBs in size. + if bufferAndTransmit == nil { + return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "[bug] cannot stream compressed transaction payload's internal events as no bufferAndTransmit function was provided") + } + for _, tpvevent := range tpvevents { + tpvevent.Timestamp = int64(ev.Timestamp()) + tpvevent.CurrentTime = time.Now().UnixNano() + if err := bufferAndTransmit(tpvevent); err != nil { + if err == io.EOF { + return nil, nil + } + vs.vse.errorCounts.Add("TransactionPayloadBufferAndTransmit", 1) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error sending compressed transaction payload's internal event: %v", err) + } + } + } else { // Process the payload's internal events all at once + vevents = append(vevents, tpvevents...) + } } vs.vse.vstreamerCompressedTransactionsDecoded.Add(1) } From 775ec983bad14d9e2c8d2a97feeeb3fb501f1c7d Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Mon, 9 Dec 2024 09:02:21 +0530 Subject: [PATCH 6/8] Improve logging in buffering (#17294) Signed-off-by: Manan Gupta --- go/vt/vtgate/buffer/buffer.go | 1 + go/vt/vtgate/buffer/shard_buffer.go | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/go/vt/vtgate/buffer/buffer.go b/go/vt/vtgate/buffer/buffer.go index dec83e2c78c..eb937a6361c 100644 --- a/go/vt/vtgate/buffer/buffer.go +++ b/go/vt/vtgate/buffer/buffer.go @@ -208,6 +208,7 @@ func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, } func (b *Buffer) HandleKeyspaceEvent(ksevent *discovery.KeyspaceEvent) { + log.Infof("Keyspace Event received for keyspace %v", ksevent.Keyspace) for _, shard := range ksevent.Shards { sb := b.getOrCreateBuffer(shard.Target.Keyspace, shard.Target.Shard) if sb != nil { diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go index e1f02bb7f0e..66c6ee702e6 100644 --- a/go/vt/vtgate/buffer/shard_buffer.go +++ b/go/vt/vtgate/buffer/shard_buffer.go @@ -286,7 +286,7 @@ func (sb *shardBuffer) startBufferingLocked(ctx context.Context, kev *discovery. msg = "Dry-run: Would have started buffering" } starts.Add(sb.statsKey, 1) - log.Infof("%v for shard: %s (window: %v, size: %v, max failover duration: %v) (A failover was detected by this seen error: %v.)", + log.V(2).Infof("%v for shard: %s (window: %v, size: %v, max failover duration: %v) (A failover was detected by this seen error: %v.)", msg, topoproto.KeyspaceShardString(sb.keyspace, sb.shard), sb.buf.config.Window, @@ -488,7 +488,7 @@ func (sb *shardBuffer) recordKeyspaceEvent(alias *topodatapb.TabletAlias, stillS sb.mu.Lock() defer sb.mu.Unlock() - log.Infof("disruption in shard %s/%s resolved (serving: %v), movetable state %#v", + log.V(2).Infof("disruption in shard %s/%s resolved (serving: %v), movetable state %#v", sb.keyspace, sb.shard, stillServing, keyspaceEvent.MoveTablesState) if !topoproto.TabletAliasEqual(alias, sb.currentPrimary) { @@ -562,7 +562,7 @@ func (sb *shardBuffer) stopBufferingLocked(reason stopReason, details string) { if sb.mode == bufferModeDryRun { msg = "Dry-run: Would have stopped buffering" } - log.Infof("%v for shard: %s after: %.1f seconds due to: %v. Draining %d buffered requests now.", + log.V(2).Infof("%v for shard: %s after: %.1f seconds due to: %v. Draining %d buffered requests now.", msg, topoproto.KeyspaceShardString(sb.keyspace, sb.shard), d.Seconds(), details, len(q)) var clientEntryError error @@ -622,7 +622,7 @@ func (sb *shardBuffer) drain(q []*entry, err error) { wg.Wait() d := sb.timeNow().Sub(start) - log.Infof("Draining finished for shard: %s Took: %v for: %d requests.", topoproto.KeyspaceShardString(sb.keyspace, sb.shard), d, len(q)) + log.V(2).Infof("Draining finished for shard: %s Took: %v for: %d requests.", topoproto.KeyspaceShardString(sb.keyspace, sb.shard), d, len(q)) requestsDrained.Add(sb.statsKey, int64(len(q))) // Draining is done. Change state from "draining" to "idle". From 250148ee797cf04f965e4261c87464aa4b824c2a Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Mon, 9 Dec 2024 09:03:26 +0530 Subject: [PATCH 7/8] Fix flakiness in `TestSemiSyncRequiredWithTwoPC` (#17332) Signed-off-by: Manan Gupta --- go/test/endtoend/transaction/twopc/twopc_test.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index 5a97f79a79f..8e249365bda 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -1353,7 +1353,6 @@ func TestSemiSyncRequiredWithTwoPC(t *testing.T) { out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=none") require.NoError(t, err, out) defer func() { - clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync") for _, shard := range clusterInstance.Keyspaces[0].Shards { clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, shard.Vttablets[0].Alias) } @@ -1376,6 +1375,21 @@ func TestSemiSyncRequiredWithTwoPC(t *testing.T) { _, err = utils.ExecAllowError(t, conn, "commit") require.Error(t, err) require.ErrorContains(t, err, "two-pc is enabled, but semi-sync is not") + + _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync") + require.NoError(t, err) + for _, shard := range clusterInstance.Keyspaces[0].Shards { + err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, shard.Vttablets[1].Alias) + require.NoError(t, err) + } + + // Transaction should now succeed. + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)") + _, err = utils.ExecAllowError(t, conn, "commit") + require.NoError(t, err) } // TestReadTransactionStatus tests that read transaction state rpc works as expected. From cb6692019453fb94b5b4ee8b40437778ec97a5d6 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 9 Dec 2024 08:34:40 +0200 Subject: [PATCH 8/8] Multi-metrics throttler post v21 cleanup: remove unthrottled entry from topo (#17283) Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../throttler_topo/throttler_test.go | 2 +- go/vt/schemamanager/tablet_executor.go | 14 ++++++-------- go/vt/vtctl/grpcvtctldserver/server.go | 14 ++++++-------- go/vt/vtgate/executorcontext/vcursor_impl.go | 15 +++++++-------- 4 files changed, 20 insertions(+), 25 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index d5a1053d77d..df727802648 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -463,7 +463,7 @@ func TestThrottleViaApplySchema(t *testing.T) { require.NotNil(t, keyspace.Keyspace.ThrottlerConfig.ThrottledApps) // ThrottledApps will actually be empty at this point, but more specifically we want to see that "online-ddl" is not there. appRule, ok := keyspace.Keyspace.ThrottlerConfig.ThrottledApps[throttlerapp.OnlineDDLName.String()] - assert.True(t, ok, "app rule: %v", appRule) + assert.False(t, ok, "app rule: %v", appRule) }) } diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go index bd521703723..60e86bf7faf 100644 --- a/go/vt/schemamanager/tablet_executor.go +++ b/go/vt/schemamanager/tablet_executor.go @@ -249,14 +249,12 @@ func (exec *TabletExecutor) executeAlterMigrationThrottle(ctx context.Context, a throttlerConfig.ThrottledApps = make(map[string]*topodatapb.ThrottledAppRule) } if req.ThrottledApp != nil && req.ThrottledApp.Name != "" { - // TODO(shlomi) in v22: replace the following line with the commented out block - throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp - // timeNow := time.Now() - // if protoutil.TimeFromProto(req.ThrottledApp.ExpiresAt).After(timeNow) { - // throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp - // } else { - // delete(throttlerConfig.ThrottledApps, req.ThrottledApp.Name) - // } + timeNow := time.Now() + if protoutil.TimeFromProto(req.ThrottledApp.ExpiresAt).After(timeNow) { + throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp + } else { + delete(throttlerConfig.ThrottledApps, req.ThrottledApp.Name) + } } return throttlerConfig diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index 3bfce2204a2..c3dc22d21b4 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -2098,14 +2098,12 @@ func (s *VtctldServer) UpdateThrottlerConfig(ctx context.Context, req *vtctldata throttlerConfig.CheckAsCheckSelf = false } if req.ThrottledApp != nil && req.ThrottledApp.Name != "" { - // TODO(shlomi) in v22: replace the following line with the commented out block - throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp - // timeNow := time.Now() - // if protoutil.TimeFromProto(req.ThrottledApp.ExpiresAt).After(timeNow) { - // throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp - // } else { - // delete(throttlerConfig.ThrottledApps, req.ThrottledApp.Name) - // } + timeNow := time.Now() + if protoutil.TimeFromProto(req.ThrottledApp.ExpiresAt).After(timeNow) { + throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp + } else { + delete(throttlerConfig.ThrottledApps, req.ThrottledApp.Name) + } } return throttlerConfig } diff --git a/go/vt/vtgate/executorcontext/vcursor_impl.go b/go/vt/vtgate/executorcontext/vcursor_impl.go index d6aac5cf5b0..c1f341b38cf 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl.go @@ -31,6 +31,7 @@ import ( "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/config" "vitess.io/vitess/go/mysql/sqlerror" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/discovery" @@ -1439,14 +1440,12 @@ func (vc *VCursorImpl) ThrottleApp(ctx context.Context, throttledAppRule *topoda throttlerConfig.ThrottledApps = make(map[string]*topodatapb.ThrottledAppRule) } if req.ThrottledApp != nil && req.ThrottledApp.Name != "" { - // TODO(shlomi) in v22: replace the following line with the commented out block - throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp - // timeNow := time.Now() - // if protoutil.TimeFromProto(req.ThrottledApp.ExpiresAt).After(timeNow) { - // throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp - // } else { - // delete(throttlerConfig.ThrottledApps, req.ThrottledApp.Name) - // } + timeNow := time.Now() + if protoutil.TimeFromProto(req.ThrottledApp.ExpiresAt).After(timeNow) { + throttlerConfig.ThrottledApps[req.ThrottledApp.Name] = req.ThrottledApp + } else { + delete(throttlerConfig.ThrottledApps, req.ThrottledApp.Name) + } } return throttlerConfig }