Skip to content

Commit

Permalink
refactor: fix config crop path fails and progress_bar
Browse files Browse the repository at this point in the history
  • Loading branch information
spool committed Sep 25, 2024
1 parent 3d3edf3 commit e42231c
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 98 deletions.
8 changes: 6 additions & 2 deletions python/clim_recal/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class ClimRecalConfig(BaseRunConfig):
cpm_output_folder: PathLike = CPM_OUTPUT_PATH
cpm_kwargs: dict = field(default_factory=dict)
hads_kwargs: dict = field(default_factory=dict)
cpm_crop_kwargs: dict = field(default_factory=dict)
hads_crop_kwargs: dict = field(default_factory=dict)
resample_start_index: int = 0
resample_stop_index: int | None = None
crop_start_index: int = 0
Expand Down Expand Up @@ -275,7 +277,8 @@ def __post_init__(self) -> None:
# resample_stop_index=self.resample_stop_index,
# crop_start_index=self.crop_start_index,
# crop_stop_index=self.crop_stop_index,
**self.cpm_kwargs,
check_input_paths_exist=False,
**self.cpm_crop_kwargs,
)
self.hads_crops_manager = HADsRegionCropManager(
input_paths=self.hads_output_folder,
Expand All @@ -289,7 +292,8 @@ def __post_init__(self) -> None:
# resample_stop_index=self.resample_stop_index,
# crop_start_index=self.crop_start_index,
# crop_stop_index=self.crop_stop_index,
**self.cpm_kwargs,
check_input_paths_exist=False,
**self.hads_crop_kwargs,
)
self.total_cpus: int | None = cpu_count()
if self.cpus == None or (self.total_cpus and self.cpus >= self.total_cpus):
Expand Down
89 changes: 43 additions & 46 deletions python/clim_recal/resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,10 @@

from clim_recal.debiasing.debias_wrapper import VariableOptions

