Skip to content

Commit

Permalink
Add job priority support and prioritize user initiated indexing requests
Browse files Browse the repository at this point in the history
  • Loading branch information
habdelra committed Jan 15, 2025
1 parent 37e90ed commit c82b5e4
Show file tree
Hide file tree
Showing 19 changed files with 357 additions and 128 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Live reloads are not available in this mode, however, if you use start the serve

#### Using `start:all`

Instead of running `pnpm start:base`, you can alternatively use `pnpm start:all` which also serves a few other realms on other ports--this is convenient if you wish to switch between the app and the tests without having to restart servers. Use the environment variable `WORKER_COUNT` to add additional workers. By default there is 1 worker for each realm server. Here's what is spun up with `start:all`:
Instead of running `pnpm start:base`, you can alternatively use `pnpm start:all` which also serves a few other realms on other ports--this is convenient if you wish to switch between the app and the tests without having to restart servers. Use the environment variable `WORKER_HIGH_PRIORITY_COUNT` to add additional workers that service only user initiated requests and `WORKER_ALL_PRIORITY_COUNT` to add workers that service all jobs (system or user initiated). By default there is 1 all priority worker for each realm server. Here's what is spun up with `start:all`:

| Port | Description | Running `start:all` | Running `start:base` |
| ----- | ------------------------------------------------------------- | ------------------- | -------------------- |
Expand Down Expand Up @@ -123,7 +123,7 @@ When running tests we isolate the database between each test run by actually cre
If you wish to drop the development databases you can execute:

```
pnpm drop-all-dbs
pnpm full-reset
```

You can then run `pnpm migrate up` (with `PGDATABASE` set accordingly if you want to migrate a database other than `boxel`) or just start the realm server (`pnpm start:all`) to create the database again.
Expand Down
25 changes: 15 additions & 10 deletions packages/host/app/lib/browser-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class BrowserQueue implements QueuePublisher, QueueRunner {
private jobs: {
jobId: number;
jobType: string;
arg: PgPrimitive;
args: PgPrimitive;
notifier: Deferred<any>;
}[] = [];
private types: Map<string, (arg: any) => Promise<any>> = new Map();
Expand Down Expand Up @@ -50,12 +50,17 @@ export class BrowserQueue implements QueuePublisher, QueueRunner {
this.debouncedDrainJobs();
}

async publish<T>(
jobType: string,
_concurrencyGroup: string | null,
_timeout: number,
arg: PgPrimitive,
): Promise<Job<T>> {
async publish<T>({
jobType,
concurrencyGroup: _concurrencyGroup,
timeout: _timeout,
args,
}: {
jobType: string;
concurrencyGroup: string | null;
timeout: number;
args: PgPrimitive;
}): Promise<Job<T>> {
if (this.isDestroyed) {
throw new Error(`Cannot publish job on a destroyed Queue`);
}
Expand All @@ -66,7 +71,7 @@ export class BrowserQueue implements QueuePublisher, QueueRunner {
jobId,
notifier,
jobType,
arg,
args,
});
this.debouncedDrainJobs();
return job;
Expand All @@ -84,7 +89,7 @@ export class BrowserQueue implements QueuePublisher, QueueRunner {
let jobs = [...this.jobs];
this.jobs = [];
for (let workItem of jobs) {
let { jobId, jobType, notifier, arg } = workItem;
let { jobId, jobType, notifier, args } = workItem;
let handler = this.types.get(jobType);
if (!handler) {
// no handler for this job, add it back to the queue
Expand All @@ -96,7 +101,7 @@ export class BrowserQueue implements QueuePublisher, QueueRunner {
this.onBeforeJob(jobId);
}
try {
notifier.fulfill(await handler(arg));
notifier.fulfill(await handler(args));
} catch (e: any) {
notifier.reject(e);
}
Expand Down
16 changes: 16 additions & 0 deletions packages/postgres/migrations/1736950557343_queue-priority.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
exports.up = (pgm) => {
pgm.sql('delete from job_reservations');
pgm.sql('delete from jobs');

pgm.addColumns('jobs', {
priority: { type: 'integer', notNull: true, default: 0 },
});
pgm.createIndex('jobs', 'priority');
};

exports.down = (pgm) => {
pgm.dropIndex('jobs', 'priority');
pgm.dropColumns('jobs', {
priority: 'integer',
});
};
Loading

0 comments on commit c82b5e4

Please sign in to comment.