crosscluster/logical: spruce up ldr debugging vtable
Epic: none

Release note: none
msbutler committed Sep 25, 2024
1 parent 452a3a8 commit 7632268
Showing 9 changed files with 167 additions and 83 deletions.
@@ -1455,6 +1455,7 @@ func TestFlushErrorHandling(t *testing.T) {
lrw.purgatory.flush = lrw.flushBuffer
lrw.purgatory.bytesGauge = lrw.metrics.RetryQueueBytes
lrw.purgatory.eventsGauge = lrw.metrics.RetryQueueEvents
lrw.purgatory.debug = &streampb.DebugLogicalConsumerStatus{}

lrw.bh = []BatchHandler{(mockBatchHandler(true))}

@@ -231,6 +231,7 @@ func newLogicalReplicationWriterProcessor(
checkpoint: lrw.checkpoint,
bytesGauge: lrw.metrics.RetryQueueBytes,
eventsGauge: lrw.metrics.RetryQueueEvents,
debug: &lrw.debug,
}

if err := lrw.Init(ctx, lrw, post, logicalReplicationWriterResultType, flowCtx, processorID, nil, /* memMonitor */
@@ -425,6 +426,7 @@ func (lrw *logicalReplicationWriterProcessor) close() {
lrw.purgatory.bytesGauge.Dec(lrw.purgatory.bytes)
for _, i := range lrw.purgatory.levels {
lrw.purgatory.eventsGauge.Dec(int64(len(i.events)))
lrw.purgatory.debug.RecordPurgatory(-int64(len(i.events)))
}

lrw.InternalClose()
@@ -444,11 +446,10 @@ func (lrw *logicalReplicationWriterProcessor) sendError(err error) {
// consumeEvents handles processing events on the event queue and returns once
// the event channel has closed.
func (lrw *logicalReplicationWriterProcessor) consumeEvents(ctx context.Context) error {
before := timeutil.Now()
lastLog := timeutil.Now()
lrw.debug.RecordRecvStart()
for event := range lrw.subscription.Events() {
lrw.debug.RecordRecv(timeutil.Since(before))
before = timeutil.Now()
lrw.debug.RecordRecv()
if err := lrw.handleEvent(ctx, event); err != nil {
return err
}
@@ -458,6 +459,7 @@ func (lrw *logicalReplicationWriterProcessor) consumeEvents(ctx context.Context)
log.Infof(lrw.Ctx(), "lagging frontier: %s with span %s", lrw.frontier.Frontier(), lrw.frontier.PeekFrontierSpan())
}
}
lrw.debug.RecordRecvStart()
}
return lrw.subscription.Err()
}
@@ -543,6 +545,7 @@ func (lrw *logicalReplicationWriterProcessor) checkpoint(
return nil
}
lrw.metrics.CheckpointEvents.Inc(1)
lrw.debug.RecordCheckpoint(lrw.frontier.Frontier().GoTime())
return nil
}

6 changes: 5 additions & 1 deletion pkg/ccl/crosscluster/logical/purgatory.go
@@ -58,6 +58,7 @@ type purgatory struct {
bytes int64
levels []purgatoryLevel
eventsGauge, bytesGauge *metric.Gauge
debug *streampb.DebugLogicalConsumerStatus
}

type purgatoryLevel struct {
@@ -95,6 +96,7 @@ func (p *purgatory) Store(
p.bytes += byteSize
p.bytesGauge.Inc(byteSize)
p.eventsGauge.Inc(int64(len(events)))
p.debug.RecordPurgatory(int64(len(events)))
return nil
}

@@ -133,7 +135,9 @@ func (p *purgatory) Drain(ctx context.Context) error {
}

p.bytesGauge.Dec(levelBytes - p.levels[i].bytes)
p.eventsGauge.Dec(int64(levelCount - len(remaining)))
flushedEventCount := int64(levelCount - len(remaining))
p.eventsGauge.Dec(flushedEventCount)
p.debug.RecordPurgatory(-flushedEventCount)

// If we have resolved every prior level and all events in this level were
// handled, we can resolve this level and emit its checkpoint, if any.
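For orientation, a minimal sketch (not part of the change) of the bookkeeping contract the purgatory code above follows: positive deltas when events are stored in purgatory, negative deltas when they are flushed on Drain or released on close, so the stats snapshot's Purgatory.CurrentCount reflects the events still awaiting retry. The event counts below are made up for illustration.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
)

func main() {
	debug := &streampb.DebugLogicalConsumerStatus{}

	debug.RecordPurgatory(10) // Store: ten events admitted to purgatory.
	debug.RecordPurgatory(-7) // Drain: seven of them flushed successfully.

	// Three events remain queued for retry.
	fmt.Println(debug.GetStats().Purgatory.CurrentCount) // 3
}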
1 change: 1 addition & 0 deletions pkg/ccl/crosscluster/logical/purgatory_test.go
@@ -60,6 +60,7 @@ func TestPurgatory(t *testing.T) {
resolved = int(sp[0].Timestamp.WallTime)
return nil
},
debug: &streampb.DebugLogicalConsumerStatus{},
}

sz := int64((&streampb.StreamEvent_KV{KeyValue: roachpb.KeyValue{Key: roachpb.Key("a")}}).Size())
23 changes: 12 additions & 11 deletions pkg/cli/zip_table_registry.go
@@ -1089,22 +1089,23 @@ var zipInternalTablesPerNode = DebugZipTableRegistry{
nonSensitiveCols: NonSensitiveColumns{
"stream_id",
"consumer",
"recv_wait",
"last_recv_wait",
"flush_count",
"state",
"recv_time",
"last_recv_time",
"ingest_time",
"flush_time",
"flush_count",
"flush_kvs",
"flush_bytes",
"flush_batches",
"last_time",
"last_kvs",
"last_bytes",
"last_flush_time",
"last_kvs_done",
"last_kvs_todo",
"last_batches",
"last_slowest",
"cur_time",
"cur_kvs_done",
"cur_kvs_todo",
"cur_batches",
"cur_slowest",
"checkpoints",
"retry_size",
"resolved_age",
},
},
}
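As a usage sketch (not part of the change): these columns are surfaced through a virtual table that debug.zip collects per node. The vtable name below, crdb_internal.logical_replication_node_processors, is an assumption — the registry key is defined above this hunk and not shown — and the connection string is illustrative; only the column names come from the diff. Everything is cast to STRING to keep the scan simple.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func main() {
	// Illustrative connection string; adjust for the cluster being debugged.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Hypothetical vtable name; the selected columns are the ones registered above.
	rows, err := db.Query(`SELECT consumer::STRING, state::STRING, cur_kvs_done::STRING,
       cur_kvs_todo::STRING, retry_size::STRING, resolved_age::STRING
  FROM crdb_internal.logical_replication_node_processors`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var consumer, state, kvsDone, kvsTodo, retrySize, resolvedAge string
		if err := rows.Scan(&consumer, &state, &kvsDone, &kvsTodo, &retrySize, &resolvedAge); err != nil {
			log.Fatal(err)
		}
		fmt.Println(consumer, state, kvsDone, kvsTodo, retrySize, resolvedAge)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}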
5 changes: 4 additions & 1 deletion pkg/repstream/streampb/BUILD.bazel
@@ -46,5 +46,8 @@ go_library(
embed = [":streampb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/repstream/streampb",
visibility = ["//visibility:public"],
deps = ["//pkg/util/syncutil"],
deps = [
"//pkg/util/syncutil",
"//pkg/util/timeutil",
],
)
124 changes: 92 additions & 32 deletions pkg/repstream/streampb/empty.go
@@ -15,6 +15,7 @@ import (
time "time"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

type DebugProducerStatus struct {
@@ -129,23 +130,58 @@ type DebugLogicalConsumerStatus struct {
}
}

type LogicalConsumerState int

const (
Other LogicalConsumerState = iota
Flushing
Waiting
)

func (s LogicalConsumerState) String() string {
switch s {
case Other:
return "other"
case Flushing:
return "flush"
case Waiting:
return "receive"
default:
return "unknown"
}
}

type DebugLogicalConsumerStats struct {
Recv struct {
CurReceiveStart time.Time
LastWaitNanos, TotalWaitNanos int64
}

Ingest struct {
// TotalIngestNanos is the total time spent not waiting for data.
CurIngestStart time.Time
TotalIngestNanos int64
}

Flushes struct {
Count, Nanos, KVs, Bytes, Batches int64

Current struct {
StartedUnixMicros, ProcessedKVs, TotalKVs, Batches, SlowestBatchNanos int64
Last struct {
CurFlushStart time.Time
LastFlushNanos, ProcessedKVs, TotalKVs, Batches, SlowestBatchNanos int64
// TODO(dt): BatchErrors atomic.Int64
// TODO(dt): LastBatchErr atomic.Value
}
Last struct {
Nanos, KVs, Bytes, Batches, SlowestBatchNanos int64
// TODO(dt): Errors atomic.Int64
}
}
Checkpoints struct {
Count int64
LastCheckpoint, Resolved time.Time
}

CurrentState LogicalConsumerState

Purgatory struct {
CurrentCount int64
}
}

@@ -154,62 +190,86 @@ func (d *DebugLogicalConsumerStatus) GetStats() DebugLogicalConsumerStats {
defer d.mu.Unlock()
return d.mu.stats
}
func (d *DebugLogicalConsumerStatus) RecordRecvStart() {
d.mu.Lock()
defer d.mu.Unlock()
d.mu.stats.Recv.CurReceiveStart = timeutil.Now()
d.mu.stats.CurrentState = Waiting
if !d.mu.stats.Ingest.CurIngestStart.IsZero() {
d.mu.stats.Ingest.TotalIngestNanos += timeutil.Since(d.mu.stats.Ingest.CurIngestStart).Nanoseconds()
}
d.mu.stats.Recv.LastWaitNanos = 0
d.mu.stats.Ingest.CurIngestStart = time.Time{}
}

func (d *DebugLogicalConsumerStatus) RecordRecv(wait time.Duration) {
nanos := wait.Nanoseconds()
func (d *DebugLogicalConsumerStatus) RecordRecv() {
d.mu.Lock()
defer d.mu.Unlock()
nanos := timeutil.Since(d.mu.stats.Recv.CurReceiveStart).Nanoseconds()
d.mu.stats.CurrentState = Other
d.mu.stats.Recv.LastWaitNanos = nanos
d.mu.stats.Recv.TotalWaitNanos += nanos
d.mu.Unlock()

d.mu.stats.Ingest.CurIngestStart = timeutil.Now()
d.mu.stats.Recv.CurReceiveStart = time.Time{}
}

func (d *DebugLogicalConsumerStatus) SetInjectedFailurePercent(percent uint32) {
d.mu.Lock()
d.mu.injectFailurePercent = percent
d.mu.Unlock()
d.mu.Unlock() // nolint:deferunlockcheck
}

func (d *DebugLogicalConsumerStatus) RecordFlushStart(start time.Time, keyCount int64) uint32 {
micros := start.UnixMicro()
d.mu.Lock()
d.mu.stats.Flushes.Current.TotalKVs = keyCount
d.mu.stats.Flushes.Current.StartedUnixMicros = micros
defer d.mu.Unlock()
// Reset the incremental flush stats.
d.mu.stats.Flushes.Last.SlowestBatchNanos = 0
d.mu.stats.Flushes.Last.Batches = 0
d.mu.stats.Flushes.Last.ProcessedKVs = 0
d.mu.stats.Flushes.Last.LastFlushNanos = 0

d.mu.stats.CurrentState = Flushing
d.mu.stats.Flushes.Last.CurFlushStart = start
d.mu.stats.Flushes.Last.TotalKVs = keyCount
failPercent := d.mu.injectFailurePercent
d.mu.Unlock()
return failPercent
}

func (d *DebugLogicalConsumerStatus) RecordBatchApplied(t time.Duration, keyCount int64) {
nanos := t.Nanoseconds()
d.mu.Lock()
d.mu.stats.Flushes.Current.Batches++
d.mu.stats.Flushes.Current.ProcessedKVs += keyCount
if d.mu.stats.Flushes.Current.SlowestBatchNanos < nanos { // nolint:deferunlockcheck
d.mu.stats.Flushes.Current.SlowestBatchNanos = nanos
d.mu.stats.Flushes.Last.Batches++
d.mu.stats.Flushes.Last.ProcessedKVs += keyCount
if d.mu.stats.Flushes.Last.SlowestBatchNanos < nanos { // nolint:deferunlockcheck
d.mu.stats.Flushes.Last.SlowestBatchNanos = nanos
}
d.mu.Unlock() // nolint:deferunlockcheck
}

func (d *DebugLogicalConsumerStatus) RecordFlushComplete(totalNanos, keyCount, byteSize int64) {
d.mu.Lock()

defer d.mu.Unlock()
d.mu.stats.CurrentState = Other
d.mu.stats.Flushes.Count++
d.mu.stats.Flushes.Nanos += totalNanos
d.mu.stats.Flushes.KVs += keyCount
d.mu.stats.Flushes.Bytes += byteSize
d.mu.stats.Flushes.Batches += d.mu.stats.Flushes.Current.Batches

d.mu.stats.Flushes.Last.Nanos = totalNanos
d.mu.stats.Flushes.Last.Batches = d.mu.stats.Flushes.Current.Batches
d.mu.stats.Flushes.Last.SlowestBatchNanos = d.mu.stats.Flushes.Current.SlowestBatchNanos
d.mu.stats.Flushes.Last.KVs = keyCount
d.mu.stats.Flushes.Last.Bytes = byteSize
d.mu.stats.Flushes.Batches += d.mu.stats.Flushes.Last.Batches
d.mu.stats.Flushes.Last.LastFlushNanos = totalNanos
d.mu.stats.Flushes.Last.CurFlushStart = time.Time{}
}

d.mu.stats.Flushes.Current.StartedUnixMicros = 0
d.mu.stats.Flushes.Current.SlowestBatchNanos = 0
d.mu.stats.Flushes.Current.Batches = 0
d.mu.stats.Flushes.Current.TotalKVs = 0
d.mu.stats.Flushes.Current.ProcessedKVs = 0
func (d *DebugLogicalConsumerStatus) RecordCheckpoint(resolved time.Time) {
d.mu.Lock()
d.mu.stats.Checkpoints.Count++
d.mu.stats.Checkpoints.Resolved = resolved
d.mu.stats.Checkpoints.LastCheckpoint = timeutil.Now() // nolint:deferunlockcheck
d.mu.Unlock()
}

d.mu.Unlock() // nolint:deferunlockcheck
func (d *DebugLogicalConsumerStatus) RecordPurgatory(netEvents int64) {
d.mu.Lock()
d.mu.stats.Purgatory.CurrentCount += netEvents
d.mu.Unlock()
}
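To tie the pieces together, a rough sketch — not the real writer-processor loop — of the order in which these recording hooks are called and the state transitions they drive (receive → other → flush → other). The sleep and the literal KV/byte counts are made up for illustration.

package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

func main() {
	debug := &streampb.DebugLogicalConsumerStatus{}

	// Block waiting for an event: state reads "receive" until one arrives.
	debug.RecordRecvStart()
	time.Sleep(10 * time.Millisecond) // stand-in for waiting on the event channel
	debug.RecordRecv()                // folds the wait into Recv.TotalWaitNanos

	// Flush a batch of 100 KVs: state reads "flush" until the flush completes.
	flushStart := timeutil.Now()
	debug.RecordFlushStart(flushStart, 100 /* keyCount */)
	debug.RecordBatchApplied(5*time.Millisecond, 100 /* keyCount */)
	debug.RecordFlushComplete(
		timeutil.Since(flushStart).Nanoseconds(), 100 /* keyCount */, 4096 /* byteSize */)

	// A checkpoint records the new resolved timestamp and when it was observed.
	debug.RecordCheckpoint(timeutil.Now().Add(-2*time.Second) /* resolved */)

	stats := debug.GetStats()
	fmt.Println(stats.CurrentState, stats.Flushes.Count, stats.Checkpoints.Count) // other 1 1
}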