diff --git a/python/pysrc/kaskada/_timestream.py b/python/pysrc/kaskada/_timestream.py index 8946b572e..401bcf0ea 100644 --- a/python/pysrc/kaskada/_timestream.py +++ b/python/pysrc/kaskada/_timestream.py @@ -584,16 +584,16 @@ def select(self, *args: str) -> Timestream: """ return Timestream._call("select_fields", self, *args) - def substring(self, start: Optional[int], end: Optional[int] = None) -> Timestream: + def substring(self, start: Optional[int] = None, end: Optional[int] = None) -> Timestream: """Return a Timestream with a substring between the start and end indices. Args: start: The inclusive index to start at. `None` indicates the beginning of the string. Negative indices count backwards from the end of the string. - end: (optional) The exclusive index to end at. `None` indicates the - length of the string. Negative indices count backwards from the - end of the string. + end: The exclusive index to end at. `None` indicates the length of + the string. Negative indices count backwards from the end of + the string. Notes: Returns the substring starting at `start` (inclusive) up to but not @@ -602,7 +602,7 @@ def substring(self, start: Optional[int], end: Optional[int] = None) -> Timestre If the input is `null`, returns `null`. If `end` > `start` an empty string is returned. """ - return Timestream._call("substring", self) + return Timestream._call("substring", self, start, end) def remove(self, *args: str) -> Timestream: """Return a Timestream removing the given fields from `self`. diff --git a/python/pysrc/kaskada/sources/arrow.py b/python/pysrc/kaskada/sources/arrow.py index 2480e63f6..b2e61b9ee 100644 --- a/python/pysrc/kaskada/sources/arrow.py +++ b/python/pysrc/kaskada/sources/arrow.py @@ -246,6 +246,10 @@ def __init__( strings_can_be_null=True, ) + _parse_options = pyarrow.csv.ParseOptions( + escape_char="\\", + ) + @staticmethod async def create( csv_string: Optional[str | BytesIO] = None, @@ -277,7 +281,7 @@ async def create( if schema is None: if csv_string is None: raise ValueError("Must provide schema or csv_string") - schema = pa.csv.read_csv(csv_string).schema + schema = pa.csv.read_csv(csv_string, parse_options=CsvString._parse_options).schema csv_string.seek(0) source = CsvString( @@ -297,7 +301,7 @@ async def add_string(self, csv_string: str | BytesIO) -> None: """Add data to the source.""" if isinstance(csv_string, str): csv_string = BytesIO(csv_string.encode("utf-8")) - content = pa.csv.read_csv(csv_string, convert_options=self._convert_options) + content = pa.csv.read_csv(csv_string, convert_options=self._convert_options, parse_options=CsvString._parse_options) for batch in content.to_batches(): await self._ffi_table.add_pyarrow(batch) diff --git a/python/pytests/golden/is_valid_test/test_is_valid_boolean.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_boolean.jsonl new file mode 100644 index 000000000..0cc1ecab4 --- /dev/null +++ b/python/pytests/golden/is_valid_test/test_is_valid_boolean.jsonl @@ -0,0 +1,7 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","a":true,"is_valid":true} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","a":false,"is_valid":true} +{"_time":"1996-12-19T16:39:59.000000000","_key":"B","a":null,"is_valid":false} +{"_time":"1996-12-19T16:40:00.000000000","_key":"B","a":true,"is_valid":true} +{"_time":"1996-12-19T16:40:01.000000000","_key":"B","a":false,"is_valid":true} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","a":false,"is_valid":true} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","a":null,"is_valid":false} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_f64.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_f64.jsonl new file mode 100644 index 000000000..2de3c7561 --- /dev/null +++ b/python/pytests/golden/is_valid_test/test_is_valid_f64.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","m":5.2,"is_valid":true} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","m":24.3,"is_valid":true} +{"_time":"1996-12-19T16:39:59.000000000","_key":"A","m":17.6,"is_valid":true} +{"_time":"1996-12-19T16:40:00.000000000","_key":"A","m":null,"is_valid":false} +{"_time":"1996-12-19T16:40:01.000000000","_key":"A","m":12.4,"is_valid":true} +{"_time":"1996-12-19T16:40:02.000000000","_key":"A","m":null,"is_valid":false} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_i64.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_i64.jsonl new file mode 100644 index 000000000..e3795c3bf --- /dev/null +++ b/python/pytests/golden/is_valid_test/test_is_valid_i64.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","m":5.0,"is_valid":true} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","m":24.0,"is_valid":true} +{"_time":"1996-12-19T16:39:59.000000000","_key":"A","m":17.0,"is_valid":true} +{"_time":"1996-12-19T16:40:00.000000000","_key":"A","m":null,"is_valid":false} +{"_time":"1996-12-19T16:40:01.000000000","_key":"A","m":12.0,"is_valid":true} +{"_time":"1996-12-19T16:40:02.000000000","_key":"A","m":null,"is_valid":false} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_record.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_record.jsonl new file mode 100644 index 000000000..7c43cba6a --- /dev/null +++ b/python/pytests/golden/is_valid_test/test_is_valid_record.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","is_valid":{"time":"1996-12-19T16:39:57.000000000","key":"A","n":2.0,"m":4.0,"other_time":"2003-12-19T16:39:57.000000000","fruit":"pear"}} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","is_valid":{"time":"1996-12-19T16:39:58.000000000","key":"B","n":4.0,"m":3.0,"other_time":"1994-11-19T16:39:57.000000000","fruit":"watermelon"}} +{"_time":"1996-12-19T16:39:59.000000000","_key":"B","is_valid":{"time":"1996-12-19T16:39:59.000000000","key":"B","n":5.0,"m":null,"other_time":"1998-12-19T16:39:57.000000000","fruit":"mango"}} +{"_time":"1996-12-19T16:40:00.000000000","_key":"B","is_valid":{"time":"1996-12-19T16:40:00.000000000","key":"B","n":null,"m":null,"other_time":"1992-12-19T16:39:57.000000000","fruit":null}} +{"_time":"1996-12-19T16:40:01.000000000","_key":"B","is_valid":{"time":"1996-12-19T16:40:01.000000000","key":"B","n":8.0,"m":8.0,"other_time":null,"fruit":null}} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","is_valid":{"time":"1996-12-19T16:40:02.000000000","key":"B","n":23.0,"m":11.0,"other_time":"1994-12-19T16:39:57.000000000","fruit":"mango"}} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_string.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_string.jsonl new file mode 100644 index 000000000..2fb59057c --- /dev/null +++ b/python/pytests/golden/is_valid_test/test_is_valid_string.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","s":"hEllo","is_valid":true} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","s":"World","is_valid":true} +{"_time":"1996-12-19T16:39:59.000000000","_key":"B","s":"hello world","is_valid":true} +{"_time":"1996-12-19T16:40:00.000000000","_key":"B","s":null,"is_valid":false} +{"_time":"1996-12-19T16:40:01.000000000","_key":"B","s":null,"is_valid":false} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","s":"goodbye","is_valid":true} diff --git a/python/pytests/golden/is_valid_test/test_is_valid_timestamp_ns.jsonl b/python/pytests/golden/is_valid_test/test_is_valid_timestamp_ns.jsonl new file mode 100644 index 000000000..177dd4664 --- /dev/null +++ b/python/pytests/golden/is_valid_test/test_is_valid_timestamp_ns.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","n":2.0,"is_valid":true} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","n":4.0,"is_valid":true} +{"_time":"1996-12-19T16:39:59.000000000","_key":"B","n":5.0,"is_valid":true} +{"_time":"1996-12-19T16:40:00.000000000","_key":"B","n":null,"is_valid":false} +{"_time":"1996-12-19T16:40:01.000000000","_key":"B","n":8.0,"is_valid":true} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","n":23.0,"is_valid":true} diff --git a/python/pytests/golden/len_test/test_len.jsonl b/python/pytests/golden/len_test/test_len.jsonl new file mode 100644 index 000000000..e6c369fc8 --- /dev/null +++ b/python/pytests/golden/len_test/test_len.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","len":5.0} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","len":5.0} +{"_time":"1996-12-19T16:39:59.000000000","_key":"B","len":11.0} +{"_time":"1996-12-19T16:40:00.000000000","_key":"B","len":null} +{"_time":"1996-12-19T16:40:01.000000000","_key":"B","len":null} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","len":7.0} diff --git a/python/pytests/golden/substring_test/test_substring.jsonl b/python/pytests/golden/substring_test/test_substring.jsonl new file mode 100644 index 000000000..e9d5ee346 --- /dev/null +++ b/python/pytests/golden/substring_test/test_substring.jsonl @@ -0,0 +1,6 @@ +{"_time":"1996-12-19T16:39:57.000000000","_key":"A","substring_0_2":"hE","substring_1":"Ello","substring_0_i":"","substring_i":"hEllo"} +{"_time":"1996-12-19T16:39:58.000000000","_key":"B","substring_0_2":"Wo","substring_1":"orld","substring_0_i":"World","substring_i":""} +{"_time":"1996-12-19T16:39:59.000000000","_key":"B","substring_0_2":"he","substring_1":"ello world","substring_0_i":"hello wor","substring_i":"ld"} +{"_time":"1996-12-19T16:40:00.000000000","_key":"B","substring_0_2":null,"substring_1":null,"substring_0_i":null,"substring_i":null} +{"_time":"1996-12-19T16:40:01.000000000","_key":"B","substring_0_2":null,"substring_1":null,"substring_0_i":null,"substring_i":null} +{"_time":"1996-12-19T16:40:02.000000000","_key":"B","substring_0_2":"go","substring_1":"oodbye","substring_0_i":"goodbye","substring_i":"goodbye"} diff --git a/python/pytests/is_valid_test.py b/python/pytests/is_valid_test.py new file mode 100644 index 000000000..a1da63a13 --- /dev/null +++ b/python/pytests/is_valid_test.py @@ -0,0 +1,152 @@ +import kaskada as kd +import pytest + +@pytest.fixture(scope="module") +async def boolean_source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,a,b", + '1996-12-19T16:39:57,A,true,true', + '1996-12-19T16:39:58,B,false,false', + '1996-12-19T16:39:59,B,,true', + '1996-12-19T16:40:00,B,true,false', + '1996-12-19T16:40:01,B,false,true', + '1996-12-19T16:40:02,B,false,', + '1996-12-19T16:40:02,B,,', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + +@pytest.fixture(scope="module") +async def f64_source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,m,n", + '1996-12-19T16:39:57,A,5.2,10', + '1996-12-19T16:39:58,B,24.3,3.9', + '1996-12-19T16:39:59,A,17.6,6.2', + '1996-12-19T16:40:00,A,,9.25', + '1996-12-19T16:40:01,A,12.4,', + '1996-12-19T16:40:02,A,,', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + +@pytest.fixture(scope="module") +async def i64_source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,m,n", + '1996-12-19T16:39:57,A,5,10', + '1996-12-19T16:39:58,B,24,3', + '1996-12-19T16:39:59,A,17,6', + '1996-12-19T16:40:00,A,,9', + '1996-12-19T16:40:01,A,12,', + '1996-12-19T16:40:02,A,,', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + +@pytest.fixture(scope="module") +async def string_source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,s,n,t", + '1996-12-19T16:39:57,A,"hEllo",0,"hEllo"', + '1996-12-19T16:39:58,B,"World",5,"world"', + '1996-12-19T16:39:59,B,"hello world",-2,"hello world"', + '1996-12-19T16:40:00,B,,-2,"greetings"', + '1996-12-19T16:40:01,B,,2,"salutations"', + '1996-12-19T16:40:02,B,"goodbye",,', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + +@pytest.fixture(scope="module") +async def timestamp_ns_source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,n,m,other_time,fruit", + '1996-12-19T16:39:57,A,2,4,2003-12-19T16:39:57,pear', + '1996-12-19T16:39:58,B,4,3,1994-11-19T16:39:57,watermelon', + '1996-12-19T16:39:59,B,5,,1998-12-19T16:39:57,mango', + '1996-12-19T16:40:00,B,,,1992-12-19T16:39:57,', + '1996-12-19T16:40:01,B,8,8,,', + '1996-12-19T16:40:02,B,23,11,1994-12-19T16:39:57,mango', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + +async def test_is_valid_boolean(boolean_source, golden) -> None: + a = boolean_source.col("a") + golden.jsonl( + kd.record( + { + "a": a, + "is_valid": a.is_valid(), + } + ) + ) + +async def test_is_valid_f64(f64_source, golden) -> None: + m = f64_source.col("m") + golden.jsonl( + kd.record( + { + "m": m, + "is_valid": m.is_valid(), + } + ) + ) + +async def test_is_valid_i64(i64_source, golden) -> None: + m = i64_source.col("m") + golden.jsonl( + kd.record( + { + "m": m, + "is_valid": m.is_valid(), + } + ) + ) + +async def test_is_valid_string(string_source, golden) -> None: + s = string_source.col("s") + golden.jsonl( + kd.record( + { + "s": s, + "is_valid": s.is_valid(), + } + ) + ) + +async def test_is_valid_timestamp_ns(timestamp_ns_source, golden) -> None: + n = timestamp_ns_source.col("n") + golden.jsonl( + kd.record( + { + "n": n, + "is_valid": n.is_valid(), + } + ) + ) + +async def test_is_valid_record(timestamp_ns_source, golden) -> None: + golden.jsonl( + kd.record( + { + "is_valid": timestamp_ns_source, + } + ) + ) diff --git a/python/pytests/json_test.py b/python/pytests/json_test.py new file mode 100644 index 000000000..7c75f2f2b --- /dev/null +++ b/python/pytests/json_test.py @@ -0,0 +1,148 @@ +import kaskada as kd +import pytest + + +@pytest.fixture(scope="module") +async def source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,json_str", + '1996-12-19T16:39:57,A,"{\"a\": 10\\, \"b\": \"dog\"}"', + '1996-12-19T16:39:58,B,"{\"a\": 4\\, \"b\": \"lizard\"}"', + '1996-12-19T16:39:59,B,"{\"a\": 1\\, \"c\": 3.3}"', + '1996-12-19T16:40:00,B,"{\"a\": 34}"', + '1996-12-19T16:40:01,B,"{\"a\": 34}"', + '1996-12-19T16:40:02,B,"{\"a\": 6\\, \"b\": \"dog\"}"', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + +@pytest.fixture(scope="module") +async def invalid_source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,json_str", + '1996-12-19T16:39:57,A,"{a: 10\\, \"b\": \"dog\"}"', + '1996-12-19T16:39:58,B,"{\"a\": 4\\, \"b: lizard\"}"', + '1996-12-19T16:39:59,B,"{\"a\": 1\\, \"c\": 3.3}"', + '1996-12-19T16:40:00,B,"{\"a\": 12\\, \"b\": \"cat\"}"', + '1996-12-19T16:40:01,B,"{\"a\"\\, 34}"', + '1996-12-19T16:40:02,B,"{\"a\": 6\\, \"b\": \"dog\"}"', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + +# "let json = json(Json.json) in { a_test: json.a as i64, b_test: json(Json.json).b }" +async def test_json_parse_field(source, golden) -> None: + json_str = source.col("json_str") + j = json_str.json() + golden.jsonl( + kd.record( + { + "a_test": j.col("a"), + "b_test": json_str.json().b, + } + ) + ) +""" +# "let json = json(Json.json) in { string: json.b, len: len(json.b) }" +async def test_json_string_field_usable_in_string_functions(source, golden) -> None: + json_str = source.col("json_str") + j = json_str.json() + golden.jsonl( + kd.record( + { + "string": j.col("b"), + "len": j.col("b").len(), + } + ) + ) + +# "let json = json(Json.json) in { num_as_str: json.a as string, len: len(json.a as string) }" +async def test_json_field_number_as_string(source, golden) -> None: + json_str = source.col("json_str") + j = json_str.json() + golden.jsonl( + kd.record( + { + "num_as_str": j.col("a").cast("str"), + "len": j.col("a").cast("str").len(), + } + ) + ) + +# "let json = json(Json.json) in { a: json.a, plus_one: (json.a as i64) + 1 }" +async def test_json_field_as_number_with_addition(source, golden) -> None: + json_str = source.col("json_str") + j = json_str.json() + golden.jsonl( + kd.record( + { + "a": j.col("a"), + "plus_one": j.col("a").cast("i64") + 1, + } + ) + ) + +# "let json = json(Json.json) in { a_test: json.a as i64, b_test: json(Json.json).b }" +# +# I guess this behavior is somewhat strange, in that creating a record with all +# nulls produces nothing, while one non-null field in a record causes us to +# print "null" in other fields. +async def test_incorrect_json_format_produces_null(invalid_source, golden) -> None: + json_str = invalid_source.col("json_str") + j = json_str.json() + golden.jsonl( + kd.record( + { + "a_test": j.col("a").cast("i64"), + "b_test": json_str.json().b, + } + ) + ) + +# "let json = json(Json.json) in { a: json(json) }" +async def test_json_of_json_object_errors(invalid_source, golden) -> None: + json_str = invalid_source.col("json_str") + j = json_str.json() + golden.jsonl( + kd.record( + { + "a": j.json(), + } + ) + ) + +# "{ out: json(Json.json).a.b }" +# +# There's a way we can probably produce a better error message, +# but, on the other hand, it's marked as an experimental feature, plus +# this returns an error rather than incorrect results :shrug: +# +# The `dfg` would need to check if it recursively encounters the pattern +# `(field_ref (json ?value ?op) ?field ?op)` +async def test_nested_json_produces_error(invalid_source, golden) -> None: + json_str = invalid_source.col("json_str") + j = json_str.json() + golden.jsonl( + kd.record( + { + "out": j.json().col("a").col("b"), + } + ) + ) + +# \"{ out: json(Json.json) }" +async def test_json_as_output_field_produces_error(invalid_source, golden) -> None: + golden.jsonl( + kd.record( + { + "out": invalid_source.col("json_str").json(), + } + ) + ) + """ \ No newline at end of file diff --git a/python/pytests/len_test.py b/python/pytests/len_test.py new file mode 100644 index 000000000..4bdcf5257 --- /dev/null +++ b/python/pytests/len_test.py @@ -0,0 +1,31 @@ +import kaskada as kd +import pytest + + +@pytest.fixture(scope="module") +async def source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,s,n,t", + '1996-12-19T16:39:57,A,"hEllo",0,"hEllo"', + '1996-12-19T16:39:58,B,"World",5,"world"', + '1996-12-19T16:39:59,B,"hello world",-2,"hello world"', + '1996-12-19T16:40:00,B,,-2,"greetings"', + '1996-12-19T16:40:01,B,,2,"salutations"', + '1996-12-19T16:40:02,B,"goodbye",,', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + + +async def test_len(source, golden) -> None: + s = source.col("s") + golden.jsonl( + kd.record( + { + "len": s.len() + } + ) + ) diff --git a/python/pytests/substring_test.py b/python/pytests/substring_test.py new file mode 100644 index 000000000..9a171c46c --- /dev/null +++ b/python/pytests/substring_test.py @@ -0,0 +1,35 @@ +import kaskada as kd +import pytest + + +@pytest.fixture(scope="module") +async def source() -> kd.sources.CsvString: + content = "\n".join( + [ + "time,key,s,n,t", + '1996-12-19T16:39:57,A,"hEllo",0,"hEllo"', + '1996-12-19T16:39:58,B,"World",5,"world"', + '1996-12-19T16:39:59,B,"hello world",-2,"hello world"', + '1996-12-19T16:40:00,B,,-2,"greetings"', + '1996-12-19T16:40:01,B,,2,"salutations"', + '1996-12-19T16:40:02,B,"goodbye",,', + ] + ) + return await kd.sources.CsvString.create( + content, time_column="time", key_column="key" + ) + + +async def test_substring(source, golden) -> None: + s = source.col("s") + n = source.col("n") + golden.jsonl( + kd.record( + { + "substring_0_2": s.substring(start=0, end=2), + "substring_1": s.substring(start=1), + "substring_0_i": s.substring(end=n), + "substring_i": s.substring(start=n), + } + ) + )