Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
85990: metric: add prometheus-based histogram r=aadityasondhi a=aadityasondhi

From cockroachdb#81181:

Our current histogram is based on `hdrhistogram`. This tends to create
lots of buckets and is inflexible w.r.t the bucket layout. In hindsight,
it was a poor choice of default implementation (I can say that since I
introduced it) and its cost is disincentivizing the introduction of
histograms that would be useful.

This commit introduces a histogram that is based on a completely vanilla
`prometheus.Histogram`. The only reason we need to wrap it is because we
want to export quantiles to CockraochDB's internal timeseries (it does
not support histograms) and this requires maintaining an internal windowed
histogram (on top of the cumulative histogram).

With this done, we can now introduce metrics with any kind of buckets we
want. Helpfully, we introduce two common kinds of buckets, suitable for
IO-type and RPC-type latencies. These are defined in a human-readable
format by explicitly listing out the buckets.

We can move existing metrics to HistogramV2 easily, assuming we are not
concerned with existing prometheus scrapers getting tripped up by the
changes in bucket boundaries. I assume this is not a big deal in
practice as long as it doesn't happen "all the time". In fact, I would
strongly suggest we move all metrics wholesale and remove the
hdrhistogram-based implementation. If this is not acceptable for some
reason, we ought to at least deprecated it.

We also slightly improve the existing `Histogram` code by unifying how
the windowed histograms are ticked and by making explicit where their
quantiles are recorded (this dependency was previously hidden via a
local interface assertion).

Resolves cockroachdb#10015.
Resolves cockroachdb#64962.
Alternative to dhartunian@eac3d06

Release justification: low risk, high benefit changes

86007: kvserver: log traces from replicate queue on errors or slow processing r=andreimatei a=AlexTalks

While we previously had some logging from the replicate queue as a
result of the standard queue logging, this change adds logging to the
replicate queue when there are errors in processing a replica, or when
processing a replica exceeds a 50% of the timeout duration.
When there are errors or the duration threshold is exceeded,
any error messages are logged along with the collected tracing spans
from the operation.

Release note (ops change): Added logging on replicate queue processing
in the presence of errors or when the duration exceeds 50% of the
timeout.

Release justification: Low risk observability change.

86255: upgrades,upgrade,upgrades_test: Added an upgrade for updating invalid column ID in seq's back references r=Xiang-Gu a=Xiang-Gu

commit 1: a small refactoring of existing function
commit 2: added a field to TenantDeps
commit 3: added a cluster version upgrade that attempts to update
invalid column ID in seq's back references due to bugs in prior versions.
See below for details
commit 4: wrote a test for this newly added upgrade

Previously, in version 21.1 and prior, `ADD COLUMN DEFAULT nextval('s')`
will incorrectly store a 0-valued column id in the sequence 's' back reference
`dependedOnBy` because we added this dependency before allocating an
ID to the newly added column. Customers ran into issues when upgrading
to v22.1 so we relaxed the validation logic as a quick short-term fix, as detailed
in cockroachdb#82859. 

Now we want to give this issue a more proper treatment by adding a cluster
upgrade (to v22.2) that attempts to detect such invalid column ID issues and 
update them with the correct column ID. This PR does exactly this.

Fixes: cockroachdb#83124

Release note: None

Release justification: fixed a release blocker that will resolve invalid column ID
appearance in sequence's back referenced, caused by bugs in older binaries.

Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Aaditya Sondhi <[email protected]>
Co-authored-by: Alex Sarkesian <[email protected]>
Co-authored-by: Xiang Gu <[email protected]>
  • Loading branch information
5 people committed Aug 26, 2022
4 parents 5cc1afa + d7d5838 + 6c45224 + 1cc5b55 commit db1554b
Show file tree
Hide file tree
Showing 26 changed files with 1,034 additions and 115 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -294,4 +294,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.1-64 set the active cluster version in the format '<major>.<minor>'
version version 1000022.1-66 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>1000022.1-64</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>1000022.1-66</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ const (
// GCHintInReplicaState adds GC hint to replica state. When this version is
// enabled, replicas will populate GC hint and update them when necessary.
GCHintInReplicaState
// UpdateInvalidColumnIDsInSequenceBackReferences looks for invalid column
// ids in sequences' back references and attempts a best-effort-based matching
// to update those column IDs.
UpdateInvalidColumnIDsInSequenceBackReferences

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -484,6 +488,10 @@ var rawVersionsSingleton = keyedVersions{
Key: GCHintInReplicaState,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 64},
},
{
Key: UpdateInvalidColumnIDsInSequenceBackReferences,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 66},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ go_test(
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/log/logpb",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/netutil",
Expand Down
21 changes: 15 additions & 6 deletions pkg/kv/kvserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,20 @@ func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Dura
//
// The parameter controls which rate(s) to use.
func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queueProcessTimeoutFunc {
return makeRateLimitedTimeoutFuncByPermittedSlowdown(permittedRangeScanSlowdown, rateSettings...)
}

// permittedRangeScanSlowdown is the factor of the above the estimated duration
// for a range scan given the configured rate which we use to configure
// the operations's timeout.
const permittedRangeScanSlowdown = 10

// makeRateLimitedTimeoutFuncByPermittedSlowdown creates a timeout function based on a permitted
// slowdown factor on the estimated queue processing duration based on the given rate settings.
// See makeRateLimitedTimeoutFunc for more information.
func makeRateLimitedTimeoutFuncByPermittedSlowdown(
permittedSlowdown int, rateSettings ...*settings.ByteSizeSetting,
) queueProcessTimeoutFunc {
return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
// NB: In production code this will type assertion will always succeed.
Expand All @@ -95,19 +109,14 @@ func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queue
}
}
estimatedDuration := time.Duration(repl.GetMVCCStats().Total()/minSnapshotRate) * time.Second
timeout := estimatedDuration * permittedRangeScanSlowdown
timeout := estimatedDuration * time.Duration(permittedSlowdown)
if timeout < minimumTimeout {
timeout = minimumTimeout
}
return timeout
}
}

