Skip to content

Commit

Permalink
Merge branch 'main' into throttler-multi-metrics-v22-cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Dec 9, 2024
2 parents e1c3817 + cb66920 commit 4aab771
Show file tree
Hide file tree
Showing 30 changed files with 1,474 additions and 628 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/update_golang_version.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
if [ ${{ matrix.branch }} == "main" ]; then
go run ./go/tools/go-upgrade/go-upgrade.go upgrade --main --allow-major-upgrade
else if [ ${{ matrix.branch }} == "release-21.0" ]; then
elif [ ${{ matrix.branch }} == "release-21.0" ]; then
go run ./go/tools/go-upgrade/go-upgrade.go upgrade
else
go run ./go/tools/go-upgrade/go-upgrade.go upgrade --workflow-update=false
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ $(PROTO_GO_OUTS): minimaltools install_protoc-gen-go proto/*.proto
# This rule builds the bootstrap images for all flavors.
DOCKER_IMAGES_FOR_TEST = mysql80 percona80
DOCKER_IMAGES = common $(DOCKER_IMAGES_FOR_TEST)
BOOTSTRAP_VERSION=38
BOOTSTRAP_VERSION=39
ensure_bootstrap_version:
find docker/ -type f -exec sed -i "s/^\(ARG bootstrap_version\)=.*/\1=${BOOTSTRAP_VERSION}/" {} \;
sed -i 's/\(^.*flag.String(\"bootstrap-version\",\) *\"[^\"]\+\"/\1 \"${BOOTSTRAP_VERSION}\"/' test.go
Expand Down
2 changes: 1 addition & 1 deletion build.env
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
source ./tools/shell_functions.inc

go version >/dev/null 2>&1 || fail "Go is not installed or is not in \$PATH. See https://vitess.io/contributing/build-from-source for install instructions."
goversion_min 1.23.3 || echo "Go version reported: `go version`. Version 1.23.3+ recommended. See https://vitess.io/contributing/build-from-source for install instructions."
goversion_min 1.23.4 || echo "Go version reported: `go version`. Version 1.23.4+ recommended. See https://vitess.io/contributing/build-from-source for install instructions."

mkdir -p dist
mkdir -p bin
Expand Down
6 changes: 5 additions & 1 deletion docker/bootstrap/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,8 @@ List of changes between bootstrap image versions.

## [38] - 2024-11-10
### Changes
- Update build to golang 1.23.3
- Update build to golang 1.23.3

## [39] - 2024-12-04
### Changes
- Update build to golang 1.23.4
2 changes: 1 addition & 1 deletion docker/bootstrap/Dockerfile.common
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=linux/amd64 golang:1.23.3-bullseye
FROM --platform=linux/amd64 golang:1.23.4-bullseye

# Install Vitess build dependencies
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
Expand Down
2 changes: 1 addition & 1 deletion docker/lite/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM --platform=linux/amd64 golang:1.23.3-bullseye AS builder
FROM --platform=linux/amd64 golang:1.23.4-bullseye AS builder

# Allows docker builds to set the BUILD_NUMBER
ARG BUILD_NUMBER
Expand Down
2 changes: 1 addition & 1 deletion docker/lite/Dockerfile.percona80
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM --platform=linux/amd64 golang:1.23.3-bullseye AS builder
FROM --platform=linux/amd64 golang:1.23.4-bullseye AS builder

# Allows docker builds to set the BUILD_NUMBER
ARG BUILD_NUMBER
Expand Down
2 changes: 1 addition & 1 deletion docker/vttestserver/Dockerfile.mysql80
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM --platform=linux/amd64 golang:1.23.3-bullseye AS builder
FROM --platform=linux/amd64 golang:1.23.4-bullseye AS builder

# Allows docker builds to set the BUILD_NUMBER
ARG BUILD_NUMBER
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module vitess.io/vitess

go 1.23.3
go 1.23.4

