Commit

Merge pull request #94 from astronomy-commons/max/margin_cache_reduce
Margin cache generation reduce phase
maxwest-uw committed Jun 23, 2023
2 parents 328b9fc + 330d8a3 commit d6c0d77
Showing 5 changed files with 178 additions and 69 deletions.
26 changes: 26 additions & 0 deletions src/hipscat_import/margin_cache/margin_cache.py
@@ -75,6 +75,26 @@ def _map_to_margin_shards(client, args, partition_pixels, margin_pairs):
):
...

def _reduce_margin_shards(client, args, partition_pixels):
"""Create all the jobs for reducing margin cache shards into singular files"""
futures = []

for pix in partition_pixels:
futures.append(
client.submit(
mcmr.reduce_margin_shards,
output_path=args.catalog_path,
partition_order=pix.order,
partition_pixel=pix.pixel
)
)

for _ in tqdm(
as_completed(futures),
desc="Reducing ",
total=len(futures),
):
...

def generate_margin_cache(args, client):
"""Generate a margin cache for a given input catalog.
@@ -103,3 +123,9 @@ def generate_margin_cache(args, client):
partition_pixels=partition_stats,
margin_pairs=margin_pairs,
)

_reduce_margin_shards(
client=client,
args=args,
partition_pixels=partition_stats
)
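
Taken together, generate_margin_cache now runs the existing map phase and then this new reduce phase over the same partition pixels. As a minimal usage sketch — the MarginCacheArguments field names below are illustrative assumptions, not quoted from the class:

from dask.distributed import Client

import hipscat_import.margin_cache.margin_cache as mc
from hipscat_import.margin_cache import MarginCacheArguments

# Hypothetical field names; consult MarginCacheArguments for the
# actual signature.
args = MarginCacheArguments(
    input_catalog_path="data/small_sky_source",
    output_path="output",
    margin_threshold=0.1,
)

with Client(n_workers=4) as client:
    mc.generate_margin_cache(args, client)

Both phases fan work out with client.submit and drain the futures through as_completed under a tqdm progress bar, which is why the loop bodies above are a bare "...".
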
39 changes: 36 additions & 3 deletions src/hipscat_import/margin_cache/margin_cache_map_reduce.py
@@ -1,6 +1,7 @@
import healpy as hp
import numpy as np
import pandas as pd
import pyarrow.dataset as ds
from hipscat import pixel_math
from hipscat.io import file_io, paths

@@ -74,9 +75,10 @@ def _to_pixel_shard(
# TODO: this should be a utility function in `hipscat`
# that properly handles the hive formatting
# generate a file name for our margin shard
partition_file = paths.pixel_catalog_file(output_path, order, pix)
partition_dir = f"{partition_file[:-8]}/"
shard_dir = paths.pixel_directory(partition_dir, source_order, source_pix)
partition_dir = _get_partition_directory(output_path, order, pix)
shard_dir = paths.pixel_directory(
partition_dir, source_order, source_pix
)

file_io.make_directory(shard_dir, exist_ok=True)

@@ -134,3 +136,34 @@ def _margin_filter_polar(
# pylint: enable=singleton-comparison

return data

def _get_partition_directory(path, order, pix):
"""Get the directory where a partition pixel's margin shards live"""
partition_file = paths.pixel_catalog_file(path, order, pix)

# removes the '.parquet' and adds a slash
partition_dir = f"{partition_file[:-8]}/"

return partition_dir
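
As a sanity check on that slicing: assuming paths.pixel_catalog_file yields a hive-style leaf path (an illustrative assumption about the layout, not quoted from hipscat), dropping the 8-character ".parquet" suffix leaves the matching shard directory:

# Hypothetical leaf path for paths.pixel_catalog_file(path, 1, 21):
partition_file = "catalog/Norder=1/Dir=0/Npix=21.parquet"

# len(".parquet") == 8, so [:-8] strips the extension; the trailing
# slash marks the result as a directory.
partition_dir = f"{partition_file[:-8]}/"
assert partition_dir == "catalog/Norder=1/Dir=0/Npix=21/"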

def reduce_margin_shards(output_path, partition_order, partition_pixel):
"""Reduce all partition pixel directories into a single file"""
shard_dir = _get_partition_directory(
output_path,
partition_order,
partition_pixel
)

if file_io.does_file_or_directory_exist(shard_dir):
data = ds.dataset(shard_dir, format="parquet")
full_df = data.to_table().to_pandas()

if len(full_df):
margin_cache_file_path = paths.pixel_catalog_file(
output_path,
partition_order,
partition_pixel
)

full_df.to_parquet(margin_cache_file_path)
file_io.remove_directory(shard_dir)
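
The reduce step leans on pyarrow.dataset to treat a directory of shard files as one logical table. A standalone sketch of the same read-concatenate-rewrite pattern on synthetic data (the paths and frames are made up for illustration):

import os
import tempfile

import pandas as pd
import pyarrow.dataset as ds

tmp = tempfile.mkdtemp()
shard_dir = os.path.join(tmp, "shards")
os.makedirs(shard_dir)

# Two shard files with a shared schema, standing in for margin shards.
for i in range(2):
    frame = pd.DataFrame({"ra": [float(i)], "dec": [0.0]})
    frame.to_parquet(os.path.join(shard_dir, f"shard_{i}.parquet"))

# ds.dataset discovers every parquet file under shard_dir; to_table()
# concatenates the fragments into a single table.
full_df = ds.dataset(shard_dir, format="parquet").to_table().to_pandas()
full_df.to_parquet(os.path.join(tmp, "reduced.parquet"))

print(len(full_df))  # 2 -- one row per shard file
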
63 changes: 63 additions & 0 deletions tests/hipscat_import/conftest.py
@@ -3,6 +3,8 @@
import os
import re

import healpy as hp
import numpy as np
import numpy.testing as npt
import pandas as pd
import pytest
@@ -162,6 +164,67 @@ def mixed_schema_csv_parquet(test_data_dir):
def resume_dir(test_data_dir):
return os.path.join(test_data_dir, "resume")

@pytest.fixture
def basic_data_shard_df():
ras = np.arange(0.,360.)
dec = np.full(360, 0.)
ppix = np.full(360, 21)
porder = np.full(360, 1)
norder = np.full(360, 1)
npix = np.full(360, 0)

test_df = pd.DataFrame(
data=zip(ras, dec, ppix, porder, norder, npix),
columns=[
"weird_ra",
"weird_dec",
"partition_pixel",
"partition_order",
"Norder",
"Npix"
]
)

test_df["margin_pixel"] = hp.ang2pix(
2**3,
test_df["weird_ra"].values,
test_df["weird_dec"].values,
lonlat=True,
nest=True
)

return test_df

@pytest.fixture
def polar_data_shard_df():
ras = np.arange(0.,360.)
dec = np.full(360, 89.9)
ppix = np.full(360, 15)
porder = np.full(360, 2)
norder = np.full(360, 2)
npix = np.full(360, 0)

test_df = pd.DataFrame(
data=zip(ras, dec, ppix, porder, norder, npix),
columns=[
"weird_ra",
"weird_dec",
"partition_pixel",
"partition_order",
"Norder",
"Npix"
]
)

test_df["margin_pixel"] = hp.ang2pix(
2**3,
test_df["weird_ra"].values,
test_df["weird_dec"].values,
lonlat=True,
nest=True
)

return test_df
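
Both fixtures tag every synthetic source with the HEALPix pixel it occupies at the margin order (order 3, so nside = 2**3), which is how rows get matched to margin shards downstream. A standalone check of that call:

import healpy as hp
import numpy as np

ras = np.arange(0., 360.)
dec = np.full(360, 0.)

# lonlat=True reads the inputs as (RA, Dec) in degrees; nest=True
# selects NESTED pixel ordering, matching the fixtures above.
margin_pixel = hp.ang2pix(2**3, ras, dec, lonlat=True, nest=True)

print(margin_pixel.shape)  # (360,) -- one order-3 pixel id per source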

@pytest.fixture
def assert_text_file_matches():
17 changes: 14 additions & 3 deletions tests/hipscat_import/margin_cache/test_margin_cache.py
@@ -1,8 +1,9 @@
"""Tests of map reduce operations"""
import numpy as np
import numpy.testing as npt
import pandas as pd
import pytest
from hipscat.io import file_io
from hipscat.io import file_io, paths

import hipscat_import.margin_cache.margin_cache as mc
from hipscat_import.margin_cache import MarginCacheArguments
@@ -23,9 +24,19 @@ def test_margin_cache_gen(small_sky_source_catalog, tmp_path, dask_client):
assert args.catalog.catalog_info.ra_column == "source_ra"

mc.generate_margin_cache(args, dask_client)
# TODO: add more verification of output to this test once the
# reduce phase is implemented.

print(args.catalog.partition_info.get_healpix_pixels())

norder = 1
npix = 47

test_file = paths.pixel_catalog_file(
args.catalog_path, norder, npix
)

data = pd.read_parquet(test_file)

assert len(data) == 4

def test_partition_margin_pixel_pairs(small_sky_source_catalog, tmp_path):
args = MarginCacheArguments(
102 changes: 39 additions & 63 deletions tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py
@@ -1,8 +1,6 @@
import healpy as hp
import numpy as np
import pandas as pd
import pytest
from hipscat.io import file_io
from hipscat.io import file_io, paths

from hipscat_import.margin_cache import margin_cache_map_reduce

@@ -30,38 +28,10 @@ def validate_result_dataframe(df_path, expected_len):
for col in drop_cols:
assert col not in cols


@pytest.mark.timeout(5)
def test_to_pixel_shard_equator(tmp_path):
ras = np.arange(0.0, 360.0)
dec = np.full(360, 0.0)
ppix = np.full(360, 21)
porder = np.full(360, 1)
norder = np.full(360, 1)
npix = np.full(360, 0)

test_df = pd.DataFrame(
data=zip(ras, dec, ppix, porder, norder, npix),
columns=[
"weird_ra",
"weird_dec",
"partition_pixel",
"partition_order",
"Norder",
"Npix",
],
)

test_df["margin_pixel"] = hp.ang2pix(
2**3,
test_df["weird_ra"].values,
test_df["weird_dec"].values,
lonlat=True,
nest=True,
)

def test_to_pixel_shard_equator(tmp_path, basic_data_shard_df):
margin_cache_map_reduce._to_pixel_shard(
test_df,
basic_data_shard_df,
margin_threshold=0.1,
output_path=tmp_path,
margin_order=3,
@@ -77,38 +47,10 @@ def test_to_pixel_shard_equator(tmp_path):

validate_result_dataframe(path, 46)


@pytest.mark.timeout(5)
def test_to_pixel_shard_polar(tmp_path):
ras = np.arange(0.0, 360.0)
dec = np.full(360, 89.9)
ppix = np.full(360, 15)
porder = np.full(360, 2)
norder = np.full(360, 2)
npix = np.full(360, 0)

test_df = pd.DataFrame(
data=zip(ras, dec, ppix, porder, norder, npix),
columns=[
"weird_ra",
"weird_dec",
"partition_pixel",
"partition_order",
"Norder",
"Npix",
],
)

test_df["margin_pixel"] = hp.ang2pix(
2**3,
test_df["weird_ra"].values,
test_df["weird_dec"].values,
lonlat=True,
nest=True,
)

def test_to_pixel_shard_polar(tmp_path, polar_data_shard_df):
margin_cache_map_reduce._to_pixel_shard(
test_df,
polar_data_shard_df,
margin_threshold=0.1,
output_path=tmp_path,
margin_order=3,
Expand All @@ -123,3 +65,37 @@ def test_to_pixel_shard_polar(tmp_path):
assert file_io.does_file_or_directory_exist(path)

validate_result_dataframe(path, 317)

def test_reduce_margin_shards(tmp_path, basic_data_shard_df):
partition_dir = margin_cache_map_reduce._get_partition_directory(
tmp_path, 1, 21
)
shard_dir = paths.pixel_directory(
partition_dir, 1, 21
)

file_io.make_directory(shard_dir, exist_ok=True)

first_shard_path = paths.pixel_catalog_file(
partition_dir, 1, 0
)
second_shard_path = paths.pixel_catalog_file(
partition_dir, 1, 1
)

print(first_shard_path)

shard_df = basic_data_shard_df.drop(columns=[
"partition_order",
"partition_pixel",
"margin_pixel"
])

shard_df.to_parquet(first_shard_path)
shard_df.to_parquet(second_shard_path)

margin_cache_map_reduce.reduce_margin_shards(tmp_path, 1, 21)

result_path = paths.pixel_catalog_file(tmp_path, 1, 21)

validate_result_dataframe(result_path, 720)
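
The expected count follows from the fixture arithmetic: the same 360-row equatorial shard is written twice, so the reduced file should come back with 720 rows, and reduce_margin_shards removes the shard directory once the single file is written.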
