Skip to content

Commit

Permalink
added new jsonl-file source (#773)
Browse files Browse the repository at this point in the history
In my notebooks, I'm tired of loading `jsonl` files into arrays before
inputting them into kaskada. So I made a new source to handle `jsonl`
files directly.
  • Loading branch information
epinzur authored Sep 26, 2023
1 parent ac280b5 commit d5c8856
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 8 deletions.
1 change: 1 addition & 0 deletions python/docs/source/reference/sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
Source
CsvString
JsonlFile
JsonlString
Pandas
Parquet
Expand Down
4 changes: 2 additions & 2 deletions python/pysrc/kaskada/sources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Sources of data for Kaskada queries."""
from .arrow import CsvString, JsonlString, Pandas, Parquet, PyDict
from .arrow import CsvString, JsonlFile, JsonlString, Pandas, Parquet, PyDict
from .source import Source


__all__ = ["Source", "CsvString", "Pandas", "JsonlString", "PyDict", "Parquet"]
__all__ = ["Source", "CsvString", "Pandas", "JsonlFile", "JsonlString", "PyDict", "Parquet"]
104 changes: 98 additions & 6 deletions python/pysrc/kaskada/sources/arrow.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Provide sources based on PyArrow, including Pandas and CSV."""
from __future__ import annotations

import os
from io import BytesIO
from typing import Optional

Expand Down Expand Up @@ -310,6 +309,100 @@ async def add_string(self, csv_string: str | BytesIO) -> None:
await self._ffi_table.add_pyarrow(batch)


class JsonlFile(Source):
    """Source whose rows come from line-delimited JSON files parsed by PyArrow."""

    def __init__(
        self,
        *,
        time_column: str,
        key_column: str,
        schema: pa.Schema,
        subsort_column: Optional[str] = None,
        grouping_name: Optional[str] = None,
        time_unit: Optional[TimeUnit] = None,
    ) -> None:
        """Initialize an empty line-delimited JSON file source.

        Args:
            time_column: The name of the column containing the time.
            key_column: The name of the column containing the key.
            schema: The schema to use. It is also handed to PyArrow as the
                explicit schema when files are parsed.
            subsort_column: The name of the column containing the subsort.
                If not provided, the subsort will be assigned by the system.
            grouping_name: The name of the group associated with each key.
                This is used to ensure implicit joins are only performed
                between data grouped by the same entity.
            time_unit: The unit of the time column. One of `ns`, `us`, `ms`,
                or `s`. If not specified (and not specified in the data),
                nanosecond will be assumed.
        """
        super().__init__(
            time_column=time_column,
            key_column=key_column,
            subsort_column=subsort_column,
            grouping_name=grouping_name,
            time_unit=time_unit,
            schema=schema,
        )
        # Every later `add_file` call parses with this same explicit schema.
        parse_options = pyarrow.json.ParseOptions(explicit_schema=schema)
        self._parse_options = parse_options

    @staticmethod
    async def create(
        path: Optional[str] = None,
        *,
        time_column: str,
        key_column: str,
        subsort_column: Optional[str] = None,
        schema: Optional[pa.Schema] = None,
        grouping_name: Optional[str] = None,
        time_unit: Optional[TimeUnit] = None,
    ) -> JsonlFile:
        """Build a source and, when a path is given, load that file into it.

        Args:
            path: The path to the line-delimited JSON file to add. This can be
                relative to the current working directory or an absolute path
                (prefixed by '/').
            time_column: The name of the column containing the time.
            key_column: The name of the column containing the key.
            subsort_column: The name of the column containing the subsort.
                If not provided, the subsort will be assigned by the system.
            schema: The schema to use. If not provided, it will be inferred
                from the input file.
            grouping_name: The name of the group associated with each key.
                This is used to ensure implicit joins are only performed
                between data grouped by the same entity.
            time_unit: The unit of the time column. One of `ns`, `us`, `ms`,
                or `s`. If not specified (and not specified in the data),
                nanosecond will be assumed.

        Returns:
            The newly created source, already containing the rows of `path`
            when one was supplied.

        Raises:
            ValueError: If neither `schema` nor `path` is provided.
        """
        resolved_path = Source._get_absolute_path(path)

        if schema is None:
            if resolved_path is None:
                raise ValueError("Must provide schema or path to jsonl file")
            # No schema supplied: parse the file once to learn its schema.
            inferred_table = pa.json.read_json(resolved_path)
            schema = inferred_table.schema

        jsonl_source = JsonlFile(
            schema=schema,
            time_column=time_column,
            key_column=key_column,
            subsort_column=subsort_column,
            grouping_name=grouping_name,
            time_unit=time_unit,
        )

        if not resolved_path:
            return jsonl_source

        await jsonl_source.add_file(resolved_path)
        return jsonl_source

    async def add_file(self, path: str) -> None:
        """Parse the JSONL file at `path` and append its rows to the source."""
        table = pa.json.read_json(
            Source._get_absolute_path(path),
            parse_options=self._parse_options,
        )
        # Hand the parsed table to the FFI table one record batch at a time.
        for record_batch in table.to_batches():
            await self._ffi_table.add_pyarrow(record_batch)


class JsonlString(Source):
"""Source reading data from line-delimited JSON strings using PyArrow."""

Expand Down Expand Up @@ -467,16 +560,18 @@ async def create(
time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
If not specified (and not specified in the data), nanosecond will be assumed.
"""
path = Source._get_absolute_path(path)

if schema is None:
if path is None:
raise ValueError("Must provide schema or path to parquet file")
schema = pa.parquet.read_schema(path)

source = Parquet(
schema=schema,
time_column=time_column,
key_column=key_column,
subsort_column=subsort_column,
schema=schema,
grouping_name=grouping_name,
time_unit=time_unit,
)
Expand All @@ -487,7 +582,4 @@ async def create(

async def add_file(self, path: str) -> None:
"""Add data to the source."""
if not path.startswith("/"):
path = os.getcwd() + "/" + path

await self._ffi_table.add_parquet(path)
await self._ffi_table.add_parquet(str(Source._get_absolute_path(path)))
7 changes: 7 additions & 0 deletions python/pysrc/kaskada/sources/source.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Provide the base-class for Kaskada sources."""
from typing import Literal, Optional
import os

import kaskada._ffi as _ffi
import pyarrow as pa
Expand Down Expand Up @@ -89,3 +90,9 @@ def _validate_column(field_name: Optional[str], schema: pa.Schema) -> None:
raise KeyError(f"Column {field_name!r} does not exist")
if field.nullable:
raise ValueError(f"Column: {field_name!r} must be non-nullable")

@staticmethod
def _get_absolute_path(path: Optional[str]) -> Optional[str]:
    """Resolve `path` against the current working directory.

    Args:
        path: A file path, or None.

    Returns:
        None when `path` is None; `path` unchanged when it is already
        absolute; otherwise `path` joined onto the current working directory.
    """
    # `os.path.isabs` recognizes the platform's own absolute forms instead of
    # only a leading "/".
    if path is None or os.path.isabs(path):
        return path
    # `os.path.join` avoids a doubled separator when the cwd already ends in one.
    return os.path.join(os.getcwd(), path)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"_time":"1970-01-01T00:26:17.836800000","_key":"karen","id":"cb_001","purchase_time":1577836800000,"customer_id":"karen","vendor_id":"chum_bucket","amount":9,"subsort_id":0}
{"_time":"1970-01-01T00:26:17.836800000","_key":"patrick","id":"kk_001","purchase_time":1577836800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":3,"subsort_id":1}
{"_time":"1970-01-01T00:26:17.923200000","_key":"karen","id":"cb_002","purchase_time":1577923200000,"customer_id":"karen","vendor_id":"chum_bucket","amount":2,"subsort_id":2}
{"_time":"1970-01-01T00:26:17.923200000","_key":"patrick","id":"kk_002","purchase_time":1577923200000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":5,"subsort_id":3}
{"_time":"1970-01-01T00:26:18.009600000","_key":"karen","id":"cb_003","purchase_time":1578009600000,"customer_id":"karen","vendor_id":"chum_bucket","amount":4,"subsort_id":4}
{"_time":"1970-01-01T00:26:18.009600000","_key":"patrick","id":"kk_003","purchase_time":1578009600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":12,"subsort_id":5}
{"_time":"1970-01-01T00:26:18.096000000","_key":"patrick","id":"cb_004","purchase_time":1578096000000,"customer_id":"patrick","vendor_id":"chum_bucket","amount":5000,"subsort_id":6}
{"_time":"1970-01-01T00:26:18.096000000","_key":"karen","id":"cb_005","purchase_time":1578096000000,"customer_id":"karen","vendor_id":"chum_bucket","amount":3,"subsort_id":7}
{"_time":"1970-01-01T00:26:18.182400000","_key":"karen","id":"cb_006","purchase_time":1578182400000,"customer_id":"karen","vendor_id":"chum_bucket","amount":5,"subsort_id":8}
{"_time":"1970-01-01T00:26:18.182400000","_key":"patrick","id":"kk_004","purchase_time":1578182400000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":9}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{"_time":"1970-01-01T00:26:17.836800000","_key":"karen","id":"cb_001","purchase_time":1577836800000,"customer_id":"karen","vendor_id":"chum_bucket","amount":9,"subsort_id":0}
{"_time":"1970-01-01T00:26:17.836800000","_key":"patrick","id":"kk_001","purchase_time":1577836800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":3,"subsort_id":1}
{"_time":"1970-01-01T00:26:17.923200000","_key":"karen","id":"cb_002","purchase_time":1577923200000,"customer_id":"karen","vendor_id":"chum_bucket","amount":2,"subsort_id":2}
{"_time":"1970-01-01T00:26:17.923200000","_key":"patrick","id":"kk_002","purchase_time":1577923200000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":5,"subsort_id":3}
{"_time":"1970-01-01T00:26:18.009600000","_key":"karen","id":"cb_003","purchase_time":1578009600000,"customer_id":"karen","vendor_id":"chum_bucket","amount":4,"subsort_id":4}
{"_time":"1970-01-01T00:26:18.009600000","_key":"patrick","id":"kk_003","purchase_time":1578009600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":12,"subsort_id":5}
{"_time":"1970-01-01T00:26:18.096000000","_key":"patrick","id":"cb_004","purchase_time":1578096000000,"customer_id":"patrick","vendor_id":"chum_bucket","amount":5000,"subsort_id":6}
{"_time":"1970-01-01T00:26:18.096000000","_key":"karen","id":"cb_005","purchase_time":1578096000000,"customer_id":"karen","vendor_id":"chum_bucket","amount":3,"subsort_id":7}
{"_time":"1970-01-01T00:26:18.182400000","_key":"karen","id":"cb_006","purchase_time":1578182400000,"customer_id":"karen","vendor_id":"chum_bucket","amount":5,"subsort_id":8}
{"_time":"1970-01-01T00:26:18.182400000","_key":"patrick","id":"kk_004","purchase_time":1578182400000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":9}
{"_time":"1970-01-01T00:26:18.268800000","_key":"patrick","id":"kk_005","purchase_time":1578268800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":2,"subsort_id":0}
{"_time":"1970-01-01T00:26:18.268800000","_key":"spongebob","id":"wh_001","purchase_time":1578268800000,"customer_id":"spongebob","vendor_id":"weenie_hut","amount":7,"subsort_id":1}
{"_time":"1970-01-01T00:26:18.355200000","_key":"spongebob","id":"cb_007","purchase_time":1578355200000,"customer_id":"spongebob","vendor_id":"chum_bucket","amount":34,"subsort_id":2}
{"_time":"1970-01-01T00:26:18.441600000","_key":"karen","id":"wh_002","purchase_time":1578441600000,"customer_id":"karen","vendor_id":"weenie_hut","amount":8,"subsort_id":3}
{"_time":"1970-01-01T00:26:18.441600000","_key":"patrick","id":"kk_006","purchase_time":1578441600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":4}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"_time":"1970-01-01T00:26:17.836800000","_key":"karen","id":"cb_001","purchase_time":1577836800000,"customer_id":"karen","vendor_id":"chum_bucket","amount":9,"subsort_id":0}
{"_time":"1970-01-01T00:26:17.836800000","_key":"patrick","id":"kk_001","purchase_time":1577836800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":3,"subsort_id":1}
{"_time":"1970-01-01T00:26:17.923200000","_key":"karen","id":"cb_002","purchase_time":1577923200000,"customer_id":"karen","vendor_id":"chum_bucket","amount":2,"subsort_id":2}
{"_time":"1970-01-01T00:26:17.923200000","_key":"patrick","id":"kk_002","purchase_time":1577923200000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":5,"subsort_id":3}
{"_time":"1970-01-01T00:26:18.009600000","_key":"karen","id":"cb_003","purchase_time":1578009600000,"customer_id":"karen","vendor_id":"chum_bucket","amount":4,"subsort_id":4}
{"_time":"1970-01-01T00:26:18.009600000","_key":"patrick","id":"kk_003","purchase_time":1578009600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":12,"subsort_id":5}
{"_time":"1970-01-01T00:26:18.096000000","_key":"patrick","id":"cb_004","purchase_time":1578096000000,"customer_id":"patrick","vendor_id":"chum_bucket","amount":5000,"subsort_id":6}
{"_time":"1970-01-01T00:26:18.096000000","_key":"karen","id":"cb_005","purchase_time":1578096000000,"customer_id":"karen","vendor_id":"chum_bucket","amount":3,"subsort_id":7}
{"_time":"1970-01-01T00:26:18.182400000","_key":"karen","id":"cb_006","purchase_time":1578182400000,"customer_id":"karen","vendor_id":"chum_bucket","amount":5,"subsort_id":8}
{"_time":"1970-01-01T00:26:18.182400000","_key":"patrick","id":"kk_004","purchase_time":1578182400000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":9}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{"_time":"1970-01-01T00:26:17.836800000","_key":"karen","id":"cb_001","purchase_time":1577836800000,"customer_id":"karen","vendor_id":"chum_bucket","amount":9,"subsort_id":0}
{"_time":"1970-01-01T00:26:17.836800000","_key":"patrick","id":"kk_001","purchase_time":1577836800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":3,"subsort_id":1}
{"_time":"1970-01-01T00:26:17.923200000","_key":"karen","id":"cb_002","purchase_time":1577923200000,"customer_id":"karen","vendor_id":"chum_bucket","amount":2,"subsort_id":2}
{"_time":"1970-01-01T00:26:17.923200000","_key":"patrick","id":"kk_002","purchase_time":1577923200000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":5,"subsort_id":3}
{"_time":"1970-01-01T00:26:18.009600000","_key":"karen","id":"cb_003","purchase_time":1578009600000,"customer_id":"karen","vendor_id":"chum_bucket","amount":4,"subsort_id":4}
{"_time":"1970-01-01T00:26:18.009600000","_key":"patrick","id":"kk_003","purchase_time":1578009600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":12,"subsort_id":5}
{"_time":"1970-01-01T00:26:18.096000000","_key":"patrick","id":"cb_004","purchase_time":1578096000000,"customer_id":"patrick","vendor_id":"chum_bucket","amount":5000,"subsort_id":6}
{"_time":"1970-01-01T00:26:18.096000000","_key":"karen","id":"cb_005","purchase_time":1578096000000,"customer_id":"karen","vendor_id":"chum_bucket","amount":3,"subsort_id":7}
{"_time":"1970-01-01T00:26:18.182400000","_key":"karen","id":"cb_006","purchase_time":1578182400000,"customer_id":"karen","vendor_id":"chum_bucket","amount":5,"subsort_id":8}
{"_time":"1970-01-01T00:26:18.182400000","_key":"patrick","id":"kk_004","purchase_time":1578182400000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":9}
{"_time":"1970-01-01T00:26:18.268800000","_key":"patrick","id":"kk_005","purchase_time":1578268800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":2,"subsort_id":0}
{"_time":"1970-01-01T00:26:18.268800000","_key":"spongebob","id":"wh_001","purchase_time":1578268800000,"customer_id":"spongebob","vendor_id":"weenie_hut","amount":7,"subsort_id":1}
{"_time":"1970-01-01T00:26:18.355200000","_key":"spongebob","id":"cb_007","purchase_time":1578355200000,"customer_id":"spongebob","vendor_id":"chum_bucket","amount":34,"subsort_id":2}
{"_time":"1970-01-01T00:26:18.441600000","_key":"karen","id":"wh_002","purchase_time":1578441600000,"customer_id":"karen","vendor_id":"weenie_hut","amount":8,"subsort_id":3}
{"_time":"1970-01-01T00:26:18.441600000","_key":"patrick","id":"kk_006","purchase_time":1578441600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":4}
26 changes: 26 additions & 0 deletions python/pytests/jsonl_file_source_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import kaskada as kd


async def test_read_jsonl_file(golden) -> None:
    """Create a source from part 1, append part 2, and snapshot after each step."""
    part1_path = "../testdata/purchases/purchases_part1.jsonl"
    part2_path = "../testdata/purchases/purchases_part2.jsonl"

    purchases = await kd.sources.JsonlFile.create(
        part1_path,
        key_column="customer_id",
        time_column="purchase_time",
    )
    golden.jsonl(purchases)

    # Appending a second file must extend the existing source.
    await purchases.add_file(part2_path)
    golden.jsonl(purchases)


async def test_read_jsonl_file_with_subsort(golden) -> None:
    """Same flow as the plain read test, but with an explicit subsort column."""
    part1_path = "../testdata/purchases/purchases_part1.jsonl"
    part2_path = "../testdata/purchases/purchases_part2.jsonl"

    purchases = await kd.sources.JsonlFile.create(
        part1_path,
        key_column="customer_id",
        time_column="purchase_time",
        subsort_column="subsort_id",
    )
    golden.jsonl(purchases)

    # Appending a second file must extend the existing source.
    await purchases.add_file(part2_path)
    golden.jsonl(purchases)
10 changes: 10 additions & 0 deletions testdata/purchases/purchases_part1.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"id":"cb_001","purchase_time":1577836800000,"customer_id":"karen","vendor_id":"chum_bucket","amount":9,"subsort_id":0}
{"id":"kk_001","purchase_time":1577836800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":3,"subsort_id":1}
{"id":"cb_002","purchase_time":1577923200000,"customer_id":"karen","vendor_id":"chum_bucket","amount":2,"subsort_id":2}
{"id":"kk_002","purchase_time":1577923200000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":5,"subsort_id":3}
{"id":"cb_003","purchase_time":1578009600000,"customer_id":"karen","vendor_id":"chum_bucket","amount":4,"subsort_id":4}
{"id":"kk_003","purchase_time":1578009600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":12,"subsort_id":5}
{"id":"cb_004","purchase_time":1578096000000,"customer_id":"patrick","vendor_id":"chum_bucket","amount":5000,"subsort_id":6}
{"id":"cb_005","purchase_time":1578096000000,"customer_id":"karen","vendor_id":"chum_bucket","amount":3,"subsort_id":7}
{"id":"cb_006","purchase_time":1578182400000,"customer_id":"karen","vendor_id":"chum_bucket","amount":5,"subsort_id":8}
{"id":"kk_004","purchase_time":1578182400000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":9}
5 changes: 5 additions & 0 deletions testdata/purchases/purchases_part2.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"id":"kk_005","purchase_time":1578268800000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":2,"subsort_id":0}
{"id":"wh_001","purchase_time":1578268800000,"customer_id":"spongebob","vendor_id":"weenie_hut","amount":7,"subsort_id":1}
{"id":"cb_007","purchase_time":1578355200000,"customer_id":"spongebob","vendor_id":"chum_bucket","amount":34,"subsort_id":2}
{"id":"wh_002","purchase_time":1578441600000,"customer_id":"karen","vendor_id":"weenie_hut","amount":8,"subsort_id":3}
{"id":"kk_006","purchase_time":1578441600000,"customer_id":"patrick","vendor_id":"krusty_krab","amount":9,"subsort_id":4}

0 comments on commit d5c8856

Please sign in to comment.