Skip to content

Commit

Permalink
fix lint
Browse files: browse the repository at this point in the history
  • Loading branch information
epinzur committed Sep 26, 2023
1 parent 36d8f72 commit ccfd6a3
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
14 changes: 7 additions & 7 deletions python/pysrc/kaskada/sources/arrow.py
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,6 @@
"""Provide sources based on PyArrow, including Pandas and CSV."""
from __future__ import annotations

import os
from io import BytesIO
from typing import Optional

Expand Down Expand Up @@ -309,6 +308,7 @@ async def add_string(self, csv_string: str | BytesIO) -> None:
for batch in content.to_batches():
await self._ffi_table.add_pyarrow(batch)


class JsonlFile(Source):
"""Source reading data from line-delimited JSON files using PyArrow."""

Expand Down Expand Up @@ -356,7 +356,7 @@ async def create(
schema: Optional[pa.Schema] = None,
grouping_name: Optional[str] = None,
time_unit: Optional[TimeUnit] = None,
) -> JsonlString:
) -> JsonlFile:
"""Create a source reading a line-delimited JSON file.
Args:
Expand Down Expand Up @@ -395,12 +395,13 @@ async def create(

async def add_file(self, path: str) -> None:
"""Add data to the source."""
path = Source._get_absolute_path(path)
abs_path = Source._get_absolute_path(path)

batches = pa.json.read_json(path, parse_options=self._parse_options)
batches = pa.json.read_json(abs_path, parse_options=self._parse_options)
for batch in batches.to_batches():
await self._ffi_table.add_pyarrow(batch)


class JsonlString(Source):
"""Source reading data from line-delimited JSON strings using PyArrow."""

Expand Down Expand Up @@ -558,7 +559,6 @@ async def create(
time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
If not specified (and not specified in the data), nanosecond will be assumed.
"""

path = Source._get_absolute_path(path)

if schema is None:
Expand All @@ -581,6 +581,6 @@ async def create(

async def add_file(self, path: str) -> None:
"""Add data to the source."""
path = Source._get_absolute_path(path)
abs_path = Source._get_absolute_path(path)

await self._ffi_table.add_parquet(path)
await self._ffi_table.add_parquet(abs_path)
2 changes: 1 addition & 1 deletion python/pysrc/kaskada/sources/source.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -92,7 +92,7 @@ def _validate_column(field_name: Optional[str], schema: pa.Schema) -> None:
raise ValueError(f"Column: {field_name!r} must be non-nullable")

@staticmethod
def _get_absolute_path(path: Optional[str]) -> str|None:
def _get_absolute_path(path: Optional[str]) -> Optional[str]:
if path is None or path.startswith("/"):
return path
return os.getcwd() + "/" + path

0 comments on commit ccfd6a3

Please sign in to comment.