Pass storage options to read/write operations. (#178)
* Pass storage options to read/write operations.

* Require updated hipscat

* Require python >= 3.9

* Bump required hipscat version

* Remove extraneous space
delucchi-cmu authored Dec 6, 2023
1 parent b800040 commit f50adee
Showing 7 changed files with 50 additions and 23 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
```diff
@@ -19,7 +19,7 @@ dependencies = [
     "dask[distributed]",
     "deprecated",
     "healpy",
-    "hipscat >= 0.1.5",
+    "hipscat >= 0.1.8",
     "ipykernel", # Support for Jupyter notebooks
     "pandas < 2.1.0",
     "pyarrow",
```
11 changes: 9 additions & 2 deletions src/hipscat_import/catalog/arguments.py
```diff
@@ -3,7 +3,7 @@
 from __future__ import annotations
 
 from dataclasses import dataclass, field
-from typing import List
+from typing import Any, Dict, List, Union
 
 from hipscat.catalog.catalog import CatalogInfo
 from hipscat.io import FilePointer
@@ -35,6 +35,8 @@ class ImportArguments(RuntimeArguments):
     """can be used instead of `input_format` to import only specified files"""
     input_paths: List[FilePointer] = field(default_factory=list)
     """resolved list of all files that will be used in the importer"""
+    input_storage_options: Union[Dict[Any, Any], None] = None
+    """optional dictionary of abstract filesystem credentials for the INPUT."""
 
     ra_column: str = "ra"
     """column for right ascension"""
@@ -106,7 +108,12 @@ def _check_arguments(self):
         self.file_reader = get_file_reader(self.input_format)
 
         # Basic checks complete - make more checks and create directories where necessary
-        self.input_paths = find_input_paths(self.input_path, f"*{self.input_format}", self.input_file_list)
+        self.input_paths = find_input_paths(
+            self.input_path,
+            f"*{self.input_format}",
+            self.input_file_list,
+            storage_options=self.input_storage_options,
+        )
         self.resume_plan = ResumePlan(
             resume=self.resume,
             progress_bar=self.progress_bar,
```
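With `input_storage_options` on `ImportArguments`, credentials for the input filesystem travel with the arguments object and are handed to `find_input_paths` when the input files are resolved. A minimal sketch, assuming an fsspec-compatible backend such as s3fs; the bucket, paths, and credential values are placeholders:

```python
from hipscat_import.catalog.arguments import ImportArguments

# Hypothetical values throughout: read input CSVs from S3, write the
# catalog locally. input_storage_options reaches the file discovery step.
args = ImportArguments(
    output_artifact_name="my_catalog",
    input_path="s3://my-bucket/raw_catalog/",
    input_format="csv",
    input_storage_options={"key": "MY_ACCESS_KEY", "secret": "MY_SECRET_KEY"},
    output_path="/local/catalogs",
)
```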
13 changes: 7 additions & 6 deletions src/hipscat_import/catalog/file_readers.py
```diff
@@ -1,6 +1,7 @@
 """File reading generators for common file types."""
 
 import abc
+from typing import Any, Dict, Union
 
 import pyarrow.parquet as pq
 from astropy.table import Table
@@ -86,15 +87,15 @@ def provenance_info(self) -> dict:
         dictionary with all argument_name -> argument_value as key -> value pairs.
         """
 
-    def regular_file_exists(self, input_file):
+    def regular_file_exists(self, input_file, storage_options: Union[Dict[Any, Any], None] = None, **_kwargs):
         """Check that the `input_file` points to a single regular file
 
         Raises
             FileNotFoundError: if nothing exists at path, or directory found.
         """
-        if not file_io.does_file_or_directory_exist(input_file):
+        if not file_io.does_file_or_directory_exist(input_file, storage_options=storage_options):
             raise FileNotFoundError(f"File not found at path: {input_file}")
-        if not file_io.is_regular_file(input_file):
+        if not file_io.is_regular_file(input_file, storage_options=storage_options):
             raise FileNotFoundError(f"Directory found at path - requires regular file: {input_file}")
@@ -136,7 +137,7 @@ def __init__(
         self.kwargs = kwargs
 
     def read(self, input_file):
-        self.regular_file_exists(input_file)
+        self.regular_file_exists(input_file, **self.kwargs)
 
         if self.schema_file:
             schema_parquet = file_io.load_parquet_to_pandas(
@@ -218,7 +219,7 @@ def __init__(self, chunksize=500_000, column_names=None, skip_column_names=None,
         self.kwargs = kwargs
 
     def read(self, input_file):
-        self.regular_file_exists(input_file)
+        self.regular_file_exists(input_file, **self.kwargs)
         table = Table.read(input_file, memmap=True, **self.kwargs)
         if self.column_names:
             table.keep_columns(self.column_names)
@@ -256,7 +257,7 @@ def __init__(self, chunksize=500_000, **kwargs):
         self.kwargs = kwargs
 
     def read(self, input_file):
-        self.regular_file_exists(input_file)
+        self.regular_file_exists(input_file, **self.kwargs)
         parquet_file = pq.ParquetFile(input_file, **self.kwargs)
         for smaller_table in parquet_file.iter_batches(batch_size=self.chunksize, use_pandas_metadata=True):
             yield smaller_table.to_pandas()
```
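Rather than adding a new constructor argument, the readers forward their existing `**kwargs` into `regular_file_exists`, which now accepts `storage_options` and quietly ignores everything else via `**_kwargs`. A sketch of what that enables, with placeholder bucket and credential values:

```python
from hipscat_import.catalog.file_readers import CsvReader

# storage_options rides along in self.kwargs: it reaches the existence
# check, and pandas-based readers can forward it to the read call too.
reader = CsvReader(
    chunksize=100_000,
    storage_options={"key": "MY_ACCESS_KEY", "secret": "MY_SECRET_KEY"},
)
for chunk in reader.read("s3://my-bucket/raw_catalog/part_0.csv"):
    print(len(chunk))  # placeholder for per-chunk handling
```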
13 changes: 9 additions & 4 deletions src/hipscat_import/catalog/map_reduce.py
```diff
@@ -1,5 +1,7 @@
 """Import a set of non-hipscat files using dask for parallelization"""
 
+from typing import Any, Dict, Union
+
 import healpy as hp
 import numpy as np
 import pyarrow as pa
@@ -199,6 +201,7 @@ def reduce_pixel_shards(
     add_hipscat_index=True,
     delete_input_files=True,
     use_schema_file="",
+    storage_options: Union[Dict[Any, Any], None] = None,
 ):
     """Reduce sharded source pixels into destination pixels.
@@ -243,15 +246,17 @@ def reduce_pixel_shards(
         `destination_pixel_size`
     """
     destination_dir = paths.pixel_directory(output_path, destination_pixel_order, destination_pixel_number)
-    file_io.make_directory(destination_dir, exist_ok=True)
+    file_io.make_directory(destination_dir, exist_ok=True, storage_options=storage_options)
 
     destination_file = paths.pixel_catalog_file(
         output_path, destination_pixel_order, destination_pixel_number
     )
 
     schema = None
     if use_schema_file:
-        schema = file_io.read_parquet_metadata(use_schema_file).schema.to_arrow_schema()
+        schema = file_io.read_parquet_metadata(
+            use_schema_file, storage_options=storage_options
+        ).schema.to_arrow_schema()
 
     tables = []
     pixel_dir = _get_pixel_directory(cache_shard_path, destination_pixel_order, destination_pixel_number)
@@ -294,13 +299,13 @@ def reduce_pixel_shards(
     if _has_named_index(dataframe):
         dataframe = dataframe.reset_index()
     dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index()
-    dataframe.to_parquet(destination_file)
+    dataframe.to_parquet(destination_file, storage_options=storage_options)
 
     del dataframe, merged_table, tables
 
     if delete_input_files:
         pixel_dir = _get_pixel_directory(cache_shard_path, destination_pixel_order, destination_pixel_number)
 
-        file_io.remove_directory(pixel_dir, ignore_errors=True)
+        file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options)
 
     ResumePlan.reducing_key_done(tmp_path=resume_path, reducing_key=reducing_key)
```
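`reduce_pixel_shards` passes `storage_options` straight through to the fsspec-backed `file_io` helpers and to `DataFrame.to_parquet`, so the dictionary's keys are whatever the target filesystem implementation expects; hipscat does not define them. A small illustration of the pass-through mechanism, with placeholder credentials:

```python
import pandas as pd

# Hypothetical per-backend option dictionaries (keys defined by s3fs/adlfs).
s3_options = {"key": "MY_ACCESS_KEY", "secret": "MY_SECRET_KEY"}
abfs_options = {"account_name": "myaccount", "account_key": "MY_ACCOUNT_KEY"}

frame = pd.DataFrame({"id": [1, 2, 3]})
# pandas hands storage_options to fsspec whenever the destination is remote.
frame.to_parquet("s3://my-bucket/catalog/part_0.parquet", storage_options=s3_options)
```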
13 changes: 10 additions & 3 deletions src/hipscat_import/catalog/run_import.py
```diff
@@ -96,6 +96,7 @@ def _reduce_pixels(args, destination_pixel_map, client):
                 add_hipscat_index=args.add_hipscat_index,
                 use_schema_file=args.use_schema_file,
                 use_hipscat_index=args.use_hipscat_index,
+                storage_options=args.output_storage_options,
             )
         )
@@ -155,19 +156,25 @@ def run(args, client):
             catalog_base_dir=args.catalog_path,
             dataset_info=catalog_info,
             tool_args=args.provenance_info(),
+            storage_options=args.output_storage_options,
         )
         step_progress.update(1)
 
-        io.write_catalog_info(catalog_base_dir=args.catalog_path, dataset_info=catalog_info)
+        io.write_catalog_info(
+            catalog_base_dir=args.catalog_path,
+            dataset_info=catalog_info,
+            storage_options=args.output_storage_options,
+        )
         step_progress.update(1)
         if not args.debug_stats_only:
-            io.write_parquet_metadata(args.catalog_path)
+            io.write_parquet_metadata(args.catalog_path, storage_options=args.output_storage_options)
         step_progress.update(1)
-        io.write_fits_map(args.catalog_path, raw_histogram)
+        io.write_fits_map(args.catalog_path, raw_histogram, storage_options=args.output_storage_options)
         step_progress.update(1)
         io.write_partition_info(
             catalog_base_dir=args.catalog_path,
             destination_healpix_pixel_map=destination_pixel_map,
+            storage_options=args.output_storage_options,
         )
         step_progress.update(1)
     args.resume_plan.clean_resume_files()
```
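Every metadata write in `run` now carries `args.output_storage_options`, so the catalog files, provenance info, parquet metadata, FITS map, and partition info can all land on remote storage. A hedged end-to-end sketch; the paths, credentials, and cluster size are illustrative:

```python
from dask.distributed import Client

from hipscat_import.catalog.arguments import ImportArguments
from hipscat_import.catalog.run_import import run

# Placeholder values: import local CSVs and write the catalog to S3.
args = ImportArguments(
    output_artifact_name="my_catalog",
    input_path="/local/raw_catalog/",
    input_format="csv",
    output_path="s3://my-bucket/catalogs",
    output_storage_options={"key": "MY_ACCESS_KEY", "secret": "MY_SECRET_KEY"},
)
with Client(n_workers=4) as client:  # hypothetical local cluster
    run(args, client)
```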
19 changes: 13 additions & 6 deletions src/hipscat_import/runtime_arguments.py
```diff
@@ -5,6 +5,7 @@
 import re
 from dataclasses import dataclass
 from importlib.metadata import version
+from typing import Any, Dict, Union
 
 from hipscat.io import FilePointer, file_io
@@ -20,6 +21,8 @@ class RuntimeArguments:
     """base path where new catalog should be output"""
     output_artifact_name: str = ""
     """short, convenient name for the catalog"""
+    output_storage_options: Union[Dict[Any, Any], None] = None
+    """optional dictionary of abstract filesystem credentials for the OUTPUT."""
 
     ## Execution
     tmp_dir: str = ""
@@ -69,12 +72,12 @@ def _check_arguments(self):
 
         self.catalog_path = file_io.append_paths_to_pointer(self.output_path, self.output_artifact_name)
         if not self.overwrite:
-            if file_io.directory_has_contents(self.catalog_path):
+            if file_io.directory_has_contents(self.catalog_path, storage_options=self.output_storage_options):
                 raise ValueError(
                     f"output_path ({self.catalog_path}) contains files."
                     " choose a different directory or use --overwrite flag"
                 )
-        file_io.make_directory(self.catalog_path, exist_ok=True)
+        file_io.make_directory(self.catalog_path, exist_ok=True, storage_options=self.output_storage_options)
 
         if self.tmp_dir:
             if not file_io.does_file_or_directory_exist(self.tmp_dir):
@@ -90,7 +93,7 @@ def _check_arguments(self):
                 )
         else:
             self.tmp_path = file_io.append_paths_to_pointer(self.catalog_path, "intermediate")
-        file_io.make_directory(self.tmp_path, exist_ok=True)
+        file_io.make_directory(self.tmp_path, exist_ok=True, storage_options=self.output_storage_options)
         if self.resume_tmp:
             self.resume_tmp = file_io.append_paths_to_pointer(self.resume_tmp, self.output_artifact_name)
         else:
@@ -129,7 +132,9 @@ def additional_runtime_provenance_info(self):
         return {}
 
 
-def find_input_paths(input_path="", file_matcher="", input_file_list=None):
+def find_input_paths(
+    input_path="", file_matcher="", input_file_list=None, storage_options: Union[Dict[Any, Any], None] = None
+):
     """Helper method to find input paths, given either a prefix and format, or an
     explicit list of paths.
@@ -143,9 +148,11 @@ def find_input_paths(
         FileNotFoundError if no files are found at the input_path and the provided list is empty.
     """
     if input_path:
-        if not file_io.does_file_or_directory_exist(input_path):
+        if not file_io.does_file_or_directory_exist(input_path, storage_options=storage_options):
             raise FileNotFoundError("input_path not found on local storage")
-        input_paths = file_io.find_files_matching_path(input_path, file_matcher)
+        input_paths = file_io.find_files_matching_path(
+            input_path, file_matcher, include_protocol=True, storage_options=storage_options
+        )
     elif input_file_list:
         input_paths = input_file_list
     if len(input_paths) == 0:
```
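Note the second change tucked into `find_input_paths`: `include_protocol=True` keeps the filesystem scheme on each resolved path, so downstream reads can reuse the same `storage_options`. A direct call might look like this (hypothetical bucket and credentials):

```python
from hipscat_import.runtime_arguments import find_input_paths

paths = find_input_paths(
    input_path="s3://my-bucket/raw_catalog/",
    file_matcher="*.csv",
    storage_options={"key": "MY_ACCESS_KEY", "secret": "MY_SECRET_KEY"},
)
# Each entry keeps its protocol prefix, e.g. "s3://my-bucket/raw_catalog/part_0.csv".
```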
2 changes: 1 addition & 1 deletion tests/hipscat_import/catalog/test_argument_validation.py
```diff
@@ -79,7 +79,7 @@ def test_good_paths(blank_data_dir, blank_data_file, tmp_path):
     )
     assert args.input_path == blank_data_dir
     assert len(args.input_paths) == 1
-    assert args.input_paths[0] == blank_data_file
+    assert blank_data_file in args.input_paths[0]
 
 
 def test_multiple_files_in_path(small_sky_parts_dir, tmp_path):
```
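The looser assertion follows from `include_protocol=True`: the resolved path now carries a scheme, so exact equality with the bare fixture path no longer holds. Illustratively (the concrete paths here are hypothetical):

```python
resolved = "file:///tmp/blank/blank.csv"  # what find_input_paths now returns
fixture = "/tmp/blank/blank.csv"          # what the test fixture provides

assert resolved != fixture  # the old exact-match assertion would fail
assert fixture in resolved  # the new containment check passes
```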
