diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go index 0ba7b8698c8f..e51b3707ac01 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go @@ -1455,6 +1455,7 @@ func TestFlushErrorHandling(t *testing.T) { lrw.purgatory.flush = lrw.flushBuffer lrw.purgatory.bytesGauge = lrw.metrics.RetryQueueBytes lrw.purgatory.eventsGauge = lrw.metrics.RetryQueueEvents + lrw.purgatory.debug = &streampb.DebugLogicalConsumerStatus{} lrw.bh = []BatchHandler{(mockBatchHandler(true))} diff --git a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go index c7c9fa09f707..0700d977e3fa 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go @@ -231,6 +231,7 @@ func newLogicalReplicationWriterProcessor( checkpoint: lrw.checkpoint, bytesGauge: lrw.metrics.RetryQueueBytes, eventsGauge: lrw.metrics.RetryQueueEvents, + debug: &lrw.debug, } if err := lrw.Init(ctx, lrw, post, logicalReplicationWriterResultType, flowCtx, processorID, nil, /* memMonitor */ @@ -425,6 +426,7 @@ func (lrw *logicalReplicationWriterProcessor) close() { lrw.purgatory.bytesGauge.Dec(lrw.purgatory.bytes) for _, i := range lrw.purgatory.levels { lrw.purgatory.eventsGauge.Dec(int64(len(i.events))) + lrw.purgatory.debug.RecordPurgatory(-int64(len(i.events))) } lrw.InternalClose() @@ -444,11 +446,10 @@ func (lrw *logicalReplicationWriterProcessor) sendError(err error) { // consumeEvents handles processing events on the event queue and returns once // the event channel has closed. func (lrw *logicalReplicationWriterProcessor) consumeEvents(ctx context.Context) error { - before := timeutil.Now() lastLog := timeutil.Now() + lrw.debug.RecordRecvStart() for event := range lrw.subscription.Events() { - lrw.debug.RecordRecv(timeutil.Since(before)) - before = timeutil.Now() + lrw.debug.RecordRecv() if err := lrw.handleEvent(ctx, event); err != nil { return err } @@ -458,6 +459,7 @@ func (lrw *logicalReplicationWriterProcessor) consumeEvents(ctx context.Context) log.Infof(lrw.Ctx(), "lagging frontier: %s with span %s", lrw.frontier.Frontier(), lrw.frontier.PeekFrontierSpan()) } } + lrw.debug.RecordRecvStart() } return lrw.subscription.Err() } @@ -543,6 +545,7 @@ func (lrw *logicalReplicationWriterProcessor) checkpoint( return nil } lrw.metrics.CheckpointEvents.Inc(1) + lrw.debug.RecordCheckpoint(lrw.frontier.Frontier().GoTime()) return nil } diff --git a/pkg/ccl/crosscluster/logical/purgatory.go b/pkg/ccl/crosscluster/logical/purgatory.go index 4b992dd96f92..a9d7fa7475a0 100644 --- a/pkg/ccl/crosscluster/logical/purgatory.go +++ b/pkg/ccl/crosscluster/logical/purgatory.go @@ -58,6 +58,7 @@ type purgatory struct { bytes int64 levels []purgatoryLevel eventsGauge, bytesGauge *metric.Gauge + debug *streampb.DebugLogicalConsumerStatus } type purgatoryLevel struct { @@ -95,6 +96,7 @@ func (p *purgatory) Store( p.bytes += byteSize p.bytesGauge.Inc(byteSize) p.eventsGauge.Inc(int64(len(events))) + p.debug.RecordPurgatory(int64(len(events))) return nil } @@ -133,7 +135,9 @@ func (p *purgatory) Drain(ctx context.Context) error { } p.bytesGauge.Dec(levelBytes - p.levels[i].bytes) - p.eventsGauge.Dec(int64(levelCount - len(remaining))) + flushedEventCount := int64(levelCount - len(remaining)) + p.eventsGauge.Dec(flushedEventCount) + p.debug.RecordPurgatory(-flushedEventCount) // If we have resolved every prior level and all events in this level were // handled, we can resolve this level and emit its checkpoint, if any. diff --git a/pkg/ccl/crosscluster/logical/purgatory_test.go b/pkg/ccl/crosscluster/logical/purgatory_test.go index 820e49d249b5..d287e5bc7256 100644 --- a/pkg/ccl/crosscluster/logical/purgatory_test.go +++ b/pkg/ccl/crosscluster/logical/purgatory_test.go @@ -60,6 +60,7 @@ func TestPurgatory(t *testing.T) { resolved = int(sp[0].Timestamp.WallTime) return nil }, + debug: &streampb.DebugLogicalConsumerStatus{}, } sz := int64((&streampb.StreamEvent_KV{KeyValue: roachpb.KeyValue{Key: roachpb.Key("a")}}).Size()) diff --git a/pkg/cli/zip_table_registry.go b/pkg/cli/zip_table_registry.go index 1250aa6575a2..a2bfb15c86da 100644 --- a/pkg/cli/zip_table_registry.go +++ b/pkg/cli/zip_table_registry.go @@ -1089,22 +1089,23 @@ var zipInternalTablesPerNode = DebugZipTableRegistry{ nonSensitiveCols: NonSensitiveColumns{ "stream_id", "consumer", - "recv_wait", - "last_recv_wait", - "flush_count", + "state", + "recv_time", + "last_recv_time", + "ingest_time", "flush_time", + "flush_count", "flush_kvs", "flush_bytes", "flush_batches", - "last_time", - "last_kvs", - "last_bytes", + "last_flush_time", + "last_kvs_done", + "last_kvs_todo", + "last_batches", "last_slowest", - "cur_time", - "cur_kvs_done", - "cur_kvs_todo", - "cur_batches", - "cur_slowest", + "checkpoints", + "retry_size", + "resolved_age", }, }, } diff --git a/pkg/repstream/streampb/BUILD.bazel b/pkg/repstream/streampb/BUILD.bazel index 066054ac985d..5435089ad2a4 100644 --- a/pkg/repstream/streampb/BUILD.bazel +++ b/pkg/repstream/streampb/BUILD.bazel @@ -46,5 +46,8 @@ go_library( embed = [":streampb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/repstream/streampb", visibility = ["//visibility:public"], - deps = ["//pkg/util/syncutil"], + deps = [ + "//pkg/util/syncutil", + "//pkg/util/timeutil", + ], ) diff --git a/pkg/repstream/streampb/empty.go b/pkg/repstream/streampb/empty.go index f79d1b76fdd9..fe77fb69a63a 100644 --- a/pkg/repstream/streampb/empty.go +++ b/pkg/repstream/streampb/empty.go @@ -15,6 +15,7 @@ import ( time "time" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) type DebugProducerStatus struct { @@ -129,23 +130,58 @@ type DebugLogicalConsumerStatus struct { } } +type LogicalConsumerState int + +const ( + Other LogicalConsumerState = iota + Flushing + Waiting +) + +func (s LogicalConsumerState) String() string { + switch s { + case Other: + return "other" + case Flushing: + return "flush" + case Waiting: + return "receive" + default: + return "unknown" + } +} + type DebugLogicalConsumerStats struct { Recv struct { + CurReceiveStart time.Time LastWaitNanos, TotalWaitNanos int64 } + Ingest struct { + // TotalIngestNanos is the total time spent not waiting for data. + CurIngestStart time.Time + TotalIngestNanos int64 + } + Flushes struct { Count, Nanos, KVs, Bytes, Batches int64 - Current struct { - StartedUnixMicros, ProcessedKVs, TotalKVs, Batches, SlowestBatchNanos int64 + Last struct { + CurFlushStart time.Time + LastFlushNanos, ProcessedKVs, TotalKVs, Batches, SlowestBatchNanos int64 // TODO(dt): BatchErrors atomic.Int64 // TODO(dt): LastBatchErr atomic.Value } - Last struct { - Nanos, KVs, Bytes, Batches, SlowestBatchNanos int64 - // TODO(dt): Errors atomic.Int64 - } + } + Checkpoints struct { + Count int64 + LastCheckpoint, Resolved time.Time + } + + CurrentState LogicalConsumerState + + Purgatory struct { + CurrentCount int64 } } @@ -154,62 +190,86 @@ func (d *DebugLogicalConsumerStatus) GetStats() DebugLogicalConsumerStats { defer d.mu.Unlock() return d.mu.stats } +func (d *DebugLogicalConsumerStatus) RecordRecvStart() { + d.mu.Lock() + defer d.mu.Unlock() + d.mu.stats.Recv.CurReceiveStart = timeutil.Now() + d.mu.stats.CurrentState = Waiting + if !d.mu.stats.Ingest.CurIngestStart.IsZero() { + d.mu.stats.Ingest.TotalIngestNanos += timeutil.Since(d.mu.stats.Ingest.CurIngestStart).Nanoseconds() + } + d.mu.stats.Recv.LastWaitNanos = 0 + d.mu.stats.Ingest.CurIngestStart = time.Time{} +} -func (d *DebugLogicalConsumerStatus) RecordRecv(wait time.Duration) { - nanos := wait.Nanoseconds() +func (d *DebugLogicalConsumerStatus) RecordRecv() { d.mu.Lock() + defer d.mu.Unlock() + nanos := timeutil.Since(d.mu.stats.Recv.CurReceiveStart).Nanoseconds() + d.mu.stats.CurrentState = Other d.mu.stats.Recv.LastWaitNanos = nanos d.mu.stats.Recv.TotalWaitNanos += nanos - d.mu.Unlock() + + d.mu.stats.Ingest.CurIngestStart = timeutil.Now() + d.mu.stats.Recv.CurReceiveStart = time.Time{} } func (d *DebugLogicalConsumerStatus) SetInjectedFailurePercent(percent uint32) { d.mu.Lock() d.mu.injectFailurePercent = percent - d.mu.Unlock() + d.mu.Unlock() // nolint:deferunlockcheck } func (d *DebugLogicalConsumerStatus) RecordFlushStart(start time.Time, keyCount int64) uint32 { - micros := start.UnixMicro() d.mu.Lock() - d.mu.stats.Flushes.Current.TotalKVs = keyCount - d.mu.stats.Flushes.Current.StartedUnixMicros = micros + defer d.mu.Unlock() + // Reset the incremental flush stats. + d.mu.stats.Flushes.Last.SlowestBatchNanos = 0 + d.mu.stats.Flushes.Last.Batches = 0 + d.mu.stats.Flushes.Last.ProcessedKVs = 0 + d.mu.stats.Flushes.Last.LastFlushNanos = 0 + + d.mu.stats.CurrentState = Flushing + d.mu.stats.Flushes.Last.CurFlushStart = start + d.mu.stats.Flushes.Last.TotalKVs = keyCount failPercent := d.mu.injectFailurePercent - d.mu.Unlock() return failPercent } func (d *DebugLogicalConsumerStatus) RecordBatchApplied(t time.Duration, keyCount int64) { nanos := t.Nanoseconds() d.mu.Lock() - d.mu.stats.Flushes.Current.Batches++ - d.mu.stats.Flushes.Current.ProcessedKVs += keyCount - if d.mu.stats.Flushes.Current.SlowestBatchNanos < nanos { // nolint:deferunlockcheck - d.mu.stats.Flushes.Current.SlowestBatchNanos = nanos + d.mu.stats.Flushes.Last.Batches++ + d.mu.stats.Flushes.Last.ProcessedKVs += keyCount + if d.mu.stats.Flushes.Last.SlowestBatchNanos < nanos { // nolint:deferunlockcheck + d.mu.stats.Flushes.Last.SlowestBatchNanos = nanos } d.mu.Unlock() // nolint:deferunlockcheck } func (d *DebugLogicalConsumerStatus) RecordFlushComplete(totalNanos, keyCount, byteSize int64) { d.mu.Lock() - + defer d.mu.Unlock() + d.mu.stats.CurrentState = Other d.mu.stats.Flushes.Count++ d.mu.stats.Flushes.Nanos += totalNanos d.mu.stats.Flushes.KVs += keyCount d.mu.stats.Flushes.Bytes += byteSize - d.mu.stats.Flushes.Batches += d.mu.stats.Flushes.Current.Batches - - d.mu.stats.Flushes.Last.Nanos = totalNanos - d.mu.stats.Flushes.Last.Batches = d.mu.stats.Flushes.Current.Batches - d.mu.stats.Flushes.Last.SlowestBatchNanos = d.mu.stats.Flushes.Current.SlowestBatchNanos - d.mu.stats.Flushes.Last.KVs = keyCount - d.mu.stats.Flushes.Last.Bytes = byteSize + d.mu.stats.Flushes.Batches += d.mu.stats.Flushes.Last.Batches + d.mu.stats.Flushes.Last.LastFlushNanos = totalNanos + d.mu.stats.Flushes.Last.CurFlushStart = time.Time{} +} - d.mu.stats.Flushes.Current.StartedUnixMicros = 0 - d.mu.stats.Flushes.Current.SlowestBatchNanos = 0 - d.mu.stats.Flushes.Current.Batches = 0 - d.mu.stats.Flushes.Current.TotalKVs = 0 - d.mu.stats.Flushes.Current.ProcessedKVs = 0 +func (d *DebugLogicalConsumerStatus) RecordCheckpoint(resolved time.Time) { + d.mu.Lock() + d.mu.stats.Checkpoints.Count++ + d.mu.stats.Checkpoints.Resolved = resolved + d.mu.stats.Checkpoints.LastCheckpoint = timeutil.Now() // nolint:deferunlockcheck + d.mu.Unlock() +} - d.mu.Unlock() // nolint:deferunlockcheck +func (d *DebugLogicalConsumerStatus) RecordPurgatory(netEvents int64) { + d.mu.Lock() + d.mu.stats.Purgatory.CurrentCount += netEvents + d.mu.Unlock() } diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 2962c7f8de28..0725fa0e94dd 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities" "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiespb" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/security/username" @@ -9384,22 +9385,24 @@ var crdbInternalLDRProcessorTable = virtualSchemaTable{ CREATE TABLE crdb_internal.logical_replication_node_processors ( stream_id INT, consumer STRING, - recv_wait INTERVAL, - last_recv_wait INTERVAL, - flush_count INT, + state STRING, + recv_time INTERVAL, + last_recv_time INTERVAL, + ingest_time INTERVAL, flush_time INTERVAL, + flush_count INT, flush_kvs INT, flush_bytes INT, flush_batches INT, - last_time INTERVAL, - last_kvs INT, - last_bytes INT, + last_flush_time INTERVAL, + last_kvs_done INT, + last_kvs_todo INT, + last_batches INT, last_slowest INTERVAL, - cur_time INTERVAL, - cur_kvs_done INT, - cur_kvs_todo INT, - cur_batches INT, - cur_slowest INTERVAL + last_checkpoint INTERVAL, + checkpoints INT, + retry_size INT, + resolved_age INTERVAL );`, populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { sm, err := p.EvalContext().StreamManagerFactory.GetReplicationStreamManager(ctx) @@ -9411,43 +9414,51 @@ CREATE TABLE crdb_internal.logical_replication_node_processors ( return err } now := p.EvalContext().GetStmtTimestamp() + dur := func(nanos int64) tree.Datum { + return tree.NewDInterval(duration.MakeDuration(nanos, 0, 0), types.DefaultIntervalTypeMetadata) + } + nanosSince := func(t time.Time) int64 { + if t.IsZero() { + return 0 + } + return now.Sub(t).Nanoseconds() + } age := func(t time.Time) tree.Datum { if t.Unix() == 0 { return tree.DNull } return tree.NewDInterval(duration.Age(now, t), types.DefaultIntervalTypeMetadata) } - dur := func(nanos int64) tree.Datum { - return tree.NewDInterval(duration.MakeDuration(nanos, 0, 0), types.DefaultIntervalTypeMetadata) - } for _, container := range sm.DebugGetLogicalConsumerStatuses(ctx) { status := container.GetStats() - nullCur := func(x tree.Datum) tree.Datum { - if status.Flushes.Current.StartedUnixMicros == 0 { - return tree.DNull + curOrLast := func(currentNanos int64, lastNanos int64, currentState streampb.LogicalConsumerState) tree.Datum { + if status.CurrentState == currentState { + return dur(currentNanos) } - return x + return dur(lastNanos) } if err := addRow( tree.NewDInt(tree.DInt(container.StreamID)), tree.NewDString(fmt.Sprintf("%d[%d]", p.extendedEvalCtx.ExecCfg.JobRegistry.ID(), container.ProcessorID)), - dur(status.Recv.TotalWaitNanos), - dur(status.Recv.LastWaitNanos), - tree.NewDInt(tree.DInt(status.Flushes.Count)), - dur(status.Flushes.Nanos), - tree.NewDInt(tree.DInt(status.Flushes.KVs)), - tree.NewDInt(tree.DInt(status.Flushes.Bytes)), - tree.NewDInt(tree.DInt(status.Flushes.Batches)), - dur(status.Flushes.Last.Nanos), - tree.NewDInt(tree.DInt(status.Flushes.Last.KVs)), - tree.NewDInt(tree.DInt(status.Flushes.Last.Bytes)), - dur(status.Flushes.Last.SlowestBatchNanos), - nullCur(age(time.UnixMicro(status.Flushes.Current.StartedUnixMicros))), - nullCur(tree.NewDInt(tree.DInt(status.Flushes.Current.TotalKVs))), - nullCur(tree.NewDInt(tree.DInt(status.Flushes.Current.ProcessedKVs))), - nullCur(tree.NewDInt(tree.DInt(status.Flushes.Current.Batches))), - nullCur(dur(status.Flushes.Current.SlowestBatchNanos)), + tree.NewDString(status.CurrentState.String()), // current_state + dur(status.Recv.TotalWaitNanos+nanosSince(status.Recv.CurReceiveStart)), // recv_time + curOrLast(nanosSince(status.Recv.CurReceiveStart), status.Recv.LastWaitNanos, streampb.Waiting), // last_recv_time + dur(status.Ingest.TotalIngestNanos+nanosSince(status.Ingest.CurIngestStart)), // ingest_time + dur(status.Flushes.Nanos+nanosSince(status.Flushes.Last.CurFlushStart)), // flush_time + tree.NewDInt(tree.DInt(status.Flushes.Count)), // flush_count + tree.NewDInt(tree.DInt(status.Flushes.KVs)), // flush_kvs + tree.NewDInt(tree.DInt(status.Flushes.Bytes)), // flush_bytes + tree.NewDInt(tree.DInt(status.Flushes.Batches)), // flush_batches + curOrLast(nanosSince(status.Flushes.Last.CurFlushStart), status.Flushes.Last.LastFlushNanos, streampb.Flushing), // last_flush_time + tree.NewDInt(tree.DInt(status.Flushes.Last.TotalKVs)), // last_kvs_done + tree.NewDInt(tree.DInt(status.Flushes.Last.ProcessedKVs)), // last_kvs_todo + tree.NewDInt(tree.DInt(status.Flushes.Last.Batches)), // last_batches + dur(status.Flushes.Last.SlowestBatchNanos), // last_slowest + age(status.Checkpoints.LastCheckpoint), // last_checkpoint + tree.NewDInt(tree.DInt(status.Checkpoints.Count)), // checkpoints + tree.NewDInt(tree.DInt(status.Purgatory.CurrentCount)), // retry_size + age(status.Checkpoints.Resolved), // resolved_age ); err != nil { return err } diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog b/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog index d1ba7e37c1a1..9c51c1b13703 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog @@ -400,7 +400,7 @@ SELECT id, strip_volatile(descriptor) FROM crdb_internal.kv_catalog_descriptor O 4294967185 {"table": {"columns": [{"id": 1, "name": "grantee", "type": {"family": "StringFamily", "oid": 25}}, {"id": 2, "name": "role_name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "is_grantable", "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967185, "name": "administrable_role_authorizations", "nextColumnId": 4, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967186, "version": "1"}} 4294967186 {"schema": {"defaultPrivileges": {"type": "SCHEMA"}, "id": 4294967186, "name": "information_schema", "privileges": {"ownerProto": "node", "users": [{"privileges": "512", "userProto": "public"}], "version": 3}, "version": "1"}} 4294967187 {"table": {"columns": [{"id": 1, "name": "object_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "schema_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 3, "name": "database_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 4, "name": "object_name", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 5, "name": "schema_name", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 6, "name": "database_name", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 7, "name": "fq_name", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967187, "name": "fully_qualified_names", "nextColumnId": 8, "nextConstraintId": 1, "nextMutationId": 1, "primaryIndex": {"foreignKey": {}, "geoConfig": {}, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1", "viewQuery": "SELECT t.id, sc.id, db.id, t.name, sc.name, db.name, (((quote_ident(db.name) || '.') || quote_ident(sc.name)) || '.') || quote_ident(t.name) FROM system.namespace AS t JOIN system.namespace AS sc ON t.\"parentSchemaID\" = sc.id JOIN system.namespace AS db ON t.\"parentID\" = db.id WHERE (db.\"parentID\" = 0) AND pg_catalog.has_database_privilege(db.name, 'CONNECT')"}} -4294967188 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "recv_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 4, "name": "last_recv_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 5, "name": "flush_count", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 6, "name": "flush_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 7, "name": "flush_kvs", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 8, "name": "flush_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 9, "name": "flush_batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 10, "name": "last_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 11, "name": "last_kvs", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 12, "name": "last_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 13, "name": "last_slowest", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 14, "name": "cur_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 15, "name": "cur_kvs_done", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 16, "name": "cur_kvs_todo", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 17, "name": "cur_batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 18, "name": "cur_slowest", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967188, "name": "logical_replication_node_processors", "nextColumnId": 19, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} +4294967188 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "state", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "recv_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 5, "name": "last_recv_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 6, "name": "ingest_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 7, "name": "flush_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 8, "name": "flush_count", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 9, "name": "flush_kvs", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 10, "name": "flush_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 11, "name": "flush_batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 12, "name": "last_flush_time", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 13, "name": "last_kvs_done", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 14, "name": "last_kvs_todo", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 15, "name": "last_batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 16, "name": "last_slowest", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 17, "name": "last_checkpoint", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 18, "name": "checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 19, "name": "retry_size", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 20, "name": "resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967188, "name": "logical_replication_node_processors", "nextColumnId": 21, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967189 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "span_start", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "span_end", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 5, "name": "resolved", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 6, "name": "resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967189, "name": "cluster_replication_node_stream_checkpoints", "nextColumnId": 7, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967190 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "span_start", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "span_end", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967190, "name": "cluster_replication_node_stream_spans", "nextColumnId": 5, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967191 {"table": {"columns": [{"id": 1, "name": "stream_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "consumer", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "spans", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 4, "name": "initial_ts", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 5, "name": "prev_ts", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 6, "name": "batches", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 7, "name": "checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 8, "name": "megabytes", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}, {"id": 9, "name": "last_checkpoint", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 10, "name": "produce_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 11, "name": "emit_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 12, "name": "last_produce_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 13, "name": "last_emit_wait", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 14, "name": "rf_checkpoints", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 15, "name": "rf_advances", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 16, "name": "rf_last_advance", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}, {"id": 17, "name": "rf_resolved", "nullable": true, "type": {"family": "DecimalFamily", "oid": 1700}}, {"id": 18, "name": "rf_resolved_age", "nullable": true, "type": {"family": "IntervalFamily", "intervalDurationField": {}, "oid": 1186}}], "formatVersion": 3, "id": 4294967191, "name": "cluster_replication_node_streams", "nextColumnId": 19, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 3}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}}