require (
cloud.google.com/go/storage v1.46.0
Expand Down
7 changes: 7 additions & 0 deletions go/mysql/binlog_event_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ type TransactionPayload struct {
payload []byte
reader io.Reader
iterator func() (BinlogEvent, error)
// StreamingContents tells the consumer that we are streaming the
// decompressed payload and they should also stream the events.
// This ensures that neither the producer nor the consumer are
// holding the entire payload's contents in memory.
StreamingContents bool
}

// IsTransactionPayload returns true if a compressed transaction
Expand Down Expand Up @@ -292,6 +297,8 @@ func (tp *TransactionPayload) decompress() error {
}
compressedTrxPayloadsUsingStream.Add(1)
tp.reader = streamDecoder
// Signal the consumer to also stream the contents.
tp.StreamingContents = true
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions go/mysql/binlog_event_mysql56_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,11 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) {
}
}
if tc.inMemory {
require.False(t, tp.StreamingContents)
require.Equal(t, memDecodingCnt+1, compressedTrxPayloadsInMem.Get())
require.Equal(t, tc.want, eventStrs)
} else {
require.True(t, tp.StreamingContents)
require.Equal(t, streamDecodingCnt+1, compressedTrxPayloadsUsingStream.Get())
require.Len(t, eventStrs, len(tc.want))
totalSize := 0
Expand Down
16 changes: 15 additions & 1 deletion go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,6 @@ func TestSemiSyncRequiredWithTwoPC(t *testing.T) {
out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=none")
require.NoError(t, err, out)
defer func() {
clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")
for _, shard := range clusterInstance.Keyspaces[0].Shards {
clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, shard.Vttablets[0].Alias)
}
Expand All @@ -1376,6 +1375,21 @@ func TestSemiSyncRequiredWithTwoPC(t *testing.T) {
_, err = utils.ExecAllowError(t, conn, "commit")
require.Error(t, err)
require.ErrorContains(t, err, "two-pc is enabled, but semi-sync is not")

_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")
require.NoError(t, err)
for _, shard := range clusterInstance.Keyspaces[0].Shards {
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, shard.Vttablets[1].Alias)
require.NoError(t, err)
}

// Transaction should now succeed.
utils.Exec(t, conn, "begin")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)")
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)")
_, err = utils.ExecAllowError(t, conn, "commit")
require.NoError(t, err)
}

// TestReadTransactionStatus tests that read transaction state rpc works as expected.
Expand Down
21 changes: 21 additions & 0 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ type testTMClient struct {
vrQueries map[int][]*queryResult
createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse
readVReplicationWorkflowRequests map[uint32]*readVReplicationWorkflowRequestResponse
updateVReplicationWorklowsRequests map[uint32]*tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest
applySchemaRequests map[uint32]*applySchemaRequestResponse
primaryPositions map[uint32]string
vdiffRequests map[uint32]*vdiffRequestResponse
Expand All @@ -294,6 +295,7 @@ func newTestTMClient(env *testEnv) *testTMClient {
vrQueries: make(map[int][]*queryResult),
createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse),
readVReplicationWorkflowRequests: make(map[uint32]*readVReplicationWorkflowRequestResponse),
updateVReplicationWorklowsRequests: make(map[uint32]*tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest),
applySchemaRequests: make(map[uint32]*applySchemaRequestResponse),
readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse),
primaryPositions: make(map[uint32]string),
Expand Down Expand Up @@ -677,6 +679,19 @@ func (tmc *testTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet
}, nil
}

func (tmc *testTMClient) UpdateVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowsResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
if expect := tmc.updateVReplicationWorklowsRequests[tablet.Alias.Uid]; expect != nil {
if !proto.Equal(expect, req) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected ReadVReplicationWorkflow request on tablet %s: got %+v, want %+v",
topoproto.TabletAliasString(tablet.Alias), req, expect)
}
}
delete(tmc.updateVReplicationWorklowsRequests, tablet.Alias.Uid)
return nil, nil
}

func (tmc *testTMClient) ValidateVReplicationPermissions(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ValidateVReplicationPermissionsRequest) (*tabletmanagerdatapb.ValidateVReplicationPermissionsResponse, error) {
return &tabletmanagerdatapb.ValidateVReplicationPermissionsResponse{
User: "vt_filtered",
Expand Down Expand Up @@ -736,6 +751,12 @@ func (tmc *testTMClient) AddVReplicationWorkflowsResponse(key string, resp *tabl
tmc.readVReplicationWorkflowsResponses[key] = append(tmc.readVReplicationWorkflowsResponses[key], resp)
}

func (tmc *testTMClient) AddUpdateVReplicationRequests(tabletUID uint32, req *tabletmanagerdatapb.UpdateVReplicationWorkflowsRequest) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
tmc.updateVReplicationWorklowsRequests[tabletUID] = req
}

func (tmc *testTMClient) getVReplicationWorkflowsResponse(key string) *tabletmanagerdatapb.ReadVReplicationWorkflowsResponse {
if len(tmc.readVReplicationWorkflowsResponses) == 0 {
return nil
Expand Down
Loading

0 comments on commit 4aab771

Please sign in to comment.