Skip to content

Commit 6cdf9d2

Browse files
authored
Merge pull request #326 from wudidapaopao/yuxiaozhe-dev
Add support for streaming query in chDB
2 parents fc7209a + 99bfd74 commit 6cdf9d2

File tree

13 files changed

+1206
-116
lines changed

13 files changed

+1206
-116
lines changed

README-zh.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,55 @@ print(query("select sum_udf(12,22)"))
147147
参见: [test_udf.py](tests/test_udf.py).
148148
</details>
149149

150+
<details>
151+
<summary><h4>🗂️ 流式查询</h4></summary>
152+
153+
通过分块流式处理大数据集,保持内存使用恒定。
154+
155+
```python
156+
from chdb import session as chs
157+
158+
sess = chs.Session()
159+
160+
# 示例1:流式查询基础用法
161+
rows_cnt = 0
162+
with sess.send_query("SELECT * FROM numbers(200000)", "CSV") as stream_result:
163+
for chunk in stream_result:
164+
rows_cnt += chunk.rows_read()
165+
166+
print(rows_cnt) # 200000
167+
168+
# 示例2:使用fetch()手动迭代
169+
rows_cnt = 0
170+
stream_result = sess.send_query("SELECT * FROM numbers(200000)", "CSV")
171+
while True:
172+
chunk = stream_result.fetch()
173+
if chunk is None:
174+
break
175+
rows_cnt += chunk.rows_read()
176+
177+
print(rows_cnt) # 200000
178+
179+
# 示例3:提前取消查询
180+
rows_cnt = 0
181+
stream_result = sess.send_query("SELECT * FROM numbers(200000)", "CSV")
182+
while True:
183+
chunk = stream_result.fetch()
184+
if chunk is None:
185+
break
186+
if rows_cnt > 0:
187+
stream_result.cancel()
188+
break
189+
rows_cnt += chunk.rows_read()
190+
191+
print(rows_cnt) # 65409(仅统计了取消前读取的第一个数据块的行数)
192+
193+
sess.close()
194+
```
195+
196+
参见: [test_streaming_query.py](tests/test_streaming_query.py)
197+
</details>
198+
150199
更多示例,请参见 [examples](examples) 和 [tests](tests)。
151200

152201
## 演示和示例

README.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,56 @@ see also: [test_udf.py](tests/test_udf.py).
223223
</details>
224224
225225
226+
<details>
227+
<summary><h4>🗂️ Streaming Query</h4></summary>
228+
229+
Process large datasets with constant memory usage through chunked streaming.
230+
231+
```python
232+
from chdb import session as chs
233+
234+
sess = chs.Session()
235+
236+
# Example 1: Basic usage of a streaming query
237+
rows_cnt = 0
238+
with sess.send_query("SELECT * FROM numbers(200000)", "CSV") as stream_result:
239+
for chunk in stream_result:
240+
rows_cnt += chunk.rows_read()
241+
242+
print(rows_cnt) # 200000
243+
244+
# Example 2: Manual iteration with fetch()
245+
rows_cnt = 0
246+
stream_result = sess.send_query("SELECT * FROM numbers(200000)", "CSV")
247+
while True:
248+
chunk = stream_result.fetch()
249+
if chunk is None:
250+
break
251+
rows_cnt += chunk.rows_read()
252+
253+
print(rows_cnt) # 200000
254+
255+
# Example 3: Early cancellation demo
256+
rows_cnt = 0
257+
stream_result = sess.send_query("SELECT * FROM numbers(200000)", "CSV")
258+
while True:
259+
chunk = stream_result.fetch()
260+
if chunk is None:
261+
break
262+
if rows_cnt > 0:
263+
stream_result.cancel()
264+
break
265+
rows_cnt += chunk.rows_read()
266+
267+
print(rows_cnt) # 65409 (only the first chunk is counted; the query is cancelled before the second one is added)
268+
269+
sess.close()
270+
```
271+
272+
For more details, see [test_streaming_query.py](tests/test_streaming_query.py).
273+
</details>
274+
275+
226276
<details>
227277
<summary><h4>🗂️ Python Table Engine</h4></summary>
228278

