Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap errors happening during PRS in a new vterrors code #17669

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
88 changes: 88 additions & 0 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/reparent/utils"
Expand Down Expand Up @@ -234,3 +236,89 @@ func TestBufferingWithMultipleDisruptions(t *testing.T) {
// Wait for all the writes to have succeeded.
wg.Wait()
}

func TestBufferingErrorInTransaction(t *testing.T) {
clusterInstance := utils.SetupShardedReparentCluster(t, policy.DurabilitySemiSync)
defer utils.TeardownCluster(clusterInstance)

keyspace := clusterInstance.Keyspaces[0]
vtParams := clusterInstance.GetVTParams(keyspace.Name)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

// Start by reparenting all the shards to the first tablet.
// Confirm that the replication is setup correctly in the beginning.
// tablets[0] is the primary tablet in the beginning.
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2]})

conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
idx := 20

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for {
select {
case <-ctx.Done():
return
default:
_, err = conn.ExecuteFetch("begin", 0, false)
if err != nil {
log.Errorf("Begin error received - %v", err)
continue
}

for i := 0; i < 25; i++ {
idx += 5
_, err = conn.ExecuteFetch(utils.GetInsertMultipleValuesQuery(idx, idx+1, idx+2, idx+3), 0, false)
if err != nil {
log.Errorf("Insert error received - %v", err)
}
time.Sleep(10 * time.Millisecond)
}

time.Sleep(10 * time.Second)
var hasCommit bool
_, err = conn.ExecuteFetch("commit", 0, false)
if err != nil {
log.Errorf("Commit error received - %v", err)
} else {
log.Error("Commit successful")
hasCommit = true
}

if !hasCommit {
_, err = conn.ExecuteFetch("rollback", 0, false)
if err != nil {
log.Errorf("Rollback error received - %v", err)
}
}

time.Sleep(100 * time.Millisecond)
}
}
}()

// Reparent to the other replica
utils.ShardName = "-40"
defer func() {
utils.ShardName = "0"
}()

output, err := utils.Prs(t, clusterInstance, tablets[1])
require.NoError(t, err, "error in PlannedReparentShard output - %s", output)

time.Sleep(5 * time.Second)

// We now restart the vttablet that became a replica.
utils.StopTablet(t, tablets[0], false)

time.Sleep(10 * time.Second)

tablets[0].VttabletProcess.ServingStatus = "SERVING"
err = tablets[0].VttabletProcess.Setup()
require.NoError(t, err)

time.Sleep(1 * time.Minute)
cancel()
}
20 changes: 13 additions & 7 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ import (
)

