Skip to content

Commit

Permalink
Merge pull request #131372 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.2-128794

release-24.2: changefeedccl: add timers around key parts of the changefeed pipeline
  • Loading branch information
asg0451 authored Sep 26, 2024
2 parents 4b11721 + ff470fc commit e35a012
Show file tree
Hide file tree
Showing 11 changed files with 166 additions and 8 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,7 @@ GO_TARGETS = [
"//pkg/ccl/changefeedccl/schemafeed/schematestutils:schematestutils",
"//pkg/ccl/changefeedccl/schemafeed:schemafeed",
"//pkg/ccl/changefeedccl/schemafeed:schemafeed_test",
"//pkg/ccl/changefeedccl/timers:timers",
"//pkg/ccl/changefeedccl:changefeedccl",
"//pkg/ccl/changefeedccl:changefeedccl_test",
"//pkg/ccl/cliccl/cliflagsccl:cliflagsccl",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ go_library(
"//pkg/ccl/changefeedccl/kvevent",
"//pkg/ccl/changefeedccl/kvfeed",
"//pkg/ccl/changefeedccl/schemafeed",
"//pkg/ccl/changefeedccl/timers",
"//pkg/ccl/kvccl/kvfollowerreadsccl",
"//pkg/ccl/utilccl",
"//pkg/cloud",
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
SchemaChangePolicy: schemaChange.Policy,
SchemaFeed: sf,
Knobs: ca.knobs.FeedKnobs,
ScopedTimers: ca.sliMetrics.Timers,
MonitoringCfg: monitoringCfg,
}, nil
}
Expand Down Expand Up @@ -1232,12 +1233,14 @@ func (cf *changeFrontier) Start(ctx context.Context) {
// but the oracle is only used when emitting row updates.
var nilOracle timestampLowerBoundOracle
var err error
sli, err := cf.metrics.getSLIMetrics(cf.spec.Feed.Opts[changefeedbase.OptMetricsScope])
scope := cf.spec.Feed.Opts[changefeedbase.OptMetricsScope]
sli, err := cf.metrics.getSLIMetrics(scope)
if err != nil {
cf.MoveToDraining(err)
return
}
cf.sliMetrics = sli

cf.sink, err = getResolvedTimestampSink(ctx, cf.FlowCtx.Cfg, cf.spec.Feed, nilOracle,
cf.spec.User(), cf.spec.JobID, sli)

Expand Down Expand Up @@ -1644,6 +1647,8 @@ func (cf *changeFrontier) maybeCheckpointJob(
func (cf *changeFrontier) checkpointJobProgress(
frontier hlc.Timestamp, checkpoint jobspb.ChangefeedProgress_Checkpoint,
) (bool, error) {
defer cf.sliMetrics.Timers.CheckpointJobProgress.Start()()

if cf.knobs.RaiseRetryableError != nil {
if err := cf.knobs.RaiseRetryableError(); err != nil {
return false, changefeedbase.MarkRetryableError(
Expand Down
11 changes: 8 additions & 3 deletions pkg/ccl/changefeedccl/event_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
}
}

stop := c.metrics.Timers.Encode.Start()
if c.encodingOpts.Format == changefeedbase.OptFormatParquet {
return c.encodeForParquet(
ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp,
Expand All @@ -444,10 +445,14 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
// Since we're done processing/converting this event, and will not use much more
// than len(key)+len(bytes) worth of resources, adjust allocation to match.
alloc.AdjustBytesToTarget(ctx, int64(len(keyCopy)+len(valueCopy)))
stop()

if err := c.sink.EmitRow(
ctx, topic, keyCopy, valueCopy, schemaTS, updatedRow.MvccTimestamp, alloc,
); err != nil {
c.metrics.Timers.EmitRow.Time(func() {
err = c.sink.EmitRow(
ctx, topic, keyCopy, valueCopy, schemaTS, updatedRow.MvccTimestamp, alloc,
)
})
if err != nil {
return err
}
if log.V(3) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/changefeedccl/kvevent",
"//pkg/ccl/changefeedccl/schemafeed",
"//pkg/ccl/changefeedccl/timers",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
Expand Down Expand Up @@ -54,6 +55,7 @@ go_test(
"//pkg/ccl/changefeedccl/kvevent",
"//pkg/ccl/changefeedccl/schemafeed",
"//pkg/ccl/changefeedccl/schemafeed/schematestutils",
"//pkg/ccl/changefeedccl/timers",
"//pkg/ccl/storageccl",
"//pkg/jobs/jobspb",
"//pkg/keys",
Expand Down
15 changes: 13 additions & 2 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand Down Expand Up @@ -93,6 +94,8 @@ type Config struct {

// Knobs are kvfeed testing knobs.
Knobs TestingKnobs

ScopedTimers *timers.ScopedTimers
}

// Run will run the kvfeed. The feed runs synchronously and returns an
Expand Down Expand Up @@ -126,7 +129,7 @@ func Run(ctx context.Context, cfg Config) error {
cfg.InitialHighWater, cfg.EndTime,
cfg.Codec,
cfg.SchemaFeed,
sc, pff, bf, cfg.Targets, cfg.Knobs)
sc, pff, bf, cfg.Targets, cfg.ScopedTimers, cfg.Knobs)
f.onBackfillCallback = cfg.MonitoringCfg.OnBackfillCallback
f.rangeObserver = startLaggingRangesObserver(g, cfg.MonitoringCfg.LaggingRangesCallback,
cfg.MonitoringCfg.LaggingRangesPollingInterval, cfg.MonitoringCfg.LaggingRangesThreshold)
Expand Down Expand Up @@ -259,6 +262,7 @@ type kvFeed struct {
schemaChangePolicy changefeedbase.SchemaChangePolicy

targets changefeedbase.Targets
timers *timers.ScopedTimers

// These dependencies are made available for test injection.
bufferFactory func() kvevent.Buffer
Expand All @@ -285,6 +289,7 @@ func newKVFeed(
pff physicalFeedFactory,
bf func() kvevent.Buffer,
targets changefeedbase.Targets,
ts *timers.ScopedTimers,
knobs TestingKnobs,
) *kvFeed {
return &kvFeed{
Expand All @@ -305,6 +310,7 @@ func newKVFeed(
physicalFeed: pff,
bufferFactory: bf,
targets: targets,
timers: ts,
knobs: knobs,
}
}
Expand Down Expand Up @@ -579,6 +585,7 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro
WithDiff: f.withDiff,
WithFiltering: f.withFiltering,
Knobs: f.knobs,
Timers: f.timers,
RangeObserver: f.rangeObserver,
}

Expand All @@ -590,7 +597,7 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro
// until a table event (i.e. a column is added/dropped) has occurred, which
// signals another possible scan.
g.GoCtx(func(ctx context.Context) error {
return copyFromSourceToDestUntilTableEvent(ctx, f.writer, memBuf, resumeFrontier, f.tableFeed, f.endTime, f.knobs)
return copyFromSourceToDestUntilTableEvent(ctx, f.writer, memBuf, resumeFrontier, f.tableFeed, f.endTime, f.knobs, f.timers)
})
g.GoCtx(func(ctx context.Context) error {
return f.physicalFeed.Run(ctx, memBuf, physicalCfg)
Expand Down Expand Up @@ -673,6 +680,7 @@ func copyFromSourceToDestUntilTableEvent(
schemaFeed schemafeed.SchemaFeed,
endTime hlc.Timestamp,
knobs TestingKnobs,
st *timers.ScopedTimers,
) error {
// Initially, the only copy boundary is the end time if one is specified.
// Once we discover a table event (which is before the end time), that will
Expand All @@ -689,6 +697,7 @@ func copyFromSourceToDestUntilTableEvent(
// from rangefeed) and checks if a table event was encountered at or before
// said timestamp. If so, it replaces the copy boundary with the table event.
checkForTableEvent = func(ts hlc.Timestamp) error {
defer st.KVFeedWaitForTableEvent.Start()()
// There's no need to check for table events again if we already found one
// since that should already be the earliest one.
if _, ok := boundary.(*errTableEventReached); ok {
Expand Down Expand Up @@ -782,6 +791,8 @@ func copyFromSourceToDestUntilTableEvent(

// writeToDest writes an event to the dest.
writeToDest = func(e kvevent.Event) error {
defer st.KVFeedBuffer.Start()()

switch e.Type() {
case kvevent.TypeKV, kvevent.TypeFlush:
return dest.Add(ctx, e)
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed/schematestutils"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
Expand Down Expand Up @@ -142,14 +143,15 @@ func TestKVFeed(t *testing.T) {
})
ref := rawEventFeed(tc.events)
tf := newRawTableFeed(tc.descs, tc.initialHighWater)
st := timers.New(time.Minute).GetOrCreateScopedTimers("")
f := newKVFeed(buf, tc.spans, tc.checkpoint, hlc.Timestamp{},
tc.schemaChangeEvents, tc.schemaChangePolicy,
tc.needsInitialScan, tc.withDiff, true, /* withFiltering */
tc.initialHighWater, tc.endTime,
codec,
tf, sf, rangefeedFactory(ref.run), bufferFactory,
changefeedbase.Targets{},
TestingKnobs{})
st, TestingKnobs{})
ctx, cancel := context.WithCancel(context.Background())
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
Expand Down Expand Up @@ -637,7 +639,7 @@ func TestCopyFromSourceToDestUntilTableEvent(t *testing.T) {
schemaFeed := &testSchemaFeed{tableEvents: tc.tableEvents}
endTime := tc.endTime

err = copyFromSourceToDestUntilTableEvent(ctx, dest, src, frontier, schemaFeed, endTime, TestingKnobs{})
err = copyFromSourceToDestUntilTableEvent(ctx, dest, src, frontier, schemaFeed, endTime, TestingKnobs{}, timers.New(1*time.Second).GetOrCreateScopedTimers(""))
require.Equal(t, tc.expectedErr, err)
require.Empty(t, src.events)
require.Equal(t, tc.expectedEvents, dest.events)
Expand Down
8 changes: 8 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand All @@ -35,6 +36,7 @@ type rangeFeedConfig struct {
WithFiltering bool
RangeObserver kvcoord.RangeObserver
Knobs TestingKnobs
Timers *timers.ScopedTimers
}

// rangefeedFactory is a function that creates and runs a rangefeed.
Expand All @@ -55,6 +57,7 @@ type rangefeed struct {
// that the rangefeed uses to send event messages to.
eventCh <-chan kvcoord.RangeFeedMessage
knobs TestingKnobs
st *timers.ScopedTimers
}

// Run implements the physicalFeedFactory interface.
Expand Down Expand Up @@ -82,6 +85,7 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang
cfg: cfg,
eventCh: eventCh,
knobs: cfg.Knobs,
st: cfg.Timers,
}
g := ctxgroup.WithContext(ctx)
g.GoCtx(feed.addEventsToBuffer)
Expand Down Expand Up @@ -118,11 +122,13 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
return err
}
}
stop := p.st.RangefeedBufferValue.Start()
if err := p.memBuf.Add(
ctx, kvevent.MakeKVEvent(e.RangeFeedEvent),
); err != nil {
return err
}
stop()
case *kvpb.RangeFeedCheckpoint:
if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) {
// RangeFeed happily forwards any closed timestamps it receives as
Expand All @@ -133,11 +139,13 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
if p.knobs.ShouldSkipCheckpoint != nil && p.knobs.ShouldSkipCheckpoint(t) {
continue
}
stop := p.st.RangefeedBufferCheckpoint.Start()
if err := p.memBuf.Add(
ctx, kvevent.MakeResolvedEvent(e.RangeFeedEvent, jobspb.ResolvedSpan_NONE),
); err != nil {
return err
}
stop()
case *kvpb.RangeFeedSSTable:
// For now, we just error on SST ingestion, since we currently don't
// expect SST ingestion into spans with active changefeeds.
Expand Down
9 changes: 9 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -85,6 +86,8 @@ type AggMetrics struct {
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram

Timers *timers.Timers

// There is always at least 1 sliMetrics created for defaultSLI scope.
mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -163,6 +166,8 @@ type sliMetrics struct {
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram

Timers *timers.ScopedTimers

mu struct {
syncutil.Mutex
id int64
Expand Down Expand Up @@ -1064,6 +1069,7 @@ func newAggregateMetrics(histogramWindow time.Duration, lookup *cidr.Lookup) *Ag
SigFigs: 2,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
Timers: timers.New(histogramWindow),
NetMetrics: lookup.MakeNetMetrics(metaNetworkBytesOut, metaNetworkBytesIn, "sink"),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
Expand Down Expand Up @@ -1133,6 +1139,9 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
TotalRanges: a.TotalRanges.AddChild(scope),
CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope),

Timers: a.Timers.GetOrCreateScopedTimers(scope),

// TODO(#130358): Again, this doesn't belong here, but it's the most
// convenient way to feed this metric to changefeeds.
NetMetrics: a.NetMetrics,
Expand Down
14 changes: 14 additions & 0 deletions pkg/ccl/changefeedccl/timers/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "timers",
srcs = ["timers.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/timeutil",
"@com_github_prometheus_client_golang//prometheus",
],
)
Loading

0 comments on commit e35a012

Please sign in to comment.