From c82b5e469dc484e516c1105f04b78f07876e64b8 Mon Sep 17 00:00:00 2001 From: Hassan Abdel-Rahman Date: Wed, 15 Jan 2025 14:49:04 -0500 Subject: [PATCH] Add job priority support and prioritize user initiated indexing requests --- README.md | 4 +- packages/host/app/lib/browser-queue.ts | 25 ++-- ...44_schema.sql => 1736950557343_schema.sql} | 0 .../1736950557343_queue-priority.js | 16 ++ packages/postgres/pg-queue.ts | 141 +++++++++++------- .../realm-server/scripts/start-worker-base.sh | 1 - .../scripts/start-worker-development.sh | 3 +- .../scripts/start-worker-production.sh | 3 +- .../scripts/start-worker-staging.sh | 3 +- packages/realm-server/server.ts | 2 +- packages/realm-server/tests/helpers/index.ts | 2 +- packages/realm-server/tests/queue-test.ts | 122 +++++++++++++-- packages/realm-server/tsconfig.json | 2 +- packages/realm-server/worker-manager.ts | 51 +++++-- packages/realm-server/worker.ts | 10 +- packages/runtime-common/queue.ts | 16 +- .../runtime-common/realm-index-updater.ts | 39 +++-- packages/runtime-common/realm.ts | 10 +- packages/runtime-common/tests/queue-test.ts | 35 ++++- 19 files changed, 357 insertions(+), 128 deletions(-) rename packages/host/config/schema/{1735832183444_schema.sql => 1736950557343_schema.sql} (100%) create mode 100644 packages/postgres/migrations/1736950557343_queue-priority.js diff --git a/README.md b/README.md index 1391391eb4..2f45eb8e22 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,7 @@ Live reloads are not available in this mode, however, if you use start the serve #### Using `start:all` -Instead of running `pnpm start:base`, you can alternatively use `pnpm start:all` which also serves a few other realms on other ports--this is convenient if you wish to switch between the app and the tests without having to restart servers. Use the environment variable `WORKER_COUNT` to add additional workers. By default there is 1 worker for each realm server. Here's what is spun up with `start:all`: +Instead of running `pnpm start:base`, you can alternatively use `pnpm start:all` which also serves a few other realms on other ports--this is convenient if you wish to switch between the app and the tests without having to restart servers. Use the environment variable `WORKER_HIGH_PRIORITY_COUNT` to add additional workers that service only user initiated requests and `WORKER_ALL_PRIORITY_COUNT` to add workers that service all jobs (system or user initiated). By default there is 1 all priority worker for each realm server. Here's what is spun up with `start:all`: | Port | Description | Running `start:all` | Running `start:base` | | ----- | ------------------------------------------------------------- | ------------------- | -------------------- | @@ -123,7 +123,7 @@ When running tests we isolate the database between each test run by actually cre If you wish to drop the development databases you can execute: ``` -pnpm drop-all-dbs +pnpm full-reset ``` You can then run `pnpm migrate up` (with `PGDATABASE` set accordingly if you want to migrate a database other than `boxel`) or just start the realm server (`pnpm start:all`) to create the database again. 
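As a quick usage sketch of the worker-count environment variables described in the README change above (assuming the `pnpm start:all` workflow it refers to; the counts shown are illustrative only):

```
WORKER_ALL_PRIORITY_COUNT=1 WORKER_HIGH_PRIORITY_COUNT=2 pnpm start:all
```

This keeps one worker that drains every job while the two extra workers only claim user-initiated work, i.e. jobs published at `userInitiatedPriority` (10) or higher.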
diff --git a/packages/host/app/lib/browser-queue.ts b/packages/host/app/lib/browser-queue.ts index cf2517bc6a..7256de6c74 100644 --- a/packages/host/app/lib/browser-queue.ts +++ b/packages/host/app/lib/browser-queue.ts @@ -21,7 +21,7 @@ export class BrowserQueue implements QueuePublisher, QueueRunner { private jobs: { jobId: number; jobType: string; - arg: PgPrimitive; + args: PgPrimitive; notifier: Deferred; }[] = []; private types: Map Promise> = new Map(); @@ -50,12 +50,17 @@ export class BrowserQueue implements QueuePublisher, QueueRunner { this.debouncedDrainJobs(); } - async publish( - jobType: string, - _concurrencyGroup: string | null, - _timeout: number, - arg: PgPrimitive, - ): Promise> { + async publish({ + jobType, + concurrencyGroup: _concurrencyGroup, + timeout: _timeout, + args, + }: { + jobType: string; + concurrencyGroup: string | null; + timeout: number; + args: PgPrimitive; + }): Promise> { if (this.isDestroyed) { throw new Error(`Cannot publish job on a destroyed Queue`); } @@ -66,7 +71,7 @@ export class BrowserQueue implements QueuePublisher, QueueRunner { jobId, notifier, jobType, - arg, + args, }); this.debouncedDrainJobs(); return job; @@ -84,7 +89,7 @@ export class BrowserQueue implements QueuePublisher, QueueRunner { let jobs = [...this.jobs]; this.jobs = []; for (let workItem of jobs) { - let { jobId, jobType, notifier, arg } = workItem; + let { jobId, jobType, notifier, args } = workItem; let handler = this.types.get(jobType); if (!handler) { // no handler for this job, add it back to the queue @@ -96,7 +101,7 @@ export class BrowserQueue implements QueuePublisher, QueueRunner { this.onBeforeJob(jobId); } try { - notifier.fulfill(await handler(arg)); + notifier.fulfill(await handler(args)); } catch (e: any) { notifier.reject(e); } diff --git a/packages/host/config/schema/1735832183444_schema.sql b/packages/host/config/schema/1736950557343_schema.sql similarity index 100% rename from packages/host/config/schema/1735832183444_schema.sql rename to packages/host/config/schema/1736950557343_schema.sql diff --git a/packages/postgres/migrations/1736950557343_queue-priority.js b/packages/postgres/migrations/1736950557343_queue-priority.js new file mode 100644 index 0000000000..3a787ec1f8 --- /dev/null +++ b/packages/postgres/migrations/1736950557343_queue-priority.js @@ -0,0 +1,16 @@ +exports.up = (pgm) => { + pgm.sql('delete from job_reservations'); + pgm.sql('delete from jobs'); + + pgm.addColumns('jobs', { + priority: { type: 'integer', notNull: true, default: 0 }, + }); + pgm.createIndex('jobs', 'priority'); +}; + +exports.down = (pgm) => { + pgm.dropIndex('jobs', 'priority'); + pgm.dropColumns('jobs', { + priority: 'integer', + }); +}; diff --git a/packages/postgres/pg-queue.ts b/packages/postgres/pg-queue.ts index 0e1c7cce9b..b8ed2232be 100644 --- a/packages/postgres/pg-queue.ts +++ b/packages/postgres/pg-queue.ts @@ -24,6 +24,7 @@ interface JobsTable { job_type: string; concurrency_group: string | null; timeout: number; + priority: number; args: Record; status: 'unfulfilled' | 'resolved' | 'rejected'; created_at: Date; @@ -107,10 +108,9 @@ class WorkLoop { export class PgQueuePublisher implements QueuePublisher { #isDestroyed = false; #pgClient: PgAdapter; - - private pollInterval = 10000; - private notifiers: Map> = new Map(); - private notificationRunner: WorkLoop | undefined; + #pollInterval = 10000; + #notifiers: Map> = new Map(); + #notificationRunner: WorkLoop | undefined; constructor(pgClient: PgAdapter) { this.#pgClient = pgClient; @@ -121,12 +121,12 @@ 
export class PgQueuePublisher implements QueuePublisher { } private addNotifier(id: number, n: Deferred) { - if (!this.notificationRunner && !this.#isDestroyed) { - this.notificationRunner = new WorkLoop( + if (!this.#notificationRunner && !this.#isDestroyed) { + this.#notificationRunner = new WorkLoop( 'notificationRunner', - this.pollInterval, + this.#pollInterval, ); - this.notificationRunner.run(async (loop) => { + this.#notificationRunner.run(async (loop) => { await this.#pgClient.listen( 'jobs_finished', loop.wake.bind(loop), @@ -139,12 +139,12 @@ export class PgQueuePublisher implements QueuePublisher { ); }); } - this.notifiers.set(id, n); + this.#notifiers.set(id, n); } private async drainNotifications(loop: WorkLoop) { while (!loop.shuttingDown) { - let waitingIds = [...this.notifiers.keys()]; + let waitingIds = [...this.#notifiers.keys()]; log.debug('jobs waiting for notification: %s', waitingIds); let result = (await this.#query([ `SELECT id, status, result FROM jobs WHERE status != 'unfulfilled' AND (`, @@ -163,8 +163,8 @@ export class PgQueuePublisher implements QueuePublisher { ); // "!" because we only searched for rows matching our notifiers Map, and // we are the only code that deletes from that Map. - let notifier = this.notifiers.get(row.id)!; - this.notifiers.delete(row.id); + let notifier = this.#notifiers.get(row.id)!; + this.#notifiers.delete(row.id); if (row.status === 'resolved') { notifier.fulfill(row.result); } else { @@ -174,20 +174,28 @@ export class PgQueuePublisher implements QueuePublisher { } } - async publish( - jobType: string, - concurrencyGroup: string | null, - timeout: number, // in seconds - args: PgPrimitive, - ): Promise> { + async publish({ + jobType, + concurrencyGroup, + timeout, // in seconds + priority = 0, + args, + }: { + jobType: string; + concurrencyGroup: string | null; + priority?: number; + timeout: number; // in seconds + args: PgPrimitive; + }): Promise> { let { nameExpressions, valueExpressions } = asExpressions({ args, job_type: jobType, concurrency_group: concurrencyGroup, + priority, timeout, } as Pick< JobsTable, - 'args' | 'job_type' | 'concurrency_group' | 'timeout' + 'args' | 'job_type' | 'concurrency_group' | 'timeout' | 'priority' >); let [{ id: jobId }] = (await this.#query([ 'INSERT INTO JOBS', @@ -206,34 +214,52 @@ export class PgQueuePublisher implements QueuePublisher { async destroy() { this.#isDestroyed = true; - if (this.notificationRunner) { - await this.notificationRunner.shutDown(); + if (this.#notificationRunner) { + await this.#notificationRunner.shutDown(); } } } export class PgQueueRunner implements QueueRunner { #isDestroyed = false; + #pgClient: PgAdapter; + #workerId: string; + #maxTimeoutSec: number; + #pollInterval = 10000; + #handlers: Map = new Map(); + #jobRunner: WorkLoop | undefined; + #priority: number; + + constructor({ + adapter, + workerId, + maxTimeoutSec = 10 * 60, + priority = 0, + }: { + adapter: PgAdapter; + workerId: string; + priority?: number; + maxTimeoutSec?: number; + }) { + this.#pgClient = adapter; + this.#workerId = workerId; + this.#maxTimeoutSec = maxTimeoutSec; + this.#priority = priority; + } - private pollInterval = 10000; - private handlers: Map = new Map(); - private jobRunner: WorkLoop | undefined; - - constructor( - private pgClient: PgAdapter, - private workerId: string, - private maxTimeoutSec = 10 * 60, - ) {} + get priority() { + return this.#priority; + } register(jobType: string, handler: (arg: A) => Promise) { - this.handlers.set(jobType, handler); + 
this.#handlers.set(jobType, handler); } async start() { - if (!this.jobRunner && !this.#isDestroyed) { - this.jobRunner = new WorkLoop('jobRunner', this.pollInterval); - this.jobRunner.run(async (loop) => { - await this.pgClient.listen('jobs', loop.wake.bind(loop), async () => { + if (!this.#jobRunner && !this.#isDestroyed) { + this.#jobRunner = new WorkLoop('jobRunner', this.#pollInterval); + this.#jobRunner.run(async (loop) => { + await this.#pgClient.listen('jobs', loop.wake.bind(loop), async () => { while (!loop.shuttingDown) { await this.processJobs(loop); await loop.sleep(); @@ -244,7 +270,7 @@ export class PgQueueRunner implements QueueRunner { } private async runJob(jobType: string, args: PgPrimitive) { - let handler = this.handlers.get(jobType); + let handler = this.#handlers.get(jobType); if (!handler) { throw new Error(`unknown job handler ${jobType}`); } @@ -252,10 +278,10 @@ export class PgQueueRunner implements QueueRunner { } private async processJobs(workLoop: WorkLoop) { - await this.pgClient.withConnection(async (query) => { + await this.#pgClient.withConnection(async (query) => { try { while (!workLoop.shuttingDown) { - log.debug(`%s: processing jobs`, this.workerId); + log.debug(`%s: processing jobs`, this.#workerId); await query(['BEGIN']); await query(['SET TRANSACTION ISOLATION LEVEL SERIALIZABLE']); @@ -264,8 +290,9 @@ export class PgQueueRunner implements QueueRunner { // find the queue with the oldest job that isn't running and lock it. `WITH pending_jobs AS ( - SELECT * FROM jobs WHERE status='unfulfilled' - ), + SELECT * FROM jobs WHERE status='unfulfilled' and priority >=`, + param(this.#priority), + `), valid_reservations AS ( SELECT * FROM job_reservations WHERE locked_until > NOW() AND completed_at IS NULL ), @@ -283,14 +310,14 @@ export class PgQueueRunner implements QueueRunner { LIMIT 1`, ])) as unknown as JobsTable[]; if (jobs.length === 0) { - log.debug(`%s: found no work`, this.workerId); + log.debug(`%s: found no work`, this.#workerId); await query(['ROLLBACK']); return; } let jobToRun = jobs[0]; log.debug( `%s: found job to run, job id: %s`, - this.workerId, + this.#workerId, jobToRun.id, ); @@ -300,10 +327,10 @@ export class PgQueueRunner implements QueueRunner { [param(jobToRun.id)], [ '(', - param(Math.min(jobToRun.timeout, this.maxTimeoutSec)), + param(Math.min(jobToRun.timeout, this.#maxTimeoutSec)), ` || ' seconds')::interval + now()`, ], - [param(this.workerId)], + [param(this.#workerId)], ]), ') RETURNING id', ] as Expression)) as Pick[]; @@ -312,14 +339,14 @@ export class PgQueueRunner implements QueueRunner { log.debug( `%s: claimed job %s, reservation %s`, - this.workerId, + this.#workerId, jobToRun.id, jobReservationId, ); let newStatus: string; let result: PgPrimitive; try { - log.debug(`%s: running %s`, this.workerId, jobToRun.id); + log.debug(`%s: running %s`, this.#workerId, jobToRun.id); result = await Promise.race([ this.runJob(jobToRun.job_type, jobToRun.args), // we race the job so that it doesn't hold this worker hostage if @@ -327,12 +354,14 @@ export class PgQueueRunner implements QueueRunner { new Promise<'timeout'>((r) => setTimeout(() => { r('timeout'); - }, this.maxTimeoutSec * 1000), + }, this.#maxTimeoutSec * 1000), ), ]); if (result === 'timeout') { throw new Error( - `Timed-out after ${this.maxTimeoutSec}s waiting for job ${jobToRun.id} to complete`, + `Timed-out after ${this.#maxTimeoutSec}s waiting for job ${ + jobToRun.id + } to complete`, ); } newStatus = 'resolved'; @@ -349,7 +378,7 @@ export class PgQueueRunner 
implements QueueRunner { } log.debug( `%s: finished %s as %s`, - this.workerId, + this.#workerId, jobToRun.id, newStatus, ); @@ -362,7 +391,7 @@ export class PgQueueRunner implements QueueRunner { if (jobStatus !== 'unfulfilled') { log.debug( '%s: rolling back because our job is already marked done', - this.workerId, + this.#workerId, ); await query(['ROLLBACK']); return; @@ -374,7 +403,7 @@ export class PgQueueRunner implements QueueRunner { if (jobReservation.completed_at) { log.debug( '%s: rolling back because someone else processed our job', - this.workerId, + this.#workerId, ); await query(['ROLLBACK']); return; @@ -390,7 +419,7 @@ export class PgQueueRunner implements QueueRunner { if (total > 0) { log.debug( '%s: rolling back because someone else has reserved our (timed-out) job', - this.workerId, + this.#workerId, ); await query(['ROLLBACK']); return; @@ -415,14 +444,14 @@ export class PgQueueRunner implements QueueRunner { await query(['COMMIT']); log.debug( `%s: committed job completion, notified jobs_finished`, - this.workerId, + this.#workerId, ); } } catch (e: any) { if (e.code === '40001') { log.debug( `%s: detected concurrency conflict, rolling back`, - this.workerId, + this.#workerId, ); await query(['ROLLBACK']); return; @@ -434,8 +463,8 @@ export class PgQueueRunner implements QueueRunner { async destroy() { this.#isDestroyed = true; - if (this.jobRunner) { - await this.jobRunner.shutDown(); + if (this.#jobRunner) { + await this.#jobRunner.shutDown(); } } } diff --git a/packages/realm-server/scripts/start-worker-base.sh b/packages/realm-server/scripts/start-worker-base.sh index 9325706168..5d3d735c2d 100755 --- a/packages/realm-server/scripts/start-worker-base.sh +++ b/packages/realm-server/scripts/start-worker-base.sh @@ -11,7 +11,6 @@ NODE_ENV=development \ REALM_SECRET_SEED="shhh! it's a secret" \ ts-node \ --transpileOnly worker-manager \ - --count="${WORKER_COUNT:-1}" \ --port=4213 \ --matrixURL='http://localhost:8008' \ --distURL="${HOST_URL:-http://localhost:4200}" \ diff --git a/packages/realm-server/scripts/start-worker-development.sh b/packages/realm-server/scripts/start-worker-development.sh index eacbbec262..e5dc27e2a5 100755 --- a/packages/realm-server/scripts/start-worker-development.sh +++ b/packages/realm-server/scripts/start-worker-development.sh @@ -12,7 +12,8 @@ NODE_ENV=development \ REALM_SECRET_SEED="shhh! 
it's a secret" \ ts-node \ --transpileOnly worker-manager \ - --count="${WORKER_COUNT:-1}" \ + --allPriorityCount="${WORKER_ALL_PRIORITY_COUNT:-1}" \ + --highPriorityCount="${WORKER_HIGH_PRIORITY_COUNT:-0}" \ --port=4210 \ --matrixURL='http://localhost:8008' \ --distURL="${HOST_URL:-http://localhost:4200}" \ diff --git a/packages/realm-server/scripts/start-worker-production.sh b/packages/realm-server/scripts/start-worker-production.sh index 6f5d355c7a..9fb91fae80 100755 --- a/packages/realm-server/scripts/start-worker-production.sh +++ b/packages/realm-server/scripts/start-worker-production.sh @@ -3,7 +3,8 @@ NODE_NO_WARNINGS=1 \ ts-node \ --transpileOnly worker-manager \ - --count="${WORKER_COUNT:-1}" \ + --allPriorityCount="${WORKER_ALL_PRIORITY_COUNT:-1}" \ + --highPriorityCount="${WORKER_HIGH_PRIORITY_COUNT:-0}" \ --matrixURL='https://matrix.boxel.ai' \ --distURL='https://boxel-host.boxel.ai' \ \ diff --git a/packages/realm-server/scripts/start-worker-staging.sh b/packages/realm-server/scripts/start-worker-staging.sh index 4e3fd0aec8..6e1380fe97 100755 --- a/packages/realm-server/scripts/start-worker-staging.sh +++ b/packages/realm-server/scripts/start-worker-staging.sh @@ -3,7 +3,8 @@ NODE_NO_WARNINGS=1 \ ts-node \ --transpileOnly worker-manager \ - --count="${WORKER_COUNT:-1}" \ + --allPriorityCount="${WORKER_ALL_PRIORITY_COUNT:-1}" \ + --highPriorityCount="${WORKER_HIGH_PRIORITY_COUNT:-0}" \ --matrixURL='https://matrix-staging.stack.cards' \ --distURL='https://boxel-host-staging.stack.cards' \ \ diff --git a/packages/realm-server/server.ts b/packages/realm-server/server.ts index 764ccbd277..fcf495910f 100644 --- a/packages/realm-server/server.ts +++ b/packages/realm-server/server.ts @@ -360,7 +360,7 @@ export class RealmServer { username, }, }, - { invalidateEntireRealm: true }, + { invalidateEntireRealm: true, userInitiatedRealmCreation: true }, ); this.realms.push(realm); this.virtualNetwork.mount(realm.handle); diff --git a/packages/realm-server/tests/helpers/index.ts b/packages/realm-server/tests/helpers/index.ts index ee57575503..8233e1a213 100644 --- a/packages/realm-server/tests/helpers/index.ts +++ b/packages/realm-server/tests/helpers/index.ts @@ -128,7 +128,7 @@ export function setupDB( prepareTestDB(); dbAdapter = new PgAdapter({ autoMigrate: true }); publisher = new PgQueuePublisher(dbAdapter); - runner = new PgQueueRunner(dbAdapter, 'test-worker'); + runner = new PgQueueRunner({ adapter: dbAdapter, workerId: 'test-worker' }); }; const runAfterHook = async () => { diff --git a/packages/realm-server/tests/queue-test.ts b/packages/realm-server/tests/queue-test.ts index d55a27e436..255ae29e2f 100644 --- a/packages/realm-server/tests/queue-test.ts +++ b/packages/realm-server/tests/queue-test.ts @@ -25,13 +25,14 @@ module(basename(__filename), function () { prepareTestDB(); adapter = new PgAdapter({ autoMigrate: true }); publisher = new PgQueuePublisher(adapter); - runner = new PgQueueRunner(adapter, 'q1', 2); + runner = new PgQueueRunner({ adapter, workerId: 'q1', maxTimeoutSec: 2 }); await runner.start(); }); hooks.afterEach(async function () { await runner.destroy(); await publisher.destroy(); + await adapter.close(); }); test('it can run a job', async function (assert) { @@ -61,7 +62,12 @@ module(basename(__filename), function () { runner.register('logJob', logJob); - let job = await publisher.publish('logJob', 'log-group', 1, null); + let job = await publisher.publish({ + jobType: 'logJob', + concurrencyGroup: 'log-group', + timeout: 1, + args: null, + }); try { await 
job.done; @@ -81,7 +87,7 @@ module(basename(__filename), function () { let adapter2: PgAdapter; nestedHooks.beforeEach(async function () { adapter2 = new PgAdapter({ autoMigrate: true }); - runner2 = new PgQueueRunner(adapter2, 'q2'); + runner2 = new PgQueueRunner({ adapter: adapter2, workerId: 'q2' }); await runner2.start(); // Because we need tight timing control for this test, we don't want any @@ -93,6 +99,7 @@ module(basename(__filename), function () { nestedHooks.afterEach(async function () { await runner2.destroy(); + await adapter2.close(); }); test('jobs in different concurrency groups can run in parallel', async function (assert) { @@ -107,15 +114,20 @@ module(basename(__filename), function () { runner.register('logJob', logJob); runner2.register('logJob', logJob); - let promiseForJob1 = publisher.publish('logJob', 'log-group', 5000, 1); + let promiseForJob1 = publisher.publish({ + jobType: 'logJob', + concurrencyGroup: 'log-group', + timeout: 5000, + args: 1, + }); // start the 2nd job before the first job finishes await new Promise((r) => setTimeout(r, 100)); - let promiseForJob2 = publisher.publish( - 'logJob', - 'other-group', - 5000, - 2, - ); + let promiseForJob2 = publisher.publish({ + jobType: 'logJob', + concurrencyGroup: 'other-group', + timeout: 5000, + args: 2, + }); let [job1, job2] = await Promise.all([promiseForJob1, promiseForJob2]); await Promise.all([job1.done, job2.done]); @@ -139,10 +151,20 @@ module(basename(__filename), function () { runner.register('logJob', logJob); runner2.register('logJob', logJob); - let promiseForJob1 = publisher.publish('logJob', 'log-group', 5000, 1); + let promiseForJob1 = publisher.publish({ + jobType: 'logJob', + concurrencyGroup: 'log-group', + timeout: 5000, + args: 1, + }); // start the 2nd job before the first job finishes await new Promise((r) => setTimeout(r, 100)); - let promiseForJob2 = publisher.publish('logJob', 'log-group', 5000, 2); + let promiseForJob2 = publisher.publish({ + jobType: 'logJob', + concurrencyGroup: 'log-group', + timeout: 5000, + args: 2, + }); let [job1, job2] = await Promise.all([promiseForJob1, promiseForJob2]); await Promise.all([job1.done, job2.done]); @@ -170,7 +192,12 @@ module(basename(__filename), function () { runner.register('logJob', logJob); runner2.register('logJob', logJob); - let job = await publisher.publish('logJob', 'log-group', 1, null); + let job = await publisher.publish({ + jobType: 'logJob', + concurrencyGroup: 'log-group', + timeout: 1, + args: null, + }); // just after our job has timed out, kick the queue so that another worker // will notice it. Otherwise we'd be stuck until the polling comes around. 
@@ -188,4 +215,73 @@ module(basename(__filename), function () { }); }); }); + + module('queue - high priority worker', function (hooks) { + let publisher: QueuePublisher; + let runner: QueueRunner; + let adapter: PgAdapter; + + hooks.beforeEach(async function () { + prepareTestDB(); + adapter = new PgAdapter({ autoMigrate: true }); + publisher = new PgQueuePublisher(adapter); + runner = new PgQueueRunner({ + adapter, + workerId: 'q1', + maxTimeoutSec: 1, + priority: 10, + }); + await runner.start(); + }); + + hooks.afterEach(async function () { + await runner.destroy(); + await publisher.destroy(); + await adapter.close(); + }); + + test('worker can be set to only process jobs greater or equal to a particular priority', async function (assert) { + let events: string[] = []; + let logJob = async ({ name }: { name: string }) => { + events.push(name); + }; + runner.register('logJob', logJob); + + let lowPriorityJob = await publisher.publish({ + jobType: 'logJob', + concurrencyGroup: null, + timeout: 1, + args: { name: 'low priority' }, + priority: 0, + }); + let highPriorityJob1 = await publisher.publish({ + jobType: 'logJob', + concurrencyGroup: 'logGroup', + timeout: 1, + args: { name: 'high priority 1' }, + priority: 10, + }); + let highPriorityJob2 = await publisher.publish({ + jobType: 'logJob', + concurrencyGroup: 'logGroup', + timeout: 1, + args: { name: 'high priority 2' }, + priority: 11, + }); + + await highPriorityJob1.done; + await highPriorityJob2.done; + await Promise.race([ + lowPriorityJob.done, + // the low priority job will never get picked up, so we race it against a timeout + new Promise((r) => setTimeout(r, 2)), + ]); + + assert.deepEqual( + events.sort(), + ['high priority 1', 'high priority 2'], + 'only the high priority jobs were processed', + ); + }); + }); }); diff --git a/packages/realm-server/tsconfig.json b/packages/realm-server/tsconfig.json index bac42b1bfb..675a0ce45a 100644 --- a/packages/realm-server/tsconfig.json +++ b/packages/realm-server/tsconfig.json @@ -37,5 +37,5 @@ } }, "include": ["../base/**/*", "./**/*"], - "exclude": ["../base/**/__boxel/**"] + "exclude": ["../base/**/__boxel/**", "./realms/**"] } diff --git a/packages/realm-server/worker-manager.ts b/packages/realm-server/worker-manager.ts index 8304453599..af2c3a1c7b 100644 --- a/packages/realm-server/worker-manager.ts +++ b/packages/realm-server/worker-manager.ts @@ -1,6 +1,10 @@ import './instrument'; import './setup-logger'; // This should be first -import { logger } from '@cardstack/runtime-common'; +import { + logger, + userInitiatedPriority, + systemInitiatedPriority, +} from '@cardstack/runtime-common'; import yargs from 'yargs'; import * as Sentry from '@sentry/node'; import { createServer } from 'net'; @@ -21,7 +25,8 @@ if (!REALM_SECRET_SEED) { let { port, matrixURL, - count = 1, + allPriorityCount = 1, + highPriorityCount = 0, distURL = process.env.HOST_URL ?? 
'http://localhost:4200', fromUrl: fromUrls, toUrl: toUrls, @@ -32,8 +37,14 @@ let { description: 'TCP port for worker to communicate readiness (for tests)', type: 'number', }, - count: { - description: 'The number of workers to start', + highPriorityCount: { + description: + 'The number of workers that service high priority jobs (user initiated) to start (default 0)', + type: 'number', + }, + allPriorityCount: { + description: + 'The number of workers that service all jobs regardless of priority to start (default 1)', type: 'number', }, fromUrl: { @@ -123,14 +134,25 @@ if (port != null) { } (async () => { - log.info(`starting ${count} ${pluralize('worker', count)}`); + log.info( + `starting ${highPriorityCount} high-priority ${pluralize( + 'worker', + highPriorityCount, + )} and ${allPriorityCount} all-priority ${pluralize( + 'worker', + allPriorityCount, + )}`, + ); let urlMappings = fromUrls.map((fromUrl, i) => [ new URL(String(fromUrl)), new URL(String(toUrls[i])), ]); - for (let i = 0; i < count; i++) { - await startWorker(urlMappings); + for (let i = 0; i < highPriorityCount; i++) { + await startWorker(userInitiatedPriority, urlMappings); + } + for (let i = 0; i < allPriorityCount; i++) { + await startWorker(systemInitiatedPriority, urlMappings); } isReady = true; log.info('All workers have been started'); @@ -143,7 +165,7 @@ if (port != null) { process.exit(1); }); -async function startWorker(urlMappings: URL[][]) { +async function startWorker(priority: number, urlMappings: URL[][]) { let worker = spawn( 'ts-node', [ @@ -151,6 +173,7 @@ async function startWorker(urlMappings: URL[][]) { 'worker', `--matrixURL='${matrixURL}'`, `--distURL='${distURL}'`, + `--priority=${priority}`, ...flattenDeep( urlMappings.map(([from, to]) => [ `--fromUrl='${from.href}'`, @@ -166,18 +189,22 @@ async function startWorker(urlMappings: URL[][]) { worker.on('exit', () => { if (!isExiting) { log.info(`worker ${worker.pid} exited. spawning replacement worker`); - startWorker(urlMappings); + startWorker(priority, urlMappings); } }); if (worker.stdout) { worker.stdout.on('data', (data: Buffer) => - log.info(`[worker ${worker.pid}]: ${data.toString()}`), + log.info( + `[worker ${worker.pid} priority ${priority}]: ${data.toString()}`, + ), ); } if (worker.stderr) { worker.stderr.on('data', (data: Buffer) => - log.error(`[worker ${worker.pid}]: ${data.toString()}`), + log.error( + `[worker ${worker.pid} priority ${priority}]: ${data.toString()}`, + ), ); } @@ -185,7 +212,7 @@ async function startWorker(urlMappings: URL[][]) { new Promise((r) => { worker.on('message', (message) => { if (message === 'ready') { - log.info(`[worker ${worker.pid}]: worker ready`); + log.info(`[worker ${worker.pid} priority ${priority}]: worker ready`); r(); } }); diff --git a/packages/realm-server/worker.ts b/packages/realm-server/worker.ts index e4eb00f080..5ed8a51b2b 100644 --- a/packages/realm-server/worker.ts +++ b/packages/realm-server/worker.ts @@ -37,6 +37,7 @@ let { distURL = process.env.HOST_URL ?? 
'http://localhost:4200', fromUrl: fromUrls, toUrl: toUrls, + priority = 0, migrateDB, } = yargs(process.argv.slice(2)) .usage('Start worker') @@ -66,10 +67,15 @@ let { demandOption: true, type: 'string', }, + priority: { + description: + 'The minimum priority of jobs that the worker should process (defaults to 0)', + type: 'number', + }, }) .parseSync(); -log.info(`starting worker with pid ${process.pid}`); +log.info(`starting worker with pid ${process.pid} and priority ${priority}`); if (fromUrls.length !== toUrls.length) { log.error( @@ -94,7 +100,7 @@ let autoMigrate = migrateDB || undefined; (async () => { let dbAdapter = new PgAdapter({ autoMigrate }); - let queue = new PgQueueRunner(dbAdapter, workerId); + let queue = new PgQueueRunner({ adapter: dbAdapter, workerId, priority }); let manager = new RunnerOptionsManager(); let { getRunner } = await makeFastBootIndexRunner( dist, diff --git a/packages/runtime-common/queue.ts b/packages/runtime-common/queue.ts index 8566cdf3ed..23affd13a6 100644 --- a/packages/runtime-common/queue.ts +++ b/packages/runtime-common/queue.ts @@ -1,6 +1,9 @@ import { type PgPrimitive } from './index'; import { Deferred } from './deferred'; +export const systemInitiatedPriority = 0; +export const userInitiatedPriority = 10; + export interface QueueRunner { start: () => Promise; register: (category: string, handler: (arg: A) => Promise) => void; @@ -8,12 +11,13 @@ export interface QueueRunner { } export interface QueuePublisher { - publish: ( - jobType: string, - concurrencyGroup: string | null, - timeout: number, - args: PgPrimitive, - ) => Promise>; + publish: (args: { + jobType: string; + priority?: number; + concurrencyGroup: string | null; + timeout: number; + args: PgPrimitive; + }) => Promise>; destroy: () => Promise; } diff --git a/packages/runtime-common/realm-index-updater.ts b/packages/runtime-common/realm-index-updater.ts index c37f40dbf4..eb2a8dcbed 100644 --- a/packages/runtime-common/realm-index-updater.ts +++ b/packages/runtime-common/realm-index-updater.ts @@ -4,6 +4,8 @@ import { Deferred, logger, fetchUserPermissions, + systemInitiatedPriority, + userInitiatedPriority, type Stats, type DBAdapter, type QueuePublisher, @@ -18,6 +20,11 @@ import { Loader } from './loader'; import ignore, { type Ignore } from 'ignore'; import { getMatrixUsername } from './matrix-client'; +interface FullIndexOpts { + invalidateEntireRealm?: boolean; + userInitiatedRequest?: boolean; +} + export class RealmIndexUpdater { #realm: Realm; #loader: Loader; @@ -81,8 +88,8 @@ export class RealmIndexUpdater { return await this.#indexWriter.isNewIndex(this.realmURL); } - async run(invalidateEntireRealm?: boolean) { - await this.fullIndex(invalidateEntireRealm); + async run(opts?: FullIndexOpts) { + await this.fullIndex(opts); } indexing() { @@ -92,20 +99,23 @@ export class RealmIndexUpdater { // TODO consider triggering SSE events for invalidations now that we can // calculate fine grained invalidations for from-scratch indexing by passing // in an onInvalidation callback - async fullIndex(invalidateEntireRealm?: boolean) { + async fullIndex(opts?: FullIndexOpts) { this.#indexingDeferred = new Deferred(); try { let args: FromScratchArgs = { realmURL: this.#realm.url, realmUsername: await this.getRealmUsername(), - invalidateEntireRealm: Boolean(invalidateEntireRealm), + invalidateEntireRealm: Boolean(opts?.invalidateEntireRealm), }; - let job = await this.#queue.publish( - `from-scratch-index`, - `indexing:${this.#realm.url}`, - 4 * 60, + let job = await 
this.#queue.publish({ + jobType: `from-scratch-index`, + concurrencyGroup: `indexing:${this.#realm.url}`, + timeout: 4 * 60, + priority: opts?.userInitiatedRequest + ? userInitiatedPriority + : systemInitiatedPriority, args, - ); + }); let { ignoreData, stats } = await job.done; this.#stats = stats; this.#ignoreData = ignoreData; @@ -138,12 +148,13 @@ export class RealmIndexUpdater { operation: opts?.delete ? 'delete' : 'update', ignoreData: { ...this.#ignoreData }, }; - let job = await this.#queue.publish( - `incremental-index`, - `indexing:${this.#realm.url}`, - 4 * 60, + let job = await this.#queue.publish({ + jobType: `incremental-index`, + concurrencyGroup: `indexing:${this.#realm.url}`, + timeout: 4 * 60, + priority: userInitiatedPriority, args, - ); + }); let { invalidations, ignoreData, stats } = await job.done; this.#stats = stats; this.#ignoreData = ignoreData; diff --git a/packages/runtime-common/realm.ts b/packages/runtime-common/realm.ts index eaf8fcf748..975708c813 100644 --- a/packages/runtime-common/realm.ts +++ b/packages/runtime-common/realm.ts @@ -164,6 +164,7 @@ export interface RealmAdapter { interface Options { disableModuleCaching?: true; invalidateEntireRealm?: true; + userInitiatedRealmCreation?: true; } interface UpdateItem { @@ -254,6 +255,7 @@ export class Realm { #realmSecretSeed: string; #disableModuleCaching = false; #invalidateEntireRealm = false; + #userInitiatedRealmCreation = false; #publicEndpoints: RouteTable = new Map([ [ @@ -308,6 +310,9 @@ export class Realm { }); this.#disableModuleCaching = Boolean(opts?.disableModuleCaching); this.#invalidateEntireRealm = Boolean(opts?.invalidateEntireRealm); + this.#userInitiatedRealmCreation = Boolean( + opts?.userInitiatedRealmCreation, + ); let fetch = fetcher(virtualNetwork.fetch, [ async (req, next) => { @@ -594,7 +599,10 @@ export class Realm { await Promise.resolve(); let startTime = Date.now(); let isNewIndex = await this.#realmIndexUpdater.isNewIndex(); - let promise = this.#realmIndexUpdater.run(this.#invalidateEntireRealm); + let promise = this.#realmIndexUpdater.run({ + invalidateEntireRealm: this.#invalidateEntireRealm, + userInitiatedRequest: this.#userInitiatedRealmCreation, + }); if (isNewIndex) { // we only await the full indexing at boot if this is a brand new index await promise; diff --git a/packages/runtime-common/tests/queue-test.ts b/packages/runtime-common/tests/queue-test.ts index 4b4f4b329d..2abb6fa29f 100644 --- a/packages/runtime-common/tests/queue-test.ts +++ b/packages/runtime-common/tests/queue-test.ts @@ -3,7 +3,12 @@ import { type SharedTests } from '../helpers'; const tests = Object.freeze({ 'it can run a job': async (assert, { publisher, runner }) => { - let job = await publisher.publish('increment', null, 5, 17); + let job = await publisher.publish({ + jobType: 'increment', + concurrencyGroup: null, + timeout: 5, + args: 17, + }); runner.register('increment', async (a: number) => a + 1); let result = await job.done; assert.strictEqual(result, 18); @@ -15,8 +20,18 @@ const tests = Object.freeze({ throw new Error('boom!'); }); let [errorJob, nonErrorJob] = await Promise.all([ - publisher.publish('boom', null, 5, null), - publisher.publish('increment', null, 5, 17), + publisher.publish({ + jobType: 'boom', + concurrencyGroup: null, + timeout: 5, + args: null, + }), + publisher.publish({ + jobType: 'increment', + concurrencyGroup: null, + timeout: 5, + args: 17, + }), ]); // assert that the error that was thrown does not prevent subsequent jobs @@ -73,8 +88,18 @@ const tests = 
Object.freeze({ }; runner.register('count', count); - let job1 = await publisher.publish('count', 'count-group', 5, 0); - let job2 = await publisher.publish('count', 'count-group', 5, 1); + let job1 = await publisher.publish({ + jobType: 'count', + concurrencyGroup: 'count-group', + timeout: 5, + args: 0, + }); + let job2 = await publisher.publish({ + jobType: 'count', + concurrencyGroup: 'count-group', + timeout: 5, + args: 1, + }); await Promise.all([job2.done, job1.done]); },
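Taken together, the changes above replace the positional `publish` signature with a single options object and let a runner opt into a minimum job priority. A minimal end-to-end sketch of how the pieces fit (the postgres package's import specifier, the worker id, and the job type/handler are assumptions for illustration, not names from this patch):

```
import { PgAdapter, PgQueuePublisher, PgQueueRunner } from '@cardstack/postgres'; // assumed specifier
import { userInitiatedPriority } from '@cardstack/runtime-common'; // 10; systemInitiatedPriority is 0

async function example() {
  let adapter = new PgAdapter({ autoMigrate: true });

  // A runner only claims jobs whose priority is >= its own `priority`,
  // so this one services user-initiated work exclusively.
  let runner = new PgQueueRunner({
    adapter,
    workerId: 'example-worker', // hypothetical worker id
    priority: userInitiatedPriority,
  });
  runner.register('example-job', async (args) => args); // hypothetical job type and handler
  await runner.start();

  // `priority` is optional on publish and defaults to 0 (system initiated),
  // which only all-priority workers (priority 0) will pick up.
  let publisher = new PgQueuePublisher(adapter);
  let job = await publisher.publish({
    jobType: 'example-job',
    concurrencyGroup: null,
    timeout: 5, // seconds
    priority: userInitiatedPriority,
    args: { hello: 'world' },
  });
  await job.done;

  await runner.destroy();
  await publisher.destroy();
  await adapter.close();
}
```

This mirrors the wiring in `worker.ts` (where the runner's priority comes from the new `--priority` flag) and the high-priority worker test added to `queue-test.ts`.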