Skip to content

Commit

Permalink
Merge pull request #84 from astronomy-commons/delucchi/catalog_info
Browse files Browse the repository at this point in the history
Use type-specific catalog info
  • Loading branch information
delucchi-cmu authored Jun 14, 2023
2 parents cdf887d + 2eaf08b commit c2d0570
Show file tree
Hide file tree
Showing 18 changed files with 259 additions and 323 deletions.
29 changes: 16 additions & 13 deletions src/hipscat_import/association/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

from dataclasses import dataclass

from hipscat.catalog import CatalogParameters
from hipscat.catalog.association_catalog.association_catalog import (
AssociationCatalogInfo,
)

from hipscat_import.runtime_arguments import RuntimeArguments

Expand Down Expand Up @@ -51,19 +53,20 @@ def _check_arguments(self):
if self.compute_partition_size < 100_000:
raise ValueError("compute_partition_size must be at least 100_000")

def to_catalog_parameters(self) -> CatalogParameters:
"""Convert importing arguments into hipscat catalog parameters.
Returns:
CatalogParameters for catalog being created.
"""
return CatalogParameters(
catalog_name=self.output_catalog_name,
catalog_type="association",
output_path=self.output_path,
)
def to_catalog_info(self, total_rows) -> AssociationCatalogInfo:
"""Catalog-type-specific dataset info."""
info = {
"catalog_name": self.output_catalog_name,
"catalog_type": "association",
"total_rows": total_rows,
"primary_column": self.primary_id_column,
"primary_catalog": str(self.primary_input_catalog_path),
"join_column": self.join_id_column,
"join_catalog": str(self.join_input_catalog_path),
}
return AssociationCatalogInfo(**info)

def additional_runtime_provenance_info(self):
def additional_runtime_provenance_info(self) -> dict:
return {
"primary_input_catalog_path": str(self.primary_input_catalog_path),
"primary_id_column": self.primary_id_column,
Expand Down
17 changes: 12 additions & 5 deletions src/hipscat_import/association/run_association.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from tqdm import tqdm

from hipscat_import.association.arguments import AssociationArguments
from hipscat_import.association.map_reduce import map_association, reduce_association
from hipscat_import.association.map_reduce import (map_association,
reduce_association)


def _validate_args(args):
Expand Down Expand Up @@ -40,11 +41,17 @@ def run(args):
) as step_progress:
# pylint: disable=duplicate-code
# Very similar to /index/run_index.py
catalog_params = args.to_catalog_parameters()
catalog_params.total_rows = int(rows_written)
write_metadata.write_provenance_info(catalog_params, args.provenance_info())
catalog_info = args.to_catalog_info(int(rows_written))
write_metadata.write_provenance_info(
catalog_base_dir=args.catalog_path,
dataset_info=catalog_info,
tool_args=args.provenance_info(),
)
step_progress.update(1)
write_metadata.write_catalog_info(catalog_params)
catalog_info = args.to_catalog_info(total_rows=int(rows_written))
write_metadata.write_catalog_info(
dataset_info=catalog_info, catalog_base_dir=args.catalog_path
)
step_progress.update(1)
write_metadata.write_parquet_metadata(args.catalog_path)
step_progress.update(1)
Expand Down
33 changes: 17 additions & 16 deletions src/hipscat_import/catalog/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Callable, List

import pandas as pd
from hipscat.catalog import CatalogParameters
from hipscat.catalog.catalog import CatalogInfo
from hipscat.io import FilePointer, file_io
from hipscat.pixel_math import hipscat_id

Expand Down Expand Up @@ -140,21 +140,19 @@ def _check_arguments(self):
if not self.filter_function:
self.filter_function = passthrough_filter_function

def to_catalog_parameters(self) -> CatalogParameters:
"""Convert importing arguments into hipscat catalog parameters.
Returns:
CatalogParameters for catalog being created.
"""
return CatalogParameters(
catalog_name=self.output_catalog_name,
catalog_type=self.catalog_type,
output_path=self.output_path,
epoch=self.epoch,
ra_column=self.ra_column,
dec_column=self.dec_column,
)
def to_catalog_info(self, total_rows) -> CatalogInfo:
"""Catalog-type-specific dataset info."""
info = {
"catalog_name": self.output_catalog_name,
"catalog_type": self.catalog_type,
"total_rows": total_rows,
"epoch": self.epoch,
"ra_column": self.ra_column,
"dec_column": self.dec_column,
}
return CatalogInfo(**info)

