diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml index 7e8acac..1fdc05e 100644 --- a/.github/FUNDING.yml +++ b/.github/FUNDING.yml @@ -1,6 +1,8 @@ # These are supported funding model platforms -github: [lambdalisue] # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2] +github: [ + lambdalisue, +] # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2] patreon: # Replace with a single Patreon username open_collective: # Replace with a single Open Collective username ko_fi: # Replace with a single Ko-fi username diff --git a/README.md b/README.md index 11adabd..00fa748 100644 --- a/README.md +++ b/README.md @@ -1115,6 +1115,27 @@ const iter = pipe( console.log(await Array.fromAsync(iter)); // [1, 2, 3, 1, 2, 3] ``` +### repeatable + +Transform an async iterable into a repeatable async iterable. It caches the +values of the original iterable so that it can be replayed. Useful for replaying +the costly async iterable. + +```ts +import { repeatable } from "@core/iterutil/async/repeatable"; +import { assertEquals } from "@std/assert"; + +const origin = (async function* () { + yield 1; + yield 2; + yield 3; +})(); +const iter = repeatable(origin); +assertEquals(await Array.fromAsync(iter), [1, 2, 3]); +assertEquals(await Array.fromAsync(iter), [1, 2, 3]); // iter can be replayed +assertEquals(await Array.fromAsync(origin), []); // origin is already consumed +``` + ### some Returns true if at least one element in the iterable satisfies the provided diff --git a/async/mod.ts b/async/mod.ts index aac738f..a543869 100644 --- a/async/mod.ts +++ b/async/mod.ts @@ -21,6 +21,7 @@ export * from "./pairwise.ts"; export * from "./partition.ts"; export * from "./reduce.ts"; export * from "./repeat.ts"; +export * from "./repeatable.ts"; export * from "./some.ts"; export * from "./take.ts"; export * from "./take_while.ts"; diff --git a/async/repeatable.ts b/async/repeatable.ts new file mode 100644 index 0000000..d332587 --- /dev/null +++ b/async/repeatable.ts @@ -0,0 +1,46 @@ +const done = Symbol("done"); + +export function repeatable(iterable: AsyncIterable): AsyncIterable { + const cache: T[] = []; + let buildingCache: Promise | undefined = undefined; + let pendingResolvers: ((value: T | typeof done) => void)[] = []; + let finished = false; + + return { + [Symbol.asyncIterator]: async function* () { + yield* cache; + + if (!finished) { + if (!buildingCache) { + buildingCache = (async () => { + try { + for await (const item of iterable) { + cache.push(item); + pendingResolvers.forEach((resolve) => resolve(item)); + pendingResolvers = []; + } + } finally { + finished = true; + pendingResolvers.forEach((resolve) => resolve(done)); + pendingResolvers = []; + } + })(); + } + } + let index = cache.length; + while (!finished || index < cache.length) { + if (index < cache.length) { + yield cache[index++]; + } else { + const nextItem = await new Promise((resolve) => { + pendingResolvers.push(resolve); + }); + if (nextItem !== done) { + yield nextItem; + index++; + } + } + } + }, + }; +} diff --git a/async/repeatable_test.ts b/async/repeatable_test.ts new file mode 100644 index 0000000..9a65589 --- /dev/null +++ b/async/repeatable_test.ts @@ -0,0 +1,82 @@ +import { test } from "@cross/test"; +import { delay } from "@std/async/delay"; +import { assertEquals } from "@std/assert"; +import { repeatable } from "./repeatable.ts"; + +async function* delayedGenerator(sideEffect?: () => void) { + yield 1; + await delay(100); + yield 2; + await delay(100); + yield 3; + sideEffect?.(); +} + +await test("repeatable should return the same sequence on multiple iterations", async () => { + const input = delayedGenerator(); + const it = repeatable(input); + + const result1 = await Array.fromAsync(it); + const result2 = await Array.fromAsync(it); + + assertEquals(result1, [1, 2, 3], "First iteration"); + assertEquals(result2, [1, 2, 3], "First iteration"); +}); + +await test("repeatable should call internal iterator only once", async () => { + let called = 0; + const input = delayedGenerator(() => called++); + const it = repeatable(input); + + const result1 = await Array.fromAsync(it); + const result2 = await Array.fromAsync(it); + + assertEquals(result1, [1, 2, 3], "First iteration"); + assertEquals(result2, [1, 2, 3], "First iteration"); + assertEquals(called, 1, "Internal iterator called only once"); +}); + +await test("repeatable should work correctly when consumed partially and then fully", async () => { + const input = delayedGenerator(); + const it = repeatable(input); + + const result1: number[] = []; + const firstIter = it[Symbol.asyncIterator](); + + result1.push((await firstIter.next()).value); // 1 + + const result2 = await Array.fromAsync(it); + + result1.push((await firstIter.next()).value); // 2 + result1.push((await firstIter.next()).value); // 3 + + assertEquals(result1, [1, 2, 3], "First iteration"); + assertEquals(result2, [1, 2, 3], "First iteration"); +}); + +await test("repeatable should cache values and return them immediately on subsequent iterations", async () => { + const input = delayedGenerator(); + const it = repeatable(input); + + const start = performance.now(); + const result1 = await Array.fromAsync(it); + const end1 = performance.now(); + const timeTaken1 = end1 - start; + + const start2 = performance.now(); + const result2 = await Array.fromAsync(it); + const end2 = performance.now(); + const timeTaken2 = end2 - start2; + + assertEquals(result1, [1, 2, 3], "First iteration"); + assertEquals(result2, [1, 2, 3], "Second iteration"); + + console.debug("Time taken for first consume:", timeTaken1); + console.debug("Time taken for second consume (with cache):", timeTaken2); + + if (timeTaken2 > timeTaken1 / 10) { + throw new Error( + "Second consume took too long, cache might not be working.", + ); + } +}); diff --git a/deno.jsonc b/deno.jsonc index 9ebf782..04e7905 100644 --- a/deno.jsonc +++ b/deno.jsonc @@ -27,6 +27,7 @@ "./async/partition": "./async/partition.ts", "./async/reduce": "./async/reduce.ts", "./async/repeat": "./async/repeat.ts", + "./async/repeatable": "./async/repeatable.ts", "./async/some": "./async/some.ts", "./async/take": "./async/take.ts", "./async/take-while": "./async/take_while.ts", @@ -261,6 +262,7 @@ "@core/unknownutil": "jsr:@core/unknownutil@^4.0.1", "@cross/test": "jsr:@cross/test@^0.0.9", "@std/assert": "jsr:@std/assert@^1.0.2", + "@std/async": "jsr:@std/async@^1.0.6", "@std/jsonc": "jsr:@std/jsonc@^1.0.0", "@std/path": "jsr:@std/path@^1.0.2", "@std/testing": "jsr:@std/testing@^1.0.0"