// permittedRangeScanSlowdown is the factor of the above the estimated duration
// for a range scan given the configured rate which we use to configure
// the operations's timeout.
const permittedRangeScanSlowdown = 10

// PurgatoryError indicates a replica processing failure which indicates the
// replica can be placed into purgatory for faster retries than the replica
// scanner's interval.
Expand Down
47 changes: 46 additions & 1 deletion pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3"
)
Expand Down Expand Up @@ -517,6 +519,9 @@ type replicateQueue struct {
// descriptors.
updateCh chan time.Time
lastLeaseTransfer atomic.Value // read and written by scanner & queue goroutines
// logTracesThresholdFunc returns the threshold for logging traces from
// processing a replica.
logTracesThresholdFunc queueProcessTimeoutFunc
}

// newReplicateQueue returns a new instance of replicateQueue.
Expand All @@ -526,6 +531,9 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
allocator: allocator,
purgCh: time.NewTicker(replicateQueuePurgatoryCheckInterval).C,
updateCh: make(chan time.Time, 1),
logTracesThresholdFunc: makeRateLimitedTimeoutFuncByPermittedSlowdown(
permittedRangeScanSlowdown/2, rebalanceSnapshotRate, recoverySnapshotRate,
),
}
store.metrics.registry.AddMetricStruct(&rq.metrics)
rq.baseQueue = newBaseQueue(
Expand Down Expand Up @@ -666,7 +674,7 @@ func (rq *replicateQueue) process(
// usually signaling that a rebalancing reservation could not be made with the
// selected target.
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
requeue, err := rq.processOneChange(
requeue, err := rq.processOneChangeWithTracing(
ctx, repl, rq.canTransferLeaseFrom, false /* scatter */, false, /* dryRun */
)
if isSnapshotError(err) {
Expand Down Expand Up @@ -742,6 +750,43 @@ func (decommissionPurgatoryError) PurgatoryErrorMarker() {}

var _ PurgatoryError = decommissionPurgatoryError{}

// processOneChangeWithTracing executes processOneChange within a tracing span,
// logging the resulting traces to the DEV channel in the case of errors or
// when the configured log traces threshold is exceeded.
func (rq *replicateQueue) processOneChangeWithTracing(
ctx context.Context,
repl *Replica,
canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool,
scatter, dryRun bool,
) (requeue bool, _ error) {
processStart := timeutil.Now()
ctx, sp := tracing.EnsureChildSpan(ctx, rq.Tracer, "process replica",
tracing.WithRecording(tracingpb.RecordingVerbose))
defer sp.Finish()

requeue, err := rq.processOneChange(ctx, repl, canTransferLeaseFrom, scatter, dryRun)

// Utilize a new background context (properly annotated) to avoid writing
// traces from a child context into its parent.
{
ctx := repl.AnnotateCtx(rq.AnnotateCtx(context.Background()))
rec := sp.GetConfiguredRecording()
processDuration := timeutil.Since(processStart)
loggingThreshold := rq.logTracesThresholdFunc(rq.store.cfg.Settings, repl)
exceededDuration := loggingThreshold > time.Duration(0) && processDuration > loggingThreshold
if err != nil {
// TODO(sarkesian): Utilize Allocator log channel once available.
log.Warningf(ctx, "error processing replica: %v\ntrace:\n%s", err, rec)
} else if exceededDuration {
// TODO(sarkesian): Utilize Allocator log channel once available.
log.Infof(ctx, "processing replica took %s, exceeding threshold of %s\ntrace:\n%s",
processDuration, loggingThreshold, rec)
}
}

return requeue, err
}

func (rq *replicateQueue) processOneChange(
ctx context.Context,
repl *Replica,
Expand Down
125 changes: 125 additions & 0 deletions pkg/kv/kvserver/replicate_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -617,6 +620,128 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) {
})
}

// TestReplicateQueueTracingOnError tests that an error or slowdown in
// processing a replica results in traces being logged.
func TestReplicateQueueTracingOnError(t *testing.T) {
defer leaktest.AfterTest(t)()
s := log.ScopeWithoutShowLogs(t)
defer s.Close(t)

// NB: This test injects a fake failure during replica rebalancing, and we use
// this `rejectSnapshots` variable as a flag to activate or deactivate that
// injected failure.
var rejectSnapshots int64
ctx := context.Background()
tc := testcluster.StartTestCluster(
t, 4, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
if atomic.LoadInt64(&rejectSnapshots) == 1 {
return errors.Newf("boom")
}
return nil
},
}}},
},
)
defer tc.Stopper().Stop(ctx)

