From 6c45224e0af91b1d4940f93e52f2bce4dba18414 Mon Sep 17 00:00:00 2001
From: Alex Sarkesian
Date: Thu, 11 Aug 2022 23:47:11 -0400
Subject: [PATCH] kvserver: log traces from replicate queue on errors or slow processing

While we previously had some logging from the replicate queue as a
result of the standard queue logging, this change adds logging to the
replicate queue when there are errors in processing a replica, or when
processing a replica exceeds 50% of the timeout duration. When there
are errors or the duration threshold is exceeded, any error messages
are logged along with the collected tracing spans from the operation.

Release note (ops change): Added logging on replicate queue processing
in the presence of errors or when the duration exceeds 50% of the
timeout.

Release justification: Low risk observability change.
---
 pkg/kv/kvserver/BUILD.bazel             |   1 +
 pkg/kv/kvserver/queue.go                |  21 ++--
 pkg/kv/kvserver/replicate_queue.go      |  47 ++++++++-
 pkg/kv/kvserver/replicate_queue_test.go | 125 ++++++++++++++++++++++++
 4 files changed, 187 insertions(+), 7 deletions(-)

diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index 0ecbdb630aa5..6986e9a4182a 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -417,6 +417,7 @@ go_test(
         "//pkg/util/humanizeutil",
         "//pkg/util/leaktest",
         "//pkg/util/log",
+        "//pkg/util/log/logpb",
         "//pkg/util/metric",
         "//pkg/util/mon",
         "//pkg/util/netutil",
diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go
index c7b0497b12b0..62d9f09917e8 100644
--- a/pkg/kv/kvserver/queue.go
+++ b/pkg/kv/kvserver/queue.go
@@ -78,6 +78,20 @@ func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Dura
 //
 // The parameter controls which rate(s) to use.
 func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queueProcessTimeoutFunc {
+	return makeRateLimitedTimeoutFuncByPermittedSlowdown(permittedRangeScanSlowdown, rateSettings...)
+}
+
+// permittedRangeScanSlowdown is the factor applied to the estimated duration
+// of a range scan, given the configured rate, which we use to configure the
+// operation's timeout.
+const permittedRangeScanSlowdown = 10
+
+// makeRateLimitedTimeoutFuncByPermittedSlowdown creates a timeout function
+// based on a permitted slowdown factor applied to the estimated queue
+// processing duration derived from the given rate settings. See
+// makeRateLimitedTimeoutFunc for more information.
+func makeRateLimitedTimeoutFuncByPermittedSlowdown(
+	permittedSlowdown int, rateSettings ...*settings.ByteSizeSetting,
+) queueProcessTimeoutFunc {
 	return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
 		minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
 		// NB: In production code this type assertion will always succeed.
@@ -95,7 +109,7 @@
 			}
 		}
 		estimatedDuration := time.Duration(repl.GetMVCCStats().Total()/minSnapshotRate) * time.Second
-		timeout := estimatedDuration * permittedRangeScanSlowdown
+		timeout := estimatedDuration * time.Duration(permittedSlowdown)
 		if timeout < minimumTimeout {
 			timeout = minimumTimeout
 		}
@@ -103,11 +117,6 @@
 	}
 }
 
-// permittedRangeScanSlowdown is the factor of the above the estimated duration
-// for a range scan given the configured rate which we use to configure
-// the operations's timeout.
-const permittedRangeScanSlowdown = 10
-
 // PurgatoryError indicates a replica processing failure which indicates the
 // replica can be placed into purgatory for faster retries than the replica
 // scanner's interval.
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index cdd55b16f6a7..47d003e30b36 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -33,6 +33,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 	"github.com/cockroachdb/errors"
 	"go.etcd.io/etcd/raft/v3"
 )
@@ -511,6 +513,9 @@ type replicateQueue struct {
 	// descriptors.
 	updateCh          chan time.Time
 	lastLeaseTransfer atomic.Value // read and written by scanner & queue goroutines
+	// logTracesThresholdFunc returns the threshold for logging traces from
+	// processing a replica.
+	logTracesThresholdFunc queueProcessTimeoutFunc
 }
 
 // newReplicateQueue returns a new instance of replicateQueue.
@@ -520,6 +525,9 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
 		allocator: allocator,
 		purgCh:    time.NewTicker(replicateQueuePurgatoryCheckInterval).C,
 		updateCh:  make(chan time.Time, 1),
+		logTracesThresholdFunc: makeRateLimitedTimeoutFuncByPermittedSlowdown(
+			permittedRangeScanSlowdown/2, rebalanceSnapshotRate, recoverySnapshotRate,
+		),
 	}
 	store.metrics.registry.AddMetricStruct(&rq.metrics)
 	rq.baseQueue = newBaseQueue(
@@ -660,7 +668,7 @@ func (rq *replicateQueue) process(
 	// usually signaling that a rebalancing reservation could not be made with the
 	// selected target.
 	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
-		requeue, err := rq.processOneChange(
+		requeue, err := rq.processOneChangeWithTracing(
 			ctx, repl, rq.canTransferLeaseFrom, false /* scatter */, false, /* dryRun */
 		)
 		if isSnapshotError(err) {
@@ -736,6 +744,43 @@ func (decommissionPurgatoryError) PurgatoryErrorMarker() {}
 
 var _ PurgatoryError = decommissionPurgatoryError{}
 
+// processOneChangeWithTracing executes processOneChange within a tracing span,
+// logging the resulting traces to the DEV channel in the case of errors or
+// when the configured log traces threshold is exceeded.
+func (rq *replicateQueue) processOneChangeWithTracing(
+	ctx context.Context,
+	repl *Replica,
+	canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool,
+	scatter, dryRun bool,
+) (requeue bool, _ error) {
+	processStart := timeutil.Now()
+	ctx, sp := tracing.EnsureChildSpan(ctx, rq.Tracer, "process replica",
+		tracing.WithRecording(tracingpb.RecordingVerbose))
+	defer sp.Finish()
+
+	requeue, err := rq.processOneChange(ctx, repl, canTransferLeaseFrom, scatter, dryRun)
+
+	// Utilize a new background context (properly annotated) to avoid writing
+	// traces from a child context into its parent.
+	{
+		ctx := repl.AnnotateCtx(rq.AnnotateCtx(context.Background()))
+		rec := sp.GetConfiguredRecording()
+		processDuration := timeutil.Since(processStart)
+		loggingThreshold := rq.logTracesThresholdFunc(rq.store.cfg.Settings, repl)
+		exceededDuration := loggingThreshold > time.Duration(0) && processDuration > loggingThreshold
+		if err != nil {
+			// TODO(sarkesian): Utilize Allocator log channel once available.
+			log.Warningf(ctx, "error processing replica: %v\ntrace:\n%s", err, rec)
+		} else if exceededDuration {
+			// TODO(sarkesian): Utilize Allocator log channel once available.
+			log.Infof(ctx, "processing replica took %s, exceeding threshold of %s\ntrace:\n%s",
+				processDuration, loggingThreshold, rec)
+		}
+	}
+
+	return requeue, err
+}
+
 func (rq *replicateQueue) processOneChange(
 	ctx context.Context,
 	repl *Replica,
diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go
index 6c5827963b0b..18f2bcbe9a2c 100644
--- a/pkg/kv/kvserver/replicate_queue_test.go
+++ b/pkg/kv/kvserver/replicate_queue_test.go
@@ -44,6 +44,9 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 	"github.com/cockroachdb/errors"
 	"github.com/gogo/protobuf/proto"
 	"github.com/stretchr/testify/require"
@@ -617,6 +620,128 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) {
 	})
 }
 
+// TestReplicateQueueTracingOnError tests that an error or slowdown in
+// processing a replica results in traces being logged.
+func TestReplicateQueueTracingOnError(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	s := log.ScopeWithoutShowLogs(t)
+	defer s.Close(t)
+
+	// NB: This test injects a fake failure during replica rebalancing, and we use
+	// this `rejectSnapshots` variable as a flag to activate or deactivate that
+	// injected failure.
+	var rejectSnapshots int64
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(
+		t, 4, base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+			ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
+				ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
+					if atomic.LoadInt64(&rejectSnapshots) == 1 {
+						return errors.Newf("boom")
+					}
+					return nil
+				},
+			}}},
+		},
+	)
+	defer tc.Stopper().Stop(ctx)
+
+	// Add a replica to the second and third nodes, and then decommission the
+	// second node. Since there are only 4 nodes in the cluster, the
+	// decommissioning replica must be rebalanced to the fourth node.
+	const decomNodeIdx = 1
+	const decomNodeID = 2
+	scratchKey := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx))
+	tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx+1))
+	adminSrv := tc.Server(decomNodeIdx)
+	conn, err := adminSrv.RPCContext().GRPCDialNode(
+		adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx)
+	require.NoError(t, err)
+	adminClient := serverpb.NewAdminClient(conn)
+	_, err = adminClient.Decommission(
+		ctx, &serverpb.DecommissionRequest{
+			NodeIDs:          []roachpb.NodeID{decomNodeID},
+			TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
+		},
+	)
+	require.NoError(t, err)
+
+	// Activate the above testing knob to start rejecting future rebalances and
+	// then attempt to rebalance the decommissioning replica away. We expect a
+	// purgatory error to be returned here.
+	atomic.StoreInt64(&rejectSnapshots, 1)
+	store := tc.GetFirstStoreFromServer(t, 0)
+	repl, err := store.GetReplica(tc.LookupRangeOrFatal(t, scratchKey).RangeID)
+	require.NoError(t, err)
+
+	testStartTs := timeutil.Now()
+	recording, processErr, enqueueErr := tc.GetFirstStoreFromServer(t, 0).Enqueue(
+		ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
+	)
+	require.NoError(t, enqueueErr)
+	require.Error(t, processErr, "expected processing error")
+
+	// Flush logs and get log messages from replicate_queue.go since just
+	// before calling store.Enqueue(..).
+	log.Flush()
+	entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(),
+		math.MaxInt64, 100, regexp.MustCompile(`replicate_queue\.go`), log.WithMarkedSensitiveData)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	opName := "process replica"
+	errRegexp, err := regexp.Compile(`error processing replica:.*boom`)
+	require.NoError(t, err)
+	traceRegexp, err := regexp.Compile(`trace:.*`)
+	require.NoError(t, err)
+	opRegexp, err := regexp.Compile(fmt.Sprintf(`operation:%s`, opName))
+	require.NoError(t, err)
+
+	// Validate that the error is logged, so that we can use the log entry to
+	// validate the trace output.
+	foundEntry := false
+	var entry logpb.Entry
+	for _, entry = range entries {
+		if errRegexp.MatchString(entry.Message) {
+			foundEntry = true
+			break
+		}
+	}
+	require.True(t, foundEntry)
+
+	// Validate that the trace is included in the log message.
+	require.Regexp(t, traceRegexp, entry.Message)
+	require.Regexp(t, opRegexp, entry.Message)
+
+	// Validate that the trace was logged with the correct tags for the replica.
+	require.Regexp(t, fmt.Sprintf("n%d", repl.NodeID()), entry.Tags)
+	require.Regexp(t, fmt.Sprintf("s%d", repl.StoreID()), entry.Tags)
+	require.Regexp(t, fmt.Sprintf("r%d/%d", repl.GetRangeID(), repl.ReplicaID()), entry.Tags)
+	require.Regexp(t, `replicate`, entry.Tags)
+
+	// Validate that the returned tracing span includes the operation, but also
+	// that the stringified trace was not logged to the span or its parent.
+	processRecSpan, foundSpan := recording.FindSpan(opName)
+	require.True(t, foundSpan)
+
+	foundParent := false
+	var parentRecSpan tracingpb.RecordedSpan
+	for _, parentRecSpan = range recording {
+		if parentRecSpan.SpanID == processRecSpan.ParentSpanID {
+			foundParent = true
+			break
+		}
+	}
+	require.True(t, foundParent)
+	spans := tracingpb.Recording{parentRecSpan, processRecSpan}
+	stringifiedSpans := spans.String()
+	require.NotRegexp(t, errRegexp, stringifiedSpans)
+	require.NotRegexp(t, traceRegexp, stringifiedSpans)
+}
+
 // TestReplicateQueueDecommissionPurgatoryError tests that failure to move a
 // decommissioning replica puts it in the replicate queue purgatory.
 func TestReplicateQueueDecommissionPurgatoryError(t *testing.T) {
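
As a rough illustration of the timeout arithmetic in the queue.go change above, the following standalone sketch mirrors how the processing timeout and the new 50% logging threshold relate. It is not CockroachDB code: rangeBytes, minSnapshotRate, and the one-minute floor (standing in for the queueGuaranteedProcessingTimeBudget setting) are hypothetical values chosen for the example.

package main

import (
	"fmt"
	"time"
)

// timeoutFor mirrors the shape of the timeout function built by
// makeRateLimitedTimeoutFuncByPermittedSlowdown: scale the estimated
// processing duration by a permitted slowdown factor, then clamp to a
// guaranteed minimum budget.
func timeoutFor(estimated time.Duration, permittedSlowdown int, minimum time.Duration) time.Duration {
	timeout := estimated * time.Duration(permittedSlowdown)
	if timeout < minimum {
		timeout = minimum
	}
	return timeout
}

func main() {
	const permittedRangeScanSlowdown = 10
	// Hypothetical inputs: a 512 MiB range and a 32 MiB/s minimum snapshot
	// rate give an estimated processing duration of 16s.
	var rangeBytes, minSnapshotRate int64 = 512 << 20, 32 << 20
	estimated := time.Duration(rangeBytes/minSnapshotRate) * time.Second

	minimum := time.Minute // stand-in for queueGuaranteedProcessingTimeBudget
	processTimeout := timeoutFor(estimated, permittedRangeScanSlowdown, minimum)
	// The replicate queue's logging threshold halves the slowdown factor,
	// i.e. 50% of the timeout (when neither value is clamped).
	logThreshold := timeoutFor(estimated, permittedRangeScanSlowdown/2, minimum)
	fmt.Printf("timeout=%s logging threshold=%s\n", processTimeout, logThreshold)
	// Output: timeout=2m40s logging threshold=1m20s
}

Note that because both values clamp to the same minimum budget, for small ranges the threshold equals the timeout rather than 50% of it.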
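
The control flow of processOneChangeWithTracing can likewise be summarized with standard-library pieces only. This is a hedged sketch of the pattern, not the CockroachDB API: processWithLogging, its threshold parameter, and the placeholder recording string are invented for illustration, whereas the real code opens a verbose tracing span and logs that span's collected recording.

package main

import (
	"context"
	"log"
	"time"
)

// processWithLogging sketches the wrapper pattern: time the operation, then
// emit the (placeholder) trace recording on error, or when processing ran
// longer than the threshold. A non-positive threshold disables the
// slow-processing log, matching the exceededDuration check in the patch.
func processWithLogging(
	ctx context.Context, threshold time.Duration, process func(context.Context) error,
) error {
	start := time.Now()
	// The patch opens a verbose tracing span here and collects its
	// recording after process returns; a string stands in for that.
	err := process(ctx)
	recording := "<collected trace spans>"

	elapsed := time.Since(start)
	exceeded := threshold > 0 && elapsed > threshold
	if err != nil {
		log.Printf("error processing replica: %v\ntrace:\n%s", err, recording)
	} else if exceeded {
		log.Printf("processing replica took %s, exceeding threshold of %s\ntrace:\n%s",
			elapsed, threshold, recording)
	}
	return err
}

func main() {
	_ = processWithLogging(context.Background(), 50*time.Millisecond,
		func(context.Context) error {
			time.Sleep(100 * time.Millisecond) // simulate slow processing
			return nil
		})
}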