Merge pull request #127372 from cockroachdb/blathers/backport-release-24.1.3-rc-121691

release-24.1.3-rc: kvclient: don't drop ambiguous errors on incompatible transport
andrewbaptist authored Jul 29, 2024
2 parents 4f18116 + 6634b51 commit 9e10212
Showing 2 changed files with 215 additions and 19 deletions.
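The change routes the DistSender's terminal error paths through a single priority rule: an ambiguous result outranks a replica-unavailable error, which in turn outranks whatever the last attempt returned. Below is a minimal standalone sketch of that rule, with plain Go errors standing in for the kvpb error types; it is an illustration of the priority ordering, not the actual kvcoord implementation.

package main

import (
    "errors"
    "fmt"
)

// selectBestError mirrors the priority order the commit centralizes in
// kvcoord: ambiguous > replica unavailable > last attempt. The plain error
// values here are simplified stand-ins for kvpb.AmbiguousResultError and
// the other kvpb error types.
func selectBestError(ambiguousErr, replicaUnavailableErr, lastAttemptErr error) error {
    if ambiguousErr != nil {
        return fmt.Errorf("result is ambiguous: %w (last error: %v)", ambiguousErr, lastAttemptErr)
    }
    if replicaUnavailableErr != nil {
        return replicaUnavailableErr
    }
    return lastAttemptErr
}

func main() {
    mightHaveStarted := errors.New("request might have started")
    notLeaseholder := errors.New("not leaseholder")

    // An ambiguous error is never dropped in favor of the last attempt's error.
    fmt.Println(selectBestError(mightHaveStarted, nil, notLeaseholder))
    // With no ambiguous or unavailable error recorded, the last error is returned.
    fmt.Println(selectBestError(nil, nil, notLeaseholder))
}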
39 changes: 20 additions & 19 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -2429,13 +2429,13 @@ func maybeSetResumeSpan(
    }
}

-// noMoreReplicasErr produces the error to be returned from sendToReplicas when
+// selectBestError produces the error to be returned from sendToReplicas when
// the transport is exhausted.
//
// ambiguousErr, if not nil, is the error we got from the first attempt when the
// success of the request cannot be ruled out by the error. lastAttemptErr is
// the error that the last attempt to execute the request returned.
-func noMoreReplicasErr(ambiguousErr, replicaUnavailableErr, lastAttemptErr error) error {
+func selectBestError(ambiguousErr, replicaUnavailableErr, lastAttemptErr error) error {
    if ambiguousErr != nil {
        return kvpb.NewAmbiguousResultErrorf("error=%v [exhausted] (last error: %v)",
            ambiguousErr, lastAttemptErr)
@@ -2444,17 +2444,7 @@ func noMoreReplicasErr(ambiguousErr, replicaUnavailableErr, lastAttemptErr error
        return replicaUnavailableErr
    }

-   // Authentication and authorization errors should be propagated up rather than
-   // wrapped in a sendError and retried as they are likely to be fatal if they
-   // are returned from multiple servers.
-   if grpcutil.IsAuthError(lastAttemptErr) {
-       return lastAttemptErr
-   }
-   // TODO(bdarnell): The error from the last attempt is not necessarily the best
-   // one to return; we may want to remember the "best" error we've seen (for
-   // example, a NotLeaseHolderError conveys more information than a
-   // RangeNotFound).
-   return newSendError(errors.Wrap(lastAttemptErr, "sending to all replicas failed; last error"))
+   return lastAttemptErr
}

// slowDistSenderRangeThreshold is a latency threshold for logging slow
@@ -3034,12 +3024,17 @@ func (ds *DistSender) sendToReplicas(
                // regress. As such, advancing through each replica on the
                // transport until it's exhausted is unlikely to achieve much.
                //
-               // We bail early by returning a sendError. The expectation is
-               // for the client to retry with a fresher eviction token.
+               // We bail early by returning the best error we have
+               // seen so far. The expectation is for the client to
+               // retry with a fresher eviction token if possible.
                log.VEventf(
                    ctx, 2, "transport incompatible with updated routing; bailing early",
                )
-               return nil, newSendError(errors.Wrap(tErr, "leaseholder not found in transport; last error"))
+               return nil, selectBestError(
+                   ambiguousError,
+                   replicaUnavailableError,
+                   newSendError(errors.Wrap(tErr, "leaseholder not found in transport; last error")),
+               )
            }
        }
        // Check whether the request was intentionally sent to a follower
@@ -3308,15 +3303,21 @@ func skipStaleReplicas(
    // RangeKeyMismatchError if there's even a replica. We'll bubble up an
    // error and try with a new descriptor.
    if !routing.Valid() {
-       return noMoreReplicasErr(
+       return selectBestError(
            ambiguousError,
            nil, // ignore the replicaUnavailableError, retry with new routing info
-           errors.Wrap(lastErr, "routing information detected to be stale"))
+           newSendError(errors.Wrap(lastErr, "routing information detected to be stale")))
    }

    for {
        if transport.IsExhausted() {
-           return noMoreReplicasErr(ambiguousError, replicaUnavailableError, lastErr)
+           // Authentication and authorization errors should be propagated up rather than
+           // wrapped in a sendError and retried as they are likely to be fatal if they
+           // are returned from multiple servers.
+           if !grpcutil.IsAuthError(lastErr) {
+               lastErr = newSendError(errors.Wrap(lastErr, "sending to all replicas failed; last error"))
+           }
+           return selectBestError(ambiguousError, replicaUnavailableError, lastErr)
        }

        if _, ok := routing.Desc().GetReplicaDescriptorByID(transport.NextReplica().ReplicaID); ok {
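With this change, the auth-error check and the sendError wrapping move out of the shared helper and into the transport-exhaustion call site, so both the exhaustion path and the incompatible-transport bail-out go through the same priority selection and an ambiguous result recorded earlier can no longer be dropped. The sketch below illustrates the new shape of the exhaustion path; isAuthError, newSendError, and the plain errors are simplified stand-ins for grpcutil.IsAuthError, the kvcoord sendError, and the kvpb error types, not the real code.

package main

import (
    "errors"
    "fmt"
)

// Stand-ins for the real kvcoord/kvpb machinery; only the control flow is
// meant to match the patch, not the types.
var errPermissionDenied = errors.New("permission denied")

func isAuthError(err error) bool { return errors.Is(err, errPermissionDenied) }

func newSendError(err error) error { return fmt.Errorf("send error: %w", err) }

func selectBestError(ambiguousErr, replicaUnavailableErr, lastErr error) error {
    if ambiguousErr != nil {
        return fmt.Errorf("result is ambiguous: %w", ambiguousErr)
    }
    if replicaUnavailableErr != nil {
        return replicaUnavailableErr
    }
    return lastErr
}

// onTransportExhausted mirrors the new exhaustion branch: only non-auth
// errors are wrapped in a retryable send error before the best error is
// chosen, so an ambiguous or unavailable error recorded earlier still wins.
func onTransportExhausted(ambiguousErr, replicaUnavailableErr, lastErr error) error {
    if !isAuthError(lastErr) {
        lastErr = newSendError(fmt.Errorf("sending to all replicas failed; last error: %w", lastErr))
    }
    return selectBestError(ambiguousErr, replicaUnavailableErr, lastErr)
}

func main() {
    // A plain network failure is wrapped and reported as a send error.
    fmt.Println(onTransportExhausted(nil, nil, errors.New("connection refused")))
    // An auth error is returned as-is; retrying other replicas is unlikely to help.
    fmt.Println(onTransportExhausted(nil, nil, errPermissionDenied))
    // A previously recorded ambiguous error always takes precedence.
    fmt.Println(onTransportExhausted(errors.New("request might have started"), nil, errors.New("connection refused")))
}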
195 changes: 195 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -3606,6 +3606,201 @@ func TestGatewayNodeID(t *testing.T) {
    }
}

// TestReplicaErrorsMerged tests cases where different replicas return
// different errors. Specifically, it makes sure that more important errors,
// such as ambiguous errors, are never dropped.
func TestReplicaErrorsMerged(t *testing.T) {
    // Only one descriptor.
    var initDescriptor = roachpb.RangeDescriptor{
        Generation: 1,
        RangeID:    1,
        StartKey:   roachpb.RKeyMin,
        EndKey:     roachpb.RKeyMax,
        InternalReplicas: []roachpb.ReplicaDescriptor{
            {
                NodeID:    1,
                StoreID:   1,
                ReplicaID: 1,
            },
            {
                NodeID:    2,
                StoreID:   2,
                ReplicaID: 2,
            },
        },
    }
    var initLease = roachpb.Lease{
        Sequence: 1,
        Replica: roachpb.ReplicaDescriptor{
            NodeID: 1, StoreID: 1, ReplicaID: 1,
        },
    }
    var descriptor2 = roachpb.RangeDescriptor{
        Generation: 2,
        RangeID:    1,
        StartKey:   roachpb.RKeyMin,
        EndKey:     roachpb.RKeyMax,
        InternalReplicas: []roachpb.ReplicaDescriptor{
            {
                NodeID:    1,
                StoreID:   1,
                ReplicaID: 1,
            },
            {
                NodeID:    3,
                StoreID:   3,
                ReplicaID: 2,
            },
        },
    }
    var lease3 = roachpb.Lease{
        Sequence: 2,
        Replica: roachpb.ReplicaDescriptor{
            NodeID: 3, StoreID: 3, ReplicaID: 2,
        },
    }

    notLeaseHolderErr := kvpb.NewError(kvpb.NewNotLeaseHolderError(lease3, 0, &descriptor2, ""))
    startedRequestError := errors.New("request might have started")
    unavailableError1 := kvpb.NewError(kvpb.NewReplicaUnavailableError(errors.New("unavailable"), &initDescriptor, initDescriptor.InternalReplicas[0]))
    unavailableError2 := kvpb.NewError(kvpb.NewReplicaUnavailableError(errors.New("unavailable"), &initDescriptor, initDescriptor.InternalReplicas[1]))

    // withCommit changes the error handling behavior in sendPartialBatch.
    // Specifically, if the top-level request was sent with a commit, network
    // errors from attempts that may already have started are converted to
    // ambiguous errors, and those are returned with higher priority. This
    // prevents ambiguous errors from being retried incorrectly.
    // See https://cockroachlabs.com/blog/demonic-nondeterminism/#appendix for
    // the gory details.
    testCases := []struct {
        withCommit         bool
        sendErr1, sendErr2 error
        err1, err2         *kvpb.Error
        expErr             string
    }{
        // The ambiguous error is returned with higher priority for withCommit.
        {
            withCommit: true,
            sendErr1:   startedRequestError,
            err2:       notLeaseHolderErr,
            expErr:     "result is ambiguous",
        },
        // The not-leaseholder error is the last error.
        {
            withCommit: false,
            sendErr1:   startedRequestError,
            err2:       notLeaseHolderErr,
            expErr:     "leaseholder not found in transport",
        },
        // The ambiguous error is returned with higher priority for withCommit.
        {
            withCommit: true,
            sendErr1:   startedRequestError,
            err2:       unavailableError2,
            expErr:     "result is ambiguous",
        },
        // The unavailable error is the last error.
        {
            withCommit: false,
            sendErr1:   startedRequestError,
            err2:       unavailableError2,
            expErr:     "unavailable",
        },
        // The unavailable error is returned with higher priority regardless of withCommit.
        {
            withCommit: true,
            err1:       unavailableError1,
            err2:       notLeaseHolderErr,
            expErr:     "unavailable",
        },
        // The unavailable error is returned with higher priority regardless of withCommit.
        {
            withCommit: false,
            err1:       unavailableError1,
            err2:       notLeaseHolderErr,
            expErr:     "unavailable",
        },
    }
    clock := hlc.NewClockForTesting(nil)
    ns := &mockNodeStore{
        nodes: []roachpb.NodeDescriptor{
            {
                NodeID:  1,
                Address: util.UnresolvedAddr{},
            },
            {
                NodeID:  2,
                Address: util.UnresolvedAddr{},
            },
        },
    }
    ctx := context.Background()
    for i, tc := range testCases {
        t.Run(strconv.Itoa(i), func(t *testing.T) {
            // We run every test case twice, to make sure error merging is commutative.
            testutils.RunTrueAndFalse(t, "reverse", func(t *testing.T, reverse bool) {
                stopper := stop.NewStopper()
                defer stopper.Stop(ctx)
                st := cluster.MakeTestingClusterSettings()
                rc := rangecache.NewRangeCache(st, nil /* db */, func() int64 { return 100 }, stopper)
                rc.Insert(ctx, roachpb.RangeInfo{
                    Desc:  initDescriptor,
                    Lease: initLease,
                })

                transportFn := func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
                    br := &kvpb.BatchResponse{}
                    switch ba.Replica.NodeID {
                    case 1:
                        if tc.sendErr1 != nil {
                            return nil, tc.sendErr1
                        } else {
                            br.Error = tc.err1
                        }
                        return br, nil
                    case 2:
                        if tc.sendErr2 != nil {
                            return nil, tc.sendErr2
                        } else {
                            br.Error = tc.err2
                        }
                        return br, nil
                    default:
                        assert.Fail(t, "Unexpected replica n%d", ba.Replica.NodeID)
                        return nil, nil
                    }
                }
                cfg := DistSenderConfig{
                    AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(),
                    Clock:      clock,
                    NodeDescs:  ns,
                    Stopper:    stopper,
                    RangeDescriptorDB: MockRangeDescriptorDB(func(key roachpb.RKey, reverse bool) (
                        []roachpb.RangeDescriptor, []roachpb.RangeDescriptor, error,
                    ) {
                        // These tests only deal with the low-level sendToReplicas(). Nobody
                        // should be reading descriptor from the database, but the DistSender
                        // insists on having a non-nil one.
                        return nil, nil, errors.New("range desc db unexpectedly used")
                    }),
                    TransportFactory: adaptSimpleTransport(transportFn),
                    Settings:         cluster.MakeTestingClusterSettings(),
                }
                ds := NewDistSender(cfg)

                ba := &kvpb.BatchRequest{}
                ba.Add(kvpb.NewGet(roachpb.Key("a")))
                tok, err := rc.LookupWithEvictionToken(ctx, roachpb.RKeyMin, rangecache.EvictionToken{}, false)
                require.NoError(t, err)
                br, err := ds.sendToReplicas(ctx, ba, tok, tc.withCommit)
                log.Infof(ctx, "Error is %v", err)
                require.ErrorContains(t, err, tc.expErr)
                require.Nil(t, br)
            })
        })
    }
}

// TestMultipleErrorsMerged tests that DistSender prioritizes errors that are
// returned from concurrent partial batches and returns the "best" one after
// merging the transaction metadata passed on the errors.
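The withCommit dimension in the test table matters because, as the test's comment explains, a network error from an attempt that may already have started is recorded as an ambiguous error only when the batch carries a commit, and an ambiguous error then wins the priority selection. The following is a hedged sketch of that classification and of how it produces the expected errors in the first two test cases; the sentinel error and helper names are simplified stand-ins, not the DistSender code.

package main

import (
    "errors"
    "fmt"
)

// Stand-in for the transport error the test injects for replica 1.
var errMightHaveStarted = errors.New("request might have started")

// recordAttemptError mimics the behavior the test relies on: a network error
// from an attempt that may already have started is remembered as ambiguous
// only when the batch carries a commit.
func recordAttemptError(withCommit bool, attemptErr error, ambiguous *error) {
    if withCommit && errors.Is(attemptErr, errMightHaveStarted) && *ambiguous == nil {
        *ambiguous = fmt.Errorf("result is ambiguous: %w", attemptErr)
    }
}

// bestError applies the same priority rule as selectBestError in the patch,
// reduced to the two inputs this sketch needs.
func bestError(ambiguous, lastErr error) error {
    if ambiguous != nil {
        return ambiguous
    }
    return lastErr
}

func main() {
    for _, withCommit := range []bool{true, false} {
        var ambiguous error
        // Replica 1 fails with a "might have started" transport error,
        // replica 2 then reports a routing problem.
        recordAttemptError(withCommit, errMightHaveStarted, &ambiguous)
        lastErr := errors.New("leaseholder not found in transport")
        // withCommit=true  -> "result is ambiguous: ..."
        // withCommit=false -> "leaseholder not found in transport"
        fmt.Printf("withCommit=%t: %v\n", withCommit, bestError(ambiguous, lastErr))
    }
}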
