diff --git a/src/hipscat_import/cross_match/macauff_map_reduce.py b/src/hipscat_import/cross_match/macauff_map_reduce.py index 35d54d45..a83b7c07 100644 --- a/src/hipscat_import/cross_match/macauff_map_reduce.py +++ b/src/hipscat_import/cross_match/macauff_map_reduce.py @@ -9,6 +9,7 @@ from hipscat_import.catalog.map_reduce import _get_pixel_directory, _iterate_input_file from hipscat_import.catalog.resume_plan import ResumePlan +# pylint: disable=too-many-arguments,too-many-locals def split_associations( input_file, @@ -66,12 +67,12 @@ def split_associations( pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet" ) filtered_data.to_parquet(output_file, index=False) - del filtered_data, data_indexes + del data, filtered_data, data_indexes ResumePlan.splitting_key_done(tmp_path=tmp_path, splitting_key=splitting_key) -def reduce_associations(left_pixel, tmp_path, catalog_path): +def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key): """For all points determined to be in the target left_pixel, map them to the appropriate right_pixel and aggregate into a single parquet file.""" inputs = _get_pixel_directory(tmp_path, left_pixel.order, left_pixel.pixel) @@ -99,4 +100,4 @@ def reduce_associations(left_pixel, tmp_path, catalog_path): join_pixel_frame = join_pixel_frames.get_group((join_pixel.order, join_pixel.pixel)).reset_index() writer.write_table(pa.Table.from_pandas(join_pixel_frame, schema=merged_table.schema)) - ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=f"{left_pixel.order}_{left_pixel.pixel}") + ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=reduce_key) diff --git a/src/hipscat_import/cross_match/macauff_resume_plan.py b/src/hipscat_import/cross_match/macauff_resume_plan.py index ce020c8d..cf481cbd 100644 --- a/src/hipscat_import/cross_match/macauff_resume_plan.py +++ b/src/hipscat_import/cross_match/macauff_resume_plan.py @@ -3,21 +3,15 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import List, Optional, Tuple +from typing import List, Tuple -import healpy as hp -import numpy as np -from hipscat import pixel_math -from hipscat.catalog import Catalog from hipscat.io import FilePointer, file_io from hipscat.pixel_math.healpix_pixel import HealpixPixel -from hipscat.pixel_tree import PixelAlignment, align_trees -from numpy import frombuffer -from tqdm import tqdm from hipscat_import.cross_match.macauff_arguments import MacauffArguments from hipscat_import.pipeline_resume_plan import PipelineResumePlan +# pylint: disable=duplicate-code @dataclass class MacauffResumePlan(PipelineResumePlan): @@ -40,9 +34,9 @@ def __init__(self, args: MacauffArguments, left_pixels): raise ValueError("tmp_path is required") super().__init__(resume=args.resume, progress_bar=args.progress_bar, tmp_path=args.tmp_path) self.input_paths = args.input_paths - self.gather_plan(args, left_pixels) + self.gather_plan(left_pixels) - def gather_plan(self, args, left_pixels): + def gather_plan(self, left_pixels): """Initialize the plan.""" ## Make sure it's safe to use existing resume state. super().safe_to_resume() diff --git a/src/hipscat_import/cross_match/run_macauff_import.py b/src/hipscat_import/cross_match/run_macauff_import.py index ecc503a7..35424058 100644 --- a/src/hipscat_import/cross_match/run_macauff_import.py +++ b/src/hipscat_import/cross_match/run_macauff_import.py @@ -67,6 +67,7 @@ def reduce(args, left_pixels, resume_plan, client): left_pixel=left_pixel, tmp_path=args.tmp_path, catalog_path=args.catalog_path, + reduce_key=pixel_key, ) )