diff --git a/core/capabilities/compute/compute.go b/core/capabilities/compute/compute.go index cdb8ee19a6a..7b11072b161 100644 --- a/core/capabilities/compute/compute.go +++ b/core/capabilities/compute/compute.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "net/http" + "strconv" "strings" "sync" "time" @@ -20,6 +21,7 @@ import ( capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/metrics" "github.com/smartcontractkit/chainlink-common/pkg/services" coretypes "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" @@ -75,8 +77,9 @@ var ( var _ capabilities.ActionCapability = (*Compute)(nil) type Compute struct { - stopCh services.StopChan - log logger.Logger + stopCh services.StopChan + log logger.Logger + metrics *computeMetricsLabeler // emitter is used to emit messages from the WASM module to a configured collector. emitter custmsg.MessageEmitter @@ -328,6 +331,13 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq return nil, fmt.Errorf("failed to unmarshal fetch response: %w", err) } + c.metrics.with( + "status", strconv.FormatUint(uint64(response.StatusCode), 10), + platform.KeyWorkflowID, req.Metadata.WorkflowId, + platform.KeyWorkflowName, req.Metadata.WorkflowName, + platform.KeyWorkflowOwner, req.Metadata.WorkflowOwner, + ).incrementHTTPRequestCounter(ctx) + // Only log if the response is not in the 200 range if response.StatusCode < http.StatusOK || response.StatusCode >= http.StatusMultipleChoices { msg := fmt.Sprintf("compute fetch request failed with status code %d", response.StatusCode) @@ -357,10 +367,14 @@ func NewAction( handler *webapi.OutgoingConnectorHandler, idGenerator func() string, opts ...func(*Compute), -) *Compute { +) (*Compute, error) { if config.NumWorkers == 0 { config.NumWorkers = defaultNumWorkers } + metricsLabeler, err := newComputeMetricsLabeler(metrics.NewLabeler().With("capability", CapabilityIDCompute)) + if err != nil { + return nil, fmt.Errorf("failed to create compute metrics labeler: %w", err) + } var ( lggr = logger.Named(log, "CustomCompute") labeler = custmsg.NewLabeler() @@ -368,6 +382,7 @@ func NewAction( stopCh: make(services.StopChan), log: lggr, emitter: labeler, + metrics: metricsLabeler, registry: registry, modules: newModuleCache(clockwork.NewRealClock(), 1*time.Minute, 10*time.Minute, 3), transformer: NewTransformer(lggr, labeler), @@ -382,5 +397,5 @@ func NewAction( opt(compute) } - return compute + return compute, nil } diff --git a/core/capabilities/compute/compute_test.go b/core/capabilities/compute/compute_test.go index e0f5a2f9750..c4146b7408e 100644 --- a/core/capabilities/compute/compute_test.go +++ b/core/capabilities/compute/compute_test.go @@ -18,6 +18,7 @@ import ( cappkg "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-common/pkg/values" + corecapabilities "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" @@ -60,7 +61,8 @@ func setup(t *testing.T, config Config) testHarness { connectorHandler, err := webapi.NewOutgoingConnectorHandler(connector, config.ServiceConfig, ghcapabilities.MethodComputeAction, log) require.NoError(t, err) - compute := NewAction(config, log, registry, connectorHandler, idGeneratorFn) + compute, err := NewAction(config, log, registry, connectorHandler, idGeneratorFn) + require.NoError(t, err) compute.modules.clock = clockwork.NewFakeClock() return testHarness{ diff --git a/core/capabilities/compute/monitoring.go b/core/capabilities/compute/monitoring.go index 4b676c25f7d..71354c014a7 100644 --- a/core/capabilities/compute/monitoring.go +++ b/core/capabilities/compute/monitoring.go @@ -1,3 +1,38 @@ package compute +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/metrics" + + localMonitoring "github.com/smartcontractkit/chainlink/v2/core/monitoring" +) + const timestampKey = "computeTimestamp" + +type computeMetricsLabeler struct { + metrics.Labeler + computeHTTPRequestCounter metric.Int64Counter +} + +func newComputeMetricsLabeler(l metrics.Labeler) (*computeMetricsLabeler, error) { + computeHTTPRequestCounter, err := beholder.GetMeter().Int64Counter("capabilities_compute_http_request_count") + if err != nil { + return nil, fmt.Errorf("failed to register compute http request counter: %w", err) + } + + return &computeMetricsLabeler{Labeler: l, computeHTTPRequestCounter: computeHTTPRequestCounter}, nil +} + +func (c *computeMetricsLabeler) with(keyValues ...string) *computeMetricsLabeler { + return &computeMetricsLabeler{c.With(keyValues...), c.computeHTTPRequestCounter} +} + +func (c *computeMetricsLabeler) incrementHTTPRequestCounter(ctx context.Context) { + otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels) + c.computeHTTPRequestCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} diff --git a/core/services/standardcapabilities/delegate.go b/core/services/standardcapabilities/delegate.go index ceb69883e90..f4c0f360bf3 100644 --- a/core/services/standardcapabilities/delegate.go +++ b/core/services/standardcapabilities/delegate.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/compute" gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector" "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" @@ -253,7 +254,10 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser return uuid.New().String() } - computeSrvc := compute.NewAction(cfg, log, d.registry, handler, idGeneratorFn) + computeSrvc, err := compute.NewAction(cfg, log, d.registry, handler, idGeneratorFn) + if err != nil { + return nil, err + } return []job.ServiceCtx{handler, computeSrvc}, nil } diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index e6667fe0bc6..3a2bc17bc36 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -1448,7 +1448,8 @@ func TestEngine_WithCustomComputeStep(t *testing.T) { require.NoError(t, err) idGeneratorFn := func() string { return "validRequestID" } - compute := compute.NewAction(cfg, log, reg, handler, idGeneratorFn) + compute, err := compute.NewAction(cfg, log, reg, handler, idGeneratorFn) + require.NoError(t, err) require.NoError(t, compute.Start(ctx)) defer compute.Close() @@ -1513,7 +1514,8 @@ func TestEngine_CustomComputePropagatesBreaks(t *testing.T) { require.NoError(t, err) idGeneratorFn := func() string { return "validRequestID" } - compute := compute.NewAction(cfg, log, reg, handler, idGeneratorFn) + compute, err := compute.NewAction(cfg, log, reg, handler, idGeneratorFn) + require.NoError(t, err) require.NoError(t, compute.Start(ctx)) defer compute.Close()