Skip to content

Commit

Permalink
feat: parallel queries (#475)
Browse files Browse the repository at this point in the history
  • Loading branch information
domoritz authored Oct 3, 2024
1 parent c593d47 commit 425fba3
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 19 deletions.
86 changes: 75 additions & 11 deletions packages/core/src/QueryManager.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,64 @@
import { consolidator } from './QueryConsolidator.js';
import { lruCache, voidCache } from './util/cache.js';
import { PriorityQueue } from './util/priority-queue.js';
import { QueryResult } from './util/query-result.js';
import { QueryResult, QueryState } from './util/query-result.js';
import { voidLogger } from './util/void-logger.js';

export const Priority = { High: 0, Normal: 1, Low: 2 };
export const Priority = Object.freeze({ High: 0, Normal: 1, Low: 2 });

export class QueryManager {
constructor() {
constructor(
maxConcurrentRequests = 32
) {
this.queue = new PriorityQueue(3);
this.db = null;
this.clientCache = null;
this._logger = null;
this._logger = voidLogger();
this._logQueries = false;
this.recorders = [];
this.pending = null;
this._consolidate = null;
/**
* Requests pending with the query manager.
*
* @type {QueryResult[]}
*/
this.pendingResults = [];
this.maxConcurrentRequests = maxConcurrentRequests;
this.pendingExec = false;
}

next() {
if (this.pending || this.queue.isEmpty()) return;
if (this.queue.isEmpty() || this.pendingResults.length > this.maxConcurrentRequests || this.pendingExec) {
return;
}

const { request, result } = this.queue.next();
this.pending = this.submit(request, result);
this.pending.finally(() => { this.pending = null; this.next(); });

this.pendingResults.push(result);
if (request.type === 'exec') this.pendingExec = true;

this.submit(request, result).finally(() => {
// return from the queue all requests that are ready
while (this.pendingResults.length && this.pendingResults[0].state !== QueryState.pending) {
const result = this.pendingResults.shift();
if (result.state === QueryState.ready) {
result.fulfill();
} else if (result.state === QueryState.done) {
this._logger.warn('Found resolved query in pending results.');
}
}
if (request.type === 'exec') this.pendingExec = false;
this.next();
});
}

/**
* Add an entry to the query queue with a priority.
* @param {object} entry The entry to add.
* @param {*} [entry.request] The query request.
* @param {QueryResult} [entry.result] The query result.
* @param {number} priority The query priority, defaults to `Priority.Normal`.
*/
enqueue(entry, priority = Priority.Normal) {
this.queue.insert(entry, priority);
this.next();
Expand All @@ -35,6 +70,11 @@ export class QueryManager {
}
}

/**
* Submit the query to the connector.
* @param {*} request The request.
* @param {QueryResult} result The query result.
*/
async submit(request, result) {
try {
const { query, type, cache = false, record = true, options } = request;
Expand All @@ -49,8 +89,9 @@ export class QueryManager {
if (cache) {
const cached = this.clientCache.get(sql);
if (cached) {
const data = await cached;
this._logger.debug('Cache');
result.fulfill(cached);
result.ready(data);
return;
}
}
Expand All @@ -60,10 +101,16 @@ export class QueryManager {
if (this._logQueries) {
this._logger.debug('Query', { type, sql, ...options });
}
const data = await this.db.query({ type, sql, ...options });

const promise = this.db.query({ type, sql, ...options });
if (cache) this.clientCache.set(sql, promise);

const data = await promise;

if (cache) this.clientCache.set(sql, data);

this._logger.debug(`Request: ${(performance.now() - t0).toFixed(1)}`);
result.fulfill(data);
result.ready(type === 'exec' ? null : data);
} catch (err) {
result.reject(err);
}
Expand Down Expand Up @@ -95,6 +142,12 @@ export class QueryManager {
}
}

/**
* Request a query result.
* @param {*} request The request.
* @param {number} priority The query priority, defaults to `Priority.Normal`.
* @returns {QueryResult} A query result promise.
*/
request(request, priority = Priority.Normal) {
const result = new QueryResult();
const entry = { request, result };
Expand All @@ -116,6 +169,12 @@ export class QueryManager {
}
return false;
});

for (const result of this.pendingResults) {
if (set.has(result)) {
result.reject('Canceled');
}
}
}
}

Expand All @@ -124,6 +183,11 @@ export class QueryManager {
result.reject('Cleared');
return true;
});

for (const result of this.pendingResults) {
result.reject('Cleared');
}
this.pendingResults = [];
}

