Address pylint findings
delucchi-cmu committed Dec 14, 2023
1 parent 1c8216e commit 7eadbc5
Showing 3 changed files with 9 additions and 13 deletions.
7 changes: 4 additions & 3 deletions src/hipscat_import/cross_match/macauff_map_reduce.py
@@ -9,6 +9,7 @@
from hipscat_import.catalog.map_reduce import _get_pixel_directory, _iterate_input_file
from hipscat_import.catalog.resume_plan import ResumePlan

+ # pylint: disable=too-many-arguments,too-many-locals

def split_associations(
input_file,
@@ -66,12 +67,12 @@ def split_associations(
pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
)
filtered_data.to_parquet(output_file, index=False)
- del filtered_data, data_indexes
+ del data, filtered_data, data_indexes

ResumePlan.splitting_key_done(tmp_path=tmp_path, splitting_key=splitting_key)


- def reduce_associations(left_pixel, tmp_path, catalog_path):
+ def reduce_associations(left_pixel, tmp_path, catalog_path, reduce_key):
"""For all points determined to be in the target left_pixel, map them to the appropriate right_pixel
and aggregate into a single parquet file."""
inputs = _get_pixel_directory(tmp_path, left_pixel.order, left_pixel.pixel)
@@ -99,4 +100,4 @@ def reduce_associations(left_pixel, tmp_path, catalog_path):
join_pixel_frame = join_pixel_frames.get_group((join_pixel.order, join_pixel.pixel)).reset_index()
writer.write_table(pa.Table.from_pandas(join_pixel_frame, schema=merged_table.schema))

- ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=f"{left_pixel.order}_{left_pixel.pixel}")
+ ResumePlan.reducing_key_done(tmp_path=tmp_path, reducing_key=reduce_key)
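
The reduce_key change above moves the resume bookkeeping key out of the reducer: the caller now computes the key once and hands it to reduce_associations, which forwards it to ResumePlan.reducing_key_done. Below is a minimal sketch of that flow, assuming a hypothetical scheduling loop and ignoring the client/futures machinery visible in run_macauff_import.py further down; the "{order}_{pixel}" key format is taken from the f-string this commit removes.

# Sketch only: names other than reduce_associations and
# ResumePlan.reducing_key_done are illustrative, not from the repository.
from hipscat_import.cross_match.macauff_map_reduce import reduce_associations

def schedule_reductions(left_pixels, args):
    """Hypothetical driver: compute each resume key once, then pass it through."""
    for left_pixel in left_pixels:
        # Same key format as the removed f-string inside reduce_associations.
        pixel_key = f"{left_pixel.order}_{left_pixel.pixel}"
        reduce_associations(
            left_pixel=left_pixel,
            tmp_path=args.tmp_path,
            catalog_path=args.catalog_path,
            reduce_key=pixel_key,
        )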
14 changes: 4 additions & 10 deletions src/hipscat_import/cross_match/macauff_resume_plan.py
@@ -3,21 +3,15 @@
from __future__ import annotations

from dataclasses import dataclass, field
- from typing import List, Optional, Tuple
+ from typing import List, Tuple

import healpy as hp
import numpy as np
from hipscat import pixel_math
from hipscat.catalog import Catalog
from hipscat.io import FilePointer, file_io
from hipscat.pixel_math.healpix_pixel import HealpixPixel
from hipscat.pixel_tree import PixelAlignment, align_trees
from numpy import frombuffer
from tqdm import tqdm

from hipscat_import.cross_match.macauff_arguments import MacauffArguments
from hipscat_import.pipeline_resume_plan import PipelineResumePlan

+ # pylint: disable=duplicate-code

@dataclass
class MacauffResumePlan(PipelineResumePlan):
@@ -40,9 +34,9 @@ def __init__(self, args: MacauffArguments, left_pixels):
raise ValueError("tmp_path is required")
super().__init__(resume=args.resume, progress_bar=args.progress_bar, tmp_path=args.tmp_path)
self.input_paths = args.input_paths
- self.gather_plan(args, left_pixels)
+ self.gather_plan(left_pixels)

- def gather_plan(self, args, left_pixels):
+ def gather_plan(self, left_pixels):
"""Initialize the plan."""
## Make sure it's safe to use existing resume state.
super().safe_to_resume()
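
The remaining pylint findings in this file are handled in the standard way: unused imports and the args parameter that gather_plan no longer uses are simply deleted, while checks that are intentionally tolerated (too-many-arguments, too-many-locals, duplicate-code) are silenced with a module-level disable comment, which applies to every definition in the file. A toy sketch of that disable pattern follows; the function itself is hypothetical, not from the repository.

# pylint: disable=too-many-arguments
# A module-scope disable like the ones added in this commit: the named check
# is suppressed for the whole file rather than for a single function.

def combine(a, b, c, d, e, f, g):
    """Hypothetical function; seven parameters would otherwise trip R0913."""
    return a + b + c + d + e + f + g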
1 change: 1 addition & 0 deletions src/hipscat_import/cross_match/run_macauff_import.py
@@ -67,6 +67,7 @@ def reduce(args, left_pixels, resume_plan, client):
left_pixel=left_pixel,
tmp_path=args.tmp_path,
catalog_path=args.catalog_path,
+ reduce_key=pixel_key,
)
)

