Skip to content

Commit

Permalink
Merge branch 'main' into max/margin_cache_reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
maxwest-uw committed Jun 23, 2023
2 parents 6762e54 + 328b9fc commit 330d8a3
Show file tree
Hide file tree
Showing 43 changed files with 643 additions and 819 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/testing-and-coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ jobs:
run: |
sudo apt-get update
python -m pip install --upgrade pip
pip install .
pip install .[dev]
pip install -e .[dev]
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Run unit tests with pytest
run: |
Expand Down
7 changes: 0 additions & 7 deletions docs/guide/command_line.rst

This file was deleted.

1 change: 0 additions & 1 deletion docs/guide/overview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ Installation
Other Topics
-------------------------------------------------------------------------------

* :doc:`command_line`
* :doc:`resume`
1 change: 0 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ Utility for ingesting large survey data into HiPSCat structure.
:caption: Importing Catalogs

guide/overview
guide/command_line
guide/resume
Notebooks <notebooks>

Expand Down
4 changes: 2 additions & 2 deletions docs/notebooks/intro_notebook.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@
" # Calculate the cosine of each value of X\n",
" z = np.cos(x)\n",
" # Plot the sine wave in blue, using degrees rather than radians on the X axis\n",
" pl.plot(xdeg, y, color='blue', label='Sine wave')\n",
" pl.plot(xdeg, y, color=\"blue\", label=\"Sine wave\")\n",
" # Plot the cos wave in green, using degrees rather than radians on the X axis\n",
" pl.plot(xdeg, z, color='green', label='Cosine wave')\n",
" pl.plot(xdeg, z, color=\"green\", label=\"Cosine wave\")\n",
" pl.xlabel(\"Degrees\")\n",
" # More sensible X axis values\n",
" pl.xticks(np.arange(0, 361, 45))\n",
Expand Down
9 changes: 6 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,15 @@ write_to = "src/hipscat_import/_version.py"

[tool.setuptools.package-data]
hipscat_import = ["py.typed"]
[project.scripts]
hipscat-import = "hipscat_import.control:main"
hc = "hipscat_import.control:main"

[tool.pytest.ini_options]
timeout = 1
markers = [
"dask: mark tests as having a dask client runtime dependency",
]

[tool.coverage.report]
omit = [
"src/hipscat_import/_version.py", # auto-generated
"src/hipscat_import/pipeline.py", # too annoying to test
]
1 change: 0 additions & 1 deletion src/hipscat_import/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
"""All modules for hipscat-import package"""

from .control import main
from .runtime_arguments import RuntimeArguments
7 changes: 0 additions & 7 deletions src/hipscat_import/__main__.py

This file was deleted.

29 changes: 16 additions & 13 deletions src/hipscat_import/association/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

from dataclasses import dataclass

from hipscat.catalog import CatalogParameters
from hipscat.catalog.association_catalog.association_catalog import (
AssociationCatalogInfo,
)

from hipscat_import.runtime_arguments import RuntimeArguments

Expand Down Expand Up @@ -51,19 +53,20 @@ def _check_arguments(self):
if self.compute_partition_size < 100_000:
raise ValueError("compute_partition_size must be at least 100_000")

def to_catalog_parameters(self) -> CatalogParameters:
"""Convert importing arguments into hipscat catalog parameters.
Returns:
CatalogParameters for catalog being created.
"""
return CatalogParameters(
catalog_name=self.output_catalog_name,
catalog_type="association",
output_path=self.output_path,
)
def to_catalog_info(self, total_rows) -> AssociationCatalogInfo:
    """Build the association catalog's dataset info from these arguments.

    Args:
        total_rows: row count to record in the resulting catalog info.

    Returns:
        AssociationCatalogInfo carrying the output catalog name, the fixed
        "association" catalog type, the row count, and the primary/join
        id columns with their input catalog paths rendered as strings.
    """
    return AssociationCatalogInfo(
        catalog_name=self.output_catalog_name,
        catalog_type="association",
        total_rows=total_rows,
        primary_column=self.primary_id_column,
        primary_catalog=str(self.primary_input_catalog_path),
        join_column=self.join_id_column,
        join_catalog=str(self.join_input_catalog_path),
    )

