diff --git a/src/hipscat_import/catalog/map_reduce.py b/src/hipscat_import/catalog/map_reduce.py index 1fff4eff..ec4a6003 100644 --- a/src/hipscat_import/catalog/map_reduce.py +++ b/src/hipscat_import/catalog/map_reduce.py @@ -173,7 +173,7 @@ def split_pixels( pixel_dir = _get_pixel_directory(cache_shard_path, order, pixel) file_io.make_directory(pixel_dir, exist_ok=True) output_file = file_io.append_paths_to_pointer( - pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet" + pixel_dir, f"shard_{splitting_key}_{chunk_number}_{unique_index}.parquet" ) if _has_named_index(filtered_data): filtered_data.to_parquet(output_file, index=True) diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py index edfe4d70..a327381c 100644 --- a/tests/hipscat_import/catalog/test_map_reduce.py +++ b/tests/hipscat_import/catalog/test_map_reduce.py @@ -264,13 +264,40 @@ def test_split_pixels_headers(formats_headers_csv, assert_parquet_file_ids, tmp_ alignment=alignment, ) - file_name = os.path.join(tmp_path, "order_0", "dir_0", "pixel_11", "shard_0_0.parquet") + file_name = os.path.join(tmp_path, "order_0", "dir_0", "pixel_11", "shard_0_0_0.parquet") expected_ids = [*range(700, 708)] assert_parquet_file_ids(file_name, "object_id", expected_ids) - file_name = os.path.join(tmp_path, "order_0", "dir_0", "pixel_1", "shard_0_0.parquet") + file_name = os.path.join(tmp_path, "order_0", "dir_0", "pixel_1", "shard_0_0_0.parquet") assert not os.path.exists(file_name) +def test_split_pixels_same_destination(same_destination_csv, tmp_path): + """Test splitting a file where data points fall in different high order pixels + but have the same destination pixel.""" + os.makedirs(os.path.join(tmp_path, "splitting")) + alignment = np.full(hp.nside2npix(2**10), None) + alignment[1306154] = (4, 318, 1) + alignment[1305941] = (4, 318, 2) + mr.split_pixels( + input_file=same_destination_csv, + file_reader=get_file_reader("csv"), + highest_order=10, + ra_column="CatWISE_RA", + dec_column="CatWISE_Dec", + splitting_key="0", + cache_shard_path=tmp_path, + resume_path=tmp_path, + alignment=alignment + ) + + file_name1 = os.path.join(tmp_path, "order_4", "dir_0", "pixel_318", "shard_0_0_0.parquet") + assert os.path.exists(file_name1) + + file_name2 = os.path.join(tmp_path, "order_4", "dir_0", "pixel_318", "shard_0_0_1.parquet") + assert os.path.exists(file_name2) + + merged_df = pa.parquet.read_table(os.path.join(tmp_path, "order_4", "dir_0", "pixel_318")) + assert len(merged_df) == 3 def test_reduce_order0(parquet_shards_dir, assert_parquet_file_ids, tmp_path): """Test reducing into one large pixel""" diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py index 91c1ba6b..d7cd12ef 100644 --- a/tests/hipscat_import/conftest.py +++ b/tests/hipscat_import/conftest.py @@ -98,6 +98,10 @@ def empty_data_dir(test_data_dir): def macauff_data_dir(test_data_dir): return os.path.join(test_data_dir, "macauff") +@pytest.fixture +def same_destination_csv(test_data_dir): + return os.path.join(test_data_dir, "macauff", "same_destination.csv") + @pytest.fixture def formats_dir(test_data_dir): diff --git a/tests/hipscat_import/data/macauff/same_destination.csv b/tests/hipscat_import/data/macauff/same_destination.csv new file mode 100644 index 00000000..9d39d831 --- /dev/null +++ b/tests/hipscat_import/data/macauff/same_destination.csv @@ -0,0 +1,4 @@ +Gaia_designation,Gaia_RA,Gaia_Dec,CatWISE_Name,CatWISE_RA,CatWISE_Dec +Gaia DR3 718066137586181888,131.149007,36.811269,J084435.76+364840.6,131.149020,36.811303 +Gaia DR3 717948799080732160,133.580994,37.158579,J085419.43+370931.1,133.580985,37.158652 +Gaia DR3 717948799080732160,133.580994,37.158579,J085419.43+370931.1,133.580985,37.158652 \ No newline at end of file