var (
KeyspaceName = "ks"
dbName = "vt_" + KeyspaceName
username = "vt_dba"
Hostname = "localhost"
insertVal = 1
insertSQL = "insert into vt_insert_test(id, msg) values (%d, 'test %d')"
sqlSchema = `
KeyspaceName = "ks"
dbName = "vt_" + KeyspaceName
username = "vt_dba"
Hostname = "localhost"
insertVal = 1
insertSQL = "insert into vt_insert_test(id, msg) values (%d, 'test %d')"
insertSQLMultipleValues = "insert into vt_insert_test(id, msg) values (%d, 'test %d'), (%d, 'test %d'), (%d, 'test %d'), (%d, 'test %d')"
sqlSchema = `
create table vt_insert_test (
id bigint,
msg varchar(64),
Expand Down Expand Up @@ -117,6 +118,11 @@ func GetInsertQuery(idx int) string {
return fmt.Sprintf(insertSQL, idx, idx)
}

// GetInsertMultipleValuesQuery returns a built insert query to insert multiple rows at once.
func GetInsertMultipleValuesQuery(idx1, idx2, idx3, idx4 int) string {
return fmt.Sprintf(insertSQLMultipleValues, idx1, idx1, idx2, idx2, idx3, idx3, idx4, idx4)
}

// GetSelectionQuery returns a built selection query read the data.
func GetSelectionQuery() string {
return `select * from vt_insert_test`
Expand Down
1 change: 0 additions & 1 deletion go/test/endtoend/tabletgateway/vtgate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ func TestReplicaTransactions(t *testing.T) {
require.NoError(t, err)
serving := replicaTablet.VttabletProcess.WaitForStatus("SERVING", 60*time.Second)
assert.Equal(t, serving, true, "Tablet did not become ready within a reasonable time")
utils.AssertContainsError(t, readConn, fetchAllCustomers, "not found")

// create a new connection, should be able to query again
readConn, err = mysql.Connect(ctx, &vtParams)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ func (hc *HealthCheckImpl) TabletConnection(ctx context.Context, alias *topodata
hc.mu.Unlock()
if thc == nil || thc.Conn == nil {
// TODO: test that throws this error
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias)
return nil, vterrors.VT15001(vtrpc.Code_NOT_FOUND, fmt.Sprintf("tablet: %v is either down or nonexistent", alias))
}
return thc.Connection(ctx), nil
}
Expand Down
38 changes: 38 additions & 0 deletions go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vterrors

import (
"fmt"
"strings"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
Expand Down Expand Up @@ -120,6 +121,8 @@ var (
VT14004 = errorWithoutState("VT14004", vtrpcpb.Code_UNAVAILABLE, "cannot find keyspace for: %s", "The specified keyspace could not be found.")
VT14005 = errorWithoutState("VT14005", vtrpcpb.Code_UNAVAILABLE, "cannot lookup sidecar database for keyspace: %s", "Failed to read sidecar database identifier.")

VT15001 = errorWithNoCode("VT15001", "transient error, please retry the transaction: %s", "The opened transaction should be closed by the application and re-tried.")

// Errors is a list of errors that must match all the variables
// defined above to enable auto-documentation of error codes.
Errors = []func(args ...any) *VitessError{
Expand Down Expand Up @@ -207,6 +210,10 @@ var (
VT14004,
VT14005,
}

ErrorsWithNoCode = []func(code vtrpcpb.Code, args ...any) *VitessError{
VT15001,
}
)

type VitessError struct {
Expand Down Expand Up @@ -258,3 +265,34 @@ func errorWithState(id string, code vtrpcpb.Code, state State, short, long strin
}
}
}

func errorWithNoCode(id string, short, long string) func(code vtrpcpb.Code, args ...any) *VitessError {
return func(code vtrpcpb.Code, args ...any) *VitessError {
s := short
if len(args) != 0 {
s = fmt.Sprintf(s, args...)
}

return &VitessError{
Err: New(code, id+": "+s),
Description: long,
ID: id,
}
}
}

func ErrorsHaveInvalidSession(errs []error) bool {
for _, err := range errs {
if IsInvalidSessionError(err) {
return true
}
}
return false
}

func IsInvalidSessionError(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), VT15001(0).ID)
}
18 changes: 16 additions & 2 deletions go/vt/vterrors/vterrorsgen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"text/template"

"vitess.io/vitess/go/mysql/sqlerror"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"

"vitess.io/vitess/go/vt/vterrors"
)
Expand All @@ -31,10 +32,14 @@ const (
tmpl = `
| ID | Description | Error | MySQL Error Code | SQL State |
| --- | --- | --- | --- | --- |
{{- range $err := . }}
{{- range $err := .Errors }}
{{- $data := (call $err) }}
| {{ $data.ID }} | {{ $data.Description }} | {{ FormatError $data.Err }} | {{ ConvertStateToMySQLErrorCode $data.State }} | {{ ConvertStateToMySQLState $data.State }} |
{{- end }}
{{- range $err := .ErrorsWithNoCode }}
{{- $data := (call $err 0) }}
| {{ $data.ID }} | {{ $data.Description }} | {{ FormatError $data.Err }} | | {{ ConvertStateToMySQLState $data.State }} |
{{- end }}
`
)

Expand All @@ -53,7 +58,16 @@ func main() {
})
t = template.Must(t.Parse(tmpl))

err := t.ExecuteTemplate(os.Stdout, "template", vterrors.Errors)
type data struct {
Errors []func(args ...any) *vterrors.VitessError
ErrorsWithNoCode []func(code vtrpcpb.Code, args ...any) *vterrors.VitessError
}
d := data{
Errors: vterrors.Errors,
ErrorsWithNoCode: vterrors.ErrorsWithNoCode,
}

