From c1365cfbe551059e8783999436ce93dc37b60972 Mon Sep 17 00:00:00 2001 From: Eiichi Arikawa <157803904+e1arikawa@users.noreply.github.com> Date: Thu, 16 Jan 2025 00:37:59 +0900 Subject: [PATCH] Add OPFS (Origin Private File System) Support (#1856) * add opfs feature * add test for url with long query string * update s3rver cors settings * update httpfs test. * update httpfs test. * update httpfs test for eslint * Fixup patch, now allowing installing from other repositories via 'INSTALL x FROM community' * fix dropfile * Update packages/duckdb-wasm/test/opfs.test.ts Co-authored-by: asrar * Improve README * Update README.md * Add npm_tags.yml * Perform checkout * Fix registerFileHandle. * update comment * add test for using file in dirrectory --------- Co-authored-by: Carlo Piovesan Co-authored-by: asrar --- examples/esbuild-node/index.ts | 2 +- lib/include/duckdb/web/config.h | 2 + lib/include/duckdb/web/io/web_filesystem.h | 2 + lib/js-stubs.js | 4 + lib/src/arrow_type_mapping.cc | 4 + lib/src/config.cc | 3 + lib/src/io/web_filesystem.cc | 23 +- lib/src/json_typedef.cc | 6 + lib/src/webdb.cc | 22 +- .../src/system/arquero_benchmarks.ts | 104 +++--- packages/duckdb-wasm/karma/s3rver/s3rver.js | 1 + .../duckdb-wasm/src/bindings/bindings_base.ts | 43 ++- .../src/bindings/bindings_interface.ts | 3 +- packages/duckdb-wasm/src/bindings/config.ts | 4 + packages/duckdb-wasm/src/bindings/runtime.ts | 11 + .../src/bindings/runtime_browser.ts | 190 ++++++++--- .../duckdb-wasm/src/bindings/runtime_node.ts | 1 + .../src/parallel/worker_dispatcher.ts | 12 +- packages/duckdb-wasm/test/httpfs_test.ts | 27 ++ packages/duckdb-wasm/test/index_browser.ts | 2 + packages/duckdb-wasm/test/opfs.test.ts | 306 ++++++++++++++++++ .../duckdb-wasm/test/string_test_helper.ts | 20 ++ patches/duckdb/bind_copy_direct_io.patch | 15 + patches/duckdb/fix_load_database.patch | 20 ++ 24 files changed, 721 insertions(+), 106 deletions(-) create mode 100644 packages/duckdb-wasm/test/opfs.test.ts create mode 100644 packages/duckdb-wasm/test/string_test_helper.ts create mode 100644 patches/duckdb/bind_copy_direct_io.patch create mode 100644 patches/duckdb/fix_load_database.patch diff --git a/examples/esbuild-node/index.ts b/examples/esbuild-node/index.ts index 734d22c2b..183de2a7a 100644 --- a/examples/esbuild-node/index.ts +++ b/examples/esbuild-node/index.ts @@ -1,11 +1,11 @@ import * as duckdb from '@duckdb/duckdb-wasm'; import * as arrow from 'apache-arrow'; import path from 'path'; -import Worker from 'web-worker'; import { createRequire } from 'module'; const require = createRequire(import.meta.url); const DUCKDB_DIST = path.dirname(require.resolve('@duckdb/duckdb-wasm')); +const Worker = require('web-worker'); (async () => { try { diff --git a/lib/include/duckdb/web/config.h b/lib/include/duckdb/web/config.h index c112cb390..a8826eda0 100644 --- a/lib/include/duckdb/web/config.h +++ b/lib/include/duckdb/web/config.h @@ -81,6 +81,8 @@ struct WebDBConfig { std::optional access_mode = std::nullopt; /// The thread count uint32_t maximum_threads = (STATIC_WEBDB_FEATURES & (1 << WebDBFeature::THREADS)) ? 
4 : 1; + /// The direct io flag + bool use_direct_io = false; /// The query config QueryConfig query = { .cast_bigint_to_double = std::nullopt, diff --git a/lib/include/duckdb/web/io/web_filesystem.h b/lib/include/duckdb/web/io/web_filesystem.h index 71a1ef780..96638f992 100644 --- a/lib/include/duckdb/web/io/web_filesystem.h +++ b/lib/include/duckdb/web/io/web_filesystem.h @@ -209,6 +209,8 @@ class WebFileSystem : public duckdb::FileSystem { DataBuffer file_buffer); /// Try to drop a specific file bool TryDropFile(std::string_view file_name); + /// drop a specific file + void DropFile(std::string_view file_name); /// Drop all files without references (including buffers) void DropDanglingFiles(); /// Configure file statistics diff --git a/lib/js-stubs.js b/lib/js-stubs.js index 2371d1e93..86a8cfbfc 100644 --- a/lib/js-stubs.js +++ b/lib/js-stubs.js @@ -15,6 +15,10 @@ addToLibrary({ duckdb_web_fs_file_sync: function (fileId) { return globalThis.DUCKDB_RUNTIME.syncFile(Module, fileId); }, + duckdb_web_fs_file_drop_file__sig: 'vpi', + duckdb_web_fs_file_drop_file: function (fileName, fileNameLen) { + return globalThis.DUCKDB_RUNTIME.dropFile(Module, fileName, fileNameLen); + }, duckdb_web_fs_file_close__sig: 'vi', duckdb_web_fs_file_close: function (fileId) { return globalThis.DUCKDB_RUNTIME.closeFile(Module, fileId); diff --git a/lib/src/arrow_type_mapping.cc b/lib/src/arrow_type_mapping.cc index 4ecbf43fc..10c995304 100644 --- a/lib/src/arrow_type_mapping.cc +++ b/lib/src/arrow_type_mapping.cc @@ -122,6 +122,10 @@ arrow::Result mapArrowTypeToDuckDB(const arrow::DataType& t case arrow::Type::type::EXTENSION: case arrow::Type::type::SPARSE_UNION: case arrow::Type::type::DENSE_UNION: + case arrow::Type::type::STRING_VIEW: + case arrow::Type::type::BINARY_VIEW: + case arrow::Type::type::LIST_VIEW: + case arrow::Type::type::LARGE_LIST_VIEW: return arrow::Status::NotImplemented("DuckDB type mapping for: ", type.ToString()); } return duckdb::LogicalTypeId::INVALID; diff --git a/lib/src/config.cc b/lib/src/config.cc index ea984f26b..887f28448 100644 --- a/lib/src/config.cc +++ b/lib/src/config.cc @@ -76,6 +76,9 @@ WebDBConfig WebDBConfig::ReadFrom(std::string_view args_json) { if (doc.HasMember("allowUnsignedExtensions") && doc["allowUnsignedExtensions"].IsBool()) { config.allow_unsigned_extensions = doc["allowUnsignedExtensions"].GetBool(); } + if (doc.HasMember("useDirectIO") && doc["useDirectIO"].IsBool()) { + config.use_direct_io = doc["useDirectIO"].GetBool(); + } if (doc.HasMember("query") && doc["query"].IsObject()) { auto q = doc["query"].GetObject(); if (q.HasMember("queryPollingInterval") && q["queryPollingInterval"].IsNumber()) { diff --git a/lib/src/io/web_filesystem.cc b/lib/src/io/web_filesystem.cc index e46f7f14e..aa6cdfa28 100644 --- a/lib/src/io/web_filesystem.cc +++ b/lib/src/io/web_filesystem.cc @@ -117,6 +117,7 @@ RT_FN(void duckdb_web_fs_file_close(size_t file_id), { auto &infos = GetLocalState(); infos.handles.erase(file_id); }); +RT_FN(void duckdb_web_fs_file_drop_file(const char *fileName, size_t pathLen), {}); RT_FN(void duckdb_web_fs_file_truncate(size_t file_id, double new_size), { GetOrOpen(file_id).Truncate(new_size); }); RT_FN(time_t duckdb_web_fs_file_get_last_modified_time(size_t file_id), { auto &file = GetOrOpen(file_id); @@ -226,6 +227,8 @@ WebFileSystem::DataProtocol WebFileSystem::inferDataProtocol(std::string_view ur proto = WebFileSystem::DataProtocol::HTTP; } else if (hasPrefix(url, "s3://")) { proto = WebFileSystem::DataProtocol::S3; + } else if 
(hasPrefix(url, "opfs://")) { + proto = WebFileSystem::DataProtocol::BROWSER_FSACCESS; } else if (hasPrefix(url, "file://")) { data_url = std::string_view{url}.substr(7); proto = default_data_protocol_; @@ -453,6 +456,7 @@ void WebFileSystem::DropDanglingFiles() { for (auto &[file_id, file] : files_by_id_) { if (file->handle_count_ == 0) { files_by_name_.erase(file->file_name_); + DropFile(file->file_name_); if (file->data_url_.has_value()) { files_by_url_.erase(file->data_url_.value()); } @@ -481,6 +485,13 @@ bool WebFileSystem::TryDropFile(std::string_view file_name) { return false; } +/// drop a file +void WebFileSystem::DropFile(std::string_view file_name) { + DEBUG_TRACE(); + std::string fileNameS = std::string{file_name}; + duckdb_web_fs_file_drop_file(fileNameS.c_str(), fileNameS.size()); +} + /// Write the global filesystem info rapidjson::Value WebFileSystem::WriteGlobalFileInfo(rapidjson::Document &doc, uint32_t cache_epoch) { DEBUG_TRACE(); @@ -793,7 +804,7 @@ void WebFileSystem::Write(duckdb::FileHandle &handle, void *buffer, int64_t nr_b auto file_size = file_hdl.file_->file_size_; auto writer = static_cast(buffer); file_hdl.position_ = location; - while (nr_bytes > 0 && location < file_size) { + while (nr_bytes > 0) { auto n = Write(handle, writer, nr_bytes); writer += n; nr_bytes -= n; @@ -1006,10 +1017,12 @@ void WebFileSystem::FileSync(duckdb::FileHandle &handle) { vector WebFileSystem::Glob(const std::string &path, FileOpener *opener) { std::unique_lock fs_guard{fs_mutex_}; std::vector results; - auto glob = glob_to_regex(path); - for (auto [name, file] : files_by_name_) { - if (std::regex_match(file->file_name_, glob)) { - results.push_back(std::string{name}); + if (!FileSystem::IsRemoteFile(path)) { + auto glob = glob_to_regex(path); + for (auto [name, file] : files_by_name_) { + if (std::regex_match(file->file_name_, glob)) { + results.push_back(std::string{name}); + } } } auto &state = GetLocalState(); diff --git a/lib/src/json_typedef.cc b/lib/src/json_typedef.cc index 890ee8a73..1b98e21ee 100644 --- a/lib/src/json_typedef.cc +++ b/lib/src/json_typedef.cc @@ -396,6 +396,12 @@ arrow::Result WriteSQLType(rapidjson::Document& doc, const duc case duckdb::LogicalTypeId::AGGREGATE_STATE: case duckdb::LogicalTypeId::BIT: case duckdb::LogicalTypeId::LAMBDA: + case duckdb::LogicalTypeId::STRING_LITERAL: + case duckdb::LogicalTypeId::INTEGER_LITERAL: + case duckdb::LogicalTypeId::UHUGEINT: + case duckdb::LogicalTypeId::UNION: + case duckdb::LogicalTypeId::ARRAY: + case duckdb::LogicalTypeId::VARINT: break; } return out; diff --git a/lib/src/webdb.cc b/lib/src/webdb.cc index 447e10ffe..89c769da2 100644 --- a/lib/src/webdb.cc +++ b/lib/src/webdb.cc @@ -828,6 +828,7 @@ arrow::Status WebDB::Open(std::string_view args_json) { db_config.options.access_mode = access_mode; db_config.options.duckdb_api = "wasm"; db_config.options.custom_user_agent = config_->custom_user_agent; + db_config.options.use_direct_io = config_->use_direct_io; auto db = make_shared_ptr(config_->path, &db_config); #ifndef WASM_LOADABLE_EXTENSIONS duckdb_web_parquet_init(db.get()); @@ -912,18 +913,29 @@ arrow::Status WebDB::RegisterFileBuffer(std::string_view file_name, std::unique_ /// Drop all files arrow::Status WebDB::DropFiles() { file_page_buffer_->DropDanglingFiles(); - pinned_web_files_.clear(); + std::vector files_to_drop; + for (const auto& [key, handle] : pinned_web_files_) { + files_to_drop.push_back(handle->GetName()); + } + for (const auto& fileName : files_to_drop) { + arrow::Status status = 
DropFile(fileName); + if (!status.ok()) { + return arrow::Status::Invalid("Failed to drop file: " + fileName); + } + } if (auto fs = io::WebFileSystem::Get()) { fs->DropDanglingFiles(); } return arrow::Status::OK(); } /// Drop a file -arrow::Status WebDB::DropFile(std::string_view file_name) { - file_page_buffer_->TryDropFile(file_name); - pinned_web_files_.erase(file_name); +arrow::Status WebDB::DropFile(std::string_view fileName) { + file_page_buffer_->TryDropFile(fileName); + pinned_web_files_.erase(fileName); if (auto fs = io::WebFileSystem::Get()) { - if (!fs->TryDropFile(file_name)) { + if (fs->TryDropFile(fileName)) { + fs->DropFile(fileName); + } else { return arrow::Status::Invalid("file is in use"); } } diff --git a/packages/benchmarks/src/system/arquero_benchmarks.ts b/packages/benchmarks/src/system/arquero_benchmarks.ts index b18f32747..5a742a21d 100644 --- a/packages/benchmarks/src/system/arquero_benchmarks.ts +++ b/packages/benchmarks/src/system/arquero_benchmarks.ts @@ -84,12 +84,12 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { case 2: { const tmp = this.tables['region'] .filter((d: any) => d.op.equal(d.r_name, 'EUROPE')) - .join(this.tables['nation'], ['r_regionkey', 'n_regionkey']) - .join(this.tables['supplier'], ['n_nationkey', 's_nationkey']); - const sub = tmp.join(this.tables['partsupp'], ['s_suppkey', 'ps_suppkey']); + .join(this.tables['nation'], [['r_regionkey'], ['n_regionkey']]) + .join(this.tables['supplier'], [['n_nationkey'], ['s_nationkey']]); + const sub = tmp.join(this.tables['partsupp'], [['s_suppkey'], ['ps_suppkey']]); const sub2 = this.tables['part'] .filter((d: any) => d.p_size == 15 && aq.op.match(d.p_type, /.*BRASS$/g, 0) != null) - .join(sub, ['p_partkey', 'ps_partkey']) + .join(sub, [['p_partkey'], ['ps_partkey']]) .groupby('p_partkey') .rollup({ min_ps_supplycost: (d: any) => aq.op.min(d.ps_supplycost), @@ -99,7 +99,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { (a: any, b: any) => a.p_partkey == b.ps_partkey && a.min_ps_supplycost == b.ps_supplycost, ); const query = tmp - .join(sub2, ['s_suppkey', 'ps_suppkey']) + .join(sub2, [['s_suppkey'], ['ps_suppkey']]) .orderby(aq.desc('s_acctbal'), 'n_name', 's_name', 'p_partkey'); for (const v of query.objects()) { noop(v); @@ -111,8 +111,8 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { const o = this.tables['orders'].filter((d: any) => d.o_orderdate < aq.op.timestamp('1995-03-15')); const l = this.tables['lineitem'].filter((d: any) => d.l_shipdate < aq.op.timestamp('1995-03-15')); const query = c - .join(o, ['c_custkey', 'o_custkey']) - .join(l, ['o_orderkey', 'l_orderkey']) + .join(o, [['c_custkey'], ['o_custkey']]) + .join(l, [['o_orderkey'], ['l_orderkey']]) .derive({ disc_price: (d: any) => d.l_extendedprice * (1 - d.l_discount), }) @@ -133,7 +133,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { ); const l = this.tables['lineitem'].filter((d: any) => d.l_commitdate < d.l_receiptdate); const query = o - .join(l, ['o_orderkey', 'l_orderkey']) + .join(l, [['o_orderkey'], ['l_orderkey']]) .groupby('o_orderpriority') .rollup({ order_count: aq.op.count(), @@ -156,10 +156,10 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { const n = this.tables['nation']; const right = r - .join(n, ['r_regionkey', 'n_regionkey']) - .join(c, ['n_nationkey', 'c_nationkey']) - .join(o, ['c_custkey', 'o_custkey']) - .join(l, ['o_orderkey', 'l_orderkey']); + .join(n, [['r_regionkey'], ['n_regionkey']]) + .join(c, [['n_nationkey'], 
['c_nationkey']]) + .join(o, [['c_custkey'], ['o_custkey']]) + .join(l, [['o_orderkey'], ['l_orderkey']]); const query = s .join( right, @@ -232,11 +232,11 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { (a.n1_nationkey == 'GERMANY' && b.n2_nationkey == 'FRANCE'), ); const right = nations - .join(c, ['n2_nationkey', 'c_nationkey']) - .join(o, ['c_custkey', 'o_custkey']) - .join(l, ['o_orderkey', 'l_orderkey']); + .join(c, [['n2_nationkey'], ['c_nationkey']]) + .join(o, [['c_custkey'], ['o_custkey']]) + .join(l, [['o_orderkey'], ['l_orderkey']]); const query = s - .join(right, ['s_suppkey', 'l_suppkey']) + .join(right, [['s_suppkey'], ['l_suppkey']]) .groupby('n1_name', 'n2_name', 'l_year') .rollup({ revenue: (d: any) => aq.op.sum(d.volume), @@ -255,9 +255,9 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { d.o_orderdate <= aq.op.timestamp('1996-12-31'), ); const sub = p - .join(this.tables['lineitem'], ['p_partkey', 'l_partkey']) - .join(o, ['l_orderkey', 'o_orderkey']) - .join(this.tables['customer'], ['o_custkey', 'c_custkey']); + .join(this.tables['lineitem'], [['p_partkey'], ['l_partkey']]) + .join(o, [['l_orderkey'], ['o_orderkey']]) + .join(this.tables['customer'], [['o_custkey'], ['c_custkey']]); const r2 = this.tables['region'] .filter((d: any) => d.r_name == 'AMERICA') .rename({ @@ -269,11 +269,11 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { n_name: 'n2_name', }); const sub2 = r2 - .join(n2, ['r2_regionkey', 'n2_regionkey']) - .join(sub, ['n2_nationkey', 'c_nationkey']) - .join(this.tables['supplier'], ['l_suppkey', 's_suppkey']); + .join(n2, [['r2_regionkey'], ['n2_regionkey']]) + .join(sub, [['n2_nationkey'], ['c_nationkey']]) + .join(this.tables['supplier'], [['l_suppkey'], ['s_suppkey']]); const query = this.tables['nation'] - .join(sub2, ['n_nationkey', 's_nationkey']) + .join(sub2, [['n_nationkey'], ['s_nationkey']]) .derive({ o_year: (d: any) => aq.op.year(d.o_orderdate), volume: (d: any) => d.l_extendedprice * (1 - d.l_discount), @@ -290,16 +290,16 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { break; } case 9: { - const sub = this.tables['nation'].join(this.tables['supplier'], ['n_nationkey', 's_nationkey']); + const sub = this.tables['nation'].join(this.tables['supplier'], [['n_nationkey'], ['s_nationkey']]); const query = this.tables['part'] .filter((d: any) => aq.op.match(d.p_name, /.*green.*/g, 0) != null) - .join(this.tables['partsupp'], ['p_partkey', 'ps_partkey']) - .join(sub, ['ps_suppkey', 's_suppkey']) + .join(this.tables['partsupp'], [['p_partkey'], ['ps_partkey']]) + .join(sub, [['ps_suppkey'], ['s_suppkey']]) .join( this.tables['lineitem'], (a: any, b: any) => a.p_partkey == b.l_partkey && a.s_suppkey == b.l_suppkey, ) - .join(this.tables['orders'], ['l_orderkey', 'o_orderkey']) + .join(this.tables['orders'], [['l_orderkey'], ['o_orderkey']]) .derive({ o_year: (d: any) => aq.op.year(d.o_orderdate), amount: (d: any) => d.l_extendedprice * (1 - d.l_discount) - d.ps_supplycost * d.l_quantity, @@ -323,10 +323,10 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { ) .join( this.tables['lineitem'].filter((d: any) => d.l_returnflag == 'R'), - ['o_orderkey', 'l_orderkey'], + [['o_orderkey'], ['l_orderkey']], ) - .join(this.tables['customer'], ['o_custkey', 'c_custkey']) - .join(this.tables['nation'], ['c_nationkey', 'n_nationkey']) + .join(this.tables['customer'], [['o_custkey'], ['c_custkey']]) + .join(this.tables['nation'], [['c_nationkey'], ['n_nationkey']]) .derive({ 
realprice: (d: any) => d.l_extendedprice * (1 - d.l_discount), }) @@ -343,8 +343,8 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { case 11: { const temp = this.tables['nation'] .filter((d: any) => d.n_name == 'GERMANY') - .join(this.tables['supplier'], ['n_nationkey', 's_nationkey']) - .join(this.tables['partsupp'], ['s_suppkey', 'ps_suppkey']) + .join(this.tables['supplier'], [['n_nationkey'], ['s_nationkey']]) + .join(this.tables['partsupp'], [['s_suppkey'], ['ps_suppkey']]) .derive({ value: (d: any) => d.ps_supplycost * d.ps_availqty, }); @@ -373,7 +373,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { d.l_receiptdate >= aq.op.timestamp('1994-01-01') && d.l_receiptdate <= aq.op.timestamp('1994-12-31'), ) - .join(this.tables['orders'], ['l_orderkey', 'o_orderkey']) + .join(this.tables['orders'], [['l_orderkey'], ['o_orderkey']]) .derive({ high_line: (d: any) => d.o_orderpriority == '1-URGENT' || d.o_orderpriority == '2-HIGH' ? 1 : 0, @@ -396,7 +396,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { (d: any) => aq.op.match(d.o_comment, /^.*special.*requests.*$/g, 0) == null, ); const query = this.tables['customer'] - .join_left(o, ['c_custkey', 'o_custkey']) + .join_left(o, [['c_custkey'], ['o_custkey']]) .derive({ o_orderkey_not_null: (d: any) => (d.o_orderkey != null ? 1 : 0), }) @@ -421,7 +421,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { d.l_receiptdate >= aq.op.timestamp('1995-09-01') && d.l_receiptdate <= aq.op.timestamp('1995-09-30'), ) - .join(this.tables['part'], ['l_partkey', 'p_partkey']) + .join(this.tables['part'], [['l_partkey'], ['p_partkey']]) .derive({ realprice: (d: any) => d.l_extendedprice * (1 - d.l_discount), promoprice: (d: any) => @@ -458,7 +458,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { total_revenue: (d: any) => aq.op.max(d.revenue), }) .join(temp, (a: any, b: any) => aq.op.equal(a.total_revenue, b.revenue)) - .join(this.tables['supplier'], ['l_suppkey', 's_suppkey']) + .join(this.tables['supplier'], [['l_suppkey'], ['s_suppkey']]) .orderby('s_suppkey'); for (const v of query.objects({ grouped: true })) { noop(v); @@ -482,8 +482,8 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { d.p_size == 45 || d.p_size == 19), ) - .join(this.tables['partsupp'], ['p_partkey', 'ps_partkey']) - .antijoin(supplier, ['ps_partkey', 's_suppkey']) + .join(this.tables['partsupp'], [['p_partkey'], ['ps_partkey']]) + .antijoin(supplier, [['ps_partkey'], ['s_suppkey']]) .groupby('p_brand', 'p_type', 'p_size') .rollup({ supplier_cnt: aq.op.distinct('ps_suppkey'), @@ -497,7 +497,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { case 17: { const tmp = this.tables['part'] .filter((d: any) => d.p_brand == 'Brand#23' && d.p_container == 'MED BOX') - .join(this.tables['lineitem'], ['p_partkey', 'l_partkey']); + .join(this.tables['lineitem'], [['p_partkey'], ['l_partkey']]); const agg = tmp.groupby('p_partkey').rollup({ avg_qty: aq.op.mean('l_quantity'), }); @@ -518,9 +518,9 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { quantity: aq.op.sum('l_quantity'), }) .filter((d: any) => d.quantity > 300) - .join(this.tables['orders'], ['l_orderkey', 'o_orderkey']) - .join(this.tables['customer'], ['o_custkey', 'c_custkey']) - .join(this.tables['lineitem'], ['o_orderkey', 'l_orderkey']) + .join(this.tables['orders'], [['l_orderkey'], ['o_orderkey']]) + .join(this.tables['customer'], [['o_custkey'], ['c_custkey']]) + .join(this.tables['lineitem'], 
[['o_orderkey'], ['l_orderkey']]) .groupby('c_name', 'c_custkey', 'o_orderkey', 'o_orderdate', 'o_totalprice') .rollup({ quantity: aq.op.sum('l_quantity'), @@ -609,7 +609,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { }); const sub = this.tables['part'] .filter((d: any) => aq.op.match(d.p_name, /^forest.*$/, 0) != null) - .join(this.tables['partsupp'], ['p_partkey', 'ps_partkey']) + .join(this.tables['partsupp'], [['p_partkey'], ['ps_partkey']]) .join( qty, (a: any, b: any) => @@ -619,8 +619,8 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { ); const query = this.tables['nation'] .filter((d: any) => d.n_name == 'CANADA') - .join(this.tables['supplier'], ['n_nationkey', 's_nationkey']) - .semijoin(sub, ['s_suppkey', 'ps_suppkey']) + .join(this.tables['supplier'], [['n_nationkey'], ['s_nationkey']]) + .semijoin(sub, [['s_suppkey'], ['ps_suppkey']]) .orderby('s_name'); for (const v of query.objects({ grouped: true })) { noop(v); @@ -632,9 +632,9 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { const orders = this.tables['orders'].filter((d: any) => d.o_orderstatus == 'F'); const query = this.tables['nation'] .filter((d: any) => d.n_name == 'SAUDI ARABIA') - .join(this.tables['supplier'], ['n_nationkey', 's_nationkey']) - .join(lineitem, ['s_suppkey', 'l_suppkey']) - .join(orders, ['l_orderkey', 'o_orderkey']) + .join(this.tables['supplier'], [['n_nationkey'], ['s_nationkey']]) + .join(lineitem, [['s_suppkey'], ['l_suppkey']]) + .join(orders, [['l_orderkey'], ['o_orderkey']]) .antijoin(lineitem, (a: any, b: any) => a.l_suppkey != b.l_suppkey && a.l_orderkey == b.l_orderkey) .semijoin( this.tables['lineitem'], @@ -661,7 +661,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { }); const query = customers .join(total_avg, (a: any, b: any) => a.c_acctbal > b.avg_c_acctbal) - .antijoin(this.tables['orders'], ['c_custkey', 'o_custkey']) + .antijoin(this.tables['orders'], [['c_custkey'], ['o_custkey']]) .derive({ cntrycode: (d: any) => aq.op.substring(d.c_phone, 0, 2), }) @@ -979,7 +979,7 @@ export class ArqueroIntegerJoin2Benchmark implements SystemBenchmark { .params({ filter }) .filter((row: any) => row.v0 < filter) .rename({ v0: 'a0' }) - .join(this.tables['B'].rename({ v0: 'b0', v1: 'b1' }), ['a0', 'b1']); + .join(this.tables['B'].rename({ v0: 'b0', v1: 'b1' }), [['a0'], ['b1']]); let n = 0; for (const v of result) { noop(v); @@ -1048,8 +1048,8 @@ export class ArqueroIntegerJoin3Benchmark implements SystemBenchmark { .params({ filter }) .filter((row: any) => row.v0 < filter) .rename({ v0: 'a0' }) - .join(this.tables['B'].rename({ v0: 'b0', v1: 'b1' }), ['a0', 'b1']) - .join(this.tables['C'].rename({ v0: 'c0', v1: 'c1' }), ['b0', 'c1']); + .join(this.tables['B'].rename({ v0: 'b0', v1: 'b1' }), [['a0'], ['b1']]) + .join(this.tables['C'].rename({ v0: 'c0', v1: 'c1' }), [['b0'], ['c1']]); let n = 0; for (const v of result) { noop(v); diff --git a/packages/duckdb-wasm/karma/s3rver/s3rver.js b/packages/duckdb-wasm/karma/s3rver/s3rver.js index d133af956..72a8c51ec 100644 --- a/packages/duckdb-wasm/karma/s3rver/s3rver.js +++ b/packages/duckdb-wasm/karma/s3rver/s3rver.js @@ -7,6 +7,7 @@ const CORS_CONFIG = "\n" + " GET\n" + " HEAD\n" + " *\n" + + " Content-Range\n" + " \n" + ""; diff --git a/packages/duckdb-wasm/src/bindings/bindings_base.ts b/packages/duckdb-wasm/src/bindings/bindings_base.ts index 70d5475cf..84f01c33d 100644 --- a/packages/duckdb-wasm/src/bindings/bindings_base.ts +++ 
b/packages/duckdb-wasm/src/bindings/bindings_base.ts @@ -469,13 +469,49 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings { } dropResponseBuffers(this.mod); } + /** Prepare a file handle that can only be acquired asynchronously */ + public async prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise { + if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS && this._runtime.prepareDBFileHandle) { + const list = await this._runtime.prepareDBFileHandle(path, DuckDBDataProtocol.BROWSER_FSACCESS); + for (const item of list) { + const { handle, path: filePath, fromCached } = item; + if (!fromCached && handle.getSize()) { + await this.registerFileHandle(filePath, handle, DuckDBDataProtocol.BROWSER_FSACCESS, true); + } + } + return; + } + throw new Error(`prepareDBFileHandle: unsupported protocol ${protocol}`); + } /** Register a file object URL */ - public registerFileHandle( + public async registerFileHandle( name: string, handle: HandleType, protocol: DuckDBDataProtocol, directIO: boolean, - ): void { + ): Promise { + if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS) { + if (handle instanceof FileSystemSyncAccessHandle) { + // handle is already a sync access handle. + } else if (handle instanceof FileSystemFileHandle) { + // handle is an async file handle; convert it to a sync access handle + const fileHandle: FileSystemFileHandle = handle as any; + try { + handle = (await fileHandle.createSyncAccessHandle()) as any; + } catch (e: any) { + throw new Error(e.message + ":" + name); + } + } else if (name != null) { + // no handle was provided; acquire a sync access handle from the file name. + try { + const opfsRoot = await navigator.storage.getDirectory(); + const fileHandle = await opfsRoot.getFileHandle(name); + handle = (await fileHandle.createSyncAccessHandle()) as any; + } catch (e: any) { + throw new Error(e.message + ":" + name); + } + } + } const [s, d, n] = callSRet( this.mod, 'duckdb_web_fs_register_file_url', @@ -487,6 +523,9 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings { } dropResponseBuffers(this.mod); globalThis.DUCKDB_RUNTIME._files = (globalThis.DUCKDB_RUNTIME._files || new Map()).set(name, handle); + if (globalThis.DUCKDB_RUNTIME._preparedHandles?.[name]) { + delete globalThis.DUCKDB_RUNTIME._preparedHandles[name]; + } if (this.pthread) { for (const worker of this.pthread.runningWorkers) { worker.postMessage({ diff --git a/packages/duckdb-wasm/src/bindings/bindings_interface.ts b/packages/duckdb-wasm/src/bindings/bindings_interface.ts index 4d2ad5e17..31ccca43b 100644 --- a/packages/duckdb-wasm/src/bindings/bindings_interface.ts +++ b/packages/duckdb-wasm/src/bindings/bindings_interface.ts @@ -41,7 +41,8 @@ export interface DuckDBBindings { handle: HandleType, protocol: DuckDBDataProtocol, directIO: boolean, - ): void; + ): Promise; + prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise; globFiles(path: string): WebFile[]; dropFile(name: string): void; dropFiles(): void; diff --git a/packages/duckdb-wasm/src/bindings/config.ts b/packages/duckdb-wasm/src/bindings/config.ts index ed5bb3cdb..ce29ca0f5 100644 --- a/packages/duckdb-wasm/src/bindings/config.ts +++ b/packages/duckdb-wasm/src/bindings/config.ts @@ -50,6 +50,10 @@ export interface DuckDBConfig { * Note that this will only work with cross-origin isolated sites since it requires SharedArrayBuffers.
*/ maximumThreads?: number; + /** + * The direct io flag + */ + useDirectIO?: boolean; /** * The query config */ diff --git a/packages/duckdb-wasm/src/bindings/runtime.ts b/packages/duckdb-wasm/src/bindings/runtime.ts index 83632deca..4bc360db3 100644 --- a/packages/duckdb-wasm/src/bindings/runtime.ts +++ b/packages/duckdb-wasm/src/bindings/runtime.ts @@ -89,6 +89,12 @@ export interface DuckDBGlobalFileInfo { s3Config?: S3Config; } +export interface PreparedDBFileHandle { + path: string; + handle: any; + fromCached: boolean; +} + /** Call a function with packed response buffer */ export function callSRet( mod: DuckDBModule, @@ -134,6 +140,7 @@ export interface DuckDBRuntime { openFile(mod: DuckDBModule, fileId: number, flags: FileFlags): void; syncFile(mod: DuckDBModule, fileId: number): void; closeFile(mod: DuckDBModule, fileId: number): void; + dropFile(mod: DuckDBModule, fileNamePtr: number, fileNameLen: number): void; getLastFileModificationTime(mod: DuckDBModule, fileId: number): number; truncateFile(mod: DuckDBModule, fileId: number, newSize: number): void; readFile(mod: DuckDBModule, fileId: number, buffer: number, bytes: number, location: number): number; @@ -149,6 +156,9 @@ export interface DuckDBRuntime { checkFile(mod: DuckDBModule, pathPtr: number, pathLen: number): boolean; removeFile(mod: DuckDBModule, pathPtr: number, pathLen: number): void; + // Prepare a file handle that can only be acquired asynchronously + prepareDBFileHandle?: (path: string, protocol: DuckDBDataProtocol) => Promise; + // Call a scalar UDF function callScalarUDF( mod: DuckDBModule, @@ -169,6 +179,7 @@ export const DEFAULT_RUNTIME: DuckDBRuntime = { openFile: (_mod: DuckDBModule, _fileId: number, flags: FileFlags): void => {}, syncFile: (_mod: DuckDBModule, _fileId: number): void => {}, closeFile: (_mod: DuckDBModule, _fileId: number): void => {}, + dropFile: (_mod: DuckDBModule, _fileNamePtr: number, _fileNameLen: number): void => {}, getLastFileModificationTime: (_mod: DuckDBModule, _fileId: number): number => { return 0; }, diff --git a/packages/duckdb-wasm/src/bindings/runtime_browser.ts b/packages/duckdb-wasm/src/bindings/runtime_browser.ts index 2c0c270d3..8e46955fe 100644 --- a/packages/duckdb-wasm/src/bindings/runtime_browser.ts +++ b/packages/duckdb-wasm/src/bindings/runtime_browser.ts @@ -1,5 +1,5 @@ -import { StatusCode } from '../status'; -import { addS3Headers, getHTTPUrl } from '../utils'; +import {StatusCode} from '../status'; +import {addS3Headers, getHTTPUrl} from '../utils'; import { callSRet, @@ -11,13 +11,19 @@ import { failWith, FileFlags, readString, + PreparedDBFileHandle, } from './runtime'; import { DuckDBModule } from './duckdb_module'; import * as udf from './udf_runtime'; +const OPFS_PREFIX_LEN = 'opfs://'.length; +const PATH_SEP_REGEX = /\/|\\/; + export const BROWSER_RUNTIME: DuckDBRuntime & { + _files: Map; _fileInfoCache: Map; _globalFileInfo: DuckDBGlobalFileInfo | null; + _preparedHandles: Record; getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null; getGlobalFileInfo(mod: DuckDBModule): DuckDBGlobalFileInfo | null; @@ -26,6 +32,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { _fileInfoCache: new Map(), _udfFunctions: new Map(), _globalFileInfo: null, + _preparedHandles: {} as any, getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null { try { @@ -47,13 +54,17 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { try { const info = JSON.parse(infoStr); if (info == null) { - return null; + return null; } const file = { ...info, blob: null }
as DuckDBFileInfo; BROWSER_RUNTIME._fileInfoCache.set(fileId, file); + if (!BROWSER_RUNTIME._files.has(file.fileName) && BROWSER_RUNTIME._preparedHandles[file.fileName]) { + BROWSER_RUNTIME._files.set(file.fileName, BROWSER_RUNTIME._preparedHandles[file.fileName]); + delete BROWSER_RUNTIME._preparedHandles[file.fileName]; + } return file; } catch (error) { - console.warn(error); + console.warn(error); return null; } } catch (e: any) { @@ -82,7 +93,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { if (info == null) { return null; } - BROWSER_RUNTIME._globalFileInfo = { ...info, blob: null } as DuckDBGlobalFileInfo; + BROWSER_RUNTIME._globalFileInfo = { ...info, blob: null} as DuckDBGlobalFileInfo; return BROWSER_RUNTIME._globalFileInfo; } catch (e: any) { @@ -91,6 +102,63 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { } }, + /** Prepare a file handle that can only be acquired asynchronously */ + async prepareDBFileHandle(dbPath: string, protocol: DuckDBDataProtocol): Promise { + if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS) { + const filePaths = [dbPath, `${dbPath}.wal`]; + const prepare = async (path: string): Promise => { + if (BROWSER_RUNTIME._files.has(path)) { + return { + path, + handle: BROWSER_RUNTIME._files.get(path), + fromCached: true, + }; + } + const opfsRoot = await navigator.storage.getDirectory(); + let dirHandle: FileSystemDirectoryHandle = opfsRoot; + // check if mkdir -p is needed + const opfsPath = path.slice(OPFS_PREFIX_LEN); + let fileName = opfsPath; + if (PATH_SEP_REGEX.test(opfsPath)) { + const folders = opfsPath.split(PATH_SEP_REGEX); + fileName = folders.pop()!; + if (!fileName) { + throw new Error(`Invalid path ${path}`); + } + // mkdir -p + for (const folder of folders) { + dirHandle = await dirHandle.getDirectoryHandle(folder, { create: true }); + } + } + const fileHandle = await dirHandle.getFileHandle(fileName, { create: false }).catch(e => { + if (e?.name === 'NotFoundError') { + console.debug(`File ${path} does not exist yet, creating...`); + return dirHandle.getFileHandle(fileName, { create: true }); + } + throw e; + }); + try { + const handle = await fileHandle.createSyncAccessHandle(); + BROWSER_RUNTIME._preparedHandles[path] = handle; + return { + path, + handle, + fromCached: false, + }; + } catch (e: any) { + throw new Error(e.message + ":" + path); + } + }; + const result: PreparedDBFileHandle[] = []; + for (const filePath of filePaths) { + const res = await prepare(filePath); + result.push(res); + } + return result; + } + throw new Error(`Unsupported protocol ${protocol} for path ${dbPath}`); + }, + testPlatformFeature: (_mod: DuckDBModule, feature: number): boolean => { switch (feature) { case 1: @@ -160,32 +228,32 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { let contentLength = null; let error: any | null = null; if (file.reliableHeadRequests || !file.allowFullHttpReads) { - try { - // Send a dummy HEAD request with range protocol - // -> good IFF status is 206 and contentLenght is present - const xhr = new XMLHttpRequest(); - if (file.dataProtocol == DuckDBDataProtocol.S3) { - xhr.open('HEAD', getHTTPUrl(file.s3Config, file.dataUrl!), false); - addS3Headers(xhr, file.s3Config, file.dataUrl!, 'HEAD'); - } else { - xhr.open('HEAD', file.dataUrl!, false); - } - xhr.setRequestHeader('Range', `bytes=0-`); - xhr.send(null); + try { + // Send a dummy HEAD request with range protocol + // -> good IFF status is 206 and contentLength is present + const xhr = new XMLHttpRequest(); + if (file.dataProtocol ==
DuckDBDataProtocol.S3) { + xhr.open('HEAD', getHTTPUrl(file.s3Config, file.dataUrl!), false); + addS3Headers(xhr, file.s3Config, file.dataUrl!, 'HEAD'); + } else { + xhr.open('HEAD', file.dataUrl!, false); + } + xhr.setRequestHeader('Range', `bytes=0-`); + xhr.send(null); - // Supports range requests - contentLength = xhr.getResponseHeader('Content-Length'); - if (contentLength !== null && xhr.status == 206) { - const result = mod._malloc(2 * 8); - mod.HEAPF64[(result >> 3) + 0] = +contentLength; - mod.HEAPF64[(result >> 3) + 1] = 0; - return result; - } + // Supports range requests + contentLength = xhr.getResponseHeader('Content-Length'); + if (contentLength !== null && xhr.status == 206) { + const result = mod._malloc(2 * 8); + mod.HEAPF64[(result >> 3) + 0] = +contentLength; + mod.HEAPF64[(result >> 3) + 1] = 0; + return result; + } - } catch (e: any) { - error = e; - console.warn(`HEAD request with range header failed: ${e}`); - } + } catch (e: any) { + error = e; + console.warn(`HEAD request with range header failed: ${e}`); + } } // Try to fallback to full read? @@ -223,7 +291,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { } head.setRequestHeader('Range', `bytes=0-`); head.send(null); - + // Supports range requests contentLength = head.getResponseHeader('Content-Length'); if (contentLength !== null && +contentLength > 1) { @@ -296,6 +364,20 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { mod.HEAPF64[(result >> 3) + 1] = buffer; return result; } + case DuckDBDataProtocol.BROWSER_FSACCESS: { + const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName); + if (!handle) { + throw new Error(`No OPFS access handle registered with name: ${file.fileName}`); + } + if (flags & FileFlags.FILE_FLAGS_FILE_CREATE_NEW) { + handle.truncate(0); + } + const result = mod._malloc(2 * 8); + const fileSize = handle.getSize(); + mod.HEAPF64[(result >> 3) + 0] = fileSize; + mod.HEAPF64[(result >> 3) + 1] = 0; + return result; + } } } catch (e: any) { // TODO (samansmink): this path causes the WASM code to hang @@ -343,11 +425,19 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { return 0; } const contentLength = xhr2.getResponseHeader('Content-Length'); - if (contentLength && (+contentLength > 1)) { - console.warn(`Range request for ${path} did not return a partial response: ${xhr2.status} "${xhr2.statusText}"`); + if (contentLength && +contentLength > 1) { + console.warn( + `Range request for ${path} did not return a partial response: ${xhr2.status} "${xhr2.statusText}"`, + ); } } mod.ccall('duckdb_web_fs_glob_add_path', null, ['string'], [path]); + } else { + for (const [filePath] of BROWSER_RUNTIME._files!.entries() || []) { + if (filePath.startsWith(path)) { + mod.ccall('duckdb_web_fs_glob_add_path', null, ['string'], [filePath]); + } + } } } catch (e: any) { console.log(e); @@ -372,6 +462,8 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { } xhr.send(null); return xhr.status == 206 || xhr.status == 200; + } else { + return BROWSER_RUNTIME._files.has(path); } } catch (e: any) { console.log(e); @@ -393,7 +485,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { // XXX Remove from registry return; case DuckDBDataProtocol.BROWSER_FSACCESS: { - const handle = BROWSER_RUNTIME._files?.get(file.fileName); + const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName); if (!handle) { throw new Error(`No OPFS access handle registered with name: ${file.fileName}`); } @@ -401,6 +493,24 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { } } }, + 
dropFile: (mod: DuckDBModule, fileNamePtr: number, fileNameLen: number) => { + const fileName = readString(mod, fileNamePtr, fileNameLen); + const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(fileName); + if (handle) { + BROWSER_RUNTIME._files.delete(fileName); + if (handle instanceof FileSystemSyncAccessHandle) { + try { + handle.flush(); + handle.close(); + } catch (e: any) { + throw new Error(`Cannot drop file with name: ${fileName}`); + } + } + if (handle instanceof Blob) { + // nothing + } + } + }, truncateFile: (mod: DuckDBModule, fileId: number, newSize: number) => { const file = BROWSER_RUNTIME.getFileInfo(mod, fileId); switch (file?.dataProtocol) { @@ -461,8 +571,14 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { } else if (xhr.status == 200) { // TODO: here we are actually throwing away all non-relevant bytes, but this is still better than failing // proper solution would require notifying duckdb-wasm cache, while we are piggybackign on browser cache - console.warn(`Range request for ${file.dataUrl} did not return a partial response: ${xhr.status} "${xhr.statusText}"`); - const src = new Uint8Array(xhr.response, location, Math.min(xhr.response.byteLength-location, bytes)); + console.warn( + `Range request for ${file.dataUrl} did not return a partial response: ${xhr.status} "${xhr.statusText}"`, + ); + const src = new Uint8Array( + xhr.response, + location, + Math.min(xhr.response.byteLength - location, bytes), + ); mod.HEAPU8.set(src, buf); return src.byteLength; } else { @@ -486,7 +602,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { return data.byteLength; } case DuckDBDataProtocol.BROWSER_FSACCESS: { - const handle = BROWSER_RUNTIME._files?.get(file.fileName); + const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files.get(file.fileName); if (!handle) { throw new Error(`No OPFS access handle registered with name: ${file.fileName}`); } @@ -523,7 +639,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { failWith(mod, 'cannot write using the html5 file reader api'); return 0; case DuckDBDataProtocol.BROWSER_FSACCESS: { - const handle = BROWSER_RUNTIME._files?.get(file.fileName); + const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName); if (!handle) { throw new Error(`No OPFS access handle registered with name: ${file.fileName}`); } diff --git a/packages/duckdb-wasm/src/bindings/runtime_node.ts b/packages/duckdb-wasm/src/bindings/runtime_node.ts index aa63822e2..2f92368bc 100644 --- a/packages/duckdb-wasm/src/bindings/runtime_node.ts +++ b/packages/duckdb-wasm/src/bindings/runtime_node.ts @@ -127,6 +127,7 @@ export const NODE_RUNTIME: DuckDBRuntime & { } return 0; }, + dropFile: (mod: DuckDBModule, _fileNamePtr: number, _fileNameLen:number) => {}, truncateFile: (mod: DuckDBModule, fileId: number, newSize: number) => { try { const file = NODE_RUNTIME.resolveFileInfo(mod, fileId); diff --git a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts index 9e1c9aa41..2ebcfd07c 100644 --- a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts +++ b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts @@ -1,4 +1,4 @@ -import { DuckDBBindings } from '../bindings'; +import { DuckDBBindings, DuckDBDataProtocol } from '../bindings'; import { WorkerResponseVariant, WorkerRequestVariant, WorkerRequestType, WorkerResponseType } from './worker_request'; import { Logger, LogEntryVariant } from '../log'; import { InstantiationProgress } from '../bindings/progress'; @@ 
-134,10 +134,16 @@ export abstract class AsyncDuckDBDispatcher implements Logger { this.sendOK(request); break; - case WorkerRequestType.OPEN: + case WorkerRequestType.OPEN: { + const path = request.data.path; + if (path?.startsWith('opfs://')) { + await this._bindings.prepareDBFileHandle(path, DuckDBDataProtocol.BROWSER_FSACCESS); + request.data.useDirectIO = true; + } this._bindings.open(request.data); this.sendOK(request); break; + } case WorkerRequestType.DROP_FILE: this._bindings.dropFile(request.data); this.sendOK(request); @@ -322,7 +328,7 @@ export abstract class AsyncDuckDBDispatcher implements Logger { break; case WorkerRequestType.REGISTER_FILE_HANDLE: - this._bindings.registerFileHandle( + await this._bindings.registerFileHandle( request.data[0], request.data[1], request.data[2], diff --git a/packages/duckdb-wasm/test/httpfs_test.ts b/packages/duckdb-wasm/test/httpfs_test.ts index 6a0762d13..4ffe2ac0b 100644 --- a/packages/duckdb-wasm/test/httpfs_test.ts +++ b/packages/duckdb-wasm/test/httpfs_test.ts @@ -2,6 +2,7 @@ import * as duckdb from '../src/'; import { getS3Params, S3Params, S3PayloadParams, createS3Headers, uriEncode, getHTTPUrl } from '../src/utils'; import { AsyncDuckDBConnection, DuckDBBindings, DuckDBBindingsBase, DuckDBModule } from '../src/'; import BROWSER_RUNTIME from '../src/bindings/runtime_browser'; +import {generateLongQueryString} from "./string_test_helper"; // S3 config for tests const BUCKET_NAME = 'test-bucket'; @@ -312,5 +313,31 @@ export function testHTTPFSAsync( ), ).toBeRejectedWithError('Invalid Error: File is not opened in write mode'); }); + + it('can read parquet file from URL with long query string', async () => { + // Create S3 file + const data = await resolveData('/uni/studenten.parquet'); + await putTestFileToS3('correct_auth_test', 'parquet', data); + // Generate a long query string, similar to an S3 Presigned URL + const queryString = generateLongQueryString(); + // Execute the query + const result = await conn!.query( + `SELECT * FROM "${S3_ENDPOINT}/${BUCKET_NAME}/correct_auth_test.parquet?${queryString}";`, + ); + expect(Number((result.getChildAt(0)?.get(6)))).toEqual(Number(29120)); + }); + + it('can read csv file from URL with long query string', async () => { + // Create S3 file + const data = await resolveData('/uni/studenten.parquet'); + await putTestFileToS3('correct_auth_test', 'csv', data); + // Generate a long query string, similar to an S3 Presigned URL + const queryString = generateLongQueryString(); + // Execute the query + const result = await conn!.query( + `SELECT * FROM "${S3_ENDPOINT}/${BUCKET_NAME}/correct_auth_test.csv?${queryString}";`, + ); + expect(Number((result.getChildAt(0)?.get(6)))).toEqual(Number(29120)); + }); }); } diff --git a/packages/duckdb-wasm/test/index_browser.ts b/packages/duckdb-wasm/test/index_browser.ts index 0e3ad0c75..1588c4fdd 100644 --- a/packages/duckdb-wasm/test/index_browser.ts +++ b/packages/duckdb-wasm/test/index_browser.ts @@ -100,6 +100,7 @@ import { testBindings, testAsyncBindings } from './bindings.test'; import { testBatchStream } from './batch_stream.test'; import { testAsyncBatchStream } from './batch_stream_async.test'; import { testFilesystem } from './filesystem.test'; +import { testOPFS } from './opfs.test'; import { testArrowInsert, testArrowInsertAsync } from './insert_arrow.test'; import { testJSONInsert, testJSONInsertAsync } from './insert_json.test'; import { testCSVInsert, testCSVInsertAsync } from './insert_csv.test'; @@ -128,6 +129,7 @@ testAsyncBindings(() => adb!, 
dataURL, duckdb.DuckDBDataProtocol.HTTP); testBatchStream(() => db!); testAsyncBatchStream(() => adb!); testFilesystem(() => adb!, resolveData, dataURL, duckdb.DuckDBDataProtocol.HTTP); +testOPFS(dataURL, () => DUCKDB_BUNDLE!); testArrowInsert(() => db!); testArrowInsertAsync(() => adb!); testJSONInsert(() => db!); diff --git a/packages/duckdb-wasm/test/opfs.test.ts b/packages/duckdb-wasm/test/opfs.test.ts new file mode 100644 index 000000000..47b9e2497 --- /dev/null +++ b/packages/duckdb-wasm/test/opfs.test.ts @@ -0,0 +1,306 @@ +import * as duckdb from '../src/'; +import {LogLevel} from '../src/'; +import * as arrow from 'apache-arrow'; + +export function testOPFS(baseDir: string, bundle: () => duckdb.DuckDBBundle): void { + let db: duckdb.AsyncDuckDB; + let conn: duckdb.AsyncDuckDBConnection; + + beforeAll(async () => { + await removeFiles(); + }); + + afterAll(async () => { + if (conn) { + await conn.close(); + } + if (db) { + await db.terminate(); + } + await removeFiles(); + }); + + beforeEach(async () => { + await removeFiles(); + // + const logger = new duckdb.ConsoleLogger(LogLevel.ERROR); + const worker = new Worker(bundle().mainWorker!); + db = new duckdb.AsyncDuckDB(logger, worker); + await db.instantiate(bundle().mainModule, bundle().pthreadWorker); + await db.open({ + path: 'opfs://test.db', + accessMode: duckdb.DuckDBAccessMode.READ_WRITE + }); + conn = await db.connect(); + }); + + afterEach(async () => { + if (conn) { + await conn.close(); + } + if (db) { + await db.terminate(); + } + await removeFiles(); + }); + + describe('Load Data in OPFS', () => { + it('Import Small Parquet file', async () => { + await conn.send(`CREATE TABLE stu AS SELECT * FROM "${baseDir}/uni/studenten.parquet"`); + await conn.send(`CHECKPOINT;`); + const result = await conn.send(`SELECT matrnr FROM stu;`); + const batches = []; + for await (const batch of result) { + batches.push(batch); + } + const table = await new arrow.Table<{ cnt: arrow.Int }>(batches); + expect(table.getChildAt(0)?.toArray()).toEqual( + new Int32Array([24002, 25403, 26120, 26830, 27550, 28106, 29120, 29555]), + ); + }); + + it('Import Large Parquet file', async () => { + await conn.send(`CREATE TABLE lineitem AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`); + await conn.send(`CHECKPOINT;`); + const result = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem;`); + const batches = []; + for await (const batch of result) { + batches.push(batch); + } + const table = await new arrow.Table<{ cnt: arrow.Int }>(batches); + expect(table.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + }); + + it('Load Existing DB File', async () => { + await conn.send(`CREATE TABLE tmp AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`); + await conn.send(`CHECKPOINT;`); + await conn.close(); + await db.terminate(); + + const logger = new duckdb.ConsoleLogger(LogLevel.ERROR); + const worker = new Worker(bundle().mainWorker!); + db = new duckdb.AsyncDuckDB(logger, worker); + await db.instantiate(bundle().mainModule, bundle().pthreadWorker); + await db.open({ + path: 'opfs://test.db', + accessMode: duckdb.DuckDBAccessMode.READ_WRITE + }); + conn = await db.connect(); + + const result = await conn.send(`SELECT count(*)::INTEGER as cnt FROM tmp;`); + const batches = []; + for await (const batch of result) { + batches.push(batch); + } + const table = await new arrow.Table<{ cnt: arrow.Int }>(batches); + expect(table.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + }); + + it('Load existing Parquet file registered with an empty handle',
async () => { + //1. write to opfs + const parquetBuffer = await fetch(`${baseDir}/tpch/0_01/parquet/lineitem.parquet`).then(res => + res.arrayBuffer(), + ); + const opfsRoot = await navigator.storage.getDirectory(); + const fileHandle = await opfsRoot.getFileHandle('test.parquet', {create: true}); + const writable = await fileHandle.createWritable(); + await writable.write(parquetBuffer); + await writable.close(); + //2. handle is null; the worker acquires the file handle from the file name. + await db.registerFileHandle('test.parquet', null, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await conn.send(`CREATE TABLE lineitem1 AS SELECT * FROM read_parquet('test.parquet')`); + await conn.send(`CHECKPOINT;`); + + const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem1;`); + const batches1 = []; + for await (const batch of result1) { + batches1.push(batch); + } + const table1 = await new arrow.Table<{ cnt: arrow.Int }>(batches1); + expect(table1.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + }); + + it('Load existing Parquet file registered with an OPFS file handle in a data directory', async () => { + //1. write to opfs + const parquetBuffer = await fetch(`${baseDir}/tpch/0_01/parquet/lineitem.parquet`).then(res => + res.arrayBuffer(), + ); + const opfsRoot = await navigator.storage.getDirectory(); + const datadir = await opfsRoot.getDirectoryHandle("datadir", {create: true}); + const fileHandle = await datadir.getFileHandle('test.parquet', {create: true}); + const writable = await fileHandle.createWritable(); + await writable.write(parquetBuffer); + await writable.close(); + //2. handle is an OPFS file handle + await db.registerFileHandle('test.parquet', fileHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await conn.send(`CREATE TABLE lineitem1 AS SELECT * FROM read_parquet('test.parquet')`); + await conn.send(`CHECKPOINT;`); + + const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem1;`); + const batches1 = []; + for await (const batch of result1) { + batches1.push(batch); + } + const table1 = await new arrow.Table<{ cnt: arrow.Int }>(batches1); + expect(table1.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + }); + + it('Load existing Parquet file and read it multiple times', async () => { + const parquetBuffer = await fetch(`${baseDir}/tpch/0_01/parquet/lineitem.parquet`).then(res => + res.arrayBuffer(), + ); + const opfsRoot = await navigator.storage.getDirectory(); + const fileHandle = await opfsRoot.getFileHandle('test.parquet', {create: true}); + const writable = await fileHandle.createWritable(); + await writable.write(parquetBuffer); + await writable.close(); + + await db.registerFileHandle('test.parquet', fileHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await conn.send(`CREATE TABLE lineitem1 AS SELECT * FROM read_parquet('test.parquet')`); + await conn.send(`CHECKPOINT;`); + await conn.send(`CREATE TABLE lineitem2 AS SELECT * FROM read_parquet('test.parquet')`); + await conn.send(`CHECKPOINT;`); + await conn.send(`CREATE TABLE lineitem3 AS SELECT * FROM read_parquet('test.parquet')`); + await conn.send(`CHECKPOINT;`); + + { + const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem1;`); + const batches1 = []; + for await (const batch of result1) { + batches1.push(batch); + } + const table1 = await new arrow.Table<{ cnt: arrow.Int }>(batches1); + expect(table1.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + } + + { + const result2 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM
lineitem2;`); + const batches2 = []; + for await (const batch of result2) { + batches2.push(batch); + } + const table2 = await new arrow.Table<{ cnt: arrow.Int }>(batches2); + expect(table2.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + } + + { + const result3 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem3;`); + const batches3 = []; + for await (const batch of result3) { + batches3.push(batch); + } + const table3 = await new arrow.Table<{ cnt: arrow.Int }>(batches3); + expect(table3.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + } + + }); + + it('Drop File + Export as CSV to OPFS + Load CSV', async () => { + const opfsRoot = await navigator.storage.getDirectory(); + const testHandle = await opfsRoot.getFileHandle('test.csv', {create: true}); + await db.registerFileHandle('test.csv', testHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await conn.send(`CREATE TABLE zzz AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`); + await conn.send(`COPY (SELECT * FROM zzz) TO 'test.csv'`); + await conn.close(); + await db.dropFile('test.csv'); + await db.reset(); + + await db.open({}); + conn = await db.connect(); + await db.registerFileHandle('test.csv', testHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + + const result = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test.csv';`); + const batches = []; + for await (const batch of result) { + batches.push(batch); + } + const table = await new arrow.Table<{ cnt: arrow.Int }>(batches); + expect(table.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + + await db.dropFile('test.csv'); + }); + + + it('Drop Files + Export as CSV to OPFS + Load CSV', async () => { + const opfsRoot = await navigator.storage.getDirectory(); + const testHandle1 = await opfsRoot.getFileHandle('test1.csv', {create: true}); + const testHandle2 = await opfsRoot.getFileHandle('test2.csv', {create: true}); + const testHandle3 = await opfsRoot.getFileHandle('test3.csv', {create: true}); + await db.registerFileHandle('test1.csv', testHandle1, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await db.registerFileHandle('test2.csv', testHandle2, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await db.registerFileHandle('test3.csv', testHandle3, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + + await conn.send(`CREATE TABLE zzz AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`); + await conn.send(`COPY (SELECT * FROM zzz) TO 'test1.csv'`); + await conn.send(`COPY (SELECT * FROM zzz) TO 'test2.csv'`); + await conn.send(`COPY (SELECT * FROM zzz) TO 'test3.csv'`); + await conn.close(); + + await db.dropFiles(); + await db.reset(); + + await db.open({}); + conn = await db.connect(); + await db.registerFileHandle('test1.csv', testHandle1, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await db.registerFileHandle('test2.csv', testHandle2, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await db.registerFileHandle('test3.csv', testHandle3, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + + { + const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test1.csv';`); + const batches1 = []; + for await (const batch of result1) { + batches1.push(batch); + } + const table1 = await new arrow.Table<{ cnt: arrow.Int }>(batches1); + expect(table1.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + } + { + const result2 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test2.csv';`); + const batches2 = []; + for await (const batch of result2) { + batches2.push(batch); 
+ } + const table2 = await new arrow.Table<{ cnt: arrow.Int }>(batches2); + expect(table2.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + } + { + const result3 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test3.csv';`); + const batches3 = []; + for await (const batch of result3) { + batches3.push(batch); + } + const table3 = await new arrow.Table<{ cnt: arrow.Int }>(batches3); + expect(table3.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + } + + await db.dropFiles(); + }); + }); + + async function removeFiles() { + const opfsRoot = await navigator.storage.getDirectory(); + await opfsRoot.removeEntry('test.db').catch(() => { + }); + await opfsRoot.removeEntry('test.db.wal').catch(() => { + }); + await opfsRoot.removeEntry('test.csv').catch(() => { + }); + await opfsRoot.removeEntry('test1.csv').catch(() => { + }); + await opfsRoot.removeEntry('test2.csv').catch(() => { + }); + await opfsRoot.removeEntry('test3.csv').catch(() => { + }); + await opfsRoot.removeEntry('test.parquet').catch(() => { + }); + try { + const datadir = await opfsRoot.getDirectoryHandle('datadir'); + datadir.removeEntry('test.parquet').catch(() => { + }); + } catch (e) { + // + } + await opfsRoot.removeEntry('datadir').catch(() => { + }); + } +} diff --git a/packages/duckdb-wasm/test/string_test_helper.ts b/packages/duckdb-wasm/test/string_test_helper.ts new file mode 100644 index 000000000..96e83b823 --- /dev/null +++ b/packages/duckdb-wasm/test/string_test_helper.ts @@ -0,0 +1,20 @@ +export function generateLongQueryString(): string { + const aaa = repeatCharacter('A', 512); + const ccc = repeatCharacter('C', 256); + const ddd = repeatCharacter('D', 512); + const eee = repeatCharacter('E', 256); + const ggg = repeatCharacter('G', 128); + + return `test=inline` + + `&Test-Security-Token=${aaa}` + + `&Test-Algorithm=${ccc}` + + `&Test-Date=${ddd}` + + `&Test-SignedHeaders=host` + + `&Test-Expires=43200` + + `&Test-Credential=${eee}` + + `&Test-Signature=${ggg}`; +} + +export function repeatCharacter(char: string, length: number): string { + return char.repeat(length); +} \ No newline at end of file diff --git a/patches/duckdb/bind_copy_direct_io.patch b/patches/duckdb/bind_copy_direct_io.patch new file mode 100644 index 000000000..994917fe8 --- /dev/null +++ b/patches/duckdb/bind_copy_direct_io.patch @@ -0,0 +1,15 @@ +diff --git a/src/planner/binder/statement/bind_copy.cpp b/src/planner/binder/statement/bind_copy.cpp +index 7db1db812d..60131d5916 100644 +--- a/src/planner/binder/statement/bind_copy.cpp ++++ b/src/planner/binder/statement/bind_copy.cpp +@@ -137,7 +137,9 @@ BoundStatement Binder::BindCopyTo(CopyStatement &stmt) { + throw NotImplementedException("Can't combine FILE_SIZE_BYTES and PARTITION_BY for COPY"); + } + bool is_remote_file = FileSystem::IsRemoteFile(stmt.info->file_path); +- if (is_remote_file) { ++ if ( is_remote_file ) { ++ use_tmp_file = false; ++ } else if( config.options.use_direct_io ) { + use_tmp_file = false; + } else { + auto &fs = FileSystem::GetFileSystem(context); diff --git a/patches/duckdb/fix_load_database.patch b/patches/duckdb/fix_load_database.patch new file mode 100644 index 000000000..e9d785701 --- /dev/null +++ b/patches/duckdb/fix_load_database.patch @@ -0,0 +1,20 @@ +diff --git a/src/storage/storage_manager.cpp b/src/storage/storage_manager.cpp +index 456cd22614..c4777dd28e 100644 +--- a/src/storage/storage_manager.cpp ++++ b/src/storage/storage_manager.cpp +@@ -163,9 +163,12 @@ void SingleFileStorageManager::LoadDatabase(const optional_idx block_alloc_size) + 
options.use_direct_io = config.options.use_direct_io; + options.debug_initialize = config.options.debug_initialize; + +- // Check if the database file already exists. +- // Note: a file can also exist if there was a ROLLBACK on a previous transaction creating that file. +- if (!read_only && !fs.FileExists(path)) { ++ auto db_file_handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_READ | FileFlags::FILE_FLAGS_NULL_IF_NOT_EXISTS); ++ bool is_empty_file = db_file_handle->GetFileSize() == 0; ++ db_file_handle.reset(); ++ ++ // first check if the database exists ++ if (!read_only && ( !fs.FileExists(path) || ( options.use_direct_io && is_empty_file )) ) { + // file does not exist and we are in read-write mode + // create a new file +
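
Note on usage: the sketch below shows, end to end, how the OPFS support introduced by this patch is meant to be exercised from an application, based on the APIs the patch adds (opfs:// database paths prepared via prepareDBFileHandle in the worker dispatcher, the now-async registerFileHandle, and DuckDBDataProtocol.BROWSER_FSACCESS). It mirrors the flow of the new opfs.test.ts; the bundle-selection helpers are the standard duckdb-wasm ones, and the file and table names are illustrative assumptions rather than part of this patch.

import * as duckdb from '@duckdb/duckdb-wasm';

(async () => {
    // Resolve a WASM bundle and spin up the async database in a web worker
    // (selectBundle/getJsDelivrBundles are the usual duckdb-wasm helpers; any bundle source works).
    const bundle = await duckdb.selectBundle(duckdb.getJsDelivrBundles());
    const worker = new Worker(bundle.mainWorker!);
    const db = new duckdb.AsyncDuckDB(new duckdb.ConsoleLogger(), worker);
    await db.instantiate(bundle.mainModule, bundle.pthreadWorker);

    // Opening an opfs:// path makes the worker dispatcher prepare OPFS sync access
    // handles for the database file and its .wal before the database is opened.
    await db.open({
        path: 'opfs://test.db',
        accessMode: duckdb.DuckDBAccessMode.READ_WRITE,
    });

    // A file that already lives in OPFS can be registered by handle (or by name with
    // a null handle) and is then queryable under its registered name.
    const opfsRoot = await navigator.storage.getDirectory();
    const fileHandle = await opfsRoot.getFileHandle('test.parquet');
    await db.registerFileHandle('test.parquet', fileHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);

    const conn = await db.connect();
    await conn.send(`CREATE TABLE lineitem AS SELECT * FROM read_parquet('test.parquet')`);
    await conn.send(`CHECKPOINT;`);
    await conn.close();
})();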