Skip to content

Commit

Permalink
Fix multiprocessing queue and downgraded polars version temporarily u…
Browse files Browse the repository at this point in the history
…ntil pyarrow compatibility is restored. (finopsfoundation#341)

Signed-off-by: Varun Mittal <[email protected]>
  • Loading branch information
varunmittal91 authored Apr 16, 2024
1 parent d45674b commit a438f5a
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
plan_name: adds dtypes to the columns required for the conversion
conversion_type: set_column_dtypes
column: PlaceHolder
focus_column: PlaceHolder
conversion_args:
dtype_args:
- column_name: billing_account_id
dtype: string
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@

import polars as pl
import pyarrow.parquet as pq
from pyarrow import Table


def __writer_process__(
export_path, queue: multiprocessing.Queue, basename_template: str
):
while True:
try:
df = queue.get(timeout=0.1)
table = queue.get()
except Empty:
continue

if not isinstance(df, pl.DataFrame):
if not isinstance(table, Table):
break

pq.write_to_dataset(
root_path=export_path,
compression="snappy",
table=df.to_arrow(),
table=table,
basename_template=basename_template,
)

Expand Down Expand Up @@ -80,4 +81,4 @@ def collect(self, lf: pl.LazyFrame, collected_columns: List[str]):

# compute final dataframe
df: pl.DataFrame = lf.collect(streaming=True)
self.__queue__.put(df)
self.__queue__.put(df.to_arrow())
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ def load_pyarrow_dataset(self) -> Iterable[pl.LazyFrame]:
with tqdm(total=total_rows) as pobj:
for batch in scanner.to_batches():
df = pl.from_arrow(batch)

# skip if number of rows empty
if df.shape[0] == 0:
continue

yield df.lazy()
pobj.update(df.shape[0])

Expand Down
2 changes: 1 addition & 1 deletion focus_converter_base/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ packages = [{ include = "focus_converter" }]

[tool.poetry.dependencies]
python = "^3.9"
polars = "^0.20.6"
polars = "0.20.10"
pydantic = "^2.1.1"
pandas = "^2.0.3"
pyarrow = "^14"
Expand Down
50 changes: 50 additions & 0 deletions focus_converter_base/tests/test_polars_pyarrow_compatibility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import os

import polars as pl
import pyarrow as pa
import pytest
from unittest import TestCase
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import tempfile

from focus_converter.data_loaders.data_loader import (
DEFAULT_BATCH_READ_SIZE,
FRAGMENT_READ_AHEAD,
BATCH_READ_AHEAD,
)


class TestPolarsPyarrowCompatibility(TestCase):
"""
Test that the Polars and PyArrow data types are compatible.
When trying to load a PyArrow table into Polars, raises error AttributeError: 'pyarrow.lib.StructArray' object has no attribute 'num_chunks'
"""

def test_polars_pyarrow_compatibility(self):
# Create a PyArrow table
table = pa.table(
{"a": [1, 2, 3], "b": [4, 5, 6], "c": [{"d": 7}, {"d": 8}, {"d": 9}]}
)

with tempfile.TemporaryDirectory() as tempdir:
pq.write_table(table, f"{tempdir}/test.pq")
table = pq.read_table(f"{tempdir}/test.pq")

pl.from_arrow(table)

os.system(f"ls -lh {tempdir}")
dataset = ds.dataset(tempdir)

# Load the PyArrow dataset into Polars, this will raise an error AttributeError: 'pyarrow.lib.StructArray' object has no attribute 'num_chunks'
scanner = dataset.scanner(
batch_size=DEFAULT_BATCH_READ_SIZE,
use_threads=True,
fragment_readahead=FRAGMENT_READ_AHEAD,
batch_readahead=BATCH_READ_AHEAD,
)

for batch in scanner.to_batches():
df = pl.from_arrow(batch)
self.assertIsInstance(df, pl.DataFrame)

0 comments on commit a438f5a

Please sign in to comment.