diff --git a/pyproject.toml b/pyproject.toml
index 59ef225f..058e151f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,7 +19,7 @@ dependencies = [
     "dask[distributed]",
     "deprecated",
     "healpy",
-    "hipscat >= 0.1.2",
+    "hipscat >= 0.1.5",
     "ipykernel", # Support for Jupyter notebooks
     "pandas < 2.1.0",
     "pyarrow",
diff --git a/src/hipscat_import/catalog/arguments.py b/src/hipscat_import/catalog/arguments.py
index fb43b35b..81703bb3 100644
--- a/src/hipscat_import/catalog/arguments.py
+++ b/src/hipscat_import/catalog/arguments.py
@@ -40,6 +40,8 @@ class ImportArguments(RuntimeArguments):
     """column for right ascension"""
     dec_column: str = "dec"
     """column for declination"""
+    use_hipscat_index: bool = False
+    """use an existing hipscat spatial index as the position, instead of ra/dec"""
     id_column: str = "id"
     """column for survey identifier, or other sortable column"""
     add_hipscat_index: bool = True
@@ -133,6 +135,7 @@ def additional_runtime_provenance_info(self) -> dict:
             "input_file_list": self.input_file_list,
             "ra_column": self.ra_column,
             "dec_column": self.dec_column,
+            "use_hipscat_index": self.use_hipscat_index,
             "id_column": self.id_column,
             "constant_healpix_order": self.constant_healpix_order,
             "highest_healpix_order": self.highest_healpix_order,
diff --git a/src/hipscat_import/catalog/map_reduce.py b/src/hipscat_import/catalog/map_reduce.py
index 97ed8def..41e06db5 100644
--- a/src/hipscat_import/catalog/map_reduce.py
+++ b/src/hipscat_import/catalog/map_reduce.py
@@ -6,6 +6,7 @@
 import pyarrow.parquet as pq
 from hipscat import pixel_math
 from hipscat.io import FilePointer, file_io, paths
+from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN, hipscat_id_to_healpix
 
 from hipscat_import.catalog.file_readers import InputReader
 from hipscat_import.catalog.resume_plan import ResumePlan
@@ -53,6 +54,7 @@ def _iterate_input_file(
     highest_order,
     ra_column,
     dec_column,
+    use_hipscat_index=False,
 ):
     """Helper function to handle input file reading and healpix pixel calculation"""
     if not file_reader:
@@ -61,18 +63,28 @@ def _iterate_input_file(
     required_columns = [ra_column, dec_column]
 
     for chunk_number, data in enumerate(file_reader.read(input_file)):
-        if not all(x in data.columns for x in required_columns):
-            raise ValueError(
-                f"Invalid column names in input file: {ra_column}, {dec_column} not in {input_file}"
+        if use_hipscat_index:
+            if data.index.name == HIPSCAT_ID_COLUMN:
+                mapped_pixels = hipscat_id_to_healpix(data.index, target_order=highest_order)
+            elif HIPSCAT_ID_COLUMN in data.columns:
+                mapped_pixels = hipscat_id_to_healpix(data[HIPSCAT_ID_COLUMN], target_order=highest_order)
+            else:
+                raise ValueError(
+                    f"Invalid column names in input file: {HIPSCAT_ID_COLUMN} not in {input_file}"
+                )
+        else:
+            if not all(x in data.columns for x in required_columns):
+                raise ValueError(
+                    f"Invalid column names in input file: {', '.join(required_columns)} not in {input_file}"
+                )
+            # Set up the pixel data
+            mapped_pixels = hp.ang2pix(
+                2**highest_order,
+                data[ra_column].values,
+                data[dec_column].values,
+                lonlat=True,
+                nest=True,
             )
-        # Set up the pixel data
-        mapped_pixels = hp.ang2pix(
-            2**highest_order,
-            data[ra_column].values,
-            data[dec_column].values,
-            lonlat=True,
-            nest=True,
-        )
         yield chunk_number, data, mapped_pixels
 
 
@@ -84,6 +96,7 @@ def map_to_pixels(
     highest_order,
     ra_column,
     dec_column,
+    use_hipscat_index=False,
 ):
     """Map a file of input objects to their healpix pixels.
@@ -107,7 +120,7 @@ def map_to_pixels(
     """
     histo = pixel_math.empty_histogram(highest_order)
     for _, _, mapped_pixels in _iterate_input_file(
-        input_file, file_reader, highest_order, ra_column, dec_column
+        input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index
     ):
         mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
         mapped_pixel = mapped_pixel.astype(np.int64)
@@ -125,6 +138,7 @@ def split_pixels(
     cache_shard_path: FilePointer,
     resume_path: FilePointer,
     alignment=None,
+    use_hipscat_index=False,
 ):
     """Map a file of input objects to their healpix pixels and split into shards.
 
@@ -145,7 +159,7 @@ def split_pixels(
         FileNotFoundError: if the file does not exist, or is a directory
     """
     for chunk_number, data, mapped_pixels in _iterate_input_file(
-        input_file, file_reader, highest_order, ra_column, dec_column
+        input_file, file_reader, highest_order, ra_column, dec_column, use_hipscat_index
     ):
         aligned_pixels = alignment[mapped_pixels]
         unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)
@@ -181,6 +195,7 @@ def reduce_pixel_shards(
     ra_column,
     dec_column,
     id_column,
+    use_hipscat_index=False,
     add_hipscat_index=True,
     delete_input_files=True,
     use_schema_file="",
@@ -260,8 +275,8 @@ def reduce_pixel_shards(
         dataframe = merged_table.to_pandas()
         if id_column:
             dataframe = dataframe.sort_values(id_column)
-        if add_hipscat_index:
-            dataframe["_hipscat_index"] = pixel_math.compute_hipscat_id(
+        if add_hipscat_index and not use_hipscat_index:
+            dataframe[HIPSCAT_ID_COLUMN] = pixel_math.compute_hipscat_id(
                 dataframe[ra_column].values,
                 dataframe[dec_column].values,
             )
@@ -278,7 +293,7 @@ def reduce_pixel_shards(
         ## If we had a meaningful index before, preserve it as a column.
         if _has_named_index(dataframe):
             dataframe = dataframe.reset_index()
-        dataframe = dataframe.set_index("_hipscat_index").sort_index()
+        dataframe = dataframe.set_index(HIPSCAT_ID_COLUMN).sort_index()
         dataframe.to_parquet(destination_file)
 
         del dataframe, merged_table, tables
diff --git a/src/hipscat_import/catalog/run_import.py b/src/hipscat_import/catalog/run_import.py
index 218bd539..36a7476c 100644
--- a/src/hipscat_import/catalog/run_import.py
+++ b/src/hipscat_import/catalog/run_import.py
@@ -35,6 +35,7 @@ def _map_pixels(args, client):
                 highest_order=args.mapping_healpix_order,
                 ra_column=args.ra_column,
                 dec_column=args.dec_column,
+                use_hipscat_index=args.use_hipscat_index,
             )
         )
     args.resume_plan.wait_for_mapping(futures)
@@ -62,6 +63,7 @@ def _split_pixels(args, alignment_future, client):
                 cache_shard_path=args.tmp_path,
                 resume_path=args.resume_plan.tmp_path,
                 alignment=alignment_future,
+                use_hipscat_index=args.use_hipscat_index,
             )
         )
 
@@ -96,6 +98,7 @@ def _reduce_pixels(args, destination_pixel_map, client):
                 id_column=args.id_column,
                 add_hipscat_index=args.add_hipscat_index,
                 use_schema_file=args.use_schema_file,
+                use_hipscat_index=args.use_hipscat_index,
             )
         )
 
