diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index 0ecbdb630aa5..6986e9a4182a 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -417,6 +417,7 @@ go_test(
         "//pkg/util/humanizeutil",
         "//pkg/util/leaktest",
         "//pkg/util/log",
+        "//pkg/util/log/logpb",
         "//pkg/util/metric",
         "//pkg/util/mon",
         "//pkg/util/netutil",
diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go
index c7b0497b12b0..62d9f09917e8 100644
--- a/pkg/kv/kvserver/queue.go
+++ b/pkg/kv/kvserver/queue.go
@@ -78,6 +78,20 @@ func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Dura
 //
 // The parameter controls which rate(s) to use.
 func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queueProcessTimeoutFunc {
+	return makeRateLimitedTimeoutFuncByPermittedSlowdown(permittedRangeScanSlowdown, rateSettings...)
+}
+
+// permittedRangeScanSlowdown is the factor by which a range scan may exceed its
+// estimated duration (given the configured rate) before the operation's timeout
+// is reached.
+const permittedRangeScanSlowdown = 10
+
+// makeRateLimitedTimeoutFuncByPermittedSlowdown creates a timeout function that applies
+// the permitted slowdown factor to the estimated queue processing duration derived from
+// the given rate settings. See makeRateLimitedTimeoutFunc for more information.
+func makeRateLimitedTimeoutFuncByPermittedSlowdown(
+	permittedSlowdown int, rateSettings ...*settings.ByteSizeSetting,
+) queueProcessTimeoutFunc {
 	return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
 		minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
 		// NB: In production code this will type assertion will always succeed.
@@ -95,7 +109,7 @@ func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queue
 			}
 		}
 		estimatedDuration := time.Duration(repl.GetMVCCStats().Total()/minSnapshotRate) * time.Second
-		timeout := estimatedDuration * permittedRangeScanSlowdown
+		timeout := estimatedDuration * time.Duration(permittedSlowdown)
 		if timeout < minimumTimeout {
 			timeout = minimumTimeout
 		}
@@ -103,11 +117,6 @@ func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queue
 	}
 }
 
-// permittedRangeScanSlowdown is the factor of the above the estimated duration
-// for a range scan given the configured rate which we use to configure
-// the operations's timeout.
-const permittedRangeScanSlowdown = 10
-
 // PurgatoryError indicates a replica processing failure which indicates the
 // replica can be placed into purgatory for faster retries than the replica
 // scanner's interval.
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index cdd55b16f6a7..47d003e30b36 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -33,6 +33,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 	"github.com/cockroachdb/errors"
 	"go.etcd.io/etcd/raft/v3"
 )
@@ -511,6 +513,9 @@ type replicateQueue struct {
 	// descriptors.
 	updateCh          chan time.Time
 	lastLeaseTransfer atomic.Value // read and written by scanner & queue goroutines
+	// logTracesThresholdFunc returns the threshold for logging traces from
+	// processing a replica.
+	logTracesThresholdFunc queueProcessTimeoutFunc
 }
 
 // newReplicateQueue returns a new instance of replicateQueue.
@@ -520,6 +525,9 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
 		allocator: allocator,
 		purgCh:    time.NewTicker(replicateQueuePurgatoryCheckInterval).C,
 		updateCh:  make(chan time.Time, 1),
