Skip to content

Commit

Permalink
Merge pull request #45 from devit-tel/feature/workflow-immediate-comp…
Browse files Browse the repository at this point in the history
…ensate-on-sync-worker

Feature/workflow immediate compensate on sync worker
  • Loading branch information
NV4RE authored Sep 3, 2020
2 parents 6e9cea1 + 55d99e0 commit 5f25b79
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 69 deletions.
28 changes: 4 additions & 24 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
},
"dependencies": {
"@koa/cors": "^3.0.0",
"@melonade/melonade-declaration": "^0.19.2",
"@melonade/melonade-declaration": "^0.22.0",
"cron-parser": "^2.13.0",
"debug": "^4.1.1",
"dotenv": "^8.2.0",
Expand Down
27 changes: 27 additions & 0 deletions src/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import {
Task,
WorkflowDefinition,
} from '@melonade/melonade-declaration';
import * as R from 'ramda';
import { commandConsumerClient, dispatch, poll, sendEvent } from './kafka';
import { getTaskData, handleCancelWorkflow, processUpdateTask } from './state';
import {
distributedLockStore,
transactionInstanceStore,
Expand Down Expand Up @@ -51,6 +53,31 @@ export const processCancelTransactionCommand = async (
reason: command.reason,
},
});

try {
const tasksData = await getTaskData(workflow);
const syncWorkerTasks = R.values(tasksData).filter(
(t: Task.ITask) =>
t.syncWorker === true || [Task.TaskTypes.Schedule].includes(t.type),
);

for (const t of syncWorkerTasks) {
await processUpdateTask({
status: State.TaskStates.Failed,
taskId: t.taskId,
transactionId: t.transactionId,
isSystem: true,
doNotRetry: true,
output: {
reason: 'Workflow has been cancelled',
},
});
}

await handleCancelWorkflow(workflow, tasksData);
} catch (error) {
console.log(error);
}
await locker.unlock();
};

Expand Down
44 changes: 25 additions & 19 deletions src/server/routers/v1/transaction/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,31 @@ router.post('/update', async (ctx: koaRouter.IRouterContext) => {
const taskUpdate: ITaskUpdate = ctx.request.body;

const locker = await distributedLockStore.lock(taskUpdate.transactionId);
const task = await taskInstanceStore.update(taskUpdate);
if (!task) {
throw new Error('Cannot update');
}
try {
const task = await taskInstanceStore.update(taskUpdate);
if (!task) {
throw new Error('Cannot update');
}

switch (taskUpdate.status) {
case State.TaskStates.Completed:
await handleCompletedTask(task);
break;
case State.TaskStates.Failed:
case State.TaskStates.Timeout:
case State.TaskStates.AckTimeOut:
await handleFailedTask(task, taskUpdate.doNotRetry);
break;
default:
// Case Inprogress we did't need to do anything except update the status
break;
}
switch (taskUpdate.status) {
case State.TaskStates.Completed:
await handleCompletedTask(task);
break;
case State.TaskStates.Failed:
case State.TaskStates.Timeout:
case State.TaskStates.AckTimeOut:
await handleFailedTask(task, taskUpdate.doNotRetry);
break;
default:
// Case Inprogress we did't need to do anything except update the status
break;
}

await locker.unlock();
return task;
await locker.unlock();
return task;
} catch (error) {
console.log(error);
await locker.unlock();
throw error;
}
});
35 changes: 24 additions & 11 deletions src/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import * as R from 'ramda';
import { poll, sendEvent, stateConsumerClient } from './kafka';
import {
distributedLockStore,
IDistributedLockInstance,
taskInstanceStore,
transactionInstanceStore,
workflowInstanceStore,
Expand Down Expand Up @@ -377,14 +378,21 @@ const handleCompletedCompensateThenRetryWorkflow = async (
}
};