diff --git a/src/hipscat_import/index/map_reduce.py b/src/hipscat_import/index/map_reduce.py
index a7cb5399..e446a2a9 100644
--- a/src/hipscat_import/index/map_reduce.py
+++ b/src/hipscat_import/index/map_reduce.py
@@ -4,6 +4,7 @@
 import numpy as np
 from dask.distributed import progress, wait
 from hipscat.io import file_io
+from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN
 
 
 def create_index(args):
@@ -31,7 +32,7 @@ def create_index(args):
     data["Npix"] = data["Npix"].astype(np.int32)
     data = data.reset_index()
     if not args.include_hipscat_index:
-        data = data.drop(columns=["_hipscat_index"])
+        data = data.drop(columns=[HIPSCAT_ID_COLUMN])
     data = data.repartition(partition_size=args.compute_partition_size)
     data = data.set_index(args.indexing_column)
     result = data.to_parquet(
diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py
index 6168e4d9..0a2d36a5 100644
--- a/tests/hipscat_import/catalog/test_map_reduce.py
+++ b/tests/hipscat_import/catalog/test_map_reduce.py
@@ -134,6 +134,38 @@ def test_map_headers(tmp_path, formats_headers_csv):
     assert (result == expected).all()
 
 
+def test_map_with_hipscat_index(tmp_path, formats_dir, small_sky_single_file):
+    os.makedirs(os.path.join(tmp_path, "histograms"))
+    input_file = os.path.join(formats_dir, "hipscat_index.csv")
+    mr.map_to_pixels(
+        input_file=input_file,
+        file_reader=get_file_reader("csv"),
+        highest_order=0,
+        ra_column="NOPE",
+        dec_column="NOPE",
+        use_hipscat_index=True,  # ra/dec don't matter; just use the existing index
+        resume_path=tmp_path,
+        mapping_key="map_0",
+    )
+
+    expected = hist.empty_histogram(0)
+    expected[11] = 131
+
+    result = read_partial_histogram(tmp_path, "map_0")
+    npt.assert_array_equal(result, expected)
+
+    with pytest.raises(ValueError, match="_hipscat_index not in"):
+        mr.map_to_pixels(
+            input_file=small_sky_single_file,
+            file_reader=get_file_reader("csv"),
+            highest_order=0,
+            ra_column="NOPE",
+            dec_column="NOPE",
+            use_hipscat_index=True,  # no pre-existing index; expect failure
+            resume_path=tmp_path,
+            mapping_key="map_0",
+        )
+
 def test_map_with_schema(tmp_path, mixed_schema_csv_dir, mixed_schema_csv_parquet):
     """Test loading a file when using a parquet schema file for dtypes"""
     os.makedirs(os.path.join(tmp_path, "histograms"))
@@ -161,7 +193,6 @@ def test_map_with_schema(tmp_path, mixed_schema_csv_dir, mixed_schema_csv_parque
     npt.assert_array_equal(result, expected)
     assert (result == expected).all()
 
-
 def test_map_small_sky_order0(tmp_path, small_sky_single_file):
     """Test loading the small sky catalog and partitioning each object into the same large bucket"""
     os.makedirs(os.path.join(tmp_path, "histograms"))
diff --git a/tests/hipscat_import/catalog/test_run_round_trip.py b/tests/hipscat_import/catalog/test_run_round_trip.py
index e195008c..00901ad5 100644
--- a/tests/hipscat_import/catalog/test_run_round_trip.py
+++ b/tests/hipscat_import/catalog/test_run_round_trip.py
@@ -356,3 +356,59 @@ def read(self, input_file):
 
     expected_ids = [*range(700, 831)]
     assert_parquet_file_ids(output_file, "id", expected_ids)
+
+
+@pytest.mark.dask
+def test_import_hipscat_index(
+    dask_client,
+    formats_dir,
+    assert_parquet_file_ids,
+    tmp_path,
+):
+    """Test basic execution, using a previously-computed _hipscat_index column for spatial partitioning."""
+    ## First, let's just check the assumptions we have about our input file:
+    ## - should have _hipscat_index as the indexed column
+    ## - should NOT have any columns like "ra" or "dec"
+    input_file = os.path.join(formats_dir, "hipscat_index.parquet")
+
+    expected_ids = [*range(700, 831)]
+    assert_parquet_file_ids(input_file, "id", expected_ids)
+
+    data_frame = pd.read_parquet(input_file, engine="pyarrow")
+    assert data_frame.index.name == "_hipscat_index"
+    npt.assert_array_equal(data_frame.columns, ["id"])
+
+    args = ImportArguments(
+        output_catalog_name="using_hipscat_index",
+        input_file_list=[input_file],
+        input_format="parquet",
+        output_path=tmp_path,
+        dask_tmp=tmp_path,
+        use_hipscat_index=True,
+        add_hipscat_index=False,
+        highest_healpix_order=2,
+        pixel_threshold=3_000,
+        progress_bar=False,
id_column="_hipscat_index", + ) + + runner.run(args, dask_client) + + # Check that the catalog metadata file exists + catalog = Catalog.read_from_hipscat(args.catalog_path) + assert catalog.on_disk + assert catalog.catalog_path == args.catalog_path + assert catalog.catalog_info.total_rows == 131 + assert len(catalog.get_healpix_pixels()) == 1 + + # Check that the catalog parquet file exists and contains correct object IDs + output_file = os.path.join(args.catalog_path, "Norder=0", "Dir=0", "Npix=11.parquet") + + expected_ids = [*range(700, 831)] + assert_parquet_file_ids(output_file, "id", expected_ids) + data_frame = pd.read_parquet(output_file, engine="pyarrow") + assert data_frame.index.name == "_hipscat_index" + npt.assert_array_equal( + data_frame.columns, + ["id", "Norder", "Dir", "Npix"], + ) diff --git a/tests/hipscat_import/data/test_formats/hipscat_index.csv b/tests/hipscat_import/data/test_formats/hipscat_index.csv new file mode 100644 index 00000000..153091e5 --- /dev/null +++ b/tests/hipscat_import/data/test_formats/hipscat_index.csv @@ -0,0 +1,132 @@ +_hipscat_index,id,ra,dec,ra_error,dec_error,Norder,Dir,Npix +12749688880727326720,707,308.5,-69.5,0,0,0,0,11 +12751184493818150912,792,320.5,-69.5,0,0,0,0,11 +12753202806647685120,811,315.5,-68.5,0,0,0,0,11 +12753202806647685121,723,315.5,-68.5,0,0,0,0,11 +12770681119980912640,826,335.5,-69.5,0,0,0,0,11 +12771980657148559360,750,338.5,-67.5,0,0,0,0,11 +12776409575968473088,771,348.5,-67.5,0,0,0,0,11 +12782714789977653248,734,348.5,-66.5,0,0,0,0,11 +12786706826733289472,738,345.5,-64.5,0,0,0,0,11 +12786894563780329472,772,348.5,-64.5,0,0,0,0,11 +12788339839317573632,776,344.5,-63.5,0,0,0,0,11 +12797951905556856832,733,329.5,-65.5,0,0,0,0,11 +12801026705158307840,804,322.5,-66.5,0,0,0,0,11 +12818067795442925568,747,327.5,-61.5,0,0,0,0,11 +12823504327528153088,739,332.5,-57.5,0,0,0,0,11 +12842381331509805056,816,288.5,-69.5,0,0,0,0,11 +12842473731565551616,703,286.5,-69.5,0,0,0,0,11 +12855054043935932416,794,300.5,-66.5,0,0,0,0,11 +12856781556059996160,735,299.5,-65.5,0,0,0,0,11 +12859878138972209152,797,308.5,-62.5,0,0,0,0,11 +12866984851890241536,815,283.5,-68.5,0,0,0,0,11 +12882093266048122880,748,296.5,-63.5,0,0,0,0,11 +12886291525662670848,716,305.5,-60.5,0,0,0,0,11 +12886577464536465408,807,303.5,-60.5,0,0,0,0,11 +12887770713741590528,768,297.5,-60.5,0,0,0,0,11 +12888117478487490560,729,299.5,-59.5,0,0,0,0,11 +12888375204127965184,810,301.5,-59.5,0,0,0,0,11 +12890425758039670784,718,292.5,-60.5,0,0,0,0,11 +12897705201133158400,818,300.5,-55.5,0,0,0,0,11 +12901304742075957248,766,310.5,-63.5,0,0,0,0,11 +12904011555938500608,730,322.5,-61.5,0,0,0,0,11 +12924400840801779712,758,325.5,-53.5,0,0,0,0,11 +12924737222707511296,780,326.5,-52.5,0,0,0,0,11 +12926803467124604928,775,321.5,-54.5,0,0,0,0,11 +12927513300782022656,760,320.5,-53.5,0,0,0,0,11 +12935235931912273920,795,306.5,-58.5,0,0,0,0,11 +12946238438616596480,822,301.5,-54.5,0,0,0,0,11 +12947523513744359424,736,303.5,-52.5,0,0,0,0,11 +12949977409238597632,801,309.5,-50.5,0,0,0,0,11 +12951015418364952576,830,306.5,-50.5,0,0,0,0,11 +12957936896993918976,817,318.5,-48.5,0,0,0,0,11 +12958541318065225728,787,320.5,-47.5,0,0,0,0,11 +12980498864409673728,812,346.5,-60.5,0,0,0,0,11 +12985050869937471488,722,350.5,-58.5,0,0,0,0,11 +13025270726448381952,731,343.5,-52.5,0,0,0,0,11 +13031060802264629248,720,344.5,-47.5,0,0,0,0,11 +13040468461170458624,823,338.5,-45.5,0,0,0,0,11 +13055884976753475584,742,348.5,-45.5,0,0,0,0,11 +13093160001097170944,719,344.5,-39.5,0,0,0,0,11 
+13094378277252890624,710,341.5,-39.5,0,0,0,0,11
+13095317624672223232,726,341.5,-37.5,0,0,0,0,11
+13097779065304121344,744,349.5,-39.5,0,0,0,0,11
+13100157308065808384,813,349.5,-37.5,0,0,0,0,11
+13109184215138697216,757,346.5,-34.5,0,0,0,0,11
+13114993892334239744,821,330.5,-52.5,0,0,0,0,11
+13117165557772189696,762,327.5,-51.5,0,0,0,0,11
+13122077940282032128,728,328.5,-47.5,0,0,0,0,11
+13123208770404483072,781,330.5,-46.5,0,0,0,0,11
+13130546552927944704,704,326.5,-45.5,0,0,0,0,11
+13135578070553460736,751,330.5,-44.5,0,0,0,0,11
+13158407025211736064,724,323.5,-41.5,0,0,0,0,11
+13164283224702058496,808,320.5,-40.5,0,0,0,0,11
+13186894729939255296,784,338.5,-40.5,0,0,0,0,11
+13187453677775880192,732,337.5,-39.5,0,0,0,0,11
+13189921792761790464,745,337.5,-38.5,0,0,0,0,11
+13202401744484564992,786,336.5,-33.5,0,0,0,0,11
+13203103043639312384,705,335.5,-32.5,0,0,0,0,11
+13211086588563423232,779,347.5,-29.5,0,0,0,0,11
+13235029212974284800,761,329.5,-29.5,0,0,0,0,11
+13239388759557931008,828,330.5,-26.5,0,0,0,0,11
+13250788433850269696,803,336.5,-25.5,0,0,0,0,11
+13263647230914461696,788,283.5,-61.5,0,0,0,0,11
+13272631885323829248,700,282.5,-58.5,0,0,0,0,11
+13277499429092327424,793,289.5,-58.5,0,0,0,0,11
+13283409463257071616,749,293.5,-55.5,0,0,0,0,11
+13284984179453329408,805,297.5,-52.5,0,0,0,0,11
+13293316792777703424,773,293.5,-50.5,0,0,0,0,11
+13300970211545972736,774,281.5,-54.5,0,0,0,0,11
+13316869903572008960,712,288.5,-49.5,0,0,0,0,11
+13319655515505033216,759,290.5,-48.5,0,0,0,0,11
+13325709382806142976,820,286.5,-46.5,0,0,0,0,11
+13326118614579806208,789,287.5,-45.5,0,0,0,0,11
+13335640766354030592,711,305.5,-49.5,0,0,0,0,11
+13335856080517857280,802,304.5,-49.5,0,0,0,0,11
+13341394068685455360,701,299.5,-48.5,0,0,0,0,11
+13347311673342427136,727,301.5,-44.5,0,0,0,0,11
+13348003826582421504,717,303.5,-43.5,0,0,0,0,11
+13351146793404989440,753,307.5,-45.5,0,0,0,0,11
+13358998609274601472,769,307.5,-42.5,0,0,0,0,11
+13359333484913491968,725,308.5,-41.5,0,0,0,0,11
+13362536511002640384,827,310.5,-40.5,0,0,0,0,11
+13364612928339181568,777,307.5,-39.5,0,0,0,0,11
+13368388511275679744,764,297.5,-45.5,0,0,0,0,11
+13369482380335644672,785,296.5,-44.5,0,0,0,0,11
+13369514156621824000,709,294.5,-45.5,0,0,0,0,11
+13374210622061805568,713,298.5,-41.5,0,0,0,0,11
+13382429402164363264,800,299.5,-37.5,0,0,0,0,11
+13384601479449411584,706,297.5,-36.5,0,0,0,0,11
+13387360701694083072,755,303.5,-38.5,0,0,0,0,11
+13387360701694083073,741,303.5,-38.5,0,0,0,0,11
+13388334615593222144,714,303.5,-37.5,0,0,0,0,11
+13389212170495983616,763,306.5,-38.5,0,0,0,0,11
+13389509163101454336,708,307.5,-37.5,0,0,0,0,11
+13392589952663945216,765,306.5,-35.5,0,0,0,0,11
+13393588426222075904,740,306.5,-33.5,0,0,0,0,11
+13425161974698737664,783,286.5,-42.5,0,0,0,0,11
+13462800185222496256,790,286.5,-35.5,0,0,0,0,11
+13465233373970563072,809,283.5,-34.5,0,0,0,0,11
+13467391906581315584,715,280.5,-35.5,0,0,0,0,11
+13477206946360590336,782,290.5,-39.5,0,0,0,0,11
+13488986123334057984,752,291.5,-34.5,0,0,0,0,11
+13520476867982786560,746,283.5,-31.5,0,0,0,0,11
+13521835979425447936,770,285.5,-29.5,0,0,0,0,11
+13552942781667737600,756,319.5,-35.5,0,0,0,0,11
+13553697461939208192,798,316.5,-36.5,0,0,0,0,11
+13557123557418336256,778,313.5,-36.5,0,0,0,0,11
+13557377060258709504,829,314.5,-35.5,0,0,0,0,11
+13557816572940124160,819,313.5,-35.5,0,0,0,0,11
+13560168899495854080,814,312.5,-33.5,0,0,0,0,11
+13560933976658411520,721,314.5,-34.5,0,0,0,0,11
+13561582046530240512,737,316.5,-33.5,0,0,0,0,11
+13563711661973438464,799,313.5,-31.5,0,0,0,0,11
+13564690156971098112,825,315.5,-30.5,0,0,0,0,11
+13565852277582856192,796,320.5,-33.5,0,0,0,0,11
+13588709332114997248,754,313.5,-30.5,0,0,0,0,11
+13590818251897569280,806,312.5,-29.5,0,0,0,0,11
+13591216801265483776,791,312.5,-28.5,0,0,0,0,11
+13596001812279721984,824,305.5,-28.5,0,0,0,0,11
+13598131468743213056,702,310.5,-27.5,0,0,0,0,11
+13601023174257934336,767,314.5,-29.5,0,0,0,0,11
+13696722494273093632,743,307.5,-25.5,0,0,0,0,11
diff --git a/tests/hipscat_import/data/test_formats/hipscat_index.parquet b/tests/hipscat_import/data/test_formats/hipscat_index.parquet
new file mode 100644
index 00000000..44bdf663
Binary files /dev/null and b/tests/hipscat_import/data/test_formats/hipscat_index.parquet differ