diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index 75b6281467c51..26feeee589b28 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -123,6 +123,10 @@ export class ActiveExecutions { this.activeExecutions[executionId].responsePromise?.resolve(response); } + getPostExecutePromiseCount(executionId: string): number { + return this.activeExecutions[executionId]?.postExecutePromises.length ?? 0; + } + /** * Remove an active execution * diff --git a/packages/cli/src/InternalHooks.ts b/packages/cli/src/InternalHooks.ts index 12bfe6bebe5d1..128616e4dea18 100644 --- a/packages/cli/src/InternalHooks.ts +++ b/packages/cli/src/InternalHooks.ts @@ -29,6 +29,8 @@ import { NodeTypes } from './NodeTypes'; import type { ExecutionMetadata } from '@db/entities/ExecutionMetadata'; import { ExecutionRepository } from '@db/repositories'; import { RoleService } from './services/role.service'; +import type { EventPayloadWorkflow } from './eventbus/EventMessageClasses/EventMessageWorkflow'; +import { determineFinalExecutionStatus } from './executionLifecycleHooks/shared/sharedHookFunctions'; function userToPayload(user: User): { userId: string; @@ -240,21 +242,35 @@ export class InternalHooks implements IInternalHooksClass { async onWorkflowBeforeExecute( executionId: string, - data: IWorkflowExecutionDataProcess, + data: IWorkflowExecutionDataProcess | IWorkflowBase, ): Promise { + let payload: EventPayloadWorkflow; + // this hook is called slightly differently depending on whether it's from a worker or the main instance + // in the worker context, meaning in queue mode, only IWorkflowBase is available + if ('executionData' in data) { + payload = { + executionId, + userId: data.userId ?? undefined, + workflowId: data.workflowData.id?.toString(), + isManual: data.executionMode === 'manual', + workflowName: data.workflowData.name, + }; + } else { + payload = { + executionId, + userId: undefined, + workflowId: (data as IWorkflowBase).id?.toString(), + isManual: false, + workflowName: (data as IWorkflowBase).name, + }; + } void Promise.all([ this.executionRepository.updateExistingExecution(executionId, { status: 'running', }), eventBus.sendWorkflowEvent({ eventName: 'n8n.workflow.started', - payload: { - executionId, - userId: data.userId, - workflowId: data.workflowData.id?.toString(), - isManual: data.executionMode === 'manual', - workflowName: data.workflowData.name, - }, + payload, }), ]); } @@ -300,7 +316,7 @@ export class InternalHooks implements IInternalHooksClass { const promises = []; - const properties: IExecutionTrackProperties = { + const telemetryProperties: IExecutionTrackProperties = { workflow_id: workflow.id, is_manual: false, version_cli: N8N_VERSION, @@ -308,39 +324,33 @@ export class InternalHooks implements IInternalHooksClass { }; if (userId) { - properties.user_id = userId; + telemetryProperties.user_id = userId; } if (runData?.data.resultData.error?.message?.includes('canceled')) { runData.status = 'canceled'; } - properties.success = !!runData?.finished; + telemetryProperties.success = !!runData?.finished; - let executionStatus: ExecutionStatus; - if (runData?.status === 'crashed') { - executionStatus = 'crashed'; - } else if (runData?.status === 'waiting' || runData?.data?.waitTill) { - executionStatus = 'waiting'; - } else if (runData?.status === 'canceled') { - executionStatus = 'canceled'; - } else { - executionStatus = properties.success ? 
'success' : 'failed'; - } + // const executionStatus: ExecutionStatus = runData?.status ?? 'unknown'; + const executionStatus: ExecutionStatus = runData + ? determineFinalExecutionStatus(runData) + : 'unknown'; if (runData !== undefined) { - properties.execution_mode = runData.mode; - properties.is_manual = runData.mode === 'manual'; + telemetryProperties.execution_mode = runData.mode; + telemetryProperties.is_manual = runData.mode === 'manual'; let nodeGraphResult: INodesGraphResult | null = null; - if (!properties.success && runData?.data.resultData.error) { - properties.error_message = runData?.data.resultData.error.message; + if (!telemetryProperties.success && runData?.data.resultData.error) { + telemetryProperties.error_message = runData?.data.resultData.error.message; let errorNodeName = 'node' in runData?.data.resultData.error ? runData?.data.resultData.error.node?.name : undefined; - properties.error_node_type = + telemetryProperties.error_node_type = 'node' in runData?.data.resultData.error ? runData?.data.resultData.error.node?.type : undefined; @@ -352,23 +362,23 @@ export class InternalHooks implements IInternalHooksClass { ); if (lastNode !== undefined) { - properties.error_node_type = lastNode.type; + telemetryProperties.error_node_type = lastNode.type; errorNodeName = lastNode.name; } } - if (properties.is_manual) { + if (telemetryProperties.is_manual) { nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes); - properties.node_graph = nodeGraphResult.nodeGraph; - properties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph); + telemetryProperties.node_graph = nodeGraphResult.nodeGraph; + telemetryProperties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph); if (errorNodeName) { - properties.error_node_id = nodeGraphResult.nameIndices[errorNodeName]; + telemetryProperties.error_node_id = nodeGraphResult.nameIndices[errorNodeName]; } } } - if (properties.is_manual) { + if (telemetryProperties.is_manual) { if (!nodeGraphResult) { nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes); } @@ -386,10 +396,10 @@ export class InternalHooks implements IInternalHooksClass { workflow_id: workflow.id, status: executionStatus, executionStatus: runData?.status ?? 'unknown', - error_message: properties.error_message as string, - error_node_type: properties.error_node_type, - node_graph_string: properties.node_graph_string as string, - error_node_id: properties.error_node_id as string, + error_message: telemetryProperties.error_message as string, + error_node_type: telemetryProperties.error_node_type, + node_graph_string: telemetryProperties.node_graph_string as string, + error_node_id: telemetryProperties.error_node_id as string, webhook_domain: null, sharing_role: userRole, }; @@ -428,39 +438,34 @@ export class InternalHooks implements IInternalHooksClass { } } + const sharedEventPayload: EventPayloadWorkflow = { + executionId, + success: telemetryProperties.success, + userId: telemetryProperties.user_id, + workflowId: workflow.id, + isManual: telemetryProperties.is_manual, + workflowName: workflow.name, + metaData: runData?.data?.resultData?.metadata, + }; promises.push( - properties.success + telemetryProperties.success ? 
eventBus.sendWorkflowEvent({ eventName: 'n8n.workflow.success', - payload: { - executionId, - success: properties.success, - userId: properties.user_id, - workflowId: properties.workflow_id, - isManual: properties.is_manual, - workflowName: workflow.name, - metaData: runData?.data?.resultData?.metadata, - }, + payload: sharedEventPayload, }) : eventBus.sendWorkflowEvent({ eventName: 'n8n.workflow.failed', payload: { - executionId, - success: properties.success, - userId: properties.user_id, - workflowId: properties.workflow_id, + ...sharedEventPayload, lastNodeExecuted: runData?.data.resultData.lastNodeExecuted, - errorNodeType: properties.error_node_type, - errorNodeId: properties.error_node_id?.toString(), - errorMessage: properties.error_message?.toString(), - isManual: properties.is_manual, - workflowName: workflow.name, - metaData: runData?.data?.resultData?.metadata, + errorNodeType: telemetryProperties.error_node_type, + errorNodeId: telemetryProperties.error_node_id?.toString(), + errorMessage: telemetryProperties.error_message?.toString(), }, }), ); - void Promise.all([...promises, this.telemetry.trackWorkflowExecution(properties)]); + void Promise.all([...promises, this.telemetry.trackWorkflowExecution(telemetryProperties)]); } async onWorkflowSharingUpdate(workflowId: string, userId: string, userList: string[]) { diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts index 27c59b411d40b..a3fe4b82dd473 100644 --- a/packages/cli/src/Queue.ts +++ b/packages/cli/src/Queue.ts @@ -1,6 +1,6 @@ import type Bull from 'bull'; import { Service } from 'typedi'; -import { type IExecuteResponsePromiseData } from 'n8n-workflow'; +import type { ExecutionError, IExecuteResponsePromiseData } from 'n8n-workflow'; import { ActiveExecutions } from '@/ActiveExecutions'; import * as WebhookHelpers from '@/WebhookHelpers'; import { @@ -23,6 +23,7 @@ export interface JobData { export interface JobResponse { success: boolean; + error?: ExecutionError; } export interface WebhookResponse { diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index c5c05f4215726..427856d5f1ee1 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -558,220 +558,222 @@ export async function executeWebhook( { executionId }, ); - // Get a promise which resolves when the workflow did execute and send then response - const executePromise = Container.get(ActiveExecutions).getPostExecutePromise( - executionId, - ) as Promise; - executePromise - .then(async (data) => { - if (data === undefined) { - if (!didSendResponse) { - responseCallback(null, { - data: { - message: 'Workflow executed successfully but no data was returned', - }, - responseCode, - }); - didSendResponse = true; + if (!didSendResponse) { + // Get a promise which resolves when the workflow did execute and send then response + const executePromise = Container.get(ActiveExecutions).getPostExecutePromise( + executionId, + ) as Promise; + executePromise + .then(async (data) => { + if (data === undefined) { + if (!didSendResponse) { + responseCallback(null, { + data: { + message: 'Workflow executed successfully but no data was returned', + }, + responseCode, + }); + didSendResponse = true; + } + return undefined; } - return undefined; - } - - if (workflowData.pinData) { - data.data.resultData.pinData = workflowData.pinData; - } - const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); - if (data.data.resultData.error || returnData?.error !== undefined) { - if (!didSendResponse) 
{ - responseCallback(null, { - data: { - message: 'Error in workflow', - }, - responseCode: 500, - }); + if (workflowData.pinData) { + data.data.resultData.pinData = workflowData.pinData; } - didSendResponse = true; - return data; - } - if (responseMode === 'responseNode') { - if (!didSendResponse) { - // Return an error if no Webhook-Response node did send any data - responseCallback(null, { - data: { - message: 'Workflow executed successfully', - }, - responseCode, - }); + const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); + if (data.data.resultData.error || returnData?.error !== undefined) { + if (!didSendResponse) { + responseCallback(null, { + data: { + message: 'Error in workflow', + }, + responseCode: 500, + }); + } didSendResponse = true; + return data; } - return undefined; - } - if (returnData === undefined) { - if (!didSendResponse) { - responseCallback(null, { - data: { - message: 'Workflow executed successfully but the last node did not return any data', - }, - responseCode, - }); + if (responseMode === 'responseNode') { + if (!didSendResponse) { + // Return an error if no Webhook-Response node did send any data + responseCallback(null, { + data: { + message: 'Workflow executed successfully', + }, + responseCode, + }); + didSendResponse = true; + } + return undefined; } - didSendResponse = true; - return data; - } - - const additionalKeys: IWorkflowDataProxyAdditionalKeys = { - $executionId: executionId, - }; - if (!didSendResponse) { - let data: IDataObject | IDataObject[] | undefined; + if (returnData === undefined) { + if (!didSendResponse) { + responseCallback(null, { + data: { + message: + 'Workflow executed successfully but the last node did not return any data', + }, + responseCode, + }); + } + didSendResponse = true; + return data; + } - if (responseData === 'firstEntryJson') { - // Return the JSON data of the first entry + const additionalKeys: IWorkflowDataProxyAdditionalKeys = { + $executionId: executionId, + }; - if (returnData.data!.main[0]![0] === undefined) { - responseCallback(new Error('No item to return got found'), {}); - didSendResponse = true; - return undefined; - } + if (!didSendResponse) { + let data: IDataObject | IDataObject[] | undefined; - data = returnData.data!.main[0]![0].json; + if (responseData === 'firstEntryJson') { + // Return the JSON data of the first entry - const responsePropertyName = workflow.expression.getSimpleParameterValue( - workflowStartNode, - webhookData.webhookDescription.responsePropertyName, - executionMode, - additionalData.timezone, - additionalKeys, - undefined, - undefined, - ); + if (returnData.data!.main[0]![0] === undefined) { + responseCallback(new Error('No item to return got found'), {}); + didSendResponse = true; + return undefined; + } - if (responsePropertyName !== undefined) { - data = get(data, responsePropertyName as string) as IDataObject; - } + data = returnData.data!.main[0]![0].json; - const responseContentType = workflow.expression.getSimpleParameterValue( - workflowStartNode, - webhookData.webhookDescription.responseContentType, - executionMode, - additionalData.timezone, - additionalKeys, - undefined, - undefined, - ); + const responsePropertyName = workflow.expression.getSimpleParameterValue( + workflowStartNode, + webhookData.webhookDescription.responsePropertyName, + executionMode, + additionalData.timezone, + additionalKeys, + undefined, + undefined, + ); - if (responseContentType !== undefined) { - // Send the webhook response manually to be able to set the content-type - 
res.setHeader('Content-Type', responseContentType as string); - - // Returning an object, boolean, number, ... causes problems so make sure to stringify if needed - if ( - data !== null && - data !== undefined && - ['Buffer', 'String'].includes(data.constructor.name) - ) { - res.end(data); - } else { - res.end(JSON.stringify(data)); + if (responsePropertyName !== undefined) { + data = get(data, responsePropertyName as string) as IDataObject; } - responseCallback(null, { - noWebhookResponse: true, - }); - didSendResponse = true; - } - } else if (responseData === 'firstEntryBinary') { - // Return the binary data of the first entry - data = returnData.data!.main[0]![0]; + const responseContentType = workflow.expression.getSimpleParameterValue( + workflowStartNode, + webhookData.webhookDescription.responseContentType, + executionMode, + additionalData.timezone, + additionalKeys, + undefined, + undefined, + ); - if (data === undefined) { - responseCallback(new Error('No item was found to return'), {}); - didSendResponse = true; - return undefined; - } + if (responseContentType !== undefined) { + // Send the webhook response manually to be able to set the content-type + res.setHeader('Content-Type', responseContentType as string); + + // Returning an object, boolean, number, ... causes problems so make sure to stringify if needed + if ( + data !== null && + data !== undefined && + ['Buffer', 'String'].includes(data.constructor.name) + ) { + res.end(data); + } else { + res.end(JSON.stringify(data)); + } + + responseCallback(null, { + noWebhookResponse: true, + }); + didSendResponse = true; + } + } else if (responseData === 'firstEntryBinary') { + // Return the binary data of the first entry + data = returnData.data!.main[0]![0]; + + if (data === undefined) { + responseCallback(new Error('No item was found to return'), {}); + didSendResponse = true; + return undefined; + } - if (data.binary === undefined) { - responseCallback(new Error('No binary data was found to return'), {}); - didSendResponse = true; - return undefined; - } + if (data.binary === undefined) { + responseCallback(new Error('No binary data was found to return'), {}); + didSendResponse = true; + return undefined; + } - const responseBinaryPropertyName = workflow.expression.getSimpleParameterValue( - workflowStartNode, - webhookData.webhookDescription.responseBinaryPropertyName, - executionMode, - additionalData.timezone, - additionalKeys, - undefined, - 'data', - ); + const responseBinaryPropertyName = workflow.expression.getSimpleParameterValue( + workflowStartNode, + webhookData.webhookDescription.responseBinaryPropertyName, + executionMode, + additionalData.timezone, + additionalKeys, + undefined, + 'data', + ); - if (responseBinaryPropertyName === undefined && !didSendResponse) { - responseCallback(new Error("No 'responseBinaryPropertyName' is set"), {}); - didSendResponse = true; - } + if (responseBinaryPropertyName === undefined && !didSendResponse) { + responseCallback(new Error("No 'responseBinaryPropertyName' is set"), {}); + didSendResponse = true; + } - const binaryData = (data.binary as IBinaryKeyData)[ - responseBinaryPropertyName as string - ]; - if (binaryData === undefined && !didSendResponse) { - responseCallback( - new Error( - `The binary property '${responseBinaryPropertyName}' which should be returned does not exist`, - ), - {}, - ); - didSendResponse = true; - } + const binaryData = (data.binary as IBinaryKeyData)[ + responseBinaryPropertyName as string + ]; + if (binaryData === undefined && !didSendResponse) 
{ + responseCallback( + new Error( + `The binary property '${responseBinaryPropertyName}' which should be returned does not exist`, + ), + {}, + ); + didSendResponse = true; + } - if (!didSendResponse) { - // Send the webhook response manually - res.setHeader('Content-Type', binaryData.mimeType); - if (binaryData.id) { - const stream = BinaryDataManager.getInstance().getBinaryStream(binaryData.id); - await pipeline(stream, res); - } else { - res.end(Buffer.from(binaryData.data, BINARY_ENCODING)); + if (!didSendResponse) { + // Send the webhook response manually + res.setHeader('Content-Type', binaryData.mimeType); + if (binaryData.id) { + const stream = BinaryDataManager.getInstance().getBinaryStream(binaryData.id); + await pipeline(stream, res); + } else { + res.end(Buffer.from(binaryData.data, BINARY_ENCODING)); + } + + responseCallback(null, { + noWebhookResponse: true, + }); + } + } else if (responseData === 'noData') { + // Return without data + data = undefined; + } else { + // Return the JSON data of all the entries + data = []; + for (const entry of returnData.data!.main[0]!) { + data.push(entry.json); } + } + if (!didSendResponse) { responseCallback(null, { - noWebhookResponse: true, + data, + responseCode, }); } - } else if (responseData === 'noData') { - // Return without data - data = undefined; - } else { - // Return the JSON data of all the entries - data = []; - for (const entry of returnData.data!.main[0]!) { - data.push(entry.json); - } } + didSendResponse = true; + return data; + }) + .catch((e) => { if (!didSendResponse) { - responseCallback(null, { - data, - responseCode, - }); + responseCallback(new Error('There was a problem executing the workflow'), {}); } - } - didSendResponse = true; - - return data; - }) - .catch((e) => { - if (!didSendResponse) { - responseCallback(new Error('There was a problem executing the workflow'), {}); - } - - throw new ResponseHelper.InternalServerError(e.message); - }); + throw new ResponseHelper.InternalServerError(e.message); + }); + } return executionId; } catch (e) { const error = diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 1161c8c0edf70..2dad79507f471 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -36,7 +36,6 @@ import { WorkflowHooks, } from 'n8n-workflow'; -import pick from 'lodash/pick'; import { Container } from 'typedi'; import type { FindOptionsWhere } from 'typeorm'; import { LessThanOrEqual, In } from 'typeorm'; @@ -66,7 +65,11 @@ import { ExecutionRepository } from '@db/repositories'; import { EventsService } from '@/services/events.service'; import { SecretsHelper } from './SecretsHelpers'; import { OwnershipService } from './services/ownership.service'; -import { ExecutionMetadataService } from './services/executionMetadata.service'; +import { + determineFinalExecutionStatus, + prepareExecutionDataForDbUpdate, + updateExistingExecution, +} from './executionLifecycleHooks/shared/sharedHookFunctions'; const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType'); @@ -569,18 +572,11 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { saveDataSuccessExecution; } - const workflowHasCrashed = fullRunData.status === 'crashed'; - const workflowWasCanceled = fullRunData.status === 'canceled'; - const workflowDidSucceed = - !fullRunData.data.resultData.error && !workflowHasCrashed && !workflowWasCanceled; - let workflowStatusFinal: ExecutionStatus 
= workflowDidSucceed ? 'success' : 'failed'; - if (workflowHasCrashed) workflowStatusFinal = 'crashed'; - if (workflowWasCanceled) workflowStatusFinal = 'canceled'; - if (fullRunData.waitTill) workflowStatusFinal = 'waiting'; + const workflowStatusFinal = determineFinalExecutionStatus(fullRunData); if ( - (workflowDidSucceed && saveDataSuccessExecution === 'none') || - (!workflowDidSucceed && saveDataErrorExecution === 'none') + (workflowStatusFinal === 'success' && saveDataSuccessExecution === 'none') || + (workflowStatusFinal !== 'success' && saveDataErrorExecution === 'none') ) { if (!fullRunData.waitTill && !isManualMode) { executeErrorWorkflow( @@ -599,69 +595,19 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive // As a result, we should create an IWorkflowBase object with only the data we want to save in it. - const pristineWorkflowData: IWorkflowBase = pick(this.workflowData, [ - 'id', - 'name', - 'active', - 'createdAt', - 'updatedAt', - 'nodes', - 'connections', - 'settings', - 'staticData', - 'pinData', - ]); - - const fullExecutionData: IExecutionDb = { - data: fullRunData.data, - mode: fullRunData.mode, - finished: fullRunData.finished ? fullRunData.finished : false, - startedAt: fullRunData.startedAt, - stoppedAt: fullRunData.stoppedAt, - workflowData: pristineWorkflowData, - waitTill: fullRunData.waitTill, - status: workflowStatusFinal, - }; - - if (this.retryOf !== undefined) { - fullExecutionData.retryOf = this.retryOf?.toString(); - } - - const workflowId = this.workflowData.id; - if (isWorkflowIdValid(workflowId)) { - fullExecutionData.workflowId = workflowId; - } + const fullExecutionData = prepareExecutionDataForDbUpdate({ + runData: fullRunData, + workflowData: this.workflowData, + workflowStatusFinal, + retryOf: this.retryOf, + }); - // Leave log message before flatten as that operation increased memory usage a lot and the chance of a crash is highest here - Logger.debug(`Save execution data to database for execution ID ${this.executionId}`, { + await updateExistingExecution({ executionId: this.executionId, - workflowId, - finished: fullExecutionData.finished, - stoppedAt: fullExecutionData.stoppedAt, + workflowId: this.workflowData.id as string, + executionData: fullExecutionData, }); - await Container.get(ExecutionRepository).updateExistingExecution( - this.executionId, - fullExecutionData, - ); - - try { - if (fullRunData.data.resultData.metadata) { - await Container.get(ExecutionMetadataService).save( - this.executionId, - fullRunData.data.resultData.metadata, - ); - } - } catch (e) { - Logger.error(`Failed to save metadata for execution ID ${this.executionId}`, e); - } - - if (fullRunData.finished === true && this.retryOf !== undefined) { - await Container.get(ExecutionRepository).updateExistingExecution(this.retryOf, { - retrySuccessId: this.executionId, - }); - } - if (!isManualMode) { executeErrorWorkflow( this.workflowData, @@ -707,18 +653,40 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { * */ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { + const internalHooks = Container.get(InternalHooks); const eventsService = Container.get(EventsService); return { - nodeExecuteBefore: [], - nodeExecuteAfter: [], - workflowExecuteBefore: [], + nodeExecuteBefore: [ + async function (this: WorkflowHooks, nodeName: string): Promise { + void 
internalHooks.onNodeBeforeExecute(this.executionId, this.workflowData, nodeName); + }, + ], + nodeExecuteAfter: [ + async function (this: WorkflowHooks, nodeName: string): Promise { + void internalHooks.onNodePostExecute(this.executionId, this.workflowData, nodeName); + }, + ], + workflowExecuteBefore: [ + async function (workflow: Workflow, data: IRunExecutionData): Promise { + void internalHooks.onWorkflowBeforeExecute(this.executionId, this.workflowData); + }, + ], workflowExecuteAfter: [ async function ( this: WorkflowHooks, fullRunData: IRun, newStaticData: IDataObject, ): Promise { + Logger.debug('Executing hook (hookFunctionsSaveWorker)', { + executionId: this.executionId, + workflowId: this.workflowData.id, + }); try { + // Prune old execution data + if (config.getEnv('executions.pruneData')) { + await pruneExecutionData.call(this); + } + if (isWorkflowIdValid(this.workflowData.id) && newStaticData) { // Workflow is saved so update in database try { @@ -735,16 +703,9 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { } } - const workflowHasCrashed = fullRunData.status === 'crashed'; - const workflowWasCanceled = fullRunData.status === 'canceled'; - const workflowDidSucceed = - !fullRunData.data.resultData.error && !workflowHasCrashed && !workflowWasCanceled; - let workflowStatusFinal: ExecutionStatus = workflowDidSucceed ? 'success' : 'failed'; - if (workflowHasCrashed) workflowStatusFinal = 'crashed'; - if (workflowWasCanceled) workflowStatusFinal = 'canceled'; - if (fullRunData.waitTill) workflowStatusFinal = 'waiting'; + const workflowStatusFinal = determineFinalExecutionStatus(fullRunData); - if (!workflowDidSucceed) { + if (workflowStatusFinal !== 'success') { executeErrorWorkflow( this.workflowData, fullRunData, @@ -754,54 +715,20 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { ); } - const fullExecutionData: IExecutionDb = { - data: fullRunData.data, - mode: fullRunData.mode, - finished: fullRunData.finished ? fullRunData.finished : false, - startedAt: fullRunData.startedAt, - stoppedAt: fullRunData.stoppedAt, + // Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive + // As a result, we should create an IWorkflowBase object with only the data we want to save in it. 
+ const fullExecutionData = prepareExecutionDataForDbUpdate({ + runData: fullRunData, workflowData: this.workflowData, - waitTill: fullRunData.data.waitTill, - status: workflowStatusFinal, - }; - - if (this.retryOf !== undefined) { - fullExecutionData.retryOf = this.retryOf.toString(); - } - - const workflowId = this.workflowData.id; - if (isWorkflowIdValid(workflowId)) { - fullExecutionData.workflowId = workflowId; - } - - await Container.get(ExecutionRepository).updateExistingExecution( - this.executionId, - fullExecutionData, - ); - - // For reasons(tm) the execution status is not updated correctly in the first update, so has to be written again (tbd) - - await Container.get(ExecutionRepository).updateExistingExecution(this.executionId, { - status: fullExecutionData.status, + workflowStatusFinal, + retryOf: this.retryOf, }); - try { - if (fullRunData.data.resultData.metadata) { - await Container.get(ExecutionMetadataService).save( - this.executionId, - fullRunData.data.resultData.metadata, - ); - } - } catch (e) { - Logger.error(`Failed to save metadata for execution ID ${this.executionId}`, e); - } - - if (fullRunData.finished === true && this.retryOf !== undefined) { - // If the retry was successful save the reference it on the original execution - await Container.get(ExecutionRepository).updateExistingExecution(this.retryOf, { - retrySuccessId: this.executionId, - }); - } + await updateExistingExecution({ + executionId: this.executionId, + workflowId: this.workflowData.id as string, + executionData: fullExecutionData, + }); } catch (error) { executeErrorWorkflow( this.workflowData, @@ -814,6 +741,14 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { eventsService.emit('workflowExecutionCompleted', this.workflowData, fullRunData); } }, + async function ( + this: WorkflowHooks, + fullRunData: IRun, + newStaticData: IDataObject, + ): Promise { + // send tracking and event log events, but don't wait for them + void internalHooks.onWorkflowPostExecute(this.executionId, this.workflowData, fullRunData); + }, ], nodeFetchedData: [ async (workflowId: string, node: INode) => { @@ -1216,14 +1151,19 @@ export function getWorkflowHooksWorkerMain( optionalParameters?: IWorkflowHooksOptionalParameters, ): WorkflowHooks { optionalParameters = optionalParameters || {}; - const hookFunctions = hookFunctionsPush(); - const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode); - for (const key of Object.keys(preExecuteFunctions)) { - if (hookFunctions[key] === undefined) { - hookFunctions[key] = []; - } - hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); - } + const hookFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode); + + // TODO: why are workers pushing to frontend? 
+ // TODO: simplifying this for now to just leave the bare minimum hooks + + // const hookFunctions = hookFunctionsPush(); + // const preExecuteFunctions = hookFunctionsPreExecute(optionalParameters.parentProcessMode); + // for (const key of Object.keys(preExecuteFunctions)) { + // if (hookFunctions[key] === undefined) { + // hookFunctions[key] = []; + // } + // hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); + // } // When running with worker mode, main process executes // Only workflowExecuteBefore + workflowExecuteAfter diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index b7e4b4eaacb48..6c7a5b77bf029 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -31,6 +31,7 @@ import { ActiveExecutions } from '@/ActiveExecutions'; import config from '@/config'; import { ExternalHooks } from '@/ExternalHooks'; import type { + IExecutionResponse, IProcessMessageDataHook, IWorkflowExecutionDataProcess, IWorkflowExecutionDataProcessWithExecution, @@ -185,43 +186,44 @@ export class WorkflowRunner { executionId, responsePromise, ); - } else if (executionsProcess === 'main') { - executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise); } else { - executionId = await this.runSubprocess(data, loadStaticData, executionId, responsePromise); + if (executionsProcess === 'main') { + executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise); + } else { + executionId = await this.runSubprocess(data, loadStaticData, executionId, responsePromise); + } + void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId, data); } - void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId, data); - - const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId); - - const externalHooks = Container.get(ExternalHooks); - postExecutePromise - .then(async (executionData) => { - void Container.get(InternalHooks).onWorkflowPostExecute( - executionId!, - data.workflowData, - executionData, - data.userId, - ); - }) - .catch((error) => { - ErrorReporter.error(error); - console.error('There was a problem running internal hook "onWorkflowPostExecute"', error); - }); - - if (externalHooks.exists('workflow.postExecute')) { + // only run these when not in queue mode or when the execution is manual, + // since these calls are now done by the worker directly + if (executionsMode !== 'queue' || data.executionMode === 'manual') { + const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId); + const externalHooks = Container.get(ExternalHooks); postExecutePromise .then(async (executionData) => { - await externalHooks.run('workflow.postExecute', [ - executionData, + void Container.get(InternalHooks).onWorkflowPostExecute( + executionId!, data.workflowData, - executionId, - ]); + executionData, + data.userId, + ); + if (externalHooks.exists('workflow.postExecute')) { + try { + await externalHooks.run('workflow.postExecute', [ + executionData, + data.workflowData, + executionId, + ]); + } catch (error) { + ErrorReporter.error(error); + console.error('There was a problem running hook "workflow.postExecute"', error); + } + } }) .catch((error) => { ErrorReporter.error(error); - console.error('There was a problem running hook "workflow.postExecute"', error); + console.error('There was a problem running internal hook "onWorkflowPostExecute"', error); }); } @@ -501,7 +503,7 @@ export class WorkflowRunner { 
const queueRecoveryInterval = config.getEnv('queue.bull.queueRecoveryInterval'); - const racingPromises: Array> = [jobData]; + const racingPromises: Array> = [jobData]; let clearWatchdogInterval; if (queueRecoveryInterval > 0) { @@ -519,7 +521,7 @@ export class WorkflowRunner { ************************************************ */ let watchDogInterval: NodeJS.Timeout | undefined; - const watchDog: Promise = new Promise((res) => { + const watchDog: Promise = new Promise((res) => { watchDogInterval = setInterval(async () => { const currentJob = await this.jobQueue.getJob(job.id); // When null means job is finished (not found in queue) @@ -540,8 +542,11 @@ export class WorkflowRunner { }; } + let racingPromisesResult: JobResponse = { + success: false, + }; try { - await Promise.race(racingPromises); + racingPromisesResult = await Promise.race(racingPromises); if (clearWatchdogInterval !== undefined) { clearWatchdogInterval(); } @@ -564,25 +569,48 @@ export class WorkflowRunner { reject(error); } + // optimization: only pull and unflatten execution data from the Db when it is needed + const executionHasPostExecutionPromises = + this.activeExecutions.getPostExecutePromiseCount(executionId) > 0; + + if (executionHasPostExecutionPromises) { + Logger.debug( + `Reading execution data for execution ${executionId} from db for PostExecutionPromise.`, + ); + } else { + Logger.debug( + `Skipping execution data for execution ${executionId} since there are no PostExecutionPromise.`, + ); + } + const fullExecutionData = await Container.get(ExecutionRepository).findSingleExecution( executionId, { - includeData: true, - unflattenData: true, + includeData: executionHasPostExecutionPromises, + unflattenData: executionHasPostExecutionPromises, }, ); if (!fullExecutionData) { return reject(new Error(`Could not find execution with id "${executionId}"`)); } - const runData = { - data: fullExecutionData.data, + + const runData: IRun = { + data: {}, finished: fullExecutionData.finished, mode: fullExecutionData.mode, startedAt: fullExecutionData.startedAt, stoppedAt: fullExecutionData.stoppedAt, } as IRun; + if (executionHasPostExecutionPromises) { + runData.data = (fullExecutionData as IExecutionResponse).data; + } + + // NOTE: due to the optimization of not loading the execution data from the db when no post execution promises are present, + // the execution data in runData.data MAY not be available here. + // This means that any function expecting with runData has to check if the runData.data defined from this point this.activeExecutions.remove(executionId, runData); + // Normally also static data should be supplied here but as it only used for sending // data to editor-UI is not needed. await hooks.executeHookFunctions('workflowExecuteAfter', [runData]); @@ -596,7 +624,7 @@ export class WorkflowRunner { workflowSettings.saveDataSuccessExecution ?? 
config.getEnv('executions.saveDataOnSuccess'); - const workflowDidSucceed = !runData.data.resultData.error; + const workflowDidSucceed = !racingPromisesResult.error; if ( (workflowDidSucceed && saveDataSuccessExecution === 'none') || (!workflowDidSucceed && saveDataErrorExecution === 'none') diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 8985d3addbcb5..30cf4be73e5c1 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -2,13 +2,18 @@ import express from 'express'; import http from 'http'; import type PCancelable from 'p-cancelable'; import { Container } from 'typedi'; -import * as os from 'os'; import { flags } from '@oclif/command'; import { WorkflowExecute } from 'n8n-core'; -import type { ExecutionStatus, IExecuteResponsePromiseData, INodeTypes, IRun } from 'n8n-workflow'; -import { Workflow, NodeOperationError, LoggerProxy, sleep, jsonParse } from 'n8n-workflow'; +import type { + ExecutionError, + ExecutionStatus, + IExecuteResponsePromiseData, + INodeTypes, + IRun, +} from 'n8n-workflow'; +import { Workflow, NodeOperationError, LoggerProxy, sleep } from 'n8n-workflow'; import * as Db from '@/Db'; import * as ResponseHelper from '@/ResponseHelper'; @@ -32,8 +37,7 @@ import { eventBus } from '../eventbus'; import { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher'; import { RedisServicePubSubSubscriber } from '../services/redis/RedisServicePubSubSubscriber'; import { EventMessageGeneric } from '../eventbus/EventMessageClasses/EventMessageGeneric'; -import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; -import { type RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands'; +import { getWorkerCommandReceivedHandler } from '../worker/workerCommandHandler'; export class Worker extends BaseCommand { static description = '\nStarts a n8n worker'; @@ -179,7 +183,9 @@ export class Worker extends BaseCommand { fullExecutionData.mode, job.data.executionId, fullExecutionData.workflowData, - { retryOf: fullExecutionData.retryOf as string }, + { + retryOf: fullExecutionData.retryOf as string, + }, ); try { @@ -193,7 +199,7 @@ export class Worker extends BaseCommand { ); await additionalData.hooks.executeHookFunctions('workflowExecuteAfter', [failedExecution]); } - return { success: true }; + return { success: true, error: error as ExecutionError }; } additionalData.hooks.hookFunctions.sendResponse = [ @@ -236,6 +242,9 @@ export class Worker extends BaseCommand { delete Worker.runningJobs[job.id]; + // do NOT call workflowExecuteAfter hook here, since it is being called from processSuccessExecution() + // already! 
+ return { success: true, }; @@ -283,7 +292,12 @@ export class Worker extends BaseCommand { await this.redisSubscriber.subscribeToCommandChannel(); this.redisSubscriber.addMessageHandler( 'WorkerCommandReceivedHandler', - this.getWorkerCommandReceivedHandler(), + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + getWorkerCommandReceivedHandler({ + uniqueInstanceId: this.uniqueInstanceId, + redisPublisher: this.redisPublisher, + getRunningJobIds: () => Object.keys(Worker.runningJobs), + }), ); } @@ -464,78 +478,4 @@ export class Worker extends BaseCommand { async catch(error: Error) { await this.exitWithCrash('Worker exiting due to an error.', error); } - - private getWorkerCommandReceivedHandler() { - const { uniqueInstanceId, redisPublisher } = this; - const getRunningJobIds = () => Object.keys(Worker.runningJobs); - return async (channel: string, messageString: string) => { - if (channel === COMMAND_REDIS_CHANNEL) { - if (!messageString) return; - let message: RedisServiceCommandObject; - try { - message = jsonParse(messageString); - } catch { - LoggerProxy.debug( - `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, - ); - return; - } - if (message) { - if (message.targets && !message.targets.includes(uniqueInstanceId)) { - return; // early return if the message is not for this worker - } - switch (message.command) { - case 'getStatus': - await redisPublisher.publishToWorkerChannel({ - workerId: uniqueInstanceId, - command: message.command, - payload: { - workerId: uniqueInstanceId, - runningJobs: getRunningJobIds(), - freeMem: os.freemem(), - totalMem: os.totalmem(), - uptime: process.uptime(), - loadAvg: os.loadavg(), - cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`), - arch: os.arch(), - platform: os.platform(), - hostname: os.hostname(), - net: Object.values(os.networkInterfaces()).flatMap( - (interfaces) => - interfaces?.map((net) => `${net.family} - address: ${net.address}`) ?? 
'',
-								),
-							},
-						});
-						break;
-					case 'getId':
-						await redisPublisher.publishToWorkerChannel({
-							workerId: uniqueInstanceId,
-							command: message.command,
-						});
-						break;
-					case 'restartEventBus':
-						await eventBus.restart();
-						await redisPublisher.publishToWorkerChannel({
-							workerId: uniqueInstanceId,
-							command: message.command,
-							payload: {
-								result: 'success',
-							},
-						});
-						break;
-					case 'stopWorker':
-						// TODO: implement proper shutdown
-						// await this.stopProcess();
-						break;
-					default:
-						LoggerProxy.debug(
-							// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
-							`Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`,
-						);
-						break;
-				}
-			}
-		}
-	};
-	}
 }
diff --git a/packages/cli/src/eventbus/EventMessageClasses/EventMessageWorkflow.ts b/packages/cli/src/eventbus/EventMessageClasses/EventMessageWorkflow.ts
index 41cb442e395bd..146e0ffcd3229 100644
--- a/packages/cli/src/eventbus/EventMessageClasses/EventMessageWorkflow.ts
+++ b/packages/cli/src/eventbus/EventMessageClasses/EventMessageWorkflow.ts
@@ -9,7 +9,7 @@ import type { EventNamesWorkflowType } from '.';
 // --------------------------------------
 // EventMessage class for Workflow events
 // --------------------------------------
-interface EventPayloadWorkflow extends AbstractEventPayload {
+export interface EventPayloadWorkflow extends AbstractEventPayload {
 	msg?: string;

 	workflowData?: IWorkflowBase;
diff --git a/packages/cli/src/executionLifecycleHooks/shared/sharedHookFunctions.ts b/packages/cli/src/executionLifecycleHooks/shared/sharedHookFunctions.ts
new file mode 100644
index 0000000000000..cf68a1930bed2
--- /dev/null
+++ b/packages/cli/src/executionLifecycleHooks/shared/sharedHookFunctions.ts
@@ -0,0 +1,99 @@
+import type { ExecutionStatus, IRun, IWorkflowBase } from 'n8n-workflow';
+import type { IExecutionDb } from '@/Interfaces';
+import pick from 'lodash/pick';
+import { isWorkflowIdValid } from '@/utils';
+import { LoggerProxy } from 'n8n-workflow';
+import Container from 'typedi';
+import { ExecutionRepository } from '../../databases/repositories';
+import { ExecutionMetadataService } from '../../services/executionMetadata.service';
+
+export function determineFinalExecutionStatus(runData: IRun): ExecutionStatus {
+	const workflowHasCrashed = runData.status === 'crashed';
+	const workflowWasCanceled = runData.status === 'canceled';
+	const workflowDidSucceed =
+		!runData.data.resultData.error && !workflowHasCrashed && !workflowWasCanceled;
+	let workflowStatusFinal: ExecutionStatus = workflowDidSucceed ? 'success' : 'failed';
+	if (workflowHasCrashed) workflowStatusFinal = 'crashed';
+	if (workflowWasCanceled) workflowStatusFinal = 'canceled';
+	if (runData.waitTill) workflowStatusFinal = 'waiting';
+	return workflowStatusFinal;
+}
+
+export function prepareExecutionDataForDbUpdate(parameters: {
+	runData: IRun;
+	workflowData: IWorkflowBase;
+	workflowStatusFinal: ExecutionStatus;
+	retryOf?: string;
+}): IExecutionDb {
+	const { runData, workflowData, workflowStatusFinal, retryOf } = parameters;
+	// Although it is treated as IWorkflowBase here, it's being instantiated elsewhere with properties that may be sensitive
+	// As a result, we should create an IWorkflowBase object with only the data we want to save in it.
+	const pristineWorkflowData: IWorkflowBase = pick(workflowData, [
+		'id',
+		'name',
+		'active',
+		'createdAt',
+		'updatedAt',
+		'nodes',
+		'connections',
+		'settings',
+		'staticData',
+		'pinData',
+	]);
+
+	const fullExecutionData: IExecutionDb = {
+		data: runData.data,
+		mode: runData.mode,
+		finished: runData.finished ? runData.finished : false,
+		startedAt: runData.startedAt,
+		stoppedAt: runData.stoppedAt,
+		workflowData: pristineWorkflowData,
+		waitTill: runData.waitTill,
+		status: workflowStatusFinal,
+	};
+
+	if (retryOf !== undefined) {
+		fullExecutionData.retryOf = retryOf.toString();
+	}
+
+	const workflowId = workflowData.id;
+	if (isWorkflowIdValid(workflowId)) {
+		fullExecutionData.workflowId = workflowId;
+	}
+
+	return fullExecutionData;
+}
+
+export async function updateExistingExecution(parameters: {
+	executionId: string;
+	workflowId: string;
+	executionData: Partial<IExecutionDb>;
+}) {
+	const { executionId, workflowId, executionData } = parameters;
+	// Leave log message before flatten as that operation increased memory usage a lot and the chance of a crash is highest here
+	LoggerProxy.debug(`Save execution data to database for execution ID ${executionId}`, {
+		executionId,
+		workflowId,
+		finished: executionData.finished,
+		stoppedAt: executionData.stoppedAt,
+	});
+
+	await Container.get(ExecutionRepository).updateExistingExecution(executionId, executionData);
+
+	try {
+		if (executionData.data?.resultData.metadata) {
+			await Container.get(ExecutionMetadataService).save(
+				executionId,
+				executionData.data.resultData.metadata,
+			);
+		}
+	} catch (e) {
+		LoggerProxy.error(`Failed to save metadata for execution ID ${executionId}`, e as Error);
+	}
+
+	if (executionData.finished === true && executionData.retryOf !== undefined) {
+		await Container.get(ExecutionRepository).updateExistingExecution(executionData.retryOf, {
+			retrySuccessId: executionId,
+		});
+	}
+}
diff --git a/packages/cli/src/worker/workerCommandHandler.ts b/packages/cli/src/worker/workerCommandHandler.ts
new file mode 100644
index 0000000000000..874ead410c542
--- /dev/null
+++ b/packages/cli/src/worker/workerCommandHandler.ts
@@ -0,0 +1,82 @@
+import { jsonParse, LoggerProxy } from 'n8n-workflow';
+import { eventBus } from '../eventbus';
+import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands';
+import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
+import type { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher';
+import * as os from 'os';
+
+export function getWorkerCommandReceivedHandler(options: {
+	uniqueInstanceId: string;
+	redisPublisher: RedisServicePubSubPublisher;
+	getRunningJobIds: () => string[];
+}) {
+	return async (channel: string, messageString: string) => {
+		if (channel === COMMAND_REDIS_CHANNEL) {
+			if (!messageString) return;
+			let message: RedisServiceCommandObject;
+			try {
+				message = jsonParse(messageString);
+			} catch {
+				LoggerProxy.debug(
+					`Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`,
+				);
+				return;
+			}
+			if (message) {
+				if (message.targets && !message.targets.includes(options.uniqueInstanceId)) {
+					return; // early return if the message is not for this worker
+				}
+				switch (message.command) {
+					case 'getStatus':
+						await options.redisPublisher.publishToWorkerChannel({
+							workerId: options.uniqueInstanceId,
+							command: message.command,
+							payload: {
+								workerId: options.uniqueInstanceId,
+								runningJobs: options.getRunningJobIds(),
+								freeMem: os.freemem(),
+								totalMem: os.totalmem(),
+								uptime: process.uptime(),
+								loadAvg: os.loadavg(),
+								cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`),
+								arch: os.arch(),
+								platform: os.platform(),
+								hostname: os.hostname(),
+								net: Object.values(os.networkInterfaces()).flatMap(
+									(interfaces) =>
+										interfaces?.map((net) => `${net.family} - address: ${net.address}`) ?? '',
+								),
+							},
+						});
+						break;
+					case 'getId':
+						await options.redisPublisher.publishToWorkerChannel({
+							workerId: options.uniqueInstanceId,
+							command: message.command,
+						});
+						break;
+					case 'restartEventBus':
+						await eventBus.restart();
+						await options.redisPublisher.publishToWorkerChannel({
+							workerId: options.uniqueInstanceId,
+							command: message.command,
+							payload: {
+								result: 'success',
+							},
+						});
+						break;
+					case 'stopWorker':
+						// TODO: implement proper shutdown
+						// await this.stopProcess();
+						break;
+					default:
+						LoggerProxy.debug(
+							// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+							`Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`,
+						);
+						break;
+				}
+			}
+		}
+	};
+}
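
For reference only, and not part of the patch: a minimal TypeScript sketch of the status-precedence rule that the extracted determineFinalExecutionStatus helper implements. The trimmed-down RunLike type, the SketchStatus union, and the finalStatus name are assumptions made for this illustration; the real helper operates on n8n's IRun and ExecutionStatus types.

// Illustrative sketch only — mirrors the precedence in sharedHookFunctions.ts above:
// waiting > canceled > crashed > (failed if an error is present, otherwise success).
type SketchStatus = 'success' | 'failed' | 'crashed' | 'canceled' | 'waiting';

interface RunLike {
	status?: string; // e.g. 'crashed' or 'canceled'
	waitTill?: Date;
	data: { resultData: { error?: Error } };
}

function finalStatus(run: RunLike): SketchStatus {
	if (run.waitTill) return 'waiting';
	if (run.status === 'canceled') return 'canceled';
	if (run.status === 'crashed') return 'crashed';
	return run.data.resultData.error ? 'failed' : 'success';
}

// A canceled run that is still waiting resolves to 'waiting', matching the helper's last-write-wins order.
console.log(finalStatus({ status: 'canceled', waitTill: new Date(), data: { resultData: {} } })); // 'waiting'
console.log(finalStatus({ data: { resultData: { error: new Error('boom') } } })); // 'failed'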