Skip to content

Commit

Permalink
Pickle instead of scatter. (#328)
Browse files Browse the repository at this point in the history
* Pickle instead of scatter.

* Code review updates

* Improve code coverage.
  • Loading branch information
delucchi-cmu authored Jun 6, 2024
1 parent c5e6095 commit 3b2c3ae
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 184 deletions.
24 changes: 17 additions & 7 deletions src/hipscat_import/catalog/map_reduce.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Import a set of non-hipscat files using dask for parallelization"""

import pickle
from typing import Any, Dict, Union

import healpy as hp
Expand All @@ -11,7 +12,6 @@
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, hipscat_id_to_healpix

from hipscat_import.catalog.file_readers import InputReader
from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.catalog.sparse_histogram import SparseHistogram
from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure
Expand All @@ -35,14 +35,16 @@ def _has_named_index(dataframe):

def _iterate_input_file(
input_file: FilePointer,
file_reader: InputReader,
pickled_reader_file: str,
highest_order,
ra_column,
dec_column,
use_hipscat_index=False,
read_columns=None,
):
"""Helper function to handle input file reading and healpix pixel calculation"""
with open(pickled_reader_file, "rb") as pickle_file:
file_reader = pickle.load(pickle_file)
if not file_reader:
raise NotImplementedError("No file reader implemented")

Expand All @@ -66,7 +68,7 @@ def _iterate_input_file(

def map_to_pixels(
input_file: FilePointer,
file_reader: InputReader,
pickled_reader_file: str,
resume_path: FilePointer,
mapping_key,
highest_order,
Expand Down Expand Up @@ -103,7 +105,13 @@ def map_to_pixels(
read_columns = [ra_column, dec_column]

for _, _, mapped_pixels in _iterate_input_file(
input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
input_file,
pickled_reader_file,
highest_order,
ra_column,
dec_column,
use_hipscat_index,
read_columns,
):
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)

Expand All @@ -120,14 +128,14 @@ def map_to_pixels(

def split_pixels(
input_file: FilePointer,
file_reader: InputReader,
pickled_reader_file: str,
splitting_key,
highest_order,
ra_column,
dec_column,
cache_shard_path: FilePointer,
resume_path: FilePointer,
alignment=None,
alignment_file=None,
use_hipscat_index=False,
):
"""Map a file of input objects to their healpix pixels and split into shards.
Expand All @@ -149,8 +157,10 @@ def split_pixels(
FileNotFoundError: if the file does not exist, or is a directory
"""
try:
with open(alignment_file, "rb") as pickle_file:
alignment = pickle.load(pickle_file)
for chunk_number, data, mapped_pixels in _iterate_input_file(
input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index
input_file, pickled_reader_file, highest_order, ra_column, dec_column, use_hipscat_index
):
aligned_pixels = alignment[mapped_pixels]
unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)
Expand Down
84 changes: 72 additions & 12 deletions src/hipscat_import/catalog/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

from __future__ import annotations

import pickle
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

import healpy as hp
import numpy as np
from hipscat import pixel_math
from hipscat.io import FilePointer, file_io
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from tqdm.auto import tqdm
Expand All @@ -24,15 +27,16 @@ class ResumePlan(PipelineResumePlan):
"""list of files (and job keys) that have yet to be mapped"""
split_keys: List[Tuple[str, str]] = field(default_factory=list)
"""set of files (and job keys) that have yet to be split"""
destination_pixel_map: Optional[List[Tuple[HealpixPixel, List[HealpixPixel], str]]] = None
"""Fully resolved map of destination pixels to constituent smaller pixels"""
destination_pixel_map: Optional[List[Tuple[int, int, int]]] = None
"""Destination pixels and their expected final count"""

MAPPING_STAGE = "mapping"
SPLITTING_STAGE = "splitting"
REDUCING_STAGE = "reducing"

HISTOGRAM_BINARY_FILE = "mapping_histogram.npz"
HISTOGRAMS_DIR = "histograms"
ALIGNMENT_FILE = "alignment.pickle"

def __post_init__(self):
"""Initialize the plan."""
Expand Down Expand Up @@ -196,6 +200,61 @@ def is_mapping_done(self) -> bool:
"""Are there files left to map?"""
return self.done_file_exists(self.MAPPING_STAGE)

def get_alignment_file(
    self,
    raw_histogram,
    constant_healpix_order,
    highest_healpix_order,
    lowest_healpix_order,
    pixel_threshold,
) -> str:
    """Return the path of the pipeline's pickled alignment, creating the file if needed.

    The file lives under ``self.tmp_path`` and is named ``self.ALIGNMENT_FILE``. When it
    does not exist yet, an alignment is built from the arguments and pickled there.
    Afterwards, if ``self.destination_pixel_map`` is still ``None``, the pickled alignment
    is read back and the map is set to the list of distinct non-``None``
    ``(order, pixel, count)`` entries whose count is greater than zero.

    Args:
        raw_histogram (:obj:`np.array`): one-dimensional numpy array of long integers where the
            value at each index corresponds to the number of objects found at the healpix pixel.
        constant_healpix_order (int): if zero or greater, every histogram index is assigned
            ``(constant_healpix_order, index, count)``. otherwise, the alignment is produced by
            ``pixel_math.generate_alignment`` using the remaining arguments.
        highest_healpix_order (int): the highest healpix order (e.g. 5-10)
        lowest_healpix_order (int): the lowest healpix order (e.g. 1-5). specifying a lowest order
            constrains the partitioning to prevent spatially large pixels.
        pixel_threshold (int): the maximum number of objects allowed in a single pixel
    Returns:
        path to cached alignment file.
    """
    alignment_path = file_io.append_paths_to_pointer(self.tmp_path, self.ALIGNMENT_FILE)

    already_cached = file_io.does_file_or_directory_exist(alignment_path)
    if not already_cached:
        if constant_healpix_order >= 0:
            # Object array so that each slot can hold a whole tuple.
            computed = np.full(len(raw_histogram), None)
            for index, row_total in enumerate(raw_histogram):
                computed[index] = (constant_healpix_order, index, row_total)
        else:
            computed = pixel_math.generate_alignment(
                raw_histogram,
                highest_order=highest_healpix_order,
                lowest_order=lowest_healpix_order,
                threshold=pixel_threshold,
            )
        with open(alignment_path, "wb") as out_handle:
            pickle.dump(computed, out_handle)

    if self.destination_pixel_map is not None:
        return alignment_path

    # NOTE(review): pickle.load trusts the file contents; presumably only this pipeline
    # writes into tmp_path -- confirm.
    with open(alignment_path, "rb") as in_handle:
        loaded = pickle.load(in_handle)

    # Drop the slots that never received a destination, then collapse duplicates.
    is_assigned = loaded != np.array(None)
    self.destination_pixel_map = np.unique(loaded[is_assigned])

    populated = []
    for order, pix, count in self.destination_pixel_map:
        if int(count) > 0:
            populated.append((order, pix, count))
    self.destination_pixel_map = populated

    return alignment_path

def wait_for_splitting(self, futures):
"""Wait for splitting futures to complete."""
self.wait_for_futures(futures, self.SPLITTING_STAGE)
Expand All @@ -208,30 +267,31 @@ def is_splitting_done(self) -> bool:
"""Are there files left to split?"""
return self.done_file_exists(self.SPLITTING_STAGE)

def get_reduce_items(self, destination_pixel_map=None):
def get_reduce_items(self):
"""Fetch a triple for each partition to reduce.
Triple contains:
- destination pixel (healpix pixel with both order and pixel)
- source pixels (list of pixels at mapping order)
- number of rows expected for this pixel
- reduce key (string of destination order+pixel)
"""
reduced_keys = set(self.read_done_keys(self.REDUCING_STAGE))
if destination_pixel_map is None:
destination_pixel_map = self.destination_pixel_map
elif self.destination_pixel_map is None:
self.destination_pixel_map = destination_pixel_map
if self.destination_pixel_map is None:
raise RuntimeError("destination pixel map not provided for progress tracking.")
reduce_items = [
(hp_pixel, source_pixels, f"{hp_pixel.order}_{hp_pixel.pixel}")
for hp_pixel, source_pixels in destination_pixel_map.items()
if f"{hp_pixel.order}_{hp_pixel.pixel}" not in reduced_keys
(HealpixPixel(hp_order, hp_pixel), row_count, f"{hp_order}_{hp_pixel}")
for hp_order, hp_pixel, row_count in self.destination_pixel_map
if f"{hp_order}_{hp_pixel}" not in reduced_keys
]
return reduce_items

def get_destination_pixels(self):
    """Build a ``HealpixPixel`` for every entry of the destination pixel map.

    Only the order and pixel number of each ``(order, pixel, count)`` entry are used;
    the count is ignored.

    Raises:
        RuntimeError: if ``self.destination_pixel_map`` is ``None``.
    """
    if self.destination_pixel_map is None:
        raise RuntimeError("destination pixel map not known.")

    destinations = []
    for hp_order, hp_pixel, _ in self.destination_pixel_map:
        destinations.append(HealpixPixel(hp_order, hp_pixel))
    return destinations

def is_reducing_done(self) -> bool:
    """Report whether the done file for the reducing stage exists.

    Returns:
        the result of ``done_file_exists`` for ``self.REDUCING_STAGE``.
    """
    stage_name = self.REDUCING_STAGE
    return self.done_file_exists(stage_name)
Expand Down
Loading

0 comments on commit 3b2c3ae

Please sign in to comment.