Skip to content

Commit

Permalink
Loads of small cleanups (#203)
Browse files Browse the repository at this point in the history
* Loads of cleanups

* Better docstrings.

* Pylint warnings (with pre-commit)

* But actually do the TODO this time.
  • Loading branch information
delucchi-cmu authored Jan 19, 2024
1 parent 958857e commit 862ea03
Show file tree
Hide file tree
Showing 27 changed files with 77 additions and 1,489 deletions.
41 changes: 10 additions & 31 deletions src/hipscat_import/catalog/map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,16 @@
import pyarrow.parquet as pq
from hipscat import pixel_math
from hipscat.io import FilePointer, file_io, paths
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, hipscat_id_to_healpix

from hipscat_import.catalog.file_readers import InputReader
from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory

# pylint: disable=too-many-locals,too-many-arguments


def _get_pixel_directory(cache_path: FilePointer, order: np.int64, pixel: np.uint64):
    """Create a path for intermediate pixel data.

    This will take the form:

        <cache_path>/order_<order>/dir_<directory separator>/pixel_<pixel>

    where the directory separator is calculated using integer division:

        (pixel // 10000) * 10000

    and exists to mitigate problems on file systems that don't support
    more than 10_000 children nodes.

    Args:
        cache_path: base pointer under which the intermediate data is kept.
        order: order of the pixel, used for the ``order_<order>`` component.
        pixel: pixel number, used for the ``dir_`` and ``pixel_`` components.
            Expected to be non-negative.

    Returns:
        The value returned by ``file_io.append_paths_to_pointer`` for the
        path components described above.
    """
    # Convert to a Python int and use floor division so the arithmetic stays
    # exact. True division (``pixel / 10_000``) goes through float64, which
    # cannot represent every integer above 2**53 and could therefore pick the
    # wrong directory bucket for very large pixel numbers.
    dir_number = int(pixel) // 10_000 * 10_000
    return file_io.append_paths_to_pointer(
        cache_path, f"order_{order}", f"dir_{dir_number}", f"pixel_{pixel}"
    )


def _has_named_index(dataframe):
"""Heuristic to determine if a dataframe has some meaningful index.
Expand Down Expand Up @@ -172,7 +154,7 @@ def split_pixels(

filtered_data = data.filter(items=data_indexes, axis=0)

pixel_dir = _get_pixel_directory(cache_shard_path, order, pixel)
pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel))
file_io.make_directory(pixel_dir, exist_ok=True)
output_file = file_io.append_paths_to_pointer(
pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
Expand Down Expand Up @@ -259,7 +241,8 @@ def reduce_pixel_shards(
).schema.to_arrow_schema()

tables = []
pixel_dir = _get_pixel_directory(cache_shard_path, destination_pixel_order, destination_pixel_number)
healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number)
pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)

if schema:
tables.append(pq.read_table(pixel_dir, schema=schema))
Expand All @@ -273,7 +256,7 @@ def reduce_pixel_shards(
if rows_written != destination_pixel_size:
raise ValueError(
"Unexpected number of objects at pixel "
f"({destination_pixel_order}, {destination_pixel_number})."
f"({healpix_pixel})."
f" Expected {destination_pixel_size}, wrote {rows_written}"
)

Expand All @@ -286,13 +269,9 @@ def reduce_pixel_shards(
dataframe[dec_column].values,
)

dataframe["Norder"] = np.full(rows_written, fill_value=destination_pixel_order, dtype=np.uint8)
dataframe["Dir"] = np.full(
rows_written,
fill_value=int(destination_pixel_number / 10_000) * 10_000,
dtype=np.uint64,
)
dataframe["Npix"] = np.full(rows_written, fill_value=destination_pixel_number, dtype=np.uint64)
dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8)
dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64)
dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64)

if add_hipscat_index:
## If we had a meaningful index before, preserve it as a column.
Expand All @@ -304,7 +283,7 @@ def reduce_pixel_shards(
del dataframe, merged_table, tables

if delete_input_files:
pixel_dir = _get_pixel_directory(cache_shard_path, destination_pixel_order, destination_pixel_number)
pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)

file_io.remove_directory(pixel_dir, ignore_errors=True, storage_options=storage_options)

Expand Down
Empty file.
125 changes: 0 additions & 125 deletions src/hipscat_import/cross_match/macauff_arguments.py

This file was deleted.

106 changes: 0 additions & 106 deletions src/hipscat_import/cross_match/macauff_map_reduce.py

This file was deleted.

Loading

0 comments on commit 862ea03

Please sign in to comment.