Skip to content

Commit

Permalink
fix: hex decode workflowName when logged via beholder
Browse files Browse the repository at this point in the history
  • Loading branch information
agparadiso committed Dec 30, 2024
1 parent b5c671e commit 1f831d9
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 8 deletions.
13 changes: 12 additions & 1 deletion core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package compute
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -298,9 +299,19 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", req.Metadata.WorkflowExecutionId, err)
}

// best effort to decode the workflow name
var workflowName string
decodedWorkflowName, err := hex.DecodeString(req.Metadata.WorkflowName)
if err != nil {
c.log.Errorf("failed to decode WorkflowName %q: %v", req.Metadata.WorkflowName, err)
workflowName = req.Metadata.WorkflowName
} else {
workflowName = string(decodedWorkflowName)
}

cma := c.emitter.With(
platform.KeyWorkflowID, req.Metadata.WorkflowId,
platform.KeyWorkflowName, req.Metadata.WorkflowName,
platform.KeyWorkflowName, workflowName,
platform.KeyWorkflowOwner, req.Metadata.WorkflowOwner,
platform.KeyWorkflowExecutionID, req.Metadata.WorkflowExecutionId,
timestampKey, time.Now().UTC().Format(time.RFC3339Nano),
Expand Down
13 changes: 12 additions & 1 deletion core/capabilities/targets/write_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,21 @@ func (cap *WriteTarget) Execute(ctx context.Context, rawRequest capabilities.Cap
return capabilities.CapabilityResponse{}, nil
case commontypes.Failed, commontypes.Fatal:
cap.lggr.Error("Transaction failed", "request", request, "transaction", txID)

// best effort to decode the workflow name
var workflowName string
decodedWorkflowName, err := hex.DecodeString(request.Metadata.WorkflowName)
if err != nil {
cap.lggr.Errorf("failed to decode WorkflowName %q: %v", request.Metadata.WorkflowName, err)
workflowName = request.Metadata.WorkflowName
} else {
workflowName = string(decodedWorkflowName)
}

msg := "failed to submit transaction with ID: " + txID.String()
err = cap.emitter.With(
platform.KeyWorkflowID, request.Metadata.WorkflowID,
platform.KeyWorkflowName, request.Metadata.WorkflowName,
platform.KeyWorkflowName, workflowName,
platform.KeyWorkflowOwner, request.Metadata.WorkflowOwner,
platform.KeyWorkflowExecutionID, request.Metadata.WorkflowExecutionID,
).Emit(ctx, msg)
Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser

type noopSecretsFetcher struct{}

func (n *noopSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
func (n *noopSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error) {
return map[string]string{}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (sucm *stepUpdateManager) len() int64 {
}

type secretsFetcher interface {
SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error)
SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error)
}

// Engine handles the lifecycle of a single workflow and its executions.
Expand Down
4 changes: 2 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string,

type mockSecretsFetcher struct{}

func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error) {
return map[string]string{}, nil
}

Expand Down Expand Up @@ -1606,7 +1606,7 @@ type mockFetcher struct {
retval map[string]string
}

func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error) {
return m.retval, nil
}

Expand Down
15 changes: 13 additions & 2 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (h *eventHandler) refreshSecrets(ctx context.Context, workflowOwner, workfl
return updatedSecrets, nil
}

func (h *eventHandler) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) {
func (h *eventHandler) SecretsFor(ctx context.Context, workflowOwner, hexWorkflowName, workflowID string) (map[string]string, error) {
secretsURLHash, secretsPayload, err := h.orm.GetContentsByWorkflowID(ctx, workflowID)
if err != nil {
// The workflow record was found, but secrets_id was empty.
Expand All @@ -238,10 +238,21 @@ func (h *eventHandler) SecretsFor(ctx context.Context, workflowOwner, workflowNa

lastFetchedAt, ok := h.lastFetchedAtMap.Get(secretsURLHash)
if !ok || h.clock.Now().Sub(lastFetchedAt) > h.secretsFreshnessDuration {
updatedSecrets, innerErr := h.refreshSecrets(ctx, workflowOwner, workflowName, workflowID, secretsURLHash)
updatedSecrets, innerErr := h.refreshSecrets(ctx, workflowOwner, hexWorkflowName, workflowID, secretsURLHash)
if innerErr != nil {
msg := fmt.Sprintf("could not refresh secrets: proceeding with stale secrets for workflowID %s: %s", workflowID, innerErr)
h.lggr.Error(msg)

// best effort to decode the workflow name
var workflowName string
decodedWorkflowName, innerErr2 := hex.DecodeString(hexWorkflowName)
if innerErr2 != nil {
h.lggr.Errorf("failed to decode WorkflowName %q: %v", hexWorkflowName, innerErr2)
workflowName = hexWorkflowName
} else {
workflowName = string(decodedWorkflowName)
}

logCustMsg(
ctx,
h.emitter.With(
Expand Down

0 comments on commit 1f831d9

Please sign in to comment.