From dde7026cd76a5e2023ff6ed91450bd20a98b5d32 Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Mon, 20 Nov 2023 17:08:52 -0500 Subject: [PATCH 1/3] Add tests for more complex sorting scenarios. --- .../hipscat_import/catalog/test_map_reduce.py | 143 +++++++++++++++--- tests/hipscat_import/conftest.py | 11 +- 2 files changed, 128 insertions(+), 26 deletions(-) diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py index 6e2a5e9d..53f55404 100644 --- a/tests/hipscat_import/catalog/test_map_reduce.py +++ b/tests/hipscat_import/catalog/test_map_reduce.py @@ -1,6 +1,7 @@ """Tests of map reduce operations""" import os +from io import StringIO import hipscat.pixel_math as hist import numpy as np @@ -315,27 +316,6 @@ def test_reduce_hipscat_index(parquet_shards_dir, assert_parquet_file_ids, tmp_p expected_ids = [*range(700, 831)] assert_parquet_file_ids(output_file, "id", expected_ids) - # expected_indexes = [ - # 13598131468743213056, - # 13560933976658411520, - # 13561582046530240512, - # 13696722494273093632, - # 13588709332114997248, - # 13552942781667737600, - # 13601023174257934336, - # 13557123557418336256, - # 13591216801265483776, - # 13565852277582856192, - # 13553697461939208192, - # 13563711661973438464, - # 13590818251897569280, - # 13560168899495854080, - # 13557816572940124160, - # 13596001812279721984, - # 13564690156971098112, - # 13557377060258709504, - # ] - # assert_parquet_file_index(output_file, expected_indexes) data_frame = pd.read_parquet(output_file, engine="pyarrow") assert data_frame.index.name == "_hipscat_index" npt.assert_array_equal( @@ -385,3 +365,124 @@ def test_reduce_bad_expectation(parquet_shards_dir, tmp_path): id_column="id", delete_input_files=False, ) + + +def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): + """Test reducing and requesting specific sort columns. + + Logically, the input data has a mix of orderings in files, object IDs, and timestamps. + Each source is partitioned according to the linked object's radec, and so will be + ordered within the same hipscat_index value. + + First, we take some time to set up these silly data points, then we test out + reducing them into a single parquet file using a mix of reduction options. + """ + os.makedirs(os.path.join(tmp_path, "reducing")) + shard_dir = os.path.join(tmp_path, "reduce_shards", "order_0", "dir_0", "pixel_11") + os.makedirs(shard_dir) + + file1_string = """source_id,object_id,time,ra,dec +1200,700,3000,282.5,-58.5 +1201,700,4000,282.5,-58.5 +1402,702,3000,310.5,-27.5 +1403,702,3100,310.5,-27.5 +1404,702,3200,310.5,-27.5 +1505,703,4000,286.5,-69.5""" + file1_data = pd.read_csv(StringIO(file1_string)) + file1_data.to_parquet(os.path.join(shard_dir, "file_1_shard_1.parquet")) + + file2_string = """source_id,object_id,time,ra,dec +1206,700,2000,282.5,-58.5 +1307,701,2200,299.5,-48.5 +1308,701,2100,299.5,-48.5 +1309,701,2000,299.5,-48.5""" + file2_data = pd.read_csv(StringIO(file2_string)) + file2_data.to_parquet(os.path.join(shard_dir, "file_2_shard_1.parquet")) + + ## Sort option 1: by source_id + ## This will sort WITHIN an order 19 healpix pixel. 
In that ordering, the objects are + ## (703, 700, 701, 702) + mr.reduce_pixel_shards( + cache_shard_path=os.path.join(tmp_path, "reduce_shards"), + resume_path=tmp_path, + reducing_key="0_11", + destination_pixel_order=0, + destination_pixel_number=11, + destination_pixel_size=10, + output_path=tmp_path, + ra_column="ra", + dec_column="dec", + id_column="source_id", + delete_input_files=False, + ) + + output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet") + assert_parquet_file_ids( + output_file, + "source_id", + [1505, 1200, 1201, 1206, 1307, 1308, 1309, 1402, 1403, 1404], + resort_ids=False, + ) + + assert_parquet_file_ids( + output_file, + "object_id", + [703, 700, 700, 700, 701, 701, 701, 702, 702, 702], + resort_ids=False, + ) + + ## Sort option 2: by object id and time + mr.reduce_pixel_shards( + cache_shard_path=os.path.join(tmp_path, "reduce_shards"), + resume_path=tmp_path, + reducing_key="0_11", + destination_pixel_order=0, + destination_pixel_number=11, + destination_pixel_size=10, + output_path=tmp_path, + ra_column="ra", + dec_column="dec", + id_column=["object_id", "time"], + delete_input_files=False, + ) + + output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet") + assert_parquet_file_ids( + output_file, + "source_id", + [1505, 1206, 1200, 1201, 1309, 1308, 1307, 1402, 1403, 1404], + resort_ids=False, + ) + assert_parquet_file_ids( + output_file, + "time", + [4000, 2000, 3000, 4000, 2000, 2100, 2200, 3000, 3100, 3200], + resort_ids=False, + ) + + ## Sort option 3: by object id and time WITHOUT hipscat index. + ## The 1500 block of ids goes back to the end, because we're not using + ## spatial properties for sorting, only numeric. + mr.reduce_pixel_shards( + cache_shard_path=os.path.join(tmp_path, "reduce_shards"), + resume_path=tmp_path, + reducing_key="0_11", + destination_pixel_order=0, + destination_pixel_number=11, + destination_pixel_size=10, + output_path=tmp_path, + ra_column="ra", + dec_column="dec", + id_column=["object_id", "time"], + add_hipscat_index=False, + delete_input_files=False, + ) + + output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet") + assert_parquet_file_ids( + output_file, + "source_id", + [1206, 1200, 1201, 1309, 1308, 1307, 1402, 1403, 1404, 1505], + resort_ids=False, + ) + \ No newline at end of file diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py index 3bc386f8..91c1ba6b 100644 --- a/tests/hipscat_import/conftest.py +++ b/tests/hipscat_import/conftest.py @@ -198,7 +198,7 @@ def basic_data_shard_df(): ) test_df["margin_pixel"] = hp.ang2pix( - 2**3, + 2 ** 3, test_df["weird_ra"].values, test_df["weird_dec"].values, lonlat=True, @@ -230,7 +230,7 @@ def polar_data_shard_df(): ) test_df["margin_pixel"] = hp.ang2pix( - 2**3, + 2 ** 3, test_df["weird_ra"].values, test_df["weird_dec"].values, lonlat=True, @@ -279,7 +279,7 @@ def assert_text_file_matches(expected_lines, file_name): @pytest.fixture def assert_parquet_file_ids(): - def assert_parquet_file_ids(file_name, id_column, expected_ids): + def assert_parquet_file_ids(file_name, id_column, expected_ids, resort_ids=True): """ Convenience method to read a parquet file and compare the object IDs to a list of expected objects. 
@@ -294,8 +294,9 @@ def assert_parquet_file_ids(file_name, id_column, expected_ids): data_frame = pd.read_parquet(file_name, engine="pyarrow") assert id_column in data_frame.columns ids = data_frame[id_column].tolist() - ids.sort() - expected_ids.sort() + if resort_ids: + ids.sort() + expected_ids.sort() assert len(ids) == len( expected_ids From 48b9fb8d9106e10cdc0888d0b51e7bcb991750e0 Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Mon, 20 Nov 2023 19:10:48 -0500 Subject: [PATCH 2/3] Spacing. --- tests/hipscat_import/catalog/test_map_reduce.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py index 53f55404..cc1d1d07 100644 --- a/tests/hipscat_import/catalog/test_map_reduce.py +++ b/tests/hipscat_import/catalog/test_map_reduce.py @@ -461,7 +461,7 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): ) ## Sort option 3: by object id and time WITHOUT hipscat index. - ## The 1500 block of ids goes back to the end, because we're not using + ## The 1500 block of ids goes back to the end, because we're not using ## spatial properties for sorting, only numeric. mr.reduce_pixel_shards( cache_shard_path=os.path.join(tmp_path, "reduce_shards"), @@ -485,4 +485,3 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): [1206, 1200, 1201, 1309, 1308, 1307, 1402, 1403, 1404, 1505], resort_ids=False, ) - \ No newline at end of file From d102816646963baf6bc90942b3585adee0a8856e Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Tue, 21 Nov 2023 09:04:44 -0500 Subject: [PATCH 3/3] Add dataframe comparisons. --- tests/.pylintrc | 2 +- .../hipscat_import/catalog/test_map_reduce.py | 43 ++++++++++++++++--- 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/tests/.pylintrc b/tests/.pylintrc index ea91b009..56418441 100644 --- a/tests/.pylintrc +++ b/tests/.pylintrc @@ -333,7 +333,7 @@ indent-string=' ' max-line-length=110 # Maximum number of lines in a module. -max-module-lines=500 +max-module-lines=1000 # Allow the body of a class to be on the same line as the declaration if body # contains single statement. 
diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py index cc1d1d07..890c2411 100644 --- a/tests/hipscat_import/catalog/test_map_reduce.py +++ b/tests/hipscat_import/catalog/test_map_reduce.py @@ -3,6 +3,7 @@ import os from io import StringIO +import healpy as hp import hipscat.pixel_math as hist import numpy as np import numpy.testing as npt @@ -380,6 +381,7 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): os.makedirs(os.path.join(tmp_path, "reducing")) shard_dir = os.path.join(tmp_path, "reduce_shards", "order_0", "dir_0", "pixel_11") os.makedirs(shard_dir) + output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet") file1_string = """source_id,object_id,time,ra,dec 1200,700,3000,282.5,-58.5 @@ -399,7 +401,18 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): file2_data = pd.read_csv(StringIO(file2_string)) file2_data.to_parquet(os.path.join(shard_dir, "file_2_shard_1.parquet")) - ## Sort option 1: by source_id + combined_data = pd.concat([file1_data, file2_data]) + combined_data["norder19_healpix"] = hp.ang2pix( + 2 ** 19, + combined_data["ra"].values, + combined_data["dec"].values, + lonlat=True, + nest=True, + ) + ## Use this to prune generated columns like Norder, Npix, and _hipscat_index + comparison_columns = ["source_id", "object_id", "time", "ra", "dec"] + + ######################## Sort option 1: by source_id ## This will sort WITHIN an order 19 healpix pixel. In that ordering, the objects are ## (703, 700, 701, 702) mr.reduce_pixel_shards( @@ -416,7 +429,13 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): delete_input_files=False, ) - output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet") + ## sort order is effectively (norder19 healpix, source_id) + data_frame = pd.read_parquet(output_file, engine="pyarrow") + expected_dataframe = combined_data.sort_values(["norder19_healpix", "source_id"]) + pd.testing.assert_frame_equal( + expected_dataframe[comparison_columns].reset_index(drop=True), + data_frame[comparison_columns].reset_index(drop=True), + ) assert_parquet_file_ids( output_file, "source_id", @@ -431,7 +450,8 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): resort_ids=False, ) - ## Sort option 2: by object id and time + ######################## Sort option 2: by object id and time + ## sort order is effectively (norder19 healpix, object id, time) mr.reduce_pixel_shards( cache_shard_path=os.path.join(tmp_path, "reduce_shards"), resume_path=tmp_path, @@ -446,7 +466,12 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): delete_input_files=False, ) - output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet") + data_frame = pd.read_parquet(output_file, engine="pyarrow") + expected_dataframe = combined_data.sort_values(["norder19_healpix", "object_id", "time"]) + pd.testing.assert_frame_equal( + expected_dataframe[comparison_columns].reset_index(drop=True), + data_frame[comparison_columns].reset_index(drop=True), + ) assert_parquet_file_ids( output_file, "source_id", @@ -460,9 +485,10 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): resort_ids=False, ) - ## Sort option 3: by object id and time WITHOUT hipscat index. + ######################## Sort option 3: by object id and time WITHOUT hipscat index. 
## The 1500 block of ids goes back to the end, because we're not using ## spatial properties for sorting, only numeric. + ## sort order is effectively (object id, time) mr.reduce_pixel_shards( cache_shard_path=os.path.join(tmp_path, "reduce_shards"), resume_path=tmp_path, @@ -478,7 +504,12 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path): delete_input_files=False, ) - output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet") + data_frame = pd.read_parquet(output_file, engine="pyarrow") + expected_dataframe = combined_data.sort_values(["object_id", "time"]) + pd.testing.assert_frame_equal( + expected_dataframe[comparison_columns].reset_index(drop=True), + data_frame[comparison_columns].reset_index(drop=True), + ) assert_parquet_file_ids( output_file, "source_id",
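---

Side note (not part of the patch series above): the row orderings asserted in `test_reduce_with_sorting_complex` can be reproduced with a few lines of pandas/healpy, mirroring the `combined_data` comparison that patch 3 builds. Below is a minimal sketch of that ordering logic, assuming `healpy` and `pandas` are installed; the `data` literal simply restates the ten rows from the two shard files in the test, and the `norder19_healpix` column name is illustrative, chosen to match the patch.

import healpy as hp
import pandas as pd

# The ten test rows from file_1_shard_1.parquet and file_2_shard_1.parquet in patch 1.
data = pd.DataFrame(
    {
        "source_id": [1200, 1201, 1402, 1403, 1404, 1505, 1206, 1307, 1308, 1309],
        "object_id": [700, 700, 702, 702, 702, 703, 700, 701, 701, 701],
        "time": [3000, 4000, 3000, 3100, 3200, 4000, 2000, 2200, 2100, 2000],
        "ra": [282.5, 282.5, 310.5, 310.5, 310.5, 286.5, 282.5, 299.5, 299.5, 299.5],
        "dec": [-58.5, -58.5, -27.5, -27.5, -27.5, -69.5, -58.5, -48.5, -48.5, -48.5],
    }
)

# Order-19 nested healpix pixel per row, as in the combined_data setup from patch 3.
# Sorting on this column approximates sorting on _hipscat_index, since the pixel
# occupies the high bits of that index.
data["norder19_healpix"] = hp.ang2pix(
    2 ** 19, data["ra"].values, data["dec"].values, lonlat=True, nest=True
)

# Sort option 1: spatial pixel first, then source_id.
print(data.sort_values(["norder19_healpix", "source_id"])["source_id"].tolist())
# Sort option 2: spatial pixel first, then object_id and time.
print(data.sort_values(["norder19_healpix", "object_id", "time"])["source_id"].tolist())
# Sort option 3: no spatial component, so the 1505 row falls to the end.
print(data.sort_values(["object_id", "time"])["source_id"].tolist())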