def additional_runtime_provenance_info(self):
def additional_runtime_provenance_info(self) -> dict:
return {
"catalog_name": self.output_catalog_name,
"epoch": self.epoch,
Expand All @@ -171,7 +169,9 @@ def additional_runtime_provenance_info(self):
"pixel_threshold": self.pixel_threshold,
"mapping_healpix_order": self.mapping_healpix_order,
"debug_stats_only": self.debug_stats_only,
"file_reader_info": self.file_reader.provenance_info(),
"file_reader_info": self.file_reader.provenance_info()
if self.file_reader is not None
else {},
}


Expand All @@ -180,6 +180,7 @@ def check_healpix_order_range(
):
"""Helper method to heck if the `order` is within the range determined by the
`lower_bound` and `upper_bound`, inclusive.
Args:
order (int): healpix order to check
field_name (str): field name to use in the error message
Expand Down
28 changes: 17 additions & 11 deletions src/hipscat_import/catalog/run_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ def _map_pixels(args, client):
total=len(futures),
disable=(not args.progress_bar),
):
if future.status == "error": # pragma: no cover
if future.status == "error": # pragma: no cover
some_error = True
raw_histogram = np.add(raw_histogram, result)
resume.write_mapping_start_key(args.tmp_path, future.key)
resume.write_histogram(args.tmp_path, raw_histogram)
resume.write_mapping_done_key(args.tmp_path, future.key)
if some_error: # pragma: no cover
if some_error: # pragma: no cover
raise RuntimeError("Some mapping stages failed. See logs for details.")
resume.set_mapping_done(args.tmp_path)
return raw_histogram
Expand Down Expand Up @@ -98,10 +98,10 @@ def _split_pixels(args, alignment_future, client):
total=len(futures),
disable=(not args.progress_bar),
):
if future.status == "error": # pragma: no cover
if future.status == "error": # pragma: no cover
some_error = True
resume.write_splitting_done_key(args.tmp_path, future.key)
if some_error: # pragma: no cover
if some_error: # pragma: no cover
raise RuntimeError("Some splitting stages failed. See logs for details.")
resume.set_splitting_done(args.tmp_path)

Expand Down Expand Up @@ -143,10 +143,10 @@ def _reduce_pixels(args, destination_pixel_map, client):
total=len(futures),
disable=(not args.progress_bar),
):
if future.status == "error": # pragma: no cover
if future.status == "error": # pragma: no cover
some_error = True
resume.write_reducing_key(args.tmp_path, future.key)
if some_error: # pragma: no cover
if some_error: # pragma: no cover
raise RuntimeError("Some reducing stages failed. See logs for details.")
resume.set_reducing_done(args.tmp_path)

Expand Down Expand Up @@ -215,20 +215,26 @@ def run_with_client(args, client):
with tqdm(
total=6, desc="Finishing", disable=not args.progress_bar
) as step_progress:
catalog_parameters = args.to_catalog_parameters()
catalog_parameters.total_rows = int(raw_histogram.sum())
io.write_provenance_info(catalog_parameters, args.provenance_info())
catalog_info = args.to_catalog_info(int(raw_histogram.sum()))
io.write_provenance_info(
catalog_base_dir=args.catalog_path,
dataset_info=catalog_info,
tool_args=args.provenance_info(),
)
step_progress.update(1)

io.write_catalog_info(catalog_parameters)
io.write_catalog_info(
catalog_base_dir=args.catalog_path, dataset_info=catalog_info
)
step_progress.update(1)
if not args.debug_stats_only:
io.write_parquet_metadata(args.catalog_path)
step_progress.update(1)
io.write_fits_map(args.catalog_path, raw_histogram)
step_progress.update(1)
io.write_partition_info(
catalog_parameters, destination_healpix_pixel_map=destination_pixel_map
catalog_base_dir=args.catalog_path,
destination_healpix_pixel_map=destination_pixel_map,
)
step_progress.update(1)
resume.clean_resume_files(args.tmp_path)
Expand Down
27 changes: 14 additions & 13 deletions src/hipscat_import/index/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from dataclasses import dataclass, field
from typing import List, Optional

from hipscat.catalog import Catalog, CatalogParameters
from hipscat.catalog import Catalog
from hipscat.catalog.index.index_catalog_info import IndexCatalogInfo

from hipscat_import.runtime_arguments import RuntimeArguments

Expand Down Expand Up @@ -46,19 +47,19 @@ def _check_arguments(self):
if self.compute_partition_size < 100_000:
raise ValueError("compute_partition_size must be at least 100_000")

def to_catalog_parameters(self) -> CatalogParameters:
"""Convert importing arguments into hipscat catalog parameters.
Returns:
CatalogParameters for catalog being created.
"""
return CatalogParameters(
catalog_name=self.output_catalog_name,
catalog_type="index",
output_path=self.output_path,
)
def to_catalog_info(self, total_rows) -> IndexCatalogInfo:
"""Catalog-type-specific dataset info."""
info = {
"catalog_name": self.output_catalog_name,
"total_rows": total_rows,
"catalog_type": "index",
"primary_catalog": str(self.input_catalog_path),
"indexing_column": self.indexing_column,
"extra_columns": self.extra_columns,
}
return IndexCatalogInfo(**info)

def additional_runtime_provenance_info(self):
def additional_runtime_provenance_info(self) -> dict:
return {
"input_catalog_path": str(self.input_catalog_path),
"indexing_column": self.indexing_column,
Expand Down
13 changes: 9 additions & 4 deletions src/hipscat_import/index/run_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@ def run(args):
) as step_progress:
# pylint: disable=duplicate-code
# Very similar to /association/run_association.py
catalog_params = args.to_catalog_parameters()
catalog_params.total_rows = int(rows_written)
write_metadata.write_provenance_info(catalog_params, args.provenance_info())
catalog_info = args.to_catalog_info(int(rows_written))
write_metadata.write_provenance_info(
catalog_base_dir=args.catalog_path,
dataset_info=catalog_info,
tool_args=args.provenance_info(),
)
step_progress.update(1)
write_metadata.write_catalog_info(catalog_params)
write_metadata.write_catalog_info(
catalog_base_dir=args.catalog_path, dataset_info=catalog_info
)
step_progress.update(1)
file_io.remove_directory(args.tmp_path, ignore_errors=True)
step_progress.update(1)
Expand Down
26 changes: 25 additions & 1 deletion src/hipscat_import/margin_cache/margin_cache_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import healpy as hp
import numpy as np
from hipscat.catalog import Catalog
from hipscat.catalog.margin_cache.margin_cache_catalog_info import (
MarginCacheCatalogInfo,
)
from hipscat.io import file_io

from hipscat_import.runtime_arguments import RuntimeArguments
Expand Down Expand Up @@ -53,9 +56,30 @@ def _check_arguments(self):

margin_pixel_nside = hp.order2nside(self.margin_order)

if hp.nside2resol(margin_pixel_nside, arcmin=True) * 60. < self.margin_threshold:
if (
hp.nside2resol(margin_pixel_nside, arcmin=True) * 60.0
< self.margin_threshold
):
# pylint: disable=line-too-long
warnings.warn(
"Warning: margin pixels have a smaller resolution than margin_threshold; this may lead to data loss in the margin cache."
)
# pylint: enable=line-too-long

def to_catalog_info(self, total_rows) -> MarginCacheCatalogInfo:
"""Catalog-type-specific dataset info."""
info = {
"catalog_name": self.output_catalog_name,
"total_rows": total_rows,
"catalog_type": "margin",
"primary_catalog": self.input_catalog_path,
"margin_threshold": self.margin_threshold,
}
return MarginCacheCatalogInfo(**info)

def additional_runtime_provenance_info(self) -> dict:
return {
"input_catalog_path": str(self.input_catalog_path),
"margin_threshold": self.margin_threshold,
"margin_order": self.margin_order,
}
9 changes: 5 additions & 4 deletions tests/hipscat_import/association/test_association_argument.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ def test_all_required_args(tmp_path, small_sky_object_catalog):
)


def test_to_catalog_parameters(small_sky_object_catalog, tmp_path):
"""Verify creation of catalog parameters for index to be created."""
def test_to_catalog_info(small_sky_object_catalog, tmp_path):
"""Verify creation of catalog parameters for association table to be created."""
args = AssociationArguments(
primary_input_catalog_path=small_sky_object_catalog,
primary_id_column="id",
Expand All @@ -193,8 +193,9 @@ def test_to_catalog_parameters(small_sky_object_catalog, tmp_path):
output_path=tmp_path,
output_catalog_name="small_sky_self_join",
)
catalog_parameters = args.to_catalog_parameters()
assert catalog_parameters.catalog_name == args.output_catalog_name
catalog_info = args.to_catalog_info(total_rows=10)
assert catalog_info.catalog_name == args.output_catalog_name
assert catalog_info.total_rows == 10


def test_provenance_info(small_sky_object_catalog, tmp_path):
Expand Down
Loading

0 comments on commit c2d0570

Please sign in to comment.