from .utils.core import (
_get_source_path,
climate_data_mount_path,
console,
multiprocess_execute,
)
from .utils.core import _get_source_path, climate_data_mount_path, multiprocess_execute
from .utils.data import (
CPM_END_DATE,
CPM_OUTPUT_PATH,
CPM_RAW_X_COLUMN_NAME,
CPM_RAW_Y_COLUMN_NAME,
CPM_START_DATE,
CPM_SUB_PATH,
HADS_END_DATE,
Expand Down Expand Up @@ -103,8 +96,8 @@ class ResamplerBase:
final_crs: str = BRITISH_NATIONAL_GRID_EPSG
input_file_extension: NETCDF_OR_TIF = NETCDF_EXTENSION_STR
export_file_extension: NETCDF_OR_TIF = NETCDF_EXTENSION_STR
input_file_x_column_name: str = ""
input_file_y_column_name: str = ""
# input_file_x_column_name: str = ""
# input_file_y_column_name: str = ""
start_index: int = 0
stop_index: int | None = None
_result_paths: dict[PathLike, PathLike | None] = field(default_factory=dict)
Expand Down Expand Up @@ -229,8 +222,9 @@ def range_to_reprojection(
step: int = 1,
override_export_path: Path | None = None,
source_to_index: Sequence | None = None,
return_path: bool = False,
return_path: bool = True,
write_results: bool = True,
progress_bar: bool = True,
) -> Iterator[Path]:
# start = start or self.resample_start_index
# stop = stop or self.resample_stop_index
Expand All @@ -241,14 +235,24 @@ def range_to_reprojection(
if stop is None:
stop = len(self)
# export_path: Path = Path(override_export_path or self.output_path)
for index in track(range(start, stop, step), description="Processing..."):
yield self.to_reprojection(
index=index,
override_export_path=override_export_path,
source_to_index=source_to_index,
return_path=return_path,
write_results=write_results,
)
if progress_bar:
for index in track(range(start, stop, step), description="Processing..."):
yield self.to_reprojection(
index=index,
override_export_path=override_export_path,
source_to_index=source_to_index,
return_path=return_path,
write_results=write_results,
)
else:
for index in range(start, stop, step):
yield self.to_reprojection(
index=index,
override_export_path=override_export_path,
source_to_index=source_to_index,
return_path=return_path,
write_results=write_results,
)
# yield self.to_reprojection()
# export_paths.append(
# method(
Expand Down Expand Up @@ -488,17 +492,13 @@ def to_reprojection(
source_to_index: Sequence | None = None,
**kwargs,
) -> Path | T_Dataset:
# source_path: Path = self._get_source_path(
# index=index, source_to_index=source_to_index
# )
source_path: Path = _get_source_path(
self, index=index, source_to_index=source_to_index
)
# path PathLike = self.output_path
console.log(f"Setting 'cpm_for_coord_alignment' for {self}")
# console.log(f"Setting 'cpm_for_coord_alignment' for {self}")
self.set_cpm_for_coord_alignment()
console.log(f"Set 'cpm_for_coord_alignment' for {self}")
console.log(f"Starting HADs index {index}...")
# console.log(f"Set 'cpm_for_coord_alignment' for {self}")
# console.log(f"Starting HADs index {index}...")
result: T_Dataset = hads_resample_and_reproject(
source_path,
variable_name=self.variable_name,
Expand All @@ -507,7 +507,7 @@ def to_reprojection(
cpm_to_match=self.cpm_for_coord_alignment,
**kwargs,
)
console.log(f"Completed HADs index {index}...")
# console.log(f"Completed HADs index {index}...")
self._result_paths[source_path] = None
if write_results or return_path:
export_path: Path = override_export_path or resample_output_path(
Expand Down Expand Up @@ -556,10 +556,6 @@ class CPMResampler(ResamplerBase):
Path or file to spatially crop `input_files` with.
final_crs
Coordinate Reference System (CRS) to return final format in.
input_file_x_column_name
Column name in `input_files` or `input` for `x` coordinates.
input_file_y_column_name
Column name in `input_files` or `input` for `y` coordinates.
input_file_extension
File extensions to glob `input_files` with.
Expand Down Expand Up @@ -587,8 +583,8 @@ class CPMResampler(ResamplerBase):
input_path: PathLike | None = RAW_CPM_TASMAX_PATH
output_path: PathLike = RESAMPLING_OUTPUT_PATH / CPM_OUTPUT_PATH
# crop_path: PathLike = RESAMPLING_OUTPUT_PATH / CPM_CROP_OUTPUT_LOCAL_PATH
input_file_x_column_name: str = CPM_RAW_X_COLUMN_NAME
input_file_y_column_name: str = CPM_RAW_Y_COLUMN_NAME
# input_file_x_column_name: str = CPM_RAW_X_COLUMN_NAME
# input_file_y_column_name: str = CPM_RAW_Y_COLUMN_NAME
prior_time_series: PathLike | Dataset | None = None
# _resample_func: ReprojectFuncType = cpm_reproject_with_standard_calendar
# _resample_path_func: Callable
Expand All @@ -606,22 +602,14 @@ def to_reprojection(
source_to_index: Sequence | None = None,
**kwargs,
) -> Path | T_Dataset:
# source_path: Path = self._get_source_path(
# index=index, source_to_index=source_to_index
# )
source_path: Path = _get_source_path(
self, index=index, source_to_index=source_to_index
)
# path: PathLike = self.output_path
console.log(f"Converting CPM index {index}...")
# export_path: Path = Path(source_path)
# if new_path_name_func:
# export_path = new_path_name_func(export_path)
# export_path = Path(export_folder) / export_path.name
# console.log(f"Converting CPM index {index}...")
result: T_Dataset = cpm_reproject_with_standard_calendar(
source_path, variable_name=self.cpm_variable_name, **kwargs
)
console.log(f"Completed CPM index {index}...")
# console.log(f"Completed CPM index {index}...")
self._result_paths[source_path] = None
if write_results or return_path:
export_path: Path = override_export_path or resample_output_path(
Expand Down Expand Up @@ -918,7 +906,8 @@ def execute_configs(
self,
multiprocess: bool = False,
cpus: int | None = None,
return_resamplers: bool = True,
return_resamplers: bool = False,
return_path: bool = True,
**kwargs,
) -> tuple[ResamplerBase, ...] | list[T_Dataset | Path]:
"""Run all resampler configurations
Expand All @@ -933,22 +922,30 @@ def execute_configs(
Return instances of generated `HADsResampler` or
`CPMResampler`, or return the `results` of each
`execute` call.
return_path
Return `Path` to results object if True, else resampled `Dataset`.
kwargs
Parameters to pass to sampler `execute` calls.
"""
resamplers: tuple[ResamplerBase, ...] = tuple(self.yield_configs())
results: list[tuple[Path, ...]] = []
if multiprocess:
# if kwargs:
# console.print("Note: **kwargs passed ignored if `multiprocess` enabled")
cpus = cpus or self.cpus
if self.total_cpus and cpus:
cpus = min(cpus, self.total_cpus - 1)
results = multiprocess_execute(
resamplers, method_name="execute", cpus=cpus, **kwargs
resamplers,
cpus=cpus,
return_path=return_path,
sub_process_progress_bar=False,
**kwargs,
)
else:
for resampler in resamplers:
print(resampler)
results.append(resampler.execute(**kwargs))
results.append(resampler.execute(return_path=return_path, **kwargs))
if return_resamplers:
return resamplers
else:
Expand Down
Loading

0 comments on commit e42231c

Please sign in to comment.