diff --git a/focus_converter_base/focus_converter/conversion_configs/gcp/0_dimension_dtypes_S001.yaml b/focus_converter_base/focus_converter/conversion_configs/gcp/0_dimension_dtypes_S001.yaml
new file mode 100644
index 00000000..f7d83c55
--- /dev/null
+++ b/focus_converter_base/focus_converter/conversion_configs/gcp/0_dimension_dtypes_S001.yaml
@@ -0,0 +1,8 @@
+plan_name: adds dtypes to the columns required for the conversion
+conversion_type: set_column_dtypes
+column: PlaceHolder
+focus_column: PlaceHolder
+conversion_args:
+  dtype_args:
+    - column_name: billing_account_id
+      dtype: string
diff --git a/focus_converter_base/focus_converter/data_loaders/data_exporter.py b/focus_converter_base/focus_converter/data_loaders/data_exporter.py
index fa84f223..73fa3664 100644
--- a/focus_converter_base/focus_converter/data_loaders/data_exporter.py
+++ b/focus_converter_base/focus_converter/data_loaders/data_exporter.py
@@ -5,6 +5,7 @@
 
 import polars as pl
 import pyarrow.parquet as pq
+from pyarrow import Table
 
 
 def __writer_process__(
@@ -12,17 +13,17 @@ def __writer_process__(
 ):
     while True:
         try:
-            df = queue.get(timeout=0.1)
+            table = queue.get()
         except Empty:
             continue
 
-        if not isinstance(df, pl.DataFrame):
+        if not isinstance(table, Table):
             break
 
         pq.write_to_dataset(
             root_path=export_path,
             compression="snappy",
-            table=df.to_arrow(),
+            table=table,
             basename_template=basename_template,
         )
 
@@ -80,4 +81,4 @@ def collect(self, lf: pl.LazyFrame, collected_columns: List[str]):
 
         # compute final dataframe
         df: pl.DataFrame = lf.collect(streaming=True)
-        self.__queue__.put(df)
+        self.__queue__.put(df.to_arrow())
diff --git a/focus_converter_base/focus_converter/data_loaders/data_loader.py b/focus_converter_base/focus_converter/data_loaders/data_loader.py
index 4e7ac248..fc076d00 100644
--- a/focus_converter_base/focus_converter/data_loaders/data_loader.py
+++ b/focus_converter_base/focus_converter/data_loaders/data_loader.py
@@ -49,6 +49,11 @@ def load_pyarrow_dataset(self) -> Iterable[pl.LazyFrame]:
         with tqdm(total=total_rows) as pobj:
             for batch in scanner.to_batches():
                 df = pl.from_arrow(batch)
+
+                # skip batches that contain no rows
+                if df.shape[0] == 0:
+                    continue
+
                 yield df.lazy()
 
                 pobj.update(df.shape[0])
diff --git a/focus_converter_base/pyproject.toml b/focus_converter_base/pyproject.toml
index c242eb79..128d6bd6 100644
--- a/focus_converter_base/pyproject.toml
+++ b/focus_converter_base/pyproject.toml
@@ -8,7 +8,7 @@ packages = [{ include = "focus_converter" }]
 
 [tool.poetry.dependencies]
 python = "^3.9"
-polars = "^0.20.6"
+polars = "0.20.10"
 pydantic = "^2.1.1"
 pandas = "^2.0.3"
 pyarrow = "^14"
diff --git a/focus_converter_base/tests/test_polars_pyarrow_compatibility.py b/focus_converter_base/tests/test_polars_pyarrow_compatibility.py
new file mode 100644
index 00000000..e3a3d55e
--- /dev/null
+++ b/focus_converter_base/tests/test_polars_pyarrow_compatibility.py
@@ -0,0 +1,50 @@
+import os
+
+import polars as pl
+import pyarrow as pa
+import pytest
+from unittest import TestCase
+import pyarrow.parquet as pq
+import pyarrow.dataset as ds
+import tempfile
+
+from focus_converter.data_loaders.data_loader import (
+    DEFAULT_BATCH_READ_SIZE,
+    FRAGMENT_READ_AHEAD,
+    BATCH_READ_AHEAD,
+)
+
+
+class TestPolarsPyarrowCompatibility(TestCase):
+    """
+    Test that the Polars and PyArrow data types are compatible.
+
+    With incompatible versions, loading a PyArrow table that contains struct columns into Polars raises AttributeError: 'pyarrow.lib.StructArray' object has no attribute 'num_chunks'
+    """
+
+    def test_polars_pyarrow_compatibility(self):
+        # Create a PyArrow table with a struct column
+        table = pa.table(
+            {"a": [1, 2, 3], "b": [4, 5, 6], "c": [{"d": 7}, {"d": 8}, {"d": 9}]}
+        )
+
+        with tempfile.TemporaryDirectory() as tempdir:
+            pq.write_table(table, f"{tempdir}/test.pq")
+            table = pq.read_table(f"{tempdir}/test.pq")
+
+            pl.from_arrow(table)
+
+            os.system(f"ls -lh {tempdir}")
+            dataset = ds.dataset(tempdir)
+
+            # Load the PyArrow dataset into Polars; with an incompatible Polars version this raises AttributeError: 'pyarrow.lib.StructArray' object has no attribute 'num_chunks'
+            scanner = dataset.scanner(
+                batch_size=DEFAULT_BATCH_READ_SIZE,
+                use_threads=True,
+                fragment_readahead=FRAGMENT_READ_AHEAD,
+                batch_readahead=BATCH_READ_AHEAD,
+            )
+
+            for batch in scanner.to_batches():
+                df = pl.from_arrow(batch)
+                self.assertIsInstance(df, pl.DataFrame)