err := t.ExecuteTemplate(os.Stdout, "template", d)
if err != nil {
log.Fatal(err)
}
Expand Down
9 changes: 5 additions & 4 deletions go/vt/vtgate/buffer/shard_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ import (
"time"

"vitess.io/vitess/go/vt/discovery"

"vitess.io/vitess/go/vt/vtgate/errorsanitizer"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/errorsanitizer"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// bufferState represents the different states a shardBuffer object can be in.
Expand Down Expand Up @@ -488,7 +489,7 @@ func (sb *shardBuffer) recordKeyspaceEvent(alias *topodatapb.TabletAlias, stillS
sb.mu.Lock()
defer sb.mu.Unlock()

log.V(2).Infof("disruption in shard %s/%s resolved (serving: %v), movetable state %#v",
log.Infof("disruption in shard %s/%s resolved (serving: %v), movetable state %#v",
sb.keyspace, sb.shard, stillServing, keyspaceEvent.MoveTablesState)

if !topoproto.TabletAliasEqual(alias, sb.currentPrimary) {
Expand Down
12 changes: 7 additions & 5 deletions go/vt/vtgate/executorcontext/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,11 +762,12 @@ func (vc *VCursorImpl) Execute(ctx context.Context, method string, query string,

err := vc.markSavepoint(ctx, rollbackOnError, map[string]*querypb.BindVariable{})
if err != nil {
vc.setRollbackOnPartialExecIfRequired(vterrors.IsInvalidSessionError(err), true, rollbackOnError)
return nil, err
}

qr, err := vc.executor.Execute(ctx, nil, method, session, vc.marginComments.Leading+query+vc.marginComments.Trailing, bindVars)
vc.setRollbackOnPartialExecIfRequired(err != nil, rollbackOnError)
vc.setRollbackOnPartialExecIfRequired(vterrors.IsInvalidSessionError(err), err != nil, rollbackOnError)

return qr, err
}
Expand Down Expand Up @@ -794,11 +795,12 @@ func (vc *VCursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.P
atomic.AddUint64(&vc.logStats.ShardQueries, uint64(noOfShards))
err := vc.markSavepoint(ctx, rollbackOnError && (noOfShards > 1), map[string]*querypb.BindVariable{})
if err != nil {
vc.setRollbackOnPartialExecIfRequired(vterrors.IsInvalidSessionError(err), true, rollbackOnError)
return nil, []error{err}
}

qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.SafeSession, canAutocommit, vc.ignoreMaxMemoryRows, vc.observer, fetchLastInsertID)
vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError)
vc.setRollbackOnPartialExecIfRequired(vterrors.ErrorsHaveInvalidSession(errs), len(errs) != len(rss), rollbackOnError)
vc.logShardsQueried(primitive, len(rss))
if qr != nil && qr.InsertIDUpdated() {
vc.SafeSession.LastInsertId = qr.InsertID
Expand All @@ -818,7 +820,7 @@ func (vc *VCursorImpl) StreamExecuteMulti(ctx context.Context, primitive engine.
}

errs := vc.executor.StreamExecuteMulti(ctx, primitive, vc.marginComments.Leading+query+vc.marginComments.Trailing, rss, bindVars, vc.SafeSession, autocommit, callback, vc.observer, fetchLastInsertID)
vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError)
vc.setRollbackOnPartialExecIfRequired(vterrors.ErrorsHaveInvalidSession(errs), len(errs) != len(rss), rollbackOnError)

return errs
}
Expand Down Expand Up @@ -903,8 +905,8 @@ func (vc *VCursorImpl) AutocommitApproval() bool {
// when the query gets successfully executed on at least one shard,
// there does not exist any old savepoint for which rollback is already set
// and rollback on error is allowed.
func (vc *VCursorImpl) setRollbackOnPartialExecIfRequired(atleastOneSuccess bool, rollbackOnError bool) {
if atleastOneSuccess && rollbackOnError && !vc.SafeSession.IsRollbackSet() {
func (vc *VCursorImpl) setRollbackOnPartialExecIfRequired(required bool, atleastOneSuccess bool, rollbackOnError bool) {
if required || atleastOneSuccess && rollbackOnError && !vc.SafeSession.IsRollbackSet() {
vc.SafeSession.SetRollbackCommand()
}
}
Expand Down
Loading
Loading