// Add a replica to the second and third nodes, and then decommission the
// second node. Since there are only 4 nodes in the cluster, the
// decommissioning replica must be rebalanced to the fourth node.
const decomNodeIdx = 1
const decomNodeID = 2
scratchKey := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx))
tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx+1))
adminSrv := tc.Server(decomNodeIdx)
conn, err := adminSrv.RPCContext().GRPCDialNode(
adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx)
require.NoError(t, err)
adminClient := serverpb.NewAdminClient(conn)
_, err = adminClient.Decommission(
ctx, &serverpb.DecommissionRequest{
NodeIDs: []roachpb.NodeID{decomNodeID},
TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
},
)
require.NoError(t, err)

// Activate the above testing knob to start rejecting future rebalances and
// then attempt to rebalance the decommissioning replica away. We expect a
// purgatory error to be returned here.
atomic.StoreInt64(&rejectSnapshots, 1)
store := tc.GetFirstStoreFromServer(t, 0)
repl, err := store.GetReplica(tc.LookupRangeOrFatal(t, scratchKey).RangeID)
require.NoError(t, err)

testStartTs := timeutil.Now()
recording, processErr, enqueueErr := tc.GetFirstStoreFromServer(t, 0).Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, enqueueErr)
require.Error(t, processErr, "expected processing error")

// Flush logs and get log messages from replicate_queue.go since just
// before calling store.Enqueue(..).
log.Flush()
entries, err := log.FetchEntriesFromFiles(testStartTs.UnixNano(),
math.MaxInt64, 100, regexp.MustCompile(`replicate_queue\.go`), log.WithMarkedSensitiveData)
if err != nil {
t.Fatal(err)
}

opName := "process replica"
errRegexp, err := regexp.Compile(`error processing replica:.*boom`)
require.NoError(t, err)
traceRegexp, err := regexp.Compile(`trace:.*`)
require.NoError(t, err)
opRegexp, err := regexp.Compile(fmt.Sprintf(`operation:%s`, opName))
require.NoError(t, err)

// Validate that the error is logged, so that we can use the log entry to
// validate the trace output.
foundEntry := false
var entry logpb.Entry
for _, entry = range entries {
if errRegexp.MatchString(entry.Message) {
foundEntry = true
break
}
}
require.True(t, foundEntry)

// Validate that the trace is included in the log message.
require.Regexp(t, traceRegexp, entry.Message)
require.Regexp(t, opRegexp, entry.Message)

// Validate that the trace was logged with the correct tags for the replica.
require.Regexp(t, fmt.Sprintf("n%d", repl.NodeID()), entry.Tags)
require.Regexp(t, fmt.Sprintf("s%d", repl.StoreID()), entry.Tags)
require.Regexp(t, fmt.Sprintf("r%d/%d", repl.GetRangeID(), repl.ReplicaID()), entry.Tags)
require.Regexp(t, `replicate`, entry.Tags)

// Validate that the returned tracing span includes the operation, but also
// that the stringified trace was not logged to the span or its parent.
processRecSpan, foundSpan := recording.FindSpan(opName)
require.True(t, foundSpan)

foundParent := false
var parentRecSpan tracingpb.RecordedSpan
for _, parentRecSpan = range recording {
if parentRecSpan.SpanID == processRecSpan.ParentSpanID {
foundParent = true
break
}
}
require.True(t, foundParent)
spans := tracingpb.Recording{parentRecSpan, processRecSpan}
stringifiedSpans := spans.String()
require.NotRegexp(t, errRegexp, stringifiedSpans)
require.NotRegexp(t, traceRegexp, stringifiedSpans)
}

// TestReplicateQueueDecommissionPurgatoryError tests that failure to move a
// decommissioning replica puts it in the replicate queue purgatory.
func TestReplicateQueueDecommissionPurgatoryError(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/status/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ go_library(
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_codahale_hdrhistogram//:hdrhistogram",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_elastic_gosigar//:gosigar",
"@com_github_shirou_gopsutil_v3//net",
Expand Down Expand Up @@ -148,6 +147,7 @@ go_test(
"//pkg/util/system",
"//pkg/util/timeutil",
"@com_github_kr_pretty//:pretty",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_shirou_gopsutil_v3//net",
],
)
Expand Down
Loading

0 comments on commit db1554b

Please sign in to comment.