From 0fa03d7f575f36123d4dc700125f3a803a3f921b Mon Sep 17 00:00:00 2001 From: mgrenonville Date: Fri, 13 Dec 2024 16:06:46 +0100 Subject: [PATCH 1/2] Rework chdb-node api This commit includes the new LocalChdb api using connect / chdb_conn to offer stateful query with long live clickhouse engine instance bind with connection. It also include a new NApi wrapper around local_result_v2 to prevent unnecessary copy and direct access to byte array (in case you want to use binary format and nodejs as a passhrough process). Thanks to `Napi::ObjectWrap`, nodejs is correctly calling free when needed (ie: the result isn't referenced anymore, thus garbage collected) --- binding.gyp | 6 ++- index.d.ts | 61 +++++++++++++++++++++++ index.js | 68 +++++++++++++++++++++++++- lib/LocalResultV2Wrapper.cpp | 68 ++++++++++++++++++++++++++ lib/LocalResultV2Wrapper.h | 30 ++++++++++++ lib/chdb_connect_api.cpp | 93 ++++++++++++++++++++++++++++++++++++ lib/chdb_connect_api.h | 10 ++++ lib/chdb_node.cpp | 74 ++++++++++++++-------------- lib/chdb_node.h | 5 ++ lib/module.cpp | 21 ++++++++ package-lock.json | 4 +- test.js | 45 ++++++++++++++++- 12 files changed, 439 insertions(+), 46 deletions(-) create mode 100644 lib/LocalResultV2Wrapper.cpp create mode 100644 lib/LocalResultV2Wrapper.h create mode 100644 lib/chdb_connect_api.cpp create mode 100644 lib/chdb_connect_api.h create mode 100644 lib/module.cpp diff --git a/binding.gyp b/binding.gyp index 3692a93..0f0e242 100644 --- a/binding.gyp +++ b/binding.gyp @@ -2,7 +2,11 @@ "targets": [ { "target_name": "chdb_node", - "sources": [ "lib/chdb_node.cpp" ], + "sources": [ "lib/chdb_node.cpp", + "lib/chdb_connect_api.cpp", + "lib/LocalResultV2Wrapper.cpp", + "lib/module.cpp" + ], "include_dirs": [ "(info) { + result_ = info[0].As>().Data(); +} + +LocalResultV2Wrapper::~LocalResultV2Wrapper() { + if (result_ != nullptr) { + free_result_v2(result_); + } +} + +Napi::Object LocalResultV2Wrapper::NewInstance( + Napi::Env env, Napi::External external) { + return constructor.New({external}); +} + +// Accessor Implementations +Napi::Value LocalResultV2Wrapper::GetBuffer(const Napi::CallbackInfo &info) { + return Napi::Buffer::New(info.Env(), result_->buf, result_->len); +} + +Napi::Value LocalResultV2Wrapper::GetLength(const Napi::CallbackInfo &info) { + return Napi::Number::New(info.Env(), result_->len); +} + +Napi::Value LocalResultV2Wrapper::GetElapsed(const Napi::CallbackInfo &info) { + return Napi::Number::New(info.Env(), result_->elapsed); +} + +Napi::Value LocalResultV2Wrapper::GetRowsRead(const Napi::CallbackInfo &info) { + return Napi::Number::New(info.Env(), result_->rows_read); +} + +Napi::Value LocalResultV2Wrapper::GetBytesRead(const Napi::CallbackInfo &info) { + return Napi::Number::New(info.Env(), result_->bytes_read); +} + +Napi::Value LocalResultV2Wrapper::GetErrorMessage( + const Napi::CallbackInfo &info) { + return result_->error_message == nullptr + ? info.Env().Null() + : Napi::String::New(info.Env(), result_->error_message); +} diff --git a/lib/LocalResultV2Wrapper.h b/lib/LocalResultV2Wrapper.h new file mode 100644 index 0000000..0fa58f3 --- /dev/null +++ b/lib/LocalResultV2Wrapper.h @@ -0,0 +1,30 @@ +#ifndef LOCAL_RESULT_V2_WRAPPER_H +#define LOCAL_RESULT_V2_WRAPPER_H + +#include + +#include "chdb.h" + +class LocalResultV2Wrapper : public Napi::ObjectWrap { + public: + static Napi::Object Init(Napi::Env env, Napi::Object exports); + static Napi::Object NewInstance(Napi::Env env, + Napi::External external); + + LocalResultV2Wrapper(const Napi::CallbackInfo &info); + ~LocalResultV2Wrapper(); + + private: + static Napi::FunctionReference constructor; + local_result_v2 *result_; + + // Accessors + Napi::Value GetBuffer(const Napi::CallbackInfo &info); + Napi::Value GetLength(const Napi::CallbackInfo &info); + Napi::Value GetElapsed(const Napi::CallbackInfo &info); + Napi::Value GetRowsRead(const Napi::CallbackInfo &info); + Napi::Value GetBytesRead(const Napi::CallbackInfo &info); + Napi::Value GetErrorMessage(const Napi::CallbackInfo &info); +}; + +#endif // LOCAL_RESULT_V2_WRAPPER_H diff --git a/lib/chdb_connect_api.cpp b/lib/chdb_connect_api.cpp new file mode 100644 index 0000000..86874f8 --- /dev/null +++ b/lib/chdb_connect_api.cpp @@ -0,0 +1,93 @@ + +#include +#include +#include +#include + +#include + +#include "LocalResultV2Wrapper.h" +#include "chdb.h" +#include "chdb_node.h" + +// Wrapper to free_result_v2 +void FreeResultV2(const Napi::CallbackInfo &info) { + Napi::Env env = info.Env(); + + if (info.Length() != 1 || !info[0].IsExternal()) { + Napi::TypeError::New(env, "Expected an external local_result_v2").ThrowAsJavaScriptException(); + return; + } + + auto result = info[0].As>().Data(); + free_result_v2(result); +} + +// Wrapper to connect_chdb +Napi::Value ConnectChdb(const Napi::CallbackInfo &info) { + Napi::Env env = info.Env(); + + if (info.Length() < 1 || !info[0].IsArray()) { + Napi::TypeError::New(env, "Expected an array of arguments").ThrowAsJavaScriptException(); + return env.Null(); + } + + Napi::Array args = info[0].As(); + std::vector argv; + for (size_t i = 0; i < args.Length(); i++) { + argv.push_back((char *)args.Get(i).As().Utf8Value().c_str()); + } + + auto conn = connect_chdb(argv.size(), argv.data()); + return Napi::External::New(env, *conn); +} + +// Wrapper to close_conn +void CloseConn(const Napi::CallbackInfo &info) { + Napi::Env env = info.Env(); + + if (info.Length() != 1 || !info[0].IsExternal()) { + Napi::TypeError::New(env, "Expected an external chdb_conn") + .ThrowAsJavaScriptException(); + return; + } + + auto conn = info[0].As>().Data(); + close_conn(&conn); +} + +// Wrapper to query_conn +Napi::Value QueryConn(const Napi::CallbackInfo &info) { + Napi::Env env = info.Env(); + + if (info.Length() != 3 || !info[0].IsExternal() || !info[2].IsString()) { + Napi::TypeError::New(env, "Expected a connection, query (string or Buffer), and format string") + .ThrowAsJavaScriptException(); + return env.Null(); + } + + auto conn = info[0].As>().Data(); + + // Extract query + const char *queryData; + local_result_v2 *result; + std::string format = info[2].As(); + std::cout << "buffer: " << std::endl; + + if (info[1].IsString()) { + std::string query = info[1].As(); + result = query_conn(conn, query.c_str(), format.c_str()); + } else if (info[1].IsBuffer()) { + Napi::Buffer buffer = info[1].As>(); + result = query_conn(conn, buffer.Data(), format.c_str()); + } else { + Napi::TypeError::New(env, "Query must be a string or a Buffer").ThrowAsJavaScriptException(); + return env.Null(); + } + + + + Napi::Object wrapper = LocalResultV2Wrapper::NewInstance( + env, Napi::External::New(env, result)); + return wrapper; +} diff --git a/lib/chdb_connect_api.h b/lib/chdb_connect_api.h new file mode 100644 index 0000000..9353267 --- /dev/null +++ b/lib/chdb_connect_api.h @@ -0,0 +1,10 @@ +#ifndef SRC_CONNECT_API_H_ +#define SRC_CONNECT_API_H_ + +#include +void FreeResultV2(const Napi::CallbackInfo &info); +Napi::Value ConnectChdb(const Napi::CallbackInfo &info); +Napi::Value CloseConn(const Napi::CallbackInfo &info); +Napi::Value QueryConn(const Napi::CallbackInfo &info); + +#endif \ No newline at end of file diff --git a/lib/chdb_node.cpp b/lib/chdb_node.cpp index 4af15e3..7c1eb66 100644 --- a/lib/chdb_node.cpp +++ b/lib/chdb_node.cpp @@ -1,10 +1,15 @@ -#include "chdb.h" #include "chdb_node.h" + +#include #include #include #include + #include -#include + +#include "LocalResultV2Wrapper.h" +#include "chdb.h" +#include "chdb_connect_api.h" #define MAX_FORMAT_LENGTH 64 #define MAX_PATH_LENGTH 4096 @@ -64,34 +69,6 @@ char *Query(const char *query, const char *format, char **error_message) { return result; } -// QuerySession function will save the session to the path -char *QuerySession(const char *query, const char *format, const char *path, - char **error_message) { - char dataFormat[MAX_FORMAT_LENGTH]; - char dataPath[MAX_PATH_LENGTH]; - char *dataQuery; - char *args[MAX_ARG_COUNT] = {"clickhouse", "--multiquery", NULL, NULL, NULL}; - int argc = 5; - - construct_arg(dataFormat, "--output-format=", format, MAX_FORMAT_LENGTH); - args[2] = dataFormat; - - dataQuery = (char *)malloc(strlen(query) + strlen("--query=") + 1); - if (dataQuery == NULL) { - return NULL; - } - construct_arg(dataQuery, "--query=", query, - strlen(query) + strlen("--query=") + 1); - args[3] = dataQuery; - - construct_arg(dataPath, "--path=", path, MAX_PATH_LENGTH); - args[4] = dataPath; - - char *result = general_query(argc, args, error_message); - free(dataQuery); - return result; -} - Napi::String QueryWrapper(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); @@ -121,6 +98,34 @@ Napi::String QueryWrapper(const Napi::CallbackInfo &info) { return Napi::String::New(env, result); } +// QuerySession function will save the session to the path +char *QuerySession(const char *query, const char *format, const char *path, + char **error_message) { + char dataFormat[MAX_FORMAT_LENGTH]; + char dataPath[MAX_PATH_LENGTH]; + char *dataQuery; + char *args[MAX_ARG_COUNT] = {"clickhouse", "--multiquery", NULL, NULL, NULL}; + int argc = 5; + + construct_arg(dataFormat, "--output-format=", format, MAX_FORMAT_LENGTH); + args[2] = dataFormat; + + dataQuery = (char *)malloc(strlen(query) + strlen("--query=") + 1); + if (dataQuery == NULL) { + return NULL; + } + construct_arg(dataQuery, "--query=", query, + strlen(query) + strlen("--query=") + 1); + args[3] = dataQuery; + + construct_arg(dataPath, "--path=", path, MAX_PATH_LENGTH); + args[4] = dataPath; + + char *result = general_query(argc, args, error_message); + free(dataQuery); + return result; +} + Napi::String QuerySessionWrapper(const Napi::CallbackInfo &info) { Napi::Env env = info.Env(); @@ -152,12 +157,3 @@ Napi::String QuerySessionWrapper(const Napi::CallbackInfo &info) { // Return the result return Napi::String::New(env, result); } - -Napi::Object Init(Napi::Env env, Napi::Object exports) { - // Export the functions - exports.Set("Query", Napi::Function::New(env, QueryWrapper)); - exports.Set("QuerySession", Napi::Function::New(env, QuerySessionWrapper)); - return exports; -} - -NODE_API_MODULE(NODE_GYP_MODULE_NAME, Init) \ No newline at end of file diff --git a/lib/chdb_node.h b/lib/chdb_node.h index 24594ad..253276d 100644 --- a/lib/chdb_node.h +++ b/lib/chdb_node.h @@ -1,4 +1,9 @@ #pragma once +#include + char *Query(const char *query, const char *format); char *QuerySession(const char *query, const char *format, const char *path); + +Napi::String QueryWrapper(const Napi::CallbackInfo &info); +Napi::String QuerySessionWrapper(const Napi::CallbackInfo &info); \ No newline at end of file diff --git a/lib/module.cpp b/lib/module.cpp new file mode 100644 index 0000000..657c462 --- /dev/null +++ b/lib/module.cpp @@ -0,0 +1,21 @@ + + +#include "LocalResultV2Wrapper.h" +#include "chdb_connect_api.h" +#include "chdb_node.h" + +Napi::Object Init(Napi::Env env, Napi::Object exports) { + LocalResultV2Wrapper::Init(env, exports); + // Export the functions + exports.Set("Query", Napi::Function::New(env, QueryWrapper)); + exports.Set("QuerySession", Napi::Function::New(env, QuerySessionWrapper)); + + exports.Set(Napi::String::New(env, "freeResultV2"), Napi::Function::New(env, FreeResultV2)); + exports.Set(Napi::String::New(env, "connectChdb"), Napi::Function::New(env, ConnectChdb)); + exports.Set(Napi::String::New(env, "closeConn"), Napi::Function::New(env, CloseConn)); + exports.Set(Napi::String::New(env, "queryConn"), Napi::Function::New(env, QueryConn)); + + return exports; +} + +NODE_API_MODULE(NODE_GYP_MODULE_NAME, Init) \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index 8622a25..7876f67 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "chdb", - "version": "1.1.4", + "version": "1.2.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "chdb", - "version": "1.1.4", + "version": "1.2.1", "hasInstallScript": true, "license": "Apache-2.0", "dependencies": { diff --git a/test.js b/test.js index 42c2a16..c613dd9 100644 --- a/test.js +++ b/test.js @@ -1,5 +1,5 @@ const { expect } = require('chai'); -const { query, Session } = require("."); +const { query, Session, LocalChDB } = require("."); describe('chDB Queries', function () { @@ -21,7 +21,7 @@ describe('chDB Queries', function () { before(function () { // Create a new session instance before running the tests - session = new Session("./chdb-node-tmp"); + session = new Session(""); }); after(function () { @@ -57,5 +57,46 @@ describe('chDB Queries', function () { }); }); + + describe('LocalChDB Queries in memory', function () { + let session; + + before(function () { + // Create a new session instance before running the tests + session = new LocalChDB(); + }); + + after(function () { + // Clean up the session after all tests are done + session.cleanup(); + }); + + it('should return a simple query result from session', function () { + const ret = session.query("SELECT 123", "CSV"); + console.log("Session Query Result:", ret); + + expect(ret.getBuffer()).to.be.instanceOf(Buffer); + expect(ret.getBuffer().toString()).to.include('123'); + }); + + it('should create database and table, then insert and query data', function () { + session.query("CREATE TABLE IF NOT EXISTS testtable (id UInt32) ENGINE = Memory"); + session.query("INSERT INTO testtable VALUES (1), (2), (3);"); + + const ret = session.query("SELECT * FROM testtable;", "CSV"); + console.log("Session Query Result:", ret); + expect(ret.getBuffer()).to.be.instanceOf(Buffer); + let retString = ret.getBuffer().toString(); + expect(retString).to.include('1'); + expect(retString).to.include('2'); + expect(retString).to.include('3'); + }); + + it('should throw an error when querying a non-existent table', function () { + expect(() => { + session.query("SELECT * FROM non_existent_table;", "CSV"); + }).to.throw(Error, /Unknown table expression identifier/); + }); + }); }); From cb6bccab144e7ec6fc3e6c279a73e6114bc4d992 Mon Sep 17 00:00:00 2001 From: mgrenonville Date: Thu, 30 Jan 2025 13:33:19 +0100 Subject: [PATCH 2/2] Fix @auxten comments from PR --- index.js | 22 +++++++--------------- test.js | 6 +++--- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/index.js b/index.js index 88a2c96..5b3df46 100644 --- a/index.js +++ b/index.js @@ -12,15 +12,7 @@ function query(query, format = "CSV") { return chdbNode.Query(query, format); } -// Standalone exported query function -function queryBuffer(query, format = "CSV") { - if (!query) { - return ""; - } - return chdbNode.QueryBuffer(query, format); -} - -class LocalChDB { +class Connect { constructor(path = ":memory:") { let args = [] if (path === ":memory:") { @@ -56,6 +48,11 @@ class LocalChDB { return res; } + insert_into(query, values, format = "CSV") { + let q = Buffer.concat([Buffer.from(query), values]) + return this.query(q, format) + } + cleanup() { console.log("cleanup: ", this.isTemp, this.in_memory); @@ -87,11 +84,6 @@ class Session { return chdbNode.QuerySession(query, format, this.path); } - queryBuffer(query, format = "CSV") { - if (!query) return ""; - return chdbNode.QuerySessionBuffer(query, format, this.path); - } - // Cleanup method to delete the temporary directory cleanup() { if (this.isTemp) { @@ -100,4 +92,4 @@ class Session { } } -module.exports = { query, queryBuffer, Session, LocalChDB }; +module.exports = { query, Session, Connect }; diff --git a/test.js b/test.js index c613dd9..0923c9c 100644 --- a/test.js +++ b/test.js @@ -1,5 +1,5 @@ const { expect } = require('chai'); -const { query, Session, LocalChDB } = require("."); +const { query, Session, Connect } = require("."); describe('chDB Queries', function () { @@ -58,12 +58,12 @@ describe('chDB Queries', function () { }); - describe('LocalChDB Queries in memory', function () { + describe('Connect Queries in memory', function () { let session; before(function () { // Create a new session instance before running the tests - session = new LocalChDB(); + session = new Connect(); }); after(function () {