diff --git a/python/Cargo.lock b/python/Cargo.lock index 4ca113d45..fa67a0130 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -3695,6 +3695,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "sparrow-batch" +version = "0.11.0" +dependencies = [ + "arrow", + "arrow-array", + "arrow-schema", + "arrow-select", + "derive_more", + "error-stack", + "itertools 0.11.0", + "static_init", +] + [[package]] name = "sparrow-compiler" version = "0.11.0" @@ -3724,6 +3738,7 @@ dependencies = [ "sparrow-core", "sparrow-instructions", "sparrow-merge", + "sparrow-sources", "sparrow-syntax", "static_init", "strum 0.25.0", @@ -3784,6 +3799,18 @@ dependencies = [ "uuid 1.4.1", ] +[[package]] +name = "sparrow-interfaces" +version = "0.11.0" +dependencies = [ + "arrow-schema", + "derive_more", + "error-stack", + "futures", + "sparrow-batch", + "sparrow-core", +] + [[package]] name = "sparrow-kernels" version = "0.11.0" @@ -3912,6 +3939,7 @@ dependencies = [ "sparrow-instructions", "sparrow-merge", "sparrow-runtime", + "sparrow-sources", "sparrow-syntax", "static_init", "tempfile", @@ -3920,6 +3948,24 @@ dependencies = [ "uuid 1.4.1", ] +[[package]] +name = "sparrow-sources" +version = "0.11.0" +dependencies = [ + "arrow-array", + "arrow-schema", + "async-broadcast", + "async-stream", + "derive_more", + "error-stack", + "futures", + "sparrow-batch", + "sparrow-core", + "sparrow-interfaces", + "sparrow-merge", + "tracing", +] + [[package]] name = "sparrow-syntax" version = "0.11.0" diff --git a/python/pytests/execution_test.py b/python/pytests/execution_test.py index afa53e4de..a987c944e 100644 --- a/python/pytests/execution_test.py +++ b/python/pytests/execution_test.py @@ -34,7 +34,7 @@ async def test_iter_pandas(golden, source_int64) -> None: next(batches) -async def test_iter_rows(golden, source_int64) -> None: +async def test_iter_rows(source_int64) -> None: results: Iterator[dict] = source_int64.run_iter("row", row_limit=2) assert next(results)["m"] == 5 assert 
next(results)["m"] == 24 @@ -42,6 +42,19 @@ async def test_iter_rows(golden, source_int64) -> None: next(results) +@kd.udf("add_one(x: N) -> N") +def add_one(x: pd.Series) -> pd.Series: + """Use Pandas to add one""" + print("In UDF") + return x + 1 + + +async def test_iter_udf(source_int64) -> None: + results = source_int64.col("m").pipe(add_one).run_iter("row", row_limit=2) + assert next(results)["result"] == 6 + assert next(results)["result"] == 25 + + async def test_iter_pandas_async(golden, source_int64) -> None: batches: AsyncIterator[pd.DataFrame] = source_int64.run_iter( row_limit=4, max_batch_size=2 diff --git a/python/src/execution.rs b/python/src/execution.rs index 5f38063be..c042c10c2 100644 --- a/python/src/execution.rs +++ b/python/src/execution.rs @@ -73,7 +73,11 @@ impl Execution { fn next_pyarrow(&mut self, py: Python<'_>) -> Result> { let mut execution = self.execution()?; - let batch = execution.as_mut().unwrap().next_blocking()?; + + // Explicitly allow threads to take the GIL during execution, so + // the UDF evaluator can acquire it to execute Python. + let batch = py.allow_threads(move || execution.as_mut().unwrap().next_blocking())?; + let result = match batch { Some(batch) => Some(batch.to_pyarrow(py)?), None => None,