Skip to content

Commit

Permalink
fix: properly tear down partially initialized executors (#2348)
Browse files Browse the repository at this point in the history
Refactor execution timeout logic to fix a problem where the execution
worker would not be torn down correctly when failing to initialize. This
PR also moves the entire initialization timeout logic to the
`AbstractExecutionService`, simplifying the `SnapController`. Moving the
timeout logic fixes problems where job initialization was not cancelled
properly resulting in "Snap is already being executed", since execution
of the async functions inside `AbstractExecutionService` would continue
even after the timeout.

The ExecutionService gets an optional `initTimeout` constructor argument
that can be set to define the init time for the specific service. This
timeout value is spread between initiating the execution job and running
the first execution of the Snap. If the execution environment fails to
initiate we attempt to tear it down in case it has partially started
(this is at least the case for iframes). We make sure that the timeout
is applied to `initEnvStream`, which has the smallest possible footprint
and the least risk of side effects. Additionally, in case an error happens
when terminating a Snap, we catch that error and move on.
  • Loading branch information
FrederikBolding authored Apr 23, 2024
1 parent b3f38a7 commit ed4771f
Show file tree
Hide file tree
Showing 15 changed files with 198 additions and 114 deletions.
8 changes: 4 additions & 4 deletions packages/snaps-controllers/coverage.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"branches": 91.49,
"functions": 96.61,
"lines": 97.87,
"statements": 97.53
"branches": 91.55,
"functions": 96.62,
"lines": 97.88,
"statements": 97.55
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { BasePostMessageStream } from '@metamask/post-message-stream';
import { HandlerType } from '@metamask/snaps-utils';
import { MOCK_SNAP_ID } from '@metamask/snaps-utils/test-utils';
import { Duration, inMilliseconds } from '@metamask/utils';

import { createService } from '../test-utils';
import type { ExecutionServiceArgs } from './AbstractExecutionService';
Expand All @@ -11,6 +12,7 @@ class MockExecutionService extends NodeThreadExecutionService {
super({
messenger,
setupSnapProvider,
initTimeout: inMilliseconds(5, Duration.Second),
});
}

Expand Down Expand Up @@ -154,4 +156,38 @@ describe('AbstractExecutionService', () => {
}),
).rejects.toThrow('The Snaps execution environment failed to start.');
});

// Covers the case where the execution environment itself never comes up:
// `initEnvStream` is swapped for a promise that never settles, so
// `executeSnap` can only finish by hitting the init timeout configured on
// `MockExecutionService` and rejecting with the environment start-up error.
it('throws an error if execution environment fails to init', async () => {
const { service } = createService(MockExecutionService);

// @ts-expect-error Accessing private property and returning unusable worker.
service.initEnvStream = async () =>
new Promise((_resolve) => {
// no-op: intentionally never resolves, leaving initialization pending.
});

await expect(
service.executeSnap({
snapId: MOCK_SNAP_ID,
sourceCode: `
console.log('foo');
`,
endowments: ['console'],
}),
).rejects.toThrow('The Snaps execution environment failed to start.');
});

// Covers the case where the environment starts but the Snap's own code does
// not finish its first execution: the source is an infinite loop, so the
// `executeSnap` call is expected to reject with the Snap-specific
// "failed to start" error rather than hang.
it('throws an error if Snap fails to init', async () => {
const { service } = createService(MockExecutionService);

await expect(
service.executeSnap({
snapId: MOCK_SNAP_ID,
sourceCode: `
while(true) {}
`,
endowments: ['console'],
}),
).rejects.toThrow(`${MOCK_SNAP_ID} failed to start.`);
});
});
132 changes: 88 additions & 44 deletions packages/snaps-controllers/src/services/AbstractExecutionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { pipeline } from 'readable-stream';
import type { Duplex } from 'readable-stream';

