Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use birpc #28

Merged
merged 7 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/npm-semantic-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: Setup Node.js
uses: actions/setup-node@v2
with:
node-version: "16.x"
node-version: "18.x"
- name: Install dependencies
run: yarn install
- name: Build
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/prettier.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- name: Setup Node.js
uses: actions/setup-node@v2
with:
node-version: 16
node-version: 18
- name: Install dependencies
run: yarn install
- name: Run Prettier Test
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ jobs:
name: Run NPM Test
strategy:
matrix:
node-version: [14.x, 16.x, 18.x]
postgres-version: [13, 14, 15]
node-version: [18.x, 20.x]
postgres-version: [14, 15, 16]
runs-on: ubuntu-20.04
steps:
- name: Checkout
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/type-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- name: Setup Node.js
uses: actions/setup-node@v2
with:
node-version: 16
node-version: 18
cache: "yarn"
- name: Run NPM Install
run: yarn install --frozen-lockfile
Expand Down
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"@semantic-release/git": "10.0.1",
"@semantic-release/npm": "9.0.1",
"@semantic-release/release-notes-generator": "10.0.3",
"@types/lodash": "4.17.0",
"@types/node": "18.7.14",
"@types/object-hash": "2.2.1",
"@types/pg": "8.6.5",
Expand All @@ -45,6 +46,8 @@
},
"dependencies": {
"async-mutex": "0.4.0",
"birpc": "0.2.17",
"lodash": "4.17.21",
"nanoid": "4.0.0",
"object-hash": "3.0.0",
"pg": "8.8.0"
Expand All @@ -59,5 +62,8 @@
"build": "tsup",
"format": "prettier -w .",
"format:check": "prettier --check ."
},
"engines": {
"node": ">=18"
}
}
278 changes: 134 additions & 144 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,57 @@
import { registerSharedWorker, SharedWorker } from "ava/plugin"
import { registerSharedWorker } from "ava/plugin"
import hash from "object-hash"
import path from "node:path"
import {
import type {
ConnectionDetailsFromWorker,
FinishedRunningBeforeTemplateIsBakedHookMessage,
InitialWorkerData,
MessageFromWorker,
MessageToWorker,
SharedWorkerFunctions,
TestWorkerFunctions,
} from "./internal-types"
import {
import type {
ConnectionDetails,
GetTestPostgresDatabase,
GetTestPostgresDatabaseFactoryOptions,
GetTestPostgresDatabaseOptions,
GetTestPostgresDatabaseResult,
} from "./public-types"
import { Pool } from "pg"
import { Jsonifiable } from "type-fest"
import { StartedNetwork } from "testcontainers"
import { ExecutionContext } from "ava"
import type { Jsonifiable } from "type-fest"
import type { ExecutionContext } from "ava"
import { once } from "node:events"
import { createBirpc } from "birpc"
import { ExecResult } from "testcontainers"
import isPlainObject from "lodash/isPlainObject"

// https://stackoverflow.com/a/30580513
const isSerializable = (obj: Record<any, any>): boolean => {
var isNestedSerializable
function isPlain(val: any) {
return (
typeof val === "undefined" ||
typeof val === "string" ||
typeof val === "boolean" ||
typeof val === "number" ||
Array.isArray(val) ||
isPlainObject(val)
)
}
if (!isPlain(obj)) {
return false
}
for (var property in obj) {
if (obj.hasOwnProperty(property)) {
if (!isPlain(obj[property])) {
return false
}
if (typeof obj[property] == "object") {
isNestedSerializable = isSerializable(obj[property])
if (!isNestedSerializable) {
return false
}
}
}
}
return true
}

const getWorker = async (
initialData: InitialWorkerData,
Expand Down Expand Up @@ -50,6 +83,24 @@ const getWorker = async (
})
}

const teardownConnection = async ({
pool,
pgbouncerPool,
}: ConnectionDetails) => {
try {
await pool.end()
await pgbouncerPool?.end()
} catch (error) {
if (
(error as Error).message.includes("Called end on pool more than once")
) {
return
}

throw error
}
}

export const getTestPostgresDatabaseFactory = <
Params extends Jsonifiable = never
>(
Expand All @@ -63,71 +114,34 @@ export const getTestPostgresDatabaseFactory = <

const workerPromise = getWorker(initialData, options as any)

const getTestPostgresDatabase: GetTestPostgresDatabase<Params> = async (
t: ExecutionContext,
params: any,
getTestDatabaseOptions?: GetTestPostgresDatabaseOptions
) => {
const mapWorkerConnectionDetailsToConnectionDetails = (
connectionDetailsFromWorker: ConnectionDetailsFromWorker
): ConnectionDetails => {
const pool = new Pool({
connectionString: connectionDetailsFromWorker.connectionString,
})

let pgbouncerPool: Pool | undefined
if (connectionDetailsFromWorker.pgbouncerConnectionString) {
pgbouncerPool = new Pool({
connectionString:
connectionDetailsFromWorker.pgbouncerConnectionString,
})
}

t.teardown(async () => {
try {
await pool.end()
await pgbouncerPool?.end()
} catch (error) {
if (
(error as Error).message.includes(
"Called end on pool more than once"
)
) {
return
}
const mapWorkerConnectionDetailsToConnectionDetails = (
connectionDetailsFromWorker: ConnectionDetailsFromWorker
): ConnectionDetails => {
const pool = new Pool({
connectionString: connectionDetailsFromWorker.connectionString,
})

throw error
}
let pgbouncerPool: Pool | undefined
if (connectionDetailsFromWorker.pgbouncerConnectionString) {
pgbouncerPool = new Pool({
connectionString: connectionDetailsFromWorker.pgbouncerConnectionString,
})

return {
...connectionDetailsFromWorker,
pool,
pgbouncerPool,
}
}

const worker = await workerPromise
await worker.available

const waitForAndHandleReply = async (
message: SharedWorker.Plugin.PublishedMessage
): Promise<GetTestPostgresDatabaseResult> => {
let reply = await message.replies().next()
const replyData: MessageFromWorker = reply.value.data

if (replyData.type === "RUN_HOOK_BEFORE_TEMPLATE_IS_BAKED") {
let result: FinishedRunningBeforeTemplateIsBakedHookMessage["result"] =
{
status: "success",
result: undefined,
}
return {
...connectionDetailsFromWorker,
pool,
pgbouncerPool,
}
}

let rpcCallback: (data: any) => void
const rpc = createBirpc<SharedWorkerFunctions, TestWorkerFunctions>(
{

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the idea meant to establish a bidirectional communication with the postgres docker container?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not quite

AVA's architecture has one worker thread per test file (called a test worker) and one worker thread per "shared worker" (called... a shared worker)

previous to this PR, we implemented 2-way RPC calls between the test worker and shared workers ourselves, but using birpc really makes the intent and logic cleaner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh i see, this is to communicate with ava's worker thread

runBeforeTemplateIsBakedHook: async (connection, params) => {
if (options?.beforeTemplateIsBaked) {
const connectionDetails =
mapWorkerConnectionDetailsToConnectionDetails(
replyData.connectionDetails
)
mapWorkerConnectionDetailsToConnectionDetails(connection)

// Ignore if the pool is terminated by the shared worker
// (This happens in CI for some reason even though we drain the pool first.)
Expand All @@ -143,94 +157,70 @@ export const getTestPostgresDatabaseFactory = <
throw error
})

try {
const hookResult = await options.beforeTemplateIsBaked({
params,
connection: connectionDetails,
containerExec: async (command) => {
const request = reply.value.reply({
type: "EXEC_COMMAND_IN_CONTAINER",
command,
})

reply = await request.replies().next()

if (
reply.value.data.type !== "EXEC_COMMAND_IN_CONTAINER_RESULT"
) {
throw new Error(
"Expected EXEC_COMMAND_IN_CONTAINER_RESULT message"
)
}

return reply.value.data.result
},
})

result = {
status: "success",
result: hookResult,
}
} catch (error) {
result = {
status: "error",
error:
error instanceof Error
? error.stack ?? error.message
: new Error(
"Unknown error type thrown in beforeTemplateIsBaked hook"
),
}
} finally {
// Otherwise connection will be killed by worker when converting to template
await connectionDetails.pool.end()
}
}
const hookResult = await options.beforeTemplateIsBaked({
params: params as any,
connection: connectionDetails,
containerExec: async (command): Promise<ExecResult> =>
rpc.execCommandInContainer(command),
})

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you weren't kidding this is definitely way simpler


try {
return waitForAndHandleReply(
reply.value.reply({
type: "FINISHED_RUNNING_HOOK_BEFORE_TEMPLATE_IS_BAKED",
result,
} as MessageToWorker)
)
} catch (error) {
if (error instanceof Error && error.name === "DataCloneError") {
await teardownConnection(connectionDetails)

if (hookResult && !isSerializable(hookResult)) {
throw new TypeError(
"Return value of beforeTemplateIsBaked() hook could not be serialized. Make sure it returns only JSON-serializable values."
)
}

throw error
}
} else if (replyData.type === "GOT_DATABASE") {
if (replyData.beforeTemplateIsBakedResult.status === "error") {
if (typeof replyData.beforeTemplateIsBakedResult.error === "string") {
throw new Error(replyData.beforeTemplateIsBakedResult.error)
}

throw replyData.beforeTemplateIsBakedResult.error
return hookResult
}
},
},
{
post: async (data) => {
const worker = await workerPromise
await worker.available
worker.publish(data)
},
on: (data) => {
rpcCallback = data
},
}
)

return {
...mapWorkerConnectionDetailsToConnectionDetails(
replyData.connectionDetails
),
beforeTemplateIsBakedResult:
replyData.beforeTemplateIsBakedResult.result,
}
}
// Automatically cleaned up by AVA since each test file runs in a separate worker
const _messageHandlerPromise = (async () => {
const worker = await workerPromise
await worker.available

throw new Error(`Unexpected message type: ${replyData.type}`)
for await (const msg of worker.subscribe()) {
rpcCallback!(msg.data)
}
})()

return waitForAndHandleReply(
worker.publish({
type: "GET_TEST_DATABASE",
params,
key: getTestDatabaseOptions?.databaseDedupeKey,
} as MessageToWorker)
const getTestPostgresDatabase: GetTestPostgresDatabase<Params> = async (
t: ExecutionContext,
params: any,
getTestDatabaseOptions?: GetTestPostgresDatabaseOptions
) => {
const testDatabaseConnection = await rpc.getTestDatabase({
databaseDedupeKey: getTestDatabaseOptions?.databaseDedupeKey,
params,
})

const connectionDetails = mapWorkerConnectionDetailsToConnectionDetails(
testDatabaseConnection.connectionDetails
)

t.teardown(async () => {
await teardownConnection(connectionDetails)
})

return {
...connectionDetails,
beforeTemplateIsBakedResult:
testDatabaseConnection.beforeTemplateIsBakedResult,
}
}

return getTestPostgresDatabase
Expand Down
Loading
Loading