diff --git a/sparrow-py/pysrc/sparrow_py/_timestream.py b/sparrow-py/pysrc/sparrow_py/_timestream.py index 6d05fa708..1a74b02bc 100644 --- a/sparrow-py/pysrc/sparrow_py/_timestream.py +++ b/sparrow-py/pysrc/sparrow_py/_timestream.py @@ -670,8 +670,12 @@ def run( Result The `Result` object to use for accessing the results. """ + expr = self + if not pa.types.is_struct(self.data_type): + # The execution engine requires a struct, so wrap this in a record. + expr = record({"result": self}) options = ExecutionOptions(row_limit=row_limit, max_batch_size=max_batch_size) - execution = self._ffi_expr.execute(options) + execution = expr._ffi_expr.execute(options) return Result(execution) diff --git a/sparrow-py/pysrc/sparrow_py/sources/arrow.py b/sparrow-py/pysrc/sparrow_py/sources/arrow.py index 2d54345d1..8fdc519f9 100644 --- a/sparrow-py/pysrc/sparrow_py/sources/arrow.py +++ b/sparrow-py/pysrc/sparrow_py/sources/arrow.py @@ -60,7 +60,8 @@ class CsvSource(ArrowSource): """Source reading data from CSV via Pandas.""" def __init__( - self, time_column_name: str, key_column_name: str, csv_string: str, **kwargs + self, time_column_name: str, key_column_name: str, csv_string: str, + **kwargs ) -> None: content = pd.read_csv(StringIO(csv_string), dtype_backend="pyarrow", **kwargs) super().__init__(time_column_name, key_column_name, content) diff --git a/sparrow-py/pytests/golden/timestream_test/test_timestream_run_non_record.json b/sparrow-py/pytests/golden/timestream_test/test_timestream_run_non_record.json new file mode 100644 index 000000000..362793073 --- /dev/null +++ b/sparrow-py/pytests/golden/timestream_test/test_timestream_run_non_record.json @@ -0,0 +1,6 @@ +{"_time":851042397000000000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","result":5.0} +{"_time":851042398000000000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","result":24.0} +{"_time":851042399000000000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","result":17.0} +{"_time":851042400000000000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","result":null} +{"_time":851042401000000000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","result":12.0} +{"_time":851042402000000000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","result":null} diff --git a/sparrow-py/pytests/timestream_test.py b/sparrow-py/pytests/timestream_test.py index 8d989123b..98305a5d2 100644 --- a/sparrow-py/pytests/timestream_test.py +++ b/sparrow-py/pytests/timestream_test.py @@ -89,7 +89,7 @@ def test_timestream_arithmetic_types(source1) -> None: assert "Arg[1]: Timestream[int32]" in e.value.__notes__ -def test_timestream_preview(source1, golden) -> None: +def test_timestream_preview(golden) -> None: content = "\n".join( [ "time,key,m,n", @@ -104,3 +104,18 @@ def test_timestream_preview(source1, golden) -> None: source = kt.sources.CsvSource("time", "key", content) golden(source.preview(limit=4)) + +def test_timestream_run_non_record(golden) -> None: + content = "\n".join( + [ + "time,key,m,n", + "1996-12-19T16:39:57-08:00,A,5,10", + "1996-12-19T16:39:58-08:00,B,24,3", + "1996-12-19T16:39:59-08:00,A,17,6", + "1996-12-19T16:40:00-08:00,A,,9", + "1996-12-19T16:40:01-08:00,A,12,", + "1996-12-19T16:40:02-08:00,A,,", + ] + ) + source = kt.sources.CsvSource("time", "key", content) + golden(source["m"])