import { log } from '../logging';
import { Timer } from '../snaps/Timer';
import { hasTimedOut, withTimeout } from '../utils';
import type {
ExecutionService,
Expand All @@ -38,6 +39,7 @@ export type SetupSnapProvider = (snapId: string, stream: Duplex) => void;
/**
 * Constructor arguments for an execution service.
 *
 * All timeouts are optional numbers; the constructor supplies defaults via
 * `inMilliseconds(...)`, so values are expressed in milliseconds.
 */
export type ExecutionServiceArgs = {
// Callback invoked with the Snap ID and its RPC stream once a job is set up.
setupSnapProvider: SetupSnapProvider;
// Messenger used to register the service's action handlers.
messenger: ExecutionServiceMessenger;
// Total time budget for initializing a job and running the Snap's first execution.
initTimeout?: number;
// Time allowed for the worker to answer the start-up ping.
pingTimeout?: number;
// Time allowed for a graceful `terminate` command before the job is torn down anyway.
terminationTimeout?: number;
};
Expand All @@ -55,6 +57,9 @@ export type Job<WorkerType> = {
worker: WorkerType;
};

/**
 * Argument accepted by `terminateJob`: a job in which only `id` is required
 * and every other `Job` field (such as `worker` or `streams`) may be missing.
 *
 * This lets a partially initialized job be torn down when only its ID is
 * known, e.g. when the environment stream timed out before a worker was
 * returned.
 */
export type TerminateJobArgs<WorkerType> = Partial<Job<WorkerType>> &
Pick<Job<WorkerType>, 'id'>;

export abstract class AbstractExecutionService<WorkerType>
implements ExecutionService
{
Expand All @@ -72,13 +77,16 @@ export abstract class AbstractExecutionService<WorkerType>

#messenger: ExecutionServiceMessenger;

#initTimeout: number;

#pingTimeout: number;

#terminationTimeout: number;

constructor({
setupSnapProvider,
messenger,
initTimeout = inMilliseconds(60, Duration.Second),
pingTimeout = inMilliseconds(2, Duration.Second),
terminationTimeout = inMilliseconds(1, Duration.Second),
}: ExecutionServiceArgs) {
Expand All @@ -88,6 +96,7 @@ export abstract class AbstractExecutionService<WorkerType>
this.#snapToJobMap = new Map();
this.#jobToSnapMap = new Map();
this.#messenger = messenger;
this.#initTimeout = initTimeout;
this.#pingTimeout = pingTimeout;
this.#terminationTimeout = terminationTimeout;

Expand All @@ -107,7 +116,7 @@ export abstract class AbstractExecutionService<WorkerType>

this.#messenger.registerActionHandler(
`${controllerName}:executeSnap`,
async (snapData: SnapExecutionData) => this.executeSnap(snapData),
async (data: SnapExecutionData) => this.executeSnap(data),
);

this.#messenger.registerActionHandler(
Expand All @@ -128,7 +137,7 @@ export abstract class AbstractExecutionService<WorkerType>
*
* @param job - The object corresponding to the job to be terminated.
*/
protected abstract terminateJob(job: Job<WorkerType>): void;
protected abstract terminateJob(job: TerminateJobArgs<WorkerType>): void;

/**
* Terminates the job with the specified ID and deletes all its associated
Expand All @@ -144,24 +153,28 @@ export abstract class AbstractExecutionService<WorkerType>
throw new Error(`Job with id "${jobId}" not found.`);
}

// Ping worker and tell it to run teardown, continue with termination if it takes too long
const result = await withTimeout(
this.command(jobId, {
jsonrpc: '2.0',
method: 'terminate',
params: [],
id: nanoid(),
}),
this.#terminationTimeout,
);
try {
// Ping worker and tell it to run teardown, continue with termination if it takes too long
const result = await withTimeout(
this.command(jobId, {
jsonrpc: '2.0',
method: 'terminate',
params: [],
id: nanoid(),
}),
this.#terminationTimeout,
);

if (result === hasTimedOut || result !== 'OK') {
// We tried to shutdown gracefully but failed. This probably means the Snap is in infinite loop and
// hogging down the whole JS process.
// TODO(ritave): It might be doing weird things such as posting a lot of setTimeouts. Add a test to ensure that this behaviour
// doesn't leak into other workers. Especially important in IframeExecutionEnvironment since they all share the same
// JS process.
logError(`Job "${jobId}" failed to terminate gracefully.`, result);
if (result === hasTimedOut || result !== 'OK') {
// We tried to shutdown gracefully but failed. This probably means the Snap is in infinite loop and
// hogging down the whole JS process.
// TODO(ritave): It might be doing weird things such as posting a lot of setTimeouts. Add a test to ensure that this behaviour
// doesn't leak into other workers. Especially important in IframeExecutionEnvironment since they all share the same
// JS process.
logError(`Job "${jobId}" failed to terminate gracefully.`, result);
}
} catch {
// Ignore
}

Object.values(jobWrapper.streams).forEach((stream) => {
Expand All @@ -175,21 +188,24 @@ export abstract class AbstractExecutionService<WorkerType>

this.terminateJob(jobWrapper);

this.#removeSnapAndJobMapping(jobId);
this.jobs.delete(jobId);
this.#removeSnapAndJobMapping(jobId);
log(`Job "${jobId}" terminated.`);
}

/**
* Initiates a job for a snap.
*
* Depending on the execution environment, this may run forever if the Snap fails to start up properly, therefore any call to this function should be wrapped in a timeout.
*
* @param jobId - The ID of the job to initiate.
* @param timer - The timer to use for timeouts.
* @returns Information regarding the created job.
* @throws If the execution service returns an error or execution times out.
*/
protected async initJob(): Promise<Job<WorkerType>> {
const jobId = nanoid();
const { streams, worker } = await this.initStreams(jobId);
protected async initJob(
jobId: string,
timer: Timer,
): Promise<Job<WorkerType>> {
const { streams, worker } = await this.initStreams(jobId, timer);
const rpcEngine = new JsonRpcEngine();

const jsonRpcConnection = createStreamMiddleware();
Expand Down Expand Up @@ -221,15 +237,24 @@ export abstract class AbstractExecutionService<WorkerType>
/**
* Sets up the streams for an initiated job.
*
* Depending on the execution environment, this may run forever if the Snap fails to start up properly, therefore any call to this function should be wrapped in a timeout.
*
* @param jobId - The id of the job.
* @param timer - The timer to use for timeouts.
* @returns The streams to communicate with the worker and the worker itself.
* @throws If the execution service returns an error or execution times out.
*/
protected async initStreams(
jobId: string,
timer: Timer,
): Promise<{ streams: JobStreams; worker: WorkerType }> {
const { worker, stream: envStream } = await this.initEnvStream(jobId);
const result = await withTimeout(this.initEnvStream(jobId), timer);

if (result === hasTimedOut) {
// For certain environments, such as the iframe we may have already created the worker and wish to terminate it.
this.terminateJob({ id: jobId });
throw new Error('The Snaps execution environment failed to start.');
}

const { worker, stream: envStream } = result;
const mux = setupMultiplex(envStream, `Job: "${jobId}"`);
const commandStream = mux.createStream(SNAP_STREAM_NAMES.COMMAND);

Expand Down Expand Up @@ -333,20 +358,29 @@ export abstract class AbstractExecutionService<WorkerType>
/**
* Initializes and executes a snap, setting up the communication channels to the snap etc.
*
* Depending on the execution environment, this may run forever if the Snap fails to start up properly, therefore any call to this function should be wrapped in a timeout.
*
* @param snapData - Data needed for Snap execution.
* @param snapData.snapId - The ID of the Snap to execute.
* @param snapData.sourceCode - The source code of the Snap to execute.
* @param snapData.endowments - The endowments available to the executing Snap.
* @returns A string `OK` if execution succeeded.
* @throws If the execution service returns an error.
* @throws If the execution service returns an error or execution times out.
*/
async executeSnap(snapData: SnapExecutionData): Promise<string> {
if (this.#snapToJobMap.has(snapData.snapId)) {
throw new Error(`Snap "${snapData.snapId}" is already being executed.`);
async executeSnap({
snapId,
sourceCode,
endowments,
}: SnapExecutionData): Promise<string> {
if (this.#snapToJobMap.has(snapId)) {
throw new Error(`Snap "${snapId}" is already being executed.`);
}

const jobId = nanoid();
const timer = new Timer(this.#initTimeout);

// This may resolve even if the environment has failed to start up fully
const job = await this.initJob();
this.#mapSnapAndJob(snapData.snapId, job.id);
const job = await this.initJob(jobId, timer);

this.#mapSnapAndJob(snapId, job.id);

// Ping the worker to ensure that it started up
const pingResult = await withTimeout(
Expand All @@ -364,15 +398,25 @@ export abstract class AbstractExecutionService<WorkerType>

const rpcStream = job.streams.rpc;

this.setupSnapProvider(snapData.snapId, rpcStream);
this.setupSnapProvider(snapId, rpcStream);

const result = await this.command(job.id, {
jsonrpc: '2.0',
method: 'executeSnap',
params: snapData,
id: nanoid(),
});
this.#createSnapHooks(snapData.snapId, job.id);
const remainingTime = timer.remaining;

const result = await withTimeout(
this.command(job.id, {
jsonrpc: '2.0',
method: 'executeSnap',
params: { snapId, sourceCode, endowments },
id: nanoid(),
}),
remainingTime,
);

if (result === hasTimedOut) {
throw new Error(`${snapId} failed to start.`);
}

this.#createSnapHooks(snapId, job.id);
return result as string;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export interface ExecutionService {
export type SnapExecutionData = {
snapId: string;
sourceCode: string;
endowments?: Json;
endowments: Json;
};

export type SnapErrorJson = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ import type { BasePostMessageStream } from '@metamask/post-message-stream';
import { WindowPostMessageStream } from '@metamask/post-message-stream';
import { createWindow } from '@metamask/snaps-utils';

import type { Job, ExecutionServiceArgs } from '../AbstractExecutionService';
import type {
ExecutionServiceArgs,
TerminateJobArgs,
} from '../AbstractExecutionService';
import { AbstractExecutionService } from '../AbstractExecutionService';

type IframeExecutionEnvironmentServiceArgs = {
Expand All @@ -24,7 +27,7 @@ export class IframeExecutionService extends AbstractExecutionService<Window> {
this.iframeUrl = iframeUrl;
}

protected terminateJob(jobWrapper: Job<Window>): void {
protected terminateJob(jobWrapper: TerminateJobArgs<Window>): void {
document.getElementById(jobWrapper.id)?.remove();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ProcessParentMessageStream } from '@metamask/post-message-stream';
import type { ChildProcess } from 'child_process';
import { fork } from 'child_process';

import type { Job } from '..';
import type { TerminateJobArgs } from '..';
import { AbstractExecutionService } from '..';

export class NodeProcessExecutionService extends AbstractExecutionService<ChildProcess> {
Expand Down Expand Up @@ -36,7 +36,7 @@ export class NodeProcessExecutionService extends AbstractExecutionService<ChildP
return Promise.resolve({ worker, stream });
}

protected terminateJob(jobWrapper: Job<ChildProcess>): void {
jobWrapper.worker.kill();
protected terminateJob(jobWrapper: TerminateJobArgs<ChildProcess>): void {
jobWrapper.worker?.kill();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ThreadParentMessageStream } from '@metamask/post-message-stream';
// eslint-disable-next-line @typescript-eslint/no-shadow
import { Worker } from 'worker_threads';

import type { Job } from '..';
import type { TerminateJobArgs } from '..';
import { AbstractExecutionService } from '..';

export class NodeThreadExecutionService extends AbstractExecutionService<Worker> {
Expand Down Expand Up @@ -37,7 +37,9 @@ export class NodeThreadExecutionService extends AbstractExecutionService<Worker>
return Promise.resolve({ worker, stream });
}

protected async terminateJob(jobWrapper: Job<Worker>): Promise<void> {
await jobWrapper.worker.terminate();
protected async terminateJob(
jobWrapper: TerminateJobArgs<Worker>,
): Promise<void> {
await jobWrapper.worker?.terminate();
}
}
Loading

0 comments on commit ed4771f

Please sign in to comment.