From 0d3e06d087740d736ce6644283e443964c47b7a6 Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Wed, 8 Jan 2025 16:38:15 +0100 Subject: [PATCH] feat: decode workflow name on the engine --- core/capabilities/compute/compute.go | 13 +------- core/capabilities/targets/write_target.go | 12 +------ core/services/workflows/delegate.go | 2 +- core/services/workflows/engine.go | 31 +++++++++++++++++-- core/services/workflows/engine_test.go | 4 +-- core/services/workflows/syncer/handler.go | 14 ++------- .../services/workflows/syncer/handler_test.go | 13 +++++--- 7 files changed, 44 insertions(+), 45 deletions(-) diff --git a/core/capabilities/compute/compute.go b/core/capabilities/compute/compute.go index f27a3e29b0e..156c5154c99 100644 --- a/core/capabilities/compute/compute.go +++ b/core/capabilities/compute/compute.go @@ -3,7 +3,6 @@ package compute import ( "context" "crypto/sha256" - "encoding/hex" "encoding/json" "errors" "fmt" @@ -299,19 +298,9 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", req.Metadata.WorkflowExecutionId, err) } - // best effort to decode the workflow name - var workflowName string - decodedWorkflowName, err := hex.DecodeString(req.Metadata.WorkflowName) - if err != nil { - c.log.Errorf("failed to decode WorkflowName %q: %v", req.Metadata.WorkflowName, err) - workflowName = req.Metadata.WorkflowName - } else { - workflowName = string(decodedWorkflowName) - } - cma := c.emitter.With( platform.KeyWorkflowID, req.Metadata.WorkflowId, - platform.KeyWorkflowName, workflowName, + platform.KeyWorkflowName, req.Metadata.DecodedWorkflowName, platform.KeyWorkflowOwner, req.Metadata.WorkflowOwner, platform.KeyWorkflowExecutionID, req.Metadata.WorkflowExecutionId, timestampKey, time.Now().UTC().Format(time.RFC3339Nano), diff --git a/core/capabilities/targets/write_target.go b/core/capabilities/targets/write_target.go index 2adca542533..3fc51270c86 100644 --- a/core/capabilities/targets/write_target.go +++ b/core/capabilities/targets/write_target.go @@ -346,20 +346,10 @@ func (cap *WriteTarget) Execute(ctx context.Context, rawRequest capabilities.Cap case commontypes.Failed, commontypes.Fatal: cap.lggr.Error("Transaction failed", "request", request, "transaction", txID) - // best effort to decode the workflow name - var workflowName string - decodedWorkflowName, err := hex.DecodeString(request.Metadata.WorkflowName) - if err != nil { - cap.lggr.Errorf("failed to decode WorkflowName %q: %v", request.Metadata.WorkflowName, err) - workflowName = request.Metadata.WorkflowName - } else { - workflowName = string(decodedWorkflowName) - } - msg := "failed to submit transaction with ID: " + txID.String() err = cap.emitter.With( platform.KeyWorkflowID, request.Metadata.WorkflowID, - platform.KeyWorkflowName, workflowName, + platform.KeyWorkflowName, request.Metadata.DecodedWorkflowName, platform.KeyWorkflowOwner, request.Metadata.WorkflowOwner, platform.KeyWorkflowExecutionID, request.Metadata.WorkflowExecutionID, ).Emit(ctx, msg) diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index 3fb377d3edc..9e50f5ec092 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -81,7 +81,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser type noopSecretsFetcher struct{} -func (n *noopSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error) { +func (n *noopSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, decodedWorkflowName, workflowID string) (map[string]string, error) { return map[string]string{}, nil } diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 2aa3cfd129a..c96ce145170 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -96,7 +96,7 @@ func (sucm *stepUpdateManager) len() int64 { } type secretsFetcher interface { - SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error) + SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, decodedWorkflowName, workflowID string) (map[string]string, error) } // Engine handles the lifecycle of a single workflow and its executions. @@ -432,6 +432,12 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig t.config.Store(tc) + workflowName, err := hex.DecodeString(e.workflow.hexName) + if err != nil { + e.logger.Warnw("failed to decode workflow name: %s, using encoded", err) + workflowName = []byte(e.workflow.hexName) + } + triggerRegRequest := capabilities.TriggerRegistrationRequest{ Metadata: capabilities.RequestMetadata{ WorkflowID: e.workflow.id, @@ -440,6 +446,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig WorkflowDonID: e.localNode.WorkflowDON.ID, WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion, ReferenceID: t.Ref, + DecodedWorkflowName: string(workflowName), }, Config: t.config.Load(), TriggerID: triggerID, @@ -868,7 +875,13 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value // registry (for capability-level configuration). It doesn't perform any caching of the config values, since // the two registries perform their own caching. func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) { - secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.hexName, e.workflow.id) + workflowName, err := hex.DecodeString(e.workflow.hexName) + if err != nil { + e.logger.Warnw("failed to decode workflow name: %s, using encoded", err) + workflowName = []byte(e.workflow.hexName) + } + + secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.hexName, string(workflowName), e.workflow.id) if err != nil { return nil, fmt.Errorf("failed to fetch secrets: %w", err) } @@ -953,6 +966,12 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe } } + workflowName, err := hex.DecodeString(e.workflow.hexName) + if err != nil { + e.logger.Warnw("failed to decode workflow name: %s, using encoded", err) + workflowName = []byte(e.workflow.hexName) + } + tr := capabilities.CapabilityRequest{ Inputs: inputsMap, Config: config, @@ -964,6 +983,7 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe WorkflowDonID: e.localNode.WorkflowDON.ID, WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion, ReferenceID: msg.stepRef, + DecodedWorkflowName: string(workflowName), }, } @@ -981,6 +1001,12 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe } func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, triggerIdx int) error { + workflowName, err := hex.DecodeString(e.workflow.hexName) + if err != nil { + e.logger.Warnw("failed to decode workflow name: %s, using encoded", err) + workflowName = []byte(e.workflow.hexName) + } + deregRequest := capabilities.TriggerRegistrationRequest{ Metadata: capabilities.RequestMetadata{ WorkflowID: e.workflow.id, @@ -989,6 +1015,7 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, tr WorkflowName: e.workflow.hexName, WorkflowOwner: e.workflow.owner, ReferenceID: t.Ref, + DecodedWorkflowName: string(workflowName), }, TriggerID: generateTriggerId(e.workflow.id, triggerIdx), Config: t.config.Load(), diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index f4a382d6696..839e3680f72 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -153,7 +153,7 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string, type mockSecretsFetcher struct{} -func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error) { +func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, decodedWorkflowName, workflowID string) (map[string]string, error) { return map[string]string{}, nil } @@ -1606,7 +1606,7 @@ type mockFetcher struct { retval map[string]string } -func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error) { +func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, decodedWorkflowName, workflowID string) (map[string]string, error) { return m.retval, nil } diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 4fd237e41a4..bae311c846b 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -224,7 +224,7 @@ func (h *eventHandler) refreshSecrets(ctx context.Context, workflowOwner, workfl return updatedSecrets, nil } -func (h *eventHandler) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error) { +func (h *eventHandler) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, decodedWorkflowName, workflowID string) (map[string]string, error) { secretsURLHash, secretsPayload, err := h.orm.GetContentsByWorkflowID(ctx, workflowID) if err != nil { // The workflow record was found, but secrets_id was empty. @@ -243,21 +243,11 @@ func (h *eventHandler) SecretsFor(ctx context.Context, workflowOwner, hexWorkflo msg := fmt.Sprintf("could not refresh secrets: proceeding with stale secrets for workflowID %s: %s", workflowID, innerErr) h.lggr.Error(msg) - // best effort to decode the workflow name - var workflowName string - decodedWorkflowName, innerErr2 := hex.DecodeString(hexWorkflowName) - if innerErr2 != nil { - h.lggr.Errorf("failed to decode WorkflowName %q: %v", hexWorkflowName, innerErr2) - workflowName = hexWorkflowName - } else { - workflowName = string(decodedWorkflowName) - } - logCustMsg( ctx, h.emitter.With( platform.KeyWorkflowID, workflowID, - platform.KeyWorkflowName, workflowName, + platform.KeyWorkflowName, decodedWorkflowName, platform.KeyWorkflowOwner, workflowOwner, ), msg, diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index 994b820b5ce..3b3231a3d3d 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -780,6 +780,7 @@ func Test_Handler_SecretsFor(t *testing.T) { workflowOwner := hex.EncodeToString([]byte("anOwner")) workflowName := "aName" workflowID := "anID" + decodedWorkflowName := "decodedName" encryptionKey, err := workflowkey.New() require.NoError(t, err) @@ -820,7 +821,7 @@ func Test_Handler_SecretsFor(t *testing.T) { encryptionKey, ) - gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID) + gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, decodedWorkflowName, workflowID) require.NoError(t, err) expectedSecrets := map[string]string{ @@ -837,6 +838,7 @@ func Test_Handler_SecretsFor_RefreshesSecrets(t *testing.T) { workflowOwner := hex.EncodeToString([]byte("anOwner")) workflowName := "aName" workflowID := "anID" + decodedWorkflowName := "decodedName" encryptionKey, err := workflowkey.New() require.NoError(t, err) @@ -881,7 +883,7 @@ func Test_Handler_SecretsFor_RefreshesSecrets(t *testing.T) { encryptionKey, ) - gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID) + gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, decodedWorkflowName, workflowID) require.NoError(t, err) expectedSecrets := map[string]string{ @@ -898,6 +900,7 @@ func Test_Handler_SecretsFor_RefreshLogic(t *testing.T) { workflowOwner := hex.EncodeToString([]byte("anOwner")) workflowName := "aName" workflowID := "anID" + decodedWorkflowName := "decodedName" encryptionKey, err := workflowkey.New() require.NoError(t, err) @@ -943,7 +946,7 @@ func Test_Handler_SecretsFor_RefreshLogic(t *testing.T) { encryptionKey, ) - gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID) + gotSecrets, err := h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, decodedWorkflowName, workflowID) require.NoError(t, err) expectedSecrets := map[string]string{ @@ -955,7 +958,7 @@ func Test_Handler_SecretsFor_RefreshLogic(t *testing.T) { // SecretsFor should still succeed. fetcher.responseMap[url] = mockFetchResp{} - gotSecrets, err = h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID) + gotSecrets, err = h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, decodedWorkflowName, workflowID) require.NoError(t, err) assert.Equal(t, expectedSecrets, gotSecrets) @@ -963,7 +966,7 @@ func Test_Handler_SecretsFor_RefreshLogic(t *testing.T) { // Now advance so that we hit the freshness limit clock.Advance(48 * time.Hour) - _, err = h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID) + _, err = h.SecretsFor(testutils.Context(t), workflowOwner, workflowName, decodedWorkflowName, workflowID) assert.ErrorContains(t, err, "unexpected end of JSON input") }