Skip to content

Commit

Permalink
feat: Expose invocation ID in OpenTelemetry metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
disq committed Nov 19, 2024
1 parent 7076899 commit 286a892
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 10 deletions.
9 changes: 5 additions & 4 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (s *Metrics) Equal(other *Metrics) bool {
return true
}

func getOtelMeters(tableName string, clientID string) *OtelMeters {
func getOtelMeters(tableName, clientID, invocationID string) *OtelMeters {
resources, err := otel.Meter(OtelName).Int64Counter("sync.table.resources",
metric.WithDescription("Number of resources synced for a table"),
metric.WithUnit("/{tot}"),
Expand Down Expand Up @@ -136,21 +136,22 @@ func getOtelMeters(tableName string, clientID string) *OtelMeters {
attributes: []attribute.KeyValue{
attribute.Key("sync.client.id").String(clientID),
attribute.Key("sync.table.name").String(tableName),
attribute.Key("sync.invocation.id").String(invocationID),
},
}
}

func (s *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) {
func (s *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta, invocationID string) {
s.TableClient[table.Name] = make(map[string]*TableClientMetrics, len(clients))
for _, client := range clients {
tableName := table.Name
clientID := client.ID()
s.TableClient[tableName][clientID] = &TableClientMetrics{
otelMeters: getOtelMeters(tableName, clientID),
otelMeters: getOtelMeters(tableName, clientID, invocationID),
}
}
for _, relation := range table.Relations {
s.InitWithClients(relation, clients)
s.InitWithClients(relation, clients, invocationID)
}
}

Expand Down
2 changes: 1 addition & 1 deletion scheduler/queue/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestScheduler(t *testing.T) {
}

for _, tc := range tableClients {
m.InitWithClients(tc.Table, []schema.ClientMeta{tc.Client})
m.InitWithClients(tc.Table, []schema.ClientMeta{tc.Client}, scheduler.invocationID)
}

resolvedResources := make(chan *schema.Resource)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduler_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *syncClient) syncTest(ctx context.Context, syncMultiplier int, resolvedR
preInitialisedClients[i] = clients
// we do this here to avoid locks so we initialize the metrics structure once in the main goroutine
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
s.metrics.InitWithClients(table, clients)
s.metrics.InitWithClients(table, clients, s.invocationID)
}

// First interleave the tables like in round-robin
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduler_dfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *sche
preInitialisedClients[i] = clients
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
s.metrics.InitWithClients(table, clients)
s.metrics.InitWithClients(table, clients, s.invocationID)
}

tableClients := make([]tableClient, 0)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduler_round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (s *syncClient) syncRoundRobin(ctx context.Context, resolvedResources chan<
preInitialisedClients[i] = clients
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
s.metrics.InitWithClients(table, clients)
s.metrics.InitWithClients(table, clients, s.invocationID)
}

tableClients := roundRobinInterleave(s.tables, preInitialisedClients)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduler_shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- *
preInitialisedClients[i] = clients
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
s.metrics.InitWithClients(table, clients)
s.metrics.InitWithClients(table, clients, s.invocationID)
}

// First interleave the tables like in round-robin
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduler_shuffle_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (s *syncClient) syncShuffleQueue(ctx context.Context, resolvedResources cha
preInitialisedClients[i] = clients
// we do this here to avoid locks so we initial the metrics structure once in the main goroutines
// and then we can just read from it in the other goroutines concurrently given we are not writing to it.
s.metrics.InitWithClients(table, clients)
s.metrics.InitWithClients(table, clients, s.invocationID)
}

tableClients := roundRobinInterleave(s.tables, preInitialisedClients)
Expand Down

0 comments on commit 286a892

Please sign in to comment.