Skip to content

Commit

Permalink
Merge pull request #69 from devit-tel/feature/replace-time-keeper-wit…
Browse files Browse the repository at this point in the history
…h-reminder

Feature/replace time keeper with reminder
  • Loading branch information
NV4RE authored Aug 11, 2021
2 parents de6f9fd + fd16876 commit b857d35
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export const kafkaTopicName = {
command: `${prefix}.${Kafka.topicSuffix.command}`,
// Timer event (Cron, Delay task)
timer: `${prefix}.${Kafka.topicSuffix.timer}`,
reminder: process.env['reminder_topic'],
};

export const kafkaTopic = {
Expand Down
89 changes: 87 additions & 2 deletions src/kafka/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { Event, Task, Timer } from '@melonade/melonade-declaration';
import { CommandTypes } from '@melonade/melonade-declaration/build/command';
import { TaskStates } from '@melonade/melonade-declaration/build/state';
import { TimerTypes } from '@melonade/melonade-declaration/build/timer';
import {
AdminClient,
KafkaConsumer,
Expand Down Expand Up @@ -132,9 +135,63 @@ export const pollWithMessage = <T = any>(
);
});

export interface IReminderRequest {
when: Number;
topic: string;
payload: any;
}

export const sendReminder = (payload: IReminderRequest) => {
producerClient.produce(
config.kafkaTopicName.reminder,
null,
Buffer.from(JSON.stringify(payload)),
'',
Date.now(),
);
};

export const sendTimer = (
timer: Timer.IDelayTaskTimer | Timer.IScheduleTaskTimer,
) =>
) => {
switch (timer.type) {
case TimerTypes.delayTask:
sendReminder({
payload: {
type: CommandTypes.ReloadTask,
transactionId: timer.task.transactionId,
task: timer.task,
},
topic: config.kafkaTopicName.command,
when: timer.task.startTime,
});
break;

case TimerTypes.scheduleTask:
sendReminder({
payload: {
transactionId: timer.transactionId,
taskId: timer.taskId,
isSystem: true,
status: TaskStates.Inprogress,
},
topic: config.kafkaTopicName.event,
when: Date.now(),
});

sendReminder({
payload: {
transactionId: timer.transactionId,
taskId: timer.taskId,
isSystem: true,
status: TaskStates.Completed,
},
topic: config.kafkaTopicName.event,
when: timer.completedAt,
});
break;
}

producerClient.produce(
config.kafkaTopicName.timer,
null,
Expand All @@ -144,8 +201,9 @@ export const sendTimer = (
: timer.transactionId,
Date.now(),
);
};

export const dispatch = (task: Task.ITask) =>
export const dispatch = (task: Task.ITask) => {
producerClient.produce(
`${config.kafkaTopicName.task}.${task.taskName}`,
null,
Expand All @@ -154,6 +212,33 @@ export const dispatch = (task: Task.ITask) =>
Date.now(),
);

if (task.ackTimeout > 0) {
sendReminder({
payload: {
transactionId: task.transactionId,
taskId: task.taskId,
isSystem: true,
status: TaskStates.AckTimeOut,
},
topic: config.kafkaTopicName.event,
when: task.ackTimeout + Date.now(),
});
}

if (task.timeout > 0) {
sendReminder({
payload: {
transactionId: task.transactionId,
taskId: task.taskId,
isSystem: true,
status: TaskStates.Timeout,
},
topic: config.kafkaTopicName.event,
when: task.timeout + Date.now(),
});
}
};

// TODO Since we have distributed lock, this need to rewrite using state.processTask instead !!
// Use to send update event to another PM or itself to make sure ordering
export const sendUpdate = (taskUpdate: Event.ITaskUpdate) =>
Expand Down
8 changes: 8 additions & 0 deletions src/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,14 @@ export class TaskInstanceStore {
);

if (task.type === TaskTypes.SubTransaction) {
sendEvent({
transactionId: task.transactionId,
type: 'TASK',
isError: false,
timestamp,
details: task,
});

const workflowDefinition = await workflowDefinitionStore.get(
task.input?.workflowName,
task.input?.workflowRev,
Expand Down

0 comments on commit b857d35

Please sign in to comment.