Skip to content

Commit

Permalink
feat: decode workflow name on the engine
Browse files Browse the repository at this point in the history
  • Loading branch information
agparadiso committed Jan 8, 2025
1 parent 1f831d9 commit 0d3e06d
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 45 deletions.
13 changes: 1 addition & 12 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package compute
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -299,19 +298,9 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", req.Metadata.WorkflowExecutionId, err)
}

// best effort to decode the workflow name
var workflowName string
decodedWorkflowName, err := hex.DecodeString(req.Metadata.WorkflowName)
if err != nil {
c.log.Errorf("failed to decode WorkflowName %q: %v", req.Metadata.WorkflowName, err)
workflowName = req.Metadata.WorkflowName
} else {
workflowName = string(decodedWorkflowName)
}

cma := c.emitter.With(
platform.KeyWorkflowID, req.Metadata.WorkflowId,
platform.KeyWorkflowName, workflowName,
platform.KeyWorkflowName, req.Metadata.DecodedWorkflowName,

Check failure on line 303 in core/capabilities/compute/compute.go

View workflow job for this annotation

GitHub Actions / Tests (ccip-deployment)

req.Metadata.DecodedWorkflowName undefined (type *"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb".FetchRequestMetadata has no field or method DecodedWorkflowName)

Check failure on line 303 in core/capabilities/compute/compute.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_ccip_deployment_tests)

req.Metadata.DecodedWorkflowName undefined (type *"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb".FetchRequestMetadata has no field or method DecodedWorkflowName)

Check failure on line 303 in core/capabilities/compute/compute.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests_integration)

req.Metadata.DecodedWorkflowName undefined (type *"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb".FetchRequestMetadata has no field or method DecodedWorkflowName)

Check failure on line 303 in core/capabilities/compute/compute.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

req.Metadata.DecodedWorkflowName undefined (type *"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb".FetchRequestMetadata has no field or method DecodedWorkflowName)

Check failure on line 303 in core/capabilities/compute/compute.go

View workflow job for this annotation

GitHub Actions / Tests (core)

req.Metadata.DecodedWorkflowName undefined (type *"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb".FetchRequestMetadata has no field or method DecodedWorkflowName)

Check failure on line 303 in core/capabilities/compute/compute.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

req.Metadata.DecodedWorkflowName undefined (type *"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb".FetchRequestMetadata has no field or method DecodedWorkflowName)

Check failure on line 303 in core/capabilities/compute/compute.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

req.Metadata.DecodedWorkflowName undefined (type *"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb".FetchRequestMetadata has no field or method DecodedWorkflowName)
platform.KeyWorkflowOwner, req.Metadata.WorkflowOwner,
platform.KeyWorkflowExecutionID, req.Metadata.WorkflowExecutionId,
timestampKey, time.Now().UTC().Format(time.RFC3339Nano),
Expand Down
12 changes: 1 addition & 11 deletions core/capabilities/targets/write_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,20 +346,10 @@ func (cap *WriteTarget) Execute(ctx context.Context, rawRequest capabilities.Cap
case commontypes.Failed, commontypes.Fatal:
cap.lggr.Error("Transaction failed", "request", request, "transaction", txID)

// best effort to decode the workflow name
var workflowName string
decodedWorkflowName, err := hex.DecodeString(request.Metadata.WorkflowName)
if err != nil {
cap.lggr.Errorf("failed to decode WorkflowName %q: %v", request.Metadata.WorkflowName, err)
workflowName = request.Metadata.WorkflowName
} else {
workflowName = string(decodedWorkflowName)
}

msg := "failed to submit transaction with ID: " + txID.String()
err = cap.emitter.With(
platform.KeyWorkflowID, request.Metadata.WorkflowID,
platform.KeyWorkflowName, workflowName,
platform.KeyWorkflowName, request.Metadata.DecodedWorkflowName,

Check failure on line 352 in core/capabilities/targets/write_target.go

View workflow job for this annotation

GitHub Actions / Tests (ccip-deployment)

request.Metadata.DecodedWorkflowName undefined (type capabilities.RequestMetadata has no field or method DecodedWorkflowName)

Check failure on line 352 in core/capabilities/targets/write_target.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_ccip_deployment_tests)

request.Metadata.DecodedWorkflowName undefined (type capabilities.RequestMetadata has no field or method DecodedWorkflowName)

Check failure on line 352 in core/capabilities/targets/write_target.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests_integration)

request.Metadata.DecodedWorkflowName undefined (type capabilities.RequestMetadata has no field or method DecodedWorkflowName)

Check failure on line 352 in core/capabilities/targets/write_target.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

request.Metadata.DecodedWorkflowName undefined (type capabilities.RequestMetadata has no field or method DecodedWorkflowName)

Check failure on line 352 in core/capabilities/targets/write_target.go

View workflow job for this annotation

GitHub Actions / Tests (core)

request.Metadata.DecodedWorkflowName undefined (type capabilities.RequestMetadata has no field or method DecodedWorkflowName)

