diff --git a/src/hipscat_import/catalog/map_reduce.py b/src/hipscat_import/catalog/map_reduce.py
index 1cc7fe16..9c3115e2 100644
--- a/src/hipscat_import/catalog/map_reduce.py
+++ b/src/hipscat_import/catalog/map_reduce.py
@@ -16,7 +16,7 @@
 # pylint: disable=too-many-locals,too-many-arguments
 
 
-def _get_pixel_directory(cache_path: FilePointer, order: np.int64, pixel: np.int64):
+def _get_pixel_directory(cache_path: FilePointer, order: np.int64, pixel: np.uint64):
     """Create a path for intermediate pixel data.
 
     This will take the form:
@@ -81,7 +81,7 @@ def _iterate_input_file(
     )
     # Set up the pixel data
     mapped_pixels = hp.ang2pix(
-        2 ** highest_order,
+        2**highest_order,
         data[ra_column].values,
         data[dec_column].values,
         lonlat=True,
diff --git a/src/hipscat_import/index/map_reduce.py b/src/hipscat_import/index/map_reduce.py
index f728e3df..988104de 100644
--- a/src/hipscat_import/index/map_reduce.py
+++ b/src/hipscat_import/index/map_reduce.py
@@ -3,7 +3,8 @@
 import dask.dataframe as dd
 import numpy as np
 from dask.distributed import progress, wait
-from hipscat.io import file_io
+from hipscat.io import paths
+from hipscat.io.file_io import file_io
 from hipscat.pixel_math.hipscat_id import HIPSCAT_ID_COLUMN
 
 
@@ -16,13 +17,17 @@ def create_index(args):
     if args.include_order_pixel:
         include_columns.extend(["Norder", "Dir", "Npix"])
 
-    index_dir = file_io.append_paths_to_pointer(args.catalog_path, "index")
+    index_dir = paths.append_paths_to_pointer(args.catalog_path, "index")
+    metadata_file = paths.get_parquet_metadata_pointer(args.input_catalog_path)
+
+    metadata = file_io.read_parquet_metadata(metadata_file)
 
     data = dd.read_parquet(
         path=args.input_catalog_path,
         columns=include_columns,
         engine="pyarrow",
-        dataset={"partitioning": "hive"},
+        dataset={"partitioning": {"flavor": "hive", "schema": metadata.schema.to_arrow_schema()}},
+        filesystem="arrow",
     )
 
     if args.include_order_pixel:
@@ -33,6 +38,7 @@ def create_index(args):
         data = data.reset_index()
         if not args.include_hipscat_index:
             data = data.drop(columns=[HIPSCAT_ID_COLUMN])
+    data = data.drop_duplicates()
     data = data.repartition(partition_size=args.compute_partition_size)
     data = data.set_index(args.indexing_column)
     result = data.to_parquet(
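The create_index change above reads the input catalog's _metadata footer and hands its unified schema to the hive partitioning, so the Norder/Dir/Npix directory keys keep their declared unsigned types instead of being inferred per file. A minimal standalone sketch of the same idea, using pyarrow directly; the catalog path is illustrative, not resolved the way hipscat-import does it:

import pyarrow as pa
import pyarrow.dataset as ds

# Declare the hive partition-key types up front, so "Dir=0" parses as
# uint64 rather than an inferred (and possibly dictionary-encoded) int32.
partitioning = ds.partitioning(
    pa.schema(
        [
            ("Norder", pa.uint8()),
            ("Dir", pa.uint64()),
            ("Npix", pa.uint64()),
        ]
    ),
    flavor="hive",
)
dataset = ds.dataset(
    "tests/hipscat_import/data/small_sky_object_catalog",
    format="parquet",
    partitioning=partitioning,
    # Skip non-parquet files such as catalog_info.json and point_map.fits.
    exclude_invalid_files=True,
)
# Partition columns now carry the declared unsigned types.
print(dataset.schema)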
diff --git a/src/hipscat_import/soap/map_reduce.py b/src/hipscat_import/soap/map_reduce.py
index 95f1e83a..0a495119 100644
--- a/src/hipscat_import/soap/map_reduce.py
+++ b/src/hipscat_import/soap/map_reduce.py
@@ -46,12 +46,12 @@ def _count_joins_for_object(source_data, source_pixel, object_pixel, soap_args):
     joined_data = joined_data.reset_index()
 
     joined_data["Norder"] = np.full(rows_written, fill_value=object_pixel.order, dtype=np.uint8)
-    joined_data["Dir"] = np.full(rows_written, fill_value=object_pixel.dir, dtype=np.uint32)
-    joined_data["Npix"] = np.full(rows_written, fill_value=object_pixel.pixel, dtype=np.uint32)
+    joined_data["Dir"] = np.full(rows_written, fill_value=object_pixel.dir, dtype=np.uint64)
+    joined_data["Npix"] = np.full(rows_written, fill_value=object_pixel.pixel, dtype=np.uint64)
     joined_data["join_Norder"] = np.full(rows_written, fill_value=source_pixel.order, dtype=np.uint8)
-    joined_data["join_Dir"] = np.full(rows_written, fill_value=source_pixel.dir, dtype=np.uint32)
-    joined_data["join_Npix"] = np.full(rows_written, fill_value=source_pixel.pixel, dtype=np.uint32)
+    joined_data["join_Dir"] = np.full(rows_written, fill_value=source_pixel.dir, dtype=np.uint64)
+    joined_data["join_Npix"] = np.full(rows_written, fill_value=source_pixel.pixel, dtype=np.uint64)
 
     joined_data.to_parquet(output_file, index=True)
@@ -64,9 +64,9 @@ def _write_count_results(cache_path, source_healpix, results):
 
     dataframe = pd.DataFrame(results, columns=["Norder", "Npix", "num_rows"])
     dataframe["Dir"] = [int(order / 10_000) * 10_000 if order >= 0 else -1 for order, _, _ in results]
-    dataframe["join_Norder"] = np.full(num_results, fill_value=source_healpix.order, dtype=np.int32)
-    dataframe["join_Dir"] = [int(order / 10_000) * 10_000 for order in dataframe["join_Norder"]]
-    dataframe["join_Npix"] = np.full(num_results, fill_value=source_healpix.pixel, dtype=np.int32)
+    dataframe["join_Norder"] = np.full(num_results, fill_value=source_healpix.order, dtype=np.uint8)
+    dataframe["join_Dir"] = np.full(num_results, fill_value=source_healpix.dir, dtype=np.uint64)
+    dataframe["join_Npix"] = np.full(num_results, fill_value=source_healpix.pixel, dtype=np.uint64)
 
     ## Reorder columns.
     dataframe = dataframe[["Norder", "Dir", "Npix", "join_Norder", "join_Dir", "join_Npix", "num_rows"]]
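The soap columns above widen from uint32 to uint64, matching the uint64 Dir/Npix fields the index test below asserts on. The widening matters because HEALPix pixel indices outgrow uint32 at high orders; a quick demonstration of where the boundary sits:

import numpy as np

# A HEALPix map at order k has 12 * 4**k pixels, so the largest pixel
# index is 12 * 4**k - 1. uint32 stops being enough above order 14.
for order in (14, 15, 19):
    max_pixel = 12 * 4**order - 1
    fits = max_pixel <= np.iinfo(np.uint32).max
    print(f"order {order}: max pixel {max_pixel} fits in uint32: {fits}")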
"generation_date": "2024.01.09", "tool_args": { "tool_name": "hipscat_import", - "version": "0.0.4.dev28+g2e31821.d20230420", + "version": "0.2.1", "runtime_args": { "catalog_name": "small_sky_object_catalog", - "output_path": "/home/data", + "output_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/", "output_artifact_name": "small_sky_object_catalog", "tmp_dir": "", "overwrite": true, - "dask_tmp": "/tmp/pytest-of-delucchi/pytest-1261/test_dask_runner0", + "dask_tmp": "", "dask_n_workers": 1, "dask_threads_per_worker": 1, - "catalog_path": "/home/data/small_sky_object_catalog", - "tmp_path": "/tmp/pytest-of-delucchi/pytest-1261/test_dask_runner0/small_sky_object_catalog/intermediate", + "catalog_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky_object_catalog", + "tmp_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky_object_catalog/intermediate", "epoch": "J2000", "catalog_type": "object", - "input_path": "/home/data/small_sky_parts", + "input_path": "/home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky", "input_paths": [ - "/home/data/small_sky_parts/catalog_00_of_05.csv", - "/home/data/small_sky_parts/catalog_01_of_05.csv", - "/home/data/small_sky_parts/catalog_02_of_05.csv", - "/home/data/small_sky_parts/catalog_03_of_05.csv", - "/home/data/small_sky_parts/catalog_04_of_05.csv" + "file:///home/delucchi/git/hipscat-import/tests/hipscat_import/data/small_sky/catalog.csv" ], "input_format": "csv", "input_file_list": [], "ra_column": "ra", "dec_column": "dec", - "sort_columns": "id", - "highest_healpix_order": 1, + "use_hipscat_index": false, + "sort_columns": null, + "constant_healpix_order": -1, + "highest_healpix_order": 7, "pixel_threshold": 1000000, + "mapping_healpix_order": 7, "debug_stats_only": false, "file_reader_info": { "input_reader_type": "CsvReader", @@ -50,4 +49,4 @@ } } } -} \ No newline at end of file +} diff --git a/tests/hipscat_import/index/test_run_index.py b/tests/hipscat_import/index/test_run_index.py index 80190d10..6ebace01 100644 --- a/tests/hipscat_import/index/test_run_index.py +++ b/tests/hipscat_import/index/test_run_index.py @@ -2,6 +2,7 @@ import os +import numpy as np import pyarrow as pa import pyarrow.parquet as pq import pytest @@ -108,3 +109,55 @@ def test_run_index_on_source( schema = pq.read_metadata(os.path.join(args.catalog_path, "_common_metadata")).schema.to_arrow_schema() assert schema.equals(basic_index_parquet_schema, check_metadata=False) + + +@pytest.mark.dask +def test_run_index_on_source_object_id( + small_sky_source_catalog, + dask_client, # pylint: disable=unused-argument + tmp_path, + assert_parquet_file_index, +): + """Test appropriate metadata is written.""" + + args = IndexArguments( + input_catalog_path=small_sky_source_catalog, + indexing_column="object_id", + output_path=tmp_path, + output_artifact_name="small_sky_source_object_id_index", + overwrite=True, + include_hipscat_index=False, + progress_bar=False, + ) + runner.run(args) + + # Check that the catalog metadata file exists + catalog = Dataset.read_from_hipscat(args.catalog_path) + assert catalog.on_disk + assert catalog.catalog_path == args.catalog_path + + basic_index_parquet_schema = pa.schema( + [ + pa.field("Norder", pa.uint8()), + pa.field("Dir", pa.uint64()), + pa.field("Npix", pa.uint64()), + pa.field("object_id", pa.int64()), + ] + ) + + outfile = os.path.join(args.catalog_path, "index", "part.0.parquet") + schema = pq.read_metadata(outfile).schema.to_arrow_schema() + assert 
diff --git a/tests/hipscat_import/index/test_run_index.py b/tests/hipscat_import/index/test_run_index.py
index 80190d10..6ebace01 100644
--- a/tests/hipscat_import/index/test_run_index.py
+++ b/tests/hipscat_import/index/test_run_index.py
@@ -2,6 +2,7 @@
 
 import os
 
+import numpy as np
 import pyarrow as pa
 import pyarrow.parquet as pq
 import pytest
@@ -108,3 +109,55 @@ def test_run_index_on_source(
 
     schema = pq.read_metadata(os.path.join(args.catalog_path, "_common_metadata")).schema.to_arrow_schema()
     assert schema.equals(basic_index_parquet_schema, check_metadata=False)
+
+
+@pytest.mark.dask
+def test_run_index_on_source_object_id(
+    small_sky_source_catalog,
+    dask_client,  # pylint: disable=unused-argument
+    tmp_path,
+    assert_parquet_file_index,
+):
+    """Test appropriate metadata is written."""
+
+    args = IndexArguments(
+        input_catalog_path=small_sky_source_catalog,
+        indexing_column="object_id",
+        output_path=tmp_path,
+        output_artifact_name="small_sky_source_object_id_index",
+        overwrite=True,
+        include_hipscat_index=False,
+        progress_bar=False,
+    )
+    runner.run(args)
+
+    # Check that the catalog metadata file exists
+    catalog = Dataset.read_from_hipscat(args.catalog_path)
+    assert catalog.on_disk
+    assert catalog.catalog_path == args.catalog_path
+
+    basic_index_parquet_schema = pa.schema(
+        [
+            pa.field("Norder", pa.uint8()),
+            pa.field("Dir", pa.uint64()),
+            pa.field("Npix", pa.uint64()),
+            pa.field("object_id", pa.int64()),
+        ]
+    )
+
+    outfile = os.path.join(args.catalog_path, "index", "part.0.parquet")
+    schema = pq.read_metadata(outfile).schema.to_arrow_schema()
+    assert schema.equals(basic_index_parquet_schema, check_metadata=False)
+
+    id_range = np.arange(700, 831)
+    ## Some of the objects have sources that span two source partitions.
+    doubled_up = [706, 707, 716, 726, 730, 736, 740, 779, 780, 784, 787, 789, 790, 792, 797, 818, 820]
+    doubled_up.extend(id_range)
+
+    assert_parquet_file_index(outfile, doubled_up)
+
+    schema = pq.read_metadata(os.path.join(args.catalog_path, "_metadata")).schema.to_arrow_schema()
+    assert schema.equals(basic_index_parquet_schema, check_metadata=False)
+
+    schema = pq.read_metadata(os.path.join(args.catalog_path, "_common_metadata")).schema.to_arrow_schema()
+    assert schema.equals(basic_index_parquet_schema, check_metadata=False)
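assert_parquet_file_index is a fixture defined in the test suite's conftest; its implementation is not shown here. A standalone sketch of the check it presumably performs (compare the file's index values, duplicates included, against the expected list; check_index_values is a hypothetical helper, not part of the codebase):

import numpy as np
import pandas as pd

def check_index_values(parquet_file, expected_ids):
    """Assert the parquet file's index holds exactly the expected
    values, including duplicates, ignoring order."""
    data = pd.read_parquet(parquet_file, engine="pyarrow")
    values = np.sort(data.index.to_numpy())
    expected = np.sort(np.asarray(expected_ids))
    assert np.array_equal(values, expected)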