Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/all-years-glow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow/utils": patch
---

Add initial `@workflow/utils` package
1 change: 1 addition & 0 deletions .changeset/pre.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"@workflow/world-postgres": "4.0.0",
"@workflow/world-testing": "4.0.0",
"@workflow/world-vercel": "4.0.0",
"@workflow/utils": "4.0.0",
"docs": "4.0.0",
"@workflow/example-app": "0.0.2-alpha.18",
"@workflow/example-hono": "0.0.0",
Expand Down
1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"@types/ms": "^2.1.0",
"@vercel/functions": "catalog:",
"@workflow/errors": "workspace:*",
"@workflow/utils": "workspace:*",
"@workflow/world": "workspace:*",
"@workflow/world-local": "workspace:*",
"@workflow/world-vercel": "workspace:*",
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/step.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { FatalError, WorkflowRuntimeError } from '@workflow/errors';
import { withResolvers } from '@workflow/utils';
import { EventConsumerResult } from './events-consumer.js';
import { WorkflowSuspension } from './global.js';
import { stepLogger } from './logger.js';
import type { WorkflowOrchestratorContext } from './private.js';
import type { Serializable } from './schemas.js';
import { hydrateStepReturnValue } from './serialization.js';
import { withResolvers } from './util.js';

export function createUseStep(ctx: WorkflowOrchestratorContext) {
return function useStep<Args extends Serializable[], Result>(
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/telemetry.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Span, SpanOptions } from '@opentelemetry/api';
import { once } from './util.js';
import { once } from '@workflow/utils';

// ============================================================
// Trace Context Propagation Utilities
Expand Down
177 changes: 1 addition & 176 deletions packages/core/src/util.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import { describe, expect, it } from 'vitest';
import {
buildWorkflowSuspensionMessage,
getWorkflowRunStreamId,
parseDurationToDate,
} from './util';
import { buildWorkflowSuspensionMessage, getWorkflowRunStreamId } from './util';

describe('buildWorkflowSuspensionMessage', () => {
const runId = 'test-run-123';
Expand Down Expand Up @@ -169,174 +165,3 @@ describe('getWorkflowRunStreamId', () => {
expect(result.includes('_user')).toBe(true);
});
});

describe('parseDurationToDate', () => {
describe('string durations', () => {
it('should parse seconds', () => {
const before = Date.now();
const result = parseDurationToDate('5s');
const after = Date.now();
expect(result.getTime()).toBeGreaterThanOrEqual(before + 5000);
expect(result.getTime()).toBeLessThanOrEqual(after + 5000);
});

it('should parse minutes', () => {
const before = Date.now();
const result = parseDurationToDate('2m');
const after = Date.now();
const expected = before + 120000;
expect(result.getTime()).toBeGreaterThanOrEqual(expected);
expect(result.getTime()).toBeLessThanOrEqual(after + 120000);
});

it('should parse hours', () => {
const before = Date.now();
const result = parseDurationToDate('1h');
const after = Date.now();
const expected = before + 3600000;
expect(result.getTime()).toBeGreaterThanOrEqual(expected);
expect(result.getTime()).toBeLessThanOrEqual(after + 3600000);
});

it('should parse days', () => {
const before = Date.now();
const result = parseDurationToDate('1d');
const after = Date.now();
const expected = before + 86400000;
expect(result.getTime()).toBeGreaterThanOrEqual(expected);
expect(result.getTime()).toBeLessThanOrEqual(after + 86400000);
});

it('should parse milliseconds', () => {
const before = Date.now();
const result = parseDurationToDate('500ms');
const after = Date.now();
const expected = before + 500;
expect(result.getTime()).toBeGreaterThanOrEqual(expected);
expect(result.getTime()).toBeLessThanOrEqual(after + 500);
});

it('should throw error for invalid string', () => {
expect(() =>
parseDurationToDate(
// @ts-expect-error
'invalid'
)
).toThrow(
'Invalid duration: "invalid". Expected a valid duration string like "1s", "1m", "1h", etc.'
);
});

it('should throw error for negative duration string', () => {
expect(() => parseDurationToDate('-1s')).toThrow(
'Invalid duration: "-1s". Expected a valid duration string like "1s", "1m", "1h", etc.'
);
});
});

describe('number durations (milliseconds)', () => {
it('should parse zero milliseconds', () => {
const before = Date.now();
const result = parseDurationToDate(0);
const after = Date.now();
expect(result.getTime()).toBeGreaterThanOrEqual(before);
expect(result.getTime()).toBeLessThanOrEqual(after);
});

it('should parse positive milliseconds', () => {
const before = Date.now();
const result = parseDurationToDate(10000);
const after = Date.now();
const expected = before + 10000;
expect(result.getTime()).toBeGreaterThanOrEqual(expected);
expect(result.getTime()).toBeLessThanOrEqual(after + 10000);
});

it('should parse large milliseconds', () => {
const before = Date.now();
const result = parseDurationToDate(1000000);
const after = Date.now();
const expected = before + 1000000;
expect(result.getTime()).toBeGreaterThanOrEqual(expected);
expect(result.getTime()).toBeLessThanOrEqual(after + 1000000);
});

it('should throw error for negative number', () => {
expect(() => parseDurationToDate(-1000)).toThrow(
'Invalid duration: -1000. Expected a non-negative finite number of milliseconds.'
);
});

it('should throw error for NaN', () => {
expect(() => parseDurationToDate(NaN)).toThrow(
'Invalid duration: NaN. Expected a non-negative finite number of milliseconds.'
);
});

it('should throw error for Infinity', () => {
expect(() => parseDurationToDate(Infinity)).toThrow(
'Invalid duration: Infinity. Expected a non-negative finite number of milliseconds.'
);
});

it('should throw error for -Infinity', () => {
expect(() => parseDurationToDate(-Infinity)).toThrow(
'Invalid duration: -Infinity. Expected a non-negative finite number of milliseconds.'
);
});
});

describe('Date objects', () => {
it('should return Date instance directly', () => {
const targetTime = Date.now() + 60000;
const futureDate = new Date(targetTime);
const result = parseDurationToDate(futureDate);
expect(result).toBe(futureDate);
expect(result.getTime()).toBe(targetTime);
});

it('should handle past dates', () => {
const targetTime = Date.now() - 60000;
const pastDate = new Date(targetTime);
const result = parseDurationToDate(pastDate);
expect(result).toBe(pastDate);
expect(result.getTime()).toBe(targetTime);
});

it('should handle date-like objects from deserialization', () => {
const targetTime = Date.now() + 30000;
const dateLike = {
getTime: () => targetTime,
};
const result = parseDurationToDate(dateLike as any);
expect(result.getTime()).toBe(targetTime);
expect(result instanceof Date).toBe(true);
});
});

describe('invalid inputs', () => {
it('should throw error for null', () => {
expect(() => parseDurationToDate(null as any)).toThrow(
'Invalid duration parameter. Expected a duration string, number (milliseconds), or Date object.'
);
});

it('should throw error for undefined', () => {
expect(() => parseDurationToDate(undefined as any)).toThrow(
'Invalid duration parameter. Expected a duration string, number (milliseconds), or Date object.'
);
});

it('should throw error for boolean', () => {
expect(() => parseDurationToDate(true as any)).toThrow(
'Invalid duration parameter. Expected a duration string, number (milliseconds), or Date object.'
);
});

it('should throw error for object without getTime', () => {
expect(() => parseDurationToDate({} as any)).toThrow(
'Invalid duration parameter. Expected a duration string, number (milliseconds), or Date object.'
);
});
});
});
88 changes: 0 additions & 88 deletions packages/core/src/util.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,3 @@
import type { StringValue } from 'ms';
import ms from 'ms';

export interface PromiseWithResolvers<T> {
promise: Promise<T>;
resolve: (value: T) => void;
reject: (reason?: any) => void;
}

/**
* Polyfill for `Promise.withResolvers()`.
*
* @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/withResolvers
*/
export function withResolvers<T>(): PromiseWithResolvers<T> {
let resolve!: (value: T) => void;
let reject!: (reason?: any) => void;
const promise = new Promise<T>((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});
return { promise, resolve, reject };
}

/**
* Creates a lazily-evaluated, memoized version of the provided function.
*
* The returned object exposes a `value` getter that calls `fn` only once,
* caches its result, and returns the cached value on subsequent accesses.
*
* @typeParam T - The return type of the provided function.
* @param fn - The function to be called once and whose result will be cached.
* @returns An object with a `value` property that returns the memoized result of `fn`.
*/
export function once<T>(fn: () => T) {
const result = {
get value() {
const value = fn();
Object.defineProperty(result, 'value', { value });
return value;
},
};
return result;
}

/**
* Builds a workflow suspension log message based on the counts of steps, hooks, and waits.
* @param runId - The workflow run ID
Expand Down Expand Up @@ -107,46 +62,3 @@ export function getWorkflowRunStreamId(runId: string, namespace?: string) {
);
return `${streamId}_${encodedNamespace}`;
}

/**
* Parses a duration parameter (string, number, or Date) and returns a Date object
* representing when the duration should elapse.
*
* - For strings: Parses duration strings like "1s", "5m", "1h", etc. using the `ms` library
* - For numbers: Treats as milliseconds from now
* - For Date objects: Returns the date directly (handles both Date instances and date-like objects from deserialization)
*
* @param param - The duration parameter (StringValue, Date, or number of milliseconds)
* @returns A Date object representing when the duration should elapse
* @throws {Error} If the parameter is invalid or cannot be parsed
*/
export function parseDurationToDate(param: StringValue | Date | number): Date {
if (typeof param === 'string') {
const durationMs = ms(param);
if (typeof durationMs !== 'number' || durationMs < 0) {
throw new Error(
`Invalid duration: "${param}". Expected a valid duration string like "1s", "1m", "1h", etc.`
);
}
return new Date(Date.now() + durationMs);
} else if (typeof param === 'number') {
if (param < 0 || !Number.isFinite(param)) {
throw new Error(
`Invalid duration: ${param}. Expected a non-negative finite number of milliseconds.`
);
}
return new Date(Date.now() + param);
} else if (
param instanceof Date ||
(param &&
typeof param === 'object' &&
typeof (param as any).getTime === 'function')
) {
// Handle both Date instances and date-like objects (from deserialization)
return param instanceof Date ? param : new Date((param as any).getTime());
} else {
throw new Error(
`Invalid duration parameter. Expected a duration string, number (milliseconds), or Date object.`
);
}
}
3 changes: 2 additions & 1 deletion packages/core/src/workflow.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { runInContext } from 'node:vm';
import { ERROR_SLUGS } from '@workflow/errors';
import { withResolvers } from '@workflow/utils';
import type { Event, WorkflowRun } from '@workflow/world';
import * as nanoid from 'nanoid';
import { monotonicFactory } from 'ulid';
Expand All @@ -20,7 +21,7 @@ import {
} from './symbols.js';
import * as Attribute from './telemetry/semantic-conventions.js';
import { trace } from './telemetry.js';
import { getWorkflowRunStreamId, withResolvers } from './util.js';
import { getWorkflowRunStreamId } from './util.js';
import { createContext } from './vm/index.js';
import type { WorkflowMetadata } from './workflow/get-workflow-metadata.js';
import { WORKFLOW_CONTEXT_SYMBOL } from './workflow/get-workflow-metadata.js';
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/workflow/hook.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { type PromiseWithResolvers, withResolvers } from '@workflow/utils';
import type { HookReceivedEvent } from '@workflow/world';
import type { Hook, HookOptions } from '../create-hook.js';
import { EventConsumerResult } from '../events-consumer.js';
import { WorkflowSuspension } from '../global.js';
import { webhookLogger } from '../logger.js';
import type { WorkflowOrchestratorContext } from '../private.js';
import { hydrateStepReturnValue } from '../serialization.js';
import { type PromiseWithResolvers, withResolvers } from '../util.js';

export function createCreateHook(ctx: WorkflowOrchestratorContext) {
return function createHookImpl<T = any>(options: HookOptions = {}): Hook<T> {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/workflow/sleep.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { parseDurationToDate, withResolvers } from '@workflow/utils';
import type { StringValue } from 'ms';
import { EventConsumerResult } from '../events-consumer.js';
import { type WaitInvocationQueueItem, WorkflowSuspension } from '../global.js';
import type { WorkflowOrchestratorContext } from '../private.js';
import { parseDurationToDate, withResolvers } from '../util.js';

export function createSleep(ctx: WorkflowOrchestratorContext) {
return async function sleepImpl(
Expand Down
1 change: 1 addition & 0 deletions packages/utils/LICENSE.md
3 changes: 3 additions & 0 deletions packages/utils/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# @workflow/utils

Utility functions for [Workflow DevKit](https://useworkflow.dev).
Loading