Commit: Correct integer types in index catalog creation (#200)

delucchi-cmu committed Jan 10, 2024
1 parent 0ba5bcb commit abfc017
Showing 10 changed files with 90 additions and 32 deletions.
4 changes: 2 additions & 2 deletions src/hipscat_import/catalog/map_reduce.py
@@ -16,7 +16,7 @@
 # pylint: disable=too-many-locals,too-many-arguments
 
 
-def _get_pixel_directory(cache_path: FilePointer, order: np.int64, pixel: np.int64):
+def _get_pixel_directory(cache_path: FilePointer, order: np.int64, pixel: np.uint64):
     """Create a path for intermediate pixel data.
 
     This will take the form:
@@ -81,7 +81,7 @@ def _iterate_input_file(
     )
     # Set up the pixel data
     mapped_pixels = hp.ang2pix(
-        2 ** highest_order,
+        2**highest_order,
         data[ra_column].values,
         data[dec_column].values,
         lonlat=True,
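For reference, a minimal standalone sketch of the pixel-mapping pattern this hunk touches, assuming healpy's `ang2pix` with the NESTED scheme; the helper name and sample coordinates are hypothetical, not taken from the module:

```python
# Hedged sketch: map RA/Dec (degrees) to HEALPix pixel ids at a given order,
# returning unsigned 64-bit ids so they match the catalog's integer types.
# `pixels_for_positions` is a made-up name; NESTED ordering is an assumption.
import healpy as hp
import numpy as np

def pixels_for_positions(ra_deg, dec_deg, highest_order):
    nside = 2**highest_order  # ang2pix expects NSIDE, i.e. 2**order
    pixels = hp.ang2pix(nside, ra_deg, dec_deg, lonlat=True, nest=True)
    return pixels.astype(np.uint64)  # keep pixel ids unsigned end-to-end

print(pixels_for_positions(np.array([45.0]), np.array([-30.0]), highest_order=7))
```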
12 changes: 9 additions & 3 deletions src/hipscat_import/index/map_reduce.py
@@ -3,7 +3,8 @@
 import dask.dataframe as dd
 import numpy as np
 from dask.distributed import progress, wait
-from hipscat.io import file_io
+from hipscat.io import paths
+from hipscat.io.file_io import file_io
 from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN
 
 
@@ -16,13 +17,17 @@ def create_index(args):
     if args.include_order_pixel:
         include_columns.extend(["Norder", "Dir", "Npix"])
 
-    index_dir = file_io.append_paths_to_pointer(args.catalog_path, "index")
+    index_dir = paths.append_paths_to_pointer(args.catalog_path, "index")
 
+    metadata_file = paths.get_parquet_metadata_pointer(args.input_catalog_path)
+
+    metadata = file_io.read_parquet_metadata(metadata_file)
     data = dd.read_parquet(
         path=args.input_catalog_path,
         columns=include_columns,
         engine="pyarrow",
-        dataset={"partitioning": "hive"},
+        dataset={"partitioning": {"flavor": "hive", "schema": metadata.schema.to_arrow_schema()}},
+        filesystem="arrow",
     )
 
     if args.include_order_pixel:
@@ -33,6 +38,7 @@
     data = data.reset_index()
     if not args.include_hipscat_index:
         data = data.drop(columns=[HIPSCAT_ID_COLUMN])
+    data = data.drop_duplicates()
     data = data.repartition(partition_size=args.compute_partition_size)
     data = data.set_index(args.indexing_column)
     result = data.to_parquet(
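The reworked `dd.read_parquet` call above passes the schema recorded in the catalog's `_metadata` to the hive partitioning, so partition columns keep their declared dtypes instead of being re-inferred from the directory names. A standalone sketch of the same pattern, with a placeholder catalog path and an example column list (not the importer's actual arguments):

```python
# Hedged sketch of the read pattern used in create_index above: pin the hive
# partition columns (Norder/Dir/Npix) to the dtypes recorded in _metadata.
import dask.dataframe as dd
import pyarrow.parquet as pq

catalog_path = "/path/to/catalog"  # placeholder location
metadata = pq.read_metadata(f"{catalog_path}/_metadata")
schema = metadata.schema.to_arrow_schema()

data = dd.read_parquet(
    path=catalog_path,
    columns=["object_id", "Norder", "Dir", "Npix"],  # example column list
    engine="pyarrow",
    dataset={"partitioning": {"flavor": "hive", "schema": schema}},
    filesystem="arrow",
)
data = data.drop_duplicates()  # drop exact duplicate rows before indexing
```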
14 changes: 7 additions & 7 deletions src/hipscat_import/soap/map_reduce.py
@@ -46,12 +46,12 @@ def _count_joins_for_object(source_data, source_pixel, object_pixel, soap_args):
     joined_data = joined_data.reset_index()
 
     joined_data["Norder"] = np.full(rows_written, fill_value=object_pixel.order, dtype=np.uint8)
-    joined_data["Dir"] = np.full(rows_written, fill_value=object_pixel.dir, dtype=np.uint32)
-    joined_data["Npix"] = np.full(rows_written, fill_value=object_pixel.pixel, dtype=np.uint32)
+    joined_data["Dir"] = np.full(rows_written, fill_value=object_pixel.dir, dtype=np.uint64)
+    joined_data["Npix"] = np.full(rows_written, fill_value=object_pixel.pixel, dtype=np.uint64)
 
     joined_data["join_Norder"] = np.full(rows_written, fill_value=source_pixel.order, dtype=np.uint8)
-    joined_data["join_Dir"] = np.full(rows_written, fill_value=source_pixel.dir, dtype=np.uint32)
-    joined_data["join_Npix"] = np.full(rows_written, fill_value=source_pixel.pixel, dtype=np.uint32)
+    joined_data["join_Dir"] = np.full(rows_written, fill_value=source_pixel.dir, dtype=np.uint64)
+    joined_data["join_Npix"] = np.full(rows_written, fill_value=source_pixel.pixel, dtype=np.uint64)
 
     joined_data.to_parquet(output_file, index=True)
 
@@ -64,9 +64,9 @@ def _write_count_results(cache_path, source_healpix, results):
     dataframe = pd.DataFrame(results, columns=["Norder", "Npix", "num_rows"])
 
     dataframe["Dir"] = [int(order / 10_000) * 10_000 if order >= 0 else -1 for order, _, _ in results]
-    dataframe["join_Norder"] = np.full(num_results, fill_value=source_healpix.order, dtype=np.int32)
-    dataframe["join_Dir"] = [int(order / 10_000) * 10_000 for order in dataframe["join_Norder"]]
-    dataframe["join_Npix"] = np.full(num_results, fill_value=source_healpix.pixel, dtype=np.int32)
+    dataframe["join_Norder"] = np.full(num_results, fill_value=source_healpix.order, dtype=np.uint8)
+    dataframe["join_Dir"] = np.full(num_results, fill_value=source_healpix.dir, dtype=np.uint64)
+    dataframe["join_Npix"] = np.full(num_results, fill_value=source_healpix.pixel, dtype=np.uint64)
 
     ## Reorder columns.
     dataframe = dataframe[["Norder", "Dir", "Npix", "join_Norder", "join_Dir", "join_Npix", "num_rows"]]
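These columns switch from 32-bit to unsigned 64-bit types, matching the uint8/uint64 schema asserted in the tests below. A hedged sketch of the same `np.full` pattern, using a deliberately extreme (hypothetical) order and pixel to show why 32-bit columns would be too narrow:

```python
# Hedged sketch (not the importer's code): fixed-width partition columns with
# unsigned dtypes. The order/pixel values are hypothetical extremes; at order 29
# the pixel index can reach 12 * 4**29 - 1 (~3.5e18), which overflows 32 bits.
import numpy as np
import pandas as pd

rows = 3
order = 29
pixel = 12 * 4**29 - 1
directory = pixel // 10_000 * 10_000  # 10,000-pixel directory grouping, as above

frame = pd.DataFrame(
    {
        "Norder": np.full(rows, fill_value=order, dtype=np.uint8),
        "Dir": np.full(rows, fill_value=directory, dtype=np.uint64),
        "Npix": np.full(rows, fill_value=pixel, dtype=np.uint64),
    }
)
print(frame.dtypes)  # Norder: uint8, Dir: uint64, Npix: uint64
```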
Binary file not shown.
Binary file not shown.
Binary file modified tests/hipscat_import/data/small_sky_object_catalog/_metadata
Binary file not shown.
@@ -5,4 +5,4 @@
 "epoch": "J2000",
 "ra_column": "ra",
 "dec_column": "dec"
-}
+}
Binary file modified tests/hipscat_import/data/small_sky_object_catalog/point_map.fits
Binary file not shown.
@@ -1,43 +1,42 @@
 {
 "catalog_name": "small_sky_object_catalog",
 "catalog_type": "object",
-"version": "0.0.10.dev7+g0a79f90.d20230418",
-"generation_date": "2023.04.20",
-"epoch": "J2000",
-"ra_kw": "ra",
-"dec_kw": "dec",
 "total_rows": 131,
+"epoch": "J2000",
+"ra_column": "ra",
+"dec_column": "dec",
+"version": "0.2.1",
+"generation_date": "2024.01.09",
 "tool_args": {
 "tool_name": "hipscat_import",
-"version": "0.0.4.dev28+g2e31821.d20230420",
+"version": "0.2.1",
 "runtime_args": {
 "catalog_name": "small_sky_object_catalog",
-"output_path": "/home/data",
+"output_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/",
 "output_artifact_name": "small_sky_object_catalog",
 "tmp_dir": "",
 "overwrite": true,
-"dask_tmp": "/tmp/pytest-of-delucchi/pytest-1261/test_dask_runner0",
+"dask_tmp": "",
 "dask_n_workers": 1,
 "dask_threads_per_worker": 1,
-"catalog_path": "/home/data/small_sky_object_catalog",
-"tmp_path": "/tmp/pytest-of-delucchi/pytest-1261/test_dask_runner0/small_sky_object_catalog/intermediate",
+"catalog_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky_object_catalog",
+"tmp_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky_object_catalog/intermediate",
 "epoch": "J2000",
 "catalog_type": "object",
-"input_path": "/home/data/small_sky_parts",
+"input_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky",
 "input_paths": [
-"/home/data/small_sky_parts/catalog_00_of_05.csv",
-"/home/data/small_sky_parts/catalog_01_of_05.csv",
-"/home/data/small_sky_parts/catalog_02_of_05.csv",
-"/home/data/small_sky_parts/catalog_03_of_05.csv",
-"/home/data/small_sky_parts/catalog_04_of_05.csv"
+"file:///home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky/catalog.csv"
 ],
 "input_format": "csv",
 "input_file_list": [],
 "ra_column": "ra",
 "dec_column": "dec",
-"sort_columns": "id",
-"highest_healpix_order": 1,
+"use_hipscat_index": false,
+"sort_columns": null,
 "constant_healpix_order": -1,
+"highest_healpix_order": 7,
 "pixel_threshold": 1000000,
 "mapping_healpix_order": 7,
 "debug_stats_only": false,
 "file_reader_info": {
 "input_reader_type": "CsvReader",
@@ -50,4 +49,4 @@
 }
 }
 }
-}
+}
53 changes: 53 additions & 0 deletions tests/hipscat_import/index/test_run_index.py
@@ -2,6 +2,7 @@
 
 import os
 
+import numpy as np
 import pyarrow as pa
 import pyarrow.parquet as pq
 import pytest
@@ -108,3 +109,55 @@ def test_run_index_on_source(
 
     schema = pq.read_metadata(os.path.join(args.catalog_path, "_common_metadata")).schema.to_arrow_schema()
     assert schema.equals(basic_index_parquet_schema, check_metadata=False)
+
+
+@pytest.mark.dask
+def test_run_index_on_source_object_id(
+    small_sky_source_catalog,
+    dask_client,  # pylint: disable=unused-argument
+    tmp_path,
+    assert_parquet_file_index,
+):
+    """Test appropriate metadata is written."""
+
+    args = IndexArguments(
+        input_catalog_path=small_sky_source_catalog,
+        indexing_column="object_id",
+        output_path=tmp_path,
+        output_artifact_name="small_sky_source_object_id_index",
+        overwrite=True,
+        include_hipscat_index=False,
+        progress_bar=False,
+    )
+    runner.run(args)
+
+    # Check that the catalog metadata file exists
+    catalog = Dataset.read_from_hipscat(args.catalog_path)
+    assert catalog.on_disk
+    assert catalog.catalog_path == args.catalog_path
+
+    basic_index_parquet_schema = pa.schema(
+        [
+            pa.field("Norder", pa.uint8()),
+            pa.field("Dir", pa.uint64()),
+            pa.field("Npix", pa.uint64()),
+            pa.field("object_id", pa.int64()),
+        ]
+    )
+
+    outfile = os.path.join(args.catalog_path, "index", "part.0.parquet")
+    schema = pq.read_metadata(outfile).schema.to_arrow_schema()
+    assert schema.equals(basic_index_parquet_schema, check_metadata=False)
+
+    id_range = np.arange(700, 831)
+    ## Some of the objects have sources that span two source partitions.
+    doubled_up = [706, 707, 716, 726, 730, 736, 740, 779, 780, 784, 787, 789, 790, 792, 797, 818, 820]
+    doubled_up.extend(id_range)
+
+    assert_parquet_file_index(outfile, doubled_up)
+
+    schema = pq.read_metadata(os.path.join(args.catalog_path, "_metadata")).schema.to_arrow_schema()
+    assert schema.equals(basic_index_parquet_schema, check_metadata=False)
+
+    schema = pq.read_metadata(os.path.join(args.catalog_path, "_common_metadata")).schema.to_arrow_schema()
+    assert schema.equals(basic_index_parquet_schema, check_metadata=False)
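As a usage note, a small sketch of reading back the index partition this test checks to confirm the column types; the path is a placeholder for wherever the index catalog is written:

```python
# Hedged sketch: inspect a written index partition and confirm its dtypes.
# The path below is a placeholder; substitute the catalog the run produced.
import pyarrow.parquet as pq

outfile = "small_sky_source_object_id_index/index/part.0.parquet"
schema = pq.read_metadata(outfile).schema.to_arrow_schema()
print(schema)  # expect Norder: uint8, Dir: uint64, Npix: uint64, object_id: int64
```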