+		logTracesThresholdFunc: makeRateLimitedTimeoutFuncByPermittedSlowdown(
+			permittedRangeScanSlowdown/2, rebalanceSnapshotRate, recoverySnapshotRate,
+		),
 	}
 	store.metrics.registry.AddMetricStruct(&rq.metrics)
 	rq.baseQueue = newBaseQueue(
@@ -660,7 +668,7 @@ func (rq *replicateQueue) process(
 	// usually signaling that a rebalancing reservation could not be made with the
 	// selected target.
 	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
-		requeue, err := rq.processOneChange(
+		requeue, err := rq.processOneChangeWithTracing(
 			ctx, repl, rq.canTransferLeaseFrom, false /* scatter */, false, /* dryRun */
 		)
 		if isSnapshotError(err) {
@@ -736,6 +744,43 @@ func (decommissionPurgatoryError) PurgatoryErrorMarker() {}
 
 var _ PurgatoryError = decommissionPurgatoryError{}
 
+// processOneChangeWithTracing executes processOneChange within a tracing span,
+// logging the resulting traces to the DEV channel in the case of errors or
+// when the configured log traces threshold is exceeded.
+func (rq *replicateQueue) processOneChangeWithTracing(
+	ctx context.Context,
+	repl *Replica,
+	canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool,
+	scatter, dryRun bool,
+) (requeue bool, _ error) {
+	processStart := timeutil.Now()
+	ctx, sp := tracing.EnsureChildSpan(ctx, rq.Tracer, "process replica",
+		tracing.WithRecording(tracingpb.RecordingVerbose))
+	defer sp.Finish()
+
+	requeue, err := rq.processOneChange(ctx, repl, canTransferLeaseFrom, scatter, dryRun)
+
+	// Utilize a new background context (properly annotated) to avoid writing
+	// traces from a child context into its parent.
+	{
+		ctx := repl.AnnotateCtx(rq.AnnotateCtx(context.Background()))
+		rec := sp.GetConfiguredRecording()
+		processDuration := timeutil.Since(processStart)
+		loggingThreshold := rq.logTracesThresholdFunc(rq.store.cfg.Settings, repl)
+		exceededDuration := loggingThreshold > time.Duration(0) && processDuration > loggingThreshold
+		if err != nil {
+			// TODO(sarkesian): Utilize Allocator log channel once available.
+			log.Warningf(ctx, "error processing replica: %v\ntrace:\n%s", err, rec)
+		} else if exceededDuration {
+			// TODO(sarkesian): Utilize Allocator log channel once available.
+ log.Infof(ctx, "processing replica took %s, exceeding threshold of %s\ntrace:\n%s", + processDuration, loggingThreshold, rec) + } + } + + return requeue, err +} + func (rq *replicateQueue) processOneChange( ctx context.Context, repl *Replica, diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 6c5827963b0b..18f2bcbe9a2c 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -44,6 +44,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logpb" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" @@ -617,6 +620,128 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) { }) } +// TestReplicateQueueTracingOnError tests that an error or slowdown in +// processing a replica results in traces being logged. +func TestReplicateQueueTracingOnError(t *testing.T) { + defer leaktest.AfterTest(t)() + s := log.ScopeWithoutShowLogs(t) + defer s.Close(t) + + // NB: This test injects a fake failure during replica rebalancing, and we use + // this `rejectSnapshots` variable as a flag to activate or deactivate that + // injected failure. + var rejectSnapshots int64 + ctx := context.Background() + tc := testcluster.StartTestCluster( + t, 4, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{ + ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error { + if atomic.LoadInt64(&rejectSnapshots) == 1 { + return errors.Newf("boom") + } + return nil + }, + }}}, + }, + ) + defer tc.Stopper().Stop(ctx) + + // Add a replica to the second and third nodes, and then decommission the + // second node. Since there are only 4 nodes in the cluster, the + // decommissioning replica must be rebalanced to the fourth node. + const decomNodeIdx = 1 + const decomNodeID = 2 + scratchKey := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx)) + tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx+1)) + adminSrv := tc.Server(decomNodeIdx) + conn, err := adminSrv.RPCContext().GRPCDialNode( + adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx) + require.NoError(t, err) + adminClient := serverpb.NewAdminClient(conn) + _, err = adminClient.Decommission( + ctx, &serverpb.DecommissionRequest{ + NodeIDs: []roachpb.NodeID{decomNodeID}, + TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING, + }, + ) + require.NoError(t, err) + + // Activate the above testing knob to start rejecting future rebalances and + // then attempt to rebalance the decommissioning replica away. We expect a + // purgatory error to be returned here. 
+	atomic.StoreInt64(&rejectSnapshots, 1)
+	store := tc.GetFirstStoreFromServer(t, 0)
+	repl, err := store.GetReplica(tc.LookupRangeOrFatal(t, scratchKey).RangeID)
+	require.NoError(t, err)
+
+	testStartTs := timeutil.Now()
+	recording, processErr, enqueueErr := tc.GetFirstStoreFromServer(t, 0).Enqueue(
+		ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
+	)
+	require.NoError(t, enqueueErr)
+	require.Error(t, processErr, "expected processing error")
+
+	// Flush logs and get log messages from replicate_queue.go since just
+	// before calling store.Enqueue(..).
+	log.Flush()
+	entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(),
+		math.MaxInt64, 100, regexp.MustCompile(`replicate_queue\.go`), log.WithMarkedSensitiveData)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	opName := "process replica"
+	errRegexp, err := regexp.Compile(`error processing replica:.*boom`)
+	require.NoError(t, err)
+	traceRegexp, err := regexp.Compile(`trace:.*`)
+	require.NoError(t, err)
+	opRegexp, err := regexp.Compile(fmt.Sprintf(`operation:%s`, opName))
+	require.NoError(t, err)
+
+	// Validate that the error is logged, so that we can use the log entry to
+	// validate the trace output.
+	foundEntry := false
+	var entry logpb.Entry
+	for _, entry = range entries {
+		if errRegexp.MatchString(entry.Message) {
+			foundEntry = true
+			break
+		}
+	}
+	require.True(t, foundEntry)
+
+	// Validate that the trace is included in the log message.
+	require.Regexp(t, traceRegexp, entry.Message)
+	require.Regexp(t, opRegexp, entry.Message)
+
+	// Validate that the trace was logged with the correct tags for the replica.
+	require.Regexp(t, fmt.Sprintf("n%d", repl.NodeID()), entry.Tags)
+	require.Regexp(t, fmt.Sprintf("s%d", repl.StoreID()), entry.Tags)
+	require.Regexp(t, fmt.Sprintf("r%d/%d", repl.GetRangeID(), repl.ReplicaID()), entry.Tags)
+	require.Regexp(t, `replicate`, entry.Tags)
+
+	// Validate that the returned tracing span includes the operation, but also
+	// that the stringified trace was not logged to the span or its parent.
+	processRecSpan, foundSpan := recording.FindSpan(opName)
+	require.True(t, foundSpan)
+
+	foundParent := false
+	var parentRecSpan tracingpb.RecordedSpan
+	for _, parentRecSpan = range recording {
+		if parentRecSpan.SpanID == processRecSpan.ParentSpanID {
+			foundParent = true
+			break
+		}
+	}
+	require.True(t, foundParent)
+	spans := tracingpb.Recording{parentRecSpan, processRecSpan}
+	stringifiedSpans := spans.String()
+	require.NotRegexp(t, errRegexp, stringifiedSpans)
+	require.NotRegexp(t, traceRegexp, stringifiedSpans)
+}
+
 // TestReplicateQueueDecommissionPurgatoryError tests that failure to move a
 // decommissioning replica puts it in the replicate queue purgatory.
 func TestReplicateQueueDecommissionPurgatoryError(t *testing.T) {