kvserver: log traces from replicate queue on errors or slow processing
While we previously had some logging from the replicate queue as a
result of the standard queue logging, this change adds logging to the
replicate queue when there are errors in processing a replica, or when
processing a replica exceeds 50% of the timeout duration.
When there are errors or the duration threshold is exceeded,
any error messages are logged along with the collected tracing spans
from the operation.
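
For illustration, the two message shapes emitted to the DEV channel look roughly as follows (durations are hypothetical; the exact format strings are in replicate_queue.go below):

  error processing replica: <error>
  trace:
  <recorded trace spans>

  processing replica took 1m30s, exceeding threshold of 1m20s
  trace:
  <recorded trace spans>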

Release note (ops change): Added logging on replicate queue processing
in the presence of errors or when the duration exceeds 50% of the
timeout.

Release justification: Low risk observability change.
AlexTalks committed Aug 25, 2022
1 parent 036b50a commit 6c45224
Showing 4 changed files with 187 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -417,6 +417,7 @@ go_test(
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/log/logpb",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/netutil",
21 changes: 15 additions & 6 deletions pkg/kv/kvserver/queue.go
@@ -78,6 +78,20 @@ func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Dura
//
// The parameter controls which rate(s) to use.
func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queueProcessTimeoutFunc {
return makeRateLimitedTimeoutFuncByPermittedSlowdown(permittedRangeScanSlowdown, rateSettings...)
}

// permittedRangeScanSlowdown is the factor applied to the estimated duration
// of a range scan at the configured rate, used to derive the operation's
// timeout.
const permittedRangeScanSlowdown = 10

// makeRateLimitedTimeoutFuncByPermittedSlowdown creates a timeout function that
// applies the given permitted slowdown factor to the estimated queue processing
// duration derived from the given rate settings.
// See makeRateLimitedTimeoutFunc for more information.
func makeRateLimitedTimeoutFuncByPermittedSlowdown(
permittedSlowdown int, rateSettings ...*settings.ByteSizeSetting,
) queueProcessTimeoutFunc {
return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
// NB: In production code this type assertion will always succeed.
@@ -95,19 +109,14 @@ func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queue
}
}
estimatedDuration := time.Duration(repl.GetMVCCStats().Total()/minSnapshotRate) * time.Second
timeout := estimatedDuration * permittedRangeScanSlowdown
timeout := estimatedDuration * time.Duration(permittedSlowdown)
if timeout < minimumTimeout {
timeout = minimumTimeout
}
return timeout
}
}

// permittedRangeScanSlowdown is the factor applied to the estimated duration
// of a range scan at the configured rate, used to derive the operation's
// timeout.
const permittedRangeScanSlowdown = 10

// PurgatoryError indicates a replica processing failure that allows the
// replica to be placed into purgatory for faster retries than the replica
// scanner's interval.
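
To make the 50% relationship concrete, here is a minimal, self-contained sketch of the timeout math above. The numbers are hypothetical (a 512 MiB range and a 32 MiB/s snapshot rate); the real code reads them from the range's MVCC stats and the recovery/rebalance snapshot-rate cluster settings.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical inputs; the real values come from MVCC stats and
	// cluster settings.
	const rangeSizeBytes = 512 << 20 // 512 MiB range
	const snapshotRate = 32 << 20    // 32 MiB/s
	minimumTimeout := time.Minute    // stand-in for queueGuaranteedProcessingTimeBudget

	// Estimated time to scan the range at the configured rate: 16s here.
	estimated := time.Duration(rangeSizeBytes/snapshotRate) * time.Second

	timeoutFor := func(slowdown int) time.Duration {
		t := estimated * time.Duration(slowdown)
		if t < minimumTimeout {
			t = minimumTimeout
		}
		return t
	}

	fmt.Println(timeoutFor(10)) // processing timeout with the default factor: 2m40s
	fmt.Println(timeoutFor(5))  // trace-logging threshold with half the factor: 1m20s
}

Because both values are clamped to the guaranteed minimum budget, the logging threshold is exactly half the processing timeout only for ranges large enough that neither clamp applies.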
47 changes: 46 additions & 1 deletion pkg/kv/kvserver/replicate_queue.go
@@ -33,6 +33,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3"
)
@@ -511,6 +513,9 @@ type replicateQueue struct {
// descriptors.
updateCh chan time.Time
lastLeaseTransfer atomic.Value // read and written by scanner & queue goroutines
// logTracesThresholdFunc returns the threshold for logging traces from
// processing a replica.
logTracesThresholdFunc queueProcessTimeoutFunc
}

// newReplicateQueue returns a new instance of replicateQueue.
@@ -520,6 +525,9 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
allocator: allocator,
purgCh: time.NewTicker(replicateQueuePurgatoryCheckInterval).C,
updateCh: make(chan time.Time, 1),
logTracesThresholdFunc: makeRateLimitedTimeoutFuncByPermittedSlowdown(
permittedRangeScanSlowdown/2, rebalanceSnapshotRate, recoverySnapshotRate,
),
}
store.metrics.registry.AddMetricStruct(&rq.metrics)
rq.baseQueue = newBaseQueue(
@@ -660,7 +668,7 @@ func (rq *replicateQueue) process(
// usually signaling that a rebalancing reservation could not be made with the
// selected target.
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
requeue, err := rq.processOneChange(
requeue, err := rq.processOneChangeWithTracing(
ctx, repl, rq.canTransferLeaseFrom, false /* scatter */, false, /* dryRun */
)
if isSnapshotError(err) {
@@ -736,6 +744,43 @@ func (decommissionPurgatoryError) PurgatoryErrorMarker() {}

var _ PurgatoryError = decommissionPurgatoryError{}

// processOneChangeWithTracing executes processOneChange within a tracing span,
// logging the resulting traces to the DEV channel in the case of errors or
// when the configured log traces threshold is exceeded.
func (rq *replicateQueue) processOneChangeWithTracing(
ctx context.Context,
repl *Replica,
canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool,
scatter, dryRun bool,
) (requeue bool, _ error) {
processStart := timeutil.Now()
ctx, sp := tracing.EnsureChildSpan(ctx, rq.Tracer, "process replica",
tracing.WithRecording(tracingpb.RecordingVerbose))
defer sp.Finish()

requeue, err := rq.processOneChange(ctx, repl, canTransferLeaseFrom, scatter, dryRun)

// Utilize a new background context (properly annotated) to avoid writing
// traces from a child context into its parent.
{
ctx := repl.AnnotateCtx(rq.AnnotateCtx(context.Background()))
rec := sp.GetConfiguredRecording()
processDuration := timeutil.Since(processStart)
loggingThreshold := rq.logTracesThresholdFunc(rq.store.cfg.Settings, repl)
exceededDuration := loggingThreshold > time.Duration(0) && processDuration > loggingThreshold
if err != nil {
// TODO(sarkesian): Utilize Allocator log channel once available.
log.Warningf(ctx, "error processing replica: %v\ntrace:\n%s", err, rec)
} else if exceededDuration {
// TODO(sarkesian): Utilize Allocator log channel once available.
log.Infof(ctx, "processing replica took %s, exceeding threshold of %s\ntrace:\n%s",
processDuration, loggingThreshold, rec)
}
}

return requeue, err
}

func (rq *replicateQueue) processOneChange(
ctx context.Context,
repl *Replica,
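
For clarity, here is a minimal sketch of the same trace-on-error-or-slow pattern, detached from the replicate queue. runWithTraceLogging, its threshold parameter, and the operation name are hypothetical; the tracing and logging calls are the ones used by processOneChangeWithTracing above, and the sketch is written as if it lived in the kvserver package.

package kvserver

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
	"github.com/cockroachdb/cockroach/pkg/util/tracing"
	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
)

// runWithTraceLogging is a hypothetical helper illustrating the pattern above:
// run fn under a verbose child span and, on error or when the elapsed time
// exceeds threshold, log the collected recording.
func runWithTraceLogging(
	ctx context.Context,
	tracer *tracing.Tracer,
	threshold time.Duration,
	fn func(context.Context) error,
) error {
	start := timeutil.Now()
	ctx, sp := tracing.EnsureChildSpan(ctx, tracer, "hypothetical operation",
		tracing.WithRecording(tracingpb.RecordingVerbose))
	defer sp.Finish()

	err := fn(ctx)

	// Log against a fresh context so the rendered trace is not written back
	// into the span whose recording is being logged.
	logCtx := context.Background()
	rec := sp.GetConfiguredRecording()
	if err != nil {
		log.Warningf(logCtx, "operation failed: %v\ntrace:\n%s", err, rec)
	} else if elapsed := timeutil.Since(start); threshold > 0 && elapsed > threshold {
		log.Infof(logCtx, "operation took %s, exceeding threshold of %s\ntrace:\n%s",
			elapsed, threshold, rec)
	}
	return err
}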
125 changes: 125 additions & 0 deletions pkg/kv/kvserver/replicate_queue_test.go
@@ -44,6 +44,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
@@ -617,6 +620,128 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) {
})
}

// TestReplicateQueueTracingOnError tests that an error or slowdown in
// processing a replica results in traces being logged.
func TestReplicateQueueTracingOnError(t *testing.T) {
defer leaktest.AfterTest(t)()
s := log.ScopeWithoutShowLogs(t)
defer s.Close(t)

// NB: This test injects a fake failure during replica rebalancing, and we use
// this `rejectSnapshots` variable as a flag to activate or deactivate that
// injected failure.
var rejectSnapshots int64
ctx := context.Background()
tc := testcluster.StartTestCluster(
t, 4, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
if atomic.LoadInt64(&rejectSnapshots) == 1 {
return errors.Newf("boom")
}
return nil
},
}}},
},
)
defer tc.Stopper().Stop(ctx)

// Add a replica to the second and third nodes, and then decommission the
// second node. Since there are only 4 nodes in the cluster, the
// decommissioning replica must be rebalanced to the fourth node.
const decomNodeIdx = 1
const decomNodeID = 2
scratchKey := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx))
tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx+1))
adminSrv := tc.Server(decomNodeIdx)
conn, err := adminSrv.RPCContext().GRPCDialNode(
adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx)
require.NoError(t, err)
adminClient := serverpb.NewAdminClient(conn)
_, err = adminClient.Decommission(
ctx, &serverpb.DecommissionRequest{
NodeIDs: []roachpb.NodeID{decomNodeID},
TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
},
)
require.NoError(t, err)

// Activate the above testing knob to start rejecting future rebalances and
// then attempt to rebalance the decommissioning replica away. We expect a
// purgatory error to be returned here.
atomic.StoreInt64(&rejectSnapshots, 1)
store := tc.GetFirstStoreFromServer(t, 0)
repl, err := store.GetReplica(tc.LookupRangeOrFatal(t, scratchKey).RangeID)
require.NoError(t, err)

testStartTs := timeutil.Now()
recording, processErr, enqueueErr := tc.GetFirstStoreFromServer(t, 0).Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, enqueueErr)
require.Error(t, processErr, "expected processing error")

// Flush logs and get log messages from replicate_queue.go since just
// before calling store.Enqueue(..).
log.Flush()
entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(),
math.MaxInt64, 100, regexp.MustCompile(`replicate_queue\.go`), log.WithMarkedSensitiveData)
if err != nil {
t.Fatal(err)
}

opName := "process replica"
errRegexp, err := regexp.Compile(`error processing replica:.*boom`)
require.NoError(t, err)
traceRegexp, err := regexp.Compile(`trace:.*`)
require.NoError(t, err)
opRegexp, err := regexp.Compile(fmt.Sprintf(`operation:%s`, opName))
require.NoError(t, err)

// Validate that the error is logged, so that we can use the log entry to
// validate the trace output.
foundEntry := false
var entry logpb.Entry
for _, entry = range entries {
if errRegexp.MatchString(entry.Message) {
foundEntry = true
break
}
}
require.True(t, foundEntry)

// Validate that the trace is included in the log message.
require.Regexp(t, traceRegexp, entry.Message)
require.Regexp(t, opRegexp, entry.Message)

// Validate that the trace was logged with the correct tags for the replica.
require.Regexp(t, fmt.Sprintf("n%d", repl.NodeID()), entry.Tags)
require.Regexp(t, fmt.Sprintf("s%d", repl.StoreID()), entry.Tags)
require.Regexp(t, fmt.Sprintf("r%d/%d", repl.GetRangeID(), repl.ReplicaID()), entry.Tags)
require.Regexp(t, `replicate`, entry.Tags)

// Validate that the returned tracing span includes the operation, but also
// that the stringified trace was not logged to the span or its parent.
processRecSpan, foundSpan := recording.FindSpan(opName)
require.True(t, foundSpan)

foundParent := false
var parentRecSpan tracingpb.RecordedSpan
for _, parentRecSpan = range recording {
if parentRecSpan.SpanID == processRecSpan.ParentSpanID {
foundParent = true
break
}
}
require.True(t, foundParent)
spans := tracingpb.Recording{parentRecSpan, processRecSpan}
stringifiedSpans := spans.String()
require.NotRegexp(t, errRegexp, stringifiedSpans)
require.NotRegexp(t, traceRegexp, stringifiedSpans)
}

// TestReplicateQueueDecommissionPurgatoryError tests that failure to move a
// decommissioning replica puts it in the replicate queue purgatory.
func TestReplicateQueueDecommissionPurgatoryError(t *testing.T) {
