Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OPFS support #1490

Closed
wants to merge 2 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
add opfs support
dengkunli committed Nov 21, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 3ca5b161720e7742e8cc455b5aa8ed87d73e3082
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
/.emscripten_cache
.DS_Store
compile_commands.json
*.map

/target

14 changes: 14 additions & 0 deletions examples/esbuild-browser/index.ts
Original file line number Diff line number Diff line change
@@ -24,9 +24,23 @@ import * as arrow from 'apache-arrow';
const db = new duckdb.AsyncDuckDB(logger, worker);
await db.instantiate(DUCKDB_CONFIG.mainModule, DUCKDB_CONFIG.pthreadWorker);

// in-memory
const conn = await db.connect();
await conn.query<{ v: arrow.Int }>(`SELECT count(*)::INTEGER as v FROM generate_series(0, 100) t(v)`);

// opfs
// const opfsRoot = await navigator.storage.getDirectory();
// await opfsRoot.removeEntry('test.db').catch(e => {});
// await db.open({
// path: 'opfs://test.db',
// accessMode: duckdb.DuckDBAccessMode.READ_WRITE,
// });
// const conn = await db.connect();
// await conn.send(`CREATE TABLE integers(i INTEGER, j INTEGER);`);
// await conn.send(`INSERT INTO integers VALUES (3, 4), (5, 6);`);
// await conn.send(`CHECKPOINT;`);
// console.log(await conn.query(`SELECT * FROM integers;`));

