From bc50e6122e06556870f474ec0c45247a6e7a96d4 Mon Sep 17 00:00:00 2001
From: Max West
Date: Tue, 6 Jun 2023 15:39:05 -0700
Subject: [PATCH 1/3] margin cache shard reduction implementation first pass

---
 .../margin_cache/margin_cache.py              | 27 ++++++++++++++
 .../margin_cache/margin_cache_map_reduce.py   | 35 +++++++++++++++++--
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/src/hipscat_import/margin_cache/margin_cache.py b/src/hipscat_import/margin_cache/margin_cache.py
index 59734bba..ab67345f 100644
--- a/src/hipscat_import/margin_cache/margin_cache.py
+++ b/src/hipscat_import/margin_cache/margin_cache.py
@@ -76,6 +76,27 @@ def _map_to_margin_shards(client, args, partition_pixels, margin_pairs):
     ):
         ...
 
+def _reduce_margin_shards(client, args, partition_pixels):
+    """Create all the jobs for reducing margin cache shards into singular files"""
+    futures = []
+
+    for pix in partition_pixels:
+        futures.append(
+            client.submit(
+                mcmr.reduce_margin_shards,
+                output_path=args.catalog_path,
+                partition_order=pix.order,
+                partition_pixel=pix.pixel
+            )
+        )
+
+    for _ in tqdm(
+        as_completed(futures),
+        desc="Reducing ",
+        total=len(futures),
+    ):
+        ...
+
 def generate_margin_cache(args):
     """Generate a margin cache for a given input catalog. The input catalog
     must be in hipscat format.
@@ -127,3 +148,9 @@ def generate_margin_cache_with_client(client, args):
         partition_pixels=partition_stats,
         margin_pairs=margin_pairs,
     )
+
+    _reduce_margin_shards(
+        client=client,
+        args=args,
+        partition_pixels=partition_stats
+    )
diff --git a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py
index ee83837d..1c3d365e 100644
--- a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py
+++ b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py
@@ -1,6 +1,7 @@
 import healpy as hp
 import numpy as np
 import pandas as pd
+import pyarrow.dataset as ds
 from hipscat import pixel_math
 from hipscat.io import file_io, paths
 
@@ -72,8 +73,7 @@ def _to_pixel_shard(data, margin_threshold, output_path, margin_order, ra_column
     # TODO: this should be a utility function in `hipscat`
     # that properly handles the hive formatting
     # generate a file name for our margin shard
-    partition_file = paths.pixel_catalog_file(output_path, order, pix)
-    partition_dir = f"{partition_file[:-8]}/"
+    partition_dir = _get_partition_directory(output_path, order, pix)
     shard_dir = paths.pixel_directory(
         partition_dir, source_order, source_pix
     )
@@ -137,3 +137,34 @@ def _margin_filter_polar(
     # pylint: enable=singleton-comparison
 
     return data
+
+def _get_partition_directory(path, order, pix):
+    """Get the directory where a partition pixel's margin shards live"""
+    partition_file = paths.pixel_catalog_file(path, order, pix)
+
+    # removes the '.parquet' and adds a slash
+    partition_dir = f"{partition_file[:-8]}/"
+
+    return partition_dir
+
+def reduce_margin_shards(output_path, partition_order, partition_pixel):
+    """Reduce all partition pixel directories into a single file"""
+    shard_dir = _get_partition_directory(
+        output_path,
+        partition_order,
+        partition_pixel
+    )
+
+    if file_io.does_file_or_directory_exist(shard_dir):
+        data = ds.dataset(shard_dir, format="parquet", partitioning="hive")
+        full_df = data.to_table().to_pandas()
+
+        if len(full_df):
+            margin_cache_file_path = paths.pixel_catalog_file(
+                output_path,
+                partition_order,
+                partition_pixel
+            )
+
+            full_df.to_parquet(margin_cache_file_path)
+            file_io.remove_directory(shard_dir)
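The helper `_get_partition_directory` added above leans on the fact that `paths.pixel_catalog_file` returns the leaf parquet path for a pixel, so stripping the 8-character `.parquet` suffix leaves the directory that holds that pixel's margin shards. A minimal sketch of that string manipulation, using a made-up path for illustration (the real layout comes from hipscat's `paths` module):

    # Sketch only: the example path is an assumption for illustration; the
    # real value comes from hipscat's paths.pixel_catalog_file.
    partition_file = "/data/catalog/Norder=1/Dir=0/Npix=21.parquet"

    # len(".parquet") == 8, so [:-8] drops the extension, and the trailing
    # slash marks the result as the shard directory for that pixel.
    partition_dir = f"{partition_file[:-8]}/"
    assert partition_dir == "/data/catalog/Norder=1/Dir=0/Npix=21/"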
From f3f01b1b93b94c2bdf9ab9ad4008a7fbc2ad2ab6 Mon Sep 17 00:00:00 2001
From: Max West
Date: Wed, 14 Jun 2023 23:57:25 -0700
Subject: [PATCH 2/3] refine margin_cache tests a little bit

---
 .../margin_cache/test_margin_cache.py | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/tests/hipscat_import/margin_cache/test_margin_cache.py b/tests/hipscat_import/margin_cache/test_margin_cache.py
index 06672e1a..cfdafb05 100644
--- a/tests/hipscat_import/margin_cache/test_margin_cache.py
+++ b/tests/hipscat_import/margin_cache/test_margin_cache.py
@@ -1,8 +1,9 @@
 """Tests of map reduce operations"""
 import numpy as np
 import numpy.testing as npt
+import pandas as pd
 import pytest
-from hipscat.io import file_io
+from hipscat.io import file_io, paths
 
 import hipscat_import.margin_cache.margin_cache as mc
 from hipscat_import.margin_cache import MarginCacheArguments
@@ -21,8 +22,19 @@ def test_margin_cache_gen(small_sky_source_catalog, tmp_path, dask_client):
     assert args.catalog.catalog_info.ra_column == "source_ra"
 
     mc.generate_margin_cache_with_client(dask_client, args)
-    # TODO: add more verification of output to this test once the
-    # reduce phase is implemented.
+
+    print(args.catalog.partition_info.get_healpix_pixels())
+
+    norder = 1
+    npix = 47
+
+    test_file = paths.pixel_catalog_file(
+        args.catalog_path, norder, npix
+    )
+
+    data = pd.read_parquet(test_file)
+
+    assert len(data) == 4
 
 def test_partition_margin_pixel_pairs(small_sky_source_catalog, tmp_path):
     args = MarginCacheArguments(
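The test above can read a single parquet file per pixel because the reduce phase from the first patch collapses every shard file for a partition pixel into one file at the path `paths.pixel_catalog_file` computes. The consolidation itself is plain pyarrow: a directory of parquet files read as a single dataset. A self-contained sketch of that behavior (file names here are hypothetical, not the ones the pipeline writes):

    import os
    import tempfile

    import pandas as pd
    import pyarrow.dataset as ds

    # Write two shard files, then read the directory back as one dataset;
    # pyarrow concatenates the rows across every parquet file it finds.
    with tempfile.TemporaryDirectory() as shard_dir:
        pd.DataFrame({"id": [1, 2]}).to_parquet(
            os.path.join(shard_dir, "shard_0.parquet"))
        pd.DataFrame({"id": [3, 4]}).to_parquet(
            os.path.join(shard_dir, "shard_1.parquet"))

        full_df = ds.dataset(shard_dir, format="parquet").to_table().to_pandas()
        assert len(full_df) == 4  # rows from both shards in one dataframe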
From 6762e548cbd9ac2b641e4327ab87517371ec9698 Mon Sep 17 00:00:00 2001
From: Max West
Date: Thu, 22 Jun 2023 16:05:46 -0700
Subject: [PATCH 3/3] reduce unit tests

---
 .../margin_cache/margin_cache_map_reduce.py   |   2 +-
 tests/hipscat_import/conftest.py              |  63 +++++++++++
 .../test_margin_cache_map_reduce.py           | 100 +++++++-----------
 3 files changed, 103 insertions(+), 62 deletions(-)

diff --git a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py
index 1c3d365e..000395a9 100644
--- a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py
+++ b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py
@@ -156,7 +156,7 @@ def reduce_margin_shards(output_path, partition_order, partition_pixel):
     )
 
     if file_io.does_file_or_directory_exist(shard_dir):
-        data = ds.dataset(shard_dir, format="parquet", partitioning="hive")
+        data = ds.dataset(shard_dir, format="parquet")
         full_df = data.to_table().to_pandas()
 
         if len(full_df):
diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py
index 2e64d854..9e091137 100644
--- a/tests/hipscat_import/conftest.py
+++ b/tests/hipscat_import/conftest.py
@@ -3,6 +3,8 @@
 import os
 import re
 
+import healpy as hp
+import numpy as np
 import numpy.testing as npt
 import pandas as pd
 import pytest
@@ -157,6 +159,67 @@ def mixed_schema_csv_parquet(test_data_dir):
 def resume_dir(test_data_dir):
     return os.path.join(test_data_dir, "resume")
 
+@pytest.fixture
+def basic_data_shard_df():
+    ras = np.arange(0.,360.)
+    dec = np.full(360, 0.)
+    ppix = np.full(360, 21)
+    porder = np.full(360, 1)
+    norder = np.full(360, 1)
+    npix = np.full(360, 0)
+
+    test_df = pd.DataFrame(
+        data=zip(ras, dec, ppix, porder, norder, npix),
+        columns=[
+            "weird_ra",
+            "weird_dec",
+            "partition_pixel",
+            "partition_order",
+            "Norder",
+            "Npix"
+        ]
+    )
+
+    test_df["margin_pixel"] = hp.ang2pix(
+        2**3,
+        test_df["weird_ra"].values,
+        test_df["weird_dec"].values,
+        lonlat=True,
+        nest=True
+    )
+
+    return test_df
+
+@pytest.fixture
+def polar_data_shard_df():
+    ras = np.arange(0.,360.)
+    dec = np.full(360, 89.9)
+    ppix = np.full(360, 15)
+    porder = np.full(360, 2)
+    norder = np.full(360, 2)
+    npix = np.full(360, 0)
+
+    test_df = pd.DataFrame(
+        data=zip(ras, dec, ppix, porder, norder, npix),
+        columns=[
+            "weird_ra",
+            "weird_dec",
+            "partition_pixel",
+            "partition_order",
+            "Norder",
+            "Npix"
+        ]
+    )
+
+    test_df["margin_pixel"] = hp.ang2pix(
+        2**3,
+        test_df["weird_ra"].values,
+        test_df["weird_dec"].values,
+        lonlat=True,
+        nest=True
+    )
+
+    return test_df
 
 @pytest.fixture
 def assert_text_file_matches():
diff --git a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py
index b58b7c29..9db54bf5 100644
--- a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py
+++ b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py
@@ -1,8 +1,6 @@
-import healpy as hp
-import numpy as np
 import pandas as pd
 import pytest
-from hipscat.io import file_io
+from hipscat.io import file_io, paths
 
 from hipscat_import.margin_cache import margin_cache_map_reduce
 
@@ -33,36 +31,9 @@ def validate_result_dataframe(df_path, expected_len):
         assert col not in cols
 
 @pytest.mark.timeout(5)
-def test_to_pixel_shard_equator(tmp_path):
-    ras = np.arange(0.,360.)
-    dec = np.full(360, 0.)
-    ppix = np.full(360, 21)
-    porder = np.full(360, 1)
-    norder = np.full(360, 1)
-    npix = np.full(360, 0)
-
-    test_df = pd.DataFrame(
-        data=zip(ras, dec, ppix, porder, norder, npix),
-        columns=[
-            "weird_ra",
-            "weird_dec",
-            "partition_pixel",
-            "partition_order",
-            "Norder",
-            "Npix"
-        ]
-    )
-
-    test_df["margin_pixel"] = hp.ang2pix(
-        2**3,
-        test_df["weird_ra"].values,
-        test_df["weird_dec"].values,
-        lonlat=True,
-        nest=True
-    )
-
+def test_to_pixel_shard_equator(tmp_path, basic_data_shard_df):
     margin_cache_map_reduce._to_pixel_shard(
-        test_df,
+        basic_data_shard_df,
         margin_threshold=0.1,
         output_path=tmp_path,
         margin_order=3,
@@ -80,36 +51,9 @@
     validate_result_dataframe(path, 46)
 
 @pytest.mark.timeout(5)
-def test_to_pixel_shard_polar(tmp_path):
-    ras = np.arange(0.,360.)
-    dec = np.full(360, 89.9)
-    ppix = np.full(360, 15)
-    porder = np.full(360, 2)
-    norder = np.full(360, 2)
-    npix = np.full(360, 0)
-
-    test_df = pd.DataFrame(
-        data=zip(ras, dec, ppix, porder, norder, npix),
-        columns=[
-            "weird_ra",
-            "weird_dec",
-            "partition_pixel",
-            "partition_order",
-            "Norder",
-            "Npix"
-        ]
-    )
-
-    test_df["margin_pixel"] = hp.ang2pix(
-        2**3,
-        test_df["weird_ra"].values,
-        test_df["weird_dec"].values,
-        lonlat=True,
-        nest=True
-    )
-
+def test_to_pixel_shard_polar(tmp_path, polar_data_shard_df):
     margin_cache_map_reduce._to_pixel_shard(
-        test_df,
+        polar_data_shard_df,
         margin_threshold=0.1,
         output_path=tmp_path,
         margin_order=3,
@@ -125,3 +69,37 @@
     assert file_io.does_file_or_directory_exist(path)
 
     validate_result_dataframe(path, 317)
+
+def test_reduce_margin_shards(tmp_path, basic_data_shard_df):
+    partition_dir = margin_cache_map_reduce._get_partition_directory(
+        tmp_path, 1, 21
+    )
+    shard_dir = paths.pixel_directory(
+        partition_dir, 1, 21
+    )
+
+    file_io.make_directory(shard_dir, exist_ok=True)
+
+    first_shard_path = paths.pixel_catalog_file(
+        partition_dir, 1, 0
+    )
+    second_shard_path = paths.pixel_catalog_file(
+        partition_dir, 1, 1
+    )
+
+    print(first_shard_path)
+
+    shard_df = basic_data_shard_df.drop(columns=[
+        "partition_order",
+        "partition_pixel",
+        "margin_pixel"
+    ])
+
+    shard_df.to_parquet(first_shard_path)
+    shard_df.to_parquet(second_shard_path)
+
+    margin_cache_map_reduce.reduce_margin_shards(tmp_path, 1, 21)
+
+    result_path = paths.pixel_catalog_file(tmp_path, 1, 21)
+
+    validate_result_dataframe(result_path, 720)
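On the one-line change to `reduce_margin_shards` in this last patch, dropping `partitioning="hive"`: with hive partitioning enabled, pyarrow parses `key=value` directory names into extra dataset columns, while the shards written by the new unit test already carry `Norder`/`Npix` as ordinary columns, so inferring them from the path again is unwanted. That reading is inferred from the diff, not stated in it. A self-contained sketch of the difference (the directory layout below is illustrative):

    import os
    import tempfile

    import pandas as pd
    import pyarrow.dataset as ds

    with tempfile.TemporaryDirectory() as root:
        shard_dir = os.path.join(root, "Norder=1", "Dir=0", "Npix=0")
        os.makedirs(shard_dir)
        pd.DataFrame({"ra": [0.0, 1.0]}).to_parquet(
            os.path.join(shard_dir, "shard.parquet"))

        # Without a partitioning scheme, only the columns stored inside
        # the files appear in the schema.
        plain = ds.dataset(root, format="parquet")
        print(plain.schema.names)  # ['ra']

        # With hive partitioning, the key=value directory names are parsed
        # into additional columns.
        hive = ds.dataset(root, format="parquet", partitioning="hive")
        print(hive.schema.names)  # ['ra', 'Norder', 'Dir', 'Npix']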