Skip to content

Commit

Permalink
Merge #131396
Browse files Browse the repository at this point in the history
131396: log/eventpb: add job ID to all changefeed structured log events r=kyle-a-wong,rharding6373 a=andyyang890

**log/eventpb: move changefeed structured log events to separate category** 

Release note: None

----

**log/eventpb: add job ID to all changefeed structured log events** 

Release note (ops change): `changefeed_failed` and `create_changefeed`
log events now include a `JobId` field for enterprise changefeeds.

----

Informs #121697


Co-authored-by: Andy Yang <[email protected]>
  • Loading branch information
craig[bot] and andyyang890 committed Sep 26, 2024
2 parents b56a7f6 + 440efa2 commit 83f4ec6
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 137 deletions.
3 changes: 2 additions & 1 deletion build/GNUmakefile.obsolete
Original file line number Diff line number Diff line change
Expand Up @@ -1644,7 +1644,8 @@ EVENTPB_PROTOS = \
pkg/util/log/eventpb/job_events.proto \
pkg/util/log/eventpb/health_events.proto \
pkg/util/log/eventpb/storage_events.proto \
pkg/util/log/eventpb/telemetry.proto
pkg/util/log/eventpb/telemetry.proto \
pkg/util/log/eventpb/changefeed_events.proto

EVENTLOG_PROTOS = pkg/util/log/logpb/event.proto $(EVENTPB_PROTOS)

Expand Down
149 changes: 79 additions & 70 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,85 @@ provided the `redactable` functionality is enabled on the logging sink.

Events not documented on this page will have an unstructured format in log messages.

## Changefeed telemetry events

Events in this category pertain to changefeed usage and metrics.

Events in this category are logged to the `TELEMETRY` channel.


### `changefeed_emitted_bytes`

An event of type `changefeed_emitted_bytes` is an event representing the bytes emitted by a changefeed over an interval.


| Field | Description | Sensitive |
|--|--|--|
| `EmittedBytes` | The number of bytes emitted. | no |
| `EmittedMessages` | The number of messages emitted. | no |
| `LoggingInterval` | The time period in nanoseconds between emitting telemetry events of this type (per-aggregator). | no |
| `Closing` | Flag to indicate that the changefeed is closing. | no |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Description` | The description of that would show up in the job's description field, redacted | yes |
| `SinkType` | The type of sink being emitted to (ex: kafka, nodelocal, webhook-https). | no |
| `NumTables` | The number of tables listed in the query that the changefeed is to run on. | no |
| `Resolved` | The behavior of emitted resolved spans (ex: yes, no, 10s) | no |
| `InitialScan` | The desired behavior of initial scans (ex: yes, no, only) | no |
| `Format` | The data format being emitted (ex: JSON, Avro). | no |
| `JobId` | The job id for enterprise changefeeds. | no |

### `changefeed_failed`

An event of type `changefeed_failed` is an event for any changefeed failure since the plan hook
was triggered.


| Field | Description | Sensitive |
|--|--|--|
| `FailureType` | The reason / environment with which the changefeed failed (ex: connection_closed, changefeed_behind). | no |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Description` | The description of that would show up in the job's description field, redacted | yes |
| `SinkType` | The type of sink being emitted to (ex: kafka, nodelocal, webhook-https). | no |
| `NumTables` | The number of tables listed in the query that the changefeed is to run on. | no |
| `Resolved` | The behavior of emitted resolved spans (ex: yes, no, 10s) | no |
| `InitialScan` | The desired behavior of initial scans (ex: yes, no, only) | no |
| `Format` | The data format being emitted (ex: JSON, Avro). | no |
| `JobId` | The job id for enterprise changefeeds. | no |

### `create_changefeed`

An event of type `create_changefeed` is an event for any CREATE CHANGEFEED query that
successfully starts running. Failed CREATE statements will show up as
ChangefeedFailed events.


| Field | Description | Sensitive |
|--|--|--|
| `Transformation` | Flag representing whether the changefeed is using CDC queries. | no |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Description` | The description of that would show up in the job's description field, redacted | yes |
| `SinkType` | The type of sink being emitted to (ex: kafka, nodelocal, webhook-https). | no |
| `NumTables` | The number of tables listed in the query that the changefeed is to run on. | no |
| `Resolved` | The behavior of emitted resolved spans (ex: yes, no, 10s) | no |
| `InitialScan` | The desired behavior of initial scans (ex: yes, no, only) | no |
| `Format` | The data format being emitted (ex: JSON, Avro). | no |
| `JobId` | The job id for enterprise changefeeds. | no |

## Cluster-level events

Events in this category pertain to an entire cluster and are
Expand Down Expand Up @@ -2891,76 +2970,6 @@ An event of type `captured_index_usage_stats`
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |

### `changefeed_emitted_bytes`

An event of type `changefeed_emitted_bytes` is an event representing the bytes emitted by a changefeed over an interval.


| Field | Description | Sensitive |
|--|--|--|
| `JobId` | The job id for enterprise changefeeds. | no |
| `EmittedBytes` | The number of bytes emitted. | no |
| `EmittedMessages` | The number of messages emitted. | no |
| `LoggingInterval` | The time period in nanoseconds between emitting telemetry events of this type (per-aggregator). | no |
| `Closing` | Flag to indicate that the changefeed is closing. | no |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Description` | The description of that would show up in the job's description field, redacted | yes |
| `SinkType` | The type of sink being emitted to (ex: kafka, nodelocal, webhook-https). | no |
| `NumTables` | The number of tables listed in the query that the changefeed is to run on. | no |
| `Resolved` | The behavior of emitted resolved spans (ex: yes, no, 10s) | no |
| `InitialScan` | The desired behavior of initial scans (ex: yes, no, only) | no |
| `Format` | The data format being emitted (ex: JSON, Avro). | no |

