diff --git a/tests/.pylintrc b/tests/.pylintrc
index ea91b009..56418441 100644
--- a/tests/.pylintrc
+++ b/tests/.pylintrc
@@ -333,7 +333,7 @@ indent-string='    '
 max-line-length=110
 
 # Maximum number of lines in a module.
-max-module-lines=500
+max-module-lines=1000
 
 # Allow the body of a class to be on the same line as the declaration if body
 # contains single statement.
diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py
index 6e2a5e9d..890c2411 100644
--- a/tests/hipscat_import/catalog/test_map_reduce.py
+++ b/tests/hipscat_import/catalog/test_map_reduce.py
@@ -1,7 +1,9 @@
 """Tests of map reduce operations"""
 
 import os
+from io import StringIO
 
+import healpy as hp
 import hipscat.pixel_math as hist
 import numpy as np
 import numpy.testing as npt
@@ -315,27 +317,6 @@ def test_reduce_hipscat_index(parquet_shards_dir, assert_parquet_file_ids, tmp_p
     expected_ids = [*range(700, 831)]
     assert_parquet_file_ids(output_file, "id", expected_ids)
 
-    # expected_indexes = [
-    #     13598131468743213056,
-    #     13560933976658411520,
-    #     13561582046530240512,
-    #     13696722494273093632,
-    #     13588709332114997248,
-    #     13552942781667737600,
-    #     13601023174257934336,
-    #     13557123557418336256,
-    #     13591216801265483776,
-    #     13565852277582856192,
-    #     13553697461939208192,
-    #     13563711661973438464,
-    #     13590818251897569280,
-    #     13560168899495854080,
-    #     13557816572940124160,
-    #     13596001812279721984,
-    #     13564690156971098112,
-    #     13557377060258709504,
-    # ]
-    # assert_parquet_file_index(output_file, expected_indexes)
     data_frame = pd.read_parquet(output_file, engine="pyarrow")
     assert data_frame.index.name == "_hipscat_index"
     npt.assert_array_equal(
@@ -385,3 +366,153 @@ def test_reduce_bad_expectation(parquet_shards_dir, tmp_path):
             id_column="id",
             delete_input_files=False,
         )
+
+
+def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
+    """Test reducing and requesting specific sort columns.
+
+    Logically, the input data has a mix of orderings in files, object IDs, and timestamps.
+    Each source is partitioned according to the linked object's radec, and so will be
+    ordered within the same hipscat_index value.
+
+    First, we take some time to set up these silly data points, then we test out
+    reducing them into a single parquet file using a mix of reduction options.
+    """
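+    ## A rough way to sanity-check the object ordering asserted below (this assumes the
+    ## nested order 19 healpix pixelization from healpy is what drives the _hipscat_index
+    ## ordering; names here are illustrative only):
+    ##
+    ##     radecs = {700: (282.5, -58.5), 701: (299.5, -48.5), 702: (310.5, -27.5), 703: (286.5, -69.5)}
+    ##     pixels = {obj: hp.ang2pix(2 ** 19, ra, dec, lonlat=True, nest=True)
+    ##               for obj, (ra, dec) in radecs.items()}
+    ##     sorted(radecs, key=pixels.get)  ## expected to give [703, 700, 701, 702]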
+ """ + os.makedirs(os.path.join(tmp_path, "reducing")) + shard_dir = os.path.join(tmp_path, "reduce_shards", "order_0", "dir_0", "pixel_11") + os.makedirs(shard_dir) + output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet") + + file1_string = """source_id,object_id,time,ra,dec +1200,700,3000,282.5,-58.5 +1201,700,4000,282.5,-58.5 +1402,702,3000,310.5,-27.5 +1403,702,3100,310.5,-27.5 +1404,702,3200,310.5,-27.5 +1505,703,4000,286.5,-69.5""" + file1_data = pd.read_csv(StringIO(file1_string)) + file1_data.to_parquet(os.path.join(shard_dir, "file_1_shard_1.parquet")) + + file2_string = """source_id,object_id,time,ra,dec +1206,700,2000,282.5,-58.5 +1307,701,2200,299.5,-48.5 +1308,701,2100,299.5,-48.5 +1309,701,2000,299.5,-48.5""" + file2_data = pd.read_csv(StringIO(file2_string)) + file2_data.to_parquet(os.path.join(shard_dir, "file_2_shard_1.parquet")) + + combined_data = pd.concat([file1_data, file2_data]) + combined_data["norder19_healpix"] = hp.ang2pix( + 2 ** 19, + combined_data["ra"].values, + combined_data["dec"].values, + lonlat=True, + nest=True, + ) + ## Use this to prune generated columns like Norder, Npix, and _hipscat_index + comparison_columns = ["source_id", "object_id", "time", "ra", "dec"] + + ######################## Sort option 1: by source_id + ## This will sort WITHIN an order 19 healpix pixel. In that ordering, the objects are + ## (703, 700, 701, 702) + mr.reduce_pixel_shards( + cache_shard_path=os.path.join(tmp_path, "reduce_shards"), + resume_path=tmp_path, + reducing_key="0_11", + destination_pixel_order=0, + destination_pixel_number=11, + destination_pixel_size=10, + output_path=tmp_path, + ra_column="ra", + dec_column="dec", + id_column="source_id", + delete_input_files=False, + ) + + ## sort order is effectively (norder19 healpix, source_id) + data_frame = pd.read_parquet(output_file, engine="pyarrow") + expected_dataframe = combined_data.sort_values(["norder19_healpix", "source_id"]) + pd.testing.assert_frame_equal( + expected_dataframe[comparison_columns].reset_index(drop=True), + data_frame[comparison_columns].reset_index(drop=True), + ) + assert_parquet_file_ids( + output_file, + "source_id", + [1505, 1200, 1201, 1206, 1307, 1308, 1309, 1402, 1403, 1404], + resort_ids=False, + ) + + assert_parquet_file_ids( + output_file, + "object_id", + [703, 700, 700, 700, 701, 701, 701, 702, 702, 702], + resort_ids=False, + ) + + ######################## Sort option 2: by object id and time + ## sort order is effectively (norder19 healpix, object id, time) + mr.reduce_pixel_shards( + cache_shard_path=os.path.join(tmp_path, "reduce_shards"), + resume_path=tmp_path, + reducing_key="0_11", + destination_pixel_order=0, + destination_pixel_number=11, + destination_pixel_size=10, + output_path=tmp_path, + ra_column="ra", + dec_column="dec", + id_column=["object_id", "time"], + delete_input_files=False, + ) + + data_frame = pd.read_parquet(output_file, engine="pyarrow") + expected_dataframe = combined_data.sort_values(["norder19_healpix", "object_id", "time"]) + pd.testing.assert_frame_equal( + expected_dataframe[comparison_columns].reset_index(drop=True), + data_frame[comparison_columns].reset_index(drop=True), + ) + assert_parquet_file_ids( + output_file, + "source_id", + [1505, 1206, 1200, 1201, 1309, 1308, 1307, 1402, 1403, 1404], + resort_ids=False, + ) + assert_parquet_file_ids( + output_file, + "time", + [4000, 2000, 3000, 4000, 2000, 2100, 2200, 3000, 3100, 3200], + resort_ids=False, + ) + + ######################## Sort option 3: by object id and time 
+
+    ######################## Sort option 3: by object id and time WITHOUT hipscat index.
+    ## The 1500 block of ids goes back to the end, because we're not using
+    ## spatial properties for sorting, only numeric.
+    ## sort order is effectively (object id, time)
+    mr.reduce_pixel_shards(
+        cache_shard_path=os.path.join(tmp_path, "reduce_shards"),
+        resume_path=tmp_path,
+        reducing_key="0_11",
+        destination_pixel_order=0,
+        destination_pixel_number=11,
+        destination_pixel_size=10,
+        output_path=tmp_path,
+        ra_column="ra",
+        dec_column="dec",
+        id_column=["object_id", "time"],
+        add_hipscat_index=False,
+        delete_input_files=False,
+    )
+
+    data_frame = pd.read_parquet(output_file, engine="pyarrow")
+    expected_dataframe = combined_data.sort_values(["object_id", "time"])
+    pd.testing.assert_frame_equal(
+        expected_dataframe[comparison_columns].reset_index(drop=True),
+        data_frame[comparison_columns].reset_index(drop=True),
+    )
+    assert_parquet_file_ids(
+        output_file,
+        "source_id",
+        [1206, 1200, 1201, 1309, 1308, 1307, 1402, 1403, 1404, 1505],
+        resort_ids=False,
+    )
diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py
index 3bc386f8..91c1ba6b 100644
--- a/tests/hipscat_import/conftest.py
+++ b/tests/hipscat_import/conftest.py
@@ -198,7 +198,7 @@ def basic_data_shard_df():
     )
 
     test_df["margin_pixel"] = hp.ang2pix(
-        2**3,
+        2 ** 3,
         test_df["weird_ra"].values,
         test_df["weird_dec"].values,
         lonlat=True,
@@ -230,7 +230,7 @@ def polar_data_shard_df():
     )
 
     test_df["margin_pixel"] = hp.ang2pix(
-        2**3,
+        2 ** 3,
         test_df["weird_ra"].values,
         test_df["weird_dec"].values,
         lonlat=True,
@@ -279,7 +279,7 @@ def assert_text_file_matches(expected_lines, file_name):
 
 @pytest.fixture
 def assert_parquet_file_ids():
-    def assert_parquet_file_ids(file_name, id_column, expected_ids):
+    def assert_parquet_file_ids(file_name, id_column, expected_ids, resort_ids=True):
         """
         Convenience method to read a parquet file and compare the object IDs to
         a list of expected objects.
@@ -294,8 +294,9 @@ def assert_parquet_file_ids(file_name, id_column, expected_ids):
         data_frame = pd.read_parquet(file_name, engine="pyarrow")
         assert id_column in data_frame.columns
         ids = data_frame[id_column].tolist()
-        ids.sort()
-        expected_ids.sort()
+        if resort_ids:
+            ids.sort()
+            expected_ids.sort()
         assert len(ids) == len(
             expected_ids