Skip to content

Commit

Permalink
Merge pull request #165 from alan-turing-institute/andy-server-script
Browse files Browse the repository at this point in the history
Add wrapper script to run the pipeline in batches
  • Loading branch information
griff-rees authored Sep 16, 2024
2 parents d89712c + fc1cfe9 commit 9b21737
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 30 deletions.
28 changes: 28 additions & 0 deletions bash/run-pipeline-iteratively.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash
set -e
set -x

# Start-index goes from 1
max_index=500

# Input and output paths
hads_input_path="/datadrive/HadsUKgrid/"
cpm_input_path="/datadrive/UKCP2.2/"
output_path="/datadrive/clim-recal-results/group_run_`date +%F-%H-%M`"
log_path="$output_path/logs"

mkdir -p $log_path

for i in $(seq 0 $max_index); do
echo "Running for index={$i}"
{
clim-recal \
--start-index $i \
--total-from-index 1 \
--hads-input-path $hads_input_path \
--cpm-input-path $cpm_input_path \
--output-path $output_path \
--execute
} 2>&1 | tee $log_path/log_$i.txt

done
1 change: 1 addition & 0 deletions compose/ua-linux-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ services:
- NB_GID=1001
volumes:
- /mnt/vmfileshare:/mnt/vmfileshare
- /datadrive:/datadrive