const handleCancelWorkflow = async (
export const handleCancelWorkflow = async (
workflow: Workflow.IWorkflow,
tasksData: { [taskReferenceName: string]: Task.ITask },
) => {
const tasksDataList = R.values(tasksData);
const runningTasks = tasksDataList.filter((taskData: Task.ITask) => {
return [State.TaskStates.Inprogress, State.TaskStates.Scheduled].includes(
taskData.status,
return (
[State.TaskStates.Inprogress, State.TaskStates.Scheduled].includes(
taskData.status,
) &&
[
Task.TaskTypes.Task,
Task.TaskTypes.Compensate,
Task.TaskTypes.SubTransaction,
].includes(taskData.type)
);
});

Expand All @@ -410,13 +418,15 @@ export const handleCompletedTask = async (task: Task.ITask): Promise<void> => {
const { workflow, tasksData, nextTaskPath } = await getTaskInfo(task);
// If workflow has cancelled
if (workflow.status === State.WorkflowStates.Cancelled) {
if (nextTaskPath.parentTask) {
if (nextTaskPath.parentTask && nextTaskPath.isLastChild) {
await processUpdateTask({
taskId: nextTaskPath.parentTask.taskId,
transactionId: nextTaskPath.parentTask.transactionId,
status: State.TaskStates.Completed,
isSystem: true,
});
} else if (nextTaskPath.parentTask && !nextTaskPath.isLastChild) {
console.log('Wait for sibling task');
} else {
await handleCancelWorkflow(workflow, tasksData);
}
Expand Down Expand Up @@ -626,16 +636,18 @@ export const handleFailedTask = async (
const { workflow, tasksData, nextTaskPath } = await getTaskInfo(task);
// If workflow oncancle do not retry or anything
if (workflow.status === State.WorkflowStates.Cancelled) {
if (nextTaskPath.parentTask) {
if (nextTaskPath.parentTask && nextTaskPath.isLastChild) {
await processUpdateTask({
taskId: nextTaskPath.parentTask.taskId,
transactionId: nextTaskPath.parentTask.transactionId,
status: State.TaskStates.Completed,
isSystem: true,
});
} else if (nextTaskPath.parentTask && !nextTaskPath.isLastChild) {
console.log('Wait for sibling task');
} else {
await handleCancelWorkflow(workflow, tasksData);
}

await handleCancelWorkflow(workflow, tasksData);
return;
}

Expand Down Expand Up @@ -690,6 +702,7 @@ export const processUpdateTask = async (
break;
}
} catch (error) {
console.log(error, taskUpdate);
sendEvent({
transactionId: taskUpdate.transactionId,
type: 'SYSTEM',
Expand All @@ -704,10 +717,9 @@ export const processUpdateTask = async (
export const processUpdateTasks = async (
tasksUpdate: Event.ITaskUpdate[],
): Promise<any> => {
var locker: IDistributedLockInstance;
try {
const locker = await distributedLockStore.lock(
tasksUpdate[0].transactionId,
);
locker = await distributedLockStore.lock(tasksUpdate[0].transactionId);
for (const taskUpdate of tasksUpdate) {
const hrStart = process.hrtime();
await processUpdateTask(taskUpdate);
Expand All @@ -718,9 +730,10 @@ export const processUpdateTasks = async (
} take ${hrEnd[0]}s ${hrEnd[1] / 1000000}ms`,
);
}
await locker.unlock();
} catch (error) {
console.error('processUpdateTasks', error);
} finally {
await locker?.unlock();
}
};

Expand Down
44 changes: 32 additions & 12 deletions src/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -976,34 +976,38 @@ export class TaskInstanceStore {
startTime: timestamp,
endTime: null,
retries:
+mapParametersToValue(workflowTask?.retry?.limit, {
mapParametersToNumber(workflowTask?.retry?.limit, {
...tasksData,
workflow,
}) ||
taskDefinition?.retry?.limit ||
}) ??
taskDefinition?.retry?.limit ??
0,
retryDelay:
+mapParametersToValue(workflowTask?.retry?.delay, {
mapParametersToNumber(workflowTask?.retry?.delay, {
...tasksData,
workflow,
}) ||
taskDefinition?.retry?.delay ||
}) ??
taskDefinition?.retry?.delay ??
0,
ackTimeout:
+mapParametersToValue(workflowTask?.ackTimeout, {
mapParametersToNumber(workflowTask?.ackTimeout, {
...tasksData,
workflow,
}) ||
taskDefinition?.ackTimeout ||
}) ??
taskDefinition?.ackTimeout ??
0,
timeout:
+mapParametersToValue(workflowTask?.timeout, {
mapParametersToNumber(workflowTask?.timeout, {
...tasksData,
workflow,
}) ||
taskDefinition?.timeout ||
}) ??
taskDefinition?.timeout ??
0,
taskPath,
syncWorker:
(<WorkflowDefinition.ITaskTask>workflowTask)?.syncWorker ??
taskDefinition?.syncWorker ??
false,
...overideTask,
});

Expand Down Expand Up @@ -1101,6 +1105,22 @@ export class DistributedLockStore {
}
}

const mapParametersToNumber = (
parameters: any,
tasksData: {
[taskReferenceName: string]: Workflow.IWorkflow | Task.ITask;
},
): number | undefined => {
if (parameters === undefined) {
return undefined;
}
const val = mapParametersToValue(parameters, tasksData);
if (Number.isNaN(+val)) {
return undefined;
}
return +val;
};

// This's global instance
export const taskDefinitionStore = new TaskDefinitionStore();
export const workflowDefinitionStore = new WorkflowDefinitionStore();
Expand Down
33 changes: 32 additions & 1 deletion src/utils/task.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,37 @@ describe('mapParametersToValue', () => {
});

test('Date from now', () => {
const timeAppend = 10000;
const fromNow = task.mapParametersToValue('fromNow(${t1.output.a})', {
t1: {
taskName: 'taskName',
taskReferenceName: 't1',
taskId: 'taskId',
workflowId: 'workflowId',
transactionId: 'transactionId',
type: Task.TaskTypes.Task,
status: State.TaskStates.Completed,
output: {
a: new Date(Date.now() + timeAppend).toString(),
},
input: {},
ackTimeout: 0,
createTime: 0,
endTime: 0,
logs: [],
retries: 0,
isRetried: false,
retryDelay: 0,
timeout: 0,
startTime: 0,
taskPath: [0],
},
});
expect(fromNow).toBeGreaterThanOrEqual(timeAppend - 2000);
expect(fromNow).toBeLessThan(timeAppend);
});

test('Date from now min value (100)', () => {
expect(
task.mapParametersToValue('fromNow(${t1.output.a})', {
t1: {
Expand All @@ -863,6 +894,6 @@ describe('mapParametersToValue', () => {
taskPath: [0],
},
}),
).toBeLessThanOrEqual(0);
).toBe(100);
});
});
2 changes: 1 addition & 1 deletion src/utils/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ const parseTemplate = (template: string, values: any) => {

const dateFromNowMatch = /(^fromNow\()(.+)(\)$)/i.exec(template);
if (dateFromNowMatch?.length) {
return +dateParse(dateFromNowMatch[2], values) - Date.now();
return Math.max(+dateParse(dateFromNowMatch[2], values) - Date.now(), 100);
}

// string append template
Expand Down

0 comments on commit 5f25b79

Please sign in to comment.