chdb/session/state.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import chdb
66
from ..state import sqlitelike as chdb_stateful
7-
7+
from ..state.sqlitelike import StreamingResult
88

99
g_session = None
1010
g_session_path = None
@@ -120,3 +120,16 @@ def query(self, sql, fmt="CSV", udf_path=""):
120120

121121
# alias sql = query
122122
sql = query
123+
124+
def send_query(self, sql, fmt="CSV") -> StreamingResult:
    """
    Execute a streaming query.

    Args:
        sql: SQL text to execute.
        fmt: Output format name, "CSV" by default. "Debug" is not
            supported here: a warning is emitted and "CSV" is used instead.

    Returns:
        The StreamingResult returned by the underlying connection's
        send_query().
    """
    if fmt == "Debug":
        # Name this method (not Session.query) so the warning points at the
        # call the user actually made. The message is built from adjacent
        # literals so no source indentation leaks into the emitted text.
        warnings.warn(
            "Debug format is not supported in Session.send_query\n"
            "Please try using parameters in connection string instead:\n"
            'Eg: conn = connect(f"db_path?verbose&log-level=test")'
        )
        fmt = "CSV"
    return self._conn.send_query(sql, fmt)

chdb/state/sqlitelike.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,57 @@ def to_df(r):
4040
return t.to_pandas(use_threads=True)
4141

4242

43+
class StreamingResult:
    """
    Iterator / context manager over the chunks of a streaming query.

    Wraps the handle returned by the native connection's send_query() and
    pulls chunks on demand through ``conn.streaming_fetch_result``. Each
    chunk is passed through ``result_func`` before being handed to the caller.

    Usage: iterate over the object, call fetch() until it returns None, or
    use it in a ``with`` block. Leaving the ``with`` block before the stream
    is exhausted cancels the query.
    """

    def __init__(self, c_result, conn, result_func):
        """
        Args:
            c_result: Native streaming handle, passed back to ``conn`` on
                every fetch/cancel call.
            conn: Object providing ``streaming_fetch_result(handle)`` and
                ``streaming_cancel_query(handle)``.
            result_func: Callable applied to every non-empty chunk before it
                is returned.
        """
        self._result = c_result
        self._result_func = result_func
        self._conn = conn
        self._exhausted = False

    def fetch(self):
        """
        Fetch the next chunk of the streaming result.

        Returns:
            ``result_func(chunk)`` for the next chunk, or None once the
            stream has ended (the connection returned None or a chunk whose
            rows_read() is 0), has failed, or has been cancelled.

        Raises:
            RuntimeError: If fetching or converting the chunk raised; the
                original exception is chained as ``__cause__``.
        """
        if self._exhausted:
            return None

        try:
            result = self._conn.streaming_fetch_result(self._result)
            if result is None or result.rows_read() == 0:
                self._exhausted = True
                return None
            return self._result_func(result)
        except Exception as e:
            # A failed stream is not retried: later fetches return None.
            self._exhausted = True
            raise RuntimeError(f"Streaming query failed: {str(e)}") from e

    def __iter__(self):
        return self

    def __next__(self):
        # fetch() already returns None (and records exhaustion) at end of
        # stream, so no separate state check is needed here.
        chunk = self.fetch()
        if chunk is None:
            raise StopIteration
        return chunk

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # If the block is left early (break, return, exception) the query is
        # still active; cancel it instead of leaving it running. A stream
        # that was fully consumed or already cancelled is left untouched.
        if not self._exhausted:
            self.cancel()

    def cancel(self):
        """
        Cancel the streaming query; subsequent fetch() calls return None.

        Raises:
            RuntimeError: If the connection's cancel call raised; the
                original exception is chained as ``__cause__``.
        """
        self._exhausted = True

        try:
            self._conn.streaming_cancel_query(self._result)
        except Exception as e:
            raise RuntimeError(f"Failed to cancel streaming query: {str(e)}") from e