record() {
Expand Down
46 changes: 44 additions & 2 deletions packages/core/src/util/query-result.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
export const QueryState = Object.freeze({
pending: Symbol('pending'),
ready: Symbol('ready'),
error: Symbol('error'),
done: Symbol('done')
});

/**
* A query result Promise that can allows external callers
* to resolve or reject the Promise.
Expand All @@ -15,15 +22,41 @@ export class QueryResult extends Promise {
});
this._resolve = resolve;
this._reject = reject;
this._state = QueryState.pending;
this._value = undefined;
}

/**
* Resolve the result Promise with the provided value.
* Resolve the result Promise with a prepared value or the provided value.
* This method will only succeed if either a value is provided or the promise is ready.
* @param {*} value The result value.
* @returns {this}
*/
fulfill(value) {
this._resolve(value);
if (this._value !== undefined) {
if (value !== undefined) {
throw Error('Promise is ready and fulfill has a provided value');
}
this._resolve(this._value);
} else if (value === undefined) {
throw Error('Promise is neither ready nor has provided value');
} else {
this._resolve(value);
}

this._state = QueryState.done;

return this;
}

/**
* Prepare to resolve with the provided value.
* @param {*} value The result value.
* @returns {this}
*/
ready(value) {
this._state = QueryState.ready;
this._value = value;
return this;
}

Expand All @@ -33,9 +66,18 @@ export class QueryResult extends Promise {
* @returns {this}
*/
reject(error) {
this._state = QueryState.error;
this._reject(error);
return this;
}

/**
* Returns the state of this query result.
* @returns {symbol}
*/
get state() {
return this._state;
}
}

// necessary to make Promise subclass act like a Promise
Expand Down
11 changes: 6 additions & 5 deletions packages/core/src/util/void-logger.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
/* eslint-disable no-unused-vars */
export function voidLogger() {
return {
debug() {},
info() {},
log() {},
warn() {},
error() {}
debug(..._) {},
info(..._) {},
log(..._) {},
warn(..._) {},
error(..._) {}
};
}
72 changes: 72 additions & 0 deletions packages/core/test/coordinator.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { describe, it, expect } from 'vitest';
import { Coordinator, coordinator } from '../src/index.js';
import { QueryResult, QueryState } from '../src/util/query-result.js';

async function wait() {
return new Promise(setTimeout);
}

describe('coordinator', () => {
it('has accessible singleton', () => {
Expand All @@ -11,4 +16,71 @@ describe('coordinator', () => {

expect(coordinator()).toBe(mc2);
});

it('query results returned in correct order', async () => {
const promises = [];

// Mock the connector
const connector = {
async query() {
const promise = new QueryResult();
promises.push(promise);
return promise;
},
};

const coord = new Coordinator(connector);

const r0 = coord.query('SELECT 0');
const r1 = coord.query('SELECT 1');
const r2 = coord.query('SELECT 2');
const r3 = coord.query('SELECT 3');

// queries have not been sent yet
expect(promises).toHaveLength(0);

await wait();

// all queries should have been sent to the connector
expect(promises).toHaveLength(4);
expect(coord.manager.pendingResults).toHaveLength(4);

// resolve promises in reverse order

promises.at(3).fulfill(0);
await wait();

expect(r0.state).toEqual(QueryState.pending);
expect(r1.state).toEqual(QueryState.pending);
expect(r2.state).toEqual(QueryState.pending);
expect(r3.state).toEqual(QueryState.ready);

promises.at(1).fulfill(0);
await wait();

expect(r0.state).toEqual(QueryState.pending);
expect(r1.state).toEqual(QueryState.ready);
expect(r2.state).toEqual(QueryState.pending);
expect(r3.state).toEqual(QueryState.ready);

promises.at(0).fulfill(0);
await wait();

expect(coord.manager.pendingResults).toHaveLength(2);

expect(r0.state).toEqual(QueryState.done);
expect(r1.state).toEqual(QueryState.done);
expect(r2.state).toEqual(QueryState.pending);
expect(r3.state).toEqual(QueryState.ready);

promises.at(2).fulfill(0);
await wait();

expect(coord.manager.pendingResults).toHaveLength(0);

expect(r0.state).toEqual(QueryState.done);
expect(r1.state).toEqual(QueryState.done);
expect(r2.state).toEqual(QueryState.done);
expect(r3.state).toEqual(QueryState.done);
});
});
55 changes: 55 additions & 0 deletions packages/core/test/query-manager.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { describe, it, expect } from 'vitest';
import { QueryManager } from '../src/QueryManager.js';
import { QueryResult } from '../src/util/query-result.js';

describe('QueryManager', () => {
it('should run a simple query', async () => {
const queryManager = new QueryManager();

// Mock the connector
queryManager.connector({
query: async ({ sql }) => {
expect(sql).toBe('SELECT 1');
return [{ column: 1 }];
}
});

const request = {
type: 'arrow',
query: 'SELECT 1'
};

const result = queryManager.request(request);
expect(result).toBeInstanceOf(QueryResult);

const data = await result;
expect(data).toEqual([{ column: 1 }]);
});

it('should not run a query when there is a pending exec', async () => {
const queryManager = new QueryManager();

// Mock the connector
queryManager.connector({
query: async ({ sql }) => {
expect(sql).toBe('CREATE TABLE test (id INT)');

Check failure on line 35 in packages/core/test/query-manager.test.js

View workflow job for this annotation

GitHub Actions / Test in Node

Unhandled error

AssertionError: expected 'SELECT * FROM test' to be 'CREATE TABLE test (id INT)' // Object.is equality Expected: "CREATE TABLE test (id INT)" Received: "SELECT * FROM test" ❯ Object.query test/query-manager.test.js:35:21 ❯ QueryManager.submit src/QueryManager.js:105:31 ❯ QueryManager.next src/QueryManager.js:40:10 ❯ src/QueryManager.js:51:12 ❯ processTicksAndRejections node:internal/process/task_queues:95:5 This error originated in "test/query-manager.test.js" test file. It doesn't mean the error was thrown inside the file itself, but while it was running. The latest test that might've caused the error is "test/query-manager.test.js". It might mean one of the following: - The error was thrown, while Vitest was running this test. - If the error occurred after the test had been completed, this was the last documented test before it was thrown.
return undefined;
}
});

const request1 = {
type: 'exec',
query: 'CREATE TABLE test (id INT)'
};

const request2 = {
type: 'arrow',
query: 'SELECT * FROM test'
};

queryManager.request(request1);
queryManager.request(request2);

expect(queryManager.pendingResults).toHaveLength(1);
});
});
2 changes: 1 addition & 1 deletion packages/duckdb-server-rust/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn handle_post(

pub fn app() -> Result<Router> {
// Database and state setup
let db = ConnectionPool::new(":memory:", 16)?;
let db = ConnectionPool::new(":memory:", 1)?; // TODO: we can only use one connection since temp tables are scoped per connection
let cache = lru::LruCache::new(1000.try_into()?);

let state = Arc::new(AppState {
Expand Down

0 comments on commit 425fba3

Please sign in to comment.