Skip to content

Commit

Permalink
Abstract key-value store and message queue
Browse files Browse the repository at this point in the history
  • Loading branch information
dahlia committed Mar 29, 2024
1 parent 5a398a9 commit 326a5ab
Show file tree
Hide file tree
Showing 13 changed files with 284 additions and 64 deletions.
24 changes: 24 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,30 @@ Version 0.5.0

To be released.

- Abstract key-value store for caching.

- Added `KvStore` interface.
- Added `KvStoreSetOptions` interface.
- Added `KvKey` type.
- Added `MemoryKvStore` class.
- `KvCacheParameters.kv` option now accepts a `KvStore` instead of
`Deno.Kv`.
- `KvCacheParameters.prefix` option now accepts a `KvKey` instead of
`Deno.KvKey`.
- `FederationParameters.kv` option now accepts a `KvStore` instead of
`Deno.Kv`.
- `FederationKvPrefixes.activityIdempotence` option now accepts a `KvKey`
instead of `Deno.KvKey`.
- `FederationKvPrefixes.remoteDocument` option now accepts a `KvKey`
instead of `Deno.KvKey`.

- Abstract message queue for outgoing activities.

- Added `MessageQueue` interface.
- Added `MessageQueueEnqueueOptions` interface.
- Added `InProcessMessageQueue` class.
- Added `FederationParameters.queue` option.


Version 0.4.0
-------------
Expand Down
23 changes: 18 additions & 5 deletions docs/manual/federation.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ properties. Some of them are required:

### `kv`

*Required.* The `~FederationParameters.kv` property is a [`Deno.Kv`] instance
*Required.* The `~FederationParameters.kv` property is a `KvStore` instance
that the `Federation` object uses to store several kinds of cache data and
to maintain the queue of outgoing activities. Usually instantiated by
calling the [`Deno.openKv()`] function.
to maintain the queue of outgoing activities.

[`Deno.Kv`]: https://deno.land/api?unstable&s=Deno.Kv
[`Deno.openKv()`]: https://deno.land/api?unstable&s=Deno.openKv
`KvStore` is an abstract interface that represents a key-value store.
Currently, there is only one implementation of `KvStore`, which is the
`MemoryKvStore` class, but you can define your own `KvStore` implementation
if you want to use a different key-value store.

### `kvPrefixes`

Expand All @@ -62,6 +63,18 @@ that the `Federation` object uses:
: The key prefix used for storing remote JSON-LD documents.
`["_fedify", "remoteDocument"]` by default.

### `queue`

The `~FederationParameters.queue` property is a `MessageQueue` instance that
the `Federation` object uses to maintain the queue of outgoing activities.
If you don't provide this option, activities will not be queued and will
be sent immediately.

`MessageQueue` is an abstract interface that represents a message queue.
Currently, there is only one implementation of `MessageQueue`, which is the
`InProcessMessageQueue` class, but you can define your own `MessageQueue`
implementation if you want to use a different message queue.

### `documentLoader`