Check failure on line 352 in core/capabilities/targets/write_target.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

request.Metadata.DecodedWorkflowName undefined (type capabilities.RequestMetadata has no field or method DecodedWorkflowName)

Check failure on line 352 in core/capabilities/targets/write_target.go

View workflow job for this annotation

GitHub Actions / split-amd64

request.Metadata.DecodedWorkflowName undefined (type capabilities.RequestMetadata has no field or method DecodedWorkflowName)

Check failure on line 352 in core/capabilities/targets/write_target.go

View workflow job for this annotation

GitHub Actions / split-arm64

request.Metadata.DecodedWorkflowName undefined (type capabilities.RequestMetadata has no field or method DecodedWorkflowName)

Check failure on line 352 in core/capabilities/targets/write_target.go

View workflow job for this annotation

GitHub Actions / GolangCI Lint (.)

request.Metadata.DecodedWorkflowName undefined (type capabilities.RequestMetadata has no field or method DecodedWorkflowName) (typecheck)

Check failure on line 352 in core/capabilities/targets/write_target.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

request.Metadata.DecodedWorkflowName undefined (type capabilities.RequestMetadata has no field or method DecodedWorkflowName)
platform.KeyWorkflowOwner, request.Metadata.WorkflowOwner,
platform.KeyWorkflowExecutionID, request.Metadata.WorkflowExecutionID,
).Emit(ctx, msg)
Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser

type noopSecretsFetcher struct{}

func (n *noopSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error) {
func (n *noopSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, decodedWorkflowName, workflowID string) (map[string]string, error) {
return map[string]string{}, nil
}

Expand Down
31 changes: 29 additions & 2 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (sucm *stepUpdateManager) len() int64 {
}

type secretsFetcher interface {
SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error)
SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, decodedWorkflowName, workflowID string) (map[string]string, error)
}

// Engine handles the lifecycle of a single workflow and its executions.
Expand Down Expand Up @@ -432,6 +432,12 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig

t.config.Store(tc)

workflowName, err := hex.DecodeString(e.workflow.hexName)
if err != nil {
e.logger.Warnw("failed to decode workflow name: %s, using encoded", err)
workflowName = []byte(e.workflow.hexName)
}

triggerRegRequest := capabilities.TriggerRegistrationRequest{
Metadata: capabilities.RequestMetadata{
WorkflowID: e.workflow.id,
Expand All @@ -440,6 +446,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
WorkflowDonID: e.localNode.WorkflowDON.ID,
WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion,
ReferenceID: t.Ref,
DecodedWorkflowName: string(workflowName),

Check failure on line 449 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Tests (ccip-deployment)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 449 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_ccip_deployment_tests)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 449 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests_integration)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 449 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 449 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Tests (core)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 449 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 449 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata
},
Config: t.config.Load(),
TriggerID: triggerID,
Expand Down Expand Up @@ -868,7 +875,13 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value
// registry (for capability-level configuration). It doesn't perform any caching of the config values, since
// the two registries perform their own caching.
func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) {
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.hexName, e.workflow.id)
workflowName, err := hex.DecodeString(e.workflow.hexName)
if err != nil {
e.logger.Warnw("failed to decode workflow name: %s, using encoded", err)
workflowName = []byte(e.workflow.hexName)
}

secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.hexName, string(workflowName), e.workflow.id)
if err != nil {
return nil, fmt.Errorf("failed to fetch secrets: %w", err)
}
Expand Down Expand Up @@ -953,6 +966,12 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
}
}

workflowName, err := hex.DecodeString(e.workflow.hexName)
if err != nil {
e.logger.Warnw("failed to decode workflow name: %s, using encoded", err)
workflowName = []byte(e.workflow.hexName)
}

tr := capabilities.CapabilityRequest{
Inputs: inputsMap,
Config: config,
Expand All @@ -964,6 +983,7 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
WorkflowDonID: e.localNode.WorkflowDON.ID,
WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion,
ReferenceID: msg.stepRef,
DecodedWorkflowName: string(workflowName),

Check failure on line 986 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Tests (ccip-deployment)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 986 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_ccip_deployment_tests)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 986 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests_integration)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 986 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 986 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Tests (core)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 986 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 986 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata
},
}

Expand All @@ -981,6 +1001,12 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
}

func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, triggerIdx int) error {
workflowName, err := hex.DecodeString(e.workflow.hexName)
if err != nil {
e.logger.Warnw("failed to decode workflow name: %s, using encoded", err)
workflowName = []byte(e.workflow.hexName)
}

deregRequest := capabilities.TriggerRegistrationRequest{
Metadata: capabilities.RequestMetadata{
WorkflowID: e.workflow.id,
Expand All @@ -989,6 +1015,7 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, tr
WorkflowName: e.workflow.hexName,
WorkflowOwner: e.workflow.owner,
ReferenceID: t.Ref,
DecodedWorkflowName: string(workflowName),

Check failure on line 1018 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Tests (ccip-deployment)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 1018 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_ccip_deployment_tests)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 1018 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests_integration)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 1018 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 1018 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Tests (core)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 1018 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata

Check failure on line 1018 in core/services/workflows/engine.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

unknown field DecodedWorkflowName in struct literal of type capabilities.RequestMetadata
},
TriggerID: generateTriggerId(e.workflow.id, triggerIdx),
Config: t.config.Load(),
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string,