rstudio:
volumes:
Expand Down
12 changes: 8 additions & 4 deletions python/clim_recal/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,14 @@ def pipeline(
all_runs: Annotated[bool, typer.Option("--all-runs")] = False,
default_runs: Annotated[bool, typer.Option("--default-runs")] = False,
all_methods: Annotated[bool, typer.Option("--all-methods")] = False,
cpm_projection: Annotated[bool, typer.Option("--project-cpm")] = True,
hads_projection: Annotated[bool, typer.Option("--project-hads")] = True,
crop_hads: Annotated[bool, typer.Option("--crop-cpm")] = True,
crop_cpm: Annotated[bool, typer.Option("--crop-hads")] = True,
cpm_projection: Annotated[
bool, typer.Option("--project-cpm/--skip-project-cpm")
] = True,
hads_projection: Annotated[
bool, typer.Option("--project-hads/--skip-project-hads")
] = True,
crop_hads: Annotated[bool, typer.Option("--crop-hads/--skip-crop-hads")] = True,
crop_cpm: Annotated[bool, typer.Option("--crop-cpm/--skip-crop-cpm")] = True,
execute: Annotated[bool, typer.Option("--execute")] = False,
start_index: Annotated[int, typer.Option("--start-index", "-s", min=0)] = 0,
total: Annotated[int, typer.Option("--total-from-index", "-t", min=0)] = 0,
Expand Down
44 changes: 18 additions & 26 deletions python/clim_recal/resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@
from typing import Any, Callable, Final, Iterable, Iterator, Literal, Sequence

import dill as pickle
import numpy as np
import rioxarray # nopycln: import
from osgeo.gdal import Dataset as GDALDataset
from osgeo.gdal import GRA_NearestNeighbour
from rich import print
from tqdm.rich import trange
from xarray import Dataset
Expand Down Expand Up @@ -59,33 +57,17 @@
ChangeDayType = set[tuple[int, int]]

CLIMATE_DATA_MOUNT_PATH: Path = climate_data_mount_path()
DEFAULT_INTERPOLATION_METHOD: str = "linear"
"""Default method to infer missing estimates in a time series."""

CFCalendarSTANDARD: Final[str] = "standard"

RESAMPLING_OUTPUT_PATH: Final[PathLike] = (
CLIMATE_DATA_MOUNT_PATH / "CPM-365/andys-two-gdal-step-approach/resample"
)
CROP_OUTPUT_PATH: Final[PathLike] = (
CLIMATE_DATA_MOUNT_PATH / "CPM-365/andys-two-gdal-step-approach/crop"
)
RAW_HADS_PATH: Final[PathLike] = CLIMATE_DATA_MOUNT_PATH / "Raw/HadsUKgrid"
RAW_CPM_PATH: Final[PathLike] = CLIMATE_DATA_MOUNT_PATH / "Raw/UKCP2.2"
RAW_HADS_TASMAX_PATH: Final[PathLike] = RAW_HADS_PATH / "tasmax/day"
RAW_CPM_TASMAX_PATH: Final[PathLike] = RAW_CPM_PATH / "tasmax/01/latest"

# TODO: remove REPROJECTED_CPM_TASMAX_05_LATEST_INPUT_PATH
REPROJECTED_CPM_TASMAX_05_LATEST_INPUT_PATH: Final[PathLike] = Path(
CLIMATE_DATA_MOUNT_PATH / "Reprojected_infill/UKCP2.2/tasmax/05/latest"
)

CPRUK_RESAMPLING_METHOD: Final[str] = GRA_NearestNeighbour
ResamplingArgs = tuple[PathLike, np.ndarray, np.ndarray, PathLike]
ResamplingCallable = Callable[[list | tuple], int]
CPM_STANDARD_CALENDAR_PATH: Final[Path] = Path("cpm-standard-calendar")
CPM_SPATIAL_COORDS_PATH: Final[Path] = Path("cpm-to-27700-spatial")

CPM_OUTPUT_LOCAL_PATH: Final[Path] = Path("cpm")
HADS_OUTPUT_LOCAL_PATH: Final[Path] = Path("hads")
CPM_CROP_OUTPUT_LOCAL_PATH: Final[Path] = Path("cpm-crop")
Expand Down Expand Up @@ -283,6 +265,9 @@ def range_crop_projection(
step: int = 1,
override_export_path: Path | None = None,
return_results: bool = False,
# possible meanse of reducing memory issues by removing
# xarray instance while keeping paths for logging purposes
# delete_xarray_after_save: bool = True,
**kwargs,
) -> list[Path]:
start = start or self.start_index
Expand Down Expand Up @@ -312,6 +297,7 @@ def crop_projection(
**kwargs,
) -> Path | T_Dataset:
"""Crop a projection to `region` geometry."""
console.log(f"Preparing to crop `_reprojected_paths` index {index} from {self}")
try:
assert hasattr(self, "_reprojected_paths")
except AssertionError:
Expand All @@ -333,6 +319,7 @@ def crop_projection(
path.mkdir(exist_ok=True, parents=True)
resampled_xr: Dataset = self._reprojected_paths[index]

console.log(f"From {self} crop {resampled_xr}")
cropped: Dataset = crop_xarray(
xr_time_series=resampled_xr,
crop_box=RegionOptions.bounding_box(self.crop_region),
Expand Down Expand Up @@ -437,15 +424,17 @@ def set_cpm_for_coord_alignment(self) -> None:
def to_reprojection(
self,
index: int = 0,
# override_export_path: Path | None = None,
override_export_path: Path | None = None,
return_results: bool = False,
source_to_index: Sequence | None = None,
) -> Path | T_Dataset:
source_path: Path = self._get_source_path(
index=index, source_to_index=source_to_index
)
path: PathLike = self.output_path
console.log(f"Setting 'cpm_for_coord_alignment' for {self}")
self.set_cpm_for_coord_alignment()
console.log(f"Set 'cpm_for_coord_alignment' for {self}")
return apply_geo_func(
source_path=source_path,
func=self._resample_func,
Expand Down Expand Up @@ -497,7 +486,7 @@ class CPMResampler(ResamblerBase):
>>> resample_test_cpm_output_path: Path = getfixture(
... 'resample_test_cpm_output_path')
>>> cpm_resampler: CPMResampler = CPMResampler(
... input_path=REPROJECTED_CPM_TASMAX_05_LATEST_INPUT_PATH,
... input_path=RAW_CPM_TASMAX_PATH,
... output_path=resample_test_cpm_output_path,
... input_file_extension=TIF_EXTENSION_STR,
... )
Expand All @@ -518,6 +507,7 @@ class CPMResampler(ResamblerBase):
crop_path: PathLike = RESAMPLING_OUTPUT_PATH / CPM_CROP_OUTPUT_LOCAL_PATH
input_file_x_column_name: str = CPM_RAW_X_COLUMN_NAME
input_file_y_column_name: str = CPM_RAW_Y_COLUMN_NAME
prior_time_series: PathLike | Dataset | None = None
_resample_func: ReprojectFuncType = cpm_reproject_with_standard_calendar

@property
Expand All @@ -527,14 +517,15 @@ def cpm_variable_name(self) -> str:
def to_reprojection(
self,
index: int = 0,
# override_export_path: Path | None = None,
override_export_path: Path | None = None,
return_results: bool = False,
source_to_index: Sequence | None = None,
) -> Path | T_Dataset:
source_path: Path = self._get_source_path(
index=index, source_to_index=source_to_index
)
path: PathLike = self.output_path
console.log(f"Reprojecting index CPM {index}...")
result: Path | T_Dataset | GDALDataset = apply_geo_func(
source_path=source_path,
func=self._resample_func,
Expand All @@ -544,6 +535,7 @@ def to_reprojection(
variable_name=self.cpm_variable_name,
return_results=return_results,
)
console.log(f"Completed index CPM {index}...")
if isinstance(result, PathLike):
if not hasattr(self, "_reprojected_paths"):
self._reprojected_paths: list[Path] = []
Expand Down Expand Up @@ -622,8 +614,8 @@ def input_folder(self) -> Path | None:
@property
def resample_folder(self) -> Path | None:
"""Return `self._output_path` set by `set_resample_paths()`."""
if hasattr(self, "_input_path"):
return Path(self._input_path)
if hasattr(self, "_output_path"):
return Path(self._output_path)
else:
return None

Expand Down Expand Up @@ -951,7 +943,7 @@ def _gen_crop_folder_paths(
)
for var in self.variables:
for region in self.crop_regions:
crop_path = Path(path) / "hads" / var / region
crop_path = Path(path) / "hads" / region / var
if append_cropped_path_dict:
self._cropped_path_dict[crop_path] = region
yield crop_path
Expand Down Expand Up @@ -1123,12 +1115,12 @@ def _gen_crop_folder_paths(
crop_path: Path = (
Path(path)
/ "cpm"
/ VariableOptions.cpm_value(var)
/ region
/ VariableOptions.cpm_value(var)
/ run_type
)
else:
crop_path: Path = Path(path) / "cpm" / var / region / run_type
crop_path: Path = Path(path) / "cpm" / region / var / run_type
if append_cropped_path_dict:
self._cropped_path_dict[crop_path] = region
yield crop_path

0 comments on commit 9b21737

Please sign in to comment.