From c6f36001f1dafce27836d3ed059434c2502af208 Mon Sep 17 00:00:00 2001 From: David Sschneider Date: Wed, 3 Apr 2024 21:55:50 -0600 Subject: [PATCH] POC write parquet files with specific filenames --- .../focus_converter/converter.py | 7 ++++++- .../data_loaders/data_exporter.py | 15 +++++++++++---- focus_converter_base/focus_converter/main.py | 16 ++++++++++++++++ 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/focus_converter_base/focus_converter/converter.py b/focus_converter_base/focus_converter/converter.py index cafe7329..8a356984 100644 --- a/focus_converter_base/focus_converter/converter.py +++ b/focus_converter_base/focus_converter/converter.py @@ -65,7 +65,9 @@ class FocusConverter: # converted column prefix to be added to converted columns __converted_column_prefix__: Optional[str] = None - def __init__(self, column_prefix=None, converted_column_prefix=None): + __basename_template__: Optional[str] = None + + def __init__(self, column_prefix=None, converted_column_prefix=None, basename_template=None): self.__temporary_columns__ = [] self.__column_prefix__ = column_prefix self.__converted_column_prefix__ = converted_column_prefix @@ -77,6 +79,9 @@ def __init__(self, column_prefix=None, converted_column_prefix=None): # deferred column plans, these plans are applied after lazyframe is loaded self.__deferred_column_plans__ = DeferredColumnFunctions() + self.__basename_template__ = basename_template + + def load_provider_conversion_configs(self): plans = {} diff --git a/focus_converter_base/focus_converter/data_loaders/data_exporter.py b/focus_converter_base/focus_converter/data_loaders/data_exporter.py index 5ed2d3a2..3369be16 100644 --- a/focus_converter_base/focus_converter/data_loaders/data_exporter.py +++ b/focus_converter_base/focus_converter/data_loaders/data_exporter.py @@ -6,7 +6,7 @@ import pyarrow.parquet as pq -def __writer_process__(export_path, queue: multiprocessing.Queue): +def __writer_process__(export_path, queue: multiprocessing.Queue, basename_template: str): while True: try: df = queue.get(timeout=0.1) @@ -17,7 +17,9 @@ def __writer_process__(export_path, queue: multiprocessing.Queue): break pq.write_to_dataset( - root_path=export_path, compression="snappy", table=df.to_arrow() + root_path=export_path, + compression="snappy", table=df.to_arrow(), + basename_template=basename_template ) @@ -26,18 +28,23 @@ def __init__( self, export_path, export_include_source_columns: bool, + basename_template: str = None, process_count: int = multiprocessing.cpu_count(), ): self.__export_path__ = export_path self.__export_include_source_columns__ = export_include_source_columns - + self.__basename_template__ = basename_template self.__queue__ = queue = multiprocessing.Queue(maxsize=process_count) self.__processes__ = processes = [] for _ in range(process_count): p = multiprocessing.Process( target=__writer_process__, - kwargs={"queue": queue, "export_path": self.__export_path__}, + kwargs={ + "queue": queue, + "export_path": self.__export_path__, + "basename_template": self.__basename_template__ + }, ) processes.append(p) diff --git a/focus_converter_base/focus_converter/main.py b/focus_converter_base/focus_converter/main.py index 46a5445c..12098ae4 100644 --- a/focus_converter_base/focus_converter/main.py +++ b/focus_converter_base/focus_converter/main.py @@ -54,6 +54,13 @@ def main_auto( rich_help_panel="Validation", ), ] = False, + basename_template: Annotated[ + str, + typer.Option( + help="Specify a template string for output filename as opposed to `guid-{i}`.", + rich_help_panel="Data Export", + ), + ] = None, ): provider_sensor = ProviderSensor(base_path=data_path) provider_sensor.load() @@ -70,6 +77,7 @@ def main_auto( converter.configure_data_export( export_path=export_path, export_include_source_columns=export_include_source_columns, + basename_template=basename_template, ) converter.prepare_horizontal_conversion_plan(provider=provider_sensor.provider) converter.convert() @@ -116,6 +124,13 @@ def main( rich_help_panel="Validation", ), ] = False, + basename_template: Annotated[ + str, + typer.Option( + help="Specify a template string for output filename as opposed to `guid-{i}`.", + rich_help_panel="Data Export", + ), + ] = None, ): # compute function for conversion @@ -134,6 +149,7 @@ def main( converter.configure_data_export( export_path=export_path, export_include_source_columns=export_include_source_columns, + basename_template=basename_template, ) converter.prepare_horizontal_conversion_plan(provider=provider) converter.convert()