diff --git a/packages/core/src/QueryManager.js b/packages/core/src/QueryManager.js index c91f3550..138ffa48 100644 --- a/packages/core/src/QueryManager.js +++ b/packages/core/src/QueryManager.js @@ -1,29 +1,64 @@ import { consolidator } from './QueryConsolidator.js'; import { lruCache, voidCache } from './util/cache.js'; import { PriorityQueue } from './util/priority-queue.js'; -import { QueryResult } from './util/query-result.js'; +import { QueryResult, QueryState } from './util/query-result.js'; +import { voidLogger } from './util/void-logger.js'; -export const Priority = { High: 0, Normal: 1, Low: 2 }; +export const Priority = Object.freeze({ High: 0, Normal: 1, Low: 2 }); export class QueryManager { - constructor() { + constructor( + maxConcurrentRequests = 32 + ) { this.queue = new PriorityQueue(3); this.db = null; this.clientCache = null; - this._logger = null; + this._logger = voidLogger(); this._logQueries = false; this.recorders = []; - this.pending = null; this._consolidate = null; + /** + * Requests pending with the query manager. + * + * @type {QueryResult[]} + */ + this.pendingResults = []; + this.maxConcurrentRequests = maxConcurrentRequests; + this.pendingExec = false; } next() { - if (this.pending || this.queue.isEmpty()) return; + if (this.queue.isEmpty() || this.pendingResults.length > this.maxConcurrentRequests || this.pendingExec) { + return; + } + const { request, result } = this.queue.next(); - this.pending = this.submit(request, result); - this.pending.finally(() => { this.pending = null; this.next(); }); + + this.pendingResults.push(result); + if (request.type === 'exec') this.pendingExec = true; + + this.submit(request, result).finally(() => { + // return from the queue all requests that are ready + while (this.pendingResults.length && this.pendingResults[0].state !== QueryState.pending) { + const result = this.pendingResults.shift(); + if (result.state === QueryState.ready) { + result.fulfill(); + } else if (result.state === QueryState.done) { + this._logger.warn('Found resolved query in pending results.'); + } + } + if (request.type === 'exec') this.pendingExec = false; + this.next(); + }); } + /** + * Add an entry to the query queue with a priority. + * @param {object} entry The entry to add. + * @param {*} [entry.request] The query request. + * @param {QueryResult} [entry.result] The query result. + * @param {number} priority The query priority, defaults to `Priority.Normal`. + */ enqueue(entry, priority = Priority.Normal) { this.queue.insert(entry, priority); this.next(); @@ -35,6 +70,11 @@ export class QueryManager { } } + /** + * Submit the query to the connector. + * @param {*} request The request. + * @param {QueryResult} result The query result. + */ async submit(request, result) { try { const { query, type, cache = false, record = true, options } = request; @@ -49,8 +89,9 @@ export class QueryManager { if (cache) { const cached = this.clientCache.get(sql); if (cached) { + const data = await cached; this._logger.debug('Cache'); - result.fulfill(cached); + result.ready(data); return; } } @@ -60,10 +101,16 @@ export class QueryManager { if (this._logQueries) { this._logger.debug('Query', { type, sql, ...options }); } - const data = await this.db.query({ type, sql, ...options }); + + const promise = this.db.query({ type, sql, ...options }); + if (cache) this.clientCache.set(sql, promise); + + const data = await promise; + if (cache) this.clientCache.set(sql, data); + this._logger.debug(`Request: ${(performance.now() - t0).toFixed(1)}`); - result.fulfill(data); + result.ready(type === 'exec' ? null : data); } catch (err) { result.reject(err); } @@ -95,6 +142,12 @@ export class QueryManager { } } + /** + * Request a query result. + * @param {*} request The request. + * @param {number} priority The query priority, defaults to `Priority.Normal`. + * @returns {QueryResult} A query result promise. + */ request(request, priority = Priority.Normal) { const result = new QueryResult(); const entry = { request, result }; @@ -116,6 +169,12 @@ export class QueryManager { } return false; }); + + for (const result of this.pendingResults) { + if (set.has(result)) { + result.reject('Canceled'); + } + } } } @@ -124,6 +183,11 @@ export class QueryManager { result.reject('Cleared'); return true; }); + + for (const result of this.pendingResults) { + result.reject('Cleared'); + } + this.pendingResults = []; } record() { diff --git a/packages/core/src/util/query-result.js b/packages/core/src/util/query-result.js index 6cd224a7..15465b3d 100644 --- a/packages/core/src/util/query-result.js +++ b/packages/core/src/util/query-result.js @@ -1,3 +1,10 @@ +export const QueryState = Object.freeze({ + pending: Symbol('pending'), + ready: Symbol('ready'), + error: Symbol('error'), + done: Symbol('done') +}); + /** * A query result Promise that can allows external callers * to resolve or reject the Promise. @@ -15,15 +22,41 @@ export class QueryResult extends Promise { }); this._resolve = resolve; this._reject = reject; + this._state = QueryState.pending; + this._value = undefined; } /** - * Resolve the result Promise with the provided value. + * Resolve the result Promise with a prepared value or the provided value. + * This method will only succeed if either a value is provided or the promise is ready. * @param {*} value The result value. * @returns {this} */ fulfill(value) { - this._resolve(value); + if (this._value !== undefined) { + if (value !== undefined) { + throw Error('Promise is ready and fulfill has a provided value'); + } + this._resolve(this._value); + } else if (value === undefined) { + throw Error('Promise is neither ready nor has provided value'); + } else { + this._resolve(value); + } + + this._state = QueryState.done; + + return this; + } + + /** + * Prepare to resolve with the provided value. + * @param {*} value The result value. + * @returns {this} + */ + ready(value) { + this._state = QueryState.ready; + this._value = value; return this; } @@ -33,9 +66,18 @@ export class QueryResult extends Promise { * @returns {this} */ reject(error) { + this._state = QueryState.error; this._reject(error); return this; } + + /** + * Returns the state of this query result. + * @returns {symbol} + */ + get state() { + return this._state; + } } // necessary to make Promise subclass act like a Promise diff --git a/packages/core/src/util/void-logger.js b/packages/core/src/util/void-logger.js index b650a3be..1097049f 100644 --- a/packages/core/src/util/void-logger.js +++ b/packages/core/src/util/void-logger.js @@ -1,9 +1,10 @@ +/* eslint-disable no-unused-vars */ export function voidLogger() { return { - debug() {}, - info() {}, - log() {}, - warn() {}, - error() {} + debug(..._) {}, + info(..._) {}, + log(..._) {}, + warn(..._) {}, + error(..._) {} }; } diff --git a/packages/core/test/coordinator.test.js b/packages/core/test/coordinator.test.js index e08d02e4..fd05c709 100644 --- a/packages/core/test/coordinator.test.js +++ b/packages/core/test/coordinator.test.js @@ -1,5 +1,10 @@ import { describe, it, expect } from 'vitest'; import { Coordinator, coordinator } from '../src/index.js'; +import { QueryResult, QueryState } from '../src/util/query-result.js'; + +async function wait() { + return new Promise(setTimeout); +} describe('coordinator', () => { it('has accessible singleton', () => { @@ -11,4 +16,71 @@ describe('coordinator', () => { expect(coordinator()).toBe(mc2); }); + + it('query results returned in correct order', async () => { + const promises = []; + + // Mock the connector + const connector = { + async query() { + const promise = new QueryResult(); + promises.push(promise); + return promise; + }, + }; + + const coord = new Coordinator(connector); + + const r0 = coord.query('SELECT 0'); + const r1 = coord.query('SELECT 1'); + const r2 = coord.query('SELECT 2'); + const r3 = coord.query('SELECT 3'); + + // queries have not been sent yet + expect(promises).toHaveLength(0); + + await wait(); + + // all queries should have been sent to the connector + expect(promises).toHaveLength(4); + expect(coord.manager.pendingResults).toHaveLength(4); + + // resolve promises in reverse order + + promises.at(3).fulfill(0); + await wait(); + + expect(r0.state).toEqual(QueryState.pending); + expect(r1.state).toEqual(QueryState.pending); + expect(r2.state).toEqual(QueryState.pending); + expect(r3.state).toEqual(QueryState.ready); + + promises.at(1).fulfill(0); + await wait(); + + expect(r0.state).toEqual(QueryState.pending); + expect(r1.state).toEqual(QueryState.ready); + expect(r2.state).toEqual(QueryState.pending); + expect(r3.state).toEqual(QueryState.ready); + + promises.at(0).fulfill(0); + await wait(); + + expect(coord.manager.pendingResults).toHaveLength(2); + + expect(r0.state).toEqual(QueryState.done); + expect(r1.state).toEqual(QueryState.done); + expect(r2.state).toEqual(QueryState.pending); + expect(r3.state).toEqual(QueryState.ready); + + promises.at(2).fulfill(0); + await wait(); + + expect(coord.manager.pendingResults).toHaveLength(0); + + expect(r0.state).toEqual(QueryState.done); + expect(r1.state).toEqual(QueryState.done); + expect(r2.state).toEqual(QueryState.done); + expect(r3.state).toEqual(QueryState.done); + }); }); diff --git a/packages/core/test/query-manager.test.js b/packages/core/test/query-manager.test.js new file mode 100644 index 00000000..40a142ac --- /dev/null +++ b/packages/core/test/query-manager.test.js @@ -0,0 +1,55 @@ +import { describe, it, expect } from 'vitest'; +import { QueryManager } from '../src/QueryManager.js'; +import { QueryResult } from '../src/util/query-result.js'; + +describe('QueryManager', () => { + it('should run a simple query', async () => { + const queryManager = new QueryManager(); + + // Mock the connector + queryManager.connector({ + query: async ({ sql }) => { + expect(sql).toBe('SELECT 1'); + return [{ column: 1 }]; + } + }); + + const request = { + type: 'arrow', + query: 'SELECT 1' + }; + + const result = queryManager.request(request); + expect(result).toBeInstanceOf(QueryResult); + + const data = await result; + expect(data).toEqual([{ column: 1 }]); + }); + + it('should not run a query when there is a pending exec', async () => { + const queryManager = new QueryManager(); + + // Mock the connector + queryManager.connector({ + query: async ({ sql }) => { + expect(sql).toBe('CREATE TABLE test (id INT)'); + return undefined; + } + }); + + const request1 = { + type: 'exec', + query: 'CREATE TABLE test (id INT)' + }; + + const request2 = { + type: 'arrow', + query: 'SELECT * FROM test' + }; + + queryManager.request(request1); + queryManager.request(request2); + + expect(queryManager.pendingResults).toHaveLength(1); + }); +}); diff --git a/packages/duckdb-server-rust/src/app.rs b/packages/duckdb-server-rust/src/app.rs index 6ed4e3ec..abad25b5 100644 --- a/packages/duckdb-server-rust/src/app.rs +++ b/packages/duckdb-server-rust/src/app.rs @@ -42,7 +42,7 @@ async fn handle_post( pub fn app() -> Result { // Database and state setup - let db = ConnectionPool::new(":memory:", 16)?; + let db = ConnectionPool::new(":memory:", 1)?; // TODO: we can only use one connection since temp tables are scoped per connection let cache = lru::LruCache::new(1000.try_into()?); let state = Arc::new(AppState {