Skip to content

Commit

Permalink
feat: Merge pull request #28 from seamapi/feat-use-birpc
Browse files Browse the repository at this point in the history
  • Loading branch information
codetheweb authored Apr 5, 2024
2 parents f32f204 + 3e31180 commit ffcbb79
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 358 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/npm-semantic-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: Setup Node.js
uses: actions/setup-node@v2
with:
node-version: "16.x"
node-version: "18.x"
- name: Install dependencies
run: yarn install
- name: Build
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/prettier.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- name: Setup Node.js
uses: actions/setup-node@v2
with:
node-version: 16
node-version: 18
- name: Install dependencies
run: yarn install
- name: Run Prettier Test
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ jobs:
name: Run NPM Test
strategy:
matrix:
node-version: [14.x, 16.x, 18.x]
postgres-version: [13, 14, 15]
node-version: [18.x, 20.x]
postgres-version: [14, 15, 16]
runs-on: ubuntu-20.04
steps:
- name: Checkout
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/type-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- name: Setup Node.js
uses: actions/setup-node@v2
with:
node-version: 16
node-version: 18
cache: "yarn"
- name: Run NPM Install
run: yarn install --frozen-lockfile
Expand Down
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"@semantic-release/git": "10.0.1",
"@semantic-release/npm": "9.0.1",
"@semantic-release/release-notes-generator": "10.0.3",
"@types/lodash": "4.17.0",
"@types/node": "18.7.14",
"@types/object-hash": "2.2.1",
"@types/pg": "8.6.5",
Expand All @@ -45,6 +46,8 @@
},
"dependencies": {
"async-mutex": "0.4.0",
"birpc": "0.2.17",
"lodash": "4.17.21",
"nanoid": "4.0.0",
"object-hash": "3.0.0",
"pg": "8.8.0"
Expand All @@ -59,5 +62,8 @@
"build": "tsup",
"format": "prettier -w .",
"format:check": "prettier --check ."
},
"engines": {
"node": ">=18"
}
}
278 changes: 134 additions & 144 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,57 @@
import { registerSharedWorker, SharedWorker } from "ava/plugin"
import { registerSharedWorker } from "ava/plugin"
import hash from "object-hash"
import path from "node:path"
import {
import type {
ConnectionDetailsFromWorker,
FinishedRunningBeforeTemplateIsBakedHookMessage,
InitialWorkerData,
MessageFromWorker,
MessageToWorker,
SharedWorkerFunctions,
TestWorkerFunctions,
} from "./internal-types"
import {
import type {
ConnectionDetails,
GetTestPostgresDatabase,
GetTestPostgresDatabaseFactoryOptions,
GetTestPostgresDatabaseOptions,
GetTestPostgresDatabaseResult,
} from "./public-types"
import { Pool } from "pg"
import { Jsonifiable } from "type-fest"
import { StartedNetwork } from "testcontainers"
import { ExecutionContext } from "ava"
import type { Jsonifiable } from "type-fest"
import type { ExecutionContext } from "ava"
import { once } from "node:events"
import { createBirpc } from "birpc"
import { ExecResult } from "testcontainers"
import isPlainObject from "lodash/isPlainObject"

// https://stackoverflow.com/a/30580513
const isSerializable = (obj: Record<any, any>): boolean => {
var isNestedSerializable
function isPlain(val: any) {
return (
typeof val === "undefined" ||
typeof val === "string" ||
typeof val === "boolean" ||
typeof val === "number" ||
Array.isArray(val) ||
isPlainObject(val)
)
}
if (!isPlain(obj)) {
return false
}
for (var property in obj) {
if (obj.hasOwnProperty(property)) {
if (!isPlain(obj[property])) {
return false
}
if (typeof obj[property] == "object") {
isNestedSerializable = isSerializable(obj[property])
if (!isNestedSerializable) {
return false
}
}
}
}
return true
}

const getWorker = async (
initialData: InitialWorkerData,
Expand Down Expand Up @@ -50,6 +83,24 @@ const getWorker = async (
})
}

const teardownConnection = async ({
pool,
pgbouncerPool,
}: ConnectionDetails) => {
try {
await pool.end()
await pgbouncerPool?.end()
} catch (error) {
if (
(error as Error).message.includes("Called end on pool more than once")
) {
return
}

throw error
}
}

export const getTestPostgresDatabaseFactory = <
Params extends Jsonifiable = never
>(
Expand All @@ -63,71 +114,34 @@ export const getTestPostgresDatabaseFactory = <

const workerPromise = getWorker(initialData, options as any)

const getTestPostgresDatabase: GetTestPostgresDatabase<Params> = async (
t: ExecutionContext,
params: any,
getTestDatabaseOptions?: GetTestPostgresDatabaseOptions
) => {
const mapWorkerConnectionDetailsToConnectionDetails = (
connectionDetailsFromWorker: ConnectionDetailsFromWorker
): ConnectionDetails => {
const pool = new Pool({
connectionString: connectionDetailsFromWorker.connectionString,
})

let pgbouncerPool: Pool | undefined
if (connectionDetailsFromWorker.pgbouncerConnectionString) {
pgbouncerPool = new Pool({
connectionString:
connectionDetailsFromWorker.pgbouncerConnectionString,
})
}

t.teardown(async () => {
try {
await pool.end()
await pgbouncerPool?.end()
} catch (error) {
if (
(error as Error).message.includes(
"Called end on pool more than once"
)
) {
return
}
const mapWorkerConnectionDetailsToConnectionDetails = (
connectionDetailsFromWorker: ConnectionDetailsFromWorker
): ConnectionDetails => {
const pool = new Pool({
connectionString: connectionDetailsFromWorker.connectionString,
})

throw error
}
let pgbouncerPool: Pool | undefined
if (connectionDetailsFromWorker.pgbouncerConnectionString) {
pgbouncerPool = new Pool({
connectionString: connectionDetailsFromWorker.pgbouncerConnectionString,
})

return {
...connectionDetailsFromWorker,
pool,
pgbouncerPool,
}
}

const worker = await workerPromise
await worker.available

const waitForAndHandleReply = async (
message: SharedWorker.Plugin.PublishedMessage
): Promise<GetTestPostgresDatabaseResult> => {
let reply = await message.replies().next()
const replyData: MessageFromWorker = reply.value.data

if (replyData.type === "RUN_HOOK_BEFORE_TEMPLATE_IS_BAKED") {
let result: FinishedRunningBeforeTemplateIsBakedHookMessage["result"] =
{
status: "success",
result: undefined,
}
return {
...connectionDetailsFromWorker,
pool,
pgbouncerPool,
}
}

let rpcCallback: (data: any) => void
const rpc = createBirpc<SharedWorkerFunctions, TestWorkerFunctions>(
{
runBeforeTemplateIsBakedHook: async (connection, params) => {
if (options?.beforeTemplateIsBaked) {
const connectionDetails =
mapWorkerConnectionDetailsToConnectionDetails(
replyData.connectionDetails
)
mapWorkerConnectionDetailsToConnectionDetails(connection)

// Ignore if the pool is terminated by the shared worker
// (This happens in CI for some reason even though we drain the pool first.)
Expand All @@ -143,94 +157,70 @@ export const getTestPostgresDatabaseFactory = <
throw error
})

try {
const hookResult = await options.beforeTemplateIsBaked({
params,
connection: connectionDetails,
containerExec: async (command) => {
const request = reply.value.reply({
type: "EXEC_COMMAND_IN_CONTAINER",
command,
})

reply = await request.replies().next()

if (
reply.value.data.type !== "EXEC_COMMAND_IN_CONTAINER_RESULT"
) {
throw new Error(
"Expected EXEC_COMMAND_IN_CONTAINER_RESULT message"
)
}

return reply.value.data.result
},
})

result = {
status: "success",
result: hookResult,
}
} catch (error) {
result = {
status: "error",
error:
error instanceof Error
? error.stack ?? error.message
: new Error(
"Unknown error type thrown in beforeTemplateIsBaked hook"
),
}
} finally {
// Otherwise connection will be killed by worker when converting to template
await connectionDetails.pool.end()
}
}
const hookResult = await options.beforeTemplateIsBaked({
params: params as any,
connection: connectionDetails,
containerExec: async (command): Promise<ExecResult> =>
rpc.execCommandInContainer(command),
})

try {
return waitForAndHandleReply(
reply.value.reply({
type: "FINISHED_RUNNING_HOOK_BEFORE_TEMPLATE_IS_BAKED",
result,
} as MessageToWorker)
)
} catch (error) {
if (error instanceof Error && error.name === "DataCloneError") {
await teardownConnection(connectionDetails)

if (hookResult && !isSerializable(hookResult)) {
throw new TypeError(
"Return value of beforeTemplateIsBaked() hook could not be serialized. Make sure it returns only JSON-serializable values."
)
}

throw error
}
} else if (replyData.type === "GOT_DATABASE") {
if (replyData.beforeTemplateIsBakedResult.status === "error") {
if (typeof replyData.beforeTemplateIsBakedResult.error === "string") {
throw new Error(replyData.beforeTemplateIsBakedResult.error)
}

throw replyData.beforeTemplateIsBakedResult.error
return hookResult
}
},
},
{
post: async (data) => {
const worker = await workerPromise
await worker.available
worker.publish(data)
},
on: (data) => {
rpcCallback = data
},
}
)

return {
...mapWorkerConnectionDetailsToConnectionDetails(
replyData.connectionDetails
),
beforeTemplateIsBakedResult:
replyData.beforeTemplateIsBakedResult.result,
}
}
// Automatically cleaned up by AVA since each test file runs in a separate worker
const _messageHandlerPromise = (async () => {
const worker = await workerPromise
await worker.available

throw new Error(`Unexpected message type: ${replyData.type}`)
for await (const msg of worker.subscribe()) {
rpcCallback!(msg.data)
}
})()

return waitForAndHandleReply(
worker.publish({
type: "GET_TEST_DATABASE",
params,
key: getTestDatabaseOptions?.databaseDedupeKey,
} as MessageToWorker)
const getTestPostgresDatabase: GetTestPostgresDatabase<Params> = async (
t: ExecutionContext,
params: any,
getTestDatabaseOptions?: GetTestPostgresDatabaseOptions
) => {
const testDatabaseConnection = await rpc.getTestDatabase({
databaseDedupeKey: getTestDatabaseOptions?.databaseDedupeKey,
params,
})

const connectionDetails = mapWorkerConnectionDetailsToConnectionDetails(
testDatabaseConnection.connectionDetails
)

t.teardown(async () => {
await teardownConnection(connectionDetails)
})

return {
...connectionDetails,
beforeTemplateIsBakedResult:
testDatabaseConnection.beforeTemplateIsBakedResult,
}
}

return getTestPostgresDatabase
Expand Down
Loading

0 comments on commit ffcbb79

Please sign in to comment.