128794: changefeedccl: add timers around key parts of the changefeed pipeline r=andyyang890 a=asg0451

Add timers around key parts of the changefeed
pipeline to help debug feeds experiencing issues.
We emit latency histograms for each stage using
the `changefeed.stage.<stage>.latency` metrics.
The metric respects the changefeed `scope` label
for debugging specific feeds.

The implementation also provides a simple
mechanism for adding new stage timers.

Fixes: #121538

Release note (enterprise change): Added timers
around key parts of the changefeed pipeline to
help debug feeds experiencing issues. The
`changefeed.stage.<stage>.latency` metrics now
emit latency histograms for each stage. The metric
respects the changefeed `scope` label for
debugging specific feeds.
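
The call sites in this diff use two timing styles: `Start()` returns a stop function (often written as `defer t.Start()()`), and `Time(fn)` wraps a closure. The contents of `timers.go` are not shown in this excerpt, so the following is only a minimal sketch of how such a timer could behave. The `StageTimer` type, its slice of samples, and `Count` are hypothetical stand-ins, not the package's actual API; a real implementation would presumably record into a histogram instead.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// StageTimer is a hypothetical stand-in for one pipeline-stage timer.
// It keeps raw samples in a slice instead of feeding a latency histogram.
type StageTimer struct {
	mu      sync.Mutex
	samples []time.Duration
}

// Start begins timing and returns a function that records the elapsed time
// when called. This enables the `defer t.Start()()` idiom.
func (t *StageTimer) Start() (stop func()) {
	begin := time.Now()
	return func() { t.record(time.Since(begin)) }
}

// Time runs fn and records how long it took.
func (t *StageTimer) Time(fn func()) {
	defer t.Start()()
	fn()
}

// record appends one sample under the lock.
func (t *StageTimer) record(d time.Duration) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.samples = append(t.samples, d)
}

// Count returns the number of recorded samples.
func (t *StageTimer) Count() int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.samples)
}

func main() {
	var encode StageTimer

	// Style 1: time the remainder of the enclosing function.
	func() {
		defer encode.Start()()
		time.Sleep(time.Millisecond)
	}()

	// Style 2: time a single call wrapped in a closure.
	encode.Time(func() { time.Sleep(time.Millisecond) })

	fmt.Println("samples:", encode.Count())
}
```

Note that with the explicit `stop := t.Start()` style, a sample is recorded only on code paths that reach `stop()`, whereas `defer` records on every return.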


131309: crosscluster/logical: spruce up ldr debugging vtable r=dt a=msbutler

Epic: none

Release note: none

131330: catalog/lease: prevent panic inside upsertLeaseLocked r=fqazi a=fqazi

Previously, upsertLeaseLocked could panic if the stored lease was nil. This could happen in tests designed to release dereferenced leases immediately; for example, it was seen in TestAsOfSystemTimeUsesCache. To address this, this patch adds a defensive check for a valid stored lease before clearing the session ID stored inside the lease.
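
The body of the fix is not part of this excerpt. As a rough illustration of the kind of guard described, assuming simplified, hypothetical types (`storedLease`, `descriptorVersionState`, and `clearSessionID` are invented here and are not the `catalog/lease` definitions):

```go
package main

import "fmt"

// storedLease and descriptorVersionState are hypothetical, simplified types
// used only to illustrate the nil check.
type storedLease struct {
	sessionID []byte
}

type descriptorVersionState struct {
	lease *storedLease // may be nil once the lease has been released
}

// clearSessionID clears the session ID only when a stored lease is present.
// It reports whether anything was cleared.
func clearSessionID(s *descriptorVersionState) bool {
	if s == nil || s.lease == nil {
		return false // nothing to clear; avoids a nil dereference
	}
	s.lease.sessionID = nil
	return true
}

func main() {
	released := &descriptorVersionState{}
	held := &descriptorVersionState{lease: &storedLease{sessionID: []byte("s1")}}
	fmt.Println(clearSessionID(released), clearSessionID(held)) // prints "false true"
}
```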

Fixes: #131300

Release note: None

131347: server/license: Initial support for serverless r=fqazi a=spilchen

Previously, the license enforcer was not initialized for secondary tenants. There are two modes for secondary tenants: when the tenant runs as a separate process from the system tenant (serverless), and when it shares the same process. In the shared process mode, the enforcer relied on a shared singleton—initialized for the system tenant and reused for secondary tenants. However, when the secondary tenant runs in a separate process (serverless), the enforcer had throttling fully disabled.

This change is the first step in supporting serverless. The main challenge in allowing secondary tenants to initialize the enforcer is that they don’t have access to the KV key stored in the system keyspace, which records the grace period's end when no license is installed. This change doesn’t resolve that yet, but it sets the foundation for future work. For now, it estimates the grace period by setting it to 7 days from the time the enforcer is created.

Several changes were made to support serverless in this form:
- Call the enforcer’s `Start()` function for secondary tenants as well.
- Allow `Start()` to be called multiple times.
- Move all parameters for `Start` into an options struct.
- Remove the enforcer singleton, as it caused more problems (especially in tests) than it solved.
- Secondary tenants that share the same process will still share the same enforcer, but now the enforcer is passed around by storing a copy in `SQLConfig`.

This change will be backported to 24.2, 24.1, 23.2 and 23.1.

Epic: CRDB-39988
Informs: CRDB-42309
Release note: None

131350: roachprod: ensure `CommandContext` returns when context is canceled r=herkolategan,srosenberg a=renatolabs

Without setting `WaitDelay`, there is a chance that we will hang waiting for child processes to exit if the IO pipes are not closed.

See documentation at:

https://pkg.go.dev/os/exec#Cmd

Informs: #131095

Release note: None

131351: sql/logictest: address flake inside synthetic_privileges r=fqazi a=fqazi

Previously, the logic test synthetic_privileges would flake with transaction retry errors when querying has_table_privilege. To address this, this patch makes the problematic invocation retryable.

Fixes: #128370

Release note: None

Co-authored-by: Miles Frankel <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
Co-authored-by: Matt Spilchen <[email protected]>
Co-authored-by: Renato Costa <[email protected]>
6 people committed Sep 25, 2024
7 parents 2bdba49 + 1caa3e4 + 7632268 + 802ccfa + 101d3b9 + e6c909b + 21fb23e commit b82225e
Showing 35 changed files with 535 additions and 174 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
@@ -877,6 +877,7 @@ GO_TARGETS = [
"//pkg/ccl/changefeedccl/schemafeed/schematestutils:schematestutils",
"//pkg/ccl/changefeedccl/schemafeed:schemafeed",
"//pkg/ccl/changefeedccl/schemafeed:schemafeed_test",
"//pkg/ccl/changefeedccl/timers:timers",
"//pkg/ccl/changefeedccl:changefeedccl",
"//pkg/ccl/changefeedccl:changefeedccl_test",
"//pkg/ccl/cliccl/cliflagsccl:cliflagsccl",
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -60,6 +60,7 @@ go_library(
"//pkg/ccl/changefeedccl/kvevent",
"//pkg/ccl/changefeedccl/kvfeed",
"//pkg/ccl/changefeedccl/schemafeed",
"//pkg/ccl/changefeedccl/timers",
"//pkg/ccl/kvccl/kvfollowerreadsccl",
"//pkg/ccl/utilccl",
"//pkg/cloud",
7 changes: 6 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
@@ -497,6 +497,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
SchemaChangePolicy: schemaChange.Policy,
SchemaFeed: sf,
Knobs: ca.knobs.FeedKnobs,
ScopedTimers: ca.sliMetrics.Timers,
MonitoringCfg: monitoringCfg,
}, nil
}
@@ -1232,12 +1233,14 @@ func (cf *changeFrontier) Start(ctx context.Context) {
// but the oracle is only used when emitting row updates.
var nilOracle timestampLowerBoundOracle
var err error
- sli, err := cf.metrics.getSLIMetrics(cf.spec.Feed.Opts[changefeedbase.OptMetricsScope])
+ scope := cf.spec.Feed.Opts[changefeedbase.OptMetricsScope]
+ sli, err := cf.metrics.getSLIMetrics(scope)
if err != nil {
cf.MoveToDraining(err)
return
}
cf.sliMetrics = sli

cf.sink, err = getResolvedTimestampSink(ctx, cf.FlowCtx.Cfg, cf.spec.Feed, nilOracle,
cf.spec.User(), cf.spec.JobID, sli)

@@ -1644,6 +1647,8 @@ func (cf *changeFrontier) maybeCheckpointJob(
func (cf *changeFrontier) checkpointJobProgress(
frontier hlc.Timestamp, checkpoint jobspb.ChangefeedProgress_Checkpoint,
) (bool, error) {
defer cf.sliMetrics.Timers.CheckpointJobProgress.Start()()

if cf.knobs.RaiseRetryableError != nil {
if err := cf.knobs.RaiseRetryableError(); err != nil {
return false, changefeedbase.MarkRetryableError(
11 changes: 8 additions & 3 deletions pkg/ccl/changefeedccl/event_processing.go
@@ -421,6 +421,7 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
}
}

stop := c.metrics.Timers.Encode.Start()
if c.encodingOpts.Format == changefeedbase.OptFormatParquet {
return c.encodeForParquet(
ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp,
@@ -444,10 +445,14 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
// Since we're done processing/converting this event, and will not use much more
// than len(key)+len(bytes) worth of resources, adjust allocation to match.
alloc.AdjustBytesToTarget(ctx, int64(len(keyCopy)+len(valueCopy)))
stop()

- if err := c.sink.EmitRow(
- ctx, topic, keyCopy, valueCopy, schemaTS, updatedRow.MvccTimestamp, alloc,
- ); err != nil {
+ c.metrics.Timers.EmitRow.Time(func() {
+ err = c.sink.EmitRow(
+ ctx, topic, keyCopy, valueCopy, schemaTS, updatedRow.MvccTimestamp, alloc,
+ )
+ })
+ if err != nil {
return err
}
if log.V(3) {
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
@@ -14,6 +14,7 @@ go_library(
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/changefeedccl/kvevent",
"//pkg/ccl/changefeedccl/schemafeed",
"//pkg/ccl/changefeedccl/timers",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
@@ -54,6 +55,7 @@ go_test(
"//pkg/ccl/changefeedccl/kvevent",
"//pkg/ccl/changefeedccl/schemafeed",
"//pkg/ccl/changefeedccl/schemafeed/schematestutils",
"//pkg/ccl/changefeedccl/timers",
"//pkg/ccl/storageccl",
"//pkg/jobs/jobspb",
"//pkg/keys",
15 changes: 13 additions & 2 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
@@ -93,6 +94,8 @@ type Config struct {

// Knobs are kvfeed testing knobs.
Knobs TestingKnobs

ScopedTimers *timers.ScopedTimers
}

// Run will run the kvfeed. The feed runs synchronously and returns an
@@ -126,7 +129,7 @@ func Run(ctx context.Context, cfg Config) error {
cfg.InitialHighWater, cfg.EndTime,
cfg.Codec,
cfg.SchemaFeed,
- sc, pff, bf, cfg.Targets, cfg.Knobs)
+ sc, pff, bf, cfg.Targets, cfg.ScopedTimers, cfg.Knobs)
f.onBackfillCallback = cfg.MonitoringCfg.OnBackfillCallback
f.rangeObserver = startLaggingRangesObserver(g, cfg.MonitoringCfg.LaggingRangesCallback,
cfg.MonitoringCfg.LaggingRangesPollingInterval, cfg.MonitoringCfg.LaggingRangesThreshold)
@@ -259,6 +262,7 @@ type kvFeed struct {
schemaChangePolicy changefeedbase.SchemaChangePolicy

targets changefeedbase.Targets
timers *timers.ScopedTimers

// These dependencies are made available for test injection.
bufferFactory func() kvevent.Buffer
@@ -285,6 +289,7 @@ func newKVFeed(
pff physicalFeedFactory,
bf func() kvevent.Buffer,
targets changefeedbase.Targets,
ts *timers.ScopedTimers,
knobs TestingKnobs,
) *kvFeed {
return &kvFeed{
@@ -305,6 +310,7 @@ func newKVFeed(
physicalFeed: pff,
bufferFactory: bf,
targets: targets,
timers: ts,
knobs: knobs,
}
}
@@ -579,6 +585,7 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro
WithDiff: f.withDiff,
WithFiltering: f.withFiltering,
Knobs: f.knobs,
Timers: f.timers,
RangeObserver: f.rangeObserver,
}

@@ -590,7 +597,7 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro
// until a table event (i.e. a column is added/dropped) has occurred, which
// signals another possible scan.
g.GoCtx(func(ctx context.Context) error {
- return copyFromSourceToDestUntilTableEvent(ctx, f.writer, memBuf, resumeFrontier, f.tableFeed, f.endTime, f.knobs)
+ return copyFromSourceToDestUntilTableEvent(ctx, f.writer, memBuf, resumeFrontier, f.tableFeed, f.endTime, f.knobs, f.timers)
})
g.GoCtx(func(ctx context.Context) error {
return f.physicalFeed.Run(ctx, memBuf, physicalCfg)
@@ -673,6 +680,7 @@ func copyFromSourceToDestUntilTableEvent(
schemaFeed schemafeed.SchemaFeed,
endTime hlc.Timestamp,
knobs TestingKnobs,
st *timers.ScopedTimers,
) error {
// Initially, the only copy boundary is the end time if one is specified.
// Once we discover a table event (which is before the end time), that will
@@ -689,6 +697,7 @@ func copyFromSourceToDestUntilTableEvent(
// from rangefeed) and checks if a table event was encountered at or before
// said timestamp. If so, it replaces the copy boundary with the table event.
checkForTableEvent = func(ts hlc.Timestamp) error {
defer st.KVFeedWaitForTableEvent.Start()()
// There's no need to check for table events again if we already found one
// since that should already be the earliest one.
if _, ok := boundary.(*errTableEventReached); ok {
@@ -782,6 +791,8 @@ func copyFromSourceToDestUntilTableEvent(

// writeToDest writes an event to the dest.
writeToDest = func(e kvevent.Event) error {
defer st.KVFeedBuffer.Start()()

switch e.Type() {
case kvevent.TypeKV, kvevent.TypeFlush:
return dest.Add(ctx, e)
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed/schematestutils"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
@@ -142,14 +143,15 @@ func TestKVFeed(t *testing.T) {
})
ref := rawEventFeed(tc.events)
tf := newRawTableFeed(tc.descs, tc.initialHighWater)
st := timers.New(time.Minute).GetOrCreateScopedTimers("")
f := newKVFeed(buf, tc.spans, tc.checkpoint, hlc.Timestamp{},
tc.schemaChangeEvents, tc.schemaChangePolicy,
tc.needsInitialScan, tc.withDiff, true, /* withFiltering */
tc.initialHighWater, tc.endTime,
codec,
tf, sf, rangefeedFactory(ref.run), bufferFactory,
changefeedbase.Targets{},
- TestingKnobs{})
+ st, TestingKnobs{})
ctx, cancel := context.WithCancel(context.Background())
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
@@ -637,7 +639,7 @@ func TestCopyFromSourceToDestUntilTableEvent(t *testing.T) {
schemaFeed := &testSchemaFeed{tableEvents: tc.tableEvents}
endTime := tc.endTime

- err = copyFromSourceToDestUntilTableEvent(ctx, dest, src, frontier, schemaFeed, endTime, TestingKnobs{})
+ err = copyFromSourceToDestUntilTableEvent(ctx, dest, src, frontier, schemaFeed, endTime, TestingKnobs{}, timers.New(1*time.Second).GetOrCreateScopedTimers(""))
require.Equal(t, tc.expectedErr, err)
require.Empty(t, src.events)
require.Equal(t, tc.expectedEvents, dest.events)
8 changes: 8 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
@@ -12,6 +12,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -35,6 +36,7 @@ type rangeFeedConfig struct {
WithFiltering bool
RangeObserver kvcoord.RangeObserver
Knobs TestingKnobs
Timers *timers.ScopedTimers
}

// rangefeedFactory is a function that creates and runs a rangefeed.
@@ -55,6 +57,7 @@ type rangefeed struct {
// that the rangefeed uses to send event messages to.
eventCh <-chan kvcoord.RangeFeedMessage
knobs TestingKnobs
st *timers.ScopedTimers
}

// Run implements the physicalFeedFactory interface.
@@ -82,6 +85,7 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang
cfg: cfg,
eventCh: eventCh,
knobs: cfg.Knobs,
st: cfg.Timers,
}
g := ctxgroup.WithContext(ctx)
g.GoCtx(feed.addEventsToBuffer)
@@ -118,11 +122,13 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
return err
}
}
stop := p.st.RangefeedBufferValue.Start()
if err := p.memBuf.Add(
ctx, kvevent.MakeKVEvent(e.RangeFeedEvent),
); err != nil {
return err
}
stop()
case *kvpb.RangeFeedCheckpoint:
if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) {
// RangeFeed happily forwards any closed timestamps it receives as
@@ -133,11 +139,13 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
if p.knobs.ShouldSkipCheckpoint != nil && p.knobs.ShouldSkipCheckpoint(t) {
continue
}
stop := p.st.RangefeedBufferCheckpoint.Start()
if err := p.memBuf.Add(
ctx, kvevent.MakeResolvedEvent(e.RangeFeedEvent, jobspb.ResolvedSpan_NONE),
); err != nil {
return err
}
stop()
case *kvpb.RangeFeedSSTable:
// For now, we just error on SST ingestion, since we currently don't
// expect SST ingestion into spans with active changefeeds.
9 changes: 9 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
@@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -85,6 +86,8 @@ type AggMetrics struct {
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram

Timers *timers.Timers

// There is always at least 1 sliMetrics created for defaultSLI scope.
mu struct {
syncutil.Mutex
@@ -163,6 +166,8 @@ type sliMetrics struct {
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram

Timers *timers.ScopedTimers

mu struct {
syncutil.Mutex
id int64
@@ -1064,6 +1069,7 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
SigFigs: 2,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
Timers: timers.New(histogramWindow),
NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
@@ -1133,6 +1139,9 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
TotalRanges: a.TotalRanges.AddChild(scope),
CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope),

Timers: a.Timers.GetOrCreateScopedTimers(scope),

// TODO(#130358): Again, this doesn't belong here, but it's the most
// convenient way to feed this metric to changefeeds.
NetMetrics: a.NetMetrics,
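
The hunks above create one aggregate `Timers` value and derive a per-scope child from it with `GetOrCreateScopedTimers(scope)`. The implementation in `timers.go` is not shown in this excerpt, so the following is only a guess at the general shape, using a plain mutex-guarded map. The `registry`, `scopedTimers`, and `stageTimer` names are hypothetical, and the real package presumably builds on `aggmetric` histograms instead.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// stageTimer is a hypothetical per-stage timer that tracks only a sample
// count and a running total.
type stageTimer struct {
	mu    sync.Mutex
	count int
	total time.Duration
}

// Start begins timing and returns a function that records the elapsed time.
func (t *stageTimer) Start() func() {
	begin := time.Now()
	return func() {
		elapsed := time.Since(begin)
		t.mu.Lock()
		defer t.mu.Unlock()
		t.count++
		t.total += elapsed
	}
}

// scopedTimers groups the stage timers for one metrics scope.
type scopedTimers struct {
	Encode  *stageTimer
	EmitRow *stageTimer
}

// registry hands out exactly one scopedTimers per scope label.
type registry struct {
	mu     sync.Mutex
	scopes map[string]*scopedTimers
}

func newRegistry() *registry {
	return &registry{scopes: make(map[string]*scopedTimers)}
}

// getOrCreate returns the timers for scope, creating them on first use.
func (r *registry) getOrCreate(scope string) *scopedTimers {
	r.mu.Lock()
	defer r.mu.Unlock()
	if st, ok := r.scopes[scope]; ok {
		return st
	}
	st := &scopedTimers{Encode: &stageTimer{}, EmitRow: &stageTimer{}}
	r.scopes[scope] = st
	return st
}

func main() {
	r := newRegistry()
	payments := r.getOrCreate("payments")
	payments.Encode.Start()() // record one (near-zero) sample

	// The same scope yields the same timers; a different scope does not.
	fmt.Println(r.getOrCreate("payments") == payments, r.getOrCreate("") == payments) // prints "true false"
}
```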
14 changes: 14 additions & 0 deletions pkg/ccl/changefeedccl/timers/BUILD.bazel
@@ -0,0 +1,14 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "timers",
srcs = ["timers.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/timeutil",
"@com_github_prometheus_client_golang//prometheus",
],
)