feat: parallel queries #475

Merged Oct 3, 2024 (59 commits)
Commits
a19ff08
Preparing pull request
kzhang31415 May 23, 2024
7668936
Moved vega-lite from peer-dependencies to dependencies of vega-embed
kzhang31415 May 23, 2024
0a43819
Revert "fix: Add null value sensitivity to Arrow data conversion."
kzhang31415 May 23, 2024
07ccc5f
Trying again
kzhang31415 May 24, 2024
adf29f6
Merge branch 'main' of https://github.com/kzhang31415/parallel-queries
kzhang31415 May 24, 2024
44d586a
Issue resolved
kzhang31415 May 24, 2024
eff8c02
pulling
kzhang31415 Jun 7, 2024
f85a50a
Merging
kzhang31415 Jun 11, 2024
2c9c4fc
Merging
kzhang31415 Jun 11, 2024
1b3b402
Merge branch 'main' of https://github.com/kzhang31415/parallel-queries
kzhang31415 Jun 19, 2024
7d64cb6
Asynch querying implemented
kzhang31415 Jun 20, 2024
9afb66d
Cleaning up pull request
kzhang31415 Jun 21, 2024
be32274
Merge branch 'uwdata:main' into main
kzhang31415 Jun 25, 2024
cd92ae7
Removing unnessecary changes
kzhang31415 Jun 25, 2024
bb0b7c3
Merge branch 'main' of https://github.com/kzhang31415/parallel-queries
kzhang31415 Jun 25, 2024
c623415
Trying again
kzhang31415 Jun 25, 2024
0ad62bb
Trying again
kzhang31415 Jun 25, 2024
4d36384
the usual
kzhang31415 Jun 25, 2024
c8f5c14
the usual
kzhang31415 Jun 25, 2024
7f809b2
Merge branch 'uwdata:main' into main
kzhang31415 Jul 11, 2024
d29baa4
Unit tests complete
kzhang31415 Jul 11, 2024
fd76dbb
Removing package-lock.json changes
kzhang31415 Jul 11, 2024
a353331
Attempt 1
kzhang31415 Jul 15, 2024
e30970d
Removing unnessecary changes
kzhang31415 Jul 15, 2024
1ba2d91
Removing unnessecary changes
kzhang31415 Jul 15, 2024
8978347
Fixed possible incorrect indexing in submitBatch by storing index in …
kzhang31415 Jul 15, 2024
b0d6d83
Removing more unnessecary changes
kzhang31415 Jul 15, 2024
7c9a9f2
Removed timeout from coordinator tests
kzhang31415 Jul 17, 2024
f9e9448
Removed bad imports
kzhang31415 Jul 17, 2024
079b5fa
Merge branch 'uwdata:main' into main
kzhang31415 Jul 22, 2024
8af23d8
Merge branch 'uwdata:main' into main
kzhang31415 Jul 27, 2024
6033eae
simplify coordinator test
domoritz Aug 2, 2024
dbbbe65
fix comment
domoritz Aug 2, 2024
bdaaa74
reset
domoritz Aug 2, 2024
20d1e9f
Merge branch 'main' into dom/parallel
domoritz Aug 2, 2024
1d0560d
naive parallel
domoritz Aug 2, 2024
7152b3e
correct order test
domoritz Aug 2, 2024
1d66a4e
return results in correct order
domoritz Aug 4, 2024
da257ce
simplify
domoritz Aug 4, 2024
d11392d
fix fulfill for consolidated queries
domoritz Aug 5, 2024
b4aa891
fix wasm connector support
domoritz Aug 5, 2024
dabe9c5
don't issue same query twice
domoritz Aug 13, 2024
28f3b59
just put promises in the cache
domoritz Aug 13, 2024
2f1527e
Fix types
domoritz Aug 14, 2024
35aace8
fix types
domoritz Aug 14, 2024
24f304e
Address comments
domoritz Aug 22, 2024
1a0ae81
prepare -> ready
domoritz Aug 22, 2024
2b6b06b
Merge branch 'main' into dom/parallel
domoritz Aug 31, 2024
d65a7d8
fix
domoritz Aug 31, 2024
f5f8852
Don't submit queries until pending exec queries are done
domoritz Aug 31, 2024
cbe2162
don't allow exec until any prior exec queries are done
domoritz Aug 31, 2024
490e1df
no parallel exec
domoritz Aug 31, 2024
764aeda
allow arguments to void logger
domoritz Aug 31, 2024
5da9023
fmt
domoritz Aug 31, 2024
43fa201
Merge branch 'main' into dom/parallel
domoritz Sep 25, 2024
f8a0586
update tests
domoritz Sep 25, 2024
96dc637
Address comments
domoritz Sep 29, 2024
b312e30
ci: bump python
domoritz Sep 29, 2024
4a08e33
Merge branch 'main' into dom/parallel
domoritz Oct 3, 2024
86 changes: 75 additions & 11 deletions packages/core/src/QueryManager.js
@@ -1,29 +1,64 @@
import { consolidator } from './QueryConsolidator.js';
import { lruCache, voidCache } from './util/cache.js';
import { PriorityQueue } from './util/priority-queue.js';
import { QueryResult } from './util/query-result.js';
import { QueryResult, QueryState } from './util/query-result.js';
import { voidLogger } from './util/void-logger.js';

export const Priority = { High: 0, Normal: 1, Low: 2 };
export const Priority = Object.freeze({ High: 0, Normal: 1, Low: 2 });

export class QueryManager {
constructor() {
constructor(
maxConcurrentRequests = 32
) {
this.queue = new PriorityQueue(3);
this.db = null;
this.clientCache = null;
this._logger = null;
this._logger = voidLogger();
this._logQueries = false;
this.recorders = [];
this.pending = null;
this._consolidate = null;
/**
* Requests pending with the query manager.
*
* @type {QueryResult[]}
*/
this.pendingResults = [];
this.maxConcurrentRequests = maxConcurrentRequests;
this.pendingExec = false;
}

next() {
if (this.pending || this.queue.isEmpty()) return;
if (this.queue.isEmpty() || this.pendingResults.length > this.maxConcurrentRequests || this.pendingExec) {
return;
}

const { request, result } = this.queue.next();
this.pending = this.submit(request, result);
this.pending.finally(() => { this.pending = null; this.next(); });

this.pendingResults.push(result);
if (request.type === 'exec') this.pendingExec = true;

this.submit(request, result).finally(() => {
// return from the queue all requests that are ready
while (this.pendingResults.length && this.pendingResults[0].state !== QueryState.pending) {
const result = this.pendingResults.shift();
if (result.state === QueryState.ready) {
result.fulfill();
} else if (result.state === QueryState.done) {
this._logger.warn('Found resolved query in pending results.');
}
}
if (request.type === 'exec') this.pendingExec = false;
this.next();
});
}

/**
* Add an entry to the query queue with a priority.
* @param {object} entry The entry to add.
* @param {*} [entry.request] The query request.
* @param {QueryResult} [entry.result] The query result.
* @param {number} priority The query priority, defaults to `Priority.Normal`.
*/
enqueue(entry, priority = Priority.Normal) {
this.queue.insert(entry, priority);
this.next();
@@ -35,6 +70,11 @@ export class QueryManager {
}
}

/**
* Submit the query to the connector.
* @param {*} request The request.
* @param {QueryResult} result The query result.
*/
async submit(request, result) {
try {
const { query, type, cache = false, record = true, options } = request;
@@ -49,8 +89,9 @@ export class QueryManager {
if (cache) {
const cached = this.clientCache.get(sql);
if (cached) {
const data = await cached;
this._logger.debug('Cache');
result.fulfill(cached);
result.ready(data);
return;
}
}
@@ -60,10 +101,16 @@ export class QueryManager {
if (this._logQueries) {
this._logger.debug('Query', { type, sql, ...options });
}
const data = await this.db.query({ type, sql, ...options });

const promise = this.db.query({ type, sql, ...options });
if (cache) this.clientCache.set(sql, promise);

const data = await promise;

if (cache) this.clientCache.set(sql, data);

this._logger.debug(`Request: ${(performance.now() - t0).toFixed(1)}`);
result.fulfill(data);
result.ready(type === 'exec' ? null : data);
Review comment (Member): Same comment as above.

Reply (Member Author): We need this here so that we don't err in QueryResult.fulfill.

} catch (err) {
result.reject(err);
}
@@ -95,6 +142,12 @@ export class QueryManager {
}
}

/**
* Request a query result.
* @param {*} request The request.
* @param {number} priority The query priority, defaults to `Priority.Normal`.
* @returns {QueryResult} A query result promise.
*/
request(request, priority = Priority.Normal) {
const result = new QueryResult();
const entry = { request, result };
@@ -116,6 +169,12 @@ export class QueryManager {
}
return false;
});

for (const result of this.pendingResults) {
if (set.has(result)) {
result.reject('Canceled');
}
}
}
}

@@ -124,6 +183,11 @@ export class QueryManager {
result.reject('Cleared');
return true;
});

for (const result of this.pendingResults) {
result.reject('Cleared');
}
this.pendingResults = [];
}

record() {
Expand Down
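Note on the cache change in `submit`: the diff stores the in-flight promise in the cache instead of the resolved data, so a second identical query issued while the first is still running can await the same promise. A minimal sketch of that idea follows. `cachedQuery`, the `Map` cache, and `run` are hypothetical names for illustration and are not part of Mosaic.

```javascript
// Hypothetical helper (not Mosaic API): dedupe identical queries by
// caching the promise itself rather than the resolved data.
function cachedQuery(cache, sql, run) {
  const hit = cache.get(sql);
  if (hit) return hit;          // reuse the in-flight or completed promise
  const promise = run(sql);     // issue the query exactly once
  cache.set(sql, promise);
  return promise;
}

// Stand-in for a database call; counts how often it is invoked.
const cache = new Map();
let calls = 0;
const run = async (sql) => {
  calls += 1;
  return `rows for ${sql}`;
};

const a = cachedQuery(cache, 'SELECT 1', run);
const b = cachedQuery(cache, 'SELECT 1', run);
// a and b are the same promise object, and run was called once.
```

One caveat of this sketch: a rejected promise would stay in the cache until it is evicted, so a real implementation may want to remove failed entries.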
46 changes: 44 additions & 2 deletions packages/core/src/util/query-result.js
@@ -1,3 +1,10 @@
export const QueryState = Object.freeze({
pending: Symbol('pending'),
ready: Symbol('ready'),
error: Symbol('error'),
done: Symbol('done')
});

/**
* A query result Promise that allows external callers
* to resolve or reject the Promise.
@@ -15,15 +22,41 @@ export class QueryResult extends Promise {
});
this._resolve = resolve;
this._reject = reject;
this._state = QueryState.pending;
this._value = undefined;
}

/**
* Resolve the result Promise with the provided value.
* Resolve the result Promise with a prepared value or the provided value.
* This method will only succeed if either a value is provided or the promise is ready.
* @param {*} value The result value.
* @returns {this}
*/
fulfill(value) {
this._resolve(value);
if (this._value !== undefined) {
if (value !== undefined) {
throw Error('Promise is ready and fulfill has a provided value');
}
this._resolve(this._value);
} else if (value === undefined) {
throw Error('Promise is neither ready nor has provided value');
} else {
this._resolve(value);
}

this._state = QueryState.done;

return this;
}

/**
* Prepare to resolve with the provided value.
* @param {*} value The result value.
* @returns {this}
*/
ready(value) {
this._state = QueryState.ready;
this._value = value;
return this;
}

@@ -33,9 +66,18 @@ export class QueryResult extends Promise {
* @returns {this}
*/
reject(error) {
this._state = QueryState.error;
this._reject(error);
return this;
}

/**
* Returns the state of this query result.
* @returns {symbol}
*/
get state() {
return this._state;
}
}

// necessary to make Promise subclass act like a Promise
Expand Down
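Note on the `pending` → `ready` → `done` lifecycle: `ready(value)` stores a value without resolving, and `fulfill()` resolves later. In the diff, `fulfill` treats an `undefined` stored value as "not ready", which is why `submit` passes `null` for `exec` results. The sketch below is a simplified stand-in, not the PR's class: `Deferred` and `State` are hypothetical names, it uses composition instead of subclassing `Promise`, and it checks the state rather than the stored value.

```javascript
// Hypothetical stand-in for QueryResult (illustration only).
const State = Object.freeze({
  pending: 'pending',
  ready: 'ready',
  error: 'error',
  done: 'done'
});

class Deferred {
  constructor() {
    this.state = State.pending;
    this.value = undefined;
    // Capture resolve/reject so callers can settle the promise later.
    this.promise = new Promise((resolve, reject) => {
      this._resolve = resolve;
      this._reject = reject;
    });
  }

  // Store the value but do not resolve yet.
  ready(value) {
    this.state = State.ready;
    this.value = value;
    return this;
  }

  // Resolve with the stored value; only valid once ready.
  fulfill() {
    if (this.state !== State.ready) throw new Error('Result is not ready');
    this._resolve(this.value);
    this.state = State.done;
    return this;
  }

  reject(error) {
    this.state = State.error;
    this._reject(error);
    return this;
  }
}

const d = new Deferred();
const before = d.state;   // pending
d.ready(null);            // any value is allowed here, including null
const mid = d.state;      // ready
d.fulfill();
const after = d.state;    // done
```

Keying on the state means any stored value can pass through. The trade-off is that `fulfill` no longer accepts a value directly, unlike the method in the diff.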
11 changes: 6 additions & 5 deletions packages/core/src/util/void-logger.js
@@ -1,9 +1,10 @@
/* eslint-disable no-unused-vars */
export function voidLogger() {
return {
debug() {},
info() {},
log() {},
warn() {},
error() {}
debug(..._) {},
info(..._) {},
log(..._) {},
warn(..._) {},
error(..._) {}
};
}
72 changes: 72 additions & 0 deletions packages/core/test/coordinator.test.js
@@ -1,5 +1,10 @@
import { describe, it, expect } from 'vitest';
import { Coordinator, coordinator } from '../src/index.js';
import { QueryResult, QueryState } from '../src/util/query-result.js';

async function wait() {
// Yield to the timer queue so pending promise callbacks can run first.
return new Promise((resolve) => setTimeout(resolve));
}

describe('coordinator', () => {
it('has accessible singleton', () => {
@@ -11,4 +16,71 @@ describe('coordinator', () => {

expect(coordinator()).toBe(mc2);
});

it('query results returned in correct order', async () => {
const promises = [];

// Mock the connector
const connector = {
async query() {
const promise = new QueryResult();
promises.push(promise);
return promise;
},
};

const coord = new Coordinator(connector);

const r0 = coord.query('SELECT 0');
const r1 = coord.query('SELECT 1');
const r2 = coord.query('SELECT 2');
const r3 = coord.query('SELECT 3');

// queries have not been sent yet
expect(promises).toHaveLength(0);

await wait();

// all queries should have been sent to the connector
expect(promises).toHaveLength(4);
expect(coord.manager.pendingResults).toHaveLength(4);

// resolve promises in reverse order

promises.at(3).fulfill(0);
await wait();

expect(r0.state).toEqual(QueryState.pending);
expect(r1.state).toEqual(QueryState.pending);
expect(r2.state).toEqual(QueryState.pending);
expect(r3.state).toEqual(QueryState.ready);

promises.at(1).fulfill(0);
await wait();

expect(r0.state).toEqual(QueryState.pending);
expect(r1.state).toEqual(QueryState.ready);
expect(r2.state).toEqual(QueryState.pending);
expect(r3.state).toEqual(QueryState.ready);

promises.at(0).fulfill(0);
await wait();

expect(coord.manager.pendingResults).toHaveLength(2);

expect(r0.state).toEqual(QueryState.done);
expect(r1.state).toEqual(QueryState.done);
expect(r2.state).toEqual(QueryState.pending);
expect(r3.state).toEqual(QueryState.ready);

promises.at(2).fulfill(0);
await wait();

expect(coord.manager.pendingResults).toHaveLength(0);

expect(r0.state).toEqual(QueryState.done);
expect(r1.state).toEqual(QueryState.done);
expect(r2.state).toEqual(QueryState.done);
expect(r3.state).toEqual(QueryState.done);
});
});
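Note on the ordering this test checks: queries may finish in any order, but a result is only delivered once every result submitted before it is also ready. A minimal model of that "release the ready prefix" loop is sketched below. The names `submit`, `complete`, `pending`, and `delivered` are hypothetical and only mirror the shape of the `while` loop in `QueryManager.next`.

```javascript
// Hypothetical model of in-order delivery (illustration only).
const pending = [];     // entries in submission order
const delivered = [];   // ids in the order they were handed back

function submit(id) {
  const entry = { id, ready: false, value: undefined };
  pending.push(entry);
  return entry;
}

function complete(entry, value) {
  entry.ready = true;
  entry.value = value;
  // Release the longest prefix of ready entries; stop at the first
  // entry that is still waiting so later results cannot overtake it.
  while (pending.length && pending[0].ready) {
    delivered.push(pending.shift().id);
  }
}

const q0 = submit(0);
const q1 = submit(1);
const q2 = submit(2);

complete(q2, 'c');  // nothing delivered: q0 is still waiting
complete(q0, 'a');  // delivers 0 only: q1 is still waiting
complete(q1, 'b');  // delivers 1, then 2
```

This matches the test's expectations: a result that finishes early stays in the ready state until everything ahead of it has been released.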
55 changes: 55 additions & 0 deletions packages/core/test/query-manager.test.js
@@ -0,0 +1,55 @@
import { describe, it, expect } from 'vitest';
import { QueryManager } from '../src/QueryManager.js';
import { QueryResult } from '../src/util/query-result.js';

describe('QueryManager', () => {
it('should run a simple query', async () => {
const queryManager = new QueryManager();

// Mock the connector
queryManager.connector({
query: async ({ sql }) => {
expect(sql).toBe('SELECT 1');
return [{ column: 1 }];
}
});

const request = {
type: 'arrow',
query: 'SELECT 1'
};

const result = queryManager.request(request);
expect(result).toBeInstanceOf(QueryResult);

const data = await result;
expect(data).toEqual([{ column: 1 }]);
});

it('should not run a query when there is a pending exec', async () => {
const queryManager = new QueryManager();

// Mock the connector
queryManager.connector({
query: async ({ sql }) => {
expect(sql).toBe('CREATE TABLE test (id INT)');

Check failure on line 35 in packages/core/test/query-manager.test.js (GitHub Actions / Test in Node): Unhandled error

AssertionError: expected 'SELECT * FROM test' to be 'CREATE TABLE test (id INT)' // Object.is equality
Expected: "CREATE TABLE test (id INT)"
Received: "SELECT * FROM test"
❯ Object.query test/query-manager.test.js:35:21
❯ QueryManager.submit src/QueryManager.js:105:31
❯ QueryManager.next src/QueryManager.js:40:10
❯ src/QueryManager.js:51:12
❯ processTicksAndRejections node:internal/process/task_queues:95:5

This error originated in "test/query-manager.test.js" test file. It doesn't mean the error was thrown inside the file itself, but while it was running. The latest test that might've caused the error is "test/query-manager.test.js". It might mean one of the following:
- The error was thrown, while Vitest was running this test.
- If the error occurred after the test had been completed, this was the last documented test before it was thrown.

return undefined;
}
});

const request1 = {
type: 'exec',
query: 'CREATE TABLE test (id INT)'
};

const request2 = {
type: 'arrow',
query: 'SELECT * FROM test'
};

queryManager.request(request1);
queryManager.request(request2);

expect(queryManager.pendingResults).toHaveLength(1);
});
});
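Note on the `exec` barrier this test covers: once an `exec` request has been dispatched, no further request is started until it finishes, because later queries may depend on its side effects (for example a table it creates). A minimal sketch of that gating follows. `Scheduler`, `finish`, and `started` are hypothetical names, the concurrency check is simplified to a counter, and the actual database call is omitted.

```javascript
// Hypothetical scheduler (illustration only, not the QueryManager class).
class Scheduler {
  constructor(maxConcurrent = 2) {
    this.queue = [];
    this.inFlight = 0;
    this.pendingExec = false;
    this.maxConcurrent = maxConcurrent;
    this.started = [];   // SQL of every request that was dispatched
  }

  enqueue(request) {
    this.queue.push(request);
    this.next();
  }

  // Start requests while there is capacity and no exec is in flight.
  next() {
    while (this.queue.length && this.inFlight < this.maxConcurrent && !this.pendingExec) {
      const request = this.queue.shift();
      this.inFlight += 1;
      if (request.type === 'exec') this.pendingExec = true;
      this.started.push(request.sql);   // dispatch to the database omitted
    }
  }

  // Called when a request has completed.
  finish(request) {
    this.inFlight -= 1;
    if (request.type === 'exec') this.pendingExec = false;
    this.next();
  }
}

const scheduler = new Scheduler();
const create = { type: 'exec', sql: 'CREATE TABLE test (id INT)' };
const select = { type: 'arrow', sql: 'SELECT * FROM test' };

scheduler.enqueue(create);
scheduler.enqueue(select);
const startedWhileExecPending = [...scheduler.started];  // only the CREATE

scheduler.finish(create);
const startedAfterExec = [...scheduler.started];         // CREATE, then SELECT
```

As in the diff, the sketch only holds back requests that come after the `exec`. It does not wait for queries already in flight before starting the `exec` itself.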
2 changes: 1 addition & 1 deletion packages/duckdb-server-rust/src/app.rs
@@ -42,7 +42,7 @@ async fn handle_post(

pub fn app() -> Result<Router> {
// Database and state setup
let db = ConnectionPool::new(":memory:", 16)?;
let db = ConnectionPool::new(":memory:", 1)?; // TODO: we can only use one connection since temp tables are scoped per connection
Review comment (Member): Once #519 lands data cube indexes will no longer be temp tables. But other Mosaic exec calls may make temp tables, so I don't think we can expand the connection pool...

Reply (Member Author): Yes, will leave this at 1 for now.

What are your thoughts on not using temp tables at all anymore? We could change all the examples to write tables into a separate namespace so they are still easy to clean up.

let cache = lru::LruCache::new(1000.try_into()?);

let state = Arc::new(AppState {
Expand Down