Skip to content

Commit

Permalink
Merge branch 'master' into v6
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Oct 12, 2024
2 parents bae2024 + c8abdc7 commit 8617fb5
Show file tree
Hide file tree
Showing 22 changed files with 343 additions and 199 deletions.
17 changes: 17 additions & 0 deletions docs/gitbook/bullmq-pro/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
# [7.17.0](https://github.com/taskforcesh/bullmq-pro/compare/v7.16.0...v7.17.0) (2024-10-12)


### Features

* **repeat:** deprecate immediately on job scheduler ([ed047f7](https://github.com/taskforcesh/bullmq/commit/ed047f7ab69ebdb445343b6cb325e90b95ee9dc5))
* **job:** expose priority value ([#2804](https://github.com/taskforcesh/bullmq/issues/2804)) ([9abec3d](https://github.com/taskforcesh/bullmq/commit/9abec3dbc4c69f2496c5ff6b5d724f4d1a5ca62f))
* **job:** add deduplication logic ([#2796](https://github.com/taskforcesh/bullmq/issues/2796)) ([0a4982d](https://github.com/taskforcesh/bullmq/commit/0a4982d05d27c066248290ab9f59349b802d02d5))
* **queue:** add new upsertJobScheduler, getJobSchedulers and removeJobSchedulers methods ([dd6b6b2](https://github.com/taskforcesh/bullmq/commit/dd6b6b2263badd8f29db65d1fa6bcdf5a1e9f6e2))
* **worker-fork:** allow passing fork options ([#2795](https://github.com/taskforcesh/bullmq/issues/2795)) ([f7a4292](https://github.com/taskforcesh/bullmq/commit/f7a4292e064b41236f4489b3d7785a4c599a6435))
* **worker-thread:** allow passing Worker options ([#2791](https://github.com/taskforcesh/bullmq/issues/2791)) ref [#1555](https://github.com/taskforcesh/bullmq/issues/1555) ([6a1f7a9](https://github.com/taskforcesh/bullmq/commit/6a1f7a9f0303561d6ec7b2005ba0227132b89e07))

### Bug Fixes

* **repeat:** also consider startDate when using "every" ([25bbaa8](https://github.com/taskforcesh/bullmq/commit/25bbaa81af87f9944a64bc4fb7e0c76ef223ada4))
* **sandbox:** catch exit errors ([#2800](https://github.com/taskforcesh/bullmq/issues/2800)) ([6babb9e](https://github.com/taskforcesh/bullmq/commit/6babb9e2f355feaf9bd1a8ed229c1001e6de7144))

# [7.16.0](https://github.com/taskforcesh/bullmq-pro/compare/v7.15.4...v7.16.0) (2024-09-24)


Expand Down
21 changes: 21 additions & 0 deletions docs/gitbook/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
## [5.19.1](https://github.com/taskforcesh/bullmq/compare/v5.19.0...v5.19.1) (2024-10-12)


### Bug Fixes

* **sandbox:** fix serialization of error with circular references are present ([#2815](https://github.com/taskforcesh/bullmq/issues/2815)) fix [#2813](https://github.com/taskforcesh/bullmq/issues/2813) ([a384d92](https://github.com/taskforcesh/bullmq/commit/a384d926bee15bffa84178a8fad7b94a6a08b572))

# [5.19.0](https://github.com/taskforcesh/bullmq/compare/v5.18.0...v5.19.0) (2024-10-11)


### Features

* **repeat:** deprecate immediately on job scheduler ([ed047f7](https://github.com/taskforcesh/bullmq/commit/ed047f7ab69ebdb445343b6cb325e90b95ee9dc5))

# [5.18.0](https://github.com/taskforcesh/bullmq/compare/v5.17.1...v5.18.0) (2024-10-09)


### Features

* **job:** expose priority value ([#2804](https://github.com/taskforcesh/bullmq/issues/2804)) ([9abec3d](https://github.com/taskforcesh/bullmq/commit/9abec3dbc4c69f2496c5ff6b5d724f4d1a5ca62f))

## [5.17.1](https://github.com/taskforcesh/bullmq/compare/v5.17.0...v5.17.1) (2024-10-07)


Expand Down
4 changes: 2 additions & 2 deletions docs/gitbook/guide/job-schedulers/repeat-strategies.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Below is the supported format for cron expressions in cron-parser:

This format includes the optional second field, which is not typically available in standard cron schedules, allowing for even more precise scheduling.

Cron expressions are quite powerful as in they support seemless handling timezone differences and daylight saving time transitions, crucial for tasks that depend on local times. And also because of the use of special characters to denote specific days or things like the last day of the month, providing flexibility for monthly and weekly tasks. 
Cron expressions are quite powerful as in they support seemless handling timezone differences and daylight saving time transitions, crucial for tasks that depend on local times. And also because of the use of special characters to denote specific days or things like the last day of the month, providing flexibility for monthly and weekly tasks.

If you are new to Cron expressions, [Wikipedia](https://en.wikipedia.org/wiki/Cron) is an excelent starting point to learn how to use them.

Expand All @@ -79,7 +79,7 @@ const myQueue = new Queue('my-cron-jobs', { connection });
await myQueue.upsertJobScheduler(
'weekday-morning-job',
{
cron: '0 0 9 * * 1-5', // Runs at 9:00 AM every Monday to Friday
pattern: '0 0 9 * * 1-5', // Runs at 9:00 AM every Monday to Friday
},
{
name: 'cron-job',
Expand Down
2 changes: 1 addition & 1 deletion docs/gitbook/guide/workers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ worker.on('progress', (job: Job, progress: number | object) => {
Finally, when the process fails with an exception it is possible to listen for the `failed` event too:

```typescript
worker.on('failed', (job: Job, error: Error) => {
worker.on('failed', (job: Job | undefined, error: Error, prev: string) => {
// Do something with the return value.
});
```
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bullmq",
"version": "5.17.1",
"version": "5.19.1",
"description": "Queue for messages and jobs based on Redis",
"homepage": "https://bullmq.io/",
"main": "./dist/cjs/index.js",
Expand Down
44 changes: 20 additions & 24 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ export class JobScheduler extends QueueBase {
super(name, opts, Connection);

this.repeatStrategy =
(opts.settings && opts.settings.repeatStrategy) || getNextMillis;
(opts.settings && opts.settings.repeatStrategy) || defaultRepeatStrategy;
}

async upsertJobScheduler<T = any, R = any, N extends string = string>(
jobSchedulerId: string,
repeatOpts: Omit<RepeatOptions, 'key'>,
repeatOpts: Omit<RepeatOptions, 'key' | 'prevMillis' | 'offset'>,
jobName: N,
jobData: T,
opts: Omit<JobsOptions, 'jobId' | 'repeat' | 'delay'>,
Expand Down Expand Up @@ -69,21 +69,26 @@ export class JobScheduler extends QueueBase {
}

const prevMillis = opts.prevMillis || 0;
now = prevMillis < now ? now : prevMillis;

// Check if we have a start date for the repeatable job
const { startDate } = repeatOpts;
const { startDate, ...filteredRepeatOpts } = repeatOpts;
if (startDate) {
const startMillis = new Date(startDate).getTime();
now = startMillis > now ? startMillis : now;
}

const nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
let nextMillis;
if (every) {
nextMillis = prevMillis + every;

if (nextMillis < now) {
nextMillis = now;
}
} else if (pattern) {
now = prevMillis < now ? now : prevMillis;
nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
}

const hasImmediately = Boolean(
(every || pattern) && repeatOpts.immediately,
);
const offset = hasImmediately && every ? now - nextMillis : undefined;
if (nextMillis) {
if (override) {
await this.scripts.addJobScheduler(jobSchedulerId, nextMillis, {
Expand All @@ -100,16 +105,13 @@ export class JobScheduler extends QueueBase {
);
}

const { immediately, ...filteredRepeatOpts } = repeatOpts;

return this.createNextJob<T, R, N>(
jobName,
nextMillis,
jobSchedulerId,
{ ...opts, repeat: { offset, ...filteredRepeatOpts } },
{ ...opts, repeat: filteredRepeatOpts },
jobData,
iterationCount,
hasImmediately,
);
}
}
Expand All @@ -121,24 +123,22 @@ export class JobScheduler extends QueueBase {
opts: JobsOptions,
data: T,
currentCount: number,
hasImmediately: boolean,
) {
//
// Generate unique job id for this iteration.
//
const jobId = this.getSchedulerNextJobId({
jobSchedulerId: jobSchedulerId,
jobSchedulerId,
nextMillis,
});

const now = Date.now();
const delay =
nextMillis + (opts.repeat.offset ? opts.repeat.offset : 0) - now;
const delay = nextMillis - now;

const mergedOpts = {
...opts,
jobId,
delay: delay < 0 || hasImmediately ? 0 : delay,
delay: delay < 0 ? 0 : delay,
timestamp: now,
prevMillis: nextMillis,
repeatJobKey: jobSchedulerId,
Expand Down Expand Up @@ -230,15 +230,11 @@ export class JobScheduler extends QueueBase {
}
}

export const getNextMillis = (
export const defaultRepeatStrategy = (
millis: number,
opts: RepeatOptions,
): number | undefined => {
const { every, pattern } = opts;

if (every) {
return Math.floor(millis / every) * every + (opts.immediately ? 0 : every);
}
const { pattern } = opts;

const currentDate = new Date(millis);
const interval = parseExpression(pattern, {
Expand Down
22 changes: 17 additions & 5 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import {
lengthInUtf8Bytes,
parseObjectValues,
tryCatch,
finishedErrors,
} from '../utils';
import { Backoffs } from './backoffs';
import { Scripts, raw2NextJobData } from './scripts';
Expand Down Expand Up @@ -95,7 +94,15 @@ export class Job<
* An amount of milliseconds to wait until this job can be processed.
* @defaultValue 0
*/
delay: number;
delay = 0;

/**
* Ranges from 0 (highest priority) to 2 097 152 (lowest priority). Note that
* using priorities has a slight impact on performance,
* so do not use it if not required.
* @defaultValue 0
*/
priority = 0;

/**
* Timestamp when the job was created (unless overridden with job options).
Expand Down Expand Up @@ -194,13 +201,14 @@ export class Job<
this.opts = Object.assign(
{
attempts: 0,
delay: 0,
},
restOpts,
);

this.delay = this.opts.delay;

this.priority = this.opts.priority || 0;

this.repeatJobKey = repeatJobKey;

this.timestamp = opts.timestamp ? opts.timestamp : Date.now();
Expand All @@ -214,7 +222,9 @@ export class Job<
: undefined;

this.debounceId = opts.debounce ? opts.debounce.id : undefined;
this.deduplicationId = opts.deduplication ? opts.deduplication.id : this.debounceId;
this.deduplicationId = opts.deduplication
? opts.deduplication.id
: this.debounceId;

this.toKey = queue.toKey.bind(queue);
this.setScripts();
Expand Down Expand Up @@ -737,7 +747,7 @@ export class Job<

const result = results[results.length - 1][1] as number;
if (result < 0) {
throw finishedErrors({
throw this.scripts.finishedErrors({
code: result,
jobId: this.id,
command,
Expand Down Expand Up @@ -840,13 +850,15 @@ export class Job<
/**
* Change job priority.
*
* @param opts - options containing priority and lifo values.
* @returns void
*/
async changePriority(opts: {
priority?: number;
lifo?: boolean;
}): Promise<void> {
await this.scripts.changePriority(this.id, opts.priority, opts.lifo);
this.priority = opts.priority || 0;
}

/**
Expand Down
12 changes: 6 additions & 6 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -484,12 +484,12 @@ export class Queue<
*
* @param id - identifier
*/
async removeDeduplicationKey(id: string): Promise<number> {
const client = await this.client;
return client.del(`${this.keys.de}:${id}`);
}
async removeDeduplicationKey(id: string): Promise<number> {
const client = await this.client;

return client.del(`${this.keys.de}:${id}`);
}

/**
* Removes a repeatable job by its key. Note that the key is the one used
* to store the repeatable job metadata and not one of the job iterations
Expand Down
Loading

0 comments on commit 8617fb5

Please sign in to comment.