92+
93+
4394
class Connection:
4495
def __init__(self, connection_string: str):
4596
# print("Connection", connection_string)
@@ -59,6 +110,15 @@ def query(self, query: str, format: str = "CSV") -> Any:
59110
result = self._conn.query(query, format)
60111
return result_func(result)
61112

113+
def send_query(self, query: str, format: str = "CSV") -> StreamingResult:
    """
    Start a streaming query and return a StreamingResult for its chunks.

    Args:
        query: SQL text to execute.
        format: Requested output format, matched case-insensitively.
            Formats listed in ``_arrow_format`` are sent to the native
            connection as "Arrow".

    Returns:
        StreamingResult whose chunks are passed through the converter
        registered for the format in ``_process_result_format_funs``
        (identity when none is registered).
    """
    format_key = format.lower()
    engine_format = "Arrow" if format_key in _arrow_format else format
    converter = _process_result_format_funs.get(format_key, lambda x: x)

    raw_stream = self._conn.send_query(query, engine_format)
    return StreamingResult(raw_stream, self._conn, converter)
121+
62122
def close(self) -> None:
63123
# print("close")
64124
if self._cursor:

programs/local/LocalChdb.cpp

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,11 +315,62 @@ query_result * connection_wrapper::query(const std::string & query_str, const st
315315
}
316316
if (result->error_message)
317317
{
318-
throw std::runtime_error(result->error_message);
318+
std::string msg_copy(result->error_message);
319+
free_result_v2(result);
320+
throw std::runtime_error(msg_copy);
319321
}
320322
return new query_result(result, false);
321323
}
322324

325+
/// Starts a streaming query on this connection.
///
/// @param query_str SQL text to execute.
/// @param format    Output format name passed to the engine.
/// @return Newly allocated streaming_query_result wrapping the engine handle.
/// @throws std::runtime_error carrying the engine's error text when the
///         handle reports an error; the handle is destroyed before throwing.
streaming_query_result * connection_wrapper::send_query(const std::string & query_str, const std::string & format)
{
    // Resolved before the GIL is released below.
    global_query_obj = findQueryableObjFromQuery(query_str);

    py::gil_scoped_release release;
    auto * stream_handle = query_conn_streaming(*conn, query_str.c_str(), format.c_str());

    if (const auto * failure = chdb_streaming_result_error(stream_handle))
    {
        // Copy the text first: it is obtained from the handle that is
        // destroyed on the next line (presumably owned by it).
        std::string failure_text(failure);
        chdb_destroy_result(stream_handle);
        throw std::runtime_error(failure_text);
    }

    return new streaming_query_result(stream_handle);
}
341+
342+
/// Fetches the next data chunk of a streaming query.
///
/// @param streaming_result Wrapper returned by send_query(); may be null.
/// @return Newly allocated query_result for the fetched chunk, or nullptr
///         when there is no usable handle or the engine returned no result.
/// @throws std::runtime_error carrying the chunk's error message; the chunk
///         is freed before throwing.
query_result * connection_wrapper::streaming_fetch_result(streaming_query_result * streaming_result)
{
    py::gil_scoped_release release;

    if (!streaming_result || !streaming_result->get_result())
        return nullptr;

    auto * result = chdb_streaming_fetch_result(*conn, streaming_result->get_result());

    // Never dereference an absent result. Returning nullptr matches the
    // guard above.
    if (!result)
        return nullptr;

    // Check for failure first so the "empty result" log below is only
    // emitted for chunks that are genuinely empty, not for failed fetches.
    if (result->error_message)
    {
        std::string msg_copy(result->error_message);
        free_result_v2(result);
        throw std::runtime_error(msg_copy);
    }

    if (result->len == 0)
        LOG_DEBUG(getLogger("CHDB"), "Empty result returned for streaming query");

    return new query_result(result, false);
}
363+
364+
/// Cancels a streaming query started with send_query().
///
/// Does nothing when @p streaming_result is null or wraps no handle.
void connection_wrapper::streaming_cancel_query(streaming_query_result * streaming_result)
{
    py::gil_scoped_release release;

    auto * stream_handle = streaming_result ? streaming_result->get_result() : nullptr;
    if (stream_handle)
        chdb_streaming_cancel_query(*conn, stream_handle);
}
373+
323374
void cursor_wrapper::execute(const std::string & query_str)
324375
{
325376
release_result();
@@ -407,6 +458,11 @@ PYBIND11_MODULE(_chdb, m)
407458
.def("has_error", &query_result::has_error)
408459
.def("error_message", &query_result::error_message);
409460

461+
py::class_<streaming_query_result>(m, "streaming_query_result")
462+
.def(py::init<chdb_streaming_result *>(), py::return_value_policy::take_ownership)
463+
.def("has_error", &streaming_query_result::has_error)
464+
.def("error_message", &streaming_query_result::error_message);
465+
410466
py::class_<DB::PyReader, std::shared_ptr<DB::PyReader>>(m, "PyReader")
411467
.def(
412468
py::init<const py::object &>(),
@@ -460,7 +516,23 @@ PYBIND11_MODULE(_chdb, m)
460516
&connection_wrapper::query,
461517
py::arg("query_str"),
462518
py::arg("format") = "CSV",
463-
"Execute a query and return a query_result object");
519+
"Execute a query and return a query_result object")
520+
.def(
521+
"send_query",
522+
&connection_wrapper::send_query,
523+
py::arg("query_str"),
524+
py::arg("format") = "CSV",
525+
"Send a streaming query and return a streaming query result object")
526+
.def(
527+
"streaming_fetch_result",
528+
&connection_wrapper::streaming_fetch_result,
529+
py::arg("streaming_result"),
530+
"Fetches a data chunk from the streaming result. This function should be called repeatedly until the result is exhausted")
531+
.def(
532+
"streaming_cancel_query",
533+
&connection_wrapper::streaming_cancel_query,
534+
py::arg("streaming_result"),
535+
"Cancel a streaming query");
464536