await conn.close();
await db.terminate();
await worker.terminate();
2 changes: 2 additions & 0 deletions lib/include/duckdb/web/config.h
Original file line number Diff line number Diff line change
@@ -79,6 +79,8 @@ struct WebDBConfig {
std::optional<int8_t> access_mode = std::nullopt;
/// The thread count
uint32_t maximum_threads = 1;
/// The direct io flag
bool use_direct_io = false;
/// The query config
QueryConfig query = {
.cast_bigint_to_double = std::nullopt,
3 changes: 3 additions & 0 deletions lib/src/config.cc
Original file line number Diff line number Diff line change
@@ -69,6 +69,9 @@ WebDBConfig WebDBConfig::ReadFrom(std::string_view args_json) {
if (doc.HasMember("maximumThreads") && doc["maximumThreads"].IsNumber()) {
config.maximum_threads = doc["maximumThreads"].GetInt();
}
if (doc.HasMember("useDirectIO") && doc["useDirectIO"].IsBool()) {
config.use_direct_io = doc["useDirectIO"].GetBool();
}
if (doc.HasMember("query") && doc["query"].IsObject()) {
auto q = doc["query"].GetObject();
if (q.HasMember("queryPollingInterval") && q["queryPollingInterval"].IsNumber()) {
4 changes: 3 additions & 1 deletion lib/src/io/web_filesystem.cc
Original file line number Diff line number Diff line change
@@ -226,6 +226,8 @@ WebFileSystem::DataProtocol WebFileSystem::inferDataProtocol(std::string_view ur
proto = WebFileSystem::DataProtocol::HTTP;
} else if (hasPrefix(url, "s3://")) {
proto = WebFileSystem::DataProtocol::S3;
} else if (hasPrefix(url, "opfs://")) {
proto = WebFileSystem::DataProtocol::BROWSER_FSACCESS;
} else if (hasPrefix(url, "file://")) {
data_url = std::string_view{url}.substr(7);
proto = default_data_protocol_;
@@ -778,7 +780,7 @@ void WebFileSystem::Write(duckdb::FileHandle &handle, void *buffer, int64_t nr_b
auto file_size = file_hdl.file_->file_size_;
auto writer = static_cast<char *>(buffer);
file_hdl.position_ = location;
while (nr_bytes > 0 && location < file_size) {
while (nr_bytes > 0) {
auto n = Write(handle, writer, nr_bytes);
writer += n;
nr_bytes -= n;
1 change: 1 addition & 0 deletions lib/src/webdb.cc
Original file line number Diff line number Diff line change
@@ -818,6 +818,7 @@ arrow::Status WebDB::Open(std::string_view args_json) {
db_config.options.maximum_threads = config_->maximum_threads;
db_config.options.use_temporary_directory = false;
db_config.options.access_mode = access_mode;
db_config.options.use_direct_io = config_->use_direct_io;
auto db = std::make_shared<duckdb::DuckDB>(config_->path, &db_config);
#ifndef WASM_LOADABLE_EXTENSIONS
duckdb_web_parquet_init(db.get());
26 changes: 24 additions & 2 deletions packages/duckdb-wasm/src/bindings/bindings_base.ts
Original file line number Diff line number Diff line change
@@ -444,13 +444,32 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
}
dropResponseBuffers(this.mod);
}
/** Prepare a file handle that could only be acquired aschronously */
public async prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void> {
if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS && this._runtime.prepareDBFileHandle) {
const list = await this._runtime.prepareDBFileHandle(path, DuckDBDataProtocol.BROWSER_FSACCESS);
for (const item of list) {
const { handle, path: filePath, fromCached } = item;
if (!fromCached && handle.getSize()) {
await this.registerFileHandle(filePath, handle, DuckDBDataProtocol.BROWSER_FSACCESS, true);
}
}
return;
}
throw new Error(`prepareDBFileHandle: unsupported protocol ${protocol}`);
}
/** Register a file object URL */
public registerFileHandle<HandleType>(
public async registerFileHandle<HandleType>(
name: string,
handle: HandleType,
protocol: DuckDBDataProtocol,
directIO: boolean,
): void {
): Promise<void> {
if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS && handle instanceof FileSystemFileHandle) {
// handle is an async handle, should convert to sync handle
const fileHandle: FileSystemFileHandle = handle as any;
handle = (await fileHandle.createSyncAccessHandle()) as any;
}
const [s, d, n] = callSRet(
this.mod,
'duckdb_web_fs_register_file_url',
@@ -462,6 +481,9 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
}
dropResponseBuffers(this.mod);
globalThis.DUCKDB_RUNTIME._files = (globalThis.DUCKDB_RUNTIME._files || new Map()).set(name, handle);
if (globalThis.DUCKDB_RUNTIME._preparedHandles?.[name]) {
delete globalThis.DUCKDB_RUNTIME._preparedHandles[name];
}
if (this.pthread) {
for (const worker of this.pthread.runningWorkers) {
worker.postMessage({
3 changes: 2 additions & 1 deletion packages/duckdb-wasm/src/bindings/bindings_interface.ts
Original file line number Diff line number Diff line change
@@ -41,7 +41,8 @@ export interface DuckDBBindings {
handle: HandleType,
protocol: DuckDBDataProtocol,
directIO: boolean,
): void;
): Promise<void>;
prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void>;
globFiles(path: string): WebFile[];
dropFile(name: string): void;
dropFiles(): void;
4 changes: 4 additions & 0 deletions packages/duckdb-wasm/src/bindings/config.ts
Original file line number Diff line number Diff line change
@@ -49,6 +49,10 @@ export interface DuckDBConfig {
* Note that this will only work with cross-origin isolated sites since it requires SharedArrayBuffers.
*/
maximumThreads?: number;
/**
* The direct io flag
*/
useDirectIO?: boolean;
/**
* The query config
*/
8 changes: 8 additions & 0 deletions packages/duckdb-wasm/src/bindings/runtime.ts
Original file line number Diff line number Diff line change
@@ -87,6 +87,12 @@ export interface DuckDBGlobalFileInfo {
s3Config?: S3Config;
}

export interface PreparedDBFileHandle {
path: string;
handle: any;
fromCached: boolean;
}

/** Call a function with packed response buffer */
export function callSRet(
mod: DuckDBModule,
@@ -147,6 +153,8 @@ export interface DuckDBRuntime {
checkFile(mod: DuckDBModule, pathPtr: number, pathLen: number): boolean;
removeFile(mod: DuckDBModule, pathPtr: number, pathLen: number): void;

prepareDBFileHandle?: (path: string, protocol: DuckDBDataProtocol) => Promise<PreparedDBFileHandle[]>;

// Call a scalar UDF function
callScalarUDF(
mod: DuckDBModule,
109 changes: 100 additions & 9 deletions packages/duckdb-wasm/src/bindings/runtime_browser.ts
Original file line number Diff line number Diff line change
@@ -11,13 +11,19 @@ import {
failWith,
FileFlags,
readString,
PreparedDBFileHandle,
} from './runtime';
import { DuckDBModule } from './duckdb_module';
import * as udf from './udf_runtime';

const OPFS_PREFIX_LEN = 'opfs://'.length;
const PATH_SEP_REGEX = /\/|\\/;

export const BROWSER_RUNTIME: DuckDBRuntime & {
_files: Map<string, any>;
_fileInfoCache: Map<number, DuckDBFileInfo>;
_globalFileInfo: DuckDBGlobalFileInfo | null;
_preparedHandles: Record<string, any>;

getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null;
getGlobalFileInfo(mod: DuckDBModule): DuckDBGlobalFileInfo | null;
@@ -26,6 +32,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
_fileInfoCache: new Map<number, DuckDBFileInfo>(),
_udfFunctions: new Map(),
_globalFileInfo: null,
_preparedHandles: {} as any,

getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null {
try {
@@ -50,6 +57,10 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
}
const file = { ...info, blob: null } as DuckDBFileInfo;
BROWSER_RUNTIME._fileInfoCache.set(fileId, file);
if (!BROWSER_RUNTIME._files.has(file.fileName) && BROWSER_RUNTIME._preparedHandles[file.fileName]) {
BROWSER_RUNTIME._files.set(file.fileName, BROWSER_RUNTIME._preparedHandles[file.fileName]);
delete BROWSER_RUNTIME._preparedHandles[file.fileName];
}
return file;
} catch (e: any) {
console.log(e);
@@ -86,6 +97,59 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
}
},

/** Prepare a file handle that could only be acquired aschronously */
async prepareDBFileHandle(dbPath: string, protocol: DuckDBDataProtocol): Promise<PreparedDBFileHandle[]> {
if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS) {
const filePaths = [dbPath, `${dbPath}.wal`];
const prepare = async (path: string): Promise<PreparedDBFileHandle> => {
if (BROWSER_RUNTIME._files.has(path)) {
return {
path,
handle: BROWSER_RUNTIME._files.get(path),
fromCached: true,
};
}
const opfsRoot = await navigator.storage.getDirectory();
let dirHandle: FileSystemDirectoryHandle = opfsRoot;
// check if mkdir -p is needed
const opfsPath = path.slice(OPFS_PREFIX_LEN);
let fileName = opfsPath;
if (PATH_SEP_REGEX.test(opfsPath)) {
const folders = opfsPath.split(PATH_SEP_REGEX);
fileName = folders.pop()!;
if (!fileName) {
throw new Error(`Invalid path ${path}`);
}
// mkdir -p
for (const folder of folders) {
dirHandle = await dirHandle.getDirectoryHandle(folder, { create: true });
}
}
const fileHandle = await dirHandle.getFileHandle(fileName, { create: false }).catch(e => {
if (e?.name === 'NotFoundError') {
console.log(`File ${path} does not exists yet, creating`);
return dirHandle.getFileHandle(fileName, { create: true });
}
throw e;
});
const handle = await fileHandle.createSyncAccessHandle();
BROWSER_RUNTIME._preparedHandles[path] = handle;
return {
path,
handle,
fromCached: false,
};
};
const result: PreparedDBFileHandle[] = [];
for (const filePath of filePaths) {
const res = await prepare(filePath);
result.push(res);
}
return result;
}
throw new Error(`Unsupported protocol ${protocol} for path ${dbPath} with protocol ${protocol}`);
},

testPlatformFeature: (_mod: DuckDBModule, feature: number): boolean => {
switch (feature) {
case 1:
@@ -182,7 +246,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {

// Try to fallback to full read?
if (file.allowFullHttpReads) {
if ((contentLength !== null) && (+contentLength > 1)) {
if (contentLength !== null && +contentLength > 1) {
// 2. Send a dummy GET range request querying the first byte of the file
// -> good IFF status is 206 and contentLenght2 is 1
// -> otherwise, iff 200 and contentLenght2 == contentLenght
@@ -264,6 +328,21 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
mod.HEAPF64[(result >> 3) + 1] = buffer;
return result;
}
case DuckDBDataProtocol.BROWSER_FSACCESS: {
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName);
if (!handle) {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
if (flags & FileFlags.FILE_FLAGS_FILE_CREATE_NEW) {
handle.truncate(0);
}
const result = mod._malloc(2 * 8);
const fileSize = handle.getSize();
console.log(`[BROWSER_RUNTIME] opening ${file.fileName} with size ${fileSize}`);
mod.HEAPF64[(result >> 3) + 0] = fileSize;
mod.HEAPF64[(result >> 3) + 1] = 0;
return result;
}
}
} catch (e: any) {
// TODO (samansmink): this path causes the WASM code to hang
@@ -311,8 +390,10 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
return;
}
const contentLength = xhr2.getResponseHeader('Content-Length');
if (contentLength && (+contentLength > 1)) {
console.warn(`Range request for ${path} did not return a partial response: ${xhr2.status} "${xhr2.statusText}"`);
if (contentLength && +contentLength > 1) {
console.warn(
`Range request for ${path} did not return a partial response: ${xhr2.status} "${xhr2.statusText}"`,
);
}
}
mod.ccall('duckdb_web_fs_glob_add_path', null, ['string'], [path]);
@@ -340,6 +421,8 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
}
xhr.send(null);
return xhr.status == 206 || xhr.status == 200;
} else {
return BROWSER_RUNTIME._files.has(path);
}
} catch (e: any) {
console.log(e);
@@ -361,11 +444,13 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
// XXX Remove from registry
return;
case DuckDBDataProtocol.BROWSER_FSACCESS: {
const handle = BROWSER_RUNTIME._files?.get(file.fileName);
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName);
if (!handle) {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
return handle.flush();
handle.flush();
handle.close();
BROWSER_RUNTIME._files.delete(file.fileName);
}
}
},
@@ -429,8 +514,14 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
} else if (xhr.status == 200) {
// TODO: here we are actually throwing away all non-relevant bytes, but this is still better than failing
// proper solution would require notifying duckdb-wasm cache, while we are piggybackign on browser cache
console.warn(`Range request for ${file.dataUrl} did not return a partial response: ${xhr.status} "${xhr.statusText}"`);
const src = new Uint8Array(xhr.response, location, Math.min(xhr.response.byteLength-location, bytes));
console.warn(
`Range request for ${file.dataUrl} did not return a partial response: ${xhr.status} "${xhr.statusText}"`,
);
const src = new Uint8Array(
xhr.response,
location,
Math.min(xhr.response.byteLength - location, bytes),
);
mod.HEAPU8.set(src, buf);
return src.byteLength;
} else {
@@ -454,7 +545,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
return data.byteLength;
}
case DuckDBDataProtocol.BROWSER_FSACCESS: {
const handle = BROWSER_RUNTIME._files?.get(file.fileName);
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files.get(file.fileName);
if (!handle) {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
@@ -491,7 +582,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & {
failWith(mod, 'cannot write using the html5 file reader api');
return 0;
case DuckDBDataProtocol.BROWSER_FSACCESS: {
const handle = BROWSER_RUNTIME._files?.get(file.fileName);
const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName);
if (!handle) {
throw new Error(`No OPFS access handle registered with name: ${file.fileName}`);
}
Loading