From 05afc2b2a60b4cdcbf36871e1bca7dfb53e4bd2b Mon Sep 17 00:00:00 2001 From: Ben Chambers <35960+bjchambers@users.noreply.github.com> Date: Mon, 16 Oct 2023 14:22:45 -0700 Subject: [PATCH 1/3] fix: release the GIL during sync iteration (#810) --- python/Cargo.lock | 46 ++++++++++++++++++++++++++++++++ python/pytests/execution_test.py | 15 ++++++++++- python/src/execution.rs | 6 ++++- 3 files changed, 65 insertions(+), 2 deletions(-) diff --git a/python/Cargo.lock b/python/Cargo.lock index 4ca113d45..fa67a0130 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -3695,6 +3695,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "sparrow-batch" +version = "0.11.0" +dependencies = [ + "arrow", + "arrow-array", + "arrow-schema", + "arrow-select", + "derive_more", + "error-stack", + "itertools 0.11.0", + "static_init", +] + [[package]] name = "sparrow-compiler" version = "0.11.0" @@ -3724,6 +3738,7 @@ dependencies = [ "sparrow-core", "sparrow-instructions", "sparrow-merge", + "sparrow-sources", "sparrow-syntax", "static_init", "strum 0.25.0", @@ -3784,6 +3799,18 @@ dependencies = [ "uuid 1.4.1", ] +[[package]] +name = "sparrow-interfaces" +version = "0.11.0" +dependencies = [ + "arrow-schema", + "derive_more", + "error-stack", + "futures", + "sparrow-batch", + "sparrow-core", +] + [[package]] name = "sparrow-kernels" version = "0.11.0" @@ -3912,6 +3939,7 @@ dependencies = [ "sparrow-instructions", "sparrow-merge", "sparrow-runtime", + "sparrow-sources", "sparrow-syntax", "static_init", "tempfile", @@ -3920,6 +3948,24 @@ dependencies = [ "uuid 1.4.1", ] +[[package]] +name = "sparrow-sources" +version = "0.11.0" +dependencies = [ + "arrow-array", + "arrow-schema", + "async-broadcast", + "async-stream", + "derive_more", + "error-stack", + "futures", + "sparrow-batch", + "sparrow-core", + "sparrow-interfaces", + "sparrow-merge", + "tracing", +] + [[package]] name = "sparrow-syntax" version = "0.11.0" diff --git a/python/pytests/execution_test.py b/python/pytests/execution_test.py index afa53e4de..a987c944e 100644 --- a/python/pytests/execution_test.py +++ b/python/pytests/execution_test.py @@ -34,7 +34,7 @@ async def test_iter_pandas(golden, source_int64) -> None: next(batches) -async def test_iter_rows(golden, source_int64) -> None: +async def test_iter_rows(source_int64) -> None: results: Iterator[dict] = source_int64.run_iter("row", row_limit=2) assert next(results)["m"] == 5 assert next(results)["m"] == 24 @@ -42,6 +42,19 @@ async def test_iter_rows(golden, source_int64) -> None: next(results) +@kd.udf("add_one(x: N) -> N") +def add_one(x: pd.Series) -> pd.Series: + """Use Pandas to one""" + print("In UDF") + return x + 1 + + +async def test_iter_udf(source_int64) -> None: + results = source_int64.col("m").pipe(add_one).run_iter("row", row_limit=2) + assert next(results)["result"] == 6 + assert next(results)["result"] == 25 + + async def test_iter_pandas_async(golden, source_int64) -> None: batches: AsyncIterator[pd.DataFrame] = source_int64.run_iter( row_limit=4, max_batch_size=2 diff --git a/python/src/execution.rs b/python/src/execution.rs index 5f38063be..c042c10c2 100644 --- a/python/src/execution.rs +++ b/python/src/execution.rs @@ -73,7 +73,11 @@ impl Execution { fn next_pyarrow(&mut self, py: Python<'_>) -> Result> { let mut execution = self.execution()?; - let batch = execution.as_mut().unwrap().next_blocking()?; + + // Explicitly allow threads to take the gil during execution, so + // the udf evaluator can acquire it to execute python. + let batch = py.allow_threads(move || execution.as_mut().unwrap().next_blocking())?; + let result = match batch { Some(batch) => Some(batch.to_pyarrow(py)?), None => None, From 8f598baeada5a8239000a046430cd2d4ef807ff1 Mon Sep 17 00:00:00 2001 From: Ben Chambers <35960+bjchambers@users.noreply.github.com> Date: Tue, 17 Oct 2023 10:27:20 -0700 Subject: [PATCH 2/3] Update README.md (#811) --- README.md | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index bedf02e94..ad4f38233 100644 --- a/README.md +++ b/README.md @@ -2,18 +2,12 @@ # Kaskada: Modern, open-source event-processing

- - Protobuf CI - - - Engine CI + + Python CI Rust CI (Nightly) - - Python Client CI - Notebooks CI @@ -57,4 +51,4 @@ Most development discussions take place on GitHub in this repo. ## Contributing All contributions -- issues, fixes, documentation improvements, features and ideas -- are welcome. -See [CONTRIBUTING.md](CONTRIBUTING.md) for more details. \ No newline at end of file +See [CONTRIBUTING.md](CONTRIBUTING.md) for more details. From c09b256968447060dd14c2eb777ea11883dff072 Mon Sep 17 00:00:00 2001 From: Ben Chambers <35960+bjchambers@users.noreply.github.com> Date: Tue, 17 Oct 2023 10:28:04 -0700 Subject: [PATCH 3/3] Update execution_test.py (#812) --- python/pytests/execution_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pytests/execution_test.py b/python/pytests/execution_test.py index a987c944e..27a3fb768 100644 --- a/python/pytests/execution_test.py +++ b/python/pytests/execution_test.py @@ -44,8 +44,7 @@ async def test_iter_rows(source_int64) -> None: @kd.udf("add_one(x: N) -> N") def add_one(x: pd.Series) -> pd.Series: - """Use Pandas to one""" - print("In UDF") + """Use Pandas to add one.""" return x + 1