Skip to content

Commit

Permalink
adding custom message w/ workflow execution ID when duration exceeds …
Browse files Browse the repository at this point in the history
…15 minutes (#15241)

* adding custom message w/ workflow execution ID when duration exceeds 15 minutes

* adding logging for internal evs

* fixing Info --> Infof
  • Loading branch information
patrickhuie19 authored Nov 14, 2024
1 parent 99ab8b4 commit b805b82
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

const fifteenMinutesMs = 15 * 60 * 1000

type stepRequest struct {
stepRef string
state store.WorkflowExecution
Expand Down Expand Up @@ -622,7 +624,7 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow
// the async nature of the workflow engine would provide no guarantees.
}
logCustMsg(ctx, cma, "execution status: "+status, l)
return e.finishExecution(ctx, state.ExecutionID, status)
return e.finishExecution(ctx, cma, state.ExecutionID, status)
}

// Finally, since the workflow hasn't timed out or completed, let's
Expand Down Expand Up @@ -669,9 +671,12 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {
}
}

func (e *Engine) finishExecution(ctx context.Context, executionID string, status string) error {
e.logger.With(platform.KeyWorkflowExecutionID, executionID, "status", status).Info("finishing execution")
func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter, executionID string, status string) error {
l := e.logger.With(platform.KeyWorkflowExecutionID, executionID, "status", status)
metrics := e.metrics.with("status", status)

l.Info("finishing execution")

err := e.executionStates.UpdateStatus(ctx, executionID, status)
if err != nil {
return err
Expand All @@ -687,6 +692,13 @@ func (e *Engine) finishExecution(ctx context.Context, executionID string, status
e.stepUpdatesChMap.remove(executionID)
metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len())
metrics.updateWorkflowExecutionLatencyGauge(ctx, executionDuration)

if executionDuration > fifteenMinutesMs {
logCustMsg(ctx, cma, fmt.Sprintf("execution duration exceeded 15 minutes: %d", executionDuration), l)
l.Warnf("execution duration exceeded 15 minutes: %d", executionDuration)
}
logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d", executionDuration), l)
l.Infof("execution duration: %d", executionDuration)
e.onExecutionFinished(executionID)
return nil
}
Expand Down

0 comments on commit b805b82

Please sign in to comment.