diff --git a/package-lock.json b/package-lock.json
index 283c4dc..afd1a5a 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1441,31 +1441,13 @@
       }
     },
     "@melonade/melonade-declaration": {
-      "version": "0.19.2",
-      "resolved": "https://registry.npmjs.org/@melonade/melonade-declaration/-/melonade-declaration-0.19.2.tgz",
-      "integrity": "sha512-gfRMqrptk42SH70jPftxM5vx6XvJ5zTcIL8nXr9fGpHslD6OSEc/wbXLVWfao0THyyxz2U49wyec/KIHkUJzGQ==",
+      "version": "0.22.0",
+      "resolved": "https://registry.npmjs.org/@melonade/melonade-declaration/-/melonade-declaration-0.22.0.tgz",
+      "integrity": "sha512-J8x7gfS9c4QC+nDuhPZD2zGhoYPfUk0n0iNqTiNn869i10J5+YwAO9csdjn6LZKTlswvnjw+qz2Yr33fIU6yCA==",
       "requires": {
         "ajv": "^6.12.0",
         "ramda": "^0.26.1",
         "tslib": "~1.10.0"
-      },
-      "dependencies": {
-        "ajv": {
-          "version": "6.12.4",
-          "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.4.tgz",
-          "integrity": "sha512-eienB2c9qVQs2KWexhkrdMLVDoIQCz5KSeLxwg9Lzk4DOfBtIK9PQwwufcsn1jjGuf9WZmqPMbGxOzfcuphJCQ==",
-          "requires": {
-            "fast-deep-equal": "^3.1.1",
-            "fast-json-stable-stringify": "^2.0.0",
-            "json-schema-traverse": "^0.4.1",
-            "uri-js": "^4.2.2"
-          }
-        },
-        "fast-deep-equal": {
-          "version": "3.1.3",
-          "resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz",
-          "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q=="
-        }
       }
     },
     "@mrmlnc/readdir-enhanced": {
@@ -1921,7 +1903,6 @@
       "version": "6.12.4",
       "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.4.tgz",
       "integrity": "sha512-eienB2c9qVQs2KWexhkrdMLVDoIQCz5KSeLxwg9Lzk4DOfBtIK9PQwwufcsn1jjGuf9WZmqPMbGxOzfcuphJCQ==",
-      "dev": true,
       "requires": {
         "fast-deep-equal": "^3.1.1",
         "fast-json-stable-stringify": "^2.0.0",
@@ -3758,8 +3739,7 @@
     "fast-deep-equal": {
       "version": "3.1.3",
       "resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz",
-      "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==",
-      "dev": true
+      "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q=="
    },
    "fast-glob": {
      "version": "2.2.7",
diff --git a/package.json b/package.json
index 5aa00e3..1bd5849 100644
--- a/package.json
+++ b/package.json
@@ -18,7 +18,7 @@
   },
   "dependencies": {
     "@koa/cors": "^3.0.0",
-    "@melonade/melonade-declaration": "^0.19.2",
+    "@melonade/melonade-declaration": "^0.22.0",
     "cron-parser": "^2.13.0",
     "debug": "^4.1.1",
     "dotenv": "^8.2.0",
diff --git a/src/command.ts b/src/command.ts
index c8434da..c336808 100644
--- a/src/command.ts
+++ b/src/command.ts
@@ -4,7 +4,9 @@ import {
   Task,
   WorkflowDefinition,
 } from '@melonade/melonade-declaration';
+import * as R from 'ramda';
 import { commandConsumerClient, dispatch, poll, sendEvent } from './kafka';
+import { getTaskData, handleCancelWorkflow, processUpdateTask } from './state';
 import {
   distributedLockStore,
   transactionInstanceStore,
@@ -51,6 +53,31 @@ export const processCancelTransactionCommand = async (
       reason: command.reason,
     },
   });
+
+  try {
+    const tasksData = await getTaskData(workflow);
+    const syncWorkerTasks = R.values(tasksData).filter(
+      (t: Task.ITask) =>
+        t.syncWorker === true || [Task.TaskTypes.Schedule].includes(t.type),
+    );
+
+    for (const t of syncWorkerTasks) {
+      await processUpdateTask({
+        status: State.TaskStates.Failed,
+        taskId: t.taskId,
+        transactionId: t.transactionId,
+        isSystem: true,
+        doNotRetry: true,
+        output: {
+          reason: 'Workflow has been cancelled',
+        },
+      });
+    }
+
+    await handleCancelWorkflow(workflow, tasksData);
+  } catch (error) {
+    console.log(error);
+  }
 
   await locker.unlock();
 };
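
A note on the new cancel path in `src/command.ts`: the intent appears to be that a worker running in sync mode (and a timer-based `Schedule` task) would never observe a cancel event, so the command processor now fails those tasks in place before cancelling the rest of the workflow. The filter can be read as a standalone predicate; the sketch below restates it with illustrative names (`TaskLike`, `mustFailInPlace`, and the string literals are stand-ins, not names from this codebase):

```ts
import * as R from 'ramda';

// Illustrative stand-in for Task.ITask from @melonade/melonade-declaration.
interface TaskLike {
  taskId: string;
  type: string; // the real code compares against the Task.TaskTypes enum
  syncWorker?: boolean;
}

// A task must be failed in place when its worker cannot react to the cancel
// event: it either runs on a sync worker or is a timer-based Schedule task.
const mustFailInPlace = (t: TaskLike): boolean =>
  t.syncWorker === true || t.type === 'SCHEDULE';

const tasksData: { [ref: string]: TaskLike } = {
  t1: { taskId: '1', type: 'TASK', syncWorker: true },
  t2: { taskId: '2', type: 'SCHEDULE' },
  t3: { taskId: '3', type: 'TASK' },
};

// R.values drops the reference-name keys, exactly as in the diff above.
console.log(R.values(tasksData).filter(mustFailInPlace).map((t) => t.taskId)); // ['1', '2']
```
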
diff --git a/src/server/routers/v1/transaction/index.ts b/src/server/routers/v1/transaction/index.ts
index 947c73b..8e3e9e9 100644
--- a/src/server/routers/v1/transaction/index.ts
+++ b/src/server/routers/v1/transaction/index.ts
@@ -51,25 +51,31 @@ router.post('/update', async (ctx: koaRouter.IRouterContext) => {
   const taskUpdate: ITaskUpdate = ctx.request.body;
   const locker = await distributedLockStore.lock(taskUpdate.transactionId);
 
-  const task = await taskInstanceStore.update(taskUpdate);
-  if (!task) {
-    throw new Error('Cannot update');
-  }
+  try {
+    const task = await taskInstanceStore.update(taskUpdate);
+    if (!task) {
+      throw new Error('Cannot update');
+    }
 
-  switch (taskUpdate.status) {
-    case State.TaskStates.Completed:
-      await handleCompletedTask(task);
-      break;
-    case State.TaskStates.Failed:
-    case State.TaskStates.Timeout:
-    case State.TaskStates.AckTimeOut:
-      await handleFailedTask(task, taskUpdate.doNotRetry);
-      break;
-    default:
-      // Case Inprogress we did't need to do anything except update the status
-      break;
-  }
+    switch (taskUpdate.status) {
+      case State.TaskStates.Completed:
+        await handleCompletedTask(task);
+        break;
+      case State.TaskStates.Failed:
+      case State.TaskStates.Timeout:
+      case State.TaskStates.AckTimeOut:
+        await handleFailedTask(task, taskUpdate.doNotRetry);
+        break;
+      default:
+        // Case Inprogress: we don't need to do anything except update the status
+        break;
+    }
 
-  await locker.unlock();
-  return task;
+    await locker.unlock();
+    return task;
+  } catch (error) {
+    console.log(error);
+    await locker.unlock();
+    throw error;
+  }
 });
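
The change above wraps the locked section in `try`/`catch` so the distributed lock is released on both the success and the error path; previously, a throw from `taskInstanceStore.update` or one of the handlers would have left the transaction's lock held. The same guarantee is usually written once as a helper; a minimal sketch, where `ILock` and `withLock` are illustrative names rather than exports of this repo:

```ts
interface ILock {
  unlock(): Promise<void>;
}

// Runs fn while holding the lock; `finally` releases it on success and on
// throw, so callers cannot forget the unlock on an error path.
const withLock = async <T>(lock: ILock, fn: () => Promise<T>): Promise<T> => {
  try {
    return await fn();
  } finally {
    await lock.unlock();
  }
};

// Usage, mirroring the route handler above (store objects are hypothetical):
// const task = await withLock(await distributedLockStore.lock(id), () =>
//   taskInstanceStore.update(taskUpdate),
// );
```
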
diff --git a/src/state.ts b/src/state.ts
index a98c589..3b02d22 100644
--- a/src/state.ts
+++ b/src/state.ts
@@ -10,6 +10,7 @@ import * as R from 'ramda';
 import { poll, sendEvent, stateConsumerClient } from './kafka';
 import {
   distributedLockStore,
+  IDistributedLockInstance,
   taskInstanceStore,
   transactionInstanceStore,
   workflowInstanceStore,
@@ -377,14 +378,21 @@ const handleCompletedCompensateThenRetryWorkflow = async (
   }
 };
 
-const handleCancelWorkflow = async (
+export const handleCancelWorkflow = async (
   workflow: Workflow.IWorkflow,
   tasksData: { [taskReferenceName: string]: Task.ITask },
 ) => {
   const tasksDataList = R.values(tasksData);
   const runningTasks = tasksDataList.filter((taskData: Task.ITask) => {
-    return [State.TaskStates.Inprogress, State.TaskStates.Scheduled].includes(
-      taskData.status,
+    return (
+      [State.TaskStates.Inprogress, State.TaskStates.Scheduled].includes(
+        taskData.status,
+      ) &&
+      [
+        Task.TaskTypes.Task,
+        Task.TaskTypes.Compensate,
+        Task.TaskTypes.SubTransaction,
+      ].includes(taskData.type)
     );
   });
 
@@ -410,13 +418,15 @@ export const handleCompletedTask = async (task: Task.ITask): Promise<void> => {
   const { workflow, tasksData, nextTaskPath } = await getTaskInfo(task);
   // If workflow has cancelled
   if (workflow.status === State.WorkflowStates.Cancelled) {
-    if (nextTaskPath.parentTask) {
+    if (nextTaskPath.parentTask && nextTaskPath.isLastChild) {
       await processUpdateTask({
         taskId: nextTaskPath.parentTask.taskId,
         transactionId: nextTaskPath.parentTask.transactionId,
         status: State.TaskStates.Completed,
         isSystem: true,
       });
+    } else if (nextTaskPath.parentTask && !nextTaskPath.isLastChild) {
+      console.log('Wait for sibling task');
     } else {
       await handleCancelWorkflow(workflow, tasksData);
     }
@@ -626,16 +636,18 @@ export const handleFailedTask = async (
   const { workflow, tasksData, nextTaskPath } = await getTaskInfo(task);
   // If workflow oncancle do not retry or anything
   if (workflow.status === State.WorkflowStates.Cancelled) {
-    if (nextTaskPath.parentTask) {
+    if (nextTaskPath.parentTask && nextTaskPath.isLastChild) {
       await processUpdateTask({
         taskId: nextTaskPath.parentTask.taskId,
         transactionId: nextTaskPath.parentTask.transactionId,
         status: State.TaskStates.Completed,
         isSystem: true,
       });
+    } else if (nextTaskPath.parentTask && !nextTaskPath.isLastChild) {
+      console.log('Wait for sibling task');
+    } else {
+      await handleCancelWorkflow(workflow, tasksData);
     }
-
-    await handleCancelWorkflow(workflow, tasksData);
     return;
   }
 
@@ -690,6 +702,7 @@ export const processUpdateTask = async (
         break;
     }
   } catch (error) {
+    console.log(error, taskUpdate);
     sendEvent({
       transactionId: taskUpdate.transactionId,
       type: 'SYSTEM',
@@ -704,10 +717,9 @@
 export const processUpdateTasks = async (
   tasksUpdate: Event.ITaskUpdate[],
 ): Promise<void> => {
+  let locker: IDistributedLockInstance | undefined;
   try {
-    const locker = await distributedLockStore.lock(
-      tasksUpdate[0].transactionId,
-    );
+    locker = await distributedLockStore.lock(tasksUpdate[0].transactionId);
     for (const taskUpdate of tasksUpdate) {
       const hrStart = process.hrtime();
       await processUpdateTask(taskUpdate);
@@ -718,9 +730,10 @@
       } take ${hrEnd[0]}s ${hrEnd[1] / 1000000}ms`,
       );
     }
-    await locker.unlock();
   } catch (error) {
     console.error('processUpdateTasks', error);
+  } finally {
+    await locker?.unlock();
   }
 };
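
Both cancelled-workflow branches in `handleCompletedTask` and `handleFailedTask` now make the same three-way decision: complete the parent task only when the settling child is the last one, wait for the remaining siblings otherwise, and cancel the workflow directly when there is no parent at all. Restated as a standalone function (the types and names are illustrative, not exports of this repo):

```ts
interface NextTaskPath {
  parentTask?: { taskId: string; transactionId: string };
  isLastChild?: boolean;
}

type CancelAction = 'complete-parent' | 'wait-for-sibling' | 'cancel-workflow';

// Mirrors the if / else-if / else branching added in both handlers above.
const cancelActionFor = (next: NextTaskPath): CancelAction => {
  if (next.parentTask && next.isLastChild) return 'complete-parent';
  if (next.parentTask && !next.isLastChild) return 'wait-for-sibling';
  return 'cancel-workflow';
};

console.log(cancelActionFor({})); // 'cancel-workflow'
console.log(cancelActionFor({ parentTask: { taskId: 't', transactionId: 'x' } })); // 'wait-for-sibling'
```
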
diff --git a/src/store/index.ts b/src/store/index.ts
index 89a20f1..57a8a25 100644
--- a/src/store/index.ts
+++ b/src/store/index.ts
@@ -976,34 +976,38 @@
         startTime: timestamp,
         endTime: null,
         retries:
-          +mapParametersToValue(workflowTask?.retry?.limit, {
+          mapParametersToNumber(workflowTask?.retry?.limit, {
             ...tasksData,
             workflow,
-          }) ||
-          taskDefinition?.retry?.limit ||
+          }) ??
+          taskDefinition?.retry?.limit ??
           0,
         retryDelay:
-          +mapParametersToValue(workflowTask?.retry?.delay, {
+          mapParametersToNumber(workflowTask?.retry?.delay, {
             ...tasksData,
             workflow,
-          }) ||
-          taskDefinition?.retry?.delay ||
+          }) ??
+          taskDefinition?.retry?.delay ??
           0,
         ackTimeout:
-          +mapParametersToValue(workflowTask?.ackTimeout, {
+          mapParametersToNumber(workflowTask?.ackTimeout, {
             ...tasksData,
             workflow,
-          }) ||
-          taskDefinition?.ackTimeout ||
+          }) ??
+          taskDefinition?.ackTimeout ??
           0,
         timeout:
-          +mapParametersToValue(workflowTask?.timeout, {
+          mapParametersToNumber(workflowTask?.timeout, {
             ...tasksData,
             workflow,
-          }) ||
-          taskDefinition?.timeout ||
+          }) ??
+          taskDefinition?.timeout ??
           0,
         taskPath,
+        syncWorker:
+          (workflowTask as any)?.syncWorker ??
+          taskDefinition?.syncWorker ??
+          false,
         ...overideTask,
       });
 
@@ -1101,6 +1105,22 @@
   }
 }
 
+const mapParametersToNumber = (
+  parameters: any,
+  tasksData: {
+    [taskReferenceName: string]: Workflow.IWorkflow | Task.ITask;
+  },
+): number | undefined => {
+  if (parameters === undefined) {
+    return undefined;
+  }
+  const val = mapParametersToValue(parameters, tasksData);
+  if (Number.isNaN(+val)) {
+    return undefined;
+  }
+  return +val;
+};
+
 // This's global instance
 export const taskDefinitionStore = new TaskDefinitionStore();
 export const workflowDefinitionStore = new WorkflowDefinitionStore();
diff --git a/src/utils/task.test.ts b/src/utils/task.test.ts
index 9a4b80b..57c5605 100755
--- a/src/utils/task.test.ts
+++ b/src/utils/task.test.ts
@@ -837,6 +837,37 @@ describe('mapParametersToValue', () => {
   });
 
   test('Date from now', () => {
+    const timeAppend = 10000;
+    const fromNow = task.mapParametersToValue('fromNow(${t1.output.a})', {
+      t1: {
+        taskName: 'taskName',
+        taskReferenceName: 't1',
+        taskId: 'taskId',
+        workflowId: 'workflowId',
+        transactionId: 'transactionId',
+        type: Task.TaskTypes.Task,
+        status: State.TaskStates.Completed,
+        output: {
+          a: new Date(Date.now() + timeAppend).toString(),
+        },
+        input: {},
+        ackTimeout: 0,
+        createTime: 0,
+        endTime: 0,
+        logs: [],
+        retries: 0,
+        isRetried: false,
+        retryDelay: 0,
+        timeout: 0,
+        startTime: 0,
+        taskPath: [0],
+      },
+    });
+    expect(fromNow).toBeGreaterThanOrEqual(timeAppend - 2000);
+    expect(fromNow).toBeLessThan(timeAppend);
+  });
+
+  test('Date from now min value (100)', () => {
     expect(
       task.mapParametersToValue('fromNow(${t1.output.a})', {
         t1: {
@@ -863,6 +894,6 @@ describe('mapParametersToValue', () => {
         taskPath: [0],
       },
     }),
-    ).toBeLessThanOrEqual(0);
+    ).toBe(100);
   });
 });
diff --git a/src/utils/task.ts b/src/utils/task.ts
index 4198242..5552fe0 100755
--- a/src/utils/task.ts
+++ b/src/utils/task.ts
@@ -95,7 +95,7 @@ const parseTemplate = (template: string, values: any) => {
 
   const dateFromNowMatch = /(^fromNow\()(.+)(\)$)/i.exec(template);
   if (dateFromNowMatch?.length) {
-    return +dateParse(dateFromNowMatch[2], values) - Date.now();
+    return Math.max(+dateParse(dateFromNowMatch[2], values) - Date.now(), 100);
   }
 
   // string append template
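
Two behavioural details in the last three files are easy to miss. In `src/store/index.ts`, moving the retry/timeout fields from `||` to `??` means an explicitly configured `0` (for example, a retry limit of zero) now wins over the task-definition default, and `mapParametersToNumber` returns `undefined` instead of a `+`-coerced `NaN` precisely so that `??` can fall through cleanly. In `src/utils/task.ts`, `fromNow(...)` is clamped to a 100 ms floor, which is what the rewritten test pins with `.toBe(100)`. A minimal demonstration of both, using standalone values rather than code from the repo:

```ts
// `||` treats every falsy value (0, '', NaN) as absent; `??` only null/undefined.
const configuredLimit: number | undefined = 0; // e.g. retry.limit resolved to 0
console.log(configuredLimit || 3); // 3 -- old behaviour: the explicit 0 was lost
console.log(configuredLimit ?? 3); // 0 -- new behaviour: the explicit 0 is kept

// The fromNow floor: a date already in the past now yields 100 rather than a
// negative delay that a timer would never fire on.
const fromNow = (target: Date): number =>
  Math.max(target.getTime() - Date.now(), 100);
console.log(fromNow(new Date(Date.now() - 5000))); // 100
console.log(fromNow(new Date(Date.now() + 5000))); // ~5000
```
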