A JSON-LD document loader function that the `Federation` object uses to
Expand Down
15 changes: 10 additions & 5 deletions examples/blog/federation/mod.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { Temporal } from "@js-temporal/polyfill";
import { parse } from "@std/semver";
import { dirname, join } from "@std/path";
import { Federation } from "@fedify/fedify/federation";
import {
Federation,
InProcessMessageQueue,
MemoryKvStore,
} from "@fedify/fedify/federation";
import {
Accept,
Activity,
Expand All @@ -23,15 +27,16 @@ import {
getFollowers,
removeFollower,
} from "../models/follower.ts";
import { openKv } from "../models/kv.ts";
import { countPosts, getPosts, toArticle } from "../models/post.ts";

// The `Federation<TContextData>` object is a registry that registers
// federation-related callbacks:
export const federation = new Federation<void>({
// The following Deno KV storage is used for several purposes, such as
// cache and outbox queue:
kv: await openKv(),
// The following key-value storage is used for internal cache:
kv: new MemoryKvStore(),

// The following message queue is used for maintaining outgoing activities:
queue: new InProcessMessageQueue(),

// The following option is useful for local development, as Fresh's dev
// server does not support HTTPS:
Expand Down
5 changes: 3 additions & 2 deletions examples/blog/import_map.g.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"imports": {
"@cfworker/json-schema": "npm:@cfworker/json-schema@^1.12.8",
"@deno/dnt": "jsr:@deno/dnt@^0.41.1",
"@fedify/fedify": "../../mod.ts",
"@fedify/fedify/federation": "../../federation/mod.ts",
"@fedify/fedify/httpsig": "../../httpsig/mod.ts",
Expand All @@ -16,8 +17,8 @@
"@std/collections": "jsr:@std/collections@^0.220.1",
"@std/encoding": "jsr:@std/encoding@^0.220.1",
"@std/encoding/base64": "jsr:@std/encoding@^0.220.1/base64",
"@std/http": "jsr:@std/http@^0.220.1",
"@std/json": "jsr:@std/json@^0.220.1",
"@std/http/negotiation": "jsr:@std/http@^0.220.1/negotiation",
"@std/json/common": "jsr:@std/json@^0.220.1/common",
"@std/path": "jsr:@std/path@^0.220.1",
"@std/semver": "jsr:@std/semver@^0.220.1",
"@std/testing": "jsr:@std/testing@^0.220.1",
Expand Down
14 changes: 9 additions & 5 deletions federation/handler.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Temporal } from "@js-temporal/polyfill";
import { accepts } from "@std/http/negotiation";
import { doesActorOwnKey, verify } from "../httpsig/mod.ts";
import { DocumentLoader } from "../runtime/docloader.ts";
Expand All @@ -17,6 +18,7 @@ import {
InboxListener,
} from "./callback.ts";
import { RequestContext } from "./context.ts";
import { KvKey, KvStore } from "./kv.ts";

export function acceptsJsonLd(request: Request): boolean {
const types = accepts(request);
Expand Down Expand Up @@ -210,8 +212,8 @@ export async function handleCollection<
export interface InboxHandlerParameters<TContextData> {
handle: string | null;
context: RequestContext<TContextData>;
kv: Deno.Kv;
kvPrefix: Deno.KvKey;
kv: KvStore;
kvPrefix: KvKey;
actorDispatcher?: ActorDispatcher<TContextData>;
inboxListeners: Map<
new (...args: unknown[]) => Activity,
Expand Down Expand Up @@ -276,10 +278,12 @@ export async function handleInbox<TContextData>(
headers: { "Content-Type": "text/plain; charset=utf-8" },
});
}
const cacheKey = activity.id == null ? null : [...kvPrefix, activity.id.href];
const cacheKey = activity.id == null
? null
: [...kvPrefix, activity.id.href] satisfies KvKey;
if (cacheKey != null) {
const cached = await kv.get(cacheKey);
if (cached != null && cached.value === true) {
if (cached === true) {
return new Response(
`Activity <${activity.id}> has already been processed.`,
{
Expand Down Expand Up @@ -330,7 +334,7 @@ export async function handleInbox<TContextData>(
});
}
if (cacheKey != null) {
await kv.set(cacheKey, true, { expireIn: 1000 * 60 * 60 * 24 });
await kv.set(cacheKey, true, { ttl: Temporal.Duration.from({ days: 1 }) });
}
return new Response("", {
status: 202,
Expand Down
93 changes: 93 additions & 0 deletions federation/kv.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { Temporal } from "@js-temporal/polyfill";

/**
* A key for a key-value store. An array of one or more strings.
*/
export type KvKey = readonly [string] | readonly [string, ...string[]];

/**
* Additional options for setting a value in a key-value store.
*/
export interface KvStoreSetOptions {
/**
* The time-to-live (TTL) for the value.
*/
ttl?: Temporal.Duration;
}

/**
* An abstract interface for a key-value store.
*/
export interface KvStore {
/**
* Gets the value for the given key.
* @param key The key to get the value for.
* @returns The value for the key, or `undefined` if the key does not exist.
* @typeParam T The type of the value to get.
*/
get<T = unknown>(key: KvKey): Promise<T | undefined>;

/**
* Sets the value for the given key.
* @param key The key to set the value for.
* @param value The value to set.
* @param options Additional options for setting the value.
*/
set(key: KvKey, value: unknown, options?: KvStoreSetOptions): Promise<void>;

/**
* Deletes the value for the given key.
* @param key The key to delete.
*/
delete(key: KvKey): Promise<void>;
}

/**
* A key-value store that stores values in memory.
* Do not use this in production as it does not persist values.
*/
export class MemoryKvStore implements KvStore {
#values: Record<string, [unknown, null | Temporal.Instant]> = {};

#encodeKey(key: KvKey): string {
return JSON.stringify(key);
}

/**
* {@inheritDoc KvStore.get}
*/
get<T = unknown>(key: KvKey): Promise<T | undefined> {
const encodedKey = this.#encodeKey(key);
const entry = this.#values[encodedKey];
if (entry == null) return Promise.resolve(undefined);
const [value, expiration] = entry;
if (
expiration != null && Temporal.Now.instant().until(expiration).sign < 0
) {
delete this.#values[encodedKey];
return Promise.resolve(undefined);
}
return Promise.resolve(value as T | undefined);
}

/**
* {@inheritDoc KvStore.set}
*/
set(key: KvKey, value: unknown, options?: KvStoreSetOptions): Promise<void> {
const encodedKey = this.#encodeKey(key);
const expiration = options?.ttl == null
? null
: Temporal.Now.instant().add(options.ttl.round({ largestUnit: "hour" }));
this.#values[encodedKey] = [value, expiration];
return Promise.resolve();
}

/**
* {@inheritDoc KvStore.delete}
*/
delete(key: KvKey): Promise<void> {
const encodedKey = this.#encodeKey(key);
delete this.#values[encodedKey];
return Promise.resolve();
}
}
8 changes: 3 additions & 5 deletions federation/middleware.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import { mockDocumentLoader } from "../testing/docloader.ts";
import { privateKey2, publicKey2 } from "../testing/keys.ts";
import { Create, Person } from "../vocab/vocab.ts";
import { Context } from "./context.ts";
import { MemoryKvStore } from "./kv.ts";
import { Federation } from "./middleware.ts";
import { RouterError } from "./router.ts";

Deno.test("Federation.createContext()", async (t) => {
const kv = await Deno.openKv(":memory:");
const kv = new MemoryKvStore();
const documentLoader = (url: string) => {
throw new FetchError(new URL(url), "Not found");
};
Expand Down Expand Up @@ -165,12 +166,10 @@ Deno.test("Federation.createContext()", async (t) => {
assertEquals(ctx.url, new URL("https://example.com/"));
assertEquals(ctx.data, 123);
});

kv.close();
});

Deno.test("Federation.setInboxListeners()", async (t) => {
const kv = await Deno.openKv(":memory:");
const kv = new MemoryKvStore();

mf.install();

Expand Down Expand Up @@ -364,5 +363,4 @@ Deno.test("Federation.setInboxListeners()", async (t) => {
});

mf.uninstall();
kv.close();
});
Loading

0 comments on commit 326a5ab

Please sign in to comment.