diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..63f3a4e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +export.ndjson diff --git a/README.md b/README.md index 8eac4de..f05d681 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,65 @@ const value = { a: new Map([[{ a: 1 }, { b: /234/ }]]), b: false }; assertEquals(estimateSize(value), 36); ``` +## Importing and exporting entries + +Deno KV stores can be exported and imported. This is useful for backing up and +restoring data, as well as for transferring data between different Deno +processes. + +The import and export utilities are: + +- `exportEntries` - Export entries from a Deno KV store as a stream or a + response. +- `importEntries` - Import entries into a Deno KV store. + +### Examples + +Exporting entries from a Deno KV store and saving them to a file: + +```ts +import { exportEntries } from "@deno/kv-utils"; + +const db = await Deno.openKv(); +const file = await Deno.open("export.ndjson", { write: true, create: true }); +for await (const chunk of exportEntries(db, { prefix: ["person"] })) { + await file.write(chunk); +} +file.close(); +db.close(); +``` + +Exporting entries from a Deno KV store and sending them as a response: + +```ts ignore +import { exportEntries } from "@deno/kv-utils"; + +const db = await Deno.openKv(); +const server = Deno.serve((_req) => + exportEntries( + db, + { prefix: ["person"] }, + { type: "response" }, + ) +); + +await server.finished; +db.close(); +``` + +Importing entries from a file and storing them in a Deno KV store: + +```ts +import { importEntries } from "@deno/kv-utils"; +import { assert } from "@std/assert"; + +const db = await Deno.openKv(); +const file = await Deno.open("export.ndjson", { read: true }); +const result = await importEntries(db, file.readable); +assert(result.errors === 0); +db.close(); +``` + # License MIT diff --git a/_test_util.ts b/_test_util.ts new file mode 100644 index 0000000..7f51cc1 --- /dev/null +++ b/_test_util.ts @@ -0,0 +1,27 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +import { assert } from "@std/assert"; + +let kv: { close(): void } | undefined; +let path: string | undefined; + +/** + * Creates a temporary `Deno.Kv` instance and returns it. + * + * @returns the temporary `Deno.Kv` instance + */ +export async function setup() { + return kv = await Deno.openKv(path = `${await Deno.makeTempDir()}/test.db`); +} + +/** + * Closes the temporary `Deno.Kv` instance and removes the temporary store. 
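+ *
+ * A typical pattern, mirroring how the test modules in this repository use
+ * these helpers (the test name, key, and value below are only illustrative):
+ *
+ * ```ts ignore
+ * import { setup, teardown } from "./_test_util.ts";
+ *
+ * Deno.test({
+ *   name: "example",
+ *   async fn() {
+ *     const kv = await setup();
+ *     await kv.set(["example"], 1);
+ *     // ...assertions against the temporary store...
+ *     return teardown();
+ *   },
+ * });
+ * ```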
+ * + * @returns the promise which resolves when the temporary store is removed + */ +export function teardown() { + assert(kv); + kv.close(); + assert(path); + return Deno.remove(path); +} diff --git a/deno.json b/deno.json index 88941a6..202943f 100644 --- a/deno.json +++ b/deno.json @@ -3,20 +3,22 @@ "version": "0.0.0", "exports": { ".": "./mod.ts", - "./json": "./json.ts", - "./estimate-size": "./estimate_size.ts" + "./estimate-size": "./estimate_size.ts", + "./import-export": "./import_export.ts", + "./json": "./json.ts" }, "publish": { - "exclude": [".github", "_benches", "*_test.ts"] + "exclude": [".github", "_benches", "*.test.ts", "_test_util.ts"] }, "imports": { "@std/assert": "jsr:@std/assert@^1.0.6", + "@std/bytes": "jsr:@std/bytes@^1.0.3", "@std/crypto": "jsr:@std/crypto@^1.0.3", "@std/encoding": "jsr:@std/encoding@^1.0.5" }, "tasks": { "bench": "deno bench _benches/*.ts", - "test": "deno test --allow-net --unstable-kv --doc" + "test": "deno test --allow-net --allow-write --allow-read --unstable-kv --doc" }, "lint": { "rules": { diff --git a/deno.lock b/deno.lock index 4a9fb34..12b0419 100644 --- a/deno.lock +++ b/deno.lock @@ -3,6 +3,7 @@ "specifiers": { "jsr:@denostack/superserial@0.3.5": "0.3.5", "jsr:@std/assert@^1.0.6": "1.0.6", + "jsr:@std/bytes@^1.0.3": "1.0.3", "jsr:@std/crypto@^1.0.3": "1.0.3", "jsr:@std/encoding@^1.0.5": "1.0.5", "jsr:@std/internal@^1.0.4": "1.0.4", @@ -18,6 +19,9 @@ "jsr:@std/internal" ] }, + "@std/bytes@1.0.3": { + "integrity": "e5d5b9e685966314e4edb4be60dfc4bd7624a075bfd4ec8109252b4320f76452" + }, "@std/crypto@1.0.3": { "integrity": "a2a32f51ddef632d299e3879cd027c630dcd4d1d9a5285d6e6788072f4e51e7f" }, @@ -42,6 +46,7 @@ "workspace": { "dependencies": [ "jsr:@std/assert@^1.0.6", + "jsr:@std/bytes@^1.0.3", "jsr:@std/crypto@^1.0.3", "jsr:@std/encoding@^1.0.5" ] diff --git a/import_export.test.ts b/import_export.test.ts new file mode 100644 index 0000000..7241921 --- /dev/null +++ b/import_export.test.ts @@ -0,0 +1,298 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. 
+ +import { assert, assertEquals, assertStrictEquals } from "@std/assert"; +import { concat } from "@std/bytes"; + +import { setup, teardown } from "./_test_util.ts"; + +import { exportEntries, importEntries, ImportError } from "./import_export.ts"; + +const decoder = new TextDecoder(); +const encoder = new TextEncoder(); + +Deno.test({ + name: "exportEntries - bytes", + async fn() { + const kv = await setup(); + await kv.set(["a"], 100n); + await kv.set(["b"], new Uint8Array([1, 2, 3])); + let u8 = new Uint8Array(); + for await (const chunk of exportEntries(kv, { prefix: [] })) { + u8 = concat([u8, chunk]); + } + const actual = decoder.decode(u8); + assertEquals(actual.split("\n").length, 3); + assert( + actual.startsWith( + `{"key":[{"type":"string","value":"a"}],"value":{"type":"bigint","value":"100"},"versionstamp":`, + ), + ); + return teardown(); + }, +}); + +Deno.test({ + name: "exportEntries - string", + async fn() { + const kv = await setup(); + await kv.set(["a"], 100n); + await kv.set(["b"], new Uint8Array([1, 2, 3])); + const chunks: string[] = []; + for await ( + const chunk of exportEntries(kv, { prefix: [] }, { type: "string" }) + ) { + assert(typeof chunk === "string"); + chunks.push(chunk); + } + assertEquals(chunks.length, 2); + assert( + chunks[0].startsWith( + `{"key":[{"type":"string","value":"a"}],"value":{"type":"bigint","value":"100"},"versionstamp":`, + ), + ); + return teardown(); + }, +}); + +Deno.test({ + name: "exportEntries - Response - no filename", + async fn() { + const kv = await setup(); + await kv.set(["a"], 100n); + await kv.set(["b"], new Uint8Array([1, 2, 3])); + const response = exportEntries(kv, { prefix: [] }, { type: "response" }); + assertEquals(response.headers.get("content-type"), "application/x-ndjson"); + assertEquals(response.headers.get("content-disposition"), null); + let u8 = new Uint8Array(); + for await (const chunk of exportEntries(kv, { prefix: [] })) { + u8 = concat([u8, chunk]); + } + const actual = decoder.decode(u8); + assertEquals(actual.split("\n").length, 3); + assert( + actual.startsWith( + `{"key":[{"type":"string","value":"a"}],"value":{"type":"bigint","value":"100"},"versionstamp":`, + ), + ); + return teardown(); + }, +}); + +Deno.test({ + name: "exportToResponse - filename", + async fn() { + const kv = await setup(); + await kv.set(["a"], 100n); + await kv.set(["b"], new Uint8Array([1, 2, 3])); + const response = exportEntries(kv, { prefix: [] }, { + type: "response", + filename: "export", + }); + assertEquals(response.headers.get("content-type"), "application/x-ndjson"); + assertEquals( + response.headers.get("content-disposition"), + `attachment; filename="export.ndjson"`, + ); + let u8 = new Uint8Array(); + for await (const chunk of exportEntries(kv, { prefix: [] })) { + u8 = concat([u8, chunk]); + } + const actual = decoder.decode(u8); + assertEquals(actual.split("\n").length, 3); + assert( + actual.startsWith( + `{"key":[{"type":"string","value":"a"}],"value":{"type":"bigint","value":"100"},"versionstamp":`, + ), + ); + return teardown(); + }, +}); + +const fixture = + `{"key":[{"type":"string","value":"a"}],"value":{"type":"bigint","value":"100"},"versionstamp":"00000000000000060000"} +{"key":[{"type":"string","value":"b"}],"value":{"type":"boolean","value":true},"versionstamp":"000000000000000f0000"} +`; + +Deno.test({ + name: "importEntries() - no prefix", + async fn() { + const kv = await setup(); + const result = await importEntries(kv, fixture); + assertEquals(result, { count: 2, skipped: 0, errors: 0 }); + 
assertEquals((await kv.get(["a"])).value, 100n); + assertEquals((await kv.get(["b"])).value, true); + return teardown(); + }, +}); + +Deno.test({ + name: "importEntries() - no overwrite", + async fn() { + const kv = await setup(); + await kv.set(["a"], 100); + const result = await importEntries(kv, fixture); + assertEquals(result, { count: 2, skipped: 1, errors: 0 }); + assertEquals((await kv.get(["a"])).value, 100); + assertEquals((await kv.get(["b"])).value, true); + return teardown(); + }, +}); + +Deno.test({ + name: "importEntries() - overwrite", + async fn() { + const kv = await setup(); + await kv.set(["a"], 100); + const result = await importEntries(kv, fixture, { overwrite: true }); + assertEquals(result, { count: 2, skipped: 0, errors: 0 }); + assertEquals((await kv.get(["a"])).value, 100n); + assertEquals((await kv.get(["b"])).value, true); + return teardown(); + }, +}); + +Deno.test({ + name: "importEntries() - data as stream", + async fn() { + const kv = await setup(); + const result = await importEntries(kv, new Blob([fixture]).stream()); + assertEquals(result, { count: 2, skipped: 0, errors: 0 }); + assertEquals((await kv.get(["a"])).value, 100n); + assertEquals((await kv.get(["b"])).value, true); + return teardown(); + }, +}); + +Deno.test({ + name: "importEntries() - data as Blob", + async fn() { + const kv = await setup(); + const result = await importEntries(kv, new Blob([fixture])); + assertEquals(result, { count: 2, skipped: 0, errors: 0 }); + assertEquals((await kv.get(["a"])).value, 100n); + assertEquals((await kv.get(["b"])).value, true); + return teardown(); + }, +}); + +Deno.test({ + name: "importEntries() - data as Uint8Array", + async fn() { + const kv = await setup(); + const result = await importEntries(kv, encoder.encode(fixture)); + assertEquals(result, { count: 2, skipped: 0, errors: 0 }); + assertEquals((await kv.get(["a"])).value, 100n); + assertEquals((await kv.get(["b"])).value, true); + return teardown(); + }, +}); + +Deno.test({ + name: "importEntries() - data as ArrayBuffer", + async fn() { + const kv = await setup(); + const result = await importEntries(kv, encoder.encode(fixture).buffer); + assertEquals(result, { count: 2, skipped: 0, errors: 0 }); + assertEquals((await kv.get(["a"])).value, 100n); + assertEquals((await kv.get(["b"])).value, true); + return teardown(); + }, +}); + +Deno.test({ + name: "importEntries() - onProgress", + async fn() { + const kv = await setup(); + const progress: [number, number, number][] = []; + const result = await importEntries(kv, fixture, { + onProgress(count, skipped, errors) { + progress.push([count, skipped, errors]); + }, + }); + assertEquals(result, { count: 2, skipped: 0, errors: 0 }); + assertEquals(progress, [ + [1, 0, 0], + [2, 0, 0], + ]); + return teardown(); + }, +}); + +Deno.test({ + name: "importEntries() - with errors", + async fn() { + const kv = await setup(); + const fixture = + `{key:[{"type":"string","value":"a"}],"value":{"type":"bigint","value":"100"},"versionstamp":"00000000000000060000"} +{"key":[{"type":"string","value":"b"}],"value":{"type":"boolean","value":true},"versionstamp":"000000000000000f0000"} +`; + const result = await importEntries(kv, fixture); + assertEquals(result, { count: 2, skipped: 0, errors: 1 }); + assertEquals((await kv.get(["a"])).value, null); + assertEquals((await kv.get(["b"])).value, true); + return teardown(); + }, +}); + +Deno.test({ + name: "importEntries() - on error", + sanitizeResources: false, + async fn() { + const kv = await setup(); + const fixture = + 
`{key:[{"type":"string","value":"a"}],"value":{"type":"bigint","value":"100"},"versionstamp":"00000000000000060000"} +{"key":[{"type":"string","value":"b"}],"value":{"type":"boolean","value":true},"versionstamp":"000000000000000f0000"} +`; + const errors: ImportError[] = []; + const result = await importEntries(kv, fixture, { + onError(error) { + errors.push(error); + }, + }); + assertEquals(result, { count: 2, skipped: 0, errors: 1 }); + assertEquals((await kv.get(["a"])).value, null); + assertEquals((await kv.get(["b"])).value, true); + assertEquals(errors.length, 1); + assert(errors[0] instanceof ImportError); + assert(errors[0].cause instanceof SyntaxError); + assertEquals(errors[0].count, 1); + assertEquals(errors[0].errors, 1); + assertEquals( + errors[0].json, + `{key:[{"type":"string","value":"a"}],"value":{"type":"bigint","value":"100"},"versionstamp":"00000000000000060000"}`, + ); + assertStrictEquals(errors[0].db, kv); + assertEquals(errors[0].skipped, 0); + return teardown(); + }, +}); + +Deno.test({ + name: "importEntries() - error throws", + sanitizeResources: false, + async fn() { + const kv = await setup(); + const fixture = + `{key:[{"type":"string","value":"a"}],"value":{"type":"bigint","value":"100"},"versionstamp":"00000000000000060000"} +{"key":[{"type":"string","value":"b"}],"value":{"type":"boolean","value":true},"versionstamp":"000000000000000f0000"} +`; + let thrown = false; + try { + await importEntries(kv, fixture, { throwOnError: true }); + } catch (error) { + thrown = true; + assert(error instanceof ImportError); + assert(error.cause instanceof SyntaxError); + assertEquals(error.count, 1); + assertEquals(error.errors, 1); + assertEquals( + error.json, + `{key:[{"type":"string","value":"a"}],"value":{"type":"bigint","value":"100"},"versionstamp":"00000000000000060000"}`, + ); + assertStrictEquals(error.db, kv); + assertEquals(error.skipped, 0); + } + assert(thrown); + return teardown(); + }, +}); diff --git a/import_export.ts b/import_export.ts new file mode 100644 index 0000000..b8827bb --- /dev/null +++ b/import_export.ts @@ -0,0 +1,513 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +/** + * Allows for the import and export of data from a {@linkcode Deno.Kv} store. + * + * ## Exporting Data + * + * Data can be exported from a {@linkcode Deno.Kv} store using the + * {@linkcode exportEntries} function. This function will return a stream of + * newline-delimited JSON records that can be consumed by a client. The exported + * data can be returned as a stream of bytes, a stream of strings or as a + * {@linkcode Response} with the exported data as the body of the response. 
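+ *
+ * Each exported record is the JSON representation of a single entry, with
+ * `key`, `value`, and `versionstamp` properties (see the `json` module for the
+ * serialization rules). For example, an entry with the key `["a"]` and the
+ * bigint value `100n` is written as one line similar to the following (the
+ * versionstamp will differ):
+ *
+ * ```json
+ * {"key":[{"type":"string","value":"a"}],"value":{"type":"bigint","value":"100"},"versionstamp":"00000000000000060000"}
+ * ```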
+ * + * ### Example: Exporting Data as a Stream of Bytes + * + * ```ts + * import { exportEntries } from "@deno/kv-utils/import-export"; + * import { assert } from "@std/assert"; + * + * const db = await Deno.openKv(); + * const stream = exportEntries(db, { prefix: ["person"] }); + * for await (const chunk of stream) { + * assert(chunk.byteLength); + * } + * db.close(); + * ``` + * + * ### Example: Exporting Data as a Stream of Strings + * + * ```ts + * import { exportEntries } from "@deno/kv-utils/import-export"; + * import { assert } from "@std/assert"; + * + * const db = await Deno.openKv(); + * const stream = exportEntries(db, { prefix: ["person"] }, { type: "string" }); + * for await (const chunk of stream) { + * assert(typeof chunk === "string"); + * } + * db.close(); + * ``` + * + * ### Example: Exporting Data as a Response + * + * ```ts ignore + * import { exportEntries } from "@deno/kv-utils/import-export"; + * + * const db = await Deno.openKv(); + * const server = Deno.serve((_req) => exportEntries( + * db, + * { prefix: ["person"] }, + * { type: "response" } + * )); + * + * await server.finished; + * db.close(); + * ``` + * + * ## Importing Data + * + * Data can be imported into a {@linkcode Deno.Kv} store using the + * {@linkcode importEntries} function. This function will read a stream of + * newline-delimited JSON records and import them into the store. The import + * process can be controlled with options to overwrite existing entries, provide + * a prefix for the imported keys, and to handle errors that occur during the + * import process. + * + * ### Example: Importing Data from a Byte Array + * + * ```ts + * import { importEntries } from "@deno/kv-utils/import-export"; + * import { assert } from "@std/assert"; + * + * const db = await Deno.openKv(); + * const data = new TextEncoder().encode('{"key":[{"type":"string","value":"a"}],"value":{"type":"bigint","value":"100"},"versionstamp":"00000000000000060000"}\n'); + * const result = await importEntries(db, data); + * assert(result.count === 1); + * db.close(); + * ``` + * + * @module + */ + +import { entryToJSON, type KvEntryJSON, toKey, toValue } from "./json.ts"; +import { LinesTransformStream } from "./line_transform_stream.ts"; + +/** + * Options which can be set when calling {@linkcode exportEntries} to export + * entries as a stream of string records. + */ +export interface ExportEntriesOptionsString extends Deno.KvListOptions { + /** + * Determines if the store should be closed after the export is complete. + * + * @default false + */ + close?: boolean | undefined; + /** + * The type of export to perform. Where `"bytes"` is the default, this option + * can be set to `"string"` to export each entry as a stringified JSON object + * followed by a newline character or `"response"` to return a + * {@linkcode Response} with the exported data as the body of the + * {@linkcode Response}. + * + * @default "bytes" + */ + type: "string"; +} + +/** + * Options which can be set when calling {@linkcode exportEntries} to export + * entries as a {@linkcode Response} with the exported data as the body of the + * {@linkcode Response}. + */ +export interface ExportEntriesOptionsResponse extends Deno.KvListOptions { + /** + * Determines if the store should be closed after the export is complete. + * + * @default false + */ + close?: boolean | undefined; + /** + * The type of export to perform. 
Where `"bytes"` is the default, this option
+   * can be set to `"string"` to export each entry as a stringified JSON object
+   * followed by a newline character or `"response"` to return a
+   * {@linkcode Response} with the exported data as the body of the
+   * {@linkcode Response}.
+   *
+   * @default "bytes"
+   */
+  type: "response";
+  /**
+   * The filename to use when exporting the data. This is used to set the
+   * `Content-Disposition` header in the response which suggests a filename to
+   * the client.
+   */
+  filename?: string | undefined;
+}
+
+/**
+ * Options which can be set when calling {@linkcode exportEntries} to export
+ * entries as a stream of bytes (`Uint8Array` chunks).
+ *
+ * This is the default format for exporting entries.
+ */
+export interface ExportEntriesOptionsBytes extends Deno.KvListOptions {
+  /**
+   * Determines if the store should be closed after the export is complete.
+   *
+   * @default false
+   */
+  close?: boolean | undefined;
+  /**
+   * The type of export to perform. Where `"bytes"` is the default, this option
+   * can be set to `"string"` to export each entry as a stringified JSON object
+   * followed by a newline character or `"response"` to return a
+   * {@linkcode Response} with the exported data as the body of the
+   * {@linkcode Response}.
+   *
+   * @default "bytes"
+   */
+  type?: "bytes" | undefined;
+}
+
+/**
+ * Options which can be set when calling {@linkcode exportEntries}.
+ */
+export type ExportEntriesOptions =
+  | ExportEntriesOptionsString
+  | ExportEntriesOptionsResponse
+  | ExportEntriesOptionsBytes;
+
+/**
+ * Options which are supplied when creating an {@linkcode ImportError}.
+ *
+ * @private
+ */
+interface ImportErrorOptions extends ErrorOptions {
+  count: number;
+  db: Deno.Kv;
+  errors: number;
+  json?: string;
+  skipped: number;
+}
+
+/**
+ * Options which can be set when calling {@linkcode importEntries}.
+ */
+export interface ImportEntriesOptions {
+  /**
+   * Determines what happens when a key already exists in the target store for
+   * an entry being imported. By default, the entry will be skipped. Setting
+   * the `overwrite` option to `true` will cause any existing value to be
+   * overwritten with the imported value.
+   */
+  overwrite?: boolean;
+  /**
+   * An optional callback which is invoked when an error is encountered while
+   * importing entries. The supplied error will provide details about what was
+   * occurring.
+   *
+   * See {@linkcode ImportError} for more details.
+   */
+  onError?: (error: ImportError) => void;
+  /**
+   * An optional callback which is invoked every time an entry has been
+   * successfully processed, providing an update of the number of entries
+   * processed, the number of those that were skipped and the number of those
+   * that errored.
+   */
+  onProgress?: (count: number, skipped: number, errors: number) => void;
+  /**
+   * The prefix which should be prepended to each entry key when importing.
+   * This is useful for "namespacing" imported data. For example, if you were
+   * bringing in a data set of people, you might supply the
+   * {@linkcode Deno.KvKey} of `["person"]`. The imported entry key of `[1]`
+   * would then become `["person", 1]`.
+   */
+  prefix?: Deno.KvKey;
+  /**
+   * Used to stop the import process. When the signal is aborted, the current
+   * import entry will be completed and then the function will return.
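+   *
+   * A minimal sketch of aborting an import with an `AbortController` (the
+   * file name and timeout below are only illustrative):
+   *
+   * ```ts ignore
+   * import { importEntries } from "@deno/kv-utils/import-export";
+   *
+   * const db = await Deno.openKv();
+   * const controller = new AbortController();
+   * // Give up on the import after one minute.
+   * const timer = setTimeout(() => controller.abort(), 60_000);
+   * const result = await importEntries(
+   *   db,
+   *   await Deno.readFile("export.ndjson"),
+   *   { signal: controller.signal },
+   * );
+   * clearTimeout(timer);
+   * if (result.aborted) {
+   *   console.log("import was aborted before it completed");
+   * }
+   * db.close();
+   * ```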
+   */
+  signal?: AbortSignal;
+  /**
+   * By default, {@linkcode importEntries} will not throw on errors that occur
+   * while processing the import data, but just increment the `errors` value
+   * and call the `onError()` callback if provided.
+   *
+   * By setting this to `true`, an {@linkcode ImportError} will be thrown when
+   * an error is encountered, terminating the import process.
+   */
+  throwOnError?: boolean;
+}
+
+/**
+ * The result returned from calling {@linkcode importEntries}.
+ */
+export interface ImportEntriesResult {
+  /** If set, the import process was aborted prior to completing. */
+  aborted?: true;
+  /** The number of entries read from the input data. */
+  count: number;
+  /**
+   * The number of entries skipped from the input data. Entries are skipped
+   * if a matching entry key is already present in the target, unless the
+   * `overwrite` option is set to `true`.
+   */
+  skipped: number;
+  /** The number of entries that errored while processing the data. */
+  errors: number;
+}
+
+/** The filename extension for NDJSON files. */
+const EXT_NDJSON = ".ndjson";
+
+/** The media type for NDJSON which is a newline-delimited JSON format. */
+export const MEDIA_TYPE_NDJSON = "application/x-ndjson";
+/** The media type for JSONL which is compatible with NDJSON. */
+export const MEDIA_TYPE_JSONL = "application/jsonl";
+/** The media type for JSON Lines which is compatible with NDJSON. */
+export const MEDIA_TYPE_JSON_LINES = "application/json-lines";
+
+const encoder = new TextEncoder();
+
+/**
+ * Exports entries from a {@linkcode Deno.Kv} store as a {@linkcode Response}
+ * where the body of the response is a stream of newline-delimited JSON records
+ * that match the provided selector.
+ *
+ * @param db The {@linkcode Deno.Kv} store to export entries from.
+ * @param selector A selector that selects the range of data returned by a list
+ * operation on a {@linkcode Deno.Kv}.
+ * @param options Options which can be set
+ */
+export function exportEntries(
+  db: Deno.Kv,
+  selector: Deno.KvListSelector,
+  options: ExportEntriesOptionsResponse,
+): Response;
+/**
+ * Exports entries from a {@linkcode Deno.Kv} store as a stream of newline-
+ * delimited JSON strings that match the provided selector.
+ *
+ * @param db The {@linkcode Deno.Kv} store to export entries from.
+ * @param selector A selector that selects the range of data returned by a list
+ * operation on a {@linkcode Deno.Kv}.
+ * @param options Options which can be set
+ */
+export function exportEntries(
+  db: Deno.Kv,
+  selector: Deno.KvListSelector,
+  options: ExportEntriesOptionsString,
+): ReadableStream<string>;
+/**
+ * Exports entries from a {@linkcode Deno.Kv} store as a stream of newline-
+ * delimited JSON records encoded as bytes (`Uint8Array` chunks) that match the
+ * provided selector.
+ *
+ * @param db The {@linkcode Deno.Kv} store to export entries from.
+ * @param selector A selector that selects the range of data returned by a list
+ * operation on a {@linkcode Deno.Kv}.
+ * @param options Options which can be set
+ */
+export function exportEntries(
+  db: Deno.Kv,
+  selector: Deno.KvListSelector,
+  options?: ExportEntriesOptions,
+): ReadableStream<Uint8Array>;
+export function exportEntries(
+  db: Deno.Kv,
+  selector: Deno.KvListSelector,
+  options: ExportEntriesOptions = {},
+): ReadableStream<string | Uint8Array> | Response {
+  const text = options.type === "string";
+  let cancelled = false;
+  const stream = new ReadableStream<string | Uint8Array>({
+    async start(controller) {
+      try {
+        for await (const entry of db.list(selector, options)) {
+          const chunk = entryToJSON(entry);
+          controller.enqueue(
+            text
+              ? `${JSON.stringify(chunk)}\n`
+              : encoder.encode(`${JSON.stringify(chunk)}\n`),
+          );
+          if (cancelled) {
+            return;
+          }
+        }
+        if (options.close) {
+          db.close();
+        }
+        controller.close();
+      } catch (error) {
+        controller.error(error);
+      }
+    },
+    cancel(_reason) {
+      cancelled = true;
+    },
+  });
+  if (options.type === "response") {
+    const init = {
+      headers: { "content-type": MEDIA_TYPE_NDJSON } as Record<string, string>,
+    };
+    if (options.filename) {
+      init.headers["content-disposition"] =
+        `attachment; filename="${options.filename}${EXT_NDJSON}"`;
+    }
+    return new Response(stream, init);
+  }
+  return stream;
+}
+
+/**
+ * An error that can occur when importing records into a {@linkcode Deno.Kv}
+ * store. Information associated with the error is available with the `cause`
+ * being set to the original error that was thrown.
+ */
+export class ImportError extends Error {
+  #count: number;
+  #db: Deno.Kv;
+  #errors: number;
+  #json?: string;
+  #skipped: number;
+
+  /**
+   * The number of entries that had been read from the stream when the
+   * error occurred.
+   */
+  get count(): number {
+    return this.#count;
+  }
+  /**
+   * Reference to the {@linkcode Deno.Kv} store that was the target for the
+   * import.
+   */
+  get db(): Deno.Kv {
+    return this.#db;
+  }
+  /**
+   * The number of errors in aggregate that had occurred to this point.
+   */
+  get errors(): number {
+    return this.#errors;
+  }
+  /**
+   * If available, the most recent JSON string that had been read from the data.
+   */
+  get json(): string | undefined {
+    return this.#json;
+  }
+  /**
+   * The aggregate number of records that had been skipped.
+   */
+  get skipped(): number {
+    return this.#skipped;
+  }
+
+  constructor(
+    message: string,
+    { count, errors, json, db, skipped, ...options }: ImportErrorOptions,
+  ) {
+    super(message, options);
+    this.#count = count;
+    this.#errors = errors;
+    this.#json = json;
+    this.#db = db;
+    this.#skipped = skipped;
+  }
+}
+
+/**
+ * Imports entries into a {@linkcode Deno.Kv} store from a stream of newline-
+ * delimited JSON records. The import process can be controlled with options to
+ * overwrite existing entries, provide a prefix for the imported keys, and to
+ * handle errors that occur during the import process.
+ *
+ * The import process will read the stream of records and parse each record as
+ * JSON. The key and value of the entry will be extracted from the JSON object
+ * and imported into the store. If the `overwrite` option is set to `true`, then
+ * any existing entry with the same key will be replaced with the imported
+ * value.
+ *
+ * If an error occurs while processing the import data, the error will be passed
+ * to the `onError` callback if provided. If the `throwOnError` option is set to
+ * `true`, then an {@linkcode ImportError} will be thrown when an error is
+ * encountered, terminating the import process.
+ *
+ * @param db The {@linkcode Deno.Kv} store to import entries into.
+ * @param data The data to import into the store. This can be a stream of bytes,
+ * a {@linkcode Blob}, an {@linkcode ArrayBufferView}, an
+ * {@linkcode ArrayBuffer} or a string.
+ * @param options Options which can be set when importing entries.
+ * @returns A promise that resolves to an {@linkcode ImportEntriesResult} object
+ * which provides details about the import process.
+ */
+export async function importEntries(
+  db: Deno.Kv,
+  data:
+    | ReadableStream<Uint8Array>
+    | Blob
+    | ArrayBufferView
+    | ArrayBuffer
+    | string,
+  options: ImportEntriesOptions = {},
+): Promise<ImportEntriesResult> {
+  const {
+    overwrite = false,
+    prefix = [],
+    onError,
+    onProgress,
+    signal,
+    throwOnError,
+  } = options;
+  let stream: ReadableStream<string>;
+  const transformer = new LinesTransformStream();
+  if (data instanceof ReadableStream) {
+    stream = data.pipeThrough(transformer);
+  } else if (data instanceof Blob) {
+    stream = data.stream().pipeThrough(transformer);
+  } else {
+    stream = new Blob([data]).stream().pipeThrough(transformer);
+  }
+  const reader = stream.getReader();
+  let count = 0;
+  let errors = 0;
+  let skipped = 0;
+  while (true) {
+    let result: ReadableStreamReadResult<string> | undefined = undefined;
+    try {
+      result = await reader.read();
+      if (result.value) {
+        count++;
+        const entry: KvEntryJSON = JSON.parse(result.value);
+        const { key, value } = entry;
+        const entryKey = prefix.length
+          ? [...prefix, ...toKey(key)]
+          : toKey(key);
+        if (!overwrite) {
+          const { versionstamp } = await db.get(entryKey);
+          if (versionstamp) {
+            skipped++;
+            continue;
+          }
+        }
+        await db.set(entryKey, toValue(value));
+        onProgress?.(count, skipped, errors);
+      }
+      if (result.done) {
+        break;
+      }
+      if (signal?.aborted) {
+        reader.releaseLock();
+        return { aborted: true, count, skipped, errors };
+      }
+    } catch (cause) {
+      errors++;
+      if (onError || throwOnError) {
+        const error = new ImportError(
+          cause instanceof Error ? cause.message : "An import error occurred.",
+          { cause, json: result?.value, count, db, skipped, errors },
+        );
+        onError?.(error);
+        if (throwOnError) {
+          reader.releaseLock();
+          throw error;
+        }
+      }
+    }
+  }
+  reader.releaseLock();
+  return { count, skipped, errors };
+}
diff --git a/line_transform_stream.test.ts b/line_transform_stream.test.ts
new file mode 100644
index 0000000..768f1fc
--- /dev/null
+++ b/line_transform_stream.test.ts
@@ -0,0 +1,27 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+import { assertEquals } from "@std/assert/equals";
+
+import { LinesTransformStream } from "./line_transform_stream.ts";
+
+const fixture =
+  `{"key":[{"type":"string","value":"a"}],"value":{"type":"bigint","value":"100"},"versionstamp":"00000000000000060000"}
+{"key":[{"type":"string","value":"b"}],"value":{"type":"boolean","value":true},"versionstamp":"000000000000000f0000"}
+`;
+
+Deno.test({
+  name: "LinesTransformStream",
+  async fn() {
+    const stream = new Blob([fixture]).stream().pipeThrough(
+      new LinesTransformStream(),
+    );
+    const actual: string[] = [];
+    for await (const chunk of stream) {
+      actual.push(chunk);
+    }
+    assertEquals(actual, [
+      `{"key":[{"type":"string","value":"a"}],"value":{"type":"bigint","value":"100"},"versionstamp":"00000000000000060000"}`,
+      `{"key":[{"type":"string","value":"b"}],"value":{"type":"boolean","value":true},"versionstamp":"000000000000000f0000"}`,
+    ]);
+  },
+});
diff --git a/line_transform_stream.ts b/line_transform_stream.ts
new file mode 100644
index 0000000..d11bc50
--- /dev/null
+++ b/line_transform_stream.ts
@@ -0,0 +1,101 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+/**
+ * Module that provides {@linkcode LinesTransformStream} which is used within
+ * the library to transform a byte stream into chunks of string lines.
+ *
+ * @module
+ */
+
+import { concat } from "@std/bytes/concat";
+
+const LF = 0x0a;
+const CR = 0x0d;
+const decoder = new TextDecoder();
+
+function stripEol(u8: Uint8Array): Uint8Array {
+  const length = u8.byteLength;
+  if (u8[length - 1] === LF) {
+    let drop = 1;
+    if (length > 1 && u8[length - 2] === CR) {
+      drop = 2;
+    }
+    return u8.subarray(0, length - drop);
+  }
+  return u8;
+}
+
+/**
+ * A transform stream that takes a byte stream and transforms it into a stream
+ * of string lines.
+ */
+export class LinesTransformStream extends TransformStream<Uint8Array, string> {
+  #buffer = new Uint8Array(0);
+  #pos = 0;
+
+  constructor() {
+    super({
+      transform: (chunk, controller) => {
+        this.#transform(chunk, controller);
+      },
+      flush: (controller) => {
+        const slice = stripEol(this.#buffer.subarray(this.#pos));
+        if (slice.length) {
+          try {
+            controller.enqueue(decoder.decode(slice));
+          } catch (error) {
+            controller.error(error);
+          }
+        }
+      },
+    });
+  }
+
+  #readLineBytes(): Uint8Array | null {
+    let slice: Uint8Array | null = null;
+    const i = this.#buffer.subarray(this.#pos).indexOf(LF);
+    if (i >= 0) {
+      slice = this.#buffer.subarray(this.#pos, this.#pos + i + 1);
+      this.#pos += i + 1;
+      return stripEol(slice);
+    }
+    return null;
+  }
+
+  *#lines(): IterableIterator<string> {
+    while (true) {
+      const bytes = this.#readLineBytes();
+      if (!bytes) {
+        this.#truncate();
+        return null;
+      }
+      yield decoder.decode(bytes);
+    }
+  }
+
+  #transform(
+    chunk: Uint8Array,
+    controller: TransformStreamDefaultController<string>,
+  ) {
+    this.#buffer = concat([this.#buffer, chunk]);
+    const iterator = this.#lines();
+    while (true) {
+      try {
+        const result = iterator.next();
+        if (result.value) {
+          controller.enqueue(result.value);
+        }
+        if (result.done) {
+          break;
+        }
+      } catch (error) {
+        controller.error(error);
+      }
+    }
+  }
+
+  #truncate() {
+    this.#buffer = this.#buffer.slice(this.#pos);
+    this.#pos = 0;
+  }
+}
diff --git a/mod.ts b/mod.ts
index 8367bd7..3652751 100644
--- a/mod.ts
+++ b/mod.ts
@@ -88,8 +88,66 @@
  * assertEquals(estimateSize(value), 36);
  * ```
  *
+ * ## Importing and exporting entries
+ *
+ * Deno KV stores can be exported and imported. 
This is useful for backing up + * and restoring data, as well as for transferring data between different Deno + * processes. + * + * The import and export utilities are: + * + * - {@linkcode exportEntries} - Export entries from a Deno KV store as a stream + * or a response. + * - {@linkcode importEntries} - Import entries into a Deno KV store. + * + * ### Examples + * + * Exporting entries from a Deno KV store and saving them to a file: + * + * ```ts + * import { exportEntries } from "@deno/kv-utils"; + * + * const db = await Deno.openKv(); + * const file = await Deno.open("export.ndjson", { write: true, create: true }); + * for await (const chunk of exportEntries(db, { prefix: ["person"] })) { + * await file.write(chunk); + * } + * file.close(); + * db.close(); + * ``` + * + * Exporting entries from a Deno KV store and sending them as a response: + * + * ```ts ignore + * import { exportEntries } from "@deno/kv-utils"; + * + * const db = await Deno.openKv(); + * const server = Deno.serve((_req) => exportEntries( + * db, + * { prefix: ["person"] }, + * { type: "response" } + * )); + * + * await server.finished; + * db.close(); + * ``` + * + * Importing entries from a file and storing them in a Deno KV store: + * + * ```ts + * import { importEntries } from "@deno/kv-utils"; + * import { assert } from "@std/assert"; + * + * const db = await Deno.openKv(); + * const file = await Deno.open("export.ndjson", { read: true }); + * const result = await importEntries(db, file.readable); + * assert(result.errors === 0); + * db.close(); + * ``` + * * @module */ export * from "./json.ts"; export * from "./estimate_size.ts"; +export * from "./import_export.ts";