diff --git a/docs/catalogs/arguments.rst b/docs/catalogs/arguments.rst index b7c7e0a9..dc3f1b6e 100644 --- a/docs/catalogs/arguments.rst +++ b/docs/catalogs/arguments.rst @@ -169,7 +169,7 @@ You can find the full API documentation for ) If you're reading from cloud storage, or otherwise have some filesystem credential -dict, put those in ``input_storage_options``. +dict, construct ``input_file`` as a ``universal_pathlib`` ``UPath`` and pass those credentials as keyword arguments. Indexed batching strategy ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -304,7 +304,7 @@ preferable to delete any existing contents, however, as this may cause unexpected side effects. If you're writing to cloud storage, or otherwise have some filesystem credential -dict, put those in ``output_storage_options``. +dict, construct ``output_path`` as a ``universal_pathlib`` ``UPath`` and pass those credentials as keyword arguments. In addition, you can specify directories to use for various intermediate files: diff --git a/docs/guide/index_table.rst b/docs/guide/index_table.rst index 74382e58..eb816bf2 100644 --- a/docs/guide/index_table.rst +++ b/docs/guide/index_table.rst @@ -229,7 +229,7 @@ preferable to delete any existing contents, however, as this may cause unexpected side effects. If you're writing to cloud storage, or otherwise have some filesystem credential -dict, put those in ``output_storage_options``. +dict, construct ``output_path`` as a ``universal_pathlib`` ``UPath`` and pass those credentials as keyword arguments. In addition, you can specify directories to use for various intermediate files: diff --git a/docs/guide/margin_cache.rst b/docs/guide/margin_cache.rst index 92ea90f7..d481b519 100644 --- a/docs/guide/margin_cache.rst +++ b/docs/guide/margin_cache.rst @@ -141,7 +141,7 @@ preferable to delete any existing contents, however, as this may cause unexpected side effects. If you're writing to cloud storage, or otherwise have some filesystem credential -dict, put those in ``output_storage_options``. +dict, construct ``output_path`` as a ``universal_pathlib`` ``UPath`` and pass those credentials as keyword arguments.
In addition, you can specify directories to use for various intermediate files: diff --git a/pyproject.toml b/pyproject.toml index 71377ad3..9be58956 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "pyyaml", "scipy", "tqdm", + "universal_pathlib", ] # On a mac, install optional dependencies with `pip install '.[dev]'` (include the single quotes) diff --git a/src/hipscat_import/catalog/arguments.py b/src/hipscat_import/catalog/arguments.py index f90fcf5a..89d0c144 100644 --- a/src/hipscat_import/catalog/arguments.py +++ b/src/hipscat_import/catalog/arguments.py @@ -3,11 +3,12 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Any, Dict, List, Union +from pathlib import Path +from typing import List from hipscat.catalog.catalog import CatalogInfo -from hipscat.io import FilePointer from hipscat.pixel_math import hipscat_id +from upath import UPath from hipscat_import.catalog.file_readers import InputReader, get_file_reader from hipscat_import.runtime_arguments import RuntimeArguments, find_input_paths @@ -24,14 +25,12 @@ class ImportArguments(RuntimeArguments): catalog_type: str = "object" """level of catalog data, object (things in the sky) or source (detections)""" - input_path: FilePointer | None = None + input_path: str | Path | UPath | None = None """path to search for the input data""" - input_file_list: List[FilePointer] = field(default_factory=list) + input_file_list: List[str | Path | UPath] = field(default_factory=list) """can be used instead of input_path to import only specified files""" - input_paths: List[FilePointer] = field(default_factory=list) + input_paths: List[str | Path | UPath] = field(default_factory=list) """resolved list of all files that will be used in the importer""" - input_storage_options: Union[Dict[Any, Any], None] = None - """optional dictionary of abstract filesystem credentials for the INPUT.""" ra_column: str = "ra" """column for right ascension""" @@ -45,7 +44,7 @@ class ImportArguments(RuntimeArguments): resolve the counter within the same higher-order pixel space""" add_hipscat_index: bool = True """add the hipscat spatial index field alongside the data""" - use_schema_file: str | None = None + use_schema_file: str | Path | UPath | None = None """path to a parquet file with schema metadata. 
this will be used for column metadata when writing the files, if specified""" expected_total_rows: int = 0 @@ -130,12 +129,7 @@ def _check_arguments(self): raise ValueError("When using _hipscat_index for position, no sort columns should be added") # Basic checks complete - make more checks and create directories where necessary - self.input_paths = find_input_paths( - self.input_path, - "**/*.*", - self.input_file_list, - storage_options=self.input_storage_options, - ) + self.input_paths = find_input_paths(self.input_path, "**/*.*", self.input_file_list) def to_catalog_info(self, total_rows) -> CatalogInfo: """Catalog-type-specific dataset info.""" diff --git a/src/hipscat_import/catalog/file_readers.py b/src/hipscat_import/catalog/file_readers.py index 6b4c7599..300717d9 100644 --- a/src/hipscat_import/catalog/file_readers.py +++ b/src/hipscat_import/catalog/file_readers.py @@ -1,7 +1,6 @@ """File reading generators for common file types.""" import abc -from typing import Any, Dict, Union import pandas as pd import pyarrow @@ -9,7 +8,8 @@ import pyarrow.parquet as pq from astropy.io import ascii as ascii_reader from astropy.table import Table -from hipscat.io import FilePointer, file_io +from hipscat.io import file_io +from upath import UPath # pylint: disable=too-few-public-methods,too-many-arguments @@ -113,30 +113,40 @@ def provenance_info(self) -> dict: all_args["kwargs"]["storage_options"] = "REDACTED" return {"input_reader_type": type(self).__name__, **vars(self)} - def regular_file_exists(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **_kwargs): + def regular_file_exists(self, input_file, **_kwargs): """Check that the `input_file` points to a single regular file Raises: FileNotFoundError: if nothing exists at path, or directory found. """ - if not file_io.does_file_or_directory_exist(input_file, storage_options=storage_options): + if not file_io.does_file_or_directory_exist(input_file): raise FileNotFoundError(f"File not found at path: {input_file}") - if not file_io.is_regular_file(input_file, storage_options=storage_options): + if not file_io.is_regular_file(input_file): raise FileNotFoundError(f"Directory found at path - requires regular file: {input_file}") - def read_index_file(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **kwargs): + def read_index_file(self, input_file, upath_kwargs=None, **kwargs): """Read an "indexed" file. This should contain a list of paths to files to be read and batched. + In order to create a valid connection to the string paths, provide any + additional universal pathlib (i.e. fsspec) arguments to the `upath_kwargs` kwarg. + In this way, the "index" file may contain a list of paths on a remote service, + and the `upath_kwargs` will be used to create a connection to that remote service. + Raises: FileNotFoundError: if nothing exists at path, or directory found. 
""" + input_file = file_io.get_upath(input_file) self.regular_file_exists(input_file, **kwargs) - file_names = file_io.load_text_file(input_file, storage_options=storage_options) + file_names = file_io.load_text_file(input_file) file_names = [f.strip() for f in file_names] - file_names = [f for f in file_names if f] - return file_names + if upath_kwargs is None: + upath_kwargs = {} + + file_paths = [UPath(f, **upath_kwargs) for f in file_names if f] + + return file_paths class CsvReader(InputReader): @@ -170,6 +180,7 @@ def __init__( column_names=None, type_map=None, parquet_kwargs=None, + upath_kwargs=None, **kwargs, ): self.chunksize = chunksize @@ -178,6 +189,7 @@ def __init__( self.column_names = column_names self.type_map = type_map self.parquet_kwargs = parquet_kwargs + self.upath_kwargs = upath_kwargs self.kwargs = kwargs schema_parquet = None @@ -185,7 +197,7 @@ def __init__( if self.parquet_kwargs is None: self.parquet_kwargs = {} schema_parquet = file_io.read_parquet_file_to_pandas( - FilePointer(self.schema_file), + self.schema_file, **self.parquet_kwargs, ) @@ -206,7 +218,7 @@ def read(self, input_file, read_columns=None): self.kwargs["usecols"] = read_columns return file_io.load_csv_to_pandas_generator( - FilePointer(input_file), + input_file, chunksize=self.chunksize, header=self.header, **self.kwargs, @@ -220,11 +232,13 @@ class IndexedCsvReader(CsvReader): """ def read(self, input_file, read_columns=None): - file_names = self.read_index_file(input_file=input_file, **self.kwargs) + file_paths = self.read_index_file( + input_file=input_file, upath_kwargs=self.upath_kwargs, **self.kwargs + ) batch_size = 0 batch_frames = [] - for file in file_names: + for file in file_paths: for single_frame in super().read(file, read_columns=read_columns): if batch_size + len(single_frame) >= self.chunksize: # We've hit our chunksize, send the batch off to the task. 
@@ -382,6 +396,7 @@ def __init__( fragment_readahead=4, use_threads=True, column_names=None, + upath_kwargs=None, **kwargs, ): self.chunksize = chunksize @@ -389,11 +404,14 @@ def __init__( self.fragment_readahead = fragment_readahead self.use_threads = use_threads self.column_names = column_names + self.upath_kwargs = upath_kwargs self.kwargs = kwargs def read(self, input_file, read_columns=None): columns = read_columns or self.column_names - file_names = self.read_index_file(input_file=input_file, **self.kwargs) + file_names = self.read_index_file( + input_file=input_file, upath_kwargs=self.upath_kwargs, **self.kwargs + ) (_, input_dataset) = file_io.read_parquet_dataset(file_names, **self.kwargs) batches, nrows = [], 0 diff --git a/src/hipscat_import/catalog/map_reduce.py b/src/hipscat_import/catalog/map_reduce.py index 5c6508c0..799c3339 100644 --- a/src/hipscat_import/catalog/map_reduce.py +++ b/src/hipscat_import/catalog/map_reduce.py @@ -1,16 +1,16 @@ """Import a set of non-hipscat files using dask for parallelization""" import pickle -from typing import Any, Dict, Union import hipscat.pixel_math.healpix_shim as hp import numpy as np import pyarrow as pa import pyarrow.parquet as pq from hipscat import pixel_math -from hipscat.io import FilePointer, file_io, paths +from hipscat.io import file_io, paths from hipscat.pixel_math.healpix_pixel import HealpixPixel from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, hipscat_id_to_healpix +from upath import UPath from hipscat_import.catalog.resume_plan import ResumePlan from hipscat_import.catalog.sparse_histogram import SparseHistogram @@ -34,7 +34,7 @@ def _has_named_index(dataframe): def _iterate_input_file( - input_file: FilePointer, + input_file: UPath, pickled_reader_file: str, highest_order, ra_column, @@ -67,9 +67,9 @@ def _iterate_input_file( def map_to_pixels( - input_file: FilePointer, + input_file: UPath, pickled_reader_file: str, - resume_path: FilePointer, + resume_path: UPath, mapping_key, highest_order, ra_column, @@ -79,10 +79,10 @@ def map_to_pixels( """Map a file of input objects to their healpix pixels. Args: - input_file (FilePointer): file to read for catalog data. + input_file (UPath): file to read for catalog data. file_reader (hipscat_import.catalog.file_readers.InputReader): instance of input reader that specifies arguments necessary for reading from the input file. - resume_path (FilePointer): where to write resume partial results. + resume_path (UPath): where to write resume partial results. mapping_key (str): unique counter for this input file, used when creating intermediate files highest_order (int): healpix order to use when mapping @@ -127,21 +127,21 @@ def map_to_pixels( def split_pixels( - input_file: FilePointer, + input_file: UPath, pickled_reader_file: str, splitting_key, highest_order, ra_column, dec_column, - cache_shard_path: FilePointer, - resume_path: FilePointer, + cache_shard_path: UPath, + resume_path: UPath, alignment_file=None, use_hipscat_index=False, ): """Map a file of input objects to their healpix pixels and split into shards. Args: - input_file (FilePointer): file to read for catalog data. + input_file (UPath): file to read for catalog data. file_reader (hipscat_import.catalog.file_readers.InputReader): instance of input reader that specifies arguments necessary for reading from the input file. 
splitting_key (str): unique counter for this input file, used @@ -149,8 +149,8 @@ def split_pixels( highest_order (int): healpix order to use when mapping ra_column (str): where to find right ascension data in the dataframe dec_column (str): where to find declation in the dataframe - cache_shard_path (FilePointer): where to write intermediate parquet files. - resume_path (FilePointer): where to write resume files. + cache_shard_path (UPath): where to write intermediate parquet files. + resume_path (UPath): where to write resume files. Raises: ValueError: if the `ra_column` or `dec_column` cannot be found in the input file. @@ -177,9 +177,9 @@ def split_pixels( pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet" ) if _has_named_index(filtered_data): - filtered_data.to_parquet(output_file, index=True) + filtered_data.to_parquet(output_file.path, index=True, filesystem=output_file.fs) else: - filtered_data.to_parquet(output_file, index=False) + filtered_data.to_parquet(output_file.path, index=False, filesystem=output_file.fs) del filtered_data, data_indexes ResumePlan.splitting_key_done(tmp_path=resume_path, splitting_key=splitting_key) @@ -203,7 +203,6 @@ def reduce_pixel_shards( add_hipscat_index=True, delete_input_files=True, use_schema_file="", - storage_options: Union[Dict[Any, Any], None] = None, ): """Reduce sharded source pixels into destination pixels. @@ -225,8 +224,8 @@ def reduce_pixel_shards( for more in-depth discussion of this field. Args: - cache_shard_path (FilePointer): where to read intermediate parquet files. - resume_path (FilePointer): where to write resume files. + cache_shard_path (UPath): where to read intermediate parquet files. + resume_path (UPath): where to write resume files. reducing_key (str): unique string for this task, used for resume files. origin_pixel_numbers (list[int]): high order pixels, with object data written to intermediate directories. @@ -234,7 +233,7 @@ def reduce_pixel_shards( destination_pixel_number (int): pixel number at the above order destination_pixel_size (int): expected number of rows to write for the catalog's final pixel - output_path (FilePointer): where to write the final catalog pixel data + output_path (UPath): where to write the final catalog pixel data sort_columns (str): column for survey identifier, or other sortable column add_hipscat_index (bool): should we add a _hipscat_index column to the resulting parquet file? 
@@ -251,20 +250,16 @@ def reduce_pixel_shards( destination_dir = paths.pixel_directory( output_path, destination_pixel_order, destination_pixel_number ) - file_io.make_directory(destination_dir, exist_ok=True, storage_options=storage_options) + file_io.make_directory(destination_dir, exist_ok=True) - destination_file = paths.pixel_catalog_file( - output_path, destination_pixel_order, destination_pixel_number - ) + healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number) + destination_file = paths.pixel_catalog_file(output_path, healpix_pixel) schema = None if use_schema_file: - schema = file_io.read_parquet_metadata( - use_schema_file, storage_options=storage_options - ).schema.to_arrow_schema() + schema = file_io.read_parquet_metadata(use_schema_file).schema.to_arrow_schema() tables = [] - healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number) pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel) if schema: @@ -310,16 +305,16 @@ def reduce_pixel_shards( if schema: schema = _modify_arrow_schema(schema, add_hipscat_index) - dataframe.to_parquet(destination_file, schema=schema, storage_options=storage_options) + dataframe.to_parquet(destination_file.path, schema=schema, filesystem=destination_file.fs) else: - dataframe.to_parquet(destination_file, storage_options=storage_options) + dataframe.to_parquet(destination_file.path, filesystem=destination_file.fs) del dataframe, merged_table, tables if delete_input_files: pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel) - file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options) + file_io.remove_directory(pixel_dir, ignore_errors=True) ResumePlan.reducing_key_done(tmp_path=resume_path, reducing_key=reducing_key) except Exception as exception: # pylint: disable=broad-exception-caught diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py index 83de436f..03acb7c9 100644 --- a/src/hipscat_import/catalog/resume_plan.py +++ b/src/hipscat_import/catalog/resume_plan.py @@ -6,13 +6,14 @@ from dataclasses import dataclass, field from typing import List, Optional, Tuple -import hipscat.pixel_math as hist import hipscat.pixel_math.healpix_shim as hp import numpy as np from hipscat import pixel_math -from hipscat.io import FilePointer, file_io +from hipscat.io import file_io +from hipscat.pixel_math import empty_histogram from hipscat.pixel_math.healpix_pixel import HealpixPixel from numpy import frombuffer +from upath import UPath from hipscat_import.catalog.sparse_histogram import SparseHistogram from hipscat_import.pipeline_resume_plan import PipelineResumePlan @@ -22,7 +23,7 @@ class ResumePlan(PipelineResumePlan): """Container class for holding the state of each file in the pipeline plan.""" - input_paths: List[FilePointer] = field(default_factory=list) + input_paths: List[UPath] = field(default_factory=list) """resolved list of all files that will be used in the importer""" map_files: List[Tuple[str, str]] = field(default_factory=list) """list of files (and job keys) that have yet to be mapped""" @@ -51,10 +52,9 @@ def __init__( simple_progress_bar: bool = False, input_paths=None, tmp_path=None, - tmp_base_path: FilePointer | None = None, + tmp_base_path: UPath | None = None, delete_resume_log_files: bool = True, delete_intermediate_parquet_files: bool = True, - output_storage_options: dict | None = None, run_stages: List[str] | None = None, import_args=None, ): @@ -67,7 +67,6 @@ def __init__( 
tmp_base_path=import_args.tmp_base_path, delete_resume_log_files=import_args.delete_resume_log_files, delete_intermediate_parquet_files=import_args.delete_intermediate_parquet_files, - output_storage_options=import_args.output_storage_options, ) if import_args.debug_stats_only: run_stages = ["mapping", "finishing"] @@ -81,7 +80,6 @@ def __init__( tmp_base_path=tmp_base_path, delete_resume_log_files=delete_resume_log_files, delete_intermediate_parquet_files=delete_intermediate_parquet_files, - output_storage_options=output_storage_options, ) self.input_paths = input_paths self.gather_plan(run_stages) @@ -177,7 +175,7 @@ def read_histogram(self, healpix_order): if len(remaining_map_files) > 0: raise RuntimeError(f"{len(remaining_map_files)} map stages did not complete successfully.") histogram_files = file_io.find_files_matching_path(self.tmp_path, self.HISTOGRAMS_DIR, "*.npz") - aggregate_histogram = hist.empty_histogram(healpix_order) + aggregate_histogram = empty_histogram(healpix_order) for partial_file_name in histogram_files: partial = SparseHistogram.from_file(partial_file_name) partial_as_array = partial.to_array() @@ -274,7 +272,7 @@ def get_alignment_file( pixel_threshold, drop_empty_siblings, expected_total_rows, - ) -> str: + ) -> UPath: """Get a pointer to the existing alignment file for the pipeline, or generate a new alignment using provided arguments. diff --git a/src/hipscat_import/catalog/run_import.py b/src/hipscat_import/catalog/run_import.py index 4b932713..20ca721f 100644 --- a/src/hipscat_import/catalog/run_import.py +++ b/src/hipscat_import/catalog/run_import.py @@ -114,7 +114,6 @@ def run(args, client): use_schema_file=args.use_schema_file, use_hipscat_index=args.use_hipscat_index, delete_input_files=args.delete_intermediate_parquet_files, - storage_options=args.output_storage_options, ) ) @@ -128,34 +127,25 @@ def run(args, client): catalog_base_dir=args.catalog_path, dataset_info=catalog_info, tool_args=args.provenance_info(), - storage_options=args.output_storage_options, ) step_progress.update(1) - io.write_catalog_info( - catalog_base_dir=args.catalog_path, - dataset_info=catalog_info, - storage_options=args.output_storage_options, - ) + io.write_catalog_info(catalog_base_dir=args.catalog_path, dataset_info=catalog_info) step_progress.update(1) partition_info = PartitionInfo.from_healpix(resume_plan.get_destination_pixels()) partition_info_file = paths.get_partition_info_pointer(args.catalog_path) - partition_info.write_to_file(partition_info_file, storage_options=args.output_storage_options) + partition_info.write_to_file(partition_info_file) if not args.debug_stats_only: - parquet_rows = write_parquet_metadata( - args.catalog_path, storage_options=args.output_storage_options - ) + parquet_rows = write_parquet_metadata(args.catalog_path) if total_rows > 0 and parquet_rows != total_rows: raise ValueError( f"Number of rows in parquet ({parquet_rows}) " f"does not match expectation ({total_rows})" ) else: - partition_info.write_to_metadata_files( - args.catalog_path, storage_options=args.output_storage_options - ) + partition_info.write_to_metadata_files(args.catalog_path) step_progress.update(1) - io.write_fits_map(args.catalog_path, raw_histogram, storage_options=args.output_storage_options) + io.write_fits_map(args.catalog_path, raw_histogram) step_progress.update(1) resume_plan.clean_resume_files() step_progress.update(1) diff --git a/src/hipscat_import/index/arguments.py b/src/hipscat_import/index/arguments.py index 13d7d25e..1fb6fa6b 100644 --- 
a/src/hipscat_import/index/arguments.py +++ b/src/hipscat_import/index/arguments.py @@ -1,11 +1,15 @@ """Utility to hold all arguments required throughout indexing""" +from __future__ import annotations + from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional, Union +from pathlib import Path +from typing import List, Optional from hipscat.catalog import Catalog from hipscat.catalog.index.index_catalog_info import IndexCatalogInfo from hipscat.io.validation import is_valid_catalog +from upath import UPath from hipscat_import.runtime_arguments import RuntimeArguments @@ -15,10 +19,8 @@ class IndexArguments(RuntimeArguments): """Data class for holding indexing arguments""" ## Input - input_catalog_path: str = "" + input_catalog_path: str | Path | UPath | None = None input_catalog: Optional[Catalog] = None - input_storage_options: Union[Dict[Any, Any], None] = None - """optional dictionary of abstract filesystem credentials for the INPUT.""" indexing_column: str = "" extra_columns: List[str] = field(default_factory=list) @@ -58,11 +60,9 @@ def _check_arguments(self): if not self.include_hipscat_index and not self.include_order_pixel: raise ValueError("At least one of include_hipscat_index or include_order_pixel must be True") - if not is_valid_catalog(self.input_catalog_path, storage_options=self.input_storage_options): + if not is_valid_catalog(self.input_catalog_path): raise ValueError("input_catalog_path not a valid catalog") - self.input_catalog = Catalog.read_from_hipscat( - catalog_path=self.input_catalog_path, storage_options=self.input_storage_options - ) + self.input_catalog = Catalog.read_from_hipscat(catalog_path=self.input_catalog_path) if self.include_radec: catalog_info = self.input_catalog.catalog_info self.extra_columns.extend([catalog_info.ra_column, catalog_info.dec_column]) diff --git a/src/hipscat_import/index/map_reduce.py b/src/hipscat_import/index/map_reduce.py index 09f41af3..8bba30ba 100644 --- a/src/hipscat_import/index/map_reduce.py +++ b/src/hipscat_import/index/map_reduce.py @@ -2,23 +2,19 @@ import dask.dataframe as dd import numpy as np -import pandas as pd -from hipscat.io import paths +from hipscat.io import file_io, paths from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN -def read_leaf_file( - input_file, include_columns, include_hipscat_index, drop_duplicates, schema, storage_options -): +def read_leaf_file(input_file, include_columns, include_hipscat_index, drop_duplicates, schema): """Mapping function called once per input file. 
Reads the leaf parquet file, and returns with appropriate columns and duplicates dropped.""" - data = pd.read_parquet( + data = file_io.read_parquet_file_to_pandas( input_file, columns=include_columns, engine="pyarrow", schema=schema, - storage_options=storage_options, ) data = data.reset_index() @@ -39,23 +35,18 @@ def create_index(args, client): if args.include_order_pixel: include_columns.extend(["Norder", "Dir", "Npix"]) - index_dir = paths.append_paths_to_pointer(args.catalog_path, "index") + index_dir = file_io.get_upath(args.catalog_path / "index") data = dd.from_map( read_leaf_file, [ - paths.pixel_catalog_file( - catalog_base_dir=args.input_catalog.catalog_base_dir, - pixel_order=pixel.order, - pixel_number=pixel.pixel, - ) + paths.pixel_catalog_file(catalog_base_dir=args.input_catalog.catalog_base_dir, pixel=pixel) for pixel in args.input_catalog.get_healpix_pixels() ], include_columns=include_columns, include_hipscat_index=args.include_hipscat_index, drop_duplicates=args.drop_duplicates, schema=args.input_catalog.schema, - storage_options=args.input_storage_options, ) if args.include_order_pixel: @@ -75,10 +66,10 @@ def create_index(args, client): # Now just write it out to leaf parquet files! result = data.to_parquet( - path=index_dir, + path=index_dir.path, engine="pyarrow", compute_kwargs={"partition_size": args.compute_partition_size}, - storage_options=args.output_storage_options, + filesystem=index_dir.fs, ) client.compute(result) return len(data) diff --git a/src/hipscat_import/index/run_index.py b/src/hipscat_import/index/run_index.py index bbf870d1..fc324af2 100644 --- a/src/hipscat_import/index/run_index.py +++ b/src/hipscat_import/index/run_index.py @@ -27,20 +27,11 @@ def run(args, client): catalog_base_dir=args.catalog_path, dataset_info=index_catalog_info, tool_args=args.provenance_info(), - storage_options=args.output_storage_options, ) step_progress.update(1) - write_metadata.write_catalog_info( - catalog_base_dir=args.catalog_path, - dataset_info=index_catalog_info, - storage_options=args.output_storage_options, - ) + write_metadata.write_catalog_info(catalog_base_dir=args.catalog_path, dataset_info=index_catalog_info) step_progress.update(1) - file_io.remove_directory( - args.tmp_path, ignore_errors=True, storage_options=args.output_storage_options - ) + file_io.remove_directory(args.tmp_path, ignore_errors=True) step_progress.update(1) - parquet_metadata.write_parquet_metadata( - args.catalog_path, order_by_healpix=False, storage_options=args.output_storage_options - ) + parquet_metadata.write_parquet_metadata(args.catalog_path, order_by_healpix=False) step_progress.update(1) diff --git a/src/hipscat_import/margin_cache/margin_cache.py b/src/hipscat_import/margin_cache/margin_cache.py index 8d0a512c..c9815139 100644 --- a/src/hipscat_import/margin_cache/margin_cache.py +++ b/src/hipscat_import/margin_cache/margin_cache.py @@ -21,13 +21,12 @@ def generate_margin_cache(args, client): if not resume_plan.is_mapping_done(): futures = [] for mapping_key, pix in resume_plan.get_remaining_map_keys(): - partition_file = paths.pixel_catalog_file(args.input_catalog_path, pix.order, pix.pixel) + partition_file = paths.pixel_catalog_file(args.input_catalog_path, pix) futures.append( client.submit( mcmr.map_pixel_shards, partition_file=partition_file, mapping_key=mapping_key, - input_storage_options=args.input_storage_options, original_catalog_metadata=original_catalog_metadata, margin_pair_file=resume_plan.margin_pair_file, margin_threshold=args.margin_threshold, @@ -49,27 
+48,21 @@ def generate_margin_cache(args, client): intermediate_directory=args.tmp_path, reducing_key=reducing_key, output_path=args.catalog_path, - output_storage_options=args.output_storage_options, partition_order=pix.order, partition_pixel=pix.pixel, original_catalog_metadata=original_catalog_metadata, delete_intermediate_parquet_files=args.delete_intermediate_parquet_files, - input_storage_options=args.input_storage_options, ) ) resume_plan.wait_for_reducing(futures) with resume_plan.print_progress(total=4, stage_name="Finishing") as step_progress: - total_rows = parquet_metadata.write_parquet_metadata( - args.catalog_path, storage_options=args.output_storage_options - ) + total_rows = parquet_metadata.write_parquet_metadata(args.catalog_path) step_progress.update(1) metadata_path = paths.get_parquet_metadata_pointer(args.catalog_path) - partition_info = PartitionInfo.read_from_file( - metadata_path, storage_options=args.output_storage_options - ) + partition_info = PartitionInfo.read_from_file(metadata_path) partition_info_file = paths.get_partition_info_pointer(args.catalog_path) - partition_info.write_to_file(partition_info_file, storage_options=args.output_storage_options) + partition_info.write_to_file(partition_info_file) step_progress.update(1) margin_catalog_info = args.to_catalog_info(int(total_rows)) @@ -77,15 +70,10 @@ def generate_margin_cache(args, client): catalog_base_dir=args.catalog_path, dataset_info=margin_catalog_info, tool_args=args.provenance_info(), - storage_options=args.output_storage_options, ) write_metadata.write_catalog_info( - catalog_base_dir=args.catalog_path, - dataset_info=margin_catalog_info, - storage_options=args.output_storage_options, + catalog_base_dir=args.catalog_path, dataset_info=margin_catalog_info ) step_progress.update(1) - file_io.remove_directory( - args.tmp_path, ignore_errors=True, storage_options=args.output_storage_options - ) + file_io.remove_directory(args.tmp_path, ignore_errors=True) step_progress.update(1) diff --git a/src/hipscat_import/margin_cache/margin_cache_arguments.py b/src/hipscat_import/margin_cache/margin_cache_arguments.py index d3afef48..e65f8542 100644 --- a/src/hipscat_import/margin_cache/margin_cache_arguments.py +++ b/src/hipscat_import/margin_cache/margin_cache_arguments.py @@ -1,11 +1,15 @@ +from __future__ import annotations + from dataclasses import dataclass, field -from typing import Any, Dict, List, Union +from pathlib import Path +from typing import List import hipscat.pixel_math.healpix_shim as hp from hipscat.catalog import Catalog from hipscat.catalog.margin_cache.margin_cache_catalog_info import MarginCacheCatalogInfo from hipscat.io.validation import is_valid_catalog from hipscat.pixel_math.healpix_pixel import HealpixPixel +from upath import UPath from hipscat_import.runtime_arguments import RuntimeArguments @@ -34,10 +38,8 @@ class MarginCacheArguments(RuntimeArguments): """should we delete task-level done files once each stage is complete? if False, we will keep all done marker files at the end of the pipeline.""" - input_catalog_path: str = "" + input_catalog_path: str | Path | UPath | None = None """the path to the hipscat-formatted input catalog.""" - input_storage_options: Union[Dict[Any, Any], None] = None - """optional dictionary of abstract filesystem credentials for the INPUT.""" debug_filter_pixel_list: List[HealpixPixel] = field(default_factory=list) """debug setting. if provided, we will first filter the catalog to the pixels provided. 
this can be useful for creating a margin over a subset of a catalog.""" @@ -49,12 +51,10 @@ def _check_arguments(self): super()._check_arguments() if not self.input_catalog_path: raise ValueError("input_catalog_path is required") - if not is_valid_catalog(self.input_catalog_path, storage_options=self.input_storage_options): + if not is_valid_catalog(self.input_catalog_path): raise ValueError("input_catalog_path not a valid catalog") - self.catalog = Catalog.read_from_hipscat( - self.input_catalog_path, storage_options=self.input_storage_options - ) + self.catalog = Catalog.read_from_hipscat(self.input_catalog_path) if len(self.debug_filter_pixel_list) > 0: self.catalog = self.catalog.filter_from_pixel_list(self.debug_filter_pixel_list) if len(self.catalog.get_healpix_pixels()) == 0: diff --git a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py index e30ae9e7..d63d705e 100644 --- a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py +++ b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py @@ -16,7 +16,6 @@ def map_pixel_shards( partition_file, mapping_key, - input_storage_options, original_catalog_metadata, margin_pair_file, margin_threshold, @@ -28,12 +27,8 @@ def map_pixel_shards( ): """Creates margin cache shards from a source partition file.""" try: - schema = file_io.read_parquet_metadata( - original_catalog_metadata, storage_options=input_storage_options - ).schema.to_arrow_schema() - data = file_io.read_parquet_file_to_pandas( - partition_file, schema=schema, storage_options=input_storage_options - ) + schema = file_io.read_parquet_metadata(original_catalog_metadata).schema.to_arrow_schema() + data = file_io.read_parquet_file_to_pandas(partition_file, schema=schema) source_pixel = HealpixPixel(data["Norder"].iloc[0], data["Npix"].iloc[0]) # Constrain the possible margin pairs, first by only those `margin_order` pixels @@ -114,7 +109,7 @@ def _to_pixel_shard( file_io.make_directory(shard_dir, exist_ok=True) - shard_path = paths.pixel_catalog_file(partition_dir, source_pixel.order, source_pixel.pixel) + shard_path = paths.pixel_catalog_file(partition_dir, source_pixel) rename_columns = { PartitionInfo.METADATA_ORDER_COLUMN_NAME: f"margin_{PartitionInfo.METADATA_ORDER_COLUMN_NAME}", @@ -137,29 +132,24 @@ def _to_pixel_shard( ) margin_data = margin_data.sort_index() - margin_data.to_parquet(shard_path) + margin_data.to_parquet(shard_path.path, filesystem=shard_path.fs) def reduce_margin_shards( intermediate_directory, reducing_key, output_path, - output_storage_options, partition_order, partition_pixel, original_catalog_metadata, delete_intermediate_parquet_files, - input_storage_options, ): """Reduce all partition pixel directories into a single file""" try: - shard_dir = get_pixel_cache_directory( - intermediate_directory, HealpixPixel(partition_order, partition_pixel) - ) + healpix_pixel = HealpixPixel(partition_order, partition_pixel) + shard_dir = get_pixel_cache_directory(intermediate_directory, healpix_pixel) if file_io.does_file_or_directory_exist(shard_dir): - schema = file_io.read_parquet_metadata( - original_catalog_metadata, storage_options=input_storage_options - ).schema.to_arrow_schema() + schema = file_io.read_parquet_metadata(original_catalog_metadata).schema.to_arrow_schema() schema = ( schema.append(pa.field("margin_Norder", pa.uint8())) @@ -171,16 +161,12 @@ def reduce_margin_shards( if len(full_df): margin_cache_dir = paths.pixel_directory(output_path, partition_order, 
partition_pixel) - file_io.make_directory( - margin_cache_dir, exist_ok=True, storage_options=output_storage_options - ) + file_io.make_directory(margin_cache_dir, exist_ok=True) - margin_cache_file_path = paths.pixel_catalog_file( - output_path, partition_order, partition_pixel - ) + margin_cache_file_path = paths.pixel_catalog_file(output_path, healpix_pixel) full_df.to_parquet( - margin_cache_file_path, schema=schema, storage_options=output_storage_options + margin_cache_file_path.path, schema=schema, filesystem=margin_cache_file_path.fs ) if delete_intermediate_parquet_files: file_io.remove_directory(shard_dir) diff --git a/src/hipscat_import/margin_cache/margin_cache_resume_plan.py b/src/hipscat_import/margin_cache/margin_cache_resume_plan.py index f8105c04..000e1ae2 100644 --- a/src/hipscat_import/margin_cache/margin_cache_resume_plan.py +++ b/src/hipscat_import/margin_cache/margin_cache_resume_plan.py @@ -37,7 +37,6 @@ def __init__(self, args: MarginCacheArguments): tmp_base_path=args.tmp_base_path, delete_resume_log_files=args.delete_resume_log_files, delete_intermediate_parquet_files=args.delete_intermediate_parquet_files, - output_storage_options=args.output_storage_options, ) self._gather_plan(args) diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py index 52161555..95a966af 100644 --- a/src/hipscat_import/pipeline_resume_plan.py +++ b/src/hipscat_import/pipeline_resume_plan.py @@ -8,22 +8,21 @@ from dask.distributed import as_completed, get_worker from dask.distributed import print as dask_print -from hipscat.io import FilePointer, file_io +from hipscat.io import file_io from hipscat.pixel_math.healpix_pixel import HealpixPixel from tqdm.auto import tqdm as auto_tqdm from tqdm.std import tqdm as std_tqdm +from upath import UPath @dataclass class PipelineResumePlan: """Container class for holding the state of pipeline plan.""" - tmp_path: FilePointer + tmp_path: UPath """path for any intermediate files""" - tmp_base_path: FilePointer | None = None + tmp_base_path: str | Path | UPath | None = None """temporary base directory: either `tmp_dir` or `dask_dir`, if those were provided by the user""" - output_storage_options: dict | None = None - """optional dictionary of abstract filesystem credentials for the output.""" resume: bool = True """if there are existing intermediate resume files, should we read those and continue to run pipeline where we left off""" @@ -52,12 +51,10 @@ def safe_to_resume(self): """ if file_io.directory_has_contents(self.tmp_path): if not self.resume: - file_io.remove_directory( - self.tmp_path, ignore_errors=True, storage_options=self.output_storage_options - ) + file_io.remove_directory(self.tmp_path, ignore_errors=True) else: print(f"tmp_path ({self.tmp_path}) contains intermediate files; resuming prior progress.") - file_io.make_directory(self.tmp_path, exist_ok=True, storage_options=self.output_storage_options) + file_io.make_directory(self.tmp_path, exist_ok=True) def done_file_exists(self, stage_name): """Is there a file at a given path? 
@@ -120,8 +117,7 @@ def get_keys_from_file_names(directory, extension): result_files = file_io.find_files_matching_path(directory, f"*{extension}") keys = [] for file_path in result_files: - result_file_name = file_io.get_basename_from_filepointer(file_path) - match = re.match(r"(.*)" + extension, str(result_file_name)) + match = re.match(r"(.*)" + extension, str(file_path.name)) keys.append(match.group(1)) return keys @@ -132,7 +128,7 @@ def clean_resume_files(self): # Use the temporary directory base path if the user provided it, or # delete the intermediate directory from inside the output directory path = self.tmp_base_path if self.tmp_base_path is not None else self.tmp_path - file_io.remove_directory(path, ignore_errors=True, storage_options=self.output_storage_options) + file_io.remove_directory(path, ignore_errors=True) def wait_for_futures(self, futures, stage_name, fail_fast=False): """Wait for collected futures to complete. @@ -191,9 +187,9 @@ def check_original_input_paths(self, input_paths): """ if not input_paths: return [] - input_paths = set(input_paths) - input_paths = [str(p) for p in input_paths] - input_paths.sort() + expected_input_paths = set(input_paths) + expected_input_paths = [str(p) for p in input_paths] + expected_input_paths.sort() original_input_paths = [] @@ -212,7 +208,7 @@ def check_original_input_paths(self, input_paths): for path in input_paths: file_handle.write(f"{path}\n") else: - if original_input_paths != input_paths: + if original_input_paths != expected_input_paths: raise ValueError("Different file set from resumed pipeline execution.") return input_paths diff --git a/src/hipscat_import/runtime_arguments.py b/src/hipscat_import/runtime_arguments.py index b9d54a55..eae0ad89 100644 --- a/src/hipscat_import/runtime_arguments.py +++ b/src/hipscat_import/runtime_arguments.py @@ -5,9 +5,10 @@ import re from dataclasses import dataclass from importlib.metadata import version -from typing import Any, Dict, Union +from pathlib import Path -from hipscat.io import FilePointer, file_io +from hipscat.io import file_io +from upath import UPath # pylint: disable=too-many-instance-attributes @@ -17,15 +18,13 @@ class RuntimeArguments: """Data class for holding runtime arguments""" ## Output - output_path: str = "" + output_path: str | Path | UPath | None = None """base path where new catalog should be output""" output_artifact_name: str = "" """short, convenient name for the catalog""" - output_storage_options: Union[Dict[Any, Any], None] = None - """optional dictionary of abstract filesystem credentials for the OUTPUT.""" ## Execution - tmp_dir: str = "" + tmp_dir: str | Path | UPath | None = None """path for storing intermediate files""" resume: bool = True """If True, we try to read any existing intermediate files and continue to run @@ -38,27 +37,27 @@ class RuntimeArguments: """if displaying a progress bar, use a text-only simple progress bar instead of widget. this can be useful in some environments when running in a notebook where ipywidgets cannot be used (see `progress_bar` argument)""" - dask_tmp: str = "" + dask_tmp: str | Path | UPath | None = None """directory for dask worker space. this should be local to the execution of the pipeline, for speed of reads and writes""" dask_n_workers: int = 1 """number of workers for the dask client""" dask_threads_per_worker: int = 1 """number of threads per dask worker""" - resume_tmp: str = "" + resume_tmp: str | Path | UPath | None = None """directory for intermediate resume files, when needed. 
see RTD for more info.""" completion_email_address: str = "" """if provided, send an email to the indicated email address once the import pipeline has complete.""" - catalog_path: FilePointer | None = None + catalog_path: UPath | None = None """constructed output path for the catalog that will be something like /""" - tmp_path: FilePointer | None = None + tmp_path: UPath | None = None """constructed temp path - defaults to tmp_dir, then dask_tmp, but will create a new temp directory under catalog_path if no other options are provided""" - tmp_base_path: FilePointer | None = None + tmp_base_path: UPath | None = None """temporary base directory: either `tmp_dir` or `dask_dir`, if those were provided by the user""" def __post_init__(self): @@ -77,12 +76,10 @@ def _check_arguments(self): if self.dask_threads_per_worker <= 0: raise ValueError("dask_threads_per_worker should be greater than 0") - self.catalog_path = file_io.append_paths_to_pointer(self.output_path, self.output_artifact_name) + self.catalog_path = file_io.get_upath(self.output_path) / self.output_artifact_name if not self.resume: - file_io.remove_directory( - self.catalog_path, ignore_errors=True, storage_options=self.output_storage_options - ) - file_io.make_directory(self.catalog_path, exist_ok=True, storage_options=self.output_storage_options) + file_io.remove_directory(self.catalog_path, ignore_errors=True) + file_io.make_directory(self.catalog_path, exist_ok=True) if self.tmp_dir and str(self.tmp_dir) != str(self.output_path): if not file_io.does_file_or_directory_exist(self.tmp_dir): @@ -100,7 +97,7 @@ def _check_arguments(self): self.tmp_base_path = self.dask_tmp else: self.tmp_path = file_io.append_paths_to_pointer(self.catalog_path, "intermediate") - file_io.make_directory(self.tmp_path, exist_ok=True, storage_options=self.output_storage_options) + file_io.make_directory(self.tmp_path, exist_ok=True) if self.resume_tmp: self.resume_tmp = file_io.append_paths_to_pointer(self.resume_tmp, self.output_artifact_name) else: @@ -138,9 +135,7 @@ def additional_runtime_provenance_info(self): return {} -def find_input_paths( - input_path="", file_matcher="", input_file_list=None, storage_options: Union[Dict[Any, Any], None] = None -): +def find_input_paths(input_path="", file_matcher="", input_file_list=None): """Helper method to find input paths, given either a prefix and format, or an explicit list of paths. @@ -158,12 +153,10 @@ def find_input_paths( if input_file_list: raise ValueError("exactly one of input_path or input_file_list is required") - if not file_io.does_file_or_directory_exist(input_path, storage_options=storage_options): + if not file_io.does_file_or_directory_exist(input_path): raise FileNotFoundError("input_path not found on local storage") - input_paths = file_io.find_files_matching_path( - input_path, file_matcher, include_protocol=True, storage_options=storage_options - ) - elif not input_file_list is None: + input_paths = file_io.find_files_matching_path(input_path, file_matcher) + elif input_file_list is not None: # It's common for users to accidentally pass in an empty list. Give them a friendly error. 
if len(input_file_list) == 0: raise ValueError("input_file_list is empty") diff --git a/src/hipscat_import/soap/arguments.py b/src/hipscat_import/soap/arguments.py index cd4c5259..2cfe5fe1 100644 --- a/src/hipscat_import/soap/arguments.py +++ b/src/hipscat_import/soap/arguments.py @@ -1,10 +1,13 @@ +from __future__ import annotations + from dataclasses import dataclass -from typing import Any, Dict, Union +from pathlib import Path from hipscat.catalog import Catalog from hipscat.catalog.association_catalog.association_catalog import AssociationCatalogInfo from hipscat.catalog.catalog_type import CatalogType from hipscat.io.validation import is_valid_catalog +from upath import UPath from hipscat_import.runtime_arguments import RuntimeArguments @@ -14,17 +17,13 @@ class SoapArguments(RuntimeArguments): """Data class for holding source-object association arguments""" ## Input - Object catalog - object_catalog_dir: str = "" + object_catalog_dir: str | Path | UPath | None = None object_id_column: str = "" - object_storage_options: Union[Dict[Any, Any], None] = None - """optional dictionary of abstract filesystem credentials for the OBJECT catalog.""" ## Input - Source catalog - source_catalog_dir: str = "" + source_catalog_dir: str | Path | UPath | None = None source_object_id_column: str = "" source_id_column: str = "" - source_storage_options: Union[Dict[Any, Any], None] = None - """optional dictionary of abstract filesystem credentials for the SOURCE catalog.""" resume: bool = True """if there are existing intermediate resume files, should we @@ -50,23 +49,19 @@ def _check_arguments(self): raise ValueError("object_catalog_dir is required") if not self.object_id_column: raise ValueError("object_id_column is required") - if not is_valid_catalog(self.object_catalog_dir, storage_options=self.object_storage_options): + if not is_valid_catalog(self.object_catalog_dir): raise ValueError("object_catalog_dir not a valid catalog") - self.object_catalog = Catalog.read_from_hipscat( - catalog_path=self.object_catalog_dir, storage_options=self.object_storage_options - ) + self.object_catalog = Catalog.read_from_hipscat(catalog_path=self.object_catalog_dir) if not self.source_catalog_dir: raise ValueError("source_catalog_dir is required") if not self.source_object_id_column: raise ValueError("source_object_id_column is required") - if not is_valid_catalog(self.source_catalog_dir, storage_options=self.source_storage_options): + if not is_valid_catalog(self.source_catalog_dir): raise ValueError("source_catalog_dir not a valid catalog") - self.source_catalog = Catalog.read_from_hipscat( - catalog_path=self.source_catalog_dir, storage_options=self.source_storage_options - ) + self.source_catalog = Catalog.read_from_hipscat(catalog_path=self.source_catalog_dir) if self.compute_partition_size < 100_000: raise ValueError("compute_partition_size must be at least 100_000") diff --git a/src/hipscat_import/soap/map_reduce.py b/src/hipscat_import/soap/map_reduce.py index 9f36e809..009d921e 100644 --- a/src/hipscat_import/soap/map_reduce.py +++ b/src/hipscat_import/soap/map_reduce.py @@ -6,8 +6,7 @@ import pandas as pd import pyarrow.parquet as pq from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo -from hipscat.io import FilePointer, file_io, paths -from hipscat.io.file_io.file_pointer import get_fs, strip_leading_slash_for_pyarrow +from hipscat.io import file_io, paths from hipscat.io.parquet_metadata import get_healpix_pixel_from_metadata from hipscat.pixel_math.healpix_pixel 
import HealpixPixel from hipscat.pixel_math.healpix_pixel_function import get_pixel_argsort @@ -18,16 +17,11 @@ def _count_joins_for_object(source_data, source_pixel, object_pixel, soap_args): - object_path = paths.pixel_catalog_file( - catalog_base_dir=soap_args.object_catalog_dir, - pixel_order=object_pixel.order, - pixel_number=object_pixel.pixel, - ) + object_path = paths.pixel_catalog_file(soap_args.object_catalog_dir, object_pixel) object_data = file_io.read_parquet_file_to_pandas( object_path, columns=[soap_args.object_id_column], schema=soap_args.object_catalog.schema, - storage_options=soap_args.object_storage_options, ).set_index(soap_args.object_id_column) joined_data = source_data.merge(object_data, how="inner", left_index=True, right_index=True) @@ -80,9 +74,7 @@ def _write_count_results(cache_path, source_healpix, results): file_io.write_dataframe_to_csv( dataframe=dataframe, - file_pointer=file_io.append_paths_to_pointer( - cache_path, f"{source_healpix.order}_{source_healpix.pixel}.csv" - ), + file_pointer=cache_path / f"{source_healpix.order}_{source_healpix.pixel}.csv", index=False, ) @@ -98,11 +90,7 @@ def count_joins(soap_args: SoapArguments, source_pixel: HealpixPixel, object_pix of the object catalog to be joined. """ try: - source_path = paths.pixel_catalog_file( - catalog_base_dir=file_io.get_file_pointer_from_path(soap_args.source_catalog_dir), - pixel_order=source_pixel.order, - pixel_number=source_pixel.pixel, - ) + source_path = paths.pixel_catalog_file(soap_args.source_catalog_dir, source_pixel) if soap_args.write_leaf_files and soap_args.source_object_id_column != soap_args.source_id_column: read_columns = [soap_args.source_object_id_column, soap_args.source_id_column] else: @@ -111,7 +99,6 @@ def count_joins(soap_args: SoapArguments, source_pixel: HealpixPixel, object_pix source_path, columns=read_columns, schema=soap_args.source_catalog.schema, - storage_options=soap_args.source_storage_options, ).set_index(soap_args.source_object_id_column) remaining_sources = len(source_data) @@ -139,7 +126,7 @@ def count_joins(soap_args: SoapArguments, source_pixel: HealpixPixel, object_pix raise exception -def combine_partial_results(input_path, output_path, output_storage_options) -> int: +def combine_partial_results(input_path, output_path) -> int: """Combine many partial CSVs into single partition join info. Also write out a debug file with counts of unmatched sources, if any. 
@@ -164,30 +151,21 @@ def combine_partial_results(input_path, output_path, output_storage_options) -> unmatched = dataframe.loc[dataframe["Norder"] == -1] file_io.write_dataframe_to_csv( - dataframe=matched, - file_pointer=file_io.append_paths_to_pointer(output_path, "partition_join_info.csv"), - index=False, - storage_options=output_storage_options, + dataframe=matched, file_pointer=output_path / "partition_join_info.csv", index=False ) if len(unmatched) > 0: file_io.write_dataframe_to_csv( - dataframe=unmatched, - file_pointer=file_io.append_paths_to_pointer(output_path, "unmatched_sources.csv"), - index=False, - storage_options=output_storage_options, + dataframe=unmatched, file_pointer=output_path / "unmatched_sources.csv", index=False ) primary_only = matched.groupby(["Norder", "Dir", "Npix"])["num_rows"].sum().reset_index() file_io.write_dataframe_to_csv( - dataframe=primary_only, - file_pointer=file_io.append_paths_to_pointer(output_path, "partition_info.csv"), - index=False, - storage_options=output_storage_options, + dataframe=primary_only, file_pointer=output_path / "partition_info.csv", index=False ) join_info = PartitionJoinInfo(matched) - join_info.write_to_metadata_files(output_path, storage_options=output_storage_options) + join_info.write_to_metadata_files(output_path) return primary_only["num_rows"].sum() @@ -226,18 +204,13 @@ def reduce_joins( shards.append(pq.read_table(shard_file_name)) # Write all of the shards into a single parquet file, one row-group-per-shard. - starting_catalog_path = FilePointer(str(soap_args.catalog_path)) - destination_dir = paths.pixel_directory(starting_catalog_path, object_pixel.order, object_pixel.pixel) - file_io.make_directory( - destination_dir, exist_ok=True, storage_options=soap_args.output_storage_options + destination_dir = paths.pixel_directory( + soap_args.catalog_path, object_pixel.order, object_pixel.pixel ) + file_io.make_directory(destination_dir, exist_ok=True) - output_file = paths.pixel_catalog_file(starting_catalog_path, object_pixel.order, object_pixel.pixel) - file_system, output_file = get_fs( - file_pointer=output_file, storage_options=soap_args.output_storage_options - ) - output_file = strip_leading_slash_for_pyarrow(output_file, protocol=file_system.protocol) - with pq.ParquetWriter(output_file, shards[0].schema, filesystem=file_system) as writer: + output_file = paths.pixel_catalog_file(soap_args.catalog_path, object_pixel) + with pq.ParquetWriter(output_file.path, shards[0].schema, filesystem=output_file.fs) as writer: for table in shards: writer.write_table(table) diff --git a/src/hipscat_import/soap/resume_plan.py b/src/hipscat_import/soap/resume_plan.py index 18e98ec1..a77eb25d 100644 --- a/src/hipscat_import/soap/resume_plan.py +++ b/src/hipscat_import/soap/resume_plan.py @@ -43,7 +43,6 @@ def __init__(self, args: SoapArguments): tmp_base_path=args.tmp_base_path, delete_resume_log_files=args.delete_resume_log_files, delete_intermediate_parquet_files=args.delete_intermediate_parquet_files, - output_storage_options=args.output_storage_options, ) self.gather_plan(args) @@ -60,16 +59,12 @@ def gather_plan(self, args): return step_progress.update(1) - self.object_catalog = Catalog.read_from_hipscat( - args.object_catalog_dir, storage_options=args.object_storage_options - ) + self.object_catalog = Catalog.read_from_hipscat(args.object_catalog_dir) source_map_file = file_io.append_paths_to_pointer(self.tmp_path, self.SOURCE_MAP_FILE) if file_io.does_file_or_directory_exist(source_map_file): source_pixel_map = 
np.load(source_map_file, allow_pickle=True)["arr_0"].item() else: - source_catalog = Catalog.read_from_hipscat( - args.source_catalog_dir, storage_options=args.source_storage_options - ) + source_catalog = Catalog.read_from_hipscat(args.source_catalog_dir) source_pixel_map = source_to_object_map(self.object_catalog, source_catalog) np.savez_compressed(source_map_file, source_pixel_map) self.count_keys = self.get_sources_to_count(source_pixel_map=source_pixel_map) diff --git a/src/hipscat_import/soap/run_soap.py b/src/hipscat_import/soap/run_soap.py index cc721138..d5f7a0cf 100644 --- a/src/hipscat_import/soap/run_soap.py +++ b/src/hipscat_import/soap/run_soap.py @@ -50,35 +50,21 @@ def run(args, client): # All done - write out the metadata with resume_plan.print_progress(total=4, stage_name="Finishing") as step_progress: if args.write_leaf_files: - total_rows = parquet_metadata.write_parquet_metadata( - args.catalog_path, - storage_options=args.output_storage_options, - ) + total_rows = parquet_metadata.write_parquet_metadata(args.catalog_path) metadata_path = paths.get_parquet_metadata_pointer(args.catalog_path) - partition_join_info = PartitionJoinInfo.read_from_file( - metadata_path, storage_options=args.output_storage_options - ) - partition_join_info.write_to_csv( - catalog_path=args.catalog_path, storage_options=args.output_storage_options - ) + partition_join_info = PartitionJoinInfo.read_from_file(metadata_path) + partition_join_info.write_to_csv(catalog_path=args.catalog_path) else: - total_rows = combine_partial_results( - args.tmp_path, args.catalog_path, args.output_storage_options - ) + total_rows = combine_partial_results(args.tmp_path, args.catalog_path) step_progress.update(1) catalog_info = args.to_catalog_info(total_rows) write_metadata.write_provenance_info( catalog_base_dir=args.catalog_path, dataset_info=catalog_info, tool_args=args.provenance_info(), - storage_options=args.output_storage_options, ) step_progress.update(1) - write_metadata.write_catalog_info( - dataset_info=catalog_info, - catalog_base_dir=args.catalog_path, - storage_options=args.output_storage_options, - ) + write_metadata.write_catalog_info(dataset_info=catalog_info, catalog_base_dir=args.catalog_path) step_progress.update(1) resume_plan.clean_resume_files() step_progress.update(1) diff --git a/src/hipscat_import/verification/arguments.py b/src/hipscat_import/verification/arguments.py index a0ec752a..f04c602c 100644 --- a/src/hipscat_import/verification/arguments.py +++ b/src/hipscat_import/verification/arguments.py @@ -1,9 +1,14 @@ """Utility to hold all arguments required throughout verification pipeline""" +from __future__ import annotations + from dataclasses import dataclass, field +from pathlib import Path from typing import List, Optional from hipscat.catalog import Catalog +from hipscat.io.validation import is_valid_catalog +from upath import UPath from hipscat_import.runtime_arguments import RuntimeArguments @@ -13,7 +18,7 @@ class VerificationArguments(RuntimeArguments): """Data class for holding verification arguments""" ## Input - input_catalog_path: str + input_catalog_path: str | Path | UPath | None = None """Path to an existing catalog that will be inspected.""" ## Verification options diff --git a/tests/hipscat_import/catalog/test_argument_validation.py b/tests/hipscat_import/catalog/test_argument_validation.py index 9213751a..2f98f431 100644 --- a/tests/hipscat_import/catalog/test_argument_validation.py +++ b/tests/hipscat_import/catalog/test_argument_validation.py @@ -107,7 +107,7 
diff --git a/tests/hipscat_import/catalog/test_argument_validation.py b/tests/hipscat_import/catalog/test_argument_validation.py
index 9213751a..2f98f431 100644
--- a/tests/hipscat_import/catalog/test_argument_validation.py
+++ b/tests/hipscat_import/catalog/test_argument_validation.py
@@ -107,7 +107,7 @@ def test_good_paths(blank_data_dir, blank_data_file, tmp_path):
     )
     assert args.input_path == blank_data_dir
     assert len(args.input_paths) == 1
-    assert str(blank_data_file) in args.input_paths[0]
+    assert str(blank_data_file) in str(args.input_paths[0])


 def test_multiple_files_in_path(small_sky_parts_dir, tmp_path):
@@ -263,7 +263,6 @@ def test_write_provenance_info(formats_dir, tmp_path):
         catalog_base_dir=args.catalog_path,
         dataset_info=args.to_catalog_info(0),
         tool_args=args.provenance_info(),
-        storage_options=args.output_storage_options,
     )
diff --git a/tests/hipscat_import/catalog/test_resume_plan.py b/tests/hipscat_import/catalog/test_resume_plan.py
index 7fb6945d..705e6079 100644
--- a/tests/hipscat_import/catalog/test_resume_plan.py
+++ b/tests/hipscat_import/catalog/test_resume_plan.py
@@ -58,15 +58,14 @@ def test_same_input_paths(tmp_path, small_sky_single_file, formats_headers_csv):
             input_paths=[small_sky_single_file, small_sky_single_file],
         )

-    ## Includes a duplicate file, but that's ok.
-    plan = ResumePlan(
-        tmp_path=tmp_path,
-        progress_bar=False,
-        resume=True,
-        input_paths=[small_sky_single_file, small_sky_single_file, formats_headers_csv],
-    )
-    map_files = plan.map_files
-    assert len(map_files) == 2
+    ## Includes a duplicate file, and we don't like that.
+    with pytest.raises(ValueError, match="Different file set"):
+        ResumePlan(
+            tmp_path=tmp_path,
+            progress_bar=False,
+            resume=True,
+            input_paths=[small_sky_single_file, small_sky_single_file, formats_headers_csv],
+        )


 def test_read_write_histogram(tmp_path):
diff --git a/tests/hipscat_import/margin_cache/test_margin_cache.py b/tests/hipscat_import/margin_cache/test_margin_cache.py
index 01de850c..3b79e25b 100644
--- a/tests/hipscat_import/margin_cache/test_margin_cache.py
+++ b/tests/hipscat_import/margin_cache/test_margin_cache.py
@@ -7,6 +7,7 @@
 from hipscat.catalog import PartitionInfo
 from hipscat.catalog.healpix_dataset.healpix_dataset import HealpixDataset
 from hipscat.io import paths
+from hipscat.pixel_math.healpix_pixel import HealpixPixel

 import hipscat_import.margin_cache.margin_cache as mc
 from hipscat_import.margin_cache.margin_cache_arguments import MarginCacheArguments
@@ -31,7 +32,7 @@ def test_margin_cache_gen(small_sky_source_catalog, tmp_path, dask_client):
     norder = 1
     npix = 47

-    test_file = paths.pixel_catalog_file(args.catalog_path, norder, npix)
+    test_file = paths.pixel_catalog_file(args.catalog_path, HealpixPixel(norder, npix))

     data = pd.read_parquet(test_file)

@@ -92,7 +93,7 @@ def test_margin_cache_gen_negative_pixels(small_sky_source_catalog, tmp_path, da
     norder = 0
     npix = 7

-    negative_test_file = paths.pixel_catalog_file(args.catalog_path, norder, npix)
+    negative_test_file = paths.pixel_catalog_file(args.catalog_path, HealpixPixel(norder, npix))

     negative_data = pd.read_parquet(negative_test_file)
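As the test updates above show, paths.pixel_catalog_file now takes a HealpixPixel rather than separate order/pixel integers. A minimal sketch of the new call shape (the catalog path is a placeholder):

from hipscat.io import paths
from hipscat.pixel_math.healpix_pixel import HealpixPixel

# Before: paths.pixel_catalog_file(catalog_path, 1, 47)
# After: wrap the order/pixel pair in a HealpixPixel.
test_file = paths.pixel_catalog_file("/path/to/catalog", HealpixPixel(1, 47))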
diff --git a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py
index 9dbe77e1..7e029423 100644
--- a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py
+++ b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py
@@ -81,9 +81,8 @@ def test_map_pixel_shards_error(tmp_path, capsys):
     catalog parquet files."""
     with pytest.raises(FileNotFoundError):
         margin_cache_map_reduce.map_pixel_shards(
-            paths.pixel_catalog_file(tmp_path, 1, 0),
+            paths.pixel_catalog_file(tmp_path, HealpixPixel(1, 0)),
             mapping_key="1_21",
-            input_storage_options=None,
             original_catalog_metadata="",
             margin_pair_file="",
             margin_threshold=10,
@@ -95,7 +94,7 @@ def test_map_pixel_shards_error(tmp_path, capsys):
         )

     captured = capsys.readouterr()
-    assert "No such file or directory" in captured.out
+    assert "Parquet file does not exist" in captured.out


 @pytest.mark.timeout(30)
@@ -106,7 +105,6 @@ def test_map_pixel_shards_fine(tmp_path, test_data_dir, small_sky_source_catalog
     margin_cache_map_reduce.map_pixel_shards(
         small_sky_source_catalog / "Norder=1" / "Dir=0" / "Npix=47.parquet",
         mapping_key="1_47",
-        input_storage_options=None,
         original_catalog_metadata=small_sky_source_catalog / "_common_metadata",
         margin_pair_file=test_data_dir / "margin_pairs" / "small_sky_source_pairs.csv",
         margin_threshold=3600,
@@ -136,7 +134,6 @@ def test_map_pixel_shards_coarse(tmp_path, test_data_dir, small_sky_source_catal
     margin_cache_map_reduce.map_pixel_shards(
         small_sky_source_catalog / "Norder=1" / "Dir=0" / "Npix=47.parquet",
         mapping_key="1_47",
-        input_storage_options=None,
         original_catalog_metadata=small_sky_source_catalog / "_common_metadata",
         margin_pair_file=test_data_dir / "margin_pairs" / "small_sky_source_pairs.csv",
         margin_threshold=3600,
@@ -166,8 +163,8 @@ def test_reduce_margin_shards(tmp_path):
         os.makedirs(shard_dir)
         os.makedirs(intermediate_dir / "reducing")

-        first_shard_path = paths.pixel_catalog_file(partition_dir, 1, 0)
-        second_shard_path = paths.pixel_catalog_file(partition_dir, 1, 1)
+        first_shard_path = paths.pixel_catalog_file(partition_dir, HealpixPixel(1, 0))
+        second_shard_path = paths.pixel_catalog_file(partition_dir, HealpixPixel(1, 1))

         ras = np.arange(0.0, 360.0)
         dec = np.full(360, 0.0)
@@ -208,15 +205,13 @@ def test_reduce_margin_shards(tmp_path):
             intermediate_dir,
             "1_21",
             tmp_path,
-            None,
             1,
             21,
             original_catalog_metadata=schema_path,
             delete_intermediate_parquet_files=False,
-            input_storage_options=None,
         )

-        result_path = paths.pixel_catalog_file(tmp_path, 1, 21)
+        result_path = paths.pixel_catalog_file(tmp_path, HealpixPixel(1, 21))
         validate_result_dataframe(result_path, 720)
         assert os.path.exists(shard_dir)

@@ -226,15 +221,13 @@ def test_reduce_margin_shards(tmp_path):
             intermediate_dir,
             "1_21",
             tmp_path,
-            None,
             1,
             21,
             original_catalog_metadata=schema_path,
             delete_intermediate_parquet_files=True,
-            input_storage_options=None,
         )

-        result_path = paths.pixel_catalog_file(tmp_path, 1, 21)
+        result_path = paths.pixel_catalog_file(tmp_path, HealpixPixel(1, 21))
         validate_result_dataframe(result_path, 720)
         assert not os.path.exists(shard_dir)

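A sketch of the slimmed-down reduce_margin_shards call after these test changes; the paths below are placeholders, and the argument order mirrors the test case above.

from pathlib import Path

from hipscat_import.margin_cache import margin_cache_map_reduce

# Placeholder locations for illustration only.
intermediate_dir = Path("/tmp/margin_intermediate")
output_path = Path("/tmp/margin_catalog")
schema_path = Path("/tmp/margin_intermediate/metadata.parquet")

# The positional None and the input_storage_options keyword are gone;
# only the paths and the pixel identifiers remain.
margin_cache_map_reduce.reduce_margin_shards(
    intermediate_dir,
    "1_21",  # reducing key
    output_path,
    1,   # Norder
    21,  # Npix
    original_catalog_metadata=schema_path,
    delete_intermediate_parquet_files=True,
)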
@@ -252,21 +245,19 @@ def test_reduce_margin_shards_error(tmp_path, basic_data_shard_df, capsys):
     # Don't write anything at the metadata path!
     schema_path = tmp_path / "metadata.parquet"

-    basic_data_shard_df.to_parquet(paths.pixel_catalog_file(partition_dir, 1, 0))
-    basic_data_shard_df.to_parquet(paths.pixel_catalog_file(partition_dir, 1, 1))
+    basic_data_shard_df.to_parquet(paths.pixel_catalog_file(partition_dir, HealpixPixel(1, 0)))
+    basic_data_shard_df.to_parquet(paths.pixel_catalog_file(partition_dir, HealpixPixel(1, 1)))

     with pytest.raises(FileNotFoundError):
         margin_cache_map_reduce.reduce_margin_shards(
             intermediate_dir,
             "1_21",
             tmp_path,
-            None,
             1,
             21,
             original_catalog_metadata=schema_path,
             delete_intermediate_parquet_files=True,
-            input_storage_options=None,
         )

     captured = capsys.readouterr()
-    assert "No such file or directory" in captured.out
+    assert "Parquet file does not exist" in captured.out
diff --git a/tests/hipscat_import/margin_cache/test_margin_round_trip.py b/tests/hipscat_import/margin_cache/test_margin_round_trip.py
index 9e598c7f..b557cb77 100644
--- a/tests/hipscat_import/margin_cache/test_margin_round_trip.py
+++ b/tests/hipscat_import/margin_cache/test_margin_round_trip.py
@@ -8,6 +8,7 @@
 from hipscat.catalog.catalog import Catalog
 from hipscat.catalog.healpix_dataset.healpix_dataset import HealpixDataset
 from hipscat.io import paths
+from hipscat.pixel_math.healpix_pixel import HealpixPixel

 import hipscat_import.catalog.run_import as runner
 import hipscat_import.margin_cache.margin_cache as mc
@@ -68,7 +69,7 @@ def test_margin_import_gaia_minimum(
     norder = 0
     npix = 0

-    test_file = paths.pixel_catalog_file(args.catalog_path, norder, npix)
+    test_file = paths.pixel_catalog_file(args.catalog_path, HealpixPixel(norder, npix))

     data = pd.read_parquet(test_file)

@@ -122,7 +123,7 @@ def test_margin_import_mixed_schema_csv(
     norder = 2
     npix = 187

-    test_file = paths.pixel_catalog_file(args.catalog_path, norder, npix)
+    test_file = paths.pixel_catalog_file(args.catalog_path, HealpixPixel(norder, npix))

     data = pd.read_parquet(test_file)
diff --git a/tests/hipscat_import/soap/test_soap_map_reduce.py b/tests/hipscat_import/soap/test_soap_map_reduce.py
index ab88f176..c605eb29 100644
--- a/tests/hipscat_import/soap/test_soap_map_reduce.py
+++ b/tests/hipscat_import/soap/test_soap_map_reduce.py
@@ -117,7 +117,7 @@ def test_combine_results(tmp_path):
     partitions_csv_file = input_path / "0_11.csv"
     join_info.to_csv(partitions_csv_file, index=False)

-    total_num_rows = combine_partial_results(input_path, output_path, None)
+    total_num_rows = combine_partial_results(input_path, output_path)
     assert total_num_rows == 131

     result = pd.read_csv(output_path / "partition_join_info.csv")
diff --git a/tests/hipscat_import/verification/test_verification_arguments.py b/tests/hipscat_import/verification/test_verification_arguments.py
index 303a63f7..8ebd6c81 100644
--- a/tests/hipscat_import/verification/test_verification_arguments.py
+++ b/tests/hipscat_import/verification/test_verification_arguments.py
@@ -62,7 +62,7 @@ def test_catalog_object(tmp_path, small_sky_object_catalog):
         output_path=tmp_path,
         output_artifact_name="small_sky_object_verification_report",
     )
-    assert args.input_catalog_path == str(small_sky_object_catalog)
+    assert args.input_catalog_path == small_sky_object_catalog
     assert str(args.output_path) == tmp_path_str
     assert str(args.tmp_path).startswith(tmp_path_str)
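Finally, a hedged sketch of constructing VerificationArguments after this change: input_catalog_path may be a str, pathlib.Path, or UPath and is stored as given rather than coerced to a string (the bucket and output location below are hypothetical).

from upath import UPath

from hipscat_import.verification.arguments import VerificationArguments

args = VerificationArguments(
    input_catalog_path=UPath("s3://my-bucket/small_sky_object_catalog", anon=False),
    output_path="/tmp/verification",
    output_artifact_name="small_sky_object_verification_report",
)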