From c3fda237170889e9012c8144c1faa2358fe8d2df Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com> Date: Fri, 24 May 2024 12:44:59 -0400 Subject: [PATCH] Print file name for mapping/splitting failures. (#318) * Use dask print for worker failures * Pylint. * Improve test coverage * Print worker address (if inside worker) * Remove unneeded line. --- src/hipscat_import/catalog/map_reduce.py | 231 ++++++++++-------- src/hipscat_import/pipeline_resume_plan.py | 37 +-- src/hipscat_import/soap/map_reduce.py | 164 +++++++------ .../hipscat_import/catalog/test_map_reduce.py | 25 +- .../soap/test_soap_map_reduce.py | 16 ++ .../test_pipeline_resume_plan.py | 12 +- 6 files changed, 278 insertions(+), 207 deletions(-) diff --git a/src/hipscat_import/catalog/map_reduce.py b/src/hipscat_import/catalog/map_reduce.py index 3294d45c..f94de055 100644 --- a/src/hipscat_import/catalog/map_reduce.py +++ b/src/hipscat_import/catalog/map_reduce.py @@ -14,7 +14,7 @@ from hipscat_import.catalog.file_readers import InputReader from hipscat_import.catalog.resume_plan import ResumePlan from hipscat_import.catalog.sparse_histogram import SparseHistogram -from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory +from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure # pylint: disable=too-many-locals,too-many-arguments @@ -94,22 +94,28 @@ def map_to_pixels( ValueError: if the `ra_column` or `dec_column` cannot be found in the input file. FileNotFoundError: if the file does not exist, or is a directory """ - histo = SparseHistogram.make_empty(highest_order) + try: + histo = SparseHistogram.make_empty(highest_order) - if use_hipscat_index: - read_columns = [HIPSCAT_ID_COLUMN] - else: - read_columns = [ra_column, dec_column] + if use_hipscat_index: + read_columns = [HIPSCAT_ID_COLUMN] + else: + read_columns = [ra_column, dec_column] - for _, _, mapped_pixels in _iterate_input_file( - input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns - ): - mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True) + for _, _, mapped_pixels in _iterate_input_file( + input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns + ): + mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True) - partial = SparseHistogram.make_from_counts(mapped_pixel, count_at_pixel, healpix_order=highest_order) - histo.add(partial) + partial = SparseHistogram.make_from_counts( + mapped_pixel, count_at_pixel, healpix_order=highest_order + ) + histo.add(partial) - histo.to_file(ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key)) + histo.to_file(ResumePlan.partial_histogram_file(tmp_path=resume_path, mapping_key=mapping_key)) + except Exception as exception: # pylint: disable=broad-exception-caught + print_task_failure(f"Failed MAPPING stage with file {input_file}", exception) + raise exception def split_pixels( @@ -142,30 +148,34 @@ def split_pixels( ValueError: if the `ra_column` or `dec_column` cannot be found in the input file. 
FileNotFoundError: if the file does not exist, or is a directory """ - for chunk_number, data, mapped_pixels in _iterate_input_file( - input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index - ): - aligned_pixels = alignment[mapped_pixels] - unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True) - - for unique_index, [order, pixel, _] in enumerate(unique_pixels): - mapped_indexes = np.where(unique_inverse == unique_index) - data_indexes = data.index[mapped_indexes[0].tolist()] - - filtered_data = data.filter(items=data_indexes, axis=0) - - pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel)) - file_io.make_directory(pixel_dir, exist_ok=True) - output_file = file_io.append_paths_to_pointer( - pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet" - ) - if _has_named_index(filtered_data): - filtered_data.to_parquet(output_file, index=True) - else: - filtered_data.to_parquet(output_file, index=False) - del filtered_data, data_indexes - - ResumePlan.splitting_key_done(tmp_path=resume_path, splitting_key=splitting_key) + try: + for chunk_number, data, mapped_pixels in _iterate_input_file( + input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index + ): + aligned_pixels = alignment[mapped_pixels] + unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True) + + for unique_index, [order, pixel, _] in enumerate(unique_pixels): + mapped_indexes = np.where(unique_inverse == unique_index) + data_indexes = data.index[mapped_indexes[0].tolist()] + + filtered_data = data.filter(items=data_indexes, axis=0) + + pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel)) + file_io.make_directory(pixel_dir, exist_ok=True) + output_file = file_io.append_paths_to_pointer( + pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet" + ) + if _has_named_index(filtered_data): + filtered_data.to_parquet(output_file, index=True) + else: + filtered_data.to_parquet(output_file, index=False) + del filtered_data, data_indexes + + ResumePlan.splitting_key_done(tmp_path=resume_path, splitting_key=splitting_key) + except Exception as exception: # pylint: disable=broad-exception-caught + print_task_failure(f"Failed SPLITTING stage with file {input_file}", exception) + raise exception def reduce_pixel_shards( @@ -227,84 +237,99 @@ def reduce_pixel_shards( ValueError: if the number of rows written doesn't equal provided `destination_pixel_size` """ - destination_dir = paths.pixel_directory(output_path, destination_pixel_order, destination_pixel_number) - file_io.make_directory(destination_dir, exist_ok=True, storage_options=storage_options) + try: + destination_dir = paths.pixel_directory( + output_path, destination_pixel_order, destination_pixel_number + ) + file_io.make_directory(destination_dir, exist_ok=True, storage_options=storage_options) - destination_file = paths.pixel_catalog_file( - output_path, destination_pixel_order, destination_pixel_number - ) + destination_file = paths.pixel_catalog_file( + output_path, destination_pixel_order, destination_pixel_number + ) - schema = None - if use_schema_file: - schema = file_io.read_parquet_metadata( - use_schema_file, storage_options=storage_options - ).schema.to_arrow_schema() + schema = None + if use_schema_file: + schema = file_io.read_parquet_metadata( + use_schema_file, storage_options=storage_options + ).schema.to_arrow_schema() - tables = [] - healpix_pixel = HealpixPixel(destination_pixel_order, 
destination_pixel_number) - pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel) + tables = [] + healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number) + pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel) - if schema: - tables.append(pq.read_table(pixel_dir, schema=schema)) - else: - tables.append(pq.read_table(pixel_dir)) + if schema: + tables.append(pq.read_table(pixel_dir, schema=schema)) + else: + tables.append(pq.read_table(pixel_dir)) - merged_table = pa.concat_tables(tables) + merged_table = pa.concat_tables(tables) - rows_written = len(merged_table) + rows_written = len(merged_table) - if rows_written != destination_pixel_size: - raise ValueError( - "Unexpected number of objects at pixel " - f"({healpix_pixel})." - f" Expected {destination_pixel_size}, wrote {rows_written}" - ) + if rows_written != destination_pixel_size: + raise ValueError( + "Unexpected number of objects at pixel " + f"({healpix_pixel})." + f" Expected {destination_pixel_size}, wrote {rows_written}" + ) - dataframe = merged_table.to_pandas() - if sort_columns: - dataframe = dataframe.sort_values(sort_columns.split(",")) - if add_hipscat_index: - ## If we had a meaningful index before, preserve it as a column. - if _has_named_index(dataframe): - dataframe = dataframe.reset_index() + dataframe = merged_table.to_pandas() + if sort_columns: + dataframe = dataframe.sort_values(sort_columns.split(",")) + if add_hipscat_index: + ## If we had a meaningful index before, preserve it as a column. + if _has_named_index(dataframe): + dataframe = dataframe.reset_index() + + dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id( + dataframe[ra_column].values, + dataframe[dec_column].values, + ) + dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index() - dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id( - dataframe[ra_column].values, - dataframe[dec_column].values, - ) - dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index() + # Adjust the schema to make sure that the _hipscat_index will + # be saved as a uint64 + elif use_hipscat_index: + if dataframe.index.name != HIPSCAT_ID_COLUMN: + dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN) + dataframe = dataframe.sort_index() + + dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8) + dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64) + dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64) - # Adjust the schema to make sure that the _hipscat_index will - # be saved as a uint64 if schema: - pandas_index_column = schema.get_field_index("__index_level_0__") - if pandas_index_column != -1: - schema = schema.remove(pandas_index_column) - schema = schema.insert(0, pa.field(HIPSCAT_ID_COLUMN, pa.uint64())) - elif use_hipscat_index: - if dataframe.index.name != HIPSCAT_ID_COLUMN: - dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN) - dataframe = dataframe.sort_index() - - dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8) - dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64) - dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64) - - if schema: - schema = ( - schema.append(pa.field("Norder", pa.uint8())) - .append(pa.field("Dir", pa.uint64())) - .append(pa.field("Npix", pa.uint64())) - ) - dataframe.to_parquet(destination_file, schema=schema, 
storage_options=storage_options) - else: - dataframe.to_parquet(destination_file, storage_options=storage_options) + schema = _modify_arrow_schema(schema, add_hipscat_index) + dataframe.to_parquet(destination_file, schema=schema, storage_options=storage_options) + else: + dataframe.to_parquet(destination_file, storage_options=storage_options) - del dataframe, merged_table, tables + del dataframe, merged_table, tables - if delete_input_files: - pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel) + if delete_input_files: + pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel) - file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options) + file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options) + + ResumePlan.reducing_key_done(tmp_path=resume_path, reducing_key=reducing_key) + except Exception as exception: # pylint: disable=broad-exception-caught + print_task_failure( + f"Failed REDUCING stage for shard: {destination_pixel_order} {destination_pixel_number}", + exception, + ) + raise exception + + +def _modify_arrow_schema(schema, add_hipscat_index): + if add_hipscat_index: + pandas_index_column = schema.get_field_index("__index_level_0__") + if pandas_index_column != -1: + schema = schema.remove(pandas_index_column) + schema = schema.insert(0, pa.field(HIPSCAT_ID_COLUMN, pa.uint64())) + schema = ( + schema.append(pa.field("Norder", pa.uint8())) + .append(pa.field("Dir", pa.uint64())) + .append(pa.field("Npix", pa.uint64())) + ) - ResumePlan.reducing_key_done(tmp_path=resume_path, reducing_key=reducing_key) + return schema diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py index 350a9905..f9ef4f60 100644 --- a/src/hipscat_import/pipeline_resume_plan.py +++ b/src/hipscat_import/pipeline_resume_plan.py @@ -6,7 +6,8 @@ from dataclasses import dataclass from pathlib import Path -from dask.distributed import as_completed +from dask.distributed import as_completed, get_worker +from dask.distributed import print as dask_print from hipscat.io import FilePointer, file_io from hipscat.pixel_math.healpix_pixel import HealpixPixel from tqdm.auto import tqdm @@ -134,22 +135,8 @@ def wait_for_futures(self, futures, stage_name, fail_fast=False): ): if future.status == "error": some_error = True - exception = future.exception() - trace_strs = [ - f"{stage_name} task {future.key} failed with message:", - f" {type(exception).__name__}: {exception}", - " Traceback (most recent call last):", - ] - stack_trace = exception.__traceback__ - while stack_trace is not None: - filename = stack_trace.tb_frame.f_code.co_filename - method_name = stack_trace.tb_frame.f_code.co_name - line_number = stack_trace.tb_lineno - trace_strs.append(f' File "{filename}", line {line_number}, in {method_name}') - stack_trace = stack_trace.tb_next - print("\n".join(trace_strs)) if fail_fast: - raise exception + raise future.exception() if some_error: raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.") @@ -220,3 +207,21 @@ def get_pixel_cache_directory(cache_path, pixel: HealpixPixel): return file_io.append_paths_to_pointer( cache_path, f"order_{pixel.order}", f"dir_{pixel.dir}", f"pixel_{pixel.pixel}" ) + + +def print_task_failure(custom_message, exception): + """Use dask's distributed print feature to print the exception message to the task's logs + and to the controller job's logs. + + Optionally print the worker address if a worker is found. 
+ + Args: + custom_message (str): some custom message for the task that might help with debugging + exception (Exception): error raised in execution of the task. + """ + dask_print(custom_message) + try: + dask_print(" worker address:", get_worker().address) + except Exception: # pylint: disable=broad-exception-caught + pass + dask_print(exception) diff --git a/src/hipscat_import/soap/map_reduce.py b/src/hipscat_import/soap/map_reduce.py index 6ec232d5..5416e15f 100644 --- a/src/hipscat_import/soap/map_reduce.py +++ b/src/hipscat_import/soap/map_reduce.py @@ -12,7 +12,7 @@ from hipscat.pixel_math.healpix_pixel import HealpixPixel from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort -from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory +from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure from hipscat_import.soap.arguments import SoapArguments from hipscat_import.soap.resume_plan import SoapPlan @@ -94,39 +94,43 @@ def count_joins(soap_args: SoapArguments, source_pixel: HealpixPixel, object_pix object_pixels(List[HealpixPixel]): set of tuples of order and pixel for the partitions of the object catalog to be joined. """ - source_path = paths.pixel_catalog_file( - catalog_base_dir=file_io.get_file_pointer_from_path(soap_args.source_catalog_dir), - pixel_order=source_pixel.order, - pixel_number=source_pixel.pixel, - ) - if soap_args.write_leaf_files and soap_args.source_object_id_column != soap_args.source_id_column: - read_columns = [soap_args.source_object_id_column, soap_args.source_id_column] - else: - read_columns = [soap_args.source_object_id_column] - source_data = file_io.load_parquet_to_pandas( - source_path, columns=read_columns, storage_options=soap_args.source_storage_options - ).set_index(soap_args.source_object_id_column) - - remaining_sources = len(source_data) - results = [] - - for object_pixel in object_pixels: - if remaining_sources < 1: - break - join_count = _count_joins_for_object( - source_data, - source_pixel, - object_pixel, - soap_args, + try: + source_path = paths.pixel_catalog_file( + catalog_base_dir=file_io.get_file_pointer_from_path(soap_args.source_catalog_dir), + pixel_order=source_pixel.order, + pixel_number=source_pixel.pixel, ) - results.append([object_pixel.order, object_pixel.pixel, join_count]) - remaining_sources -= join_count - - ## mark that some sources were not joined - if remaining_sources > 0: - results.append([-1, -1, remaining_sources]) - - _write_count_results(soap_args.tmp_path, source_pixel, results) + if soap_args.write_leaf_files and soap_args.source_object_id_column != soap_args.source_id_column: + read_columns = [soap_args.source_object_id_column, soap_args.source_id_column] + else: + read_columns = [soap_args.source_object_id_column] + source_data = file_io.load_parquet_to_pandas( + source_path, columns=read_columns, storage_options=soap_args.source_storage_options + ).set_index(soap_args.source_object_id_column) + + remaining_sources = len(source_data) + results = [] + + for object_pixel in object_pixels: + if remaining_sources < 1: + break + join_count = _count_joins_for_object( + source_data, + source_pixel, + object_pixel, + soap_args, + ) + results.append([object_pixel.order, object_pixel.pixel, join_count]) + remaining_sources -= join_count + + ## mark that some sources were not joined + if remaining_sources > 0: + results.append([-1, -1, remaining_sources]) + + _write_count_results(soap_args.tmp_path, source_pixel, results) + except Exception as 
exception: # pylint: disable=broad-exception-caught + print_task_failure(f"Failed COUNTING stage for shard: {source_pixel}", exception) + raise exception def combine_partial_results(input_path, output_path, output_storage_options) -> int: @@ -187,49 +191,55 @@ def reduce_joins( ): """Reduce join tables into one parquet file per object-pixel, with one row-group inside per source pixel.""" - pixel_dir = get_pixel_cache_directory(soap_args.tmp_path, object_pixel) - # If there's no directory, this implies there were no matches to this object pixel - # earlier in the pipeline. Move on. - if not file_io.does_file_or_directory_exist(pixel_dir): - return - # Find all of the constituent files / source pixels. Create a list of PyArrow Tables from those - # parquet files. We need to know the schema before we create the ParquetWriter. - shard_file_list = file_io.find_files_matching_path(pixel_dir, "source*.parquet") - - if len(shard_file_list) == 0: - return - - ## We want to order the row groups in a "breadth-first" sorting. Determine our sorting - ## via the metadata, then read the tables in using that sorting. - healpix_pixels = [] - for shard_file_name in shard_file_list: - healpix_pixels.append( - get_healpix_pixel_from_metadata(pq.read_metadata(shard_file_name), "join_Norder", "join_Npix") + try: + pixel_dir = get_pixel_cache_directory(soap_args.tmp_path, object_pixel) + # If there's no directory, this implies there were no matches to this object pixel + # earlier in the pipeline. Move on. + if not file_io.does_file_or_directory_exist(pixel_dir): + return + # Find all of the constituent files / source pixels. Create a list of PyArrow Tables from those + # parquet files. We need to know the schema before we create the ParquetWriter. + shard_file_list = file_io.find_files_matching_path(pixel_dir, "source*.parquet") + + if len(shard_file_list) == 0: + return + + ## We want to order the row groups in a "breadth-first" sorting. Determine our sorting + ## via the metadata, then read the tables in using that sorting. + healpix_pixels = [] + for shard_file_name in shard_file_list: + healpix_pixels.append( + get_healpix_pixel_from_metadata(pq.read_metadata(shard_file_name), "join_Norder", "join_Npix") + ) + + argsort = get_pixel_argsort(healpix_pixels) + shard_file_list = np.array(shard_file_list)[argsort] + + shards = [] + for shard_file_name in shard_file_list: + shards.append(pq.read_table(shard_file_name)) + + # Write all of the shards into a single parquet file, one row-group-per-shard. + starting_catalog_path = FilePointer(str(soap_args.catalog_path)) + destination_dir = paths.pixel_directory(starting_catalog_path, object_pixel.order, object_pixel.pixel) + file_io.make_directory( + destination_dir, exist_ok=True, storage_options=soap_args.output_storage_options ) - argsort = get_pixel_argsort(healpix_pixels) - shard_file_list = np.array(shard_file_list)[argsort] - - shards = [] - for shard_file_name in shard_file_list: - shards.append(pq.read_table(shard_file_name)) - - # Write all of the shards into a single parquet file, one row-group-per-shard. 
- starting_catalog_path = FilePointer(str(soap_args.catalog_path)) - destination_dir = paths.pixel_directory(starting_catalog_path, object_pixel.order, object_pixel.pixel) - file_io.make_directory(destination_dir, exist_ok=True, storage_options=soap_args.output_storage_options) - - output_file = paths.pixel_catalog_file(starting_catalog_path, object_pixel.order, object_pixel.pixel) - file_system, output_file = get_fs( - file_pointer=output_file, storage_options=soap_args.output_storage_options - ) - output_file = strip_leading_slash_for_pyarrow(output_file, protocol=file_system.protocol) - with pq.ParquetWriter(output_file, shards[0].schema, filesystem=file_system) as writer: - for table in shards: - writer.write_table(table) - - # Delete the intermediate shards. - if delete_input_files: - file_io.remove_directory(pixel_dir, ignore_errors=True) - - SoapPlan.reducing_key_done(tmp_path=soap_args.tmp_path, reducing_key=object_key) + output_file = paths.pixel_catalog_file(starting_catalog_path, object_pixel.order, object_pixel.pixel) + file_system, output_file = get_fs( + file_pointer=output_file, storage_options=soap_args.output_storage_options + ) + output_file = strip_leading_slash_for_pyarrow(output_file, protocol=file_system.protocol) + with pq.ParquetWriter(output_file, shards[0].schema, filesystem=file_system) as writer: + for table in shards: + writer.write_table(table) + + # Delete the intermediate shards. + if delete_input_files: + file_io.remove_directory(pixel_dir, ignore_errors=True) + + SoapPlan.reducing_key_done(tmp_path=soap_args.tmp_path, reducing_key=object_key) + except Exception as exception: # pylint: disable=broad-exception-caught + print_task_failure(f"Failed REDUCING stage for shard: {object_pixel}", exception) + raise exception diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py index dad2eb49..adc1758b 100644 --- a/tests/hipscat_import/catalog/test_map_reduce.py +++ b/tests/hipscat_import/catalog/test_map_reduce.py @@ -58,7 +58,7 @@ def test_read_directory(test_data_dir): ) -def test_read_bad_fileformat(blank_data_file): +def test_read_bad_fileformat(blank_data_file, capsys): """Unsupported file format""" with pytest.raises(NotImplementedError): mr.map_to_pixels( @@ -70,6 +70,8 @@ def test_read_bad_fileformat(blank_data_file): resume_path="", mapping_key="map_0", ) + captured = capsys.readouterr() + assert "No file reader implemented" in captured.out def read_partial_histogram(tmp_path, mapping_key): @@ -246,6 +248,27 @@ def test_map_small_sky_part_order1(tmp_path, small_sky_file0): assert (result == expected).all() +def test_split_pixels_bad_format(blank_data_file, tmp_path, capsys): + """Test loading the a file with non-default headers""" + alignment = np.full(12, None) + alignment[11] = (0, 11, 131) + with pytest.raises(NotImplementedError): + mr.split_pixels( + input_file=blank_data_file, + file_reader=None, + highest_order=0, + ra_column="ra_mean", + dec_column="dec_mean", + splitting_key="0", + cache_shard_path=tmp_path, + resume_path=tmp_path, + alignment=alignment, + ) + captured = capsys.readouterr() + assert "No file reader implemented" in captured.out + os.makedirs(os.path.join(tmp_path, "splitting")) + + def test_split_pixels_headers(formats_headers_csv, assert_parquet_file_ids, tmp_path): """Test loading the a file with non-default headers""" os.makedirs(os.path.join(tmp_path, "splitting")) diff --git a/tests/hipscat_import/soap/test_soap_map_reduce.py 
b/tests/hipscat_import/soap/test_soap_map_reduce.py index f86724bf..b95745e7 100644 --- a/tests/hipscat_import/soap/test_soap_map_reduce.py +++ b/tests/hipscat_import/soap/test_soap_map_reduce.py @@ -159,3 +159,19 @@ def test_reduce_joins(small_sky_soap_args, soap_intermediate_dir, small_sky_soap for row_index in range(14) ] assert ordered_pixels == list(small_sky_soap_maps.keys()) + + +def test_reduce_joins_missing_files(small_sky_soap_args, soap_intermediate_dir, capsys): + """Use some previously-computed intermediate files to reduce the joined + leaf parquet files into a single parquet file.""" + temp_path = os.path.join(small_sky_soap_args.tmp_path, "resume", "intermediate") + shutil.copytree( + soap_intermediate_dir, + temp_path, + ) + small_sky_soap_args.tmp_path = temp_path + + with pytest.raises(FileNotFoundError): + reduce_joins(small_sky_soap_args, HealpixPixel(0, 11), object_key="0_11") + captured = capsys.readouterr() + assert "No such file or directory" in captured.out diff --git a/tests/hipscat_import/test_pipeline_resume_plan.py b/tests/hipscat_import/test_pipeline_resume_plan.py index c029f09b..b694f33c 100644 --- a/tests/hipscat_import/test_pipeline_resume_plan.py +++ b/tests/hipscat_import/test_pipeline_resume_plan.py @@ -98,7 +98,7 @@ def test_safe_to_resume(tmp_path): @pytest.mark.dask -def test_wait_for_futures(tmp_path, dask_client, capsys): +def test_wait_for_futures(tmp_path, dask_client): """Test that we can wait around for futures to complete. Additionally test that relevant parts of the traceback are printed to stdout.""" @@ -118,13 +118,9 @@ def error_on_even(argument): with pytest.raises(RuntimeError, match="Some test stages failed"): plan.wait_for_futures(futures, "test") - captured = capsys.readouterr() - assert "RuntimeError: we are at odds with evens" in captured.out - assert "error_on_even" in captured.out - @pytest.mark.dask -def test_wait_for_futures_fail_fast(tmp_path, dask_client, capsys): +def test_wait_for_futures_fail_fast(tmp_path, dask_client): """Test that we can wait around for futures to complete. Additionally test that relevant parts of the traceback are printed to stdout.""" @@ -139,10 +135,6 @@ def error_on_even(argument): with pytest.raises(RuntimeError, match="we are at odds with evens"): plan.wait_for_futures(futures, "test", fail_fast=True) - captured = capsys.readouterr() - assert "we are at odds with evens" in captured.out - assert "error_on_even" in captured.out - def test_formatted_stage_name(): """Test that we make pretty stage names for presenting in progress bars"""
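
For reference, a minimal sketch of how a pipeline task is expected to use the new helper: wrap the task body in try/except, report the failing input through print_task_failure (which forwards the message, plus the worker address when one is available, to the controller's logs via dask's distributed print), and re-raise so the future still registers as failed. The process_one_file task and the local Client setup below are hypothetical and only illustrate the pattern; print_task_failure itself is the helper added to hipscat_import/pipeline_resume_plan.py in this patch.

    # Sketch only: process_one_file is a hypothetical task; print_task_failure
    # is the helper added to hipscat_import.pipeline_resume_plan in this patch.
    from dask.distributed import Client

    from hipscat_import.pipeline_resume_plan import print_task_failure


    def process_one_file(input_file):
        """Hypothetical worker task following the try/except pattern used above."""
        try:
            with open(input_file, encoding="utf-8") as handle:
                return sum(1 for _ in handle)
        except Exception as exception:  # pylint: disable=broad-exception-caught
            # Name the failing input, then re-raise so the future still errors
            # and the resume plan does not mark this key as done.
            print_task_failure(f"Failed processing file {input_file}", exception)
            raise


    if __name__ == "__main__":
        client = Client(n_workers=1)
        future = client.submit(process_one_file, "does_not_exist.csv")
        # The custom message (and worker address) is forwarded to this process's
        # stdout by dask's distributed print; the exception itself still propagates.
        print(future.exception())
        client.close()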