Add tests for more complex sorting scenarios. #167

Merged: 3 commits, Nov 21, 2023
2 changes: 1 addition & 1 deletion tests/.pylintrc
@@ -333,7 +333,7 @@ indent-string='    '
max-line-length=110

# Maximum number of lines in a module.
max-module-lines=500
max-module-lines=1000

# Allow the body of a class to be on the same line as the declaration if body
# contains single statement.
173 changes: 152 additions & 21 deletions tests/hipscat_import/catalog/test_map_reduce.py
@@ -1,7 +1,9 @@
"""Tests of map reduce operations"""

import os
from io import StringIO

import healpy as hp
import hipscat.pixel_math as hist
import numpy as np
import numpy.testing as npt
@@ -315,27 +317,6 @@ def test_reduce_hipscat_index(parquet_shards_dir, assert_parquet_file_ids, tmp_p

expected_ids = [*range(700, 831)]
assert_parquet_file_ids(output_file, "id", expected_ids)
# expected_indexes = [
# 13598131468743213056,
# 13560933976658411520,
# 13561582046530240512,
# 13696722494273093632,
# 13588709332114997248,
# 13552942781667737600,
# 13601023174257934336,
# 13557123557418336256,
# 13591216801265483776,
# 13565852277582856192,
# 13553697461939208192,
# 13563711661973438464,
# 13590818251897569280,
# 13560168899495854080,
# 13557816572940124160,
# 13596001812279721984,
# 13564690156971098112,
# 13557377060258709504,
# ]
# assert_parquet_file_index(output_file, expected_indexes)
data_frame = pd.read_parquet(output_file, engine="pyarrow")
assert data_frame.index.name == "_hipscat_index"
npt.assert_array_equal(
@@ -385,3 +366,153 @@ def test_reduce_bad_expectation(parquet_shards_dir, tmp_path):
id_column="id",
delete_input_files=False,
)


def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
"""Test reducing and requesting specific sort columns.

Logically, the input data has a mix of orderings in files, object IDs, and timestamps.
Each source is partitioned according to its linked object's ra/dec, and so sources for the
same object will sort together within the same hipscat_index value.

First, we take some time to set up these silly data points, then we test out
reducing them into a single parquet file using a mix of reduction options.
"""
os.makedirs(os.path.join(tmp_path, "reducing"))
shard_dir = os.path.join(tmp_path, "reduce_shards", "order_0", "dir_0", "pixel_11")
os.makedirs(shard_dir)
output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet")

file1_string = """source_id,object_id,time,ra,dec
1200,700,3000,282.5,-58.5
1201,700,4000,282.5,-58.5
1402,702,3000,310.5,-27.5
1403,702,3100,310.5,-27.5
1404,702,3200,310.5,-27.5
1505,703,4000,286.5,-69.5"""
file1_data = pd.read_csv(StringIO(file1_string))
file1_data.to_parquet(os.path.join(shard_dir, "file_1_shard_1.parquet"))

file2_string = """source_id,object_id,time,ra,dec
1206,700,2000,282.5,-58.5
1307,701,2200,299.5,-48.5
1308,701,2100,299.5,-48.5
1309,701,2000,299.5,-48.5"""
file2_data = pd.read_csv(StringIO(file2_string))
file2_data.to_parquet(os.path.join(shard_dir, "file_2_shard_1.parquet"))

combined_data = pd.concat([file1_data, file2_data])
combined_data["norder19_healpix"] = hp.ang2pix(
2 ** 19,
combined_data["ra"].values,
combined_data["dec"].values,
lonlat=True,
nest=True,
)
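## The reducer's _hipscat_index is derived from this same order-19 pixel, so sorting the
## expected frame by norder19_healpix mirrors the spatial part of the output ordering
## (see the "sort order is effectively ..." notes below).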
## Use this to prune generated columns like Norder, Npix, and _hipscat_index
comparison_columns = ["source_id", "object_id", "time", "ra", "dec"]

######################## Sort option 1: by source_id
## This will sort WITHIN an order 19 healpix pixel. In that ordering, the objects are
## (703, 700, 701, 702)
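## Note that object 703's source (1505) sorts first even though its ids are the largest:
## the order-19 pixel, not source_id, is the primary sort key.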
mr.reduce_pixel_shards(
cache_shard_path=os.path.join(tmp_path, "reduce_shards"),
resume_path=tmp_path,
reducing_key="0_11",
destination_pixel_order=0,
destination_pixel_number=11,
destination_pixel_size=10,
output_path=tmp_path,
ra_column="ra",
dec_column="dec",
id_column="source_id",
delete_input_files=False,
)

## sort order is effectively (norder19 healpix, source_id)
data_frame = pd.read_parquet(output_file, engine="pyarrow")
expected_dataframe = combined_data.sort_values(["norder19_healpix", "source_id"])
pd.testing.assert_frame_equal(
expected_dataframe[comparison_columns].reset_index(drop=True),
data_frame[comparison_columns].reset_index(drop=True),
)
assert_parquet_file_ids(
output_file,
"source_id",
[1505, 1200, 1201, 1206, 1307, 1308, 1309, 1402, 1403, 1404],
resort_ids=False,
)

assert_parquet_file_ids(
output_file,
"object_id",
[703, 700, 700, 700, 701, 701, 701, 702, 702, 702],
resort_ids=False,
)

######################## Sort option 2: by object id and time
## sort order is effectively (norder19 healpix, object id, time)
mr.reduce_pixel_shards(
cache_shard_path=os.path.join(tmp_path, "reduce_shards"),
resume_path=tmp_path,
reducing_key="0_11",
destination_pixel_order=0,
destination_pixel_number=11,
destination_pixel_size=10,
output_path=tmp_path,
ra_column="ra",
dec_column="dec",
id_column=["object_id", "time"],
delete_input_files=False,
)

data_frame = pd.read_parquet(output_file, engine="pyarrow")
expected_dataframe = combined_data.sort_values(["norder19_healpix", "object_id", "time"])
pd.testing.assert_frame_equal(
expected_dataframe[comparison_columns].reset_index(drop=True),
data_frame[comparison_columns].reset_index(drop=True),
)
assert_parquet_file_ids(
output_file,
"source_id",
[1505, 1206, 1200, 1201, 1309, 1308, 1307, 1402, 1403, 1404],
resort_ids=False,
)
assert_parquet_file_ids(
output_file,
"time",
[4000, 2000, 3000, 4000, 2000, 2100, 2200, 3000, 3100, 3200],
resort_ids=False,
)

######################## Sort option 3: by object id and time WITHOUT hipscat index.
## The 1500 block of ids moves to the end, because we're not using
## spatial properties for sorting, only the numeric columns.
## sort order is effectively (object id, time)
mr.reduce_pixel_shards(
cache_shard_path=os.path.join(tmp_path, "reduce_shards"),
resume_path=tmp_path,
reducing_key="0_11",
destination_pixel_order=0,
destination_pixel_number=11,
destination_pixel_size=10,
output_path=tmp_path,
ra_column="ra",
dec_column="dec",
id_column=["object_id", "time"],
add_hipscat_index=False,
delete_input_files=False,
)

data_frame = pd.read_parquet(output_file, engine="pyarrow")
expected_dataframe = combined_data.sort_values(["object_id", "time"])
pd.testing.assert_frame_equal(
expected_dataframe[comparison_columns].reset_index(drop=True),
data_frame[comparison_columns].reset_index(drop=True),
)
assert_parquet_file_ids(
output_file,
"source_id",
[1206, 1200, 1201, 1309, 1308, 1307, 1402, 1403, 1404, 1505],
resort_ids=False,
)
11 changes: 6 additions & 5 deletions tests/hipscat_import/conftest.py
@@ -198,7 +198,7 @@ def basic_data_shard_df():
)

test_df["margin_pixel"] = hp.ang2pix(
2**3,
2 ** 3,
test_df["weird_ra"].values,
test_df["weird_dec"].values,
lonlat=True,
@@ -230,7 +230,7 @@ def polar_data_shard_df():
)

test_df["margin_pixel"] = hp.ang2pix(
2**3,
2 ** 3,
test_df["weird_ra"].values,
test_df["weird_dec"].values,
lonlat=True,
@@ -279,7 +279,7 @@ def assert_text_file_matches(expected_lines, file_name):

@pytest.fixture
def assert_parquet_file_ids():
def assert_parquet_file_ids(file_name, id_column, expected_ids):
def assert_parquet_file_ids(file_name, id_column, expected_ids, resort_ids=True):
"""
Convenience method to read a parquet file and compare the object IDs to
a list of expected objects.
@@ -294,8 +294,9 @@ def assert_parquet_file_ids(file_name, id_column, expected_ids):
data_frame = pd.read_parquet(file_name, engine="pyarrow")
assert id_column in data_frame.columns
ids = data_frame[id_column].tolist()
ids.sort()
expected_ids.sort()
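# Sorting both lists makes the check order-insensitive; tests that control the output
# order (e.g. the complex sorting tests) pass resort_ids=False to assert exact row order.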
if resort_ids:
ids.sort()
expected_ids.sort()

assert len(ids) == len(
expected_ids