Skip to content

Commit

Permalink
refactor(core): Move event and telemetry handling into workers in que…
Browse files Browse the repository at this point in the history
…ue mode (#7138)

# Motivation

In Queue mode, finished executions would cause the main instance to
always pull all execution data from the database, unflatten it and then
use it to send out event log events and telemetry events, as well as
required returns to Respond to Webhook nodes etc.

This could cause OOM errors when the data was large, since it had to be
fully unpacked and transformed on the main instance’s side, using up a
lot of memory (and time).

This PR attempts to limit this behaviour to only happen in those
required cases where the data has to be forwarded to some waiting
webhook, for example.

# Changes

Execution data is only required in cases, where the active execution has
a `postExecutePromise` attached to it. These usually forward the data to
some other endpoint (e.g. a listening webhook connection).

By adding a helper `getPostExecutePromiseCount()`, we can decide that in
cases where there is nothing listening at all, there is no reason to
pull the data on the main instance.

Previously, there would always be postExecutePromises because the
telemetry events were called. Now, these have been moved into the
workers, which have been given the various InternalHooks calls to their
hook function arrays, so they themselves issue these telemetry and event
calls.

This results in all event log messages to now be logged on the worker’s
event log, as well as the worker’s eventbus being the one to send out
the events to destinations. The main event log does…pretty much nothing.

We are not logging executions on the main event log any more, because
this would require all events to be replicated 1:1 from the workers to
the main instance(s) (this IS possible and implemented, see the worker’s
`replicateToRedisEventLogFunction` - but it is not enabled to reduce the
amount of traffic over redis).

Partial events in the main log could confuse the recovery process and
would result in, ironically, the recovery corrupting the execution data
by considering them crashed.

# Refactor

I have also used the opportunity to reduce duplicate code and move some
of the hook functionality into
`packages/cli/src/executionLifecycleHooks/shared/sharedHookFunctions.ts`
in preparation for a future full refactor of the hooks
  • Loading branch information
flipswitchingmonkey authored Sep 14, 2023
1 parent 07a6417 commit 0c6169e
Show file tree
Hide file tree
Showing 10 changed files with 594 additions and 493 deletions.
4 changes: 4 additions & 0 deletions packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ export class ActiveExecutions {
this.activeExecutions[executionId].responsePromise?.resolve(response);
}

getPostExecutePromiseCount(executionId: string): number {
return this.activeExecutions[executionId]?.postExecutePromises.length ?? 0;
}

/**
* Remove an active execution
*
Expand Down
119 changes: 62 additions & 57 deletions packages/cli/src/InternalHooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import { NodeTypes } from './NodeTypes';
import type { ExecutionMetadata } from '@db/entities/ExecutionMetadata';
import { ExecutionRepository } from '@db/repositories';
import { RoleService } from './services/role.service';
import type { EventPayloadWorkflow } from './eventbus/EventMessageClasses/EventMessageWorkflow';
import { determineFinalExecutionStatus } from './executionLifecycleHooks/shared/sharedHookFunctions';

function userToPayload(user: User): {
userId: string;
Expand Down Expand Up @@ -240,21 +242,35 @@ export class InternalHooks implements IInternalHooksClass {

async onWorkflowBeforeExecute(
executionId: string,
data: IWorkflowExecutionDataProcess,
data: IWorkflowExecutionDataProcess | IWorkflowBase,
): Promise<void> {
let payload: EventPayloadWorkflow;
// this hook is called slightly differently depending on whether it's from a worker or the main instance
// in the worker context, meaning in queue mode, only IWorkflowBase is available
if ('executionData' in data) {
payload = {
executionId,
userId: data.userId ?? undefined,
workflowId: data.workflowData.id?.toString(),
isManual: data.executionMode === 'manual',
workflowName: data.workflowData.name,
};
} else {
payload = {
executionId,
userId: undefined,
workflowId: (data as IWorkflowBase).id?.toString(),
isManual: false,
workflowName: (data as IWorkflowBase).name,
};
}
void Promise.all([
this.executionRepository.updateExistingExecution(executionId, {
status: 'running',
}),
eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.started',
payload: {
executionId,
userId: data.userId,
workflowId: data.workflowData.id?.toString(),
isManual: data.executionMode === 'manual',
workflowName: data.workflowData.name,
},
payload,
}),
]);
}
Expand Down Expand Up @@ -300,47 +316,41 @@ export class InternalHooks implements IInternalHooksClass {

const promises = [];

const properties: IExecutionTrackProperties = {
const telemetryProperties: IExecutionTrackProperties = {
workflow_id: workflow.id,
is_manual: false,
version_cli: N8N_VERSION,
success: false,
};

if (userId) {
properties.user_id = userId;
telemetryProperties.user_id = userId;
}

if (runData?.data.resultData.error?.message?.includes('canceled')) {
runData.status = 'canceled';
}

properties.success = !!runData?.finished;
telemetryProperties.success = !!runData?.finished;

let executionStatus: ExecutionStatus;
if (runData?.status === 'crashed') {
executionStatus = 'crashed';
} else if (runData?.status === 'waiting' || runData?.data?.waitTill) {
executionStatus = 'waiting';
} else if (runData?.status === 'canceled') {
executionStatus = 'canceled';
} else {
executionStatus = properties.success ? 'success' : 'failed';
}
// const executionStatus: ExecutionStatus = runData?.status ?? 'unknown';
const executionStatus: ExecutionStatus = runData
? determineFinalExecutionStatus(runData)
: 'unknown';

if (runData !== undefined) {
properties.execution_mode = runData.mode;
properties.is_manual = runData.mode === 'manual';
telemetryProperties.execution_mode = runData.mode;
telemetryProperties.is_manual = runData.mode === 'manual';

let nodeGraphResult: INodesGraphResult | null = null;

if (!properties.success && runData?.data.resultData.error) {
properties.error_message = runData?.data.resultData.error.message;
if (!telemetryProperties.success && runData?.data.resultData.error) {
telemetryProperties.error_message = runData?.data.resultData.error.message;
let errorNodeName =
'node' in runData?.data.resultData.error
? runData?.data.resultData.error.node?.name
: undefined;
properties.error_node_type =
telemetryProperties.error_node_type =
'node' in runData?.data.resultData.error
? runData?.data.resultData.error.node?.type
: undefined;
Expand All @@ -352,23 +362,23 @@ export class InternalHooks implements IInternalHooksClass {
);

if (lastNode !== undefined) {
properties.error_node_type = lastNode.type;
telemetryProperties.error_node_type = lastNode.type;
errorNodeName = lastNode.name;
}
}

if (properties.is_manual) {
if (telemetryProperties.is_manual) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
properties.node_graph = nodeGraphResult.nodeGraph;
properties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph);
telemetryProperties.node_graph = nodeGraphResult.nodeGraph;
telemetryProperties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph);

if (errorNodeName) {
properties.error_node_id = nodeGraphResult.nameIndices[errorNodeName];
telemetryProperties.error_node_id = nodeGraphResult.nameIndices[errorNodeName];
}
}
}

if (properties.is_manual) {
if (telemetryProperties.is_manual) {
if (!nodeGraphResult) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
}
Expand All @@ -386,10 +396,10 @@ export class InternalHooks implements IInternalHooksClass {
workflow_id: workflow.id,
status: executionStatus,
executionStatus: runData?.status ?? 'unknown',
error_message: properties.error_message as string,
error_node_type: properties.error_node_type,
node_graph_string: properties.node_graph_string as string,
error_node_id: properties.error_node_id as string,
error_message: telemetryProperties.error_message as string,
error_node_type: telemetryProperties.error_node_type,
node_graph_string: telemetryProperties.node_graph_string as string,
error_node_id: telemetryProperties.error_node_id as string,
webhook_domain: null,
sharing_role: userRole,
};
Expand Down Expand Up @@ -428,39 +438,34 @@ export class InternalHooks implements IInternalHooksClass {
}
}

const sharedEventPayload: EventPayloadWorkflow = {
executionId,
success: telemetryProperties.success,
userId: telemetryProperties.user_id,
workflowId: workflow.id,
isManual: telemetryProperties.is_manual,
workflowName: workflow.name,
metaData: runData?.data?.resultData?.metadata,
};
promises.push(
properties.success
telemetryProperties.success
? eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.success',
payload: {
executionId,
success: properties.success,
userId: properties.user_id,
workflowId: properties.workflow_id,
isManual: properties.is_manual,
workflowName: workflow.name,
metaData: runData?.data?.resultData?.metadata,
},
payload: sharedEventPayload,
})
: eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.failed',
payload: {
executionId,
success: properties.success,
userId: properties.user_id,
workflowId: properties.workflow_id,
...sharedEventPayload,
lastNodeExecuted: runData?.data.resultData.lastNodeExecuted,
errorNodeType: properties.error_node_type,
errorNodeId: properties.error_node_id?.toString(),
errorMessage: properties.error_message?.toString(),
isManual: properties.is_manual,
workflowName: workflow.name,
metaData: runData?.data?.resultData?.metadata,
errorNodeType: telemetryProperties.error_node_type,
errorNodeId: telemetryProperties.error_node_id?.toString(),
errorMessage: telemetryProperties.error_message?.toString(),
},
}),
);

void Promise.all([...promises, this.telemetry.trackWorkflowExecution(properties)]);
void Promise.all([...promises, this.telemetry.trackWorkflowExecution(telemetryProperties)]);
}

async onWorkflowSharingUpdate(workflowId: string, userId: string, userList: string[]) {
Expand Down
3 changes: 2 additions & 1 deletion packages/cli/src/Queue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type Bull from 'bull';
import { Service } from 'typedi';
import { type IExecuteResponsePromiseData } from 'n8n-workflow';
import type { ExecutionError, IExecuteResponsePromiseData } from 'n8n-workflow';
import { ActiveExecutions } from '@/ActiveExecutions';
import * as WebhookHelpers from '@/WebhookHelpers';
import {
Expand All @@ -23,6 +23,7 @@ export interface JobData {

export interface JobResponse {
success: boolean;
error?: ExecutionError;
}

export interface WebhookResponse {
Expand Down
Loading

0 comments on commit 0c6169e

Please sign in to comment.