Skip to content

Commit

Permalink
crosscluster/producer: modify lastEmitWait and lastProduceWait comput…
Browse files Browse the repository at this point in the history
…ation

This patch modifies the lastEmitWait and lastProduceWait in the
crdb_internal.cluster_replication_node streams vtable to be either the current
wait or previous wait, if the event stream is currently waiting on that given
state.

Epic: none

Release note: none
  • Loading branch information
msbutler committed Sep 25, 2024
1 parent b82225e commit 2957335
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 26 deletions.
16 changes: 8 additions & 8 deletions pkg/ccl/crosscluster/producer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ type eventStream struct {
lastCheckpointTime time.Time
lastCheckpointLen int

lastPolled time.Time

debug streampb.DebugProducerStatus
}

Expand Down Expand Up @@ -113,7 +111,8 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) {
return errors.AssertionFailedf("expected to be started once")
}

s.lastPolled = timeutil.Now()
s.debug.State.Store(int64(streampb.Emitting))
s.debug.LastPolledMicros.Store(timeutil.Now().UnixMicro())

sourceTenantID, err := s.validateProducerJobAndSpec(ctx)
if err != nil {
Expand Down Expand Up @@ -227,11 +226,11 @@ func (s *eventStream) setErr(err error) bool {

// Next implements eval.ValueGenerator interface.
func (s *eventStream) Next(ctx context.Context) (bool, error) {
emitWait := int64(timeutil.Since(s.lastPolled))

s.debug.State.Store(int64(streampb.Producing))
emitWait := (timeutil.Now().UnixMicro() - s.debug.LastPolledMicros.Load()) * 1000
s.debug.Flushes.LastEmitWaitNanos.Store(emitWait)
s.debug.Flushes.EmitWaitNanos.Add(emitWait)
s.lastPolled = timeutil.Now()
s.debug.LastPolledMicros.Store(timeutil.Now().UnixMicro())

select {
case <-ctx.Done():
Expand All @@ -244,10 +243,11 @@ func (s *eventStream) Next(ctx context.Context) (bool, error) {
case err := <-s.errCh:
return false, err
default:
produceWait := int64(timeutil.Since(s.lastPolled))
s.debug.State.Store(int64(streampb.Emitting))
produceWait := (timeutil.Now().UnixMicro() - s.debug.LastPolledMicros.Load()) * 1000
s.debug.Flushes.ProduceWaitNanos.Add(produceWait)
s.debug.Flushes.LastProduceWaitNanos.Store(produceWait)
s.lastPolled = timeutil.Now()
s.debug.LastPolledMicros.Store(timeutil.Now().UnixMicro())
return true, nil
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/zip_table_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,7 @@ var zipInternalTablesPerNode = DebugZipTableRegistry{
"spans",
"initial_ts",
"prev_ts",
"state",
"batches",
"checkpoints",
"megabytes",
Expand Down
22 changes: 21 additions & 1 deletion pkg/repstream/streampb/empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type DebugProducerStatus struct {
// Identification info.
StreamID StreamID
// Properties.
Spec StreamPartitionSpec
Spec StreamPartitionSpec
State atomic.Int64

RF struct {
Checkpoints, Advances atomic.Int64
Expand All @@ -37,6 +38,25 @@ type DebugProducerStatus struct {
Micros atomic.Int64
Spans atomic.Value
}
LastPolledMicros atomic.Int64
}

type ProducerState int64

const (
Producing ProducerState = iota
Emitting
)

func (p ProducerState) String() string {
switch p {
case Producing:
return "produce"
case Emitting:
return "emit"
default:
return "unknown"
}
}

// TODO(dt): this really should be per server instead of process-global, i.e. if
Expand Down
42 changes: 26 additions & 16 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9214,6 +9214,7 @@ CREATE TABLE crdb_internal.cluster_replication_node_streams (
spans INT,
initial_ts DECIMAL,
prev_ts DECIMAL,
state STRING,
batches INT,
checkpoints INT,
Expand Down Expand Up @@ -9261,41 +9262,50 @@ CREATE TABLE crdb_internal.cluster_replication_node_streams (
return d
}

dur := func(nanos int64) tree.Datum {
return tree.NewDInterval(duration.MakeDuration(nanos, 0, 0), types.DefaultIntervalTypeMetadata)
}

for _, s := range sm.DebugGetProducerStatuses(ctx) {
resolved := time.UnixMicro(s.RF.ResolvedMicros.Load())
resolvedDatum := tree.DNull
if resolved.Unix() != 0 {
resolvedDatum = shortenLogical(eval.TimestampToDecimalDatum(hlc.Timestamp{WallTime: resolved.UnixNano()}))
}

curState := streampb.ProducerState(s.State.Load())
currentWait := duration.Age(now, time.UnixMicro(s.LastPolledMicros.Load())).Nanos()

curOrLast := func(currentNanos int64, lastNanos int64, statePredicate streampb.ProducerState) tree.Datum {
if curState == statePredicate {
return dur(currentNanos)
}
return dur(lastNanos)
}
currentWaitWithState := func(statePredicate streampb.ProducerState) int64 {
if curState == statePredicate {
return currentWait
}
return 0
}

if err := addRow(
tree.NewDInt(tree.DInt(s.StreamID)),
tree.NewDString(fmt.Sprintf("%d[%d]", s.Spec.ConsumerNode, s.Spec.ConsumerProc)),
tree.NewDInt(tree.DInt(len(s.Spec.Spans))),
shortenLogical(eval.TimestampToDecimalDatum(s.Spec.InitialScanTimestamp)),
shortenLogical(eval.TimestampToDecimalDatum(s.Spec.PreviousReplicatedTimestamp)),
tree.NewDString(curState.String()),

tree.NewDInt(tree.DInt(s.Flushes.Batches.Load())),
tree.NewDInt(tree.DInt(s.Flushes.Checkpoints.Load())),
tree.NewDFloat(tree.DFloat(math.Round(float64(s.Flushes.Bytes.Load())/float64(1<<18))/4)),
age(time.UnixMicro(s.LastCheckpoint.Micros.Load())),

tree.NewDInterval(
duration.MakeDuration(s.Flushes.ProduceWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),
tree.NewDInterval(
duration.MakeDuration(s.Flushes.EmitWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),
tree.NewDInterval(
duration.MakeDuration(s.Flushes.LastProduceWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),
tree.NewDInterval(
duration.MakeDuration(s.Flushes.LastEmitWaitNanos.Load(), 0, 0),
types.DefaultIntervalTypeMetadata,
),
dur(s.Flushes.ProduceWaitNanos.Load()+currentWaitWithState(streampb.Producing)),
dur(s.Flushes.EmitWaitNanos.Load()+currentWaitWithState(streampb.Emitting)),
curOrLast(currentWait, s.Flushes.LastProduceWaitNanos.Load(), streampb.Producing),
curOrLast(currentWait, s.Flushes.LastEmitWaitNanos.Load(), streampb.Emitting),

tree.NewDInt(tree.DInt(s.RF.Checkpoints.Load())),
tree.NewDInt(tree.DInt(s.RF.Advances.Load())),
Expand Down
Loading

0 comments on commit 2957335

Please sign in to comment.