### `changefeed_failed`

An event of type `changefeed_failed` is an event for any Changefeed failure since the plan hook
was triggered.


| Field | Description | Sensitive |
|--|--|--|
| `FailureType` | The reason / environment with which the changefeed failed (ex: connection_closed, changefeed_behind) | no |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Description` | The description of that would show up in the job's description field, redacted | yes |
| `SinkType` | The type of sink being emitted to (ex: kafka, nodelocal, webhook-https). | no |
| `NumTables` | The number of tables listed in the query that the changefeed is to run on. | no |
| `Resolved` | The behavior of emitted resolved spans (ex: yes, no, 10s) | no |
| `InitialScan` | The desired behavior of initial scans (ex: yes, no, only) | no |
| `Format` | The data format being emitted (ex: JSON, Avro). | no |

### `create_changefeed`

An event of type `create_changefeed` is an event for any CREATE CHANGEFEED query that
successfully starts running. Failed CREATE statements will show up as
ChangefeedFailed events.


| Field | Description | Sensitive |
|--|--|--|
| `Transformation` | | no |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Description` | The description of that would show up in the job's description field, redacted | yes |
| `SinkType` | The type of sink being emitted to (ex: kafka, nodelocal, webhook-https). | no |
| `NumTables` | The number of tables listed in the query that the changefeed is to run on. | no |
| `Resolved` | The behavior of emitted resolved spans (ex: yes, no, 10s) | no |
| `InitialScan` | The desired behavior of initial scans (ex: yes, no, only) | no |
| `Format` | The data format being emitted (ex: JSON, Avro). | no |

### `hot_ranges_stats`

