Skip to content

Commit

Permalink
Added support for file CSV export
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Fuller <[email protected]>
  • Loading branch information
mike-finopsorg committed Jun 4, 2024
1 parent 51ff39e commit 1111285
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 12 deletions.
8 changes: 8 additions & 0 deletions focus_converter_base/focus_converter/common/cli_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing_extensions import Annotated

from focus_converter.data_loaders.data_loader import DataFormats, ParquetDataFormat
from focus_converter.data_loaders.data_exporter import ExportDataFormats

PROVIDER_OPTION = Annotated[
str,
Expand All @@ -16,12 +17,19 @@
str, typer.Option(help="Target data path", rich_help_panel="Data Export")
]

# CLI option typed as ExportDataFormats; listed in the "Data Export" help panel.
EXPORT_DATA_FORMAT = Annotated[
    ExportDataFormats,
    typer.Option(rich_help_panel="Data Export", help="Target data format"),
]

# CLI option carrying the source data path; listed in the "Source Data" help panel.
DATA_PATH = Annotated[
    str,
    typer.Option(rich_help_panel="Source Data", help="Source data path"),
]

# CLI option typed as DataFormats; listed in the "Source Data" help panel.
DATA_FORMAT_OPTION = Annotated[
    DataFormats,
    typer.Option(rich_help_panel="Source Data", help="Data format"),
]

PARQUET_DATA_FORMAT_OPTION = Annotated[
ParquetDataFormat,
typer.Option(help="Parquet data format", rich_help_panel="Source Data"),
Expand Down
40 changes: 29 additions & 11 deletions focus_converter_base/focus_converter/data_loaders/data_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@
import re
from queue import Empty
from typing import List

from enum import Enum
from uuid import uuid4
import polars as pl
import pyarrow.parquet as pq
from pyarrow import Table
from pyarrow import Table, csv

from focus_converter.models.focus_column_names import FocusColumnNames


class ExportDataFormats(str, Enum):
    """File formats the converter can write its output in.

    Members also subclass ``str``, so a member compares equal to its raw
    value (``ExportDataFormats.PARQUET == "parquet"``). An equality check
    against a member therefore still matches when a caller supplies the
    plain string instead of the member. Member names, values and lookup by
    value (``ExportDataFormats("csv")``) are unchanged.
    """

    CSV = "csv"
    PARQUET = "parquet"


def __writer_process__(
export_path, queue: multiprocessing.Queue, basename_template: str
queue: multiprocessing.Queue, export_path, export_format, basename_template: str
):
while True:
try:
Expand All @@ -22,12 +28,19 @@ def __writer_process__(
if not isinstance(table, Table):
break

pq.write_to_dataset(
root_path=export_path,
compression="snappy",
table=table,
basename_template=basename_template,
)
if export_format == ExportDataFormats.PARQUET:
pq.write_to_dataset(
root_path=export_path,
compression="snappy",
table=table,
basename_template=basename_template,
)
elif export_format == ExportDataFormats.CSV:
if not basename_template:
basename_template = str(uuid4()).replace("-", "") + ".csv"
csv.write_csv(table, export_path + basename_template.replace("-{i}", ""))
else:
raise Exception("Unknown output format")


class DataExporter:
Expand All @@ -37,11 +50,15 @@ def __init__(
export_include_source_columns: bool,
basename_template: str = None,
process_count: int = multiprocessing.cpu_count(),
export_format: str = "parquet",
):
self.__export_path__ = export_path
self.__export_include_source_columns__ = export_include_source_columns
if basename_template and not re.search(r"-{i}\.parquet$", basename_template):
basename_template += "-{i}.parquet"
self.__export_format__ = export_format
if basename_template and not re.search(
f"-{{i}}\.{self.__export_format__.value}$", basename_template
):
basename_template += "-{i}." + str(self.__export_format__.value)
self.__basename_template__ = basename_template
self.__queue__ = queue = multiprocessing.Queue(maxsize=process_count)

Expand All @@ -52,6 +69,7 @@ def __init__(
kwargs={
"queue": queue,
"export_path": self.__export_path__,
"export_format": self.__export_format__,
"basename_template": self.__basename_template__,
},
)
Expand Down
7 changes: 6 additions & 1 deletion focus_converter_base/focus_converter/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
DATA_PATH,
EXPORT_INCLUDE_SOURCE_COLUMNS,
EXPORT_PATH_OPTION,
EXPORT_DATA_FORMAT,
PARQUET_DATA_FORMAT_OPTION,
PLAN_GRAPH_PATH,
PROVIDER_OPTION,
Expand All @@ -32,6 +33,7 @@
def main_auto(
data_path: DATA_PATH,
export_path: EXPORT_PATH_OPTION,
export_format: EXPORT_DATA_FORMAT = "parquet",
export_include_source_columns: EXPORT_INCLUDE_SOURCE_COLUMNS = True,
column_prefix: Annotated[
str,
Expand All @@ -57,7 +59,7 @@ def main_auto(
basename_template: Annotated[
str,
typer.Option(
help="Specify a template string for output filename as opposed to guid-{i}.",
help="Specify a template string for output filename as opposed to `guid-{i}`.",
rich_help_panel="Data Export",
),
] = None,
Expand All @@ -78,6 +80,7 @@ def main_auto(
export_path=export_path,
export_include_source_columns=export_include_source_columns,
basename_template=basename_template,
export_format=export_format,
)
converter.prepare_horizontal_conversion_plan(provider=provider_sensor.provider)
converter.convert()
Expand All @@ -101,6 +104,7 @@ def main(
export_path: EXPORT_PATH_OPTION,
data_format: DATA_FORMAT_OPTION,
data_path: DATA_PATH,
export_format: EXPORT_DATA_FORMAT = "parquet",
parquet_data_format: PARQUET_DATA_FORMAT_OPTION = None,
export_include_source_columns: EXPORT_INCLUDE_SOURCE_COLUMNS = True,
column_prefix: Annotated[
Expand Down Expand Up @@ -150,6 +154,7 @@ def main(
export_path=export_path,
export_include_source_columns=export_include_source_columns,
basename_template=basename_template,
export_format=export_format,
)
converter.prepare_horizontal_conversion_plan(provider=provider)
converter.convert()
Expand Down

0 comments on commit 1111285

Please sign in to comment.