From c493ce0096ce4d2fbc3901883442dbc48ef8be6b Mon Sep 17 00:00:00 2001
From: Maddie Dawson
Date: Fri, 20 Oct 2023 15:14:15 -0700
Subject: [PATCH 1/2] Init

---
 streaming/base/converters/dataframe_to_mds.py | 74 +++++++++++++++----
 1 file changed, 58 insertions(+), 16 deletions(-)

diff --git a/streaming/base/converters/dataframe_to_mds.py b/streaming/base/converters/dataframe_to_mds.py
index 1d62ef4e7..1adf07eee 100644
--- a/streaming/base/converters/dataframe_to_mds.py
+++ b/streaming/base/converters/dataframe_to_mds.py
@@ -6,6 +6,7 @@
 import logging
 import os
 import shutil
+import time
 from collections.abc import Iterable
 from typing import Any, Callable, Dict, Iterable, Optional, Tuple
 
@@ -224,8 +225,8 @@ def write_mds(iterator: Iterable):
         ],
                  axis=1)
 
-    if dataframe is None or dataframe.isEmpty():
-        raise ValueError(f'Input dataframe is None or Empty!')
+    if dataframe is None:
+        raise ValueError('Input dataframe must be provided')
 
     if not mds_kwargs:
         mds_kwargs = {}
@@ -261,6 +262,9 @@ def write_mds(iterator: Iterable):
     if cu.remote is None:
         mds_path = (cu.local, '')
     else:
+        if dataframe.isStreaming:
+            raise ValueError(
+                'dataframe_to_mds currently only supports outputting to a local directory')
         mds_path = (cu.local, cu.remote)
 
     # Prepare partition schema
@@ -269,21 +273,59 @@ def write_mds(iterator: Iterable):
     result_schema = StructType([
         StructField('mds_path_local', StringType(), False),
         StructField('mds_path_remote', StringType(), False),
         StructField('fail_count', IntegerType(), False)
     ])
-    partitions = dataframe.mapInPandas(func=write_mds, schema=result_schema).collect()
+    mapped_df = dataframe.mapInPandas(func=write_mds, schema=result_schema)
+
+    if mapped_df.isStreaming:
+
+        def merge_and_log(df: DataFrame, batch_id: int):
+            partitions = df.collect()
+            if len(partitions) == 0:
+                return
+
+            if merge_index:
+                index_files = [
+                    (row['mds_path_local'], row['mds_path_remote']) for row in partitions
+                ]
+                lock_file_path = os.path.join(out, '.merge.lock')
+                # Acquire the lock.
+                while True:
+                    try:
+                        fd = os.open(lock_file_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
+                    except OSError:
+                        time.sleep(1)  # File already exists, wait and try again
+                    else:
+                        break
+                do_merge_index(index_files, out, keep_local=keep_local, download_timeout=60)
+                # Release the lock.
+                os.close(fd)
+
+            sum_fail_count = 0
+            for row in partitions:
+                sum_fail_count += row['fail_count']
+
+            if sum_fail_count > 0:
+                logger.warning(
+                    f'[Batch #{batch_id}] Total failed records = {sum_fail_count}\nOverall records {dataframe.count()}'
+                )
+
+        mapped_df.writeStream.foreachBatch(merge_and_log).start()
+        return None, 0
+    else:
+        partitions = mapped_df.collect()
 
-    if merge_index:
-        index_files = [(row['mds_path_local'], row['mds_path_remote']) for row in partitions]
-        do_merge_index(index_files, out, keep_local=keep_local, download_timeout=60)
+        if merge_index:
+            index_files = [(row['mds_path_local'], row['mds_path_remote']) for row in partitions]
+            do_merge_index(index_files, out, keep_local=keep_local, download_timeout=60)
 
-    if cu.remote is not None:
-        if 'keep_local' in mds_kwargs and mds_kwargs['keep_local'] == False:
-            shutil.rmtree(cu.local, ignore_errors=True)
+        if cu.remote is not None:
+            if 'keep_local' in mds_kwargs and mds_kwargs['keep_local'] == False:
+                shutil.rmtree(cu.local, ignore_errors=True)
 
-    sum_fail_count = 0
-    for row in partitions:
-        sum_fail_count += row['fail_count']
+        sum_fail_count = 0
+        for row in partitions:
+            sum_fail_count += row['fail_count']
 
-    if sum_fail_count > 0:
-        logger.warning(
-            f'Total failed records = {sum_fail_count}\nOverall records {dataframe.count()}')
-    return mds_path, sum_fail_count
+        if sum_fail_count > 0:
+            logger.warning(
+                f'Total failed records = {sum_fail_count}\nOverall records {dataframe.count()}')
+        return mds_path, sum_fail_count
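For reference, the lock in merge_and_log works because os.open with O_CREAT | O_EXCL is atomic: exactly one caller can create the lock file, and every other caller retries until it disappears. Below is a minimal standalone sketch of the same pattern, assuming a hypothetical merge_lock helper name and poll interval, and folding in the lock-file removal that PATCH 2/2 adds:

import os
import time
from contextlib import contextmanager

@contextmanager
def merge_lock(lock_file_path: str, poll_interval: float = 1.0):
    # Acquire: O_CREAT | O_EXCL guarantees only one process creates the file.
    while True:
        try:
            fd = os.open(lock_file_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:  # A subclass of the OSError the patch catches.
            time.sleep(poll_interval)  # Another batch holds the lock; retry.
    try:
        yield
    finally:
        # Release: close the descriptor and delete the file so waiters can proceed.
        os.close(fd)
        os.remove(lock_file_path)

Usage would look like: with merge_lock(os.path.join(out, '.merge.lock')): do_merge_index(...). Note that without the final os.remove, every later waiter spins forever, which is the deadlock PATCH 2/2 fixes.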
From 8b2e560813b02f0d0d492ab2a09959d6fc64fcd4 Mon Sep 17 00:00:00 2001
From: Maddie Dawson
Date: Thu, 2 Nov 2023 15:09:29 -0700
Subject: [PATCH 2/2] Update

---
 streaming/base/converters/dataframe_to_mds.py | 21 ++++++++-----------
 streaming/base/format/mds/writer.py           |  4 +++-
 streaming/base/util.py                        |  4 +++-
 3 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/streaming/base/converters/dataframe_to_mds.py b/streaming/base/converters/dataframe_to_mds.py
index 1adf07eee..c9a59a704 100644
--- a/streaming/base/converters/dataframe_to_mds.py
+++ b/streaming/base/converters/dataframe_to_mds.py
@@ -280,12 +280,10 @@ def write_mds(iterator: Iterable):
         def merge_and_log(df: DataFrame, batch_id: int):
             partitions = df.collect()
             if len(partitions) == 0:
+                logger.warning(f'[Batch #{batch_id}] No records to write')
                 return
 
             if merge_index:
-                index_files = [
-                    (row['mds_path_local'], row['mds_path_remote']) for row in partitions
-                ]
                 lock_file_path = os.path.join(out, '.merge.lock')
                 # Acquire the lock.
                 while True:
                     try:
                         fd = os.open(lock_file_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                     except OSError:
                         time.sleep(1)  # File already exists, wait and try again
                     else:
                         break
-                do_merge_index(index_files, out, keep_local=keep_local, download_timeout=60)
+                do_merge_index(out, keep_local=keep_local, download_timeout=60)
                 # Release the lock.
                 os.close(fd)
+                os.remove(lock_file_path)
 
-            sum_fail_count = 0
             for row in partitions:
-                sum_fail_count += row['fail_count']
+                logger.warning(f"[Batch #{batch_id}] {row['fail_count']} failed record(s) for {row['mds_path_local']}")
 
-            if sum_fail_count > 0:
-                logger.warning(
-                    f'[Batch #{batch_id}] Total failed records = {sum_fail_count}\nOverall records {dataframe.count()}'
-                )
-
-        mapped_df.writeStream.foreachBatch(merge_and_log).start()
+        mapped_df \
+            .writeStream \
+            .foreachBatch(merge_and_log) \
+            .start() \
+            .awaitTermination()
         return None, 0
     else:
         partitions = mapped_df.collect()
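Replacing .start() with .start().awaitTermination() makes dataframe_to_mds block until the streaming query stops, instead of returning while micro-batches are still being written. A minimal sketch of the foreachBatch pattern this relies on, using a hypothetical rate-source stream and sink name that are not part of this PR:

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()
stream_df = spark.readStream.format('rate').option('rowsPerSecond', 10).load()

def sink(df: DataFrame, batch_id: int) -> None:
    # foreachBatch hands each micro-batch over as a plain, non-streaming
    # DataFrame, which is why merge_and_log can safely call collect() on it.
    print(f'batch {batch_id}: {df.count()} rows')

query = stream_df.writeStream.foreachBatch(sink).start()
query.awaitTermination(timeout=30)  # Block the caller, as the patch now does.
query.stop()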
diff --git a/streaming/base/format/mds/writer.py b/streaming/base/format/mds/writer.py
index e82fc02a8..2517703ad 100644
--- a/streaming/base/format/mds/writer.py
+++ b/streaming/base/format/mds/writer.py
@@ -4,6 +4,7 @@
 """:class:`MDSWriter` writes samples to ``.mds`` files that can be read by :class:`MDSReader`."""
 
 import json
+import time
 from typing import Any, Dict, List, Optional, Tuple, Union
 
 import numpy as np
@@ -123,7 +124,8 @@ def get_config(self) -> Dict[str, Any]:
         obj.update({
             'column_names': self.column_names,
             'column_encodings': self.column_encodings,
-            'column_sizes': self.column_sizes
+            'column_sizes': self.column_sizes,
+            'write_timestamp': time.time(),
         })
         return obj

diff --git a/streaming/base/util.py b/streaming/base/util.py
index d8042eda0..4151a6a0e 100644
--- a/streaming/base/util.py
+++ b/streaming/base/util.py
@@ -350,7 +350,9 @@ def _merge_index_from_list(index_file_urls: List[Union[str, Tuple[str, str]]],
 
     # Move merged index from temp path to local part in out
     # Upload merged index to remote if out has remote part
-    shutil.move(merged_index_path, cu.local)
+    dst_index_path = os.path.join(cu.local, os.path.basename(merged_index_path))
+    shutil.copy(merged_index_path, dst_index_path)
+    os.remove(merged_index_path)
     if cu.remote is not None:
         cu.upload_file(index_basename)
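The util.py change swaps shutil.move for copy-then-remove because moving a file into a directory raises shutil.Error when a file of the same name already exists there, and with per-micro-batch merging the destination index exists on every merge after the first. Copying to an explicit destination path overwrites instead. A small self-contained sketch of the difference, with illustrative temp paths:

import os
import shutil
import tempfile

src_dir, dst_dir = tempfile.mkdtemp(), tempfile.mkdtemp()

def write_index(dirname: str) -> str:
    path = os.path.join(dirname, 'index.json')
    with open(path, 'w') as f:
        f.write('{}')
    return path

src = write_index(src_dir)
write_index(dst_dir)  # Simulate the index left behind by a previous merge.

try:
    shutil.move(src, dst_dir)  # Old behavior: fails once the index exists.
except shutil.Error as err:
    print(f'move failed: {err}')

# New behavior: copy to an explicit destination (overwrites), then clean up.
dst = os.path.join(dst_dir, os.path.basename(src))
shutil.copy(src, dst)
os.remove(src)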