diff --git a/src/hipscat_import/margin_cache/margin_cache.py b/src/hipscat_import/margin_cache/margin_cache.py index 6f30551e..84705202 100644 --- a/src/hipscat_import/margin_cache/margin_cache.py +++ b/src/hipscat_import/margin_cache/margin_cache.py @@ -59,7 +59,7 @@ def _map_to_margin_shards(client, args, partition_pixels, margin_pairs): partition_file=partition_file, margin_pairs=mp_future, margin_threshold=args.margin_threshold, - output_path=args.catalog_path, + output_path=args.tmp_path, margin_order=args.margin_order, ra_column=args.catalog.catalog_info.ra_column, dec_column=args.catalog.catalog_info.dec_column, @@ -83,6 +83,7 @@ def _reduce_margin_shards(client, args, partition_pixels): futures.append( client.submit( mcmr.reduce_margin_shards, + intermediate_directory=args.tmp_path, output_path=args.catalog_path, partition_order=pix.order, partition_pixel=pix.pixel, diff --git a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py index 4eb6f194..9ac7743c 100644 --- a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py +++ b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py @@ -96,9 +96,9 @@ def _to_pixel_shard(data, margin_threshold, output_path, ra_column, dec_column): del data, margin_data, final_df -def reduce_margin_shards(output_path, partition_order, partition_pixel): +def reduce_margin_shards(intermediate_directory, output_path, partition_order, partition_pixel): """Reduce all partition pixel directories into a single file""" - shard_dir = get_pixel_cache_directory(output_path, HealpixPixel(partition_order, partition_pixel)) + shard_dir = get_pixel_cache_directory(intermediate_directory, HealpixPixel(partition_order, partition_pixel)) if file_io.does_file_or_directory_exist(shard_dir): data = ds.dataset(shard_dir, format="parquet") diff --git a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py index 5f150d57..deec432c 100644 --- a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py +++ b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py @@ -69,7 +69,8 @@ def test_to_pixel_shard_polar(tmp_path, polar_data_shard_df): @pytest.mark.dask def test_reduce_margin_shards(tmp_path, basic_data_shard_df): - partition_dir = get_pixel_cache_directory(tmp_path, HealpixPixel(1, 21)) + intermediate_dir = os.path.join(tmp_path, "intermediate") + partition_dir = get_pixel_cache_directory(intermediate_dir, HealpixPixel(1, 21)) shard_dir = paths.pixel_directory(partition_dir, 1, 21) os.makedirs(shard_dir) @@ -82,7 +83,7 @@ def test_reduce_margin_shards(tmp_path, basic_data_shard_df): shard_df.to_parquet(first_shard_path) shard_df.to_parquet(second_shard_path) - margin_cache_map_reduce.reduce_margin_shards(tmp_path, 1, 21) + margin_cache_map_reduce.reduce_margin_shards(intermediate_dir, tmp_path, 1, 21) result_path = paths.pixel_catalog_file(tmp_path, 1, 21)