Skip to content

Commit

Permalink
POC write parquet files with specific filenames
Browse files Browse the repository at this point in the history
  • Loading branch information
David Sschneider committed Apr 4, 2024
1 parent 2be8ffd commit c6f3600
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 5 deletions.
7 changes: 6 additions & 1 deletion focus_converter_base/focus_converter/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ class FocusConverter:
# converted column prefix to be added to converted columns
__converted_column_prefix__: Optional[str] = None

def __init__(self, column_prefix=None, converted_column_prefix=None):
__basename_template__: Optional[str] = None

def __init__(self, column_prefix=None, converted_column_prefix=None, basename_template=None):
self.__temporary_columns__ = []
self.__column_prefix__ = column_prefix
self.__converted_column_prefix__ = converted_column_prefix
Expand All @@ -77,6 +79,9 @@ def __init__(self, column_prefix=None, converted_column_prefix=None):
# deferred column plans, these plans are applied after lazyframe is loaded
self.__deferred_column_plans__ = DeferredColumnFunctions()

self.__basename_template__ = basename_template


def load_provider_conversion_configs(self):
plans = {}

Expand Down
15 changes: 11 additions & 4 deletions focus_converter_base/focus_converter/data_loaders/data_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pyarrow.parquet as pq


def __writer_process__(export_path, queue: multiprocessing.Queue):
def __writer_process__(export_path, queue: multiprocessing.Queue, basename_template: str):
while True:
try:
df = queue.get(timeout=0.1)
Expand All @@ -17,7 +17,9 @@ def __writer_process__(export_path, queue: multiprocessing.Queue):
break

pq.write_to_dataset(
root_path=export_path, compression="snappy", table=df.to_arrow()
root_path=export_path,
compression="snappy", table=df.to_arrow(),
basename_template=basename_template
)


Expand All @@ -26,18 +28,23 @@ def __init__(
self,
export_path,
export_include_source_columns: bool,
basename_template: str = None,
process_count: int = multiprocessing.cpu_count(),
):
self.__export_path__ = export_path
self.__export_include_source_columns__ = export_include_source_columns

self.__basename_template__ = basename_template
self.__queue__ = queue = multiprocessing.Queue(maxsize=process_count)

self.__processes__ = processes = []
for _ in range(process_count):
p = multiprocessing.Process(
target=__writer_process__,
kwargs={"queue": queue, "export_path": self.__export_path__},
kwargs={
"queue": queue,
"export_path": self.__export_path__,
"basename_template": self.__basename_template__
},
)
processes.append(p)

Expand Down
16 changes: 16 additions & 0 deletions focus_converter_base/focus_converter/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ def main_auto(
rich_help_panel="Validation",
),
] = False,
basename_template: Annotated[
str,
typer.Option(
help="Specify a template string for output filename as opposed to `guid-{i}`.",
rich_help_panel="Data Export",
),
] = None,
):
provider_sensor = ProviderSensor(base_path=data_path)
provider_sensor.load()
Expand All @@ -70,6 +77,7 @@ def main_auto(
converter.configure_data_export(
export_path=export_path,
export_include_source_columns=export_include_source_columns,
basename_template=basename_template,
)
converter.prepare_horizontal_conversion_plan(provider=provider_sensor.provider)
converter.convert()
Expand Down Expand Up @@ -116,6 +124,13 @@ def main(
rich_help_panel="Validation",
),
] = False,
basename_template: Annotated[
str,
typer.Option(
help="Specify a template string for output filename as opposed to `guid-{i}`.",
rich_help_panel="Data Export",
),
] = None,
):
# compute function for conversion

Expand All @@ -134,6 +149,7 @@ def main(
converter.configure_data_export(
export_path=export_path,
export_include_source_columns=export_include_source_columns,
basename_template=basename_template,
)
converter.prepare_horizontal_conversion_plan(provider=provider)
converter.convert()
Expand Down

0 comments on commit c6f3600

Please sign in to comment.