Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pickle instead of scatter. #328

Merged
merged 5 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions src/hipscat_import/catalog/map_reduce.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Import a set of non-hipscat files using dask for parallelization"""

import pickle
from typing import Any, Dict, Union

import healpy as hp
Expand All @@ -11,7 +12,6 @@
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, hipscat_id_to_healpix

from hipscat_import.catalog.file_readers import InputReader
from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.catalog.sparse_histogram import SparseHistogram
from hipscat_import.pipeline_resume_plan import get_pixel_cache_directory, print_task_failure
Expand All @@ -35,14 +35,16 @@ def _has_named_index(dataframe):

def _iterate_input_file(
input_file: FilePointer,
file_reader: InputReader,
pickled_reader_file: str,
highest_order,
ra_column,
dec_column,
use_hipscat_index=False,
read_columns=None,
):
"""Helper function to handle input file reading and healpix pixel calculation"""
with open(pickled_reader_file, "rb") as pickle_file:
file_reader = pickle.load(pickle_file)
if not file_reader:
raise NotImplementedError("No file reader implemented")

Expand All @@ -66,7 +68,7 @@ def _iterate_input_file(

def map_to_pixels(
input_file: FilePointer,
file_reader: InputReader,
pickled_reader_file: str,
resume_path: FilePointer,
mapping_key,
highest_order,
Expand Down Expand Up @@ -103,7 +105,13 @@ def map_to_pixels(
read_columns = [ra_column, dec_column]

for _, _, mapped_pixels in _iterate_input_file(
input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index, read_columns
input_file,
pickled_reader_file,
highest_order,
ra_column,
dec_column,
use_hipscat_index,
read_columns,
):
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)

Expand All @@ -120,14 +128,14 @@ def map_to_pixels(

def split_pixels(
input_file: FilePointer,
file_reader: InputReader,
pickled_reader_file: str,
splitting_key,
highest_order,
ra_column,
dec_column,
cache_shard_path: FilePointer,
resume_path: FilePointer,
alignment=None,
alignment_file=None,
use_hipscat_index=False,
):
"""Map a file of input objects to their healpix pixels and split into shards.
Expand All @@ -149,8 +157,10 @@ def split_pixels(
FileNotFoundError: if the file does not exist, or is a directory
"""
try:
with open(alignment_file, "rb") as pickle_file:
alignment = pickle.load(pickle_file)
for chunk_number, data, mapped_pixels in _iterate_input_file(
input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index
input_file, pickled_reader_file, highest_order, ra_column, dec_column, use_hipscat_index
):
aligned_pixels = alignment[mapped_pixels]
unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)
Expand Down
84 changes: 72 additions & 12 deletions src/hipscat_import/catalog/resume_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

from __future__ import annotations

import pickle
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

import healpy as hp
import numpy as np
from hipscat import pixel_math
from hipscat.io import FilePointer, file_io
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from tqdm.auto import tqdm
Expand All @@ -24,15 +27,16 @@ class ResumePlan(PipelineResumePlan):
"""list of files (and job keys) that have yet to be mapped"""
split_keys: List[Tuple[str, str]] = field(default_factory=list)
"""set of files (and job keys) that have yet to be split"""
destination_pixel_map: Optional[List[Tuple[HealpixPixel, List[HealpixPixel], str]]] = None
"""Fully resolved map of destination pixels to constituent smaller pixels"""
destination_pixel_map: Optional[List[Tuple[int, int, int]]] = None
"""Destination pixels and their expected final count"""

MAPPING_STAGE = "mapping"
SPLITTING_STAGE = "splitting"
REDUCING_STAGE = "reducing"

HISTOGRAM_BINARY_FILE = "mapping_histogram.npz"
HISTOGRAMS_DIR = "histograms"
ALIGNMENT_FILE = "alignment.pickle"

def __post_init__(self):
"""Initialize the plan."""
Expand Down Expand Up @@ -196,6 +200,61 @@ def is_mapping_done(self) -> bool:
"""Are there files left to map?"""
return self.done_file_exists(self.MAPPING_STAGE)

def get_alignment_file(
    self,
    raw_histogram,
    constant_healpix_order,
    highest_healpix_order,
    lowest_healpix_order,
    pixel_threshold,
) -> str:
    """Get a pointer to the existing alignment file for the pipeline, or
    generate a new alignment using provided arguments.

    Also populates ``self.destination_pixel_map`` (order, pixel, count) triples
    on first call, filtered to pixels with a non-zero object count.

    Args:
        raw_histogram (:obj:`np.array`): one-dimensional numpy array of long integers where the
            value at each index corresponds to the number of objects found at the healpix pixel.
        constant_healpix_order (int): if positive, use this as the order for
            all non-empty partitions. else, use remaining arguments.
        highest_healpix_order (int): the highest healpix order (e.g. 5-10)
        lowest_healpix_order (int): the lowest healpix order (e.g. 1-5). specifying a lowest order
            constrains the partitioning to prevent spatially large pixels.
        pixel_threshold (int): the maximum number of objects allowed in a single pixel

    Returns:
        path to cached alignment file.
    """
    file_name = file_io.append_paths_to_pointer(self.tmp_path, self.ALIGNMENT_FILE)
    if not file_io.does_file_or_directory_exist(file_name):
        if constant_healpix_order >= 0:
            # Constant-order partitioning: every histogram bin maps to a
            # (order, pixel, count) triple at the single requested order.
            alignment = np.full(len(raw_histogram), None)
            for pixel_num, pixel_sum in enumerate(raw_histogram):
                alignment[pixel_num] = (
                    constant_healpix_order,
                    pixel_num,
                    pixel_sum,
                )
        else:
            alignment = pixel_math.generate_alignment(
                raw_histogram,
                highest_order=highest_healpix_order,
                lowest_order=lowest_healpix_order,
                threshold=pixel_threshold,
            )
        # Cache the alignment so restarted pipelines (and remote workers)
        # can re-load it instead of recomputing.
        with open(file_name, "wb") as pickle_file:
            pickle.dump(alignment, pickle_file)

    if self.destination_pixel_map is None:
        with open(file_name, "rb") as pickle_file:
            alignment = pickle.load(pickle_file)
        non_none_elements = alignment[alignment != np.array(None)]
        self.destination_pixel_map = np.unique(non_none_elements)
        # Drop destination pixels that would contain no objects.
        self.destination_pixel_map = [
            (order, pix, count) for (order, pix, count) in self.destination_pixel_map if int(count) > 0
        ]

    return file_name

def wait_for_splitting(self, futures):
    """Block until all splitting-stage futures have completed."""
    stage = self.SPLITTING_STAGE
    self.wait_for_futures(futures, stage)
Expand All @@ -208,30 +267,31 @@ def is_splitting_done(self) -> bool:
"""Are there files left to split?"""
return self.done_file_exists(self.SPLITTING_STAGE)

def get_reduce_items(self):
    """Fetch a triple for each partition to reduce.

    Triple contains:

    - destination pixel (healpix pixel with both order and pixel)
    - number of rows expected for this pixel
    - reduce key (string of destination order+pixel)

    Returns:
        list of triples for partitions not yet marked done in the reducing stage.

    Raises:
        RuntimeError: if the destination pixel map has not been resolved yet
            (e.g. ``get_alignment_file`` has not run).
    """
    # Keys already completed in a previous (possibly resumed) run.
    reduced_keys = set(self.read_done_keys(self.REDUCING_STAGE))
    if self.destination_pixel_map is None:
        raise RuntimeError("destination pixel map not provided for progress tracking.")
    reduce_items = [
        (HealpixPixel(hp_order, hp_pixel), row_count, f"{hp_order}_{hp_pixel}")
        for hp_order, hp_pixel, row_count in self.destination_pixel_map
        if f"{hp_order}_{hp_pixel}" not in reduced_keys
    ]
    return reduce_items

def get_destination_pixels(self):
    """Create HealpixPixel list of all destination pixels."""
    pixel_map = self.destination_pixel_map
    if pixel_map is None:
        raise RuntimeError("destination pixel map not known.")
    return [HealpixPixel(order, pixel) for order, pixel, _ in pixel_map]

def is_reducing_done(self) -> bool:
    """Has the reducing stage been marked complete?"""
    done = self.done_file_exists(self.REDUCING_STAGE)
    return done
Expand Down
Loading