diff --git a/scripts/parquet_to_json.py b/scripts/parquet_to_json.py
index 5a9b10b..83cc3f2 100644
--- a/scripts/parquet_to_json.py
+++ b/scripts/parquet_to_json.py
@@ -1,21 +1,33 @@
 import os
-import pandas as pd
+import pyarrow.parquet as pq
 
 
-def convert_parquet_to_json(directory: str, filenames: list, output_file: str) -> None:
+def convert_parquet_to_json_streaming(
+    directory: str, filenames: list, output_file: str, batch_size: int = 10_000
+) -> None:
     """
-    Convert a list of Parquet files to a single JSON file.
+    Convert a list of Parquet files to a single JSON Lines file in a memory-efficient manner.
+    Data is read and written in batches to avoid loading entire datasets into memory at once.
 
     :param directory: Directory containing the Parquet files.
     :param filenames: List of Parquet files to be combined.
-    :param output_file: Output JSON file.
+    :param output_file: Output JSON file (JSON Lines format).
+    :param batch_size: Number of rows per batch (adjust as needed).
     """
-    combined_df = pd.concat(
-        [pd.read_parquet(os.path.join(directory, filename)) for filename in filenames],
-        ignore_index=True,
-    )
+    with open(output_file, "w", encoding="utf-8") as f_out:
+        for filename in filenames:
+            filepath = os.path.join(directory, filename)
+            print(f"Started processing: {filepath}")
+
+            parquet_file = pq.ParquetFile(filepath)
+
+            for batch in parquet_file.iter_batches(batch_size=batch_size):
+                df = batch.to_pandas()
+                json_str = df.to_json(orient="records", lines=True)
+                f_out.write(json_str)
+                if not json_str.endswith("\n"):
+                    f_out.write("\n")  # to_json(lines=True) omits the trailing newline; keep batches on separate lines
 
-    combined_df.to_json(output_file, orient="records", lines=True)
     print(f"Conversion to JSON completed successfully! Output saved to: {output_file}")
 
 
@@ -24,4 +36,4 @@ def convert_parquet_to_json(directory: str, filenames: list, output_file: str) -
     filenames = ["a.parquet", "b.parquet", "c.parquet", "d.parquet"]
     output_file = "samples/wikipedia.json"
 
-    convert_parquet_to_json(directory, filenames, output_file)
+    convert_parquet_to_json_streaming(directory, filenames, output_file)
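
Not part of the diff above, just a minimal sketch of how the resulting output could be consumed: pandas can stream a JSON Lines file back in fixed-size chunks via read_json with lines=True and chunksize, mirroring the batch-oriented write. The samples/wikipedia.json path and the 10_000 row count are taken from the code above; the rest is illustrative.

import pandas as pd

# chunksize + lines=True makes read_json return an iterator of DataFrames
# instead of loading the whole file at once.
reader = pd.read_json(
    "samples/wikipedia.json", orient="records", lines=True, chunksize=10_000
)
for chunk in reader:
    # each chunk is a DataFrame with at most 10_000 rows
    print(chunk.shape)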