Skip to content

Commit

Permalink
feat: Allow running non-record columns (#604)
Browse files Browse the repository at this point in the history
  • Loading branch information
bjchambers authored Aug 7, 2023
1 parent f427918 commit 98dec21
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 3 deletions.
6 changes: 5 additions & 1 deletion sparrow-py/pysrc/sparrow_py/_timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,8 +670,12 @@ def run(
Result
The `Result` object to use for accessing the results.
"""
expr = self
if not pa.types.is_struct(self.data_type):
# The execution engine requires a struct, so wrap this in a record.
expr = record({"result": self})
options = ExecutionOptions(row_limit=row_limit, max_batch_size=max_batch_size)
execution = self._ffi_expr.execute(options)
execution = expr._ffi_expr.execute(options)
return Result(execution)


Expand Down
3 changes: 2 additions & 1 deletion sparrow-py/pysrc/sparrow_py/sources/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class CsvSource(ArrowSource):
"""Source reading data from CSV via Pandas."""

def __init__(
self, time_column_name: str, key_column_name: str, csv_string: str, **kwargs
self, time_column_name: str, key_column_name: str, csv_string: str,
**kwargs
) -> None:
content = pd.read_csv(StringIO(csv_string), dtype_backend="pyarrow", **kwargs)
super().__init__(time_column_name, key_column_name, content)
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":851042397000000000,"_subsort":0,"_key_hash":12960666915911099378,"_key":"A","result":5.0}
{"_time":851042398000000000,"_subsort":1,"_key_hash":2867199309159137213,"_key":"B","result":24.0}
{"_time":851042399000000000,"_subsort":2,"_key_hash":12960666915911099378,"_key":"A","result":17.0}
{"_time":851042400000000000,"_subsort":3,"_key_hash":12960666915911099378,"_key":"A","result":null}
{"_time":851042401000000000,"_subsort":4,"_key_hash":12960666915911099378,"_key":"A","result":12.0}
{"_time":851042402000000000,"_subsort":5,"_key_hash":12960666915911099378,"_key":"A","result":null}
17 changes: 16 additions & 1 deletion sparrow-py/pytests/timestream_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def test_timestream_arithmetic_types(source1) -> None:
assert "Arg[1]: Timestream[int32]" in e.value.__notes__


def test_timestream_preview(source1, golden) -> None:
def test_timestream_preview(golden) -> None:
content = "\n".join(
[
"time,key,m,n",
Expand All @@ -104,3 +104,18 @@ def test_timestream_preview(source1, golden) -> None:
source = kt.sources.CsvSource("time", "key", content)

golden(source.preview(limit=4))

def test_timestream_run_non_record(golden) -> None:
    """Golden-check a single column selected from a CSV-backed source.

    Builds a ``CsvSource`` from six inline CSV rows (header
    ``time,key,m,n``), using ``time`` as the time column and ``key`` as
    the key column, then passes the ``m`` column to the ``golden``
    fixture for comparison against the stored expected output.

    The selected value is a single column rather than a whole record.
    NOTE(review): presumably the ``golden`` fixture is what executes the
    timestream -- confirm against the fixture definition.
    """
    # Some rows deliberately leave `m` and/or `n` empty.
    csv_rows = (
        "time,key,m,n",
        "1996-12-19T16:39:57-08:00,A,5,10",
        "1996-12-19T16:39:58-08:00,B,24,3",
        "1996-12-19T16:39:59-08:00,A,17,6",
        "1996-12-19T16:40:00-08:00,A,,9",
        "1996-12-19T16:40:01-08:00,A,12,",
        "1996-12-19T16:40:02-08:00,A,,",
    )
    content = "\n".join(csv_rows)

    source = kt.sources.CsvSource("time", "key", content)
    m_column = source["m"]
    golden(m_column)

0 comments on commit 98dec21

Please sign in to comment.