type mockSecretsFetcher struct{}

func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error) {
func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, decodedWorkflowName, workflowID string) (map[string]string, error) {
return map[string]string{}, nil
}

Expand Down Expand Up @@ -1606,7 +1606,7 @@ type mockFetcher struct {
retval map[string]string
}

func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error) {
func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, decodedWorkflowName, workflowID string) (map[string]string, error) {
return m.retval, nil
}

Expand Down
14 changes: 2 additions & 12 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (h *eventHandler) refreshSecrets(ctx context.Context, workflowOwner, workfl
return updatedSecrets, nil
}

func (h *eventHandler) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error) {
func (h *eventHandler) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, decodedWorkflowName, workflowID string) (map[string]string, error) {
secretsURLHash, secretsPayload, err := h.orm.GetContentsByWorkflowID(ctx, workflowID)
if err != nil {
// The workflow record was found, but secrets_id was empty.
Expand All @@ -243,21 +243,11 @@ func (h *eventHandler) SecretsFor(ctx context.Context, workflowOwner, hexWorkflo
msg := fmt.Sprintf("could not refresh secrets: proceeding with stale secrets for workflowID %s: %s", workflowID, innerErr)
h.lggr.Error(msg)

// best effort to decode the workflow name
var workflowName string
decodedWorkflowName, innerErr2 := hex.DecodeString(hexWorkflowName)
if innerErr2 != nil {
h.lggr.Errorf("failed to decode WorkflowName %q: %v", hexWorkflowName, innerErr2)
workflowName = hexWorkflowName
} else {
workflowName = string(decodedWorkflowName)
}

logCustMsg(
ctx,
h.emitter.With(
platform.KeyWorkflowID, workflowID,
platform.KeyWorkflowName, workflowName,
platform.KeyWorkflowName, decodedWorkflowName,
platform.KeyWorkflowOwner, workflowOwner,
),
msg,
Expand Down
13 changes: 8 additions & 5 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,7 @@ func Test_Handler_SecretsFor(t *testing.T) {
workflowOwner := hex.EncodeToString([]byte("anOwner"))
workflowName := "aName"
workflowID := "anID"
decodedWorkflowName := "decodedName"
encryptionKey, err := workflowkey.New()
require.NoError(t, err)

Expand Down Expand Up @@ -820,7 +821,7 @@ func Test_Handler_SecretsFor(t *testing.T) {
encryptionKey,
)

gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID)
gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, decodedWorkflowName, workflowID)
require.NoError(t, err)

expectedSecrets := map[string]string{
Expand All @@ -837,6 +838,7 @@ func Test_Handler_SecretsFor_RefreshesSecrets(t *testing.T) {
workflowOwner := hex.EncodeToString([]byte("anOwner"))
workflowName := "aName"
workflowID := "anID"
decodedWorkflowName := "decodedName"
encryptionKey, err := workflowkey.New()
require.NoError(t, err)

Expand Down Expand Up @@ -881,7 +883,7 @@ func Test_Handler_SecretsFor_RefreshesSecrets(t *testing.T) {
encryptionKey,
)

gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID)
gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, decodedWorkflowName, workflowID)
require.NoError(t, err)

expectedSecrets := map[string]string{
Expand All @@ -898,6 +900,7 @@ func Test_Handler_SecretsFor_RefreshLogic(t *testing.T) {
workflowOwner := hex.EncodeToString([]byte("anOwner"))
workflowName := "aName"
workflowID := "anID"
decodedWorkflowName := "decodedName"
encryptionKey, err := workflowkey.New()
require.NoError(t, err)

Expand Down Expand Up @@ -943,7 +946,7 @@ func Test_Handler_SecretsFor_RefreshLogic(t *testing.T) {
encryptionKey,
)

gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID)
gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, decodedWorkflowName, workflowID)
require.NoError(t, err)

expectedSecrets := map[string]string{
Expand All @@ -955,15 +958,15 @@ func Test_Handler_SecretsFor_RefreshLogic(t *testing.T) {
// SecretsFor should still succeed.
fetcher.responseMap[url] = mockFetchResp{}

gotSecrets, err = h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID)
gotSecrets, err = h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, decodedWorkflowName, workflowID)
require.NoError(t, err)

assert.Equal(t, expectedSecrets, gotSecrets)

// Now advance so that we hit the freshness limit
clock.Advance(48 * time.Hour)

_, err = h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID)
_, err = h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, decodedWorkflowName, workflowID)
assert.ErrorContains(t, err, "unexpected end of JSON input")
}

Expand Down

0 comments on commit 0d3e06d

Please sign in to comment.