diff --git a/src/hipscat_import/margin_cache/margin_cache.py b/src/hipscat_import/margin_cache/margin_cache.py
index 504ea060..ddb2d0a4 100644
--- a/src/hipscat_import/margin_cache/margin_cache.py
+++ b/src/hipscat_import/margin_cache/margin_cache.py
@@ -75,6 +75,26 @@ def _map_to_margin_shards(client, args, partition_pixels, margin_pairs):
     ):
         ...
 
+def _reduce_margin_shards(client, args, partition_pixels):
+    """Create all the jobs for reducing margin cache shards into single files"""
+    futures = []
+
+    for pix in partition_pixels:
+        futures.append(
+            client.submit(
+                mcmr.reduce_margin_shards,
+                output_path=args.catalog_path,
+                partition_order=pix.order,
+                partition_pixel=pix.pixel,
+            )
+        )
+
+    for _ in tqdm(
+        as_completed(futures),
+        desc="Reducing ",
+        total=len(futures),
+    ):
+        ...
 
 def generate_margin_cache(args, client):
     """Generate a margin cache for a given input catalog.
@@ -103,3 +123,9 @@ def generate_margin_cache(args, client):
         partition_pixels=partition_stats,
         margin_pairs=margin_pairs,
     )
+
+    _reduce_margin_shards(
+        client=client,
+        args=args,
+        partition_pixels=partition_stats,
+    )
diff --git a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py
index 8ff13c41..63336d10 100644
--- a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py
+++ b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py
@@ -1,6 +1,7 @@
 import healpy as hp
 import numpy as np
 import pandas as pd
+import pyarrow.dataset as ds
 from hipscat import pixel_math
 from hipscat.io import file_io, paths
 
@@ -74,9 +75,10 @@ def _to_pixel_shard(
     # TODO: this should be a utility function in `hipscat`
     # that properly handles the hive formatting
     # generate a file name for our margin shard
-    partition_file = paths.pixel_catalog_file(output_path, order, pix)
-    partition_dir = f"{partition_file[:-8]}/"
-    shard_dir = paths.pixel_directory(partition_dir, source_order, source_pix)
+    partition_dir = _get_partition_directory(output_path, order, pix)
+    shard_dir = paths.pixel_directory(
+        partition_dir, source_order, source_pix
+    )
 
     file_io.make_directory(shard_dir, exist_ok=True)
 
@@ -134,3 +136,34 @@ def _margin_filter_polar(
     # pylint: enable=singleton-comparison
 
     return data
+
+def _get_partition_directory(path, order, pix):
+    """Get the directory where a partition pixel's margin shards live"""
+    partition_file = paths.pixel_catalog_file(path, order, pix)
+
+    # strip the '.parquet' suffix and add a trailing slash
+    partition_dir = f"{partition_file[:-8]}/"
+
+    return partition_dir
+
+def reduce_margin_shards(output_path, partition_order, partition_pixel):
+    """Reduce one partition pixel's shard files into a single parquet file"""
+    shard_dir = _get_partition_directory(
+        output_path,
+        partition_order,
+        partition_pixel,
+    )
+
+    if file_io.does_file_or_directory_exist(shard_dir):
+        data = ds.dataset(shard_dir, format="parquet")
+        full_df = data.to_table().to_pandas()
+
+        if len(full_df):
+            margin_cache_file_path = paths.pixel_catalog_file(
+                output_path,
+                partition_order,
+                partition_pixel,
+            )
+
+            full_df.to_parquet(margin_cache_file_path)
+            file_io.remove_directory(shard_dir)
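Note: the reduce step above leans on pyarrow's directory-level datasets to
concatenate every shard file under a partition directory in one pass. A
minimal standalone sketch of that consolidation pattern (the paths here are
hypothetical stand-ins for what paths.pixel_catalog_file produces):

    import pandas as pd
    import pyarrow.dataset as ds

    # Hypothetical hive-style layout: the directory holds one small
    # parquet shard per source pixel that spilled into this partition.
    shard_dir = "catalog/Norder=1/Dir=0/Npix=21/"

    # pyarrow treats the directory as a single logical dataset, so all
    # shards are read and stacked in one pass.
    dataset = ds.dataset(shard_dir, format="parquet")
    full_df = dataset.to_table().to_pandas()

    # Write the combined rows as the partition's single margin file;
    # the shard directory can then be removed, as the diff does.
    full_df.to_parquet("catalog/Norder=1/Dir=0/Npix=21.parquet")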
diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py
index fadeb841..fe2d4e9c 100644
--- a/tests/hipscat_import/conftest.py
+++ b/tests/hipscat_import/conftest.py
@@ -3,6 +3,8 @@
 import os
 import re
 
+import healpy as hp
+import numpy as np
 import numpy.testing as npt
 import pandas as pd
 import pytest
@@ -162,6 +164,67 @@ def mixed_schema_csv_parquet(test_data_dir):
 def resume_dir(test_data_dir):
     return os.path.join(test_data_dir, "resume")
 
+@pytest.fixture
+def basic_data_shard_df():
+    ras = np.arange(0.0, 360.0)
+    dec = np.full(360, 0.0)
+    ppix = np.full(360, 21)
+    porder = np.full(360, 1)
+    norder = np.full(360, 1)
+    npix = np.full(360, 0)
+
+    test_df = pd.DataFrame(
+        data=zip(ras, dec, ppix, porder, norder, npix),
+        columns=[
+            "weird_ra",
+            "weird_dec",
+            "partition_pixel",
+            "partition_order",
+            "Norder",
+            "Npix",
+        ],
+    )
+
+    test_df["margin_pixel"] = hp.ang2pix(
+        2**3,
+        test_df["weird_ra"].values,
+        test_df["weird_dec"].values,
+        lonlat=True,
+        nest=True,
+    )
+
+    return test_df
+
+@pytest.fixture
+def polar_data_shard_df():
+    ras = np.arange(0.0, 360.0)
+    dec = np.full(360, 89.9)
+    ppix = np.full(360, 15)
+    porder = np.full(360, 2)
+    norder = np.full(360, 2)
+    npix = np.full(360, 0)
+
+    test_df = pd.DataFrame(
+        data=zip(ras, dec, ppix, porder, norder, npix),
+        columns=[
+            "weird_ra",
+            "weird_dec",
+            "partition_pixel",
+            "partition_order",
+            "Norder",
+            "Npix",
+        ],
+    )
+
+    test_df["margin_pixel"] = hp.ang2pix(
+        2**3,
+        test_df["weird_ra"].values,
+        test_df["weird_dec"].values,
+        lonlat=True,
+        nest=True,
+    )
+
+    return test_df
 
 @pytest.fixture
 def assert_text_file_matches():
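Note: both fixtures tag every synthetic row with a margin_pixel at order 3
(nside = 2**3 = 8) via healpy's ang2pix. A small sketch of what that call
does, using throwaway coordinates:

    import healpy as hp
    import numpy as np

    # lonlat=True means (ra, dec) in degrees; nest=True selects the
    # NESTED pixel numbering scheme that HiPSCat partitioning uses.
    ra = np.array([0.0, 180.0])
    dec = np.array([0.0, 0.0])
    margin_pixel = hp.ang2pix(2**3, ra, dec, lonlat=True, nest=True)

    # nside 8 has 12 * 8**2 = 768 pixels; each source maps to exactly one.
    assert (margin_pixel < hp.nside2npix(8)).all()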
diff --git a/tests/hipscat_import/margin_cache/test_margin_cache.py b/tests/hipscat_import/margin_cache/test_margin_cache.py
index 333acb02..56f13389 100644
--- a/tests/hipscat_import/margin_cache/test_margin_cache.py
+++ b/tests/hipscat_import/margin_cache/test_margin_cache.py
@@ -1,8 +1,9 @@
 """Tests of map reduce operations"""
 import numpy as np
 import numpy.testing as npt
+import pandas as pd
 import pytest
-from hipscat.io import file_io
+from hipscat.io import file_io, paths
 
 import hipscat_import.margin_cache.margin_cache as mc
 from hipscat_import.margin_cache import MarginCacheArguments
@@ -23,9 +24,17 @@ def test_margin_cache_gen(small_sky_source_catalog, tmp_path, dask_client):
     assert args.catalog.catalog_info.ra_column == "source_ra"
 
     mc.generate_margin_cache(args, dask_client)
-    # TODO: add more verification of output to this test once the
-    # reduce phase is implemented.
+
+    norder = 1
+    npix = 47
+
+    test_file = paths.pixel_catalog_file(
+        args.catalog_path, norder, npix
+    )
+
+    data = pd.read_parquet(test_file)
+
+    assert len(data) == 4
 
 def test_partition_margin_pixel_pairs(small_sky_source_catalog, tmp_path):
     args = MarginCacheArguments(
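Note: the end-to-end test above now exercises the reduce phase through the
same client.submit / as_completed pattern added in margin_cache.py. A
minimal sketch of that orchestration, with a trivial hypothetical worker
standing in for mcmr.reduce_margin_shards:

    from dask.distributed import Client, as_completed
    from tqdm import tqdm

    def work(value):
        # Hypothetical stand-in for the real reduce task.
        return value * 2

    client = Client(n_workers=1)
    futures = [client.submit(work, value) for value in range(4)]

    # Drain the futures as they finish, mirroring the progress loop
    # in _reduce_margin_shards.
    for _ in tqdm(as_completed(futures), total=len(futures)):
        ...

    client.close()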
diff --git a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py
index 403302ed..e5ae1d47 100644
--- a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py
+++ b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py
@@ -1,8 +1,6 @@
-import healpy as hp
-import numpy as np
 import pandas as pd
 import pytest
-from hipscat.io import file_io
+from hipscat.io import file_io, paths
 
 from hipscat_import.margin_cache import margin_cache_map_reduce
 
@@ -30,38 +28,10 @@ def validate_result_dataframe(df_path, expected_len):
     for col in drop_cols:
         assert col not in cols
 
-
 @pytest.mark.timeout(5)
-def test_to_pixel_shard_equator(tmp_path):
-    ras = np.arange(0.0, 360.0)
-    dec = np.full(360, 0.0)
-    ppix = np.full(360, 21)
-    porder = np.full(360, 1)
-    norder = np.full(360, 1)
-    npix = np.full(360, 0)
-
-    test_df = pd.DataFrame(
-        data=zip(ras, dec, ppix, porder, norder, npix),
-        columns=[
-            "weird_ra",
-            "weird_dec",
-            "partition_pixel",
-            "partition_order",
-            "Norder",
-            "Npix",
-        ],
-    )
-
-    test_df["margin_pixel"] = hp.ang2pix(
-        2**3,
-        test_df["weird_ra"].values,
-        test_df["weird_dec"].values,
-        lonlat=True,
-        nest=True,
-    )
-
+def test_to_pixel_shard_equator(tmp_path, basic_data_shard_df):
     margin_cache_map_reduce._to_pixel_shard(
-        test_df,
+        basic_data_shard_df,
         margin_threshold=0.1,
         output_path=tmp_path,
         margin_order=3,
@@ -77,38 +47,10 @@
     validate_result_dataframe(path, 46)
 
-
 @pytest.mark.timeout(5)
-def test_to_pixel_shard_polar(tmp_path):
-    ras = np.arange(0.0, 360.0)
-    dec = np.full(360, 89.9)
-    ppix = np.full(360, 15)
-    porder = np.full(360, 2)
-    norder = np.full(360, 2)
-    npix = np.full(360, 0)
-
-    test_df = pd.DataFrame(
-        data=zip(ras, dec, ppix, porder, norder, npix),
-        columns=[
-            "weird_ra",
-            "weird_dec",
-            "partition_pixel",
-            "partition_order",
-            "Norder",
-            "Npix",
-        ],
-    )
-
-    test_df["margin_pixel"] = hp.ang2pix(
-        2**3,
-        test_df["weird_ra"].values,
-        test_df["weird_dec"].values,
-        lonlat=True,
-        nest=True,
-    )
-
+def test_to_pixel_shard_polar(tmp_path, polar_data_shard_df):
     margin_cache_map_reduce._to_pixel_shard(
-        test_df,
+        polar_data_shard_df,
         margin_threshold=0.1,
         output_path=tmp_path,
         margin_order=3,
@@ -123,3 +65,36 @@
     assert file_io.does_file_or_directory_exist(path)
 
     validate_result_dataframe(path, 317)
+
+def test_reduce_margin_shards(tmp_path, basic_data_shard_df):
+    partition_dir = margin_cache_map_reduce._get_partition_directory(
+        tmp_path, 1, 21
+    )
+    shard_dir = paths.pixel_directory(
+        partition_dir, 1, 21
+    )
+
+    file_io.make_directory(shard_dir, exist_ok=True)
+
+    first_shard_path = paths.pixel_catalog_file(
+        partition_dir, 1, 0
+    )
+    second_shard_path = paths.pixel_catalog_file(
+        partition_dir, 1, 1
+    )
+
+    shard_df = basic_data_shard_df.drop(
+        columns=[
+            "partition_order",
+            "partition_pixel",
+            "margin_pixel",
+        ]
+    )
+
+    shard_df.to_parquet(first_shard_path)
+    shard_df.to_parquet(second_shard_path)
+
+    margin_cache_map_reduce.reduce_margin_shards(tmp_path, 1, 21)
+
+    result_path = paths.pixel_catalog_file(tmp_path, 1, 21)
+
+    # two identical 360-row shards reduce to a single 720-row file
+    validate_result_dataframe(result_path, 720)
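Closing note: with the reduce phase wired in, a margin cache can be built end
to end. A rough usage sketch under stated assumptions; the MarginCacheArguments
field names below are assumptions patterned on the runtime-argument classes
elsewhere in hipscat-import and are not confirmed by this diff (only
args.catalog_path is referenced above):

    from dask.distributed import Client

    import hipscat_import.margin_cache.margin_cache as mc
    from hipscat_import.margin_cache import MarginCacheArguments

    # Assumed argument names; adjust to the actual MarginCacheArguments API.
    args = MarginCacheArguments(
        input_catalog_path="data/small_sky_source_catalog",
        output_path="data/margin_caches",
        output_catalog_name="small_sky_source_margin",
    )

    with Client(n_workers=4) as client:
        # Map sources into per-partition margin shards, then reduce each
        # partition's shard directory into one parquet file.
        mc.generate_margin_cache(args, client)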