An event of type `hot_ranges_stats`
Expand Down
14 changes: 8 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ func changefeedPlanHook(
// of protected timestamps.
var sj *jobs.StartableJob
jobID := p.ExecCfg().JobRegistry.MakeJobID()
jr.JobID = jobID
{
var ptr *ptpb.Record
codec := p.ExecCfg().Codec
Expand All @@ -263,7 +264,7 @@ func changefeedPlanHook(
// must have specified transaction to use, and is responsible for committing
// transaction.

_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, *jr, jobID, p.InternalSQLTxn())
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, *jr, jr.JobID, p.InternalSQLTxn())
if err != nil {
return err
}
Expand All @@ -286,7 +287,7 @@ func changefeedPlanHook(
}

if err := p.ExecCfg().InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, *jr); err != nil {
if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jr.JobID, txn, *jr); err != nil {
return err
}
if ptr != nil {
Expand Down Expand Up @@ -1528,7 +1529,7 @@ func logChangefeedCreateTelemetry(ctx context.Context, jr *jobs.Record, isTransf
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
if jr != nil {
changefeedDetails := jr.Details.(jobspb.ChangefeedDetails)
changefeedEventDetails = getCommonChangefeedEventDetails(ctx, changefeedDetails, jr.Description)
changefeedEventDetails = makeCommonChangefeedEventDetails(ctx, changefeedDetails, jr.Description, jr.JobID)
}

createChangefeedEvent := &eventpb.CreateChangefeed{
Expand All @@ -1545,7 +1546,7 @@ func logChangefeedFailedTelemetry(
var changefeedEventDetails eventpb.CommonChangefeedEventDetails
if job != nil {
changefeedDetails := job.Details().(jobspb.ChangefeedDetails)
changefeedEventDetails = getCommonChangefeedEventDetails(ctx, changefeedDetails, job.Payload().Description)
changefeedEventDetails = makeCommonChangefeedEventDetails(ctx, changefeedDetails, job.Payload().Description, job.ID())
}

changefeedFailedEvent := &eventpb.ChangefeedFailed{
Expand All @@ -1556,8 +1557,8 @@ func logChangefeedFailedTelemetry(
log.StructuredEvent(ctx, severity.INFO, changefeedFailedEvent)
}

func getCommonChangefeedEventDetails(
ctx context.Context, details jobspb.ChangefeedDetails, description string,
func makeCommonChangefeedEventDetails(
ctx context.Context, details jobspb.ChangefeedDetails, description string, jobID jobspb.JobID,
) eventpb.CommonChangefeedEventDetails {
opts := details.Opts

Expand Down Expand Up @@ -1602,6 +1603,7 @@ func getCommonChangefeedEventDetails(
Resolved: resolved,
Format: opts[changefeedbase.OptFormat],
InitialScan: initialScan,
JobId: int64(jobID),
}

return changefeedEventDetails
Expand Down
4 changes: 1 addition & 3 deletions pkg/ccl/changefeedccl/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func makePeriodicTelemetryLogger(
return &periodicTelemetryLogger{
ctx: ctx,
job: job,
changefeedDetails: getCommonChangefeedEventDetails(ctx, job.Details().(jobspb.ChangefeedDetails), job.Payload().Description),
changefeedDetails: makeCommonChangefeedEventDetails(ctx, job.Details().(jobspb.ChangefeedDetails), job.Payload().Description, job.ID()),
sinkTelemetryData: sinkTelemetryData{},
settings: s,
}, nil
Expand Down Expand Up @@ -112,7 +112,6 @@ func (ptl *periodicTelemetryLogger) maybeFlushLogs() {

continuousTelemetryEvent := &eventpb.ChangefeedEmittedBytes{
CommonChangefeedEventDetails: ptl.changefeedDetails,
JobId: int64(ptl.job.ID()),
EmittedBytes: ptl.resetEmittedBytes(),
EmittedMessages: ptl.resetEmittedMessages(),
LoggingInterval: loggingInterval,
Expand All @@ -128,7 +127,6 @@ func (ptl *periodicTelemetryLogger) close() {

continuousTelemetryEvent := &eventpb.ChangefeedEmittedBytes{
CommonChangefeedEventDetails: ptl.changefeedDetails,
JobId: int64(ptl.job.ID()),
EmittedBytes: ptl.resetEmittedBytes(),
EmittedMessages: ptl.resetEmittedMessages(),
LoggingInterval: loggingInterval,
Expand Down
1 change: 1 addition & 0 deletions pkg/util/log/eventpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ go_test(
proto_library(
name = "eventpb_proto",
srcs = [
"changefeed_events.proto",
"cluster_events.proto",
"ddl_events.proto",
"debug_events.proto",
Expand Down
1 change: 1 addition & 0 deletions pkg/util/log/eventpb/PROTOS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ EVENTPB_PROTOS = [
"health_events.proto",
"storage_events.proto",
"telemetry.proto",
"changefeed_events.proto",
]

EVENTPB_PROTO_DEPS = [ "//pkg/util/log/logpb:event.proto", ] + EVENTPB_PROTOS
Expand Down
61 changes: 61 additions & 0 deletions pkg/util/log/eventpb/changefeed_events.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

syntax = "proto3";
package cockroach.util.log.eventpb;
option go_package = "github.com/cockroachdb/cockroach/pkg/util/log/eventpb";

import "gogoproto/gogo.proto";
import "util/log/eventpb/events.proto";
import "util/log/logpb/event.proto";

// Category: Changefeed telemetry events
// Channel: TELEMETRY
//
// Events in this category pertain to changefeed usage and metrics.

// CreateChangefeed is an event for any CREATE CHANGEFEED query that
// successfully starts running. Failed CREATE statements will show up as
// ChangefeedFailed events.
message CreateChangefeed {
CommonChangefeedEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true];

// Flag representing whether the changefeed is using CDC queries.
bool transformation = 2 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""];
}

// ChangefeedFailed is an event for any changefeed failure since the plan hook
// was triggered.
message ChangefeedFailed {
CommonChangefeedEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true];

// The reason / environment with which the changefeed failed
// (ex: connection_closed, changefeed_behind).
string failure_type = 2 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""];
}

// ChangefeedEmittedBytes is an event representing the bytes emitted by a changefeed over an interval.
message ChangefeedEmittedBytes {
CommonChangefeedEventDetails common = 1 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "", (gogoproto.embed) = true];

reserved 2;

// The number of bytes emitted.
int64 emitted_bytes = 3 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""];

// The number of messages emitted.
int64 emitted_messages = 6 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""];

// The time period in nanoseconds between emitting telemetry events of this type (per-aggregator).
int64 logging_interval = 4 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""];

// Flag to indicate that the changefeed is closing.
bool closing = 5 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""];
}
18 changes: 9 additions & 9 deletions pkg/util/log/eventpb/eventlog_channels_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/util/log/eventpb/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,7 @@ message CommonChangefeedEventDetails {

// The data format being emitted (ex: JSON, Avro).
string format = 7 [(gogoproto.jsontag) = ",omitempty", (gogoproto.moretags) = "redact:\"nonsensitive\""];

// The job id for enterprise changefeeds.
int64 job_id = 8 [(gogoproto.jsontag) = ",omitempty"];
}
Loading

0 comments on commit 83f4ec6

Please sign in to comment.