diff --git a/.github/workflows/add-issue-to-project-tracker.yml b/.github/workflows/add-issue-to-project-tracker.yml index 18b113a5..8f0a3694 100644 --- a/.github/workflows/add-issue-to-project-tracker.yml +++ b/.github/workflows/add-issue-to-project-tracker.yml @@ -10,7 +10,7 @@ jobs: name: Add issue to project runs-on: ubuntu-latest steps: - - uses: actions/add-to-project@v1.0.1 + - uses: actions/add-to-project@v1.0.2 with: project-url: https://github.com/orgs/astronomy-commons/projects/7 github-token: ${{ secrets.ADD_TO_PROJECT_PAT }} diff --git a/docs/catalogs/arguments.rst b/docs/catalogs/arguments.rst index ca86302b..b7c7e0a9 100644 --- a/docs/catalogs/arguments.rst +++ b/docs/catalogs/arguments.rst @@ -97,7 +97,6 @@ Reading input files Catalog import reads through a list of files and converts them into a hipscatted catalog. - Which files? ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -129,9 +128,10 @@ along to the map/reduce stages. We've provided reference implementations for reading CSV, FITS, and Parquet input files, but you can subclass the reader type to suit whatever input files you've got. -You only need to provide the ``file_reader`` argument if you are using a custom file reader +You only need to provide an object ``file_reader`` argument if you are using a custom file reader or passing parameters to the file reader. For example you might use ``file_reader=CsvReader(sep="\s+")`` -to parse a whitespace separated file. +to parse a whitespace separated file. Otherwise, you can use a short string to +specify an existing file reader type e.g. ``file_reader="csv"``. You can find the full API documentation for :py:class:`hipscat_import.catalog.file_readers.InputReader` @@ -171,6 +171,36 @@ You can find the full API documentation for If you're reading from cloud storage, or otherwise have some filesystem credential dict, put those in ``input_storage_options``. +Indexed batching strategy +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +If you have many small files (think 400k+ CSV files with a few rows each), you +may benefit from "indexed" file readers. These allow you to explicitly create +batches for tasks by providing a set of index files, where each file is a +text file that contains only paths to data files. + +Benefits: + +1. If you have 400k+ input files, you don't want to create 400k+ dask tasks + to process these files. +2. If the files are very small, batching them in this way allows the import + process to *combine* several small files into a single chunk for processing. + This will result in fewer intermediate files during the ``splitting`` stage. +3. If you have a parquet files over a slow networked file system, we support + pyarrow's readahead protocol through indexed readers. + +Warnings: + +1. If you have 20 dask workers in your pool, you may be tempted to create + 20 index files. This is not always an efficient use of resources! + You'd be better served by 200 index files, so that: + + a. dask can spread the load if some lists of files take longer to process + than others + b. if the pipeline dies after successfully processing 15 lists, when you + retry the pipeline, you'll only be processing 5 lists with those same 20 + workers and many workers will be sitting idle. + Which fields? ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -245,6 +275,12 @@ reporting to look like the following: Reducing : 100%|██████████| 10895/10895 [7:46:07<00:00, 2.57s/it] Finishing: 100%|██████████| 6/6 [08:03<00:00, 80.65s/it] +``tqdm`` will try to make a guess about the type of output to provide: plain +text as for a command line, or a pretty ipywidget. If it tries to use a pretty +widget but your execution environment can't support the widget, you can +force the pipeline to use a simple progress bar with the ``simple_progress_bar`` +argument. + For very long-running pipelines (e.g. multi-TB inputs), you can get an email notification when the pipeline completes using the ``completion_email_address`` argument. This will send a brief email, diff --git a/pyproject.toml b/pyproject.toml index 931aa25e..9315e911 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ dependencies = [ "dask[complete]>=2024.3.0", # Includes dask expressions. "deprecated", "healpy", - "hipscat >=0.3.4", + "hipscat >=0.3.5", "ipykernel", # Support for Jupyter notebooks "numpy", "pandas", diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..124b2043 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +git+https://github.com/astronomy-commons/hipscat.git@main \ No newline at end of file diff --git a/src/hipscat_import/catalog/file_readers.py b/src/hipscat_import/catalog/file_readers.py index f3f27bf0..ba9004f4 100644 --- a/src/hipscat_import/catalog/file_readers.py +++ b/src/hipscat_import/catalog/file_readers.py @@ -3,6 +3,9 @@ import abc from typing import Any, Dict, Union +import pandas as pd +import pyarrow +import pyarrow.dataset import pyarrow.parquet as pq from astropy.io import ascii as ascii_reader from astropy.table import Table @@ -30,8 +33,16 @@ def get_file_reader( includes `.csv.gz` and other compressed csv files - `fits`, flexible image transport system. often used for astropy tables. - `parquet`, compressed columnar data format + - `ecsv`, astropy's enhanced CSV + - `indexed_csv`, "index" style reader, that accepts a file with a list + of csv files that are appended in-memory + - `indexed_parquet`, "index" style reader, that accepts a file with a list + of parquet files that are appended in-memory chunksize (int): number of rows to read in a single iteration. + for single-file readers, large files are split into batches based on this value. + for index-style readers, we read files until we reach this chunksize and + create a single batch in-memory. schema_file (str): path to a parquet schema file. if provided, header names and column types will be pulled from the parquet schema metadata. column_names (list[str]): for CSV files, the names of columns if no header @@ -59,7 +70,16 @@ def get_file_reader( ) if file_format == "parquet": return ParquetReader(chunksize=chunksize, **kwargs) - + if file_format == "indexed_csv": + return IndexedCsvReader( + chunksize=chunksize, + schema_file=schema_file, + column_names=column_names, + type_map=type_map, + **kwargs, + ) + if file_format == "indexed_parquet": + return IndexedParquetReader(chunksize=chunksize, **kwargs) raise NotImplementedError(f"File Format: {file_format} not supported") @@ -96,7 +116,7 @@ def provenance_info(self) -> dict: def regular_file_exists(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **_kwargs): """Check that the `input_file` points to a single regular file - Raises + Raises: FileNotFoundError: if nothing exists at path, or directory found. """ if not file_io.does_file_or_directory_exist(input_file, storage_options=storage_options): @@ -104,6 +124,20 @@ def regular_file_exists(self, input_file, storage_options: Union[Dict[Any, Any], if not file_io.is_regular_file(input_file, storage_options=storage_options): raise FileNotFoundError(f"Directory found at path - requires regular file: {input_file}") + def read_index_file(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **kwargs): + """Read an "indexed" file. + + This should contain a list of paths to files to be read and batched. + + Raises: + FileNotFoundError: if nothing exists at path, or directory found. + """ + self.regular_file_exists(input_file, **kwargs) + file_names = file_io.load_text_file(input_file, storage_options=storage_options) + file_names = [f.strip() for f in file_names] + file_names = [f for f in file_names if f] + return file_names + class CsvReader(InputReader): """CSV reader for the most common CSV reading arguments. @@ -158,7 +192,7 @@ def __init__( if self.column_names: self.kwargs["names"] = self.column_names elif not self.header and schema_parquet is not None: - self.kwargs["names"] = schema_parquet.columns + self.kwargs["names"] = list(schema_parquet.columns) if self.type_map: self.kwargs["dtype"] = self.type_map @@ -180,6 +214,37 @@ def read(self, input_file, read_columns=None): yield from reader +class IndexedCsvReader(CsvReader): + """Reads an index file, containing paths to CSV files to be read and batched + + See CsvReader for additional configuration for reading CSV files. + """ + + def read(self, input_file, read_columns=None): + file_names = self.read_index_file(input_file=input_file, **self.kwargs) + + batch_size = 0 + batch_frames = [] + for file in file_names: + for single_frame in super().read(file, read_columns=read_columns): + if batch_size + len(single_frame) >= self.chunksize: + # We've hit our chunksize, send the batch off to the task. + if len(batch_frames) == 0: + yield single_frame + batch_size = 0 + else: + yield pd.concat(batch_frames, ignore_index=True) + batch_frames = [] + batch_frames.append(single_frame) + batch_size = len(single_frame) + else: + batch_frames.append(single_frame) + batch_size += len(single_frame) + + if len(batch_frames) > 0: + yield pd.concat(batch_frames, ignore_index=True) + + class AstropyEcsvReader(InputReader): """Reads astropy ascii .ecsv files. @@ -241,7 +306,7 @@ def read(self, input_file, read_columns=None): table = Table.read(input_file, memmap=True, **self.kwargs) if read_columns: table.keep_columns(read_columns) - if self.column_names: + elif self.column_names: table.keep_columns(self.column_names) elif self.skip_column_names: table.remove_columns(self.skip_column_names) @@ -291,3 +356,63 @@ def read(self, input_file, read_columns=None): batch_size=self.chunksize, columns=columns, use_pandas_metadata=True ): yield smaller_table.to_pandas() + + +class IndexedParquetReader(InputReader): + """Reads an index file, containing paths to parquet files to be read and batched + + Attributes: + chunksize (int): maximum number of rows to process at once. + Large files will be processed in chunks. Small files will be concatenated. + Also passed to pyarrow.dataset.Dataset.to_batches as `batch_size`. + batch_readahead (int): number of batches to read ahead. + Passed to pyarrow.dataset.Dataset.to_batches. + fragment_readahead (int): number of fragments to read ahead. + Passed to pyarrow.dataset.Dataset.to_batches. + use_threads (bool): whether to use multiple threads for reading. + Passed to pyarrow.dataset.Dataset.to_batches. + column_names (list[str] or None): Names of columns to use from the input dataset. + If None, use all columns. + kwargs: additional arguments to pass along to InputReader.read_index_file. + """ + + def __init__( + self, + chunksize=500_000, + batch_readahead=16, + fragment_readahead=4, + use_threads=True, + column_names=None, + **kwargs, + ): + self.chunksize = chunksize + self.batch_readahead = batch_readahead + self.fragment_readahead = fragment_readahead + self.use_threads = use_threads + self.column_names = column_names + self.kwargs = kwargs + + def read(self, input_file, read_columns=None): + columns = read_columns or self.column_names + file_names = self.read_index_file(input_file=input_file, **self.kwargs) + (_, input_dataset) = file_io.read_parquet_dataset(file_names, **self.kwargs) + + batches, nrows = [], 0 + for batch in input_dataset.to_batches( + batch_size=self.chunksize, + batch_readahead=self.batch_readahead, + fragment_readahead=self.fragment_readahead, + use_threads=self.use_threads, + columns=columns, + ): + if nrows + batch.num_rows > self.chunksize: + # We've hit the chunksize so load to a DataFrame and yield. + # There should always be at least one batch in here since batch_size == self.chunksize above. + yield pyarrow.Table.from_batches(batches).to_pandas() + batches, nrows = [], 0 + + batches.append(batch) + nrows += batch.num_rows + + if len(batches) > 0: + yield pyarrow.Table.from_batches(batches).to_pandas() diff --git a/src/hipscat_import/catalog/map_reduce.py b/src/hipscat_import/catalog/map_reduce.py index f94de055..7326335a 100644 --- a/src/hipscat_import/catalog/map_reduce.py +++ b/src/hipscat_import/catalog/map_reduce.py @@ -1,5 +1,6 @@ """Import a set of non-hipscat files using dask for parallelization""" +import pickle from typing import Any, Dict, Union import healpy as hp @@ -11,7 +12,6 @@ from hipscat.pixel_math.healpix_pixel import HealpixPixel from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, hipscat_id_to_healpix -from hipscat_import.catalog.file_readers import InputReader from hipscat_import.catalog.resume_plan import ResumePlan from hipscat_import.catalog.sparse_histogram import SparseHistogram from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure @@ -35,7 +35,7 @@ def _has_named_index(dataframe): def _iterate_input_file( input_file: FilePointer, - file_reader: InputReader, + pickled_reader_file: str, highest_order, ra_column, dec_column, @@ -43,6 +43,8 @@ def _iterate_input_file( read_columns=None, ): """Helper function to handle input file reading and healpix pixel calculation""" + with open(pickled_reader_file, "rb") as pickle_file: + file_reader = pickle.load(pickle_file) if not file_reader: raise NotImplementedError("No file reader implemented") @@ -66,7 +68,7 @@ def _iterate_input_file( def map_to_pixels( input_file: FilePointer, - file_reader: InputReader, + pickled_reader_file: str, resume_path: FilePointer, mapping_key, highest_order, @@ -103,7 +105,13 @@ def map_to_pixels( read_columns = [ra_column, dec_column] for _, _, mapped_pixels in _iterate_input_file( - input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns + input_file, + pickled_reader_file, + highest_order, + ra_column, + dec_column, + use_hipscat_index, + read_columns, ): mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True) @@ -120,14 +128,14 @@ def map_to_pixels( def split_pixels( input_file: FilePointer, - file_reader: InputReader, + pickled_reader_file: str, splitting_key, highest_order, ra_column, dec_column, cache_shard_path: FilePointer, resume_path: FilePointer, - alignment=None, + alignment_file=None, use_hipscat_index=False, ): """Map a file of input objects to their healpix pixels and split into shards. @@ -149,8 +157,10 @@ def split_pixels( FileNotFoundError: if the file does not exist, or is a directory """ try: + with open(alignment_file, "rb") as pickle_file: + alignment = pickle.load(pickle_file) for chunk_number, data, mapped_pixels in _iterate_input_file( - input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index + input_file, pickled_reader_file, highest_order, ra_column, dec_column, use_hipscat_index ): aligned_pixels = alignment[mapped_pixels] unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True) diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py index 604d8a91..5ff33f2e 100644 --- a/src/hipscat_import/catalog/resume_plan.py +++ b/src/hipscat_import/catalog/resume_plan.py @@ -2,13 +2,15 @@ from __future__ import annotations +import pickle from dataclasses import dataclass, field from typing import List, Optional, Tuple import healpy as hp +import numpy as np +from hipscat import pixel_math from hipscat.io import FilePointer, file_io from hipscat.pixel_math.healpix_pixel import HealpixPixel -from tqdm.auto import tqdm from hipscat_import.catalog.sparse_histogram import SparseHistogram from hipscat_import.pipeline_resume_plan import PipelineResumePlan @@ -24,8 +26,8 @@ class ResumePlan(PipelineResumePlan): """list of files (and job keys) that have yet to be mapped""" split_keys: List[Tuple[str, str]] = field(default_factory=list) """set of files (and job keys) that have yet to be split""" - destination_pixel_map: Optional[List[Tuple[HealpixPixel, List[HealpixPixel], str]]] = None - """Fully resolved map of destination pixels to constituent smaller pixels""" + destination_pixel_map: Optional[List[Tuple[int, int, int]]] = None + """Destination pixels and their expected final count""" MAPPING_STAGE = "mapping" SPLITTING_STAGE = "splitting" @@ -33,6 +35,7 @@ class ResumePlan(PipelineResumePlan): HISTOGRAM_BINARY_FILE = "mapping_histogram.npz" HISTOGRAMS_DIR = "histograms" + ALIGNMENT_FILE = "alignment.pickle" def __post_init__(self): """Initialize the plan.""" @@ -40,9 +43,7 @@ def __post_init__(self): def gather_plan(self): """Initialize the plan.""" - with tqdm( - total=5, desc=self.get_formatted_stage_name("Planning"), disable=not self.progress_bar - ) as step_progress: + with self.print_progress(total=5, stage_name="Planning") as step_progress: ## Make sure it's safe to use existing resume state. super().safe_to_resume() step_progress.update(1) @@ -196,6 +197,61 @@ def is_mapping_done(self) -> bool: """Are there files left to map?""" return self.done_file_exists(self.MAPPING_STAGE) + def get_alignment_file( + self, + raw_histogram, + constant_healpix_order, + highest_healpix_order, + lowest_healpix_order, + pixel_threshold, + ) -> str: + """Get a pointer to the existing alignment file for the pipeline, or + generate a new alignment using provided arguments. + + Args: + raw_histogram (:obj:`np.array`): one-dimensional numpy array of long integers where the + value at each index corresponds to the number of objects found at the healpix pixel. + constant_healpix_order (int): if positive, use this as the order for + all non-empty partitions. else, use remaining arguments. + highest_healpix_order (int): the highest healpix order (e.g. 5-10) + lowest_healpix_order (int): the lowest healpix order (e.g. 1-5). specifying a lowest order + constrains the partitioning to prevent spatially large pixels. + threshold (int): the maximum number of objects allowed in a single pixel + + Returns: + path to cached alignment file. + """ + file_name = file_io.append_paths_to_pointer(self.tmp_path, self.ALIGNMENT_FILE) + if not file_io.does_file_or_directory_exist(file_name): + if constant_healpix_order >= 0: + alignment = np.full(len(raw_histogram), None) + for pixel_num, pixel_sum in enumerate(raw_histogram): + alignment[pixel_num] = ( + constant_healpix_order, + pixel_num, + pixel_sum, + ) + else: + alignment = pixel_math.generate_alignment( + raw_histogram, + highest_order=highest_healpix_order, + lowest_order=lowest_healpix_order, + threshold=pixel_threshold, + ) + with open(file_name, "wb") as pickle_file: + pickle.dump(alignment, pickle_file) + + if self.destination_pixel_map is None: + with open(file_name, "rb") as pickle_file: + alignment = pickle.load(pickle_file) + non_none_elements = alignment[alignment != np.array(None)] + self.destination_pixel_map = np.unique(non_none_elements) + self.destination_pixel_map = [ + (order, pix, count) for (order, pix, count) in self.destination_pixel_map if int(count) > 0 + ] + + return file_name + def wait_for_splitting(self, futures): """Wait for splitting futures to complete.""" self.wait_for_futures(futures, self.SPLITTING_STAGE) @@ -208,30 +264,31 @@ def is_splitting_done(self) -> bool: """Are there files left to split?""" return self.done_file_exists(self.SPLITTING_STAGE) - def get_reduce_items(self, destination_pixel_map=None): + def get_reduce_items(self): """Fetch a triple for each partition to reduce. Triple contains: - destination pixel (healpix pixel with both order and pixel) - - source pixels (list of pixels at mapping order) + - number of rows expected for this pixel - reduce key (string of destination order+pixel) - """ reduced_keys = set(self.read_done_keys(self.REDUCING_STAGE)) - if destination_pixel_map is None: - destination_pixel_map = self.destination_pixel_map - elif self.destination_pixel_map is None: - self.destination_pixel_map = destination_pixel_map if self.destination_pixel_map is None: raise RuntimeError("destination pixel map not provided for progress tracking.") reduce_items = [ - (hp_pixel, source_pixels, f"{hp_pixel.order}_{hp_pixel.pixel}") - for hp_pixel, source_pixels in destination_pixel_map.items() - if f"{hp_pixel.order}_{hp_pixel.pixel}" not in reduced_keys + (HealpixPixel(hp_order, hp_pixel), row_count, f"{hp_order}_{hp_pixel}") + for hp_order, hp_pixel, row_count in self.destination_pixel_map + if f"{hp_order}_{hp_pixel}" not in reduced_keys ] return reduce_items + def get_destination_pixels(self): + """Create HealpixPixel list of all destination pixels.""" + if self.destination_pixel_map is None: + raise RuntimeError("destination pixel map not known.") + return [HealpixPixel(hp_order, hp_pixel) for hp_order, hp_pixel, _ in self.destination_pixel_map] + def is_reducing_done(self) -> bool: """Are there partitions left to reduce?""" return self.done_file_exists(self.REDUCING_STAGE) diff --git a/src/hipscat_import/catalog/run_import.py b/src/hipscat_import/catalog/run_import.py index 1b97b547..1e4054cc 100644 --- a/src/hipscat_import/catalog/run_import.py +++ b/src/hipscat_import/catalog/run_import.py @@ -4,106 +4,16 @@ The actual logic of the map reduce is in the `map_reduce.py` file. """ +import os +import pickle + import hipscat.io.write_metadata as io -import numpy as np -from hipscat import pixel_math from hipscat.catalog import PartitionInfo from hipscat.io import paths from hipscat.io.parquet_metadata import write_parquet_metadata -from tqdm.auto import tqdm import hipscat_import.catalog.map_reduce as mr from hipscat_import.catalog.arguments import ImportArguments -from hipscat_import.pipeline_resume_plan import PipelineResumePlan - - -def _map_pixels(args, client): - """Generate a raw histogram of object counts in each healpix pixel""" - - if args.resume_plan.is_mapping_done(): - return - - reader_future = client.scatter(args.file_reader, hash=False) - futures = [] - for key, file_path in args.resume_plan.map_files: - futures.append( - client.submit( - mr.map_to_pixels, - input_file=file_path, - resume_path=args.resume_plan.tmp_path, - file_reader=reader_future, - mapping_key=key, - highest_order=args.mapping_healpix_order, - ra_column=args.ra_column, - dec_column=args.dec_column, - use_hipscat_index=args.use_hipscat_index, - ) - ) - args.resume_plan.wait_for_mapping(futures) - - -def _split_pixels(args, alignment_future, client): - """Generate a raw histogram of object counts in each healpix pixel""" - - if args.resume_plan.is_splitting_done(): - return - - reader_future = client.scatter(args.file_reader, hash=False) - futures = [] - for key, file_path in args.resume_plan.split_keys: - futures.append( - client.submit( - mr.split_pixels, - input_file=file_path, - file_reader=reader_future, - highest_order=args.mapping_healpix_order, - ra_column=args.ra_column, - dec_column=args.dec_column, - splitting_key=key, - cache_shard_path=args.tmp_path, - resume_path=args.resume_plan.tmp_path, - alignment=alignment_future, - use_hipscat_index=args.use_hipscat_index, - ) - ) - - args.resume_plan.wait_for_splitting(futures) - - -def _reduce_pixels(args, destination_pixel_map, client): - """Loop over destination pixels and merge into parquet files""" - - if args.resume_plan.is_reducing_done(): - return - - futures = [] - for ( - destination_pixel, - source_pixels, - destination_pixel_key, - ) in args.resume_plan.get_reduce_items(destination_pixel_map): - futures.append( - client.submit( - mr.reduce_pixel_shards, - cache_shard_path=args.tmp_path, - resume_path=args.resume_plan.tmp_path, - reducing_key=destination_pixel_key, - destination_pixel_order=destination_pixel.order, - destination_pixel_number=destination_pixel.pixel, - destination_pixel_size=source_pixels[0], - output_path=args.catalog_path, - ra_column=args.ra_column, - dec_column=args.dec_column, - sort_columns=args.sort_columns, - add_hipscat_index=args.add_hipscat_index, - use_schema_file=args.use_schema_file, - use_hipscat_index=args.use_hipscat_index, - delete_input_files=args.delete_intermediate_parquet_files, - storage_options=args.output_storage_options, - ) - ) - - args.resume_plan.wait_for_reducing(futures) def run(args, client): @@ -112,50 +22,96 @@ def run(args, client): raise ValueError("args is required and should be type ImportArguments") if not isinstance(args, ImportArguments): raise ValueError("args must be type ImportArguments") - _map_pixels(args, client) - with tqdm( - total=2, desc=PipelineResumePlan.get_formatted_stage_name("Binning"), disable=not args.progress_bar - ) as step_progress: + pickled_reader_file = os.path.join(args.resume_plan.tmp_path, "reader.pickle") + with open(pickled_reader_file, "wb") as pickle_file: + pickle.dump(args.file_reader, pickle_file) + + if not args.resume_plan.is_mapping_done(): + futures = [] + for key, file_path in args.resume_plan.map_files: + futures.append( + client.submit( + mr.map_to_pixels, + input_file=file_path, + resume_path=args.resume_plan.tmp_path, + pickled_reader_file=pickled_reader_file, + mapping_key=key, + highest_order=args.mapping_healpix_order, + ra_column=args.ra_column, + dec_column=args.dec_column, + use_hipscat_index=args.use_hipscat_index, + ) + ) + args.resume_plan.wait_for_mapping(futures) + + with args.resume_plan.print_progress(total=2, stage_name="Binning") as step_progress: raw_histogram = args.resume_plan.read_histogram(args.mapping_healpix_order) step_progress.update(1) - if args.constant_healpix_order >= 0: - alignment = np.full(len(raw_histogram), None) - for pixel_num, pixel_sum in enumerate(raw_histogram): - alignment[pixel_num] = ( - args.constant_healpix_order, - pixel_num, - pixel_sum, - ) + alignment_file = args.resume_plan.get_alignment_file( + raw_histogram, + args.constant_healpix_order, + args.highest_healpix_order, + args.lowest_healpix_order, + args.pixel_threshold, + ) - destination_pixel_map = pixel_math.generate_constant_pixel_map( - histogram=raw_histogram, - constant_healpix_order=args.constant_healpix_order, - ) - else: - alignment = pixel_math.generate_alignment( - raw_histogram, - highest_order=args.highest_healpix_order, - lowest_order=args.lowest_healpix_order, - threshold=args.pixel_threshold, - ) - destination_pixel_map = pixel_math.compute_pixel_map( - raw_histogram, - highest_order=args.highest_healpix_order, - lowest_order=args.lowest_healpix_order, - threshold=args.pixel_threshold, - ) step_progress.update(1) if not args.debug_stats_only: - alignment_future = client.scatter(alignment) - _split_pixels(args, alignment_future, client) - _reduce_pixels(args, destination_pixel_map, client) + if not args.resume_plan.is_splitting_done(): + futures = [] + for key, file_path in args.resume_plan.split_keys: + futures.append( + client.submit( + mr.split_pixels, + input_file=file_path, + pickled_reader_file=pickled_reader_file, + highest_order=args.mapping_healpix_order, + ra_column=args.ra_column, + dec_column=args.dec_column, + splitting_key=key, + cache_shard_path=args.tmp_path, + resume_path=args.resume_plan.tmp_path, + alignment_file=alignment_file, + use_hipscat_index=args.use_hipscat_index, + ) + ) + + args.resume_plan.wait_for_splitting(futures) + + if not args.resume_plan.is_reducing_done(): + futures = [] + for ( + destination_pixel, + source_pixel_count, + destination_pixel_key, + ) in args.resume_plan.get_reduce_items(): + futures.append( + client.submit( + mr.reduce_pixel_shards, + cache_shard_path=args.tmp_path, + resume_path=args.resume_plan.tmp_path, + reducing_key=destination_pixel_key, + destination_pixel_order=destination_pixel.order, + destination_pixel_number=destination_pixel.pixel, + destination_pixel_size=source_pixel_count, + output_path=args.catalog_path, + ra_column=args.ra_column, + dec_column=args.dec_column, + sort_columns=args.sort_columns, + add_hipscat_index=args.add_hipscat_index, + use_schema_file=args.use_schema_file, + use_hipscat_index=args.use_hipscat_index, + delete_input_files=args.delete_intermediate_parquet_files, + storage_options=args.output_storage_options, + ) + ) + + args.resume_plan.wait_for_reducing(futures) # All done - write out the metadata - with tqdm( - total=5, desc=PipelineResumePlan.get_formatted_stage_name("Finishing"), disable=not args.progress_bar - ) as step_progress: + with args.resume_plan.print_progress(total=5, stage_name="Finishing") as step_progress: catalog_info = args.to_catalog_info(int(raw_histogram.sum())) io.write_provenance_info( catalog_base_dir=args.catalog_path, @@ -171,7 +127,7 @@ def run(args, client): storage_options=args.output_storage_options, ) step_progress.update(1) - partition_info = PartitionInfo.from_healpix(destination_pixel_map.keys()) + partition_info = PartitionInfo.from_healpix(args.resume_plan.get_destination_pixels()) partition_info_file = paths.get_partition_info_pointer(args.catalog_path) partition_info.write_to_file(partition_info_file, storage_options=args.output_storage_options) if not args.debug_stats_only: diff --git a/src/hipscat_import/index/run_index.py b/src/hipscat_import/index/run_index.py index c4279623..bbf870d1 100644 --- a/src/hipscat_import/index/run_index.py +++ b/src/hipscat_import/index/run_index.py @@ -1,11 +1,10 @@ """Create columnar index of hipscat table using dask for parallelization""" from hipscat.io import file_io, parquet_metadata, write_metadata -from tqdm.auto import tqdm import hipscat_import.index.map_reduce as mr from hipscat_import.index.arguments import IndexArguments -from hipscat_import.pipeline_resume_plan import PipelineResumePlan +from hipscat_import.pipeline_resume_plan import print_progress def run(args, client): @@ -17,8 +16,11 @@ def run(args, client): rows_written = mr.create_index(args, client) # All done - write out the metadata - with tqdm( - total=4, desc=PipelineResumePlan.get_formatted_stage_name("Finishing"), disable=not args.progress_bar + with print_progress( + total=4, + stage_name="Finishing", + use_progress_bar=args.progress_bar, + simple_progress_bar=args.simple_progress_bar, ) as step_progress: index_catalog_info = args.to_catalog_info(int(rows_written)) write_metadata.write_provenance_info( diff --git a/src/hipscat_import/margin_cache/margin_cache.py b/src/hipscat_import/margin_cache/margin_cache.py index 217fcc56..8611393f 100644 --- a/src/hipscat_import/margin_cache/margin_cache.py +++ b/src/hipscat_import/margin_cache/margin_cache.py @@ -1,6 +1,5 @@ from hipscat.catalog import PartitionInfo from hipscat.io import file_io, parquet_metadata, paths, write_metadata -from tqdm.auto import tqdm import hipscat_import.margin_cache.margin_cache_map_reduce as mcmr from hipscat_import.margin_cache.margin_cache_resume_plan import MarginCachePlan @@ -59,7 +58,7 @@ def generate_margin_cache(args, client): ) resume_plan.wait_for_reducing(futures) - with tqdm(total=4, desc="Finishing", disable=not args.progress_bar) as step_progress: + with resume_plan.print_progress(total=4, stage_name="Finishing") as step_progress: parquet_metadata.write_parquet_metadata( args.catalog_path, storage_options=args.output_storage_options ) diff --git a/src/hipscat_import/margin_cache/margin_cache_arguments.py b/src/hipscat_import/margin_cache/margin_cache_arguments.py index 8c22d4ae..11362cde 100644 --- a/src/hipscat_import/margin_cache/margin_cache_arguments.py +++ b/src/hipscat_import/margin_cache/margin_cache_arguments.py @@ -83,6 +83,9 @@ def to_catalog_info(self, total_rows) -> MarginCacheCatalogInfo: "catalog_name": self.output_artifact_name, "total_rows": total_rows, "catalog_type": "margin", + "epoch": self.catalog.catalog_info.epoch, + "ra_column": self.catalog.catalog_info.ra_column, + "dec_column": self.catalog.catalog_info.dec_column, "primary_catalog": self.input_catalog_path, "margin_threshold": self.margin_threshold, } diff --git a/src/hipscat_import/margin_cache/margin_cache_resume_plan.py b/src/hipscat_import/margin_cache/margin_cache_resume_plan.py index 72aec80d..9a6274df 100644 --- a/src/hipscat_import/margin_cache/margin_cache_resume_plan.py +++ b/src/hipscat_import/margin_cache/margin_cache_resume_plan.py @@ -9,7 +9,6 @@ from hipscat import pixel_math from hipscat.io import file_io from hipscat.pixel_math.healpix_pixel import HealpixPixel -from tqdm.auto import tqdm from hipscat_import.margin_cache.margin_cache_arguments import MarginCacheArguments from hipscat_import.pipeline_resume_plan import PipelineResumePlan @@ -33,6 +32,7 @@ def __init__(self, args: MarginCacheArguments): super().__init__( resume=args.resume, progress_bar=args.progress_bar, + simple_progress_bar=args.simple_progress_bar, tmp_path=args.tmp_path, delete_resume_log_files=args.delete_resume_log_files, ) @@ -40,9 +40,7 @@ def __init__(self, args: MarginCacheArguments): def _gather_plan(self, args): """Initialize the plan.""" - with tqdm( - total=3, desc=self.get_formatted_stage_name("Planning"), disable=not self.progress_bar - ) as step_progress: + with self.print_progress(total=3, stage_name="Planning") as step_progress: ## Make sure it's safe to use existing resume state. super().safe_to_resume() mapping_done = self.is_mapping_done() diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py index 7e859e91..84a1e603 100644 --- a/src/hipscat_import/pipeline_resume_plan.py +++ b/src/hipscat_import/pipeline_resume_plan.py @@ -10,7 +10,8 @@ from dask.distributed import print as dask_print from hipscat.io import FilePointer, file_io from hipscat.pixel_math.healpix_pixel import HealpixPixel -from tqdm.auto import tqdm +from tqdm.auto import tqdm as auto_tqdm +from tqdm.std import tqdm as std_tqdm @dataclass @@ -25,6 +26,10 @@ class PipelineResumePlan: progress_bar: bool = True """if true, a tqdm progress bar will be displayed for user feedback of planning progress""" + simple_progress_bar: bool = False + """if displaying a progress bar, use a text-only simple progress + bar instead of widget. this can be useful in some environments when running + in a notebook where ipywidgets cannot be used (see `progress_bar` argument)""" delete_resume_log_files: bool = True """should we delete task-level done files once each stage is complete? if False, we will keep all sub-histograms from the mapping stage, and all @@ -131,13 +136,7 @@ def wait_for_futures(self, futures, stage_name, fail_fast=False): RuntimeError: if any future returns an error status. """ some_error = False - formatted_stage_name = self.get_formatted_stage_name(stage_name) - for future in tqdm( - as_completed(futures), - desc=formatted_stage_name, - total=len(futures), - disable=(not self.progress_bar), - ): + for future in self.print_progress(as_completed(futures), stage_name=stage_name, total=len(futures)): if future.status == "error": some_error = True if fail_fast: @@ -146,18 +145,26 @@ def wait_for_futures(self, futures, stage_name, fail_fast=False): if some_error: raise RuntimeError(f"Some {stage_name} stages failed. See logs for details.") - @staticmethod - def get_formatted_stage_name(stage_name) -> str: - """Create a stage name of consistent minimum length. Ensures that the tqdm - progress bars can line up nicely when multiple stages must run. + def print_progress(self, iterable=None, total=None, stage_name=None): + """Create a progress bar that will provide user with task feedback. + + This is a thin wrapper around the static ``print_progress`` method that uses + member variables for the caller's convenience. Args: - stage_name (str): name of the stage (e.g. mapping, reducing) + iterable (iterable): Optional. provides iterations to progress updates. + total (int): Optional. Expected iterations. + stage_name (str): name of the stage (e.g. mapping, reducing). this will + be further formatted with ``get_formatted_stage_name``, so the caller + doesn't need to worry about that. """ - if stage_name is None or len(stage_name) == 0: - stage_name = "progress" - - return f"{stage_name.capitalize(): <10}" + return print_progress( + iterable=iterable, + total=total, + stage_name=stage_name, + use_progress_bar=self.progress_bar, + simple_progress_bar=self.simple_progress_bar, + ) def check_original_input_paths(self, input_paths): """Validate that we're operating on the same file set as the original pipeline, @@ -230,3 +237,50 @@ def print_task_failure(custom_message, exception): except Exception: # pylint: disable=broad-exception-caught pass dask_print(exception) + + +def get_formatted_stage_name(stage_name) -> str: + """Create a stage name of consistent minimum length. Ensures that the tqdm + progress bars can line up nicely when multiple stages must run. + + Args: + stage_name (str): name of the stage (e.g. mapping, reducing) + """ + if stage_name is None or len(stage_name) == 0: + stage_name = "progress" + + return f"{stage_name.capitalize(): <10}" + + +def print_progress( + iterable=None, total=None, stage_name=None, use_progress_bar=True, simple_progress_bar=False +): + """Create a progress bar that will provide user with task feedback. + + Args: + iterable (iterable): Optional. provides iterations to progress updates. + total (int): Optional. Expected iterations. + stage_name (str): name of the stage (e.g. mapping, reducing). this will + be further formatted with ``get_formatted_stage_name``, so the caller + doesn't need to worry about that. + use_progress_bar (bool): should we display any progress. typically False + when no stdout is expected. + simple_progress_bar (bool): if displaying a progress bar, use a text-only + simple progress bar instead of widget. this can be useful when running + in a particular notebook where ipywidgets cannot be used + (only used when ``use_progress_bar`` is True) + """ + if simple_progress_bar: + return std_tqdm( + iterable, + desc=get_formatted_stage_name(stage_name), + total=total, + disable=not use_progress_bar, + ) + + return auto_tqdm( + iterable, + desc=get_formatted_stage_name(stage_name), + total=total, + disable=not use_progress_bar, + ) diff --git a/src/hipscat_import/runtime_arguments.py b/src/hipscat_import/runtime_arguments.py index 4f4d7126..9ce5e8ad 100644 --- a/src/hipscat_import/runtime_arguments.py +++ b/src/hipscat_import/runtime_arguments.py @@ -32,8 +32,12 @@ class RuntimeArguments: the pipeline where we left off. If False, we start the import from scratch, overwriting any content of the output directory.""" progress_bar: bool = True - """if true, a tqdm progress bar will be displayed for user + """if true, a progress bar will be displayed for user feedback of map reduce progress""" + simple_progress_bar: bool = False + """if displaying a progress bar, use a text-only simple progress + bar instead of widget. this can be useful in some environments when running + in a notebook where ipywidgets cannot be used (see `progress_bar` argument)""" dask_tmp: str = "" """directory for dask worker space. this should be local to the execution of the pipeline, for speed of reads and writes""" diff --git a/src/hipscat_import/soap/resume_plan.py b/src/hipscat_import/soap/resume_plan.py index be1a6f86..3afd8626 100644 --- a/src/hipscat_import/soap/resume_plan.py +++ b/src/hipscat_import/soap/resume_plan.py @@ -11,7 +11,6 @@ from hipscat.io import file_io from hipscat.pixel_math.healpix_pixel import HealpixPixel from hipscat.pixel_tree import PixelAlignment, align_trees -from tqdm.auto import tqdm from hipscat_import.pipeline_resume_plan import PipelineResumePlan from hipscat_import.soap.arguments import SoapArguments @@ -39,6 +38,7 @@ def __init__(self, args: SoapArguments): super().__init__( resume=args.resume, progress_bar=args.progress_bar, + simple_progress_bar=args.simple_progress_bar, tmp_path=args.tmp_path, delete_resume_log_files=args.delete_resume_log_files, ) @@ -46,9 +46,7 @@ def __init__(self, args: SoapArguments): def gather_plan(self, args): """Initialize the plan.""" - with tqdm( - total=3, desc=self.get_formatted_stage_name("Planning"), disable=not self.progress_bar - ) as step_progress: + with self.print_progress(total=3, stage_name="Planning") as step_progress: ## Make sure it's safe to use existing resume state. super().safe_to_resume() step_progress.update(1) diff --git a/src/hipscat_import/soap/run_soap.py b/src/hipscat_import/soap/run_soap.py index dafafae4..34d50b8a 100644 --- a/src/hipscat_import/soap/run_soap.py +++ b/src/hipscat_import/soap/run_soap.py @@ -5,9 +5,7 @@ from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo from hipscat.io import parquet_metadata, paths, write_metadata -from tqdm.auto import tqdm -from hipscat_import.pipeline_resume_plan import PipelineResumePlan from hipscat_import.soap.arguments import SoapArguments from hipscat_import.soap.map_reduce import combine_partial_results, count_joins, reduce_joins from hipscat_import.soap.resume_plan import SoapPlan @@ -50,9 +48,7 @@ def run(args, client): resume_plan.wait_for_reducing(futures) # All done - write out the metadata - with tqdm( - total=4, desc=PipelineResumePlan.get_formatted_stage_name("Finishing"), disable=not args.progress_bar - ) as step_progress: + with resume_plan.print_progress(total=4, stage_name="Finishing") as step_progress: if args.write_leaf_files: parquet_metadata.write_parquet_metadata( args.catalog_path, diff --git a/tests/hipscat_import/catalog/test_argument_validation.py b/tests/hipscat_import/catalog/test_argument_validation.py index 65625eec..c39636d9 100644 --- a/tests/hipscat_import/catalog/test_argument_validation.py +++ b/tests/hipscat_import/catalog/test_argument_validation.py @@ -1,8 +1,10 @@ """Tests of argument validation""" import pytest +from hipscat.io import write_metadata from hipscat_import.catalog.arguments import ImportArguments, check_healpix_order_range +from hipscat_import.catalog.file_readers import CsvReader # pylint: disable=protected-access @@ -78,7 +80,7 @@ def test_good_paths(blank_data_dir, blank_data_file, tmp_path): ) assert args.input_path == blank_data_dir assert len(args.input_paths) == 1 - assert blank_data_file in args.input_paths[0] + assert str(blank_data_file) in args.input_paths[0] def test_multiple_files_in_path(small_sky_parts_dir, tmp_path): @@ -206,6 +208,38 @@ def test_provenance_info(blank_data_dir, tmp_path): assert "epoch" in runtime_args +def test_write_provenance_info(formats_dir, tmp_path): + """Verify that provenance info can be written to JSON file.""" + input_file = formats_dir / "gaia_minimum.csv" + schema_file = formats_dir / "gaia_minimum_schema.parquet" + + args = ImportArguments( + output_artifact_name="gaia_minimum", + input_file_list=[input_file], + file_reader=CsvReader( + comment="#", + header=None, + schema_file=schema_file, + ), + ra_column="ra", + dec_column="dec", + sort_columns="solution_id", + use_schema_file=schema_file, + output_path=tmp_path, + dask_tmp=tmp_path, + highest_healpix_order=2, + pixel_threshold=3_000, + progress_bar=False, + ) + + write_metadata.write_provenance_info( + catalog_base_dir=args.catalog_path, + dataset_info=args.to_catalog_info(0), + tool_args=args.provenance_info(), + storage_options=args.output_storage_options, + ) + + def test_check_healpix_order_range(): """Test method check_healpix_order_range""" check_healpix_order_range(5, "order_field") diff --git a/tests/hipscat_import/catalog/test_file_readers.py b/tests/hipscat_import/catalog/test_file_readers.py index 677b4a7d..8ceedb3f 100644 --- a/tests/hipscat_import/catalog/test_file_readers.py +++ b/tests/hipscat_import/catalog/test_file_readers.py @@ -1,7 +1,5 @@ """Test dataframe-generating file readers""" -import os - import hipscat.io.write_metadata as io import numpy as np import pandas as pd @@ -10,7 +8,14 @@ import pytest from hipscat.catalog.catalog import CatalogInfo -from hipscat_import.catalog.file_readers import CsvReader, FitsReader, ParquetReader, get_file_reader +from hipscat_import.catalog.file_readers import ( + CsvReader, + FitsReader, + IndexedCsvReader, + IndexedParquetReader, + ParquetReader, + get_file_reader, +) # pylint: disable=redefined-outer-name @@ -91,7 +96,7 @@ def test_csv_reader_parquet_metadata(small_sky_single_file, tmp_path): pa.field("dec_error", pa.float64()), ] ) - schema_file = os.path.join(tmp_path, "metadata.parquet") + schema_file = tmp_path / "metadata.parquet" pq.write_metadata( small_sky_schema, schema_file, @@ -187,7 +192,7 @@ def test_csv_reader_pipe_delimited(formats_pipe_csv, tmp_path): pa.field("numeric", pa.int64()), ] ) - schema_file = os.path.join(tmp_path, "metadata.parquet") + schema_file = tmp_path / "metadata.parquet" pq.write_metadata(parquet_schema_types, schema_file) frame = next( @@ -224,7 +229,7 @@ def test_csv_reader_provenance_info(tmp_path, basic_catalog_info): ) provenance_info = reader.provenance_info() catalog_base_dir = tmp_path / "test_catalog" - os.makedirs(catalog_base_dir) + catalog_base_dir.mkdir(parents=True) io.write_provenance_info(catalog_base_dir, basic_catalog_info, provenance_info) with open(catalog_base_dir / "provenance_info.json", "r", encoding="utf-8") as file: @@ -235,6 +240,32 @@ def test_csv_reader_provenance_info(tmp_path, basic_catalog_info): assert "SECRETS" not in data +def test_indexed_csv_reader(indexed_files_dir): + # Chunksize covers all the inputs. + total_chunks = 0 + for frame in IndexedCsvReader(chunksize=10_000).read(indexed_files_dir / "csv_list_single.txt"): + total_chunks += 1 + assert len(frame) == 131 + + assert total_chunks == 1 + + # Chunksize requires splitting into just a few batches. + total_chunks = 0 + for frame in IndexedCsvReader(chunksize=60).read(indexed_files_dir / "csv_list_single.txt"): + total_chunks += 1 + assert len(frame) < 60 + + assert total_chunks == 3 + + # Requesting a very small chunksize. This will split up reads on the CSV. + total_chunks = 0 + for frame in IndexedCsvReader(chunksize=5).read(indexed_files_dir / "csv_list_single.txt"): + total_chunks += 1 + assert len(frame) <= 5 + + assert total_chunks == 29 + + def test_parquet_reader(parquet_shards_shard_44_0): """Verify we can read the parquet file into a single data frame.""" total_chunks = 0 @@ -254,12 +285,40 @@ def test_parquet_reader_chunked(parquet_shards_shard_44_0): assert total_chunks == 7 +def test_indexed_parquet_reader(indexed_files_dir): + # Chunksize covers all the inputs. + total_chunks = 0 + for frame in get_file_reader("indexed_parquet", chunksize=10_000).read( + indexed_files_dir / "parquet_list_single.txt" + ): + total_chunks += 1 + assert len(frame) == 131 + + assert total_chunks == 1 + + # Chunksize requires splitting into just a few batches. + total_chunks = 0 + for frame in IndexedParquetReader(chunksize=60).read(indexed_files_dir / "parquet_list_single.txt"): + total_chunks += 1 + assert len(frame) < 60 + + assert total_chunks == 3 + + # Requesting a very small chunksize. This will split up reads on the CSV. + total_chunks = 0 + for frame in IndexedParquetReader(chunksize=5).read(indexed_files_dir / "parquet_list_single.txt"): + total_chunks += 1 + assert len(frame) <= 5 + + assert total_chunks == 29 + + def test_parquet_reader_provenance_info(tmp_path, basic_catalog_info): """Test that we get some provenance info and it is parseable into JSON.""" reader = ParquetReader(chunksize=1) provenance_info = reader.provenance_info() - catalog_base_dir = os.path.join(tmp_path, "test_catalog") - os.makedirs(catalog_base_dir) + catalog_base_dir = tmp_path / "test_catalog" + catalog_base_dir.mkdir(parents=True) io.write_provenance_info(catalog_base_dir, basic_catalog_info, provenance_info) @@ -301,14 +360,22 @@ def test_read_fits_columns(formats_fits): frame = next(FitsReader(column_names=["id", "ra", "dec"]).read(formats_fits)) assert list(frame.columns) == ["id", "ra", "dec"] + frame = next(FitsReader(column_names=["id", "ra", "dec"]).read(formats_fits, read_columns=["ra", "dec"])) + assert list(frame.columns) == ["ra", "dec"] + frame = next(FitsReader(skip_column_names=["ra_error", "dec_error"]).read(formats_fits)) assert list(frame.columns) == ["id", "ra", "dec", "test_id"] + frame = next( + FitsReader(skip_column_names=["ra_error", "dec_error"]).read(formats_fits, read_columns=["ra", "dec"]) + ) + assert list(frame.columns) == ["ra", "dec"] + def test_fits_reader_provenance_info(tmp_path, basic_catalog_info): """Test that we get some provenance info and it is parseable into JSON.""" reader = FitsReader() provenance_info = reader.provenance_info() - catalog_base_dir = os.path.join(tmp_path, "test_catalog") - os.makedirs(catalog_base_dir) + catalog_base_dir = tmp_path / "test_catalog" + catalog_base_dir.mkdir(parents=True) io.write_provenance_info(catalog_base_dir, basic_catalog_info, provenance_info) diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py index adc1758b..34c72147 100644 --- a/tests/hipscat_import/catalog/test_map_reduce.py +++ b/tests/hipscat_import/catalog/test_map_reduce.py @@ -1,6 +1,7 @@ """Tests of map reduce operations""" import os +import pickle from io import StringIO import healpy as hp @@ -13,15 +14,24 @@ import hipscat_import.catalog.map_reduce as mr from hipscat_import.catalog.file_readers import get_file_reader +from hipscat_import.catalog.resume_plan import ResumePlan from hipscat_import.catalog.sparse_histogram import SparseHistogram +def pickle_file_reader(tmp_path, file_reader) -> str: + """Utility method to pickle a file reader, and return path to pickle.""" + pickled_reader_file = tmp_path / "reader.pickle" + with open(pickled_reader_file, "wb") as pickle_file: + pickle.dump(file_reader, pickle_file) + return pickled_reader_file + + def test_read_empty_filename(): """Empty file name""" with pytest.raises(FileNotFoundError): mr.map_to_pixels( input_file="", - file_reader=get_file_reader("parquet"), + pickled_reader_file="", highest_order=10, ra_column="ra", dec_column="dec", @@ -30,12 +40,12 @@ def test_read_empty_filename(): ) -def test_read_wrong_fileformat(small_sky_file0): +def test_read_wrong_fileformat(small_sky_file0, tmp_path): """CSV file attempting to be read as parquet""" with pytest.raises(pa.lib.ArrowInvalid): mr.map_to_pixels( input_file=small_sky_file0, - file_reader=get_file_reader("parquet"), + pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("parquet")), highest_order=0, ra_column="ra_mean", dec_column="dec_mean", @@ -44,12 +54,12 @@ def test_read_wrong_fileformat(small_sky_file0): ) -def test_read_directory(test_data_dir): +def test_read_directory(test_data_dir, tmp_path): """Provide directory, not file""" with pytest.raises(FileNotFoundError): mr.map_to_pixels( input_file=test_data_dir, - file_reader=get_file_reader("parquet"), + pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("parquet")), highest_order=0, ra_column="ra", dec_column="dec", @@ -58,12 +68,12 @@ def test_read_directory(test_data_dir): ) -def test_read_bad_fileformat(blank_data_file, capsys): +def test_read_bad_fileformat(blank_data_file, capsys, tmp_path): """Unsupported file format""" with pytest.raises(NotImplementedError): mr.map_to_pixels( input_file=blank_data_file, - file_reader=None, + pickled_reader_file=pickle_file_reader(tmp_path, None), highest_order=0, ra_column="ra", dec_column="dec", @@ -76,17 +86,17 @@ def test_read_bad_fileformat(blank_data_file, capsys): def read_partial_histogram(tmp_path, mapping_key): """Helper to read in the former result of a map operation.""" - histogram_file = os.path.join(tmp_path, "histograms", f"{mapping_key}.npz") + histogram_file = tmp_path / "histograms" / f"{mapping_key}.npz" hist = SparseHistogram.from_file(histogram_file) return hist.to_array() def test_read_single_fits(tmp_path, formats_fits): """Success case - fits file that exists being read as fits""" - os.makedirs(os.path.join(tmp_path, "histograms")) + (tmp_path / "histograms").mkdir(parents=True) mr.map_to_pixels( input_file=formats_fits, - file_reader=get_file_reader("fits"), + pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("fits")), highest_order=0, ra_column="ra", dec_column="dec", @@ -101,12 +111,12 @@ def test_read_single_fits(tmp_path, formats_fits): npt.assert_array_equal(result, expected) -def test_map_headers_wrong(formats_headers_csv): +def test_map_headers_wrong(formats_headers_csv, tmp_path): """Test loading the a file with non-default headers (without specifying right headers)""" with pytest.raises(ValueError, match="columns expected but not found"): mr.map_to_pixels( input_file=formats_headers_csv, - file_reader=get_file_reader("csv"), + pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")), highest_order=0, ra_column="ra", dec_column="dec", @@ -117,10 +127,10 @@ def test_map_headers_wrong(formats_headers_csv): def test_map_headers(tmp_path, formats_headers_csv): """Test loading the a file with non-default headers""" - os.makedirs(os.path.join(tmp_path, "histograms")) + (tmp_path / "histograms").mkdir(parents=True) mr.map_to_pixels( input_file=formats_headers_csv, - file_reader=get_file_reader("csv"), + pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")), highest_order=0, ra_column="ra_mean", dec_column="dec_mean", @@ -139,11 +149,11 @@ def test_map_headers(tmp_path, formats_headers_csv): def test_map_with_hipscat_index(tmp_path, formats_dir, small_sky_single_file): - os.makedirs(os.path.join(tmp_path, "histograms")) - input_file = os.path.join(formats_dir, "hipscat_index.csv") + (tmp_path / "histograms").mkdir(parents=True) + input_file = formats_dir / "hipscat_index.csv" mr.map_to_pixels( input_file=input_file, - file_reader=get_file_reader("csv"), + pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")), highest_order=0, ra_column="NOPE", dec_column="NOPE", @@ -161,7 +171,7 @@ def test_map_with_hipscat_index(tmp_path, formats_dir, small_sky_single_file): with pytest.raises(ValueError, match="columns expected but not found"): mr.map_to_pixels( input_file=small_sky_single_file, - file_reader=get_file_reader("csv"), + pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")), highest_order=0, ra_column="NOPE", dec_column="NOPE", @@ -173,13 +183,16 @@ def test_map_with_hipscat_index(tmp_path, formats_dir, small_sky_single_file): def test_map_with_schema(tmp_path, mixed_schema_csv_dir, mixed_schema_csv_parquet): """Test loading the a file when using a parquet schema file for dtypes""" - os.makedirs(os.path.join(tmp_path, "histograms")) - input_file = os.path.join(mixed_schema_csv_dir, "input_01.csv") + (tmp_path / "histograms").mkdir(parents=True) + input_file = mixed_schema_csv_dir / "input_01.csv" mr.map_to_pixels( input_file=input_file, - file_reader=get_file_reader( - "csv", - schema_file=mixed_schema_csv_parquet, + pickled_reader_file=pickle_file_reader( + tmp_path, + get_file_reader( + "csv", + schema_file=mixed_schema_csv_parquet, + ), ), highest_order=0, ra_column="ra", @@ -200,10 +213,10 @@ def test_map_with_schema(tmp_path, mixed_schema_csv_dir, mixed_schema_csv_parque def test_map_small_sky_order0(tmp_path, small_sky_single_file): """Test loading the small sky catalog and partitioning each object into the same large bucket""" - os.makedirs(os.path.join(tmp_path, "histograms")) + (tmp_path / "histograms").mkdir(parents=True) mr.map_to_pixels( input_file=small_sky_single_file, - file_reader=get_file_reader("csv"), + pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")), highest_order=0, ra_column="ra", dec_column="dec", @@ -226,10 +239,10 @@ def test_map_small_sky_part_order1(tmp_path, small_sky_file0): Test loading a small portion of the small sky catalog and partitioning objects into four smaller buckets """ - os.makedirs(os.path.join(tmp_path, "histograms")) + (tmp_path / "histograms").mkdir(parents=True) mr.map_to_pixels( input_file=small_sky_file0, - file_reader=get_file_reader("csv"), + pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")), highest_order=1, ra_column="ra", dec_column="dec", @@ -249,54 +262,54 @@ def test_map_small_sky_part_order1(tmp_path, small_sky_file0): def test_split_pixels_bad_format(blank_data_file, tmp_path, capsys): - """Test loading the a file with non-default headers""" + """Test error behavior, e.g. when alignment file is missing.""" alignment = np.full(12, None) alignment[11] = (0, 11, 131) - with pytest.raises(NotImplementedError): + with pytest.raises(FileNotFoundError): mr.split_pixels( input_file=blank_data_file, - file_reader=None, + pickled_reader_file="", highest_order=0, ra_column="ra_mean", dec_column="dec_mean", splitting_key="0", cache_shard_path=tmp_path, resume_path=tmp_path, - alignment=alignment, + alignment_file="", ) captured = capsys.readouterr() - assert "No file reader implemented" in captured.out - os.makedirs(os.path.join(tmp_path, "splitting")) + assert "No such file or directory" in captured.out def test_split_pixels_headers(formats_headers_csv, assert_parquet_file_ids, tmp_path): - """Test loading the a file with non-default headers""" - os.makedirs(os.path.join(tmp_path, "splitting")) - alignment = np.full(12, None) - alignment[11] = (0, 11, 131) + """Test loading a file with non-default headers""" + plan = ResumePlan(tmp_path=tmp_path, progress_bar=False, input_paths=["foo1"]) + raw_histogram = np.full(12, 0) + raw_histogram[11] = 131 + alignment_file = plan.get_alignment_file(raw_histogram, -1, 0, 0, 1_000) mr.split_pixels( input_file=formats_headers_csv, - file_reader=get_file_reader("csv"), + pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")), highest_order=0, ra_column="ra_mean", dec_column="dec_mean", splitting_key="0", cache_shard_path=tmp_path, resume_path=tmp_path, - alignment=alignment, + alignment_file=alignment_file, ) - file_name = os.path.join(tmp_path, "order_0", "dir_0", "pixel_11", "shard_0_0.parquet") + file_name = tmp_path / "order_0" / "dir_0" / "pixel_11" / "shard_0_0.parquet" expected_ids = [*range(700, 708)] assert_parquet_file_ids(file_name, "object_id", expected_ids) - file_name = os.path.join(tmp_path, "order_0", "dir_0", "pixel_1", "shard_0_0.parquet") + file_name = tmp_path / "order_0" / "dir_0" / "pixel_1" / "shard_0_0.parquet" assert not os.path.exists(file_name) def test_reduce_order0(parquet_shards_dir, assert_parquet_file_ids, tmp_path): """Test reducing into one large pixel""" - os.makedirs(os.path.join(tmp_path, "reducing")) + (tmp_path / "reducing").mkdir(parents=True) mr.reduce_pixel_shards( cache_shard_path=parquet_shards_dir, resume_path=tmp_path, @@ -312,7 +325,7 @@ def test_reduce_order0(parquet_shards_dir, assert_parquet_file_ids, tmp_path): delete_input_files=False, ) - output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet") + output_file = tmp_path / "Norder=0" / "Dir=0" / "Npix=11.parquet" expected_ids = [*range(700, 831)] assert_parquet_file_ids(output_file, "id", expected_ids) @@ -320,7 +333,7 @@ def test_reduce_order0(parquet_shards_dir, assert_parquet_file_ids, tmp_path): def test_reduce_hipscat_index(parquet_shards_dir, assert_parquet_file_ids, tmp_path): """Test reducing with or without a _hipscat_index field""" - os.makedirs(os.path.join(tmp_path, "reducing")) + (tmp_path / "reducing").mkdir(parents=True) mr.reduce_pixel_shards( cache_shard_path=parquet_shards_dir, resume_path=tmp_path, @@ -335,7 +348,7 @@ def test_reduce_hipscat_index(parquet_shards_dir, assert_parquet_file_ids, tmp_p delete_input_files=False, ) - output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet") + output_file = tmp_path / "Norder=0" / "Dir=0" / "Npix=11.parquet" expected_ids = [*range(700, 831)] assert_parquet_file_ids(output_file, "id", expected_ids) @@ -400,10 +413,10 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): First, we take some time to set up these silly data points, then we test out reducing them into a single parquet file using a mix of reduction options. """ - os.makedirs(os.path.join(tmp_path, "reducing")) - shard_dir = os.path.join(tmp_path, "reduce_shards", "order_0", "dir_0", "pixel_11") - os.makedirs(shard_dir) - output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet") + (tmp_path / "reducing").mkdir(parents=True) + shard_dir = tmp_path / "reduce_shards" / "order_0" / "dir_0" / "pixel_11" + shard_dir.mkdir(parents=True) + output_file = tmp_path / "Norder=0" / "Dir=0" / "Npix=11.parquet" file1_string = """source_id,object_id,time,ra,dec 1200,700,3000,282.5,-58.5 @@ -413,7 +426,7 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): 1404,702,3200,310.5,-27.5 1505,703,4000,286.5,-69.5""" file1_data = pd.read_csv(StringIO(file1_string)) - file1_data.to_parquet(os.path.join(shard_dir, "file_1_shard_1.parquet")) + file1_data.to_parquet(shard_dir / "file_1_shard_1.parquet") file2_string = """source_id,object_id,time,ra,dec 1206,700,2000,282.5,-58.5 @@ -421,7 +434,7 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): 1308,701,2100,299.5,-48.5 1309,701,2000,299.5,-48.5""" file2_data = pd.read_csv(StringIO(file2_string)) - file2_data.to_parquet(os.path.join(shard_dir, "file_2_shard_1.parquet")) + file2_data.to_parquet(shard_dir / "file_2_shard_1.parquet") combined_data = pd.concat([file1_data, file2_data]) combined_data["norder19_healpix"] = hp.ang2pix( @@ -438,7 +451,7 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): ## This will sort WITHIN an order 19 healpix pixel. In that ordering, the objects are ## (703, 700, 701, 702) mr.reduce_pixel_shards( - cache_shard_path=os.path.join(tmp_path, "reduce_shards"), + cache_shard_path=tmp_path / "reduce_shards", resume_path=tmp_path, reducing_key="0_11", destination_pixel_order=0, @@ -475,7 +488,7 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): ######################## Sort option 2: by object id and time ## sort order is effectively (norder19 healpix, object id, time) mr.reduce_pixel_shards( - cache_shard_path=os.path.join(tmp_path, "reduce_shards"), + cache_shard_path=tmp_path / "reduce_shards", resume_path=tmp_path, reducing_key="0_11", destination_pixel_order=0, @@ -512,7 +525,7 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): ## spatial properties for sorting, only numeric. ## sort order is effectively (object id, time) mr.reduce_pixel_shards( - cache_shard_path=os.path.join(tmp_path, "reduce_shards"), + cache_shard_path=tmp_path / "reduce_shards", resume_path=tmp_path, reducing_key="0_11", destination_pixel_order=0, diff --git a/tests/hipscat_import/catalog/test_resume_plan.py b/tests/hipscat_import/catalog/test_resume_plan.py index 4cf7751b..a63214a8 100644 --- a/tests/hipscat_import/catalog/test_resume_plan.py +++ b/tests/hipscat_import/catalog/test_resume_plan.py @@ -2,7 +2,6 @@ import numpy.testing as npt import pytest -from hipscat.pixel_math.healpix_pixel import HealpixPixel from hipscat_import.catalog.resume_plan import ResumePlan from hipscat_import.catalog.sparse_histogram import SparseHistogram @@ -180,17 +179,24 @@ def test_some_split_task_failures(tmp_path, dask_client): def test_get_reduce_items(tmp_path): """Test generation of remaining reduce items""" - destination_pixel_map = {HealpixPixel(0, 11): (131, [44, 45, 46])} + destination_pixel_map = [(0, 11, 131)] plan = ResumePlan(tmp_path=tmp_path, progress_bar=False) with pytest.raises(RuntimeError, match="destination pixel map"): remaining_reduce_items = plan.get_reduce_items() - remaining_reduce_items = plan.get_reduce_items(destination_pixel_map=destination_pixel_map) + with pytest.raises(RuntimeError, match="destination pixel map"): + remaining_reduce_items = plan.get_destination_pixels() + + plan.destination_pixel_map = destination_pixel_map + remaining_reduce_items = plan.get_reduce_items() assert len(remaining_reduce_items) == 1 + all_pixels = plan.get_destination_pixels() + assert len(all_pixels) == 1 + ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key="0_11") - remaining_reduce_items = plan.get_reduce_items(destination_pixel_map=destination_pixel_map) + remaining_reduce_items = plan.get_reduce_items() assert len(remaining_reduce_items) == 0 @@ -199,8 +205,9 @@ def test_some_reduce_task_failures(tmp_path, dask_client): """Test that we only consider reduce stage successful if all done files are written""" plan = ResumePlan(tmp_path=tmp_path, progress_bar=False) - destination_pixel_map = {HealpixPixel(0, 11): (131, [44, 45, 46])} - remaining_reduce_items = plan.get_reduce_items(destination_pixel_map=destination_pixel_map) + destination_pixel_map = [(0, 11, 131)] + plan.destination_pixel_map = destination_pixel_map + remaining_reduce_items = plan.get_reduce_items() assert len(remaining_reduce_items) == 1 ## Method doesn't FAIL, but it doesn't write out the done file either. diff --git a/tests/hipscat_import/catalog/test_run_import.py b/tests/hipscat_import/catalog/test_run_import.py index 587650c8..4821a0af 100644 --- a/tests/hipscat_import/catalog/test_run_import.py +++ b/tests/hipscat_import/catalog/test_run_import.py @@ -2,6 +2,7 @@ import os import shutil +from pathlib import Path import numpy as np import pandas as pd @@ -41,13 +42,10 @@ def test_resume_dask_runner( """Test execution in the presence of some resume files.""" ## First, copy over our intermediate files. ## This prevents overwriting source-controlled resume files. - intermediate_dir = os.path.join(tmp_path, "resume_catalog", "intermediate") - shutil.copytree( - os.path.join(resume_dir, "intermediate"), - intermediate_dir, - ) + intermediate_dir = tmp_path / "resume_catalog" / "intermediate" + shutil.copytree(resume_dir / "intermediate", intermediate_dir) ## Now set up our resume files to match previous work. - resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog") + resume_tmp = tmp_path / "tmp" / "resume_catalog" plan = ResumePlan(tmp_path=resume_tmp, progress_bar=False) histogram = SparseHistogram.make_from_counts([11], [131], 0) empty = SparseHistogram.make_empty(0) @@ -63,10 +61,7 @@ def test_resume_dask_runner( ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.REDUCING_STAGE, "0_11") - shutil.copytree( - os.path.join(resume_dir, "Norder=0"), - os.path.join(tmp_path, "resume_catalog", "Norder=0"), - ) + shutil.copytree(resume_dir / "Norder=0", tmp_path / "resume_catalog" / "Norder=0") args = ImportArguments( output_artifact_name="resume_catalog", @@ -75,7 +70,7 @@ def test_resume_dask_runner( output_path=tmp_path, dask_tmp=tmp_path, tmp_dir=tmp_path, - resume_tmp=os.path.join(tmp_path, "tmp"), + resume_tmp=tmp_path / "tmp", highest_healpix_order=0, pixel_threshold=1000, progress_bar=False, @@ -93,17 +88,14 @@ def test_resume_dask_runner( assert len(catalog.get_healpix_pixels()) == 1 # Check that the catalog parquet file exists and contains correct object IDs - output_file = os.path.join(args.catalog_path, "Norder=0", "Dir=0", "Npix=11.parquet") + output_file = Path(args.catalog_path) / "Norder=0" / "Dir=0" / "Npix=11.parquet" expected_ids = [*range(700, 831)] assert_parquet_file_ids(output_file, "id", expected_ids) ## Re-running the pipeline with fully done intermediate files ## should result in no changes to output files. - shutil.copytree( - os.path.join(resume_dir, "intermediate"), - resume_tmp, - ) + shutil.copytree(resume_dir / "intermediate", resume_tmp) plan = args.resume_plan plan.touch_stage_done_file(ResumePlan.MAPPING_STAGE) plan.touch_stage_done_file(ResumePlan.SPLITTING_STAGE) @@ -145,25 +137,17 @@ def test_resume_dask_runner_diff_pixel_order( with the current HEALPix order.""" ## First, copy over our intermediate files. ## This prevents overwriting source-controlled resume files. - intermediate_dir = os.path.join(tmp_path, "resume_catalog", "intermediate") - shutil.copytree( - os.path.join(resume_dir, "intermediate"), - intermediate_dir, - ) + intermediate_dir = tmp_path / "resume_catalog" / "intermediate" + shutil.copytree(resume_dir / "intermediate", intermediate_dir) ## Now set up our resume files to match previous work. - resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog") + resume_tmp = tmp_path / "tmp" / "resume_catalog" ResumePlan(tmp_path=resume_tmp, progress_bar=False) - SparseHistogram.make_from_counts([11], [131], 0).to_file( - os.path.join(resume_tmp, "mapping_histogram.npz") - ) + SparseHistogram.make_from_counts([11], [131], 0).to_file(resume_tmp / "mapping_histogram.npz") for file_index in range(0, 5): ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}") - shutil.copytree( - os.path.join(resume_dir, "Norder=0"), - os.path.join(tmp_path, "resume_catalog", "Norder=0"), - ) + shutil.copytree(resume_dir / "Norder=0", tmp_path / "resume_catalog" / "Norder=0") with pytest.raises(ValueError, match="incompatible with the highest healpix order"): args = ImportArguments( @@ -173,7 +157,7 @@ def test_resume_dask_runner_diff_pixel_order( output_path=tmp_path, dask_tmp=tmp_path, tmp_dir=tmp_path, - resume_tmp=os.path.join(tmp_path, "tmp"), + resume_tmp=tmp_path / "tmp", constant_healpix_order=1, pixel_threshold=1000, progress_bar=False, @@ -188,7 +172,7 @@ def test_resume_dask_runner_diff_pixel_order( output_path=tmp_path, dask_tmp=tmp_path, tmp_dir=tmp_path, - resume_tmp=os.path.join(tmp_path, "tmp"), + resume_tmp=tmp_path / "tmp", constant_healpix_order=1, pixel_threshold=1000, progress_bar=False, @@ -220,7 +204,7 @@ def test_resume_dask_runner_histograms_diff_size( tmp_path, ): """Tests that the pipeline errors if the partial histograms have different sizes.""" - resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog") + resume_tmp = tmp_path / "tmp" / "resume_catalog" ResumePlan(tmp_path=resume_tmp, progress_bar=False) # We'll create mock partial histograms of size 0 and 2 @@ -246,7 +230,7 @@ def test_resume_dask_runner_histograms_diff_size( output_path=tmp_path, dask_tmp=tmp_path, tmp_dir=tmp_path, - resume_tmp=os.path.join(tmp_path, "tmp"), + resume_tmp=tmp_path / "tmp", constant_healpix_order=1, pixel_threshold=1000, progress_bar=False, diff --git a/tests/hipscat_import/catalog/test_run_round_trip.py b/tests/hipscat_import/catalog/test_run_round_trip.py index 11e8aaf7..d80b3619 100644 --- a/tests/hipscat_import/catalog/test_run_round_trip.py +++ b/tests/hipscat_import/catalog/test_run_round_trip.py @@ -79,8 +79,8 @@ def test_import_mixed_schema_csv( Path(mixed_schema_csv_dir) / "input_01.csv", Path(mixed_schema_csv_dir) / "input_02.csv", ], - output_path=Path(tmp_path), - dask_tmp=Path(tmp_path), + output_path=tmp_path, + dask_tmp=tmp_path, highest_healpix_order=1, file_reader=get_file_reader( "csv", @@ -260,8 +260,8 @@ def test_import_keep_intermediate_files( """Test that ALL intermediate files are still around on-disk after successful import, when setting the appropriate flags. """ - temp = os.path.join(tmp_path, "intermediate_files") - os.makedirs(temp) + temp = tmp_path / "intermediate_files" + temp.mkdir(parents=True) args = ImportArguments( output_artifact_name="small_sky_object_catalog", input_path=small_sky_parts_dir, @@ -282,13 +282,15 @@ def test_import_keep_intermediate_files( assert catalog.catalog_path == args.catalog_path ## Check that stage-level done files are still around. - base_intermediate_dir = os.path.join(temp, "small_sky_object_catalog", "intermediate") + base_intermediate_dir = temp / "small_sky_object_catalog" / "intermediate" expected_contents = [ + "alignment.pickle", "histograms", # directory containing sub-histograms "input_paths.txt", # original input paths for subsequent comparison "mapping_done", # stage-level done file "mapping_histogram.npz", # concatenated histogram file "order_0", # all intermediate parquet files + "reader.pickle", # pickled InputReader "reducing", # directory containing task-level done files "reducing_done", # stage-level done file "splitting", # directory containing task-level done files @@ -296,21 +298,21 @@ def test_import_keep_intermediate_files( ] assert_directory_contains(base_intermediate_dir, expected_contents) - checking_dir = os.path.join(base_intermediate_dir, "histograms") + checking_dir = base_intermediate_dir / "histograms" assert_directory_contains( checking_dir, ["map_0.npz", "map_1.npz", "map_2.npz", "map_3.npz", "map_4.npz", "map_5.npz"] ) - checking_dir = os.path.join(base_intermediate_dir, "splitting") + checking_dir = base_intermediate_dir / "splitting" assert_directory_contains( checking_dir, ["split_0_done", "split_1_done", "split_2_done", "split_3_done", "split_4_done", "split_5_done"], ) - checking_dir = os.path.join(base_intermediate_dir, "reducing") + checking_dir = base_intermediate_dir / "reducing" assert_directory_contains(checking_dir, ["0_11_done"]) # Check that all of the intermediate parquet shards are still around. - checking_dir = os.path.join(base_intermediate_dir, "order_0", "dir_0", "pixel_11") + checking_dir = base_intermediate_dir / "order_0" / "dir_0" / "pixel_11" assert_directory_contains( checking_dir, [ @@ -362,6 +364,16 @@ def test_import_lowest_healpix_order( assert np.logical_and(ids >= 700, ids < 832).all() +class StarrReader(CsvReader): + """Shallow subclass""" + + def read(self, input_file, read_columns=None): + files = glob.glob(f"{input_file}/*.starr") + files.sort() + for file in files: + return super().read(file, read_columns) + + @pytest.mark.dask def test_import_starr_file( dask_client, @@ -374,15 +386,6 @@ def test_import_starr_file( as a valid InputReader implementation is provided. """ - class StarrReader(CsvReader): - """Shallow subclass""" - - def read(self, input_file, read_columns=None): - files = glob.glob(f"{input_file}/*.starr") - files.sort() - for file in files: - return super().read(file, read_columns) - args = ImportArguments( output_artifact_name="starr", input_file_list=[formats_dir], @@ -421,7 +424,7 @@ def test_import_hipscat_index( ## First, let's just check the assumptions we have about our input file: ## - should have _hipscat_index as the indexed column ## - should NOT have any columns like "ra" or "dec" - input_file = os.path.join(formats_dir, "hipscat_index.parquet") + input_file = formats_dir / "hipscat_index.parquet" expected_ids = [*range(700, 831)] assert_parquet_file_ids(input_file, "id", expected_ids) @@ -472,7 +475,7 @@ def test_import_hipscat_index_no_pandas( tmp_path, ): """Test basic execution, using a previously-computed _hipscat_index column for spatial partitioning.""" - input_file = os.path.join(formats_dir, "hipscat_index.csv") + input_file = formats_dir / "hipscat_index.csv" args = ImportArguments( output_artifact_name="using_hipscat_index", input_file_list=[input_file], @@ -514,8 +517,8 @@ def test_import_gaia_minimum( tmp_path, ): """Test end-to-end import, using a representative chunk of gaia data.""" - input_file = os.path.join(formats_dir, "gaia_minimum.csv") - schema_file = os.path.join(formats_dir, "gaia_minimum_schema.parquet") + input_file = formats_dir / "gaia_minimum.csv" + schema_file = formats_dir / "gaia_minimum_schema.parquet" args = ImportArguments( output_artifact_name="gaia_minimum", @@ -566,7 +569,7 @@ def test_gaia_ecsv( tmp_path, assert_parquet_file_ids, ): - input_file = os.path.join(formats_dir, "gaia_epoch.ecsv") + input_file = formats_dir / "gaia_epoch.ecsv" args = ImportArguments( output_artifact_name="gaia_e_astropy", @@ -658,7 +661,7 @@ def test_gaia_ecsv( # In-memory schema uses list naming convention, but pyarrow converts to # the parquet-compliant list convention when writing to disk. # Round trip the schema to get a schema with compliant nested naming convention. - schema_path = os.path.join(tmp_path, "temp_schema.parquet") + schema_path = tmp_path / "temp_schema.parquet" pq.write_table(expected_parquet_schema.empty_table(), where=schema_path) expected_parquet_schema = pq.read_metadata(schema_path).schema.to_arrow_schema() @@ -670,3 +673,50 @@ def test_gaia_ecsv( assert schema.equals(expected_parquet_schema, check_metadata=False) schema = pds.dataset(args.catalog_path, format="parquet").schema assert schema.equals(expected_parquet_schema, check_metadata=False) + + +@pytest.mark.dask +def test_import_indexed_csv( + dask_client, + indexed_files_dir, + tmp_path, +): + """Use indexed-style CSV reads. There are two index files, and we expect + to have two batches worth of intermediate files.""" + temp = tmp_path / "intermediate_files" + os.makedirs(temp) + + args = ImportArguments( + output_artifact_name="indexed_csv", + input_file_list=[ + indexed_files_dir / "csv_list_double_1_of_2.txt", + indexed_files_dir / "csv_list_double_2_of_2.txt", + ], + output_path=tmp_path, + file_reader="indexed_csv", + sort_columns="id", + tmp_dir=temp, + dask_tmp=temp, + highest_healpix_order=2, + delete_intermediate_parquet_files=False, + delete_resume_log_files=False, + pixel_threshold=3_000, + progress_bar=False, + ) + + runner.run(args, dask_client) + + # Check that the catalog metadata file exists + catalog = Catalog.read_from_hipscat(args.catalog_path) + assert catalog.on_disk + assert catalog.catalog_path == args.catalog_path + assert len(catalog.get_healpix_pixels()) == 1 + + # Check that there are TWO intermediate parquet file (two index files). + assert_directory_contains( + temp / "indexed_csv" / "intermediate" / "order_0" / "dir_0" / "pixel_11", + [ + "shard_split_0_0.parquet", + "shard_split_1_0.parquet", + ], + ) diff --git a/tests/hipscat_import/catalog/test_sparse_histogram.py b/tests/hipscat_import/catalog/test_sparse_histogram.py index 52e22164..57ce78f2 100644 --- a/tests/hipscat_import/catalog/test_sparse_histogram.py +++ b/tests/hipscat_import/catalog/test_sparse_histogram.py @@ -1,7 +1,5 @@ """Test sparse histogram behavior.""" -import os - import numpy as np import numpy.testing as npt import pytest @@ -12,7 +10,7 @@ def test_read_write_round_trip(tmp_path): """Test that we can read what we write into a histogram file.""" - file_name = os.path.join(tmp_path, "round_trip.npz") + file_name = tmp_path / "round_trip.npz" histogram = SparseHistogram.make_from_counts([11], [131], 0) histogram.to_file(file_name) diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py index 303144bc..e16e9fab 100644 --- a/tests/hipscat_import/conftest.py +++ b/tests/hipscat_import/conftest.py @@ -2,6 +2,7 @@ import os import re +from pathlib import Path import healpy as hp import numpy as np @@ -52,119 +53,117 @@ def test_long_running(): @pytest.fixture def test_data_dir(): - return os.path.join(TEST_DIR, "data") + return Path(TEST_DIR) / "data" @pytest.fixture def small_sky_dir(test_data_dir): - return os.path.join(test_data_dir, "small_sky") + return test_data_dir / "small_sky" @pytest.fixture def small_sky_single_file(test_data_dir): - return os.path.join(test_data_dir, "small_sky", "catalog.csv") + return test_data_dir / "small_sky" / "catalog.csv" @pytest.fixture def small_sky_object_catalog(test_data_dir): - return os.path.join(test_data_dir, "small_sky_object_catalog") + return test_data_dir / "small_sky_object_catalog" @pytest.fixture def small_sky_source_dir(test_data_dir): - return os.path.join(test_data_dir, "small_sky_source") + return test_data_dir / "small_sky_source" @pytest.fixture def small_sky_source_catalog(test_data_dir): - return os.path.join(test_data_dir, "small_sky_source_catalog") + return test_data_dir / "small_sky_source_catalog" @pytest.fixture def blank_data_dir(test_data_dir): - return os.path.join(test_data_dir, "blank") + return test_data_dir / "blank" @pytest.fixture def blank_data_file(test_data_dir): - return os.path.join(test_data_dir, "blank", "blank.csv") + return test_data_dir / "blank" / "blank.csv" @pytest.fixture def empty_data_dir(test_data_dir): - return os.path.join(test_data_dir, "empty") + return test_data_dir / "empty" @pytest.fixture def formats_dir(test_data_dir): - return os.path.join(test_data_dir, "test_formats") + return test_data_dir / "test_formats" @pytest.fixture def formats_headers_csv(test_data_dir): - return os.path.join(test_data_dir, "test_formats", "headers.csv") + return test_data_dir / "test_formats" / "headers.csv" @pytest.fixture def formats_pipe_csv(test_data_dir): - return os.path.join(test_data_dir, "test_formats", "pipe_delimited.csv") + return test_data_dir / "test_formats" / "pipe_delimited.csv" @pytest.fixture def formats_fits(test_data_dir): - return os.path.join(test_data_dir, "test_formats", "small_sky.fits") + return test_data_dir / "test_formats" / "small_sky.fits" @pytest.fixture def formats_pandasindex(test_data_dir): - return os.path.join(test_data_dir, "test_formats", "pandasindex.parquet") + return test_data_dir / "test_formats" / "pandasindex.parquet" + + +@pytest.fixture +def indexed_files_dir(test_data_dir): + return test_data_dir / "indexed_files" @pytest.fixture def small_sky_parts_dir(test_data_dir): - return os.path.join(test_data_dir, "small_sky_parts") + return test_data_dir / "small_sky_parts" @pytest.fixture def small_sky_file0(test_data_dir): - return os.path.join(test_data_dir, "small_sky_parts", "catalog_00_of_05.csv") + return test_data_dir / "small_sky_parts" / "catalog_00_of_05.csv" @pytest.fixture def parquet_shards_dir(test_data_dir): - return os.path.join(test_data_dir, "parquet_shards") + return test_data_dir / "parquet_shards" @pytest.fixture def soap_intermediate_dir(test_data_dir): - return os.path.join(test_data_dir, "soap_intermediate") + return test_data_dir / "soap_intermediate" @pytest.fixture def parquet_shards_shard_44_0(test_data_dir): - return os.path.join( - test_data_dir, - "parquet_shards", - "order_1", - "dir_0", - "pixel_44", - "shard_3_0.parquet", - ) + return test_data_dir / "parquet_shards" / "order_1" / "dir_0" / "pixel_44" / "shard_3_0.parquet" @pytest.fixture def mixed_schema_csv_dir(test_data_dir): - return os.path.join(test_data_dir, "mixed_schema") + return test_data_dir / "mixed_schema" @pytest.fixture def mixed_schema_csv_parquet(test_data_dir): - return os.path.join(test_data_dir, "mixed_schema", "schema.parquet") + return test_data_dir / "mixed_schema" / "schema.parquet" @pytest.fixture def resume_dir(test_data_dir): - return os.path.join(test_data_dir, "resume") + return test_data_dir / "resume" @pytest.fixture diff --git a/tests/hipscat_import/data/indexed_files/csv_list_double_1_of_2.txt b/tests/hipscat_import/data/indexed_files/csv_list_double_1_of_2.txt new file mode 100644 index 00000000..8e9c9d54 --- /dev/null +++ b/tests/hipscat_import/data/indexed_files/csv_list_double_1_of_2.txt @@ -0,0 +1,3 @@ +tests/hipscat_import/data/small_sky_parts/catalog_00_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_01_of_05.csv + diff --git a/tests/hipscat_import/data/indexed_files/csv_list_double_2_of_2.txt b/tests/hipscat_import/data/indexed_files/csv_list_double_2_of_2.txt new file mode 100644 index 00000000..352c08ea --- /dev/null +++ b/tests/hipscat_import/data/indexed_files/csv_list_double_2_of_2.txt @@ -0,0 +1,3 @@ +tests/hipscat_import/data/small_sky_parts/catalog_02_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_03_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_04_of_05.csv \ No newline at end of file diff --git a/tests/hipscat_import/data/indexed_files/csv_list_single.txt b/tests/hipscat_import/data/indexed_files/csv_list_single.txt new file mode 100644 index 00000000..04817f83 --- /dev/null +++ b/tests/hipscat_import/data/indexed_files/csv_list_single.txt @@ -0,0 +1,6 @@ +tests/hipscat_import/data/small_sky_parts/catalog_00_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_01_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_02_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_03_of_05.csv +tests/hipscat_import/data/small_sky_parts/catalog_04_of_05.csv + diff --git a/tests/hipscat_import/data/indexed_files/parquet_list_single.txt b/tests/hipscat_import/data/indexed_files/parquet_list_single.txt new file mode 100644 index 00000000..63e5b84f --- /dev/null +++ b/tests/hipscat_import/data/indexed_files/parquet_list_single.txt @@ -0,0 +1,5 @@ +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_0_0.parquet +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_1_0.parquet +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_2_0.parquet +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_3_0.parquet +tests/hipscat_import/data/parquet_shards/order_0/dir_0/pixel_11/shard_4_0.parquet diff --git a/tests/hipscat_import/index/test_index_map_reduce.py b/tests/hipscat_import/index/test_index_map_reduce.py index 0dbba81d..025d0f02 100644 --- a/tests/hipscat_import/index/test_index_map_reduce.py +++ b/tests/hipscat_import/index/test_index_map_reduce.py @@ -1,7 +1,5 @@ """Tests of map reduce operations""" -import os - import numpy as np import numpy.testing as npt import pandas as pd @@ -28,7 +26,7 @@ def test_create_index( ) mr.create_index(args, dask_client) - output_file = os.path.join(tmp_path, "small_sky_object_index", "index", "part.0.parquet") + output_file = tmp_path / "small_sky_object_index" / "index" / "part.0.parquet" expected_ids = [*range(700, 831)] assert_parquet_file_index(output_file, expected_ids) @@ -55,7 +53,7 @@ def test_create_index_no_hipscat_index(small_sky_object_catalog, tmp_path, dask_ ) mr.create_index(args, dask_client) - output_file = os.path.join(tmp_path, "small_sky_object_index", "index", "part.0.parquet") + output_file = tmp_path / "small_sky_object_index" / "index" / "part.0.parquet" data_frame = pd.read_parquet(output_file, engine="pyarrow") npt.assert_array_equal(data_frame.columns, ["Norder", "Dir", "Npix"]) @@ -76,7 +74,7 @@ def test_create_index_no_order_pixel(small_sky_object_catalog, tmp_path, dask_cl ) mr.create_index(args, dask_client) - output_file = os.path.join(tmp_path, "small_sky_object_index", "index", "part.0.parquet") + output_file = tmp_path / "small_sky_object_index" / "index" / "part.0.parquet" data_frame = pd.read_parquet(output_file, engine="pyarrow") npt.assert_array_equal(data_frame.columns, ["_hipscat_index"]) @@ -95,7 +93,7 @@ def test_create_index_source(small_sky_source_catalog, assert_parquet_file_index ) mr.create_index(args, dask_client) - output_file = os.path.join(tmp_path, "small_sky_source_index", "index", "part.0.parquet") + output_file = tmp_path / "small_sky_source_index" / "index" / "part.0.parquet" expected_ids = [*range(70_000, 87_161)] assert_parquet_file_index(output_file, expected_ids) @@ -134,7 +132,7 @@ def test_create_index_with_divisions( ) mr.create_index(args, dask_client) - output_file = os.path.join(tmp_path, "small_sky_source_index", "index", "part.0.parquet") + output_file = tmp_path / "small_sky_source_index" / "index" / "part.0.parquet" expected_ids = [*range(70_000, 87_161)] assert_parquet_file_index(output_file, expected_ids) @@ -167,7 +165,7 @@ def test_create_index_source_by_object( ) mr.create_index(args, dask_client) - output_file = os.path.join(tmp_path, "small_sky_source_index", "index", "part.0.parquet") + output_file = tmp_path / "small_sky_source_index" / "index" / "part.0.parquet" expected_ids = np.repeat([*range(700, 831)], 131) assert_parquet_file_index(output_file, expected_ids) @@ -199,7 +197,7 @@ def test_create_index_extra_columns( ) mr.create_index(args, dask_client) - output_file = os.path.join(tmp_path, "small_sky_source_index", "index", "part.0.parquet") + output_file = tmp_path / "small_sky_source_index" / "index" / "part.0.parquet" expected_ids = np.repeat([*range(700, 831)], 131) assert_parquet_file_index(output_file, expected_ids) diff --git a/tests/hipscat_import/margin_cache/test_arguments_margin_cache.py b/tests/hipscat_import/margin_cache/test_arguments_margin_cache.py index 546fad04..dfa8491d 100644 --- a/tests/hipscat_import/margin_cache/test_arguments_margin_cache.py +++ b/tests/hipscat_import/margin_cache/test_arguments_margin_cache.py @@ -125,6 +125,9 @@ def test_to_catalog_info(small_sky_source_catalog, tmp_path): catalog_info = args.to_catalog_info(total_rows=10) assert catalog_info.catalog_name == args.output_artifact_name assert catalog_info.total_rows == 10 + assert catalog_info.epoch == "J2000" + assert catalog_info.ra_column == "source_ra" + assert catalog_info.dec_column == "source_dec" def test_provenance_info(small_sky_source_catalog, tmp_path): diff --git a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py index c6332c21..72b93dbc 100644 --- a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py +++ b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py @@ -46,7 +46,7 @@ def test_to_pixel_shard_equator(tmp_path, basic_data_shard_df): dec_column="weird_dec", ) - path = os.path.join(tmp_path, "order_1", "dir_0", "pixel_21", "Norder=1", "Dir=0", "Npix=0.parquet") + path = tmp_path / "order_1" / "dir_0" / "pixel_21" / "Norder=1" / "Dir=0" / "Npix=0.parquet" assert os.path.exists(path) @@ -63,7 +63,7 @@ def test_to_pixel_shard_polar(tmp_path, polar_data_shard_df): dec_column="weird_dec", ) - path = os.path.join(tmp_path, "order_2", "dir_0", "pixel_15", "Norder=2", "Dir=0", "Npix=0.parquet") + path = tmp_path / "order_2" / "dir_0" / "pixel_15" / "Norder=2" / "Dir=0" / "Npix=0.parquet" assert os.path.exists(path) @@ -92,12 +92,12 @@ def test_map_pixel_shards_error(tmp_path, capsys): def test_reduce_margin_shards(tmp_path): - intermediate_dir = os.path.join(tmp_path, "intermediate") + intermediate_dir = tmp_path / "intermediate" partition_dir = get_pixel_cache_directory(intermediate_dir, HealpixPixel(1, 21)) shard_dir = paths.pixel_directory(partition_dir, 1, 21) os.makedirs(shard_dir) - os.makedirs(os.path.join(intermediate_dir, "reducing")) + os.makedirs(intermediate_dir / "reducing") first_shard_path = paths.pixel_catalog_file(partition_dir, 1, 0) second_shard_path = paths.pixel_catalog_file(partition_dir, 1, 1) @@ -128,7 +128,7 @@ def test_reduce_margin_shards(tmp_path): ) # Create a schema parquet file. - schema_path = os.path.join(tmp_path, "metadata.parquet") + schema_path = tmp_path / "metadata.parquet" schema_df = test_df.drop(columns=["margin_Norder", "margin_Dir", "margin_Npix"]) schema_df.to_parquet(schema_path) @@ -176,14 +176,14 @@ def test_reduce_margin_shards(tmp_path): def test_reduce_margin_shards_error(tmp_path, basic_data_shard_df, capsys): """Test error behavior on reduce stage. e.g. by not creating the original catalog metadata.""" - intermediate_dir = os.path.join(tmp_path, "intermediate") + intermediate_dir = tmp_path / "intermediate" partition_dir = get_pixel_cache_directory(intermediate_dir, HealpixPixel(1, 21)) shard_dir = paths.pixel_directory(partition_dir, 1, 21) os.makedirs(shard_dir) - os.makedirs(os.path.join(intermediate_dir, "reducing")) + os.makedirs(intermediate_dir / "reducing") # Don't write anything at the metadata path! - schema_path = os.path.join(tmp_path, "metadata.parquet") + schema_path = tmp_path / "metadata.parquet" basic_data_shard_df.to_parquet(paths.pixel_catalog_file(partition_dir, 1, 0)) basic_data_shard_df.to_parquet(paths.pixel_catalog_file(partition_dir, 1, 1)) diff --git a/tests/hipscat_import/soap/test_soap_map_reduce.py b/tests/hipscat_import/soap/test_soap_map_reduce.py index b95745e7..ab88f176 100644 --- a/tests/hipscat_import/soap/test_soap_map_reduce.py +++ b/tests/hipscat_import/soap/test_soap_map_reduce.py @@ -2,6 +2,7 @@ import os import shutil +from pathlib import Path import numpy.testing as npt import pandas as pd @@ -19,9 +20,7 @@ def test_count_joins(small_sky_soap_args, tmp_path, small_sky_soap_maps): count_joins(small_sky_soap_args, source, objects) result = pd.read_csv( - os.path.join( - tmp_path, "small_sky_association", "intermediate", f"{source.order}_{source.pixel}.csv" - ) + tmp_path / "small_sky_association" / "intermediate" / f"{source.order}_{source.pixel}.csv" ) assert len(result) == 1 assert result["num_rows"].sum() > 0 @@ -32,16 +31,20 @@ def test_count_joins_with_leaf(small_sky_soap_args, small_sky_soap_maps): small_sky_soap_args.write_leaf_files = True small_sky_soap_args.source_id_column = "source_id" - intermediate_dir = small_sky_soap_args.tmp_path + intermediate_dir = Path(small_sky_soap_args.tmp_path) for source, objects in small_sky_soap_maps.items(): count_joins(small_sky_soap_args, source, objects) - result = pd.read_csv(os.path.join(intermediate_dir, f"{source.order}_{source.pixel}.csv")) + result = pd.read_csv(intermediate_dir / f"{source.order}_{source.pixel}.csv") assert len(result) == 1 assert result["num_rows"].sum() > 0 - parquet_file_name = os.path.join( - intermediate_dir, "order_0", "dir_0", "pixel_11", f"source_{source.order}_{source.pixel}.parquet" + parquet_file_name = ( + intermediate_dir + / "order_0" + / "dir_0" + / "pixel_11" + / f"source_{source.order}_{source.pixel}.parquet" ) assert os.path.exists(parquet_file_name), f"file not found [{parquet_file_name}]" @@ -69,9 +72,7 @@ def test_count_joins_missing(small_sky_source_catalog, tmp_path): source = HealpixPixel(2, 176) count_joins(args, source, [HealpixPixel(2, 177), HealpixPixel(2, 178)]) - result_csv = os.path.join( - tmp_path, "small_sky_association", "intermediate", f"{source.order}_{source.pixel}.csv" - ) + result_csv = tmp_path / "small_sky_association" / "intermediate" / f"{source.order}_{source.pixel}.csv" result = pd.read_csv(result_csv) assert len(result) == 3 @@ -91,11 +92,11 @@ def test_count_joins_missing(small_sky_source_catalog, tmp_path): def test_combine_results(tmp_path): """Test combining many CSVs into a single one""" - input_path = os.path.join(tmp_path, "input") - os.makedirs(input_path, exist_ok=True) + input_path = tmp_path / "input" + input_path.mkdir(parents=True) - output_path = os.path.join(tmp_path, "output") - os.makedirs(output_path, exist_ok=True) + output_path = tmp_path / "output" + output_path.mkdir(parents=True) join_info = pd.DataFrame( data=[ @@ -113,16 +114,16 @@ def test_combine_results(tmp_path): "num_rows", ], ) - partitions_csv_file = os.path.join(input_path, "0_11.csv") + partitions_csv_file = input_path / "0_11.csv" join_info.to_csv(partitions_csv_file, index=False) total_num_rows = combine_partial_results(input_path, output_path, None) assert total_num_rows == 131 - result = pd.read_csv(os.path.join(output_path, "partition_join_info.csv")) + result = pd.read_csv(output_path / "partition_join_info.csv") assert len(result) == 2 - result = pd.read_csv(os.path.join(output_path, "unmatched_sources.csv")) + result = pd.read_csv(output_path / "unmatched_sources.csv") assert len(result) == 1 diff --git a/tests/hipscat_import/test_pipeline_resume_plan.py b/tests/hipscat_import/test_pipeline_resume_plan.py index b694f33c..7334d6a2 100644 --- a/tests/hipscat_import/test_pipeline_resume_plan.py +++ b/tests/hipscat_import/test_pipeline_resume_plan.py @@ -1,19 +1,18 @@ """Test resume file operations""" -import os from pathlib import Path import numpy.testing as npt import pytest -from hipscat_import.pipeline_resume_plan import PipelineResumePlan +from hipscat_import.pipeline_resume_plan import PipelineResumePlan, get_formatted_stage_name def test_done_key(tmp_path): """Verify expected behavior of marking stage progress via done files.""" plan = PipelineResumePlan(tmp_path=tmp_path, progress_bar=False) stage = "testing" - os.makedirs(os.path.join(tmp_path, stage)) + (tmp_path / stage).mkdir(parents=True) keys = plan.read_done_keys(stage) assert len(keys) == 0 @@ -119,6 +118,28 @@ def error_on_even(argument): plan.wait_for_futures(futures, "test") +@pytest.mark.dask +def test_wait_for_futures_progress(tmp_path, dask_client, capsys): + """Test that we can wait around for futures to complete. + + Additionally test that relevant parts of the traceback are printed to stdout.""" + plan = PipelineResumePlan(tmp_path=tmp_path, progress_bar=True, simple_progress_bar=True, resume=False) + + def error_on_even(argument): + """Silly little method used to test futures that fail under predictable conditions""" + if argument % 2 == 0: + raise RuntimeError("we are at odds with evens") + + ## Everything is fine if we're all odd, but use a silly name so it's + ## clear that the stage name is present, and well-formatted. + futures = [dask_client.submit(error_on_even, 1)] + plan.wait_for_futures(futures, "teeeest") + + captured = capsys.readouterr() + assert "Teeeest" in captured.err + assert "100%" in captured.err + + @pytest.mark.dask def test_wait_for_futures_fail_fast(tmp_path, dask_client): """Test that we can wait around for futures to complete. @@ -138,16 +159,16 @@ def error_on_even(argument): def test_formatted_stage_name(): """Test that we make pretty stage names for presenting in progress bars""" - formatted = PipelineResumePlan.get_formatted_stage_name(None) + formatted = get_formatted_stage_name(None) assert formatted == "Progress " - formatted = PipelineResumePlan.get_formatted_stage_name("") + formatted = get_formatted_stage_name("") assert formatted == "Progress " - formatted = PipelineResumePlan.get_formatted_stage_name("stage") + formatted = get_formatted_stage_name("stage") assert formatted == "Stage " - formatted = PipelineResumePlan.get_formatted_stage_name("very long stage name") + formatted = get_formatted_stage_name("very long stage name") assert formatted == "Very long stage name" diff --git a/tests/hipscat_import/test_runtime_arguments.py b/tests/hipscat_import/test_runtime_arguments.py index 69aded26..cea801cc 100644 --- a/tests/hipscat_import/test_runtime_arguments.py +++ b/tests/hipscat_import/test_runtime_arguments.py @@ -1,7 +1,5 @@ """Tests of argument validation""" -import os - import pytest from hipscat_import.runtime_arguments import RuntimeArguments @@ -70,12 +68,12 @@ def test_good_paths(tmp_path): def test_tmp_path_creation(tmp_path): """Check that we create a new temp path for this catalog.""" - output_path = os.path.join(tmp_path, "unique_output_directory") - temp_path = os.path.join(tmp_path, "unique_tmp_directory") - dask_tmp_path = os.path.join(tmp_path, "unique_dask_directory") - os.makedirs(output_path, exist_ok=True) - os.makedirs(temp_path, exist_ok=True) - os.makedirs(dask_tmp_path, exist_ok=True) + output_path = tmp_path / "unique_output_directory" + temp_path = tmp_path / "unique_tmp_directory" + dask_tmp_path = tmp_path / "unique_dask_directory" + output_path.mkdir(parents=True) + temp_path.mkdir(parents=True) + dask_tmp_path.mkdir(parents=True) ## If no tmp paths are given, use the output directory args = RuntimeArguments( diff --git a/tests/hipscat_import/verification/test_verification_arguments.py b/tests/hipscat_import/verification/test_verification_arguments.py index 8ebd6c81..303a63f7 100644 --- a/tests/hipscat_import/verification/test_verification_arguments.py +++ b/tests/hipscat_import/verification/test_verification_arguments.py @@ -62,7 +62,7 @@ def test_catalog_object(tmp_path, small_sky_object_catalog): output_path=tmp_path, output_artifact_name="small_sky_object_verification_report", ) - assert args.input_catalog_path == small_sky_object_catalog + assert args.input_catalog_path == str(small_sky_object_catalog) assert str(args.output_path) == tmp_path_str assert str(args.tmp_path).startswith(tmp_path_str)