Skip to content

Commit

Permalink
added substring, json, len, is_valid methods (#767)
Browse files Browse the repository at this point in the history
this closes #722
  • Loading branch information
epinzur authored Sep 22, 2023
1 parent fdcdeb7 commit dcc00ea
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 2 deletions.
2 changes: 2 additions & 0 deletions python/docs/source/reference/timestream/string.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
.. autosummary::
:toctree: ../apidocs/
Timestream.len
Timestream.lower
Timestream.substring
Timestream.upper
```
28 changes: 28 additions & 0 deletions python/pysrc/kaskada/_timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,14 @@ def hash(self) -> Timestream:
"""
return Timestream._call("hash", self)

def len(self) -> Timestream:
"""Return a Timestream with the length of input string.
Notes:
Returns `0` for an empty string and `null` for `null`
"""
return Timestream._call("len", self)

def lower(self) -> Timestream:
"""Return a Timestream with all values converted to lower case."""
return Timestream._call("lower", self)
Expand Down Expand Up @@ -557,6 +565,26 @@ def select(self, *args: str) -> Timestream:
"""
return Timestream._call("select_fields", self, *args)

def substring(self, start: Optional[int] = None, end: Optional[int] = None) -> Timestream:
"""Return a Timestream with a substring between the start and end indices.
Args:
start: The inclusive index to start at. `None` indicates the beginning
of the string. Negative indices count backwards from the end of
the string.
end: The exclusive index to end at. `None` indicates the length of
the string. Negative indices count backwards from the end of
the string.
Notes:
Returns the substring starting at `start` (inclusive) up to but not
including the `end`.
If the input is `null`, returns `null`. If `end` > `start` an empty
string is returned.
"""
return Timestream._call("substring", self, start, end)

def remove(self, *args: str) -> Timestream:
"""Return a Timestream removing the given fields from `self`.
Expand Down
12 changes: 10 additions & 2 deletions python/pysrc/kaskada/sources/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ def __init__(
strings_can_be_null=True,
)

_parse_options = pyarrow.csv.ParseOptions(
escape_char="\\",
)

@staticmethod
async def create(
csv_string: Optional[str | BytesIO] = None,
Expand Down Expand Up @@ -277,7 +281,7 @@ async def create(
if schema is None:
if csv_string is None:
raise ValueError("Must provide schema or csv_string")
schema = pa.csv.read_csv(csv_string).schema
schema = pa.csv.read_csv(csv_string, parse_options=CsvString._parse_options).schema
csv_string.seek(0)

source = CsvString(
Expand All @@ -297,7 +301,11 @@ async def add_string(self, csv_string: str | BytesIO) -> None:
"""Add data to the source."""
if isinstance(csv_string, str):
csv_string = BytesIO(csv_string.encode("utf-8"))
content = pa.csv.read_csv(csv_string, convert_options=self._convert_options)
content = pa.csv.read_csv(
csv_string,
convert_options=self._convert_options,
parse_options=CsvString._parse_options
)
for batch in content.to_batches():
await self._ffi_table.add_pyarrow(batch)

Expand Down
6 changes: 6 additions & 0 deletions python/pytests/golden/len_test/test_len.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_key":"A","len":5.0}
{"_time":"1996-12-19T16:39:58.000000000","_key":"B","len":5.0}
{"_time":"1996-12-19T16:39:59.000000000","_key":"B","len":11.0}
{"_time":"1996-12-19T16:40:00.000000000","_key":"B","len":null}
{"_time":"1996-12-19T16:40:01.000000000","_key":"B","len":null}
{"_time":"1996-12-19T16:40:02.000000000","_key":"B","len":7.0}
6 changes: 6 additions & 0 deletions python/pytests/golden/substring_test/test_substring.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_key":"A","substring_0_2":"hE","substring_1":"Ello","substring_0_i":"","substring_i":"hEllo"}
{"_time":"1996-12-19T16:39:58.000000000","_key":"B","substring_0_2":"Wo","substring_1":"orld","substring_0_i":"World","substring_i":""}
{"_time":"1996-12-19T16:39:59.000000000","_key":"B","substring_0_2":"he","substring_1":"ello world","substring_0_i":"hello wor","substring_i":"ld"}
{"_time":"1996-12-19T16:40:00.000000000","_key":"B","substring_0_2":null,"substring_1":null,"substring_0_i":null,"substring_i":null}
{"_time":"1996-12-19T16:40:01.000000000","_key":"B","substring_0_2":null,"substring_1":null,"substring_0_i":null,"substring_i":null}
{"_time":"1996-12-19T16:40:02.000000000","_key":"B","substring_0_2":"go","substring_1":"oodbye","substring_0_i":"goodbye","substring_i":"goodbye"}
31 changes: 31 additions & 0 deletions python/pytests/len_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import kaskada as kd
import pytest


@pytest.fixture(scope="module")
async def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,s,n,t",
'1996-12-19T16:39:57,A,"hEllo",0,"hEllo"',
'1996-12-19T16:39:58,B,"World",5,"world"',
'1996-12-19T16:39:59,B,"hello world",-2,"hello world"',
'1996-12-19T16:40:00,B,,-2,"greetings"',
'1996-12-19T16:40:01,B,,2,"salutations"',
'1996-12-19T16:40:02,B,"goodbye",,',
]
)
return await kd.sources.CsvString.create(
content, time_column="time", key_column="key"
)


async def test_len(source, golden) -> None:
s = source.col("s")
golden.jsonl(
kd.record(
{
"len": s.len()
}
)
)
35 changes: 35 additions & 0 deletions python/pytests/substring_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import kaskada as kd
import pytest


@pytest.fixture(scope="module")
async def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,s,n,t",
'1996-12-19T16:39:57,A,"hEllo",0,"hEllo"',
'1996-12-19T16:39:58,B,"World",5,"world"',
'1996-12-19T16:39:59,B,"hello world",-2,"hello world"',
'1996-12-19T16:40:00,B,,-2,"greetings"',
'1996-12-19T16:40:01,B,,2,"salutations"',
'1996-12-19T16:40:02,B,"goodbye",,',
]
)
return await kd.sources.CsvString.create(
content, time_column="time", key_column="key"
)


async def test_substring(source, golden) -> None:
s = source.col("s")
n = source.col("n")
golden.jsonl(
kd.record(
{
"substring_0_2": s.substring(start=0, end=2),
"substring_1": s.substring(start=1),
"substring_0_i": s.substring(end=n),
"substring_i": s.substring(start=n),
}
)
)

0 comments on commit dcc00ea

Please sign in to comment.