465537
m.def(
466538
"query",

programs/local/LocalChdb.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class __attribute__((visibility("default"))) connection_wrapper;
2525
class __attribute__((visibility("default"))) cursor_wrapper;
2626
class __attribute__((visibility("default"))) memoryview_wrapper;
2727
class __attribute__((visibility("default"))) query_result;
28+
class __attribute__((visibility("default"))) streaming_query_result;
2829

2930
class connection_wrapper
3031
{
@@ -42,6 +43,9 @@ class connection_wrapper
4243
void commit();
4344
void close();
4445
query_result * query(const std::string & query_str, const std::string & format = "CSV");
46+
streaming_query_result * send_query(const std::string & query_str, const std::string & format = "CSV");
47+
query_result * streaming_fetch_result(streaming_query_result * streaming_result);
48+
void streaming_cancel_query(streaming_query_result * streaming_result);
4549

4650
// Move the private methods declarations here
4751
std::pair<std::string, std::map<std::string, std::string>> parse_connection_string(const std::string & conn_str);
@@ -169,6 +173,22 @@ class query_result
169173
memoryview_wrapper * get_memview();
170174
};
171175

176+
class streaming_query_result
177+
{
178+
private:
179+
chdb_streaming_result * result;
180+
181+
public:
182+
streaming_query_result(chdb_streaming_result * result_) : result(result_) {}
183+
~streaming_query_result()
184+
{
185+
chdb_destroy_result(result);
186+
}
187+
bool has_error() { return chdb_streaming_result_error(result) != nullptr; }
188+
py::str error_message() { return chdb_streaming_result_error(result); }
189+
chdb_streaming_result * get_result() { return result; }
190+
};
191+
172192
class memoryview_wrapper
173193
{
174194
private:

0 commit comments

Comments
 (0)