Skip to content

Commit

Permalink
add retry and catch
Browse files Browse the repository at this point in the history
  • Loading branch information
jamoy committed Aug 3, 2020
1 parent 11bf252 commit 46cf356
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 57 deletions.
124 changes: 70 additions & 54 deletions lib/stepfunctions.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,6 @@ const { performance } = require('perf_hooks');
const jsonpath = require('jsonpath');
const aslValidator = require('asl-validator');

const comparators = [
'BooleanEquals',
'StringEquals',
'StringGreaterThan',
'StringGreaterThanEquals',
'StringLessThan',
'StringLessThanEquals',
'NumericEquals',
'NumericGreaterThan',
'NumericGreaterThanEquals',
'NumericLessThan',
'NumericLessThanEquals',
'TimestampEquals',
'TimestampGreaterThan',
'TimestampGreaterThanEquals',
'TimestampLessThan',
'TimestampLessThanEquals',
];

class ErrorState extends Error {
constructor(state, message, original) {
super(
Expand Down Expand Up @@ -313,27 +294,27 @@ class StepFunction extends EventEmitter {
* @param {Object} states
*/
async task(state, task, input, states) {
try {
const output = await this.retry(state, input, states, async (retries) => {
const output = await this.retry(state, input, states, async (retries) => {
try {
this.transition('TaskStateEntered', { task, input });
this.createContext(state, task, input, null, retries);
input = this.inputPath(state, input);
const output = await this.resolveResource(task, input, states);
this.transition('TaskStateExited', { task, output });
return output;
});
if (state.Next) {
return this.step(state.Next, output, states);
}
return output; // End: true
} catch (err) {
if (err.state === 'Internal.Aborted') {
this.transition('TaskStateAborted', { task });
} catch (err) {
if (err.state === 'Internal.Aborted') {
this.transition('TaskStateAborted', { task });
}
this.transition('TaskStateFailed', { task, error: err });
this.transition('TaskStateExited', { task });
throw new ErrorState(ErrorState.TaskFailed, 'TaskFailed', err);
}
this.transition('TaskStateFailed', { task, error: err });
this.transition('TaskStateExited', { task });
throw new ErrorState(ErrorState.TaskFailed, 'TaskFailed', err);
});
if (state.Next) {
return this.step(state.Next, output, states);
}
return output; // End: true
}

/**
Expand Down Expand Up @@ -498,7 +479,24 @@ class StepFunction extends EventEmitter {
if (variable === undefined) {
return undefined;
}
const operator = comparators
const operator = [
'BooleanEquals',
'StringEquals',
'StringGreaterThan',
'StringGreaterThanEquals',
'StringLessThan',
'StringLessThanEquals',
'NumericEquals',
'NumericGreaterThan',
'NumericGreaterThanEquals',
'NumericLessThan',
'NumericLessThanEquals',
'TimestampEquals',
'TimestampGreaterThan',
'TimestampGreaterThanEquals',
'TimestampLessThan',
'TimestampLessThanEquals',
]
.filter((operator) => Object.keys(choice).includes(operator))
.shift();

Expand Down Expand Up @@ -634,27 +632,27 @@ class StepFunction extends EventEmitter {
}
if (
error.toString().includes('TaskFailed') &&
err.state === ErrorState.TaskFailed
error.state === ErrorState.TaskFailed
) {
return true;
}
if (
error.toString().includes('Timeout') &&
err.state === ErrorState.Timeout
error.state === ErrorState.Timeout
) {
return true;
}
if (
err === ErrorState.ALL ||
error.state === ErrorState.ALL ||
error.toString().includes('Lambda.') ||
error.toString().includes(err) ||
(error.original && error.original.toString().includes(err)) ||
(error.original && error.original.constructor.name.includes(err)) ||
(error && error.constructor.name.includes(err)) ||
(error.toString().includes('TaskFailed') &&
err.state === ErrorState.TaskFailed) ||
error.state === ErrorState.TaskFailed) ||
(error.toString().includes('Timeout') &&
err.state === ErrorState.Timeout)
error.state === ErrorState.Timeout)
) {
return true;
}
Expand All @@ -673,7 +671,23 @@ class StepFunction extends EventEmitter {
return false;
});
if (errors.length > 0) {
const errorType = errors.shift();
let errorType;
if (error.state) {
errorType = error.state;
}
errors.find((err) => {
if (
error.toString().includes(err) ||
(error.original && error.original.toString().includes(err)) ||
(error.original && error.original.constructor.name.includes(err)) ||
(error && error.constructor.name.includes(err))
) {
errorType = err;
}
});
if (errors.find((e) => e === ErrorState.ALL)) {
errorType = ErrorState.ALL;
}
const output = this.outputPath(state, input, {
Error: errorType,
Cause: {
Expand Down Expand Up @@ -701,21 +715,23 @@ class StepFunction extends EventEmitter {
const output = await fn(increment);
return output;
} catch (err) {
const output = this.canCatch(state.Retry, input, originalState, err);
if (increment < output.state.MaxAttempts) {
increment += 1;
let timeoutRef;
await new Promise((resolve) => {
const waitFor =
output.state.IntervalSeconds *
(increment * output.state.BackoffRate) *
1000;
timeoutRef = setTimeout(() => {
resolve();
clearTimeout(timeoutRef);
}, waitFor);
});
return wrapperFn(cb, increment);
if (state.Retry) {
const output = this.canCatch(state.Retry, input, originalState, err);
if (increment < output.state.MaxAttempts) {
increment += 1;
let timeoutRef;
await new Promise((resolve) => {
const waitFor =
output.state.IntervalSeconds *
(increment * output.state.BackoffRate) *
1000;
timeoutRef = setTimeout(() => {
resolve();
clearTimeout(timeoutRef);
}, waitFor);
});
return wrapperFn(cb, increment);
}
}
throw err; // pass it to a catch if available or just fail
}
Expand Down
84 changes: 82 additions & 2 deletions test/stepfunctions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ describe('Stepfunctions', () => {
});
});

it('can catch multiple states', async () => {
it('can catch custom errors', async () => {
const sm = new Sfn({ StateMachine: require('./steps/catch.json') });
const firstFn = jest.fn(() => {
class CustomError extends Error {
Expand All @@ -647,6 +647,57 @@ describe('Stepfunctions', () => {
);
});

it('can catch custom States.ALL', async () => {
const sm = new Sfn({ StateMachine: require('./steps/catch-all.json') });
const firstFn = jest.fn(() => {
class Custom2Error extends Error {
// empty
}
throw new Custom2Error('something happened');
});
const errorFn = jest.fn((input) => input);
const lastFn = jest.fn((input) => {
return input;
});
sm.bindTaskResource('First', firstFn);
sm.bindTaskResource('All', errorFn);
sm.bindTaskResource('Last', lastFn);
await sm.startExecution({});
expect(errorFn).toHaveBeenCalled();
expect(lastFn).toHaveBeenCalled();
expect(sm.getExecutionResult()).toEqual(
expect.objectContaining({
error: expect.objectContaining({
Cause: expect.objectContaining({ errorType: 'States.ALL' }),
}),
}),
);
});

it('can catch custom States.TaskFailed', async () => {
const sm = new Sfn({ StateMachine: require('./steps/catch.json') });
const firstFn = jest.fn(() => {
throw new Error('fail the task');
});
const errorFn = jest.fn((input) => input);
const lastFn = jest.fn((input) => {
return input;
});
sm.bindTaskResource('First', firstFn);
sm.bindTaskResource('All', errorFn);
sm.bindTaskResource('Last', lastFn);
await sm.startExecution({});
expect(errorFn).toHaveBeenCalled();
expect(lastFn).toHaveBeenCalled();
expect(sm.getExecutionResult()).toEqual(
expect.objectContaining({
error: expect.objectContaining({
Cause: expect.objectContaining({ errorType: 'States.TaskFailed' }),
}),
}),
);
});

it('can retry failing tasks', async () => {
const sm = new Sfn({ StateMachine: require('./steps/retry.json') });
const firstFn = jest.fn((input) => {
Expand All @@ -666,10 +717,39 @@ describe('Stepfunctions', () => {
sm.bindTaskResource('All', errorFn);
sm.bindTaskResource('Last', lastFn);
await sm.startExecution({});
expect(firstFn).toHaveBeenCalled();
expect(errorFn).not.toHaveBeenCalled();
expect(lastFn).toHaveBeenCalled();
expect(sm.getExecutionResult()).toEqual(
expect.objectContaining({ retries: 2 }),
);
}, 8000);

it('can retry failing tasks and finally catch', async () => {});
it('can retry failing tasks and finally catch', async () => {
const sm = new Sfn({ StateMachine: require('./steps/retry.json') });
const firstFn = jest.fn(() => {
class CustomError extends Error {
// empty
}
throw new CustomError('something happened');
});
const errorFn = jest.fn((input) => input);
const lastFn = jest.fn((input) => {
return input;
});
sm.bindTaskResource('First', firstFn);
sm.bindTaskResource('All', errorFn);
sm.bindTaskResource('Last', lastFn);
await sm.startExecution({});
expect(firstFn).toHaveBeenCalled();
expect(errorFn).toHaveBeenCalled();
expect(lastFn).toHaveBeenCalled();
expect(sm.getExecutionResult()).toEqual(
expect.objectContaining({
error: expect.objectContaining({
Cause: expect.objectContaining({ errorType: 'CustomError' }),
}),
}),
);
});
});
27 changes: 27 additions & 0 deletions test/steps/catch-all.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"StartAt": "First",
"States": {
"First": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-southeast-1:123456789012:function:test",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "All"
}
],
"Next": "Last"
},
"All": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-southeast-1:123456789012:function:test",
"Next": "Last"
},
"Last": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-southeast-1:123456789012:function:test",
"End": true
}
}
}
2 changes: 1 addition & 1 deletion test/steps/catch.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"Resource": "arn:aws:lambda:ap-southeast-1:123456789012:function:test",
"Catch": [
{
"ErrorEquals": ["CustomError", "States.ALL", "States.TaskFailed"],
"ErrorEquals": ["CustomError", "States.TaskFailed"],
"ResultPath": "$.error",
"Next": "All"
},
Expand Down

0 comments on commit 46cf356

Please sign in to comment.