def additional_runtime_provenance_info(self):
def additional_runtime_provenance_info(self) -> dict:
return {
"primary_input_catalog_path": str(self.primary_input_catalog_path),
"primary_id_column": self.primary_id_column,
Expand Down
22 changes: 12 additions & 10 deletions src/hipscat_import/association/run_association.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,13 @@
from hipscat_import.association.map_reduce import map_association, reduce_association


def _validate_args(args):
def run(args):
"""Run the association pipeline"""
if not args:
raise TypeError("args is required and should be type AssociationArguments")
if not isinstance(args, AssociationArguments):
raise TypeError("args must be type AssociationArguments")


def run(args):
"""Run the association pipeline"""
_validate_args(args)

with tqdm(total=1, desc="Mapping ", disable=not args.progress_bar) as step_progress:
map_association(args)
step_progress.update(1)
Expand All @@ -40,11 +36,17 @@ def run(args):
) as step_progress:
# pylint: disable=duplicate-code
# Very similar to /index/run_index.py
catalog_params = args.to_catalog_parameters()
catalog_params.total_rows = int(rows_written)
write_metadata.write_provenance_info(catalog_params, args.provenance_info())
catalog_info = args.to_catalog_info(int(rows_written))
write_metadata.write_provenance_info(
catalog_base_dir=args.catalog_path,
dataset_info=catalog_info,
tool_args=args.provenance_info(),
)
step_progress.update(1)
write_metadata.write_catalog_info(catalog_params)
catalog_info = args.to_catalog_info(total_rows=int(rows_written))
write_metadata.write_catalog_info(
dataset_info=catalog_info, catalog_base_dir=args.catalog_path
)
step_progress.update(1)
write_metadata.write_parquet_metadata(args.catalog_path)
step_progress.update(1)
Expand Down
32 changes: 22 additions & 10 deletions src/hipscat_import/catalog/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
"""All modules for importing new catalogs."""

from .arguments import ImportArguments
from .command_line_arguments import parse_command_line
from .file_readers import (CsvReader, FitsReader, InputReader, ParquetReader,
get_file_reader)
from .file_readers import (
CsvReader,
FitsReader,
InputReader,
ParquetReader,
get_file_reader,
)
from .map_reduce import map_to_pixels, reduce_pixel_shards, split_pixels
from .resume_files import (clean_resume_files, is_mapping_done,
is_reducing_done, read_histogram, read_mapping_keys,
read_reducing_keys, set_mapping_done,
set_reducing_done, write_histogram,
write_mapping_done_key, write_mapping_start_key,
write_reducing_key)
from .run_import import run, run_with_client
from .resume_files import (
clean_resume_files,
is_mapping_done,
is_reducing_done,
read_histogram,
read_mapping_keys,
read_reducing_keys,
set_mapping_done,
set_reducing_done,
write_histogram,
write_mapping_done_key,
write_mapping_start_key,
write_reducing_key,
)
from .run_import import run
37 changes: 19 additions & 18 deletions src/hipscat_import/catalog/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Callable, List

import pandas as pd
from hipscat.catalog import CatalogParameters
from hipscat.catalog.catalog import CatalogInfo
from hipscat.io import FilePointer, file_io
from hipscat.pixel_math import hipscat_id

Expand Down Expand Up @@ -96,8 +96,8 @@ def _check_arguments(self):
check_healpix_order_range(
self.highest_healpix_order, "highest_healpix_order"
)
if not 100 <= self.pixel_threshold <= 10_000_000:
raise ValueError("pixel_threshold should be between 100 and 10,000,000")
if not 100 <= self.pixel_threshold <= 1_000_000_000:
raise ValueError("pixel_threshold should be between 100 and 1,000,000,000")
self.mapping_healpix_order = self.highest_healpix_order

if self.catalog_type not in ("source", "object"):
Expand Down Expand Up @@ -140,21 +140,19 @@ def _check_arguments(self):
if not self.filter_function:
self.filter_function = passthrough_filter_function

def to_catalog_parameters(self) -> CatalogParameters:
"""Convert importing arguments into hipscat catalog parameters.
Returns:
CatalogParameters for catalog being created.
"""
return CatalogParameters(
catalog_name=self.output_catalog_name,
catalog_type=self.catalog_type,
output_path=self.output_path,
epoch=self.epoch,
ra_column=self.ra_column,
dec_column=self.dec_column,
)
def to_catalog_info(self, total_rows) -> CatalogInfo:
    """Build the catalog's dataset info from these import arguments.

    Args:
        total_rows: row count to record in the resulting catalog info.

    Returns:
        CatalogInfo carrying the output catalog name, the configured
        catalog type, the row count, the epoch, and the ra/dec column names.
    """
    return CatalogInfo(
        catalog_name=self.output_catalog_name,
        catalog_type=self.catalog_type,
        total_rows=total_rows,
        epoch=self.epoch,
        ra_column=self.ra_column,
        dec_column=self.dec_column,
    )

def additional_runtime_provenance_info(self):
def additional_runtime_provenance_info(self) -> dict:
return {
"catalog_name": self.output_catalog_name,
"epoch": self.epoch,
Expand All @@ -171,7 +169,9 @@ def additional_runtime_provenance_info(self):
"pixel_threshold": self.pixel_threshold,
"mapping_healpix_order": self.mapping_healpix_order,
"debug_stats_only": self.debug_stats_only,
"file_reader_info": self.file_reader.provenance_info(),
"file_reader_info": self.file_reader.provenance_info()
if self.file_reader is not None
else {},
}


Expand All @@ -180,6 +180,7 @@ def check_healpix_order_range(
):
"""Helper method to check if the `order` is within the range determined by the
`lower_bound` and `upper_bound`, inclusive.
Args:
order (int): healpix order to check
field_name (str): field name to use in the error message
Expand Down
Loading

0 comments on commit 330d8a3

Please sign in to comment.