Skip to content

Commit

Permalink
docs(step-jobs): add how to import errors (#1748)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Mar 24, 2023
1 parent 358ab2a commit d0cbb76
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
}
],
"@typescript-eslint/no-explicit-any": "off",

"@typescript-eslint/no-non-null-assertion": "off",
"@typescript-eslint/ban-types": [
1,
{
Expand Down
29 changes: 29 additions & 0 deletions docs/gitbook/bullmq-pro/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,32 @@
# [5.2.0](https://github.com/taskforcesh/bullmq-pro/compare/v5.1.15...v5.2.0) (2023-03-23)


### Features

* **groups:** add repair maxed group function ([a1fa1d8](https://github.com/taskforcesh/bullmq-pro/commit/a1fa1d80cf8ad79c7b9844df163765f61231350a))

## [5.1.15](https://github.com/taskforcesh/bullmq-pro/compare/v5.1.14...v5.1.15) (2023-03-23)


### Bug Fixes

* **deps:** upgrade bullmq to 3.10.2 ([#138](https://github.com/taskforcesh/bullmq-pro/issues/138)) ([186be2b](https://github.com/taskforcesh/bullmq-pro/commit/186be2bfedf474dae6931d2dbc636bd53d900cf8))

## [5.1.14](https://github.com/taskforcesh/bullmq-pro/compare/v5.1.13...v5.1.14) (2023-02-15)


### Bug Fixes

* **deps:** upgrade bullmq to 3.6.6 ([#137](https://github.com/taskforcesh/bullmq-pro/issues/137)) ([2af512a](https://github.com/taskforcesh/bullmq-pro/commit/2af512a4d6f4212d888af5766de8d20ea22b3c3c))

## [5.1.13](https://github.com/taskforcesh/bullmq-pro/compare/v5.1.12...v5.1.13) (2023-02-07)


### Bug Fixes

* upgrade bullmq to v3.6.3 ([74d8d0c](https://github.com/taskforcesh/bullmq-pro/commit/74d8d0c937973c94792abc063a150372364fe0bf))
* **rate-limit:** update group concurrency after manual rate-limit ([de66ec4](https://github.com/taskforcesh/bullmq-pro/commit/de66ec494b8400e3cbb916f5937dc3834a213389))

## [5.1.12](https://github.com/taskforcesh/bullmq-pro/compare/v5.1.11...v5.1.12) (2023-01-26)


Expand Down
6 changes: 6 additions & 0 deletions docs/gitbook/patterns/process-step-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ Another use case is to delay a job at runtime.
This could be handled using the moveToDelayed method:

```typescript
import { DelayedError, Worker } from 'bullmq';

enum Step {
Initial,
Second,
Expand Down Expand Up @@ -95,6 +97,8 @@ A common use case is to add children at runtime and then wait for the children t
This could be handled using the moveToWaitingChildren method:

```typescript
import { WaitingChildrenError, Worker } from 'bullmq';

enum Step {
Initial,
Second,
Expand Down Expand Up @@ -177,6 +181,8 @@ Another use case is to add flows at runtime and then wait for the children to co
For example, we can add children dynamically in the processor function of a worker. This could be handled in this way:

```typescript
import { FlowProducer, WaitingChildrenError, Worker } from 'bullmq';

enum Step {
Initial,
Second,
Expand Down
10 changes: 5 additions & 5 deletions src/classes/async-fifo-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
*
*/
export class AsyncFifoQueue<T> {
private queue: T[] = [];
private queue: (T | undefined)[] = [];

private nextPromise: Promise<T> | undefined;
private nextPromise: Promise<T | undefined> | undefined;
private resolve: ((value: T | undefined) => void) | undefined;
private reject: ((reason?: any) => void) | undefined;
private pending = new Set<Promise<T>>();
Expand Down Expand Up @@ -56,17 +56,17 @@ export class AsyncFifoQueue<T> {
}

private resolvePromise(job: T) {
this.resolve(job);
this.resolve!(job);
this.newPromise();
}

private rejectPromise(err: any) {
this.reject(err);
this.reject!(err);
this.newPromise();
}

private newPromise() {
this.nextPromise = new Promise<T>((resolve, reject) => {
this.nextPromise = new Promise<T | undefined>((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
Expand Down
10 changes: 6 additions & 4 deletions src/classes/backoffs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ export class Backoffs {
},
};

static normalize(backoff: number | BackoffOptions): BackoffOptions {
static normalize(
backoff: number | BackoffOptions,
): BackoffOptions | undefined {
if (Number.isFinite(<number>backoff)) {
return {
type: 'fixed',
Expand All @@ -37,7 +39,7 @@ export class Backoffs {
err: Error,
job: MinimalJob,
customStrategy?: BackoffStrategy,
): Promise<number> | number {
): Promise<number> | number | undefined {
if (backoff) {
const strategy = lookupStrategy(backoff, customStrategy);

Expand All @@ -48,10 +50,10 @@ export class Backoffs {

function lookupStrategy(
backoff: BackoffOptions,
customStrategy: BackoffStrategy,
customStrategy?: BackoffStrategy,
): BackoffStrategy {
if (backoff.type in Backoffs.builtinStrategies) {
return Backoffs.builtinStrategies[backoff.type](backoff.delay);
return Backoffs.builtinStrategies[backoff.type](backoff.delay!);
} else if (customStrategy) {
return customStrategy;
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/classes/child-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ enum ChildStatus {
*
*/
export class ChildProcessor {
public status: ChildStatus;
public status?: ChildStatus;
public processor: any;
public currentJobPromise: Promise<unknown> | undefined;

Expand Down
4 changes: 2 additions & 2 deletions src/classes/process-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function hasProcessExited(child: ChildProcess): boolean {
export async function killAsync(
child: ChildProcess,
signal: 'SIGTERM' | 'SIGKILL' = 'SIGKILL',
timeoutMs: number = undefined,
timeoutMs?: number,
): Promise<void> {
if (hasProcessExited(child)) {
return;
Expand All @@ -28,7 +28,7 @@ export async function killAsync(
const onExit = onExitOnce(child);
child.kill(signal);

if (timeoutMs === 0 || isFinite(timeoutMs)) {
if (timeoutMs !== undefined && (timeoutMs === 0 || isFinite(timeoutMs))) {
const timeoutHandle = setTimeout(() => {
if (!hasProcessExited(child)) {
child.kill('SIGKILL');
Expand Down
2 changes: 2 additions & 0 deletions src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ export class QueueGetters<
if (!clientCommandMessageReg.test((<Error>err).message)) {
throw err;
}

return [];
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export class Queue<
> extends QueueGetters<DataType, ResultType, NameType> {
token = v4();
jobsOpts: BaseJobOptions;
private _repeat: Repeat;
private _repeat?: Repeat;

constructor(
name: string,
Expand All @@ -113,7 +113,7 @@ export class Queue<
Connection,
);

this.jobsOpts = get(opts, 'defaultJobOptions');
this.jobsOpts = get(opts, 'defaultJobOptions') ?? {};

this.waitUntilReady()
.then(client => {
Expand Down
18 changes: 9 additions & 9 deletions src/commands/script-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ export class ScriptLoader {

if (!includeMetadata) {
const { name, numberOfKeys } = splitFilename(includePath);
let childContent: string;
let childContent = '';
try {
const buf = await readFile(includePath, { flag: 'r' });
childContent = buf.toString();
Expand Down Expand Up @@ -334,7 +334,7 @@ export class ScriptLoader {
processed = processed || new Set<string>();
let content = file.content;
file.includes.forEach((child: ScriptMetadata) => {
const emitted = processed.has(child.path);
const emitted = processed!.has(child.path);
const fragment = this.interpolate(child, processed);
const replacement = emitted ? '' : fragment;

Expand All @@ -347,7 +347,7 @@ export class ScriptLoader {
content = replaceAll(content, child.token, '');
}

processed.add(child.path);
processed!.add(child.path);
});

return content;
Expand All @@ -371,7 +371,7 @@ export class ScriptLoader {

return {
name,
options: { numberOfKeys, lua },
options: { numberOfKeys: numberOfKeys!, lua },
};
}

Expand Down Expand Up @@ -437,7 +437,7 @@ export class ScriptLoader {
pathname: string,
cache?: Map<string, ScriptMetadata>,
): Promise<void> {
let paths: Set<string> = this.clientScripts.get(client);
let paths = this.clientScripts.get(client);
if (!paths) {
paths = new Set<string>();
this.clientScripts.set(client, paths);
Expand Down Expand Up @@ -513,18 +513,18 @@ function getPkgJsonDir(): string {
// this version is preferred to the simpler version because of
// https://github.com/facebook/jest/issues/5303 -
// tldr: dont assume you're the only one with the doing something like this
function getCallerFile() {
function getCallerFile(): string {
const originalFunc = Error.prepareStackTrace;

let callerFile;
let callerFile = '';
try {
Error.prepareStackTrace = (_, stack) => stack;

const sites = <NodeJS.CallSite[]>(<unknown>new Error().stack);
const currentFile = sites.shift().getFileName();
const currentFile = sites.shift()?.getFileName();

while (sites.length) {
callerFile = sites.shift().getFileName();
callerFile = sites.shift()?.getFileName() ?? '';

if (currentFile !== callerFile) {
break;
Expand Down
2 changes: 1 addition & 1 deletion src/types/backoff-strategy.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { MinimalJob } from '../interfaces/minimal-job';

export type BackoffStrategy = (
attemptsMade?: number,
attemptsMade: number,
type?: string,
err?: Error,
job?: MinimalJob,
Expand Down
7 changes: 5 additions & 2 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ export async function removeAllQueueData(
});
}

export function getParentKey(opts: { id: string; queue: string }): string {
export function getParentKey(opts: {
id: string;
queue: string;
}): string | undefined {
if (opts) {
return `${opts.queue}:${opts.id}`;
}
Expand Down Expand Up @@ -127,7 +130,7 @@ export const asyncSend = <T extends procSendLike>(
): Promise<void> => {
return new Promise((resolve, reject) => {
if (typeof proc.send === 'function') {
proc.send(msg, (err: Error) => {
proc.send(msg, (err: Error | null) => {
if (err) {
reject(err);
} else {
Expand Down

0 comments on commit d0cbb76

Please sign in to comment.