Skip to content

Commit

Permalink
Argument renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
delucchi-cmu committed Nov 21, 2023
1 parent c31fe68 commit 9199323
Show file tree
Hide file tree
Showing 37 changed files with 178 additions and 176 deletions.
8 changes: 4 additions & 4 deletions docs/catalogs/arguments.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ will look something like:
.. code-block:: python
args = ImportArguments(
id_column="ObjectID",
sort_columns="ObjectID",
ra_column="ObjectRA",
dec_column="ObjectDec",
input_path="./my_data",
input_format="csv",
output_catalog_name="test_cat",
output_artifact_name="test_cat",
output_path="./output",
)
Expand Down Expand Up @@ -164,11 +164,11 @@ for either pipeline success or failure.
Output
-------------------------------------------------------------------------------

You must specify a name for the catalog, using ``output_catalog_name``.
You must specify a name for the catalog, using ``output_artifact_name``.

You must specify where you want your catalog data to be written, using
``output_path``. This path should be the base directory for your catalogs, as
the full path for the catalog will take the form of ``output_path/output_catalog_name``.
the full path for the catalog will take the form of ``output_path/output_artifact_name``.

If there is already catalog data in the indicated directory, you can force a
new catalog to be written in the directory with the ``overwrite`` flag.
Expand Down
4 changes: 2 additions & 2 deletions docs/catalogs/public/allwise.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Example import
type_map = dict(zip(type_frame["name"], type_frame["type"]))
args = ImportArguments(
output_catalog_name="allwise",
output_artifact_name="allwise",
input_path="/path/to/allwise/",
input_format="csv.bz2",
file_reader=CsvReader(
Expand All @@ -54,7 +54,7 @@ Example import
use_schema_file="allwise_schema.parquet",
ra_column="ra",
dec_column="dec",
id_column="source_id",
sort_columns="source_id",
pixel_threshold=1_000_000,
highest_healpix_order=7,
output_path="/path/to/catalogs/",
Expand Down
4 changes: 2 additions & 2 deletions docs/catalogs/public/neowise.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Example import
type_map = dict(zip(type_frame["name"], type_frame["type"]))
args = ImportArguments(
output_catalog_name="neowise_1",
output_artifact_name="neowise_1",
input_path="/path/to/neowiser_year8/",
input_format="csv.bz2",
file_reader=CsvReader(
Expand All @@ -56,7 +56,7 @@ Example import
pixel_threshold=2_000_000,
highest_healpix_order=9,
use_schema_file="neowise_schema.parquet",
id_column="SOURCE_ID",
sort_columns="SOURCE_ID",
output_path="/path/to/catalogs/",
)
runner.run(args)
Expand Down
8 changes: 4 additions & 4 deletions docs/catalogs/public/panstarrs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Example import of objects (otmo)
in_file_paths = glob.glob("/path/to/otmo/OTMO_**.csv")
in_file_paths.sort()
args = ImportArguments(
output_catalog_name="ps1_otmo",
output_artifact_name="ps1_otmo",
input_file_list=in_file_paths,
input_format="csv",
file_reader=CsvReader(
Expand All @@ -53,7 +53,7 @@ Example import of objects (otmo)
),
ra_column="raMean",
dec_column="decMean",
id_column="objID",
sort_columns="objID",
)
runner.pipeline(args)
Expand All @@ -71,7 +71,7 @@ Example import of detections
in_file_paths = glob.glob("/path/to/detection/detection**.csv")
in_file_paths.sort()
args = ImportArguments(
output_catalog_name="ps1_detection",
output_artifact_name="ps1_detection",
input_file_list=in_file_paths,
input_format="csv",
file_reader=CsvReader(
Expand All @@ -84,6 +84,6 @@ Example import of detections
),
ra_column="ra",
dec_column="dec",
id_column="objID",
sort_columns="objID",
)
runner.pipeline(args)
4 changes: 2 additions & 2 deletions docs/catalogs/public/sdss.rst
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ Example import
import hipscat_import.pipeline as runner
args = ImportArguments(
output_catalog_name="sdss_dr16q",
output_artifact_name="sdss_dr16q",
input_path="/data/sdss/parquet/",
input_format="parquet",
ra_column="RA",
dec_column="DEC",
id_column="ID",
sort_columns="ID",
pixel_threshold=1_000_000,
highest_healpix_order=7,
output_path="/path/to/catalogs/",
Expand Down
4 changes: 2 additions & 2 deletions docs/catalogs/public/tic.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Example import
type_map = dict(zip(type_frame["name"], type_frame["type"]))
args = ImportArguments(
output_catalog_name="tic_1",
output_artifact_name="tic_1",
input_path="/path/to/tic/",
input_format="csv.gz",
file_reader=CsvReader(
Expand All @@ -49,7 +49,7 @@ Example import
).read,
ra_column="ra",
dec_column="dec",
id_column="ID",
sort_columns="ID",
output_path="/path/to/catalogs/",
use_schema_file="tic_schema.parquet",
)
Expand Down
4 changes: 2 additions & 2 deletions docs/catalogs/public/zubercal.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ Challenges with this data set
files.sort()
args = ImportArguments(
output_catalog_name="zubercal",
output_artifact_name="zubercal",
input_file_list=files,
## NB - you need the parens here!
file_reader=ZubercalParquetReader(),
input_format="parquet",
catalog_type="source",
ra_column="objra",
dec_column="objdec",
id_column="objectid",
sort_columns="objectid",
highest_healpix_order=10,
pixel_threshold=20_000_000,
output_path="/path/to/catalogs/",
Expand Down
4 changes: 2 additions & 2 deletions docs/notebooks/unequal_schema.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
"tmp_path = tempfile.TemporaryDirectory()\n",
"\n",
"args = ImportArguments(\n",
" output_catalog_name=\"mixed_csv_bad\",\n",
" output_artifact_name=\"mixed_csv_bad\",\n",
" input_path=mixed_schema_csv_dir,\n",
" input_format=\"csv\",\n",
" output_path=tmp_path.name,\n",
Expand Down Expand Up @@ -128,7 +128,7 @@
"source": [
"tmp_path = tempfile.TemporaryDirectory()\n",
"args = ImportArguments(\n",
" output_catalog_name=\"mixed_csv_good\",\n",
" output_artifact_name=\"mixed_csv_good\",\n",
" input_path=mixed_schema_csv_dir,\n",
" input_format=\"csv\",\n",
" output_path=tmp_path.name,\n",
Expand Down
12 changes: 7 additions & 5 deletions src/hipscat_import/catalog/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ class ImportArguments(RuntimeArguments):
"""column for declination"""
use_hipscat_index: bool = False
"""use an existing hipscat spatial index as the position, instead of ra/dec"""
id_column: str = "id"
"""column for survey identifier, or other sortable column"""
sort_columns: str | None = None
"""column for survey identifier, or other sortable column. if sorting by multiple columns,
they should be comma-separated. if `add_hipscat_index=True`, this sorting will be used to
resolve the counter within the same higher-order pixel space"""
add_hipscat_index: bool = True
"""add the hipscat spatial index field alongside the data"""
use_schema_file: str | None = None
Expand Down Expand Up @@ -115,7 +117,7 @@ def _check_arguments(self):
def to_catalog_info(self, total_rows) -> CatalogInfo:
"""Catalog-type-specific dataset info."""
info = {
"catalog_name": self.output_catalog_name,
"catalog_name": self.output_artifact_name,
"catalog_type": self.catalog_type,
"total_rows": total_rows,
"epoch": self.epoch,
Expand All @@ -126,7 +128,7 @@ def to_catalog_info(self, total_rows) -> CatalogInfo:

def additional_runtime_provenance_info(self) -> dict:
return {
"catalog_name": self.output_catalog_name,
"catalog_name": self.output_artifact_name,
"epoch": self.epoch,
"catalog_type": self.catalog_type,
"input_path": str(self.input_path),
Expand All @@ -136,7 +138,7 @@ def additional_runtime_provenance_info(self) -> dict:
"ra_column": self.ra_column,
"dec_column": self.dec_column,
"use_hipscat_index": self.use_hipscat_index,
"id_column": self.id_column,
"sort_columns": self.sort_columns,
"constant_healpix_order": self.constant_healpix_order,
"highest_healpix_order": self.highest_healpix_order,
"pixel_threshold": self.pixel_threshold,
Expand Down
10 changes: 5 additions & 5 deletions src/hipscat_import/catalog/map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def _iterate_input_file(
)
# Set up the pixel data
mapped_pixels = hp.ang2pix(
2**highest_order,
2 ** highest_order,
data[ra_column].values,
data[dec_column].values,
lonlat=True,
Expand Down Expand Up @@ -194,7 +194,7 @@ def reduce_pixel_shards(
output_path,
ra_column,
dec_column,
id_column,
sort_columns: str = "",
use_hipscat_index=False,
add_hipscat_index=True,
delete_input_files=True,
Expand Down Expand Up @@ -230,7 +230,7 @@ def reduce_pixel_shards(
destination_pixel_size (int): expected number of rows to write
for the catalog's final pixel
output_path (FilePointer): where to write the final catalog pixel data
id_column (str): column for survey identifier, or other sortable column
sort_columns (str): column for survey identifier, or other sortable column. if sorting by multiple columns, they should be comma-separated.
add_hipscat_index (bool): should we add a _hipscat_index column to
the resulting parquet file?
delete_input_files (bool): should we delete the intermediate files
Expand Down Expand Up @@ -273,8 +273,8 @@ def reduce_pixel_shards(
)

dataframe = merged_table.to_pandas()
if id_column:
dataframe = dataframe.sort_values(id_column)
if sort_columns:
dataframe = dataframe.sort_values(sort_columns.split(","))
if add_hipscat_index and not use_hipscat_index:
dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id(
dataframe[ra_column].values,
Expand Down
2 changes: 1 addition & 1 deletion src/hipscat_import/catalog/run_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def _reduce_pixels(args, destination_pixel_map, client):
output_path=args.catalog_path,
ra_column=args.ra_column,
dec_column=args.dec_column,
id_column=args.id_column,
sort_columns=args.sort_columns,
add_hipscat_index=args.add_hipscat_index,
use_schema_file=args.use_schema_file,
use_hipscat_index=args.use_hipscat_index,
Expand Down
2 changes: 1 addition & 1 deletion src/hipscat_import/index/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def _check_arguments(self):
def to_catalog_info(self, total_rows) -> IndexCatalogInfo:
"""Catalog-type-specific dataset info."""
info = {
"catalog_name": self.output_catalog_name,
"catalog_name": self.output_artifact_name,
"total_rows": total_rows,
"catalog_type": "index",
"primary_catalog": str(self.input_catalog_path),
Expand Down
2 changes: 1 addition & 1 deletion src/hipscat_import/margin_cache/margin_cache_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def _check_arguments(self):
def to_catalog_info(self, total_rows) -> MarginCacheCatalogInfo:
"""Catalog-type-specific dataset info."""
info = {
"catalog_name": self.output_catalog_name,
"catalog_name": self.output_artifact_name,
"total_rows": total_rows,
"catalog_type": "margin",
"primary_catalog": self.input_catalog_path,
Expand Down
2 changes: 1 addition & 1 deletion src/hipscat_import/margin_cache/margin_cache_map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def map_pixel_shards(
data = file_io.load_parquet_to_pandas(partition_file)

data["margin_pixel"] = hp.ang2pix(
2**margin_order,
2 ** margin_order,
data[ra_column].values,
data[dec_column].values,
lonlat=True,
Expand Down
4 changes: 2 additions & 2 deletions src/hipscat_import/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def _send_failure_email(args: RuntimeArguments, exception: Exception):
message["Subject"] = "hipscat-import failure."
message["To"] = args.completion_email_address
message.set_content(
f"output_catalog_name: {args.output_catalog_name}"
f"output_artifact_name: {args.output_artifact_name}"
"\n\nSee logs for more details"
f"\n\nFailed with message:\n\n{exception}"
)
Expand All @@ -82,7 +82,7 @@ def _send_success_email(args):
message = EmailMessage()
message["Subject"] = "hipscat-import success."
message["To"] = args.completion_email_address
message.set_content(f"output_catalog_name: {args.output_catalog_name}")
message.set_content(f"output_artifact_name: {args.output_artifact_name}")

_send_email(message)

Expand Down
22 changes: 11 additions & 11 deletions src/hipscat_import/runtime_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class RuntimeArguments:
## Output
output_path: str = ""
"""base path where new catalog should be output"""
output_catalog_name: str = ""
output_artifact_name: str = ""
"""short, convenient name for the catalog"""

## Execution
Expand All @@ -44,7 +44,7 @@ class RuntimeArguments:

catalog_path: FilePointer | None = None
"""constructed output path for the catalog that will be something like
<output_path>/<output_catalog_name>"""
<output_path>/<output_artifact_name>"""
tmp_path: FilePointer | None = None
"""constructed temp path - defaults to tmp_dir, then dask_tmp, but will create
a new temp directory under catalog_path if no other options are provided"""
Expand All @@ -55,17 +55,17 @@ def __post_init__(self):
def _check_arguments(self):
if not self.output_path:
raise ValueError("output_path is required")
if not self.output_catalog_name:
raise ValueError("output_catalog_name is required")
if re.search(r"[^A-Za-z0-9_\-\\]", self.output_catalog_name):
raise ValueError("output_catalog_name contains invalid characters")
if not self.output_artifact_name:
raise ValueError("output_artifact_name is required")
if re.search(r"[^A-Za-z0-9_\-\\]", self.output_artifact_name):
raise ValueError("output_artifact_name contains invalid characters")

if self.dask_n_workers <= 0:
raise ValueError("dask_n_workers should be greather than 0")
if self.dask_threads_per_worker <= 0:
raise ValueError("dask_threads_per_worker should be greater than 0")

self.catalog_path = file_io.append_paths_to_pointer(self.output_path, self.output_catalog_name)
self.catalog_path = file_io.append_paths_to_pointer(self.output_path, self.output_artifact_name)
if not self.overwrite:
if file_io.directory_has_contents(self.catalog_path):
raise ValueError(
Expand All @@ -78,13 +78,13 @@ def _check_arguments(self):
if not file_io.does_file_or_directory_exist(self.tmp_dir):
raise FileNotFoundError(f"tmp_dir ({self.tmp_dir}) not found on local storage")
self.tmp_path = file_io.append_paths_to_pointer(
self.tmp_dir, self.output_catalog_name, "intermediate"
self.tmp_dir, self.output_artifact_name, "intermediate"
)
elif self.dask_tmp:
if not file_io.does_file_or_directory_exist(self.dask_tmp):
raise FileNotFoundError(f"dask_tmp ({self.dask_tmp}) not found on local storage")
self.tmp_path = file_io.append_paths_to_pointer(
self.dask_tmp, self.output_catalog_name, "intermediate"
self.dask_tmp, self.output_artifact_name, "intermediate"
)
else:
self.tmp_path = file_io.append_paths_to_pointer(self.catalog_path, "intermediate")
Expand All @@ -97,9 +97,9 @@ def provenance_info(self) -> dict:
dictionary with all argument_name -> argument_value as key -> value pairs.
"""
runtime_args = {
"catalog_name": self.output_catalog_name,
"catalog_name": self.output_artifact_name,
"output_path": str(self.output_path),
"output_catalog_name": self.output_catalog_name,
"output_artifact_name": self.output_artifact_name,
"tmp_dir": str(self.tmp_dir),
"overwrite": self.overwrite,
"dask_tmp": str(self.dask_tmp),
Expand Down
2 changes: 1 addition & 1 deletion src/hipscat_import/soap/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def _check_arguments(self):
def to_catalog_info(self, total_rows) -> AssociationCatalogInfo:
"""Catalog-type-specific dataset info."""
info = {
"catalog_name": self.output_catalog_name,
"catalog_name": self.output_artifact_name,
"catalog_type": CatalogType.ASSOCIATION,
"total_rows": total_rows,
"primary_column": self.object_id_column,
Expand Down
Loading

0 comments on commit 9199323

Please sign in to comment.