From 0d748fd472e868c65130260aec89673bc20e113a Mon Sep 17 00:00:00 2001 From: Max Isom Date: Wed, 3 Apr 2024 15:58:32 -0700 Subject: [PATCH 1/7] Use birpc Greatly simplifies code and paves the way for us to grab a new database instance within `beforeTemplateIsBaked()` in application code. --- package.json | 1 + src/index.ts | 186 +++++++++++++--------------------- src/lib/rpc.ts | 20 ++++ src/worker.ts | 270 +++++++++++++++++++++---------------------------- tsconfig.json | 2 +- yarn.lock | 5 + 6 files changed, 210 insertions(+), 274 deletions(-) create mode 100644 src/lib/rpc.ts diff --git a/package.json b/package.json index 0e5868d..6565bef 100644 --- a/package.json +++ b/package.json @@ -45,6 +45,7 @@ }, "dependencies": { "async-mutex": "0.4.0", + "birpc": "0.2.17", "nanoid": "4.0.0", "object-hash": "3.0.0", "pg": "8.8.0" diff --git a/src/index.ts b/src/index.ts index 25ba576..1ea71ce 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,24 +1,23 @@ -import { registerSharedWorker, SharedWorker } from "ava/plugin" +import { registerSharedWorker } from "ava/plugin" import hash from "object-hash" import path from "node:path" -import { +import type { ConnectionDetailsFromWorker, - FinishedRunningBeforeTemplateIsBakedHookMessage, InitialWorkerData, - MessageFromWorker, - MessageToWorker, } from "./internal-types" -import { +import type { ConnectionDetails, GetTestPostgresDatabase, GetTestPostgresDatabaseFactoryOptions, GetTestPostgresDatabaseOptions, - GetTestPostgresDatabaseResult, } from "./public-types" import { Pool } from "pg" -import { Jsonifiable } from "type-fest" -import { StartedNetwork } from "testcontainers" -import { ExecutionContext } from "ava" +import type { Jsonifiable } from "type-fest" +import type { ExecutionContext } from "ava" +import { once } from "node:events" +import { createBirpc } from "birpc" +import { SharedWorkerFunctions, TestWorkerFunctions } from "./lib/rpc" +import { ExecResult } from "testcontainers" const getWorker = async ( initialData: InitialWorkerData, @@ -110,127 +109,80 @@ export const getTestPostgresDatabaseFactory = < const worker = await workerPromise await worker.available - const waitForAndHandleReply = async ( - message: SharedWorker.Plugin.PublishedMessage - ): Promise => { - let reply = await message.replies().next() - const replyData: MessageFromWorker = reply.value.data - - if (replyData.type === "RUN_HOOK_BEFORE_TEMPLATE_IS_BAKED") { - let result: FinishedRunningBeforeTemplateIsBakedHookMessage["result"] = - { - status: "success", - result: undefined, - } - - if (options?.beforeTemplateIsBaked) { - const connectionDetails = - mapWorkerConnectionDetailsToConnectionDetails( - replyData.connectionDetails - ) + let rpcCallback: (data: any) => void + + const rpc = createBirpc( + { + runBeforeTemplateIsBakedHook: async (connection) => { + if (options?.beforeTemplateIsBaked) { + const connectionDetails = + mapWorkerConnectionDetailsToConnectionDetails(connection) + + // Ignore if the pool is terminated by the shared worker + // (This happens in CI for some reason even though we drain the pool first.) + connectionDetails.pool.on("error", (error) => { + if ( + error.message.includes( + "terminating connection due to administrator command" + ) + ) { + return + } + + throw error + }) - // Ignore if the pool is terminated by the shared worker - // (This happens in CI for some reason even though we drain the pool first.) - connectionDetails.pool.on("error", (error) => { - if ( - error.message.includes( - "terminating connection due to administrator command" - ) - ) { - return - } - - throw error - }) - - try { const hookResult = await options.beforeTemplateIsBaked({ params, connection: connectionDetails, - containerExec: async (command) => { - const request = reply.value.reply({ - type: "EXEC_COMMAND_IN_CONTAINER", - command, - }) - - reply = await request.replies().next() - - if ( - reply.value.data.type !== "EXEC_COMMAND_IN_CONTAINER_RESULT" - ) { - throw new Error( - "Expected EXEC_COMMAND_IN_CONTAINER_RESULT message" - ) - } - - return reply.value.data.result - }, + containerExec: async (command): Promise => + rpc.execCommandInContainer(command), }) - result = { - status: "success", - result: hookResult, - } - } catch (error) { - result = { - status: "error", - error: - error instanceof Error - ? error.stack ?? error.message - : new Error( - "Unknown error type thrown in beforeTemplateIsBaked hook" - ), - } - } finally { - // Otherwise connection will be killed by worker when converting to template - await connectionDetails.pool.end() + return hookResult } - } + }, + }, + { + post: (data) => worker.publish(data), + on: (data) => { + rpcCallback = data + }, + } + ) - try { - return waitForAndHandleReply( - reply.value.reply({ - type: "FINISHED_RUNNING_HOOK_BEFORE_TEMPLATE_IS_BAKED", - result, - } as MessageToWorker) - ) - } catch (error) { - if (error instanceof Error && error.name === "DataCloneError") { - throw new TypeError( - "Return value of beforeTemplateIsBaked() hook could not be serialized. Make sure it returns only JSON-serializable values." - ) - } + const messageHandlerAbortController = new AbortController() + const messageHandlerPromise = Promise.race([ + once(messageHandlerAbortController.signal, "abort"), + (async () => { + for await (const msg of worker.subscribe()) { + rpcCallback!(msg.data) - throw error - } - } else if (replyData.type === "GOT_DATABASE") { - if (replyData.beforeTemplateIsBakedResult.status === "error") { - if (typeof replyData.beforeTemplateIsBakedResult.error === "string") { - throw new Error(replyData.beforeTemplateIsBakedResult.error) + if (messageHandlerAbortController.signal.aborted) { + break } - - throw replyData.beforeTemplateIsBakedResult.error } + })(), + ]) - return { - ...mapWorkerConnectionDetailsToConnectionDetails( - replyData.connectionDetails - ), - beforeTemplateIsBakedResult: - replyData.beforeTemplateIsBakedResult.result, - } - } + t.teardown(async () => { + messageHandlerAbortController.abort() + await messageHandlerPromise + }) - throw new Error(`Unexpected message type: ${replyData.type}`) - } + const testDatabaseConnection = await rpc.getTestDatabase({ + // todo: rename? + key: getTestDatabaseOptions?.databaseDedupeKey, + params, + }) - return waitForAndHandleReply( - worker.publish({ - type: "GET_TEST_DATABASE", - params, - key: getTestDatabaseOptions?.databaseDedupeKey, - } as MessageToWorker) - ) + return { + ...mapWorkerConnectionDetailsToConnectionDetails( + testDatabaseConnection.connectionDetails + ), + beforeTemplateIsBakedResult: + testDatabaseConnection.beforeTemplateIsBakedResult, + } } return getTestPostgresDatabase diff --git a/src/lib/rpc.ts b/src/lib/rpc.ts new file mode 100644 index 0000000..30c80cb --- /dev/null +++ b/src/lib/rpc.ts @@ -0,0 +1,20 @@ +import type { ExecResult } from "testcontainers" +import type { Jsonifiable } from "type-fest" +import type { ConnectionDetailsFromWorker } from "~/internal-types" + +export interface TestWorkerFunctions { + runBeforeTemplateIsBakedHook: ( + connectionDetails: ConnectionDetailsFromWorker + ) => Promise +} + +export interface SharedWorkerFunctions { + getTestDatabase: (options: { + key?: string + params?: Jsonifiable + }) => Promise<{ + connectionDetails: ConnectionDetailsFromWorker + beforeTemplateIsBakedResult: unknown + }> + execCommandInContainer: (command: string[]) => Promise +} diff --git a/src/worker.ts b/src/worker.ts index bbf7766..9963a1e 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,21 +1,15 @@ import pg from "pg" -import { - GenericContainer, - Network, - StartedNetwork, - getContainerRuntimeClient, -} from "testcontainers" +import { GenericContainer, Network } from "testcontainers" import { Mutex } from "async-mutex" import hash from "object-hash" -import { - GotDatabaseMessage, - InitialWorkerData, - MessageFromWorker, - MessageToWorker, - WorkerMessage, -} from "./internal-types" +import type { InitialWorkerData } from "./internal-types" import getRandomDatabaseName from "./lib/get-random-database-name" -import { SharedWorker } from "ava/plugin" +import type { SharedWorker } from "ava/plugin" +import { type BirpcReturn, type ChannelOptions, createBirpc } from "birpc" +import { once } from "node:events" +import type { SharedWorkerFunctions, TestWorkerFunctions } from "./lib/rpc" + +type WorkerRpc = BirpcReturn class TestWorkerShutdownError extends Error { constructor() { @@ -31,7 +25,6 @@ export class Worker { private keyToDatabaseName = new Map() private keyToCreationMutex = new Map() private getOrCreateKeyToCreationMutex = new Mutex() - private createdDatabasesByTestWorkerId = new Map() private getOrCreateTemplateNameMutex = new Mutex() private startContainerPromise: ReturnType @@ -41,174 +34,140 @@ export class Worker { } public async handleTestWorker(testWorker: SharedWorker.TestWorker) { + let workerRpcCallback: (data: any) => void + const rpcChannel: ChannelOptions = { + post: (data) => testWorker.publish(data), + on: (data) => { + workerRpcCallback = data + }, + } + + const messageHandlerAbortController = new AbortController() + const messageHandlerPromise = Promise.race([ + once(messageHandlerAbortController.signal, "abort"), + (async () => { + for await (const msg of testWorker.subscribe()) { + workerRpcCallback!(msg.data) + + if (messageHandlerAbortController.signal.aborted) { + break + } + } + })(), + ]) + testWorker.teardown(async () => { - await this.handleTestWorkerTeardown(testWorker) + messageHandlerAbortController.abort() + await messageHandlerPromise }) - for await (const message of testWorker.subscribe()) { - await this.handleMessage(message as any) - } + const rpc: WorkerRpc = createBirpc< + TestWorkerFunctions, + SharedWorkerFunctions + >( + { + getTestDatabase: async (options) => { + return this.getTestDatabase(options, rpc, (teardown) => { + testWorker.teardown(teardown) + }) + }, + execCommandInContainer: async (command) => { + const container = (await this.startContainerPromise).container + return container.exec(command) + }, + }, + rpcChannel + ) } - public async handleMessage( - message: SharedWorker.ReceivedMessage + private async getTestDatabase( + options: Parameters[0], + rpc: WorkerRpc, + registerTeardown: (teardown: () => Promise) => void ) { - if (message.data.type === "GET_TEST_DATABASE") { - // Get template name - const paramsHash = hash(message.data.params ?? null) - let neededToCreateTemplate = false - // (Mutex avoids race conditions where two identical templates get built) - await this.getOrCreateTemplateNameMutex.runExclusive(() => { - if (!this.paramsHashToTemplateCreationPromise.has(paramsHash)) { - neededToCreateTemplate = true - this.paramsHashToTemplateCreationPromise.set( - paramsHash, - this.createTemplate(message) - ) - } - }) - let templateCreationResult - try { - templateCreationResult = - await this.paramsHashToTemplateCreationPromise.get(paramsHash)! - } catch (error) { - if (error instanceof TestWorkerShutdownError) { - return - } - - throw error + // Get template name + const paramsHash = hash(options.params ?? null) + // (Mutex avoids race conditions where two identical templates get built) + await this.getOrCreateTemplateNameMutex.runExclusive(() => { + if (!this.paramsHashToTemplateCreationPromise.has(paramsHash)) { + this.paramsHashToTemplateCreationPromise.set( + paramsHash, + this.createTemplate(rpc) + ) } + }) + const templateCreationResult = + await this.paramsHashToTemplateCreationPromise.get(paramsHash)! - const { - templateName, - beforeTemplateIsBakedResult, - lastMessage: lastMessageFromTemplateCreation, - } = templateCreationResult! - - // Create database using template - const { postgresClient } = await this.startContainerPromise - - // Only relevant when a `key` is provided - const fullDatabaseKey = `${paramsHash}-${message.data.key}` - - let databaseName = message.data.key - ? this.keyToDatabaseName.get(fullDatabaseKey) - : undefined - if (!databaseName) { - const createDatabase = async () => { - databaseName = getRandomDatabaseName() - await postgresClient.query( - `CREATE DATABASE ${databaseName} WITH TEMPLATE ${templateName};` - ) - this.createdDatabasesByTestWorkerId.set( - message.testWorker.id, - ( - this.createdDatabasesByTestWorkerId.get(message.testWorker.id) ?? - [] - ).concat(databaseName) - ) - } + const { templateName, beforeTemplateIsBakedResult } = + templateCreationResult! - if (message.data.key) { - await this.getOrCreateKeyToCreationMutex.runExclusive(() => { - if (!this.keyToCreationMutex.has(fullDatabaseKey)) { - this.keyToCreationMutex.set(fullDatabaseKey, new Mutex()) - } - }) + // Create database using template + const { postgresClient } = await this.startContainerPromise - const mutex = this.keyToCreationMutex.get(fullDatabaseKey)! + // Only relevant when a `key` is provided + const fullDatabaseKey = `${paramsHash}-${options.key}` + + let databaseName = options.key + ? this.keyToDatabaseName.get(fullDatabaseKey) + : undefined + if (!databaseName) { + const createDatabase = async () => { + databaseName = getRandomDatabaseName() + await postgresClient.query( + `CREATE DATABASE ${databaseName} WITH TEMPLATE ${templateName};` + ) + } - await mutex.runExclusive(async () => { - if (!this.keyToDatabaseName.has(fullDatabaseKey)) { - await createDatabase() - this.keyToDatabaseName.set(fullDatabaseKey, databaseName!) - } + if (options.key) { + await this.getOrCreateKeyToCreationMutex.runExclusive(() => { + if (!this.keyToCreationMutex.has(fullDatabaseKey)) { + this.keyToCreationMutex.set(fullDatabaseKey, new Mutex()) + } + }) - databaseName = this.keyToDatabaseName.get(fullDatabaseKey)! - }) - } else { - await createDatabase() - } - } + const mutex = this.keyToCreationMutex.get(fullDatabaseKey)! - const gotDatabaseMessage: GotDatabaseMessage = { - type: "GOT_DATABASE", - connectionDetails: await this.getConnectionDetails(databaseName!), - beforeTemplateIsBakedResult, - } + await mutex.runExclusive(async () => { + if (!this.keyToDatabaseName.has(fullDatabaseKey)) { + await createDatabase() + this.keyToDatabaseName.set(fullDatabaseKey, databaseName!) + } - if (neededToCreateTemplate) { - lastMessageFromTemplateCreation.value.reply(gotDatabaseMessage) + databaseName = this.keyToDatabaseName.get(fullDatabaseKey)! + }) } else { - message.reply(gotDatabaseMessage) + await createDatabase() } - - return } - throw new Error(`Unknown message: ${JSON.stringify(message.data)}`) - } - - private async handleTestWorkerTeardown( - testWorker: SharedWorker.TestWorker - ) { - const databases = this.createdDatabasesByTestWorkerId.get(testWorker.id) - - if (databases) { - const { postgresClient } = await this.startContainerPromise + registerTeardown(async () => { + // Don't remove keyed databases + if (options.key && this.keyToDatabaseName.has(fullDatabaseKey)) { + return + } - const databasesAssociatedWithKeys = new Set( - this.keyToDatabaseName.values() - ) + await this.forceDisconnectClientsFrom(databaseName!) + await postgresClient.query(`DROP DATABASE ${databaseName}`) + }) - await Promise.all( - databases - .filter((d) => !databasesAssociatedWithKeys.has(d)) - .map(async (database) => { - await this.forceDisconnectClientsFrom(database) - await postgresClient.query(`DROP DATABASE ${database}`) - }) - ) + return { + connectionDetails: await this.getConnectionDetails(databaseName!), + beforeTemplateIsBakedResult, } } - private async createTemplate( - message: SharedWorker.ReceivedMessage - ) { + private async createTemplate(rpc: WorkerRpc) { const databaseName = getRandomDatabaseName() // Create database - const { postgresClient, container, pgbouncerContainer } = await this - .startContainerPromise + const { postgresClient } = await this.startContainerPromise await postgresClient.query(`CREATE DATABASE ${databaseName};`) - const msg = message.reply({ - type: "RUN_HOOK_BEFORE_TEMPLATE_IS_BAKED", - connectionDetails: await this.getConnectionDetails(databaseName), - }) - - let reply = await msg.replies().next() - - if (reply.done) { - throw new TestWorkerShutdownError() - } - - while ( - reply.value.data.type !== "FINISHED_RUNNING_HOOK_BEFORE_TEMPLATE_IS_BAKED" - ) { - const replyValue = reply.value.data as MessageToWorker - - if (replyValue.type === "EXEC_COMMAND_IN_CONTAINER") { - const result = await container.exec(replyValue.command) - const message = reply.value.reply({ - type: "EXEC_COMMAND_IN_CONTAINER_RESULT", - result, - } as MessageFromWorker) - - reply = await message.replies().next() - } - } + const beforeTemplateIsBakedResult = await rpc.runBeforeTemplateIsBakedHook( + await this.getConnectionDetails(databaseName) + ) // Disconnect any clients await this.forceDisconnectClientsFrom(databaseName) @@ -220,8 +179,7 @@ export class Worker { return { templateName: databaseName, - beforeTemplateIsBakedResult: reply.value.data.result, - lastMessage: reply, + beforeTemplateIsBakedResult, } } diff --git a/tsconfig.json b/tsconfig.json index 1011eac..9e48fd9 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,7 +1,7 @@ { "compilerOptions": { "target": "es2020", - "lib": ["esnext"], + "lib": ["esnext", "DOM"], "baseUrl": ".", "allowJs": false, "skipLibCheck": true, diff --git a/yarn.lock b/yarn.lock index fb461dd..212dadd 100644 --- a/yarn.lock +++ b/yarn.lock @@ -886,6 +886,11 @@ binary-extensions@^2.0.0, binary-extensions@^2.2.0: resolved "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.2.0.tgz" integrity sha512-jDctJ/IVQbZoJykoeHbhXpOlNBqGNcwXJKJog42E5HDPUwQTSdjCHdihjj0DlnheQ7blbT6dHOafNAiS8ooQKA== +birpc@0.2.17: + version "0.2.17" + resolved "https://registry.yarnpkg.com/birpc/-/birpc-0.2.17.tgz#d0bdb90d4d063061156637f03b7b0adea1779734" + integrity sha512-+hkTxhot+dWsLpp3gia5AkVHIsKlZybNT5gIYiDlNzJrmYPcTM9k5/w2uaj3IPpd7LlEYpmCj4Jj1nC41VhDFg== + bl@^4.0.3: version "4.1.0" resolved "https://registry.npmjs.org/bl/-/bl-4.1.0.tgz" From 943755fe6ba0845f302c811312cd7092a2463bc9 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Wed, 3 Apr 2024 16:12:40 -0700 Subject: [PATCH 2/7] Bump node version --- .github/workflows/npm-semantic-release.yml | 2 +- .github/workflows/prettier.yml | 2 +- .github/workflows/test.yml | 4 ++-- .github/workflows/type-test.yml | 2 +- package.json | 3 +++ src/worker.ts | 6 ------ 6 files changed, 8 insertions(+), 11 deletions(-) diff --git a/.github/workflows/npm-semantic-release.yml b/.github/workflows/npm-semantic-release.yml index 347f976..3cc7548 100644 --- a/.github/workflows/npm-semantic-release.yml +++ b/.github/workflows/npm-semantic-release.yml @@ -15,7 +15,7 @@ jobs: - name: Setup Node.js uses: actions/setup-node@v2 with: - node-version: "16.x" + node-version: "18.x" - name: Install dependencies run: yarn install - name: Build diff --git a/.github/workflows/prettier.yml b/.github/workflows/prettier.yml index a0c9f04..d074da4 100644 --- a/.github/workflows/prettier.yml +++ b/.github/workflows/prettier.yml @@ -13,7 +13,7 @@ jobs: - name: Setup Node.js uses: actions/setup-node@v2 with: - node-version: 16 + node-version: 18 - name: Install dependencies run: yarn install - name: Run Prettier Test diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f590faf..0de2bf2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,8 +8,8 @@ jobs: name: Run NPM Test strategy: matrix: - node-version: [14.x, 16.x, 18.x] - postgres-version: [13, 14, 15] + node-version: [18.x, 20.x] + postgres-version: [14, 15, 16] runs-on: ubuntu-20.04 steps: - name: Checkout diff --git a/.github/workflows/type-test.yml b/.github/workflows/type-test.yml index 761d452..b89a0e6 100644 --- a/.github/workflows/type-test.yml +++ b/.github/workflows/type-test.yml @@ -11,7 +11,7 @@ jobs: - name: Setup Node.js uses: actions/setup-node@v2 with: - node-version: 16 + node-version: 18 cache: "yarn" - name: Run NPM Install run: yarn install --frozen-lockfile diff --git a/package.json b/package.json index 6565bef..46324d7 100644 --- a/package.json +++ b/package.json @@ -60,5 +60,8 @@ "build": "tsup", "format": "prettier -w .", "format:check": "prettier --check ." + }, + "engines": { + "node": ">=18" } } diff --git a/src/worker.ts b/src/worker.ts index 9963a1e..030d163 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -11,12 +11,6 @@ import type { SharedWorkerFunctions, TestWorkerFunctions } from "./lib/rpc" type WorkerRpc = BirpcReturn -class TestWorkerShutdownError extends Error { - constructor() { - super("Test worker unexpectedly shut down") - } -} - export class Worker { private paramsHashToTemplateCreationPromise = new Map< string, From abdfbd361d2a953f51b0224baf158df2c0753e28 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Thu, 4 Apr 2024 11:41:29 -0700 Subject: [PATCH 3/7] Fix tests --- src/index.ts | 209 +++++++++++++++++++++++++------------------------ src/lib/rpc.ts | 3 +- src/worker.ts | 8 +- 3 files changed, 115 insertions(+), 105 deletions(-) diff --git a/src/index.ts b/src/index.ts index 1ea71ce..f6b0ef3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -49,6 +49,24 @@ const getWorker = async ( }) } +const teardownConnection = async ({ + pool, + pgbouncerPool, +}: ConnectionDetails) => { + try { + await pool.end() + await pgbouncerPool?.end() + } catch (error) { + if ( + (error as Error).message.includes("Called end on pool more than once") + ) { + return + } + + throw error + } +} + export const getTestPostgresDatabaseFactory = < Params extends Jsonifiable = never >( @@ -62,124 +80,113 @@ export const getTestPostgresDatabaseFactory = < const workerPromise = getWorker(initialData, options as any) - const getTestPostgresDatabase: GetTestPostgresDatabase = async ( - t: ExecutionContext, - params: any, - getTestDatabaseOptions?: GetTestPostgresDatabaseOptions - ) => { - const mapWorkerConnectionDetailsToConnectionDetails = ( - connectionDetailsFromWorker: ConnectionDetailsFromWorker - ): ConnectionDetails => { - const pool = new Pool({ - connectionString: connectionDetailsFromWorker.connectionString, - }) - - let pgbouncerPool: Pool | undefined - if (connectionDetailsFromWorker.pgbouncerConnectionString) { - pgbouncerPool = new Pool({ - connectionString: - connectionDetailsFromWorker.pgbouncerConnectionString, - }) - } + const mapWorkerConnectionDetailsToConnectionDetails = ( + connectionDetailsFromWorker: ConnectionDetailsFromWorker + ): ConnectionDetails => { + const pool = new Pool({ + connectionString: connectionDetailsFromWorker.connectionString, + }) - t.teardown(async () => { - try { - await pool.end() - await pgbouncerPool?.end() - } catch (error) { - if ( - (error as Error).message.includes( - "Called end on pool more than once" - ) - ) { - return - } - - throw error - } + let pgbouncerPool: Pool | undefined + if (connectionDetailsFromWorker.pgbouncerConnectionString) { + pgbouncerPool = new Pool({ + connectionString: connectionDetailsFromWorker.pgbouncerConnectionString, }) + } - return { - ...connectionDetailsFromWorker, - pool, - pgbouncerPool, - } + return { + ...connectionDetailsFromWorker, + pool, + pgbouncerPool, } + } - const worker = await workerPromise - await worker.available - - let rpcCallback: (data: any) => void - - const rpc = createBirpc( - { - runBeforeTemplateIsBakedHook: async (connection) => { - if (options?.beforeTemplateIsBaked) { - const connectionDetails = - mapWorkerConnectionDetailsToConnectionDetails(connection) - - // Ignore if the pool is terminated by the shared worker - // (This happens in CI for some reason even though we drain the pool first.) - connectionDetails.pool.on("error", (error) => { - if ( - error.message.includes( - "terminating connection due to administrator command" - ) - ) { - return - } - - throw error - }) - - const hookResult = await options.beforeTemplateIsBaked({ - params, - connection: connectionDetails, - containerExec: async (command): Promise => - rpc.execCommandInContainer(command), - }) - - return hookResult - } - }, + let rpcCallback: (data: any) => void + const rpc = createBirpc( + { + runBeforeTemplateIsBakedHook: async (connection, params) => { + if (options?.beforeTemplateIsBaked) { + const connectionDetails = + mapWorkerConnectionDetailsToConnectionDetails(connection) + + // Ignore if the pool is terminated by the shared worker + // (This happens in CI for some reason even though we drain the pool first.) + connectionDetails.pool.on("error", (error) => { + if ( + error.message.includes( + "terminating connection due to administrator command" + ) + ) { + return + } + + throw error + }) + + const hookResult = await options.beforeTemplateIsBaked({ + params: params as any, + connection: connectionDetails, + containerExec: async (command): Promise => + rpc.execCommandInContainer(command), + }) + + await teardownConnection(connectionDetails) + + return hookResult + } }, - { - post: (data) => worker.publish(data), - on: (data) => { - rpcCallback = data - }, - } - ) + }, + { + post: async (data) => { + const worker = await workerPromise + await worker.available + worker.publish(data) + }, + on: (data) => { + rpcCallback = data + }, + } + ) - const messageHandlerAbortController = new AbortController() - const messageHandlerPromise = Promise.race([ - once(messageHandlerAbortController.signal, "abort"), - (async () => { - for await (const msg of worker.subscribe()) { - rpcCallback!(msg.data) + // todo: properly tear down? + const messageHandlerAbortController = new AbortController() + const messageHandlerPromise = Promise.race([ + once(messageHandlerAbortController.signal, "abort"), + (async () => { + const worker = await workerPromise + await worker.available - if (messageHandlerAbortController.signal.aborted) { - break - } - } - })(), - ]) + for await (const msg of worker.subscribe()) { + rpcCallback!(msg.data) - t.teardown(async () => { - messageHandlerAbortController.abort() - await messageHandlerPromise - }) + if (messageHandlerAbortController.signal.aborted) { + break + } + } + })(), + ]) + const getTestPostgresDatabase: GetTestPostgresDatabase = async ( + t: ExecutionContext, + params: any, + getTestDatabaseOptions?: GetTestPostgresDatabaseOptions + ) => { const testDatabaseConnection = await rpc.getTestDatabase({ // todo: rename? key: getTestDatabaseOptions?.databaseDedupeKey, params, }) + const connectionDetails = mapWorkerConnectionDetailsToConnectionDetails( + testDatabaseConnection.connectionDetails + ) + + t.teardown(async () => { + await teardownConnection(connectionDetails) + }) + return { - ...mapWorkerConnectionDetailsToConnectionDetails( - testDatabaseConnection.connectionDetails - ), + ...connectionDetails, beforeTemplateIsBakedResult: testDatabaseConnection.beforeTemplateIsBakedResult, } diff --git a/src/lib/rpc.ts b/src/lib/rpc.ts index 30c80cb..432f25f 100644 --- a/src/lib/rpc.ts +++ b/src/lib/rpc.ts @@ -4,7 +4,8 @@ import type { ConnectionDetailsFromWorker } from "~/internal-types" export interface TestWorkerFunctions { runBeforeTemplateIsBakedHook: ( - connectionDetails: ConnectionDetailsFromWorker + connectionDetails: ConnectionDetailsFromWorker, + params?: Jsonifiable ) => Promise } diff --git a/src/worker.ts b/src/worker.ts index 030d163..f5f3639 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -8,6 +8,7 @@ import type { SharedWorker } from "ava/plugin" import { type BirpcReturn, type ChannelOptions, createBirpc } from "birpc" import { once } from "node:events" import type { SharedWorkerFunctions, TestWorkerFunctions } from "./lib/rpc" +import type { Jsonifiable } from "type-fest" type WorkerRpc = BirpcReturn @@ -86,7 +87,7 @@ export class Worker { if (!this.paramsHashToTemplateCreationPromise.has(paramsHash)) { this.paramsHashToTemplateCreationPromise.set( paramsHash, - this.createTemplate(rpc) + this.createTemplate(rpc, options.params) ) } }) @@ -151,7 +152,7 @@ export class Worker { } } - private async createTemplate(rpc: WorkerRpc) { + private async createTemplate(rpc: WorkerRpc, params?: Jsonifiable) { const databaseName = getRandomDatabaseName() // Create database @@ -160,7 +161,8 @@ export class Worker { await postgresClient.query(`CREATE DATABASE ${databaseName};`) const beforeTemplateIsBakedResult = await rpc.runBeforeTemplateIsBakedHook( - await this.getConnectionDetails(databaseName) + await this.getConnectionDetails(databaseName), + params ) // Disconnect any clients From 9c423b4d35fb901190be28a5195db9c115788782 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Thu, 4 Apr 2024 12:01:06 -0700 Subject: [PATCH 4/7] Fix serialization error --- package.json | 2 ++ src/index.ts | 39 +++++++++++++++++++++++++++++++++++++++ yarn.lock | 9 +++++++-- 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index 46324d7..df27c95 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,7 @@ "@semantic-release/git": "10.0.1", "@semantic-release/npm": "9.0.1", "@semantic-release/release-notes-generator": "10.0.3", + "@types/lodash": "4.17.0", "@types/node": "18.7.14", "@types/object-hash": "2.2.1", "@types/pg": "8.6.5", @@ -46,6 +47,7 @@ "dependencies": { "async-mutex": "0.4.0", "birpc": "0.2.17", + "lodash": "4.17.21", "nanoid": "4.0.0", "object-hash": "3.0.0", "pg": "8.8.0" diff --git a/src/index.ts b/src/index.ts index f6b0ef3..bee0bea 100644 --- a/src/index.ts +++ b/src/index.ts @@ -18,6 +18,39 @@ import { once } from "node:events" import { createBirpc } from "birpc" import { SharedWorkerFunctions, TestWorkerFunctions } from "./lib/rpc" import { ExecResult } from "testcontainers" +import isPlainObject from "lodash/isPlainObject" + +// https://stackoverflow.com/a/30580513 +const isSerializable = (obj: Record): boolean => { + var isNestedSerializable + function isPlain(val: any) { + return ( + typeof val === "undefined" || + typeof val === "string" || + typeof val === "boolean" || + typeof val === "number" || + Array.isArray(val) || + isPlainObject(val) + ) + } + if (!isPlain(obj)) { + return false + } + for (var property in obj) { + if (obj.hasOwnProperty(property)) { + if (!isPlain(obj[property])) { + return false + } + if (typeof obj[property] == "object") { + isNestedSerializable = isSerializable(obj[property]) + if (!isNestedSerializable) { + return false + } + } + } + } + return true +} const getWorker = async ( initialData: InitialWorkerData, @@ -132,6 +165,12 @@ export const getTestPostgresDatabaseFactory = < await teardownConnection(connectionDetails) + if (hookResult && !isSerializable(hookResult)) { + throw new TypeError( + "Return value of beforeTemplateIsBaked() hook could not be serialized. Make sure it returns only JSON-serializable values." + ) + } + return hookResult } }, diff --git a/yarn.lock b/yarn.lock index 212dadd..a38018a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -462,6 +462,11 @@ "@types/docker-modem" "*" "@types/node" "*" +"@types/lodash@4.17.0": + version "4.17.0" + resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.17.0.tgz#d774355e41f372d5350a4d0714abb48194a489c3" + integrity sha512-t7dhREVv6dbNj0q17X12j7yDG4bD/DHYX7o5/DbDxobP0HnGPgpRz2Ej77aL7TZT3DSw13fqUTj8J4mMnqa7WA== + "@types/minimist@^1.2.0": version "1.2.2" resolved "https://registry.npmjs.org/@types/minimist/-/minimist-1.2.2.tgz" @@ -2893,9 +2898,9 @@ lodash.uniqby@^4.7.0: resolved "https://registry.npmjs.org/lodash.uniqby/-/lodash.uniqby-4.7.0.tgz" integrity sha512-e/zcLx6CSbmaEgFHCA7BnoQKyCtKMxnuWrJygbwPs/AIn+IMKl66L8/s+wBUn5LRw2pZx3bUHibiV1b6aTWIww== -lodash@^4.17.15, lodash@^4.17.21, lodash@^4.17.4: +lodash@4.17.21, lodash@^4.17.15, lodash@^4.17.21, lodash@^4.17.4: version "4.17.21" - resolved "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz" + resolved "https://registry.yarnpkg.com/lodash/-/lodash-4.17.21.tgz#679591c564c3bffaae8454cf0b3df370c3d6911c" integrity sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg== lru-cache@^6.0.0: From bfbb016e3dea526b771cb180794bb11f88fd6276 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Thu, 4 Apr 2024 12:06:01 -0700 Subject: [PATCH 5/7] Cleanup --- src/index.ts | 29 +++++++++----------- src/internal-types.ts | 63 +++++++++++-------------------------------- src/lib/rpc.ts | 21 --------------- src/worker.ts | 8 ++++-- 4 files changed, 35 insertions(+), 86 deletions(-) delete mode 100644 src/lib/rpc.ts diff --git a/src/index.ts b/src/index.ts index bee0bea..03b37c7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,6 +4,8 @@ import path from "node:path" import type { ConnectionDetailsFromWorker, InitialWorkerData, + SharedWorkerFunctions, + TestWorkerFunctions, } from "./internal-types" import type { ConnectionDetails, @@ -16,7 +18,6 @@ import type { Jsonifiable } from "type-fest" import type { ExecutionContext } from "ava" import { once } from "node:events" import { createBirpc } from "birpc" -import { SharedWorkerFunctions, TestWorkerFunctions } from "./lib/rpc" import { ExecResult } from "testcontainers" import isPlainObject from "lodash/isPlainObject" @@ -187,23 +188,19 @@ export const getTestPostgresDatabaseFactory = < } ) - // todo: properly tear down? - const messageHandlerAbortController = new AbortController() - const messageHandlerPromise = Promise.race([ - once(messageHandlerAbortController.signal, "abort"), - (async () => { - const worker = await workerPromise - await worker.available + // Automatically cleaned up by AVA since each test file runs in a separate worker + const _messageHandlerPromise = (async () => { + const worker = await workerPromise + await worker.available - for await (const msg of worker.subscribe()) { - rpcCallback!(msg.data) - - if (messageHandlerAbortController.signal.aborted) { - break - } + for await (const msg of worker.subscribe()) { + if (msg.data.type === "teardown") { + console.log("breaking") + break } - })(), - ]) + rpcCallback!(msg.data) + } + })() const getTestPostgresDatabase: GetTestPostgresDatabase = async ( t: ExecutionContext, diff --git a/src/internal-types.ts b/src/internal-types.ts index 7d7807a..028ccad 100644 --- a/src/internal-types.ts +++ b/src/internal-types.ts @@ -1,5 +1,6 @@ -import type { ExecResult, StartedNetwork } from "testcontainers" +import type { ExecResult } from "testcontainers" import type { Jsonifiable } from "type-fest" + import type { ConnectionDetails, GetTestPostgresDatabaseFactoryOptions, @@ -13,52 +14,20 @@ export interface InitialWorkerData { export type ConnectionDetailsFromWorker = Omit -export interface RequestDatabaseFromWorkerMessage { - type: "GET_TEST_DATABASE" - key?: string - params?: Jsonifiable -} - -export interface RequestBeforeTemplateIsBakedHookToBeRunMessage { - type: "RUN_HOOK_BEFORE_TEMPLATE_IS_BAKED" - connectionDetails: ConnectionDetailsFromWorker -} - -export interface FinishedRunningBeforeTemplateIsBakedHookMessage { - type: "FINISHED_RUNNING_HOOK_BEFORE_TEMPLATE_IS_BAKED" - result: - | { - status: "success" - result: any - } - | { - status: "error" - error: Error | string - } +export interface TestWorkerFunctions { + runBeforeTemplateIsBakedHook: ( + connectionDetails: ConnectionDetailsFromWorker, + params?: Jsonifiable + ) => Promise } -export interface ExecCommandInContainerMessage { - type: "EXEC_COMMAND_IN_CONTAINER" - command: string[] +export interface SharedWorkerFunctions { + getTestDatabase: (options: { + key?: string + params?: Jsonifiable + }) => Promise<{ + connectionDetails: ConnectionDetailsFromWorker + beforeTemplateIsBakedResult: unknown + }> + execCommandInContainer: (command: string[]) => Promise } - -export interface ExecCommandInContainerResultMessage { - type: "EXEC_COMMAND_IN_CONTAINER_RESULT" - result: ExecResult -} - -export interface GotDatabaseMessage { - type: "GOT_DATABASE" - connectionDetails: ConnectionDetailsFromWorker - beforeTemplateIsBakedResult: FinishedRunningBeforeTemplateIsBakedHookMessage["result"] -} - -export type MessageToWorker = - | RequestDatabaseFromWorkerMessage - | FinishedRunningBeforeTemplateIsBakedHookMessage - | ExecCommandInContainerMessage -export type MessageFromWorker = - | RequestBeforeTemplateIsBakedHookToBeRunMessage - | GotDatabaseMessage - | ExecCommandInContainerResultMessage -export type WorkerMessage = MessageToWorker | MessageFromWorker diff --git a/src/lib/rpc.ts b/src/lib/rpc.ts deleted file mode 100644 index 432f25f..0000000 --- a/src/lib/rpc.ts +++ /dev/null @@ -1,21 +0,0 @@ -import type { ExecResult } from "testcontainers" -import type { Jsonifiable } from "type-fest" -import type { ConnectionDetailsFromWorker } from "~/internal-types" - -export interface TestWorkerFunctions { - runBeforeTemplateIsBakedHook: ( - connectionDetails: ConnectionDetailsFromWorker, - params?: Jsonifiable - ) => Promise -} - -export interface SharedWorkerFunctions { - getTestDatabase: (options: { - key?: string - params?: Jsonifiable - }) => Promise<{ - connectionDetails: ConnectionDetailsFromWorker - beforeTemplateIsBakedResult: unknown - }> - execCommandInContainer: (command: string[]) => Promise -} diff --git a/src/worker.ts b/src/worker.ts index f5f3639..ad6b07f 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -2,12 +2,15 @@ import pg from "pg" import { GenericContainer, Network } from "testcontainers" import { Mutex } from "async-mutex" import hash from "object-hash" -import type { InitialWorkerData } from "./internal-types" +import type { + InitialWorkerData, + SharedWorkerFunctions, + TestWorkerFunctions, +} from "./internal-types" import getRandomDatabaseName from "./lib/get-random-database-name" import type { SharedWorker } from "ava/plugin" import { type BirpcReturn, type ChannelOptions, createBirpc } from "birpc" import { once } from "node:events" -import type { SharedWorkerFunctions, TestWorkerFunctions } from "./lib/rpc" import type { Jsonifiable } from "type-fest" type WorkerRpc = BirpcReturn @@ -54,6 +57,7 @@ export class Worker { testWorker.teardown(async () => { messageHandlerAbortController.abort() await messageHandlerPromise + testWorker.publish({ type: "teardown" }) }) const rpc: WorkerRpc = createBirpc< From 2b97eaf5d7973bad5c37b6fc72bdaf9ceb92020d Mon Sep 17 00:00:00 2001 From: Max Isom Date: Thu, 4 Apr 2024 12:11:05 -0700 Subject: [PATCH 6/7] Rename --- src/index.ts | 3 +-- src/internal-types.ts | 2 +- src/worker.ts | 11 +++++++---- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/index.ts b/src/index.ts index 03b37c7..b83229d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -208,8 +208,7 @@ export const getTestPostgresDatabaseFactory = < getTestDatabaseOptions?: GetTestPostgresDatabaseOptions ) => { const testDatabaseConnection = await rpc.getTestDatabase({ - // todo: rename? - key: getTestDatabaseOptions?.databaseDedupeKey, + databaseDedupeKey: getTestDatabaseOptions?.databaseDedupeKey, params, }) diff --git a/src/internal-types.ts b/src/internal-types.ts index 028ccad..2700f0e 100644 --- a/src/internal-types.ts +++ b/src/internal-types.ts @@ -23,7 +23,7 @@ export interface TestWorkerFunctions { export interface SharedWorkerFunctions { getTestDatabase: (options: { - key?: string + databaseDedupeKey?: string params?: Jsonifiable }) => Promise<{ connectionDetails: ConnectionDetailsFromWorker diff --git a/src/worker.ts b/src/worker.ts index ad6b07f..d94783d 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -105,9 +105,9 @@ export class Worker { const { postgresClient } = await this.startContainerPromise // Only relevant when a `key` is provided - const fullDatabaseKey = `${paramsHash}-${options.key}` + const fullDatabaseKey = `${paramsHash}-${options.databaseDedupeKey}` - let databaseName = options.key + let databaseName = options.databaseDedupeKey ? this.keyToDatabaseName.get(fullDatabaseKey) : undefined if (!databaseName) { @@ -118,7 +118,7 @@ export class Worker { ) } - if (options.key) { + if (options.databaseDedupeKey) { await this.getOrCreateKeyToCreationMutex.runExclusive(() => { if (!this.keyToCreationMutex.has(fullDatabaseKey)) { this.keyToCreationMutex.set(fullDatabaseKey, new Mutex()) @@ -142,7 +142,10 @@ export class Worker { registerTeardown(async () => { // Don't remove keyed databases - if (options.key && this.keyToDatabaseName.has(fullDatabaseKey)) { + if ( + options.databaseDedupeKey && + this.keyToDatabaseName.has(fullDatabaseKey) + ) { return } From 3e3118025bf672026d8d19fd2194f5219487318f Mon Sep 17 00:00:00 2001 From: Max Isom Date: Thu, 4 Apr 2024 16:02:56 -0700 Subject: [PATCH 7/7] Cleanup --- src/index.ts | 4 ---- src/worker.ts | 1 - 2 files changed, 5 deletions(-) diff --git a/src/index.ts b/src/index.ts index b83229d..0d08122 100644 --- a/src/index.ts +++ b/src/index.ts @@ -194,10 +194,6 @@ export const getTestPostgresDatabaseFactory = < await worker.available for await (const msg of worker.subscribe()) { - if (msg.data.type === "teardown") { - console.log("breaking") - break - } rpcCallback!(msg.data) } })() diff --git a/src/worker.ts b/src/worker.ts index d94783d..8130e6f 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -57,7 +57,6 @@ export class Worker { testWorker.teardown(async () => { messageHandlerAbortController.abort() await messageHandlerPromise - testWorker.publish({ type: "teardown" }) }) const rpc: WorkerRpc = createBirpc<