Skip to content

Commit

Permalink
Add compute metrics (#15246)
Browse files Browse the repository at this point in the history
* Adds compute metrics

* Fixes lint

* Removes execution ID from metrics labels

* Fixes CI

* Fixes CI

* Fixes CI
  • Loading branch information
vyzaldysanchez authored Nov 15, 2024
1 parent a4d3c22 commit cb9c185
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 8 deletions.
23 changes: 19 additions & 4 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -20,6 +21,7 @@ import (
capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/metrics"
"github.com/smartcontractkit/chainlink-common/pkg/services"
coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
Expand Down Expand Up @@ -75,8 +77,9 @@ var (
var _ capabilities.ActionCapability = (*Compute)(nil)

type Compute struct {
stopCh services.StopChan
log logger.Logger
stopCh services.StopChan
log logger.Logger
metrics *computeMetricsLabeler

// emitter is used to emit messages from the WASM module to a configured collector.
emitter custmsg.MessageEmitter
Expand Down Expand Up @@ -328,6 +331,13 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
return nil, fmt.Errorf("failed to unmarshal fetch response: %w", err)
}

c.metrics.with(
"status", strconv.FormatUint(uint64(response.StatusCode), 10),
platform.KeyWorkflowID, req.Metadata.WorkflowId,
platform.KeyWorkflowName, req.Metadata.WorkflowName,
platform.KeyWorkflowOwner, req.Metadata.WorkflowOwner,
).incrementHTTPRequestCounter(ctx)

// Only log if the response is not in the 200 range
if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices {
msg := fmt.Sprintf("compute fetch request failed with status code %d", response.StatusCode)
Expand Down Expand Up @@ -357,17 +367,22 @@ func NewAction(
handler *webapi.OutgoingConnectorHandler,
idGenerator func() string,
opts ...func(*Compute),
) *Compute {
) (*Compute, error) {
if config.NumWorkers == 0 {
config.NumWorkers = defaultNumWorkers
}
metricsLabeler, err := newComputeMetricsLabeler(metrics.NewLabeler().With("capability", CapabilityIDCompute))
if err != nil {
return nil, fmt.Errorf("failed to create compute metrics labeler: %w", err)
}
var (
lggr = logger.Named(log, "CustomCompute")
labeler = custmsg.NewLabeler()
compute = &Compute{
stopCh: make(services.StopChan),
log: lggr,
emitter: labeler,
metrics: metricsLabeler,
registry: registry,
modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3),
transformer: NewTransformer(lggr, labeler),
Expand All @@ -382,5 +397,5 @@ func NewAction(
opt(compute)
}

return compute
return compute, nil
}
4 changes: 3 additions & 1 deletion core/capabilities/compute/compute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
cappkg "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-common/pkg/values"

corecapabilities "github.com/smartcontractkit/chainlink/v2/core/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
Expand Down Expand Up @@ -60,7 +61,8 @@ func setup(t *testing.T, config Config) testHarness {
connectorHandler, err := webapi.NewOutgoingConnectorHandler(connector, config.ServiceConfig, ghcapabilities.MethodComputeAction, log)
require.NoError(t, err)

compute := NewAction(config, log, registry, connectorHandler, idGeneratorFn)
compute, err := NewAction(config, log, registry, connectorHandler, idGeneratorFn)
require.NoError(t, err)
compute.modules.clock = clockwork.NewFakeClock()

return testHarness{
Expand Down
35 changes: 35 additions & 0 deletions core/capabilities/compute/monitoring.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,38 @@
package compute

import (
"context"
"fmt"

"go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/metrics"

localMonitoring "github.com/smartcontractkit/chainlink/v2/core/monitoring"
)

const timestampKey = "computeTimestamp"

type computeMetricsLabeler struct {
metrics.Labeler
computeHTTPRequestCounter metric.Int64Counter
}

func newComputeMetricsLabeler(l metrics.Labeler) (*computeMetricsLabeler, error) {
computeHTTPRequestCounter, err := beholder.GetMeter().Int64Counter("capabilities_compute_http_request_count")
if err != nil {
return nil, fmt.Errorf("failed to register compute http request counter: %w", err)
}

return &computeMetricsLabeler{Labeler: l, computeHTTPRequestCounter: computeHTTPRequestCounter}, nil
}

func (c *computeMetricsLabeler) with(keyValues ...string) *computeMetricsLabeler {
return &computeMetricsLabeler{c.With(keyValues...), c.computeHTTPRequestCounter}
}

func (c *computeMetricsLabeler) incrementHTTPRequestCounter(ctx context.Context) {
otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
c.computeHTTPRequestCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}
6 changes: 5 additions & 1 deletion core/services/standardcapabilities/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/compute"
gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
Expand Down Expand Up @@ -253,7 +254,10 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
return uuid.New().String()
}

computeSrvc := compute.NewAction(cfg, log, d.registry, handler, idGeneratorFn)
computeSrvc, err := compute.NewAction(cfg, log, d.registry, handler, idGeneratorFn)
if err != nil {
return nil, err
}
return []job.ServiceCtx{handler, computeSrvc}, nil
}

Expand Down
6 changes: 4 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1448,7 +1448,8 @@ func TestEngine_WithCustomComputeStep(t *testing.T) {
require.NoError(t, err)

idGeneratorFn := func() string { return "validRequestID" }
compute := compute.NewAction(cfg, log, reg, handler, idGeneratorFn)
compute, err := compute.NewAction(cfg, log, reg, handler, idGeneratorFn)
require.NoError(t, err)
require.NoError(t, compute.Start(ctx))
defer compute.Close()

Expand Down Expand Up @@ -1513,7 +1514,8 @@ func TestEngine_CustomComputePropagatesBreaks(t *testing.T) {
require.NoError(t, err)

idGeneratorFn := func() string { return "validRequestID" }
compute := compute.NewAction(cfg, log, reg, handler, idGeneratorFn)
compute, err := compute.NewAction(cfg, log, reg, handler, idGeneratorFn)
require.NoError(t, err)
require.NoError(t, compute.Start(ctx))
defer compute.Close()

Expand Down

0 comments on commit cb9c185

Please sign in to comment.