Skip to content

Commit

Permalink
fix: lock middleware to follow the rest of the convention and typing
Browse files Browse the repository at this point in the history
  • Loading branch information
Plopix committed Nov 3, 2024
1 parent ef586a6 commit 570852e
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 83 deletions.
22 changes: 14 additions & 8 deletions docs/src/content/docs/built-in-middlewares/lock.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@ As for any Middleware, you can use it by adding it to the `bus` instance.
```typescript
const commandBus = createCommandBus<CommandHandlerRegistry>();
commandBus.useLockMiddleware({
getLockKey: (envelope) => envelope.message.id,
}, {
adapter,
ttl: 500, // the default ttl for all messages
tick: 100, // the delay between each try to get the lock
timeout: 1000 // the maximum time to wait for the lock
}
)
adapter,
ttl: 500, // the default ttl for all messages
tick: 100, // the delay between each try to get the lock
timeout: 1000 // the maximum time to wait for the lock
getLockKey: async (envelope) => envelope.message.__type,
intents: {
createUser: {
getLockKey: async (envelope) => envelope.message.something,
timeout: 20,
ttl: 50,
tick: 10,
},
},
});
```

> Remember built-in middlewares are _intent_ aware, therefore you can customize the behavior per intent using the key `intents`.
Expand Down
31 changes: 13 additions & 18 deletions examples/shared/src/core/buses.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,27 +97,22 @@ commandBus.useMockerMiddleware({
},
},
});
commandBus.useLockMiddleware(
{
getLockKey: (envelope) => {
return '12';
},
commandBus.useLockMiddleware({
adapter: {
acquire: async () => true,
release: async () => undefined,
},
{
adapter: {
acquire: async () => true,
release: async () => undefined,
},
timeout: 1000,
intents: {
createUser: {
timeout: 2000,
ttl: 500,
tick: 100,
},
timeout: 1000,
getLockKey: async (envelope) => envelope.message.__type,
intents: {
createUser: {
getLockKey: async (envelope) => envelope.message.email,
timeout: 2000,
ttl: 500,
tick: 100,
},
},
);
});

commandBus.useWebhookMiddleware({
async: true,
Expand Down
2 changes: 1 addition & 1 deletion libs/missive.js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"Sébastien Morel <[email protected]>",
"Anaël Chardan"
],
"version": "0.1.2",
"version": "0.2.0",
"type": "module",
"main": "./build/index.cjs",
"module": "./build/index.js",
Expand Down
34 changes: 18 additions & 16 deletions libs/missive.js/src/middlewares/lock-middleware.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { BusKinds, MessageRegistryType } from '../core/bus.js';
import { BusKinds, MessageRegistry, MessageRegistryType, TypedMessage } from '../core/bus.js';

import { Middleware } from '../core/middleware.js';
import { createInMemoryLockAdapter } from '../adapters/in-memory-lock-adapter.js';
import { Envelope } from '../core/envelope.js';

export type LockAdapter = {
acquire: (key: string, ttl: number) => Promise<boolean>;
Expand All @@ -14,30 +15,31 @@ type BasicOptions = {
tick?: number;
};

type Options<Def> = BasicOptions & {
adapter: LockAdapter;
intents?: Partial<Record<keyof Def, BasicOptions>>;
};

type Params<BusKind extends BusKinds, T extends MessageRegistryType<BusKind>> = {
getLockKey: (envelope: LockMiddlewareMessage<BusKind, T>) => string;
type Options<BusKind extends BusKinds, T extends MessageRegistryType<BusKind>> = BasicOptions & {
adapter?: LockAdapter;
getLockKey: (envelope: Envelope<TypedMessage<MessageRegistry<BusKind, T>>>) => Promise<string>;
intents?: {
[K in keyof T]?: BasicOptions & {
getLockKey?: (envelope: NarrowedEnvelope<BusKind, T, K>) => Promise<string>;
};
};
};

type LockMiddlewareMessage<BusKind extends BusKinds, T extends MessageRegistryType<BusKind>> = Parameters<
Middleware<BusKind, T>
>[0];
type NarrowedEnvelope<BusKind extends BusKinds, T extends MessageRegistryType<BusKind>, K extends keyof T> = Envelope<
TypedMessage<MessageRegistry<BusKind, Pick<T, K>>>
>;

export function createLockMiddleware<BusKind extends BusKinds, T extends MessageRegistryType<BusKind>>(
{ getLockKey }: Params<BusKind, T>,
options: Partial<Options<T>> = {},
options: Options<BusKind, T>,
): Middleware<BusKind, T> {
const adapter = options.adapter ?? createInMemoryLockAdapter();

return async (envelope, next) => {
const type = envelope.message.__type;
const type = envelope.message.__type as keyof T;
const ttl = options.intents?.[type]?.ttl ?? options.ttl ?? 500;
const tick = options.intents?.[type]?.tick ?? options.tick ?? 100;
const lockKey = getLockKey(envelope);
const getLockKey = options.intents?.[type]?.getLockKey ?? options.getLockKey;
const lockKey = await getLockKey(envelope);

async function doUnderLock(timeout: number) {
const isAcquired = await adapter.acquire(lockKey, ttl);
if (isAcquired) {
Expand Down
64 changes: 24 additions & 40 deletions libs/missive.js/tests/lock-middleware.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,13 @@ describe('createLockMiddleware', () => {
});

it('work when everything is working', async () => {
const middleware = createLockMiddleware<'command', MessageRegistry>(
{
getLockKey: (e) => {
return e.message.id.toString();
},
const middleware = createLockMiddleware<'command', MessageRegistry>({
getLockKey: async (e) => {
return e.message.id.toString();
},
{
adapter,
timeout: 0,
},
);
adapter,
timeout: 0,
});

(adapter.acquire as ReturnType<typeof vi.fn>).mockResolvedValue(true);

Expand All @@ -54,17 +50,13 @@ describe('createLockMiddleware', () => {
expect(adapter.release).toHaveBeenCalledOnce();
});
it('should throw an error if next is throwing an error', async () => {
const middleware = createLockMiddleware<'command', MessageRegistry>(
{
getLockKey: (e) => {
return e.message.id.toString();
},
},
{
adapter,
timeout: 0,
const middleware = createLockMiddleware<'command', MessageRegistry>({
getLockKey: async (e) => {
return e.message.id.toString();
},
);
adapter,
timeout: 0,
});

(adapter.acquire as ReturnType<typeof vi.fn>).mockResolvedValue(true);
(next as ReturnType<typeof vi.fn>).mockRejectedValue(new Error('Test Error'));
Expand All @@ -75,34 +67,26 @@ describe('createLockMiddleware', () => {
});

it('should throw an error if the lock is not acquired and there is no timeout', async () => {
const middleware = createLockMiddleware<'command', MessageRegistry>(
{
getLockKey: (e) => {
return e.message.id.toString();
},
const middleware = createLockMiddleware<'command', MessageRegistry>({
getLockKey: async (e) => {
return e.message.id.toString();
},
{
adapter,
timeout: 0,
},
);
adapter,
timeout: 0,
});
(adapter.acquire as ReturnType<typeof vi.fn>).mockResolvedValue(false);

await expect(middleware(envelope, next)).rejects.toThrow('Lock not acquired or timeout');
});

it('should retry to get the lock', async () => {
const middleware = createLockMiddleware<'command', MessageRegistry>(
{
getLockKey: (e) => {
return e.message.id.toString();
},
},
{
adapter,
timeout: 200,
const middleware = createLockMiddleware<'command', MessageRegistry>({
getLockKey: async (e) => {
return e.message.id.toString();
},
);
adapter,
timeout: 200,
});
(adapter.acquire as ReturnType<typeof vi.fn>).mockResolvedValueOnce(false).mockResolvedValueOnce(true);

await middleware(envelope, next);
Expand Down

0 comments on commit 570852e

Please sign in to comment.