diff --git a/cset-workflow/includes/plot_spatial_surface_model_field.cylc b/cset-workflow/includes/plot_spatial_surface_model_field.cylc
index aa4bc11db..7b5c43bec 100644
--- a/cset-workflow/includes/plot_spatial_surface_model_field.cylc
+++ b/cset-workflow/includes/plot_spatial_surface_model_field.cylc
@@ -1,4 +1,5 @@
 {% if PLOT_SPATIAL_SURFACE_MODEL_FIELD %}
+{% for model in models %}
 {% for model_field in SURFACE_MODEL_FIELDS %}
 [runtime]
     [[generic_spatial_plot_time_series_{{model_field}}]]
@@ -6,5 +7,7 @@
         [[[environment]]]
             CSET_RECIPE_NAME = "generic_surface_spatial_plot_sequence.yaml"
             CSET_ADDOPTS = "--VARNAME={{model_field}}"
+            MODEL_NUMBER = {{model["number"]}}
 {% endfor %}
+{% endfor %}
 {% endif %}
diff --git a/src/CSET/__init__.py b/src/CSET/__init__.py
index 90e256ae2..9dba30e48 100644
--- a/src/CSET/__init__.py
+++ b/src/CSET/__init__.py
@@ -50,6 +50,7 @@ def main():
         "-i",
         "--input-dir",
         type=Path,
+        required=True,
         help="directory containing input data",
     )
     parser_bake.add_argument(
@@ -66,13 +67,6 @@ def main():
         required=True,
         help="recipe file to read",
     )
-    bake_step_control = parser_bake.add_mutually_exclusive_group()
-    bake_step_control.add_argument(
-        "--parallel-only", action="store_true", help="only run parallel steps"
-    )
-    bake_step_control.add_argument(
-        "--collate-only", action="store_true", help="only run collation steps"
-    )
     parser_bake.add_argument(
         "-s", "--style-file", type=Path, help="colour bar definition to use"
     )
@@ -194,24 +188,16 @@ def calculate_loglevel(args) -> int:
 
 def _bake_command(args, unparsed_args):
     from CSET._common import parse_variable_options
-    from CSET.operators import execute_recipe_collate, execute_recipe_parallel
+    from CSET.operators import execute_recipe
 
     recipe_variables = parse_variable_options(unparsed_args)
-    if not args.collate_only:
-        # Input dir is needed for parallel steps, but not collate steps.
-        if not args.input_dir:
-            raise ArgumentError("the following arguments are required: -i/--input-dir")
-        execute_recipe_parallel(
-            args.recipe,
-            args.input_dir,
-            args.output_dir,
-            recipe_variables,
-            args.style_file,
-        )
-    if not args.parallel_only:
-        execute_recipe_collate(
-            args.recipe, args.output_dir, recipe_variables, args.style_file
-        )
+    execute_recipe(
+        args.recipe,
+        args.input_dir,
+        args.output_dir,
+        recipe_variables,
+        args.style_file,
+    )
 
 
 def _graph_command(args, unparsed_args):
diff --git a/src/CSET/_common.py b/src/CSET/_common.py
index 1a75bb59e..a66f5a36e 100644
--- a/src/CSET/_common.py
+++ b/src/CSET/_common.py
@@ -72,13 +72,14 @@ def parse_recipe(recipe_yaml: Union[Path, str], variables: dict = None):
     except ruamel.yaml.parser.ParserError as err:
         raise ValueError("ParserError: Invalid YAML") from err
 
-    logging.debug(recipe)
+    logging.debug("Recipe before templating:\n%s", recipe)
     check_recipe_has_steps(recipe)
 
     if variables is not None:
         logging.debug("Recipe variables: %s", variables)
         recipe = template_variables(recipe, variables)
 
+    logging.debug("Recipe after templating:\n%s", recipe)
     return recipe
 
 
@@ -103,16 +104,15 @@ def check_recipe_has_steps(recipe: dict):
     KeyError
         If needed recipe variables are not supplied.
     """
-    parallel_steps_key = "parallel"
     if not isinstance(recipe, dict):
         raise TypeError("Recipe must contain a mapping.")
-    if "parallel" not in recipe:
-        raise ValueError("Recipe must contain a 'parallel' key.")
+    if "steps" not in recipe:
+        raise ValueError("Recipe must contain a 'steps' key.")
     try:
-        if len(recipe[parallel_steps_key]) < 1:
-            raise ValueError("Recipe must have at least 1 parallel step.")
+        if len(recipe["steps"]) < 1:
+            raise ValueError("Recipe must have at least 1 step.")
     except TypeError as err:
-        raise ValueError("'parallel' key must contain a sequence of steps.") from err
+        raise ValueError("'steps' key must contain a sequence of steps.") from err
 
 
 def slugify(s: str) -> str:
diff --git a/src/CSET/_workflow_utils/run_cset_recipe.py b/src/CSET/_workflow_utils/run_cset_recipe.py
index 223fafd00..aec95df44 100755
--- a/src/CSET/_workflow_utils/run_cset_recipe.py
+++ b/src/CSET/_workflow_utils/run_cset_recipe.py
@@ -105,7 +105,8 @@ def recipe_id():
         )
         sys.exit(1)
     id = p.stdout.decode(sys.stdout.encoding).strip()
-    return id
+    model_number = os.environ["MODEL_NUMBER"]
+    return f"m{model_number}_{id}"
 
 
 def output_directory():
@@ -118,7 +119,8 @@ def data_directory():
     """Get the input data directory for the cycle."""
     share_directory = os.environ["CYLC_WORKFLOW_SHARE_DIR"]
     cycle_point = os.environ["CYLC_TASK_CYCLE_POINT"]
-    return f"{share_directory}/cycle/{cycle_point}/data"
+    model_number = os.environ["MODEL_NUMBER"]
+    return f"{share_directory}/cycle/{cycle_point}/data/{model_number}"
 
 
 def create_diagnostic_archive(output_directory):
@@ -146,30 +148,7 @@ def add_to_diagnostic_index(output_directory, recipe_id):
     append_to_index({category: {recipe_id: title}})
 
 
-def parallel():
-    """Process raw data in parallel."""
-    logging.info("Pre-processing data into intermediate form.")
-    try:
-        subprocess.run(
-            (
-                "cset",
-                "-v",
-                "bake",
-                f"--recipe={recipe_file()}",
-                f"--input-dir={data_directory()}",
-                f"--output-dir={output_directory()}",
-                f"--style-file={os.getenv('COLORBAR_FILE', '')}",
-                "--parallel-only",
-            ),
-            check=True,
-            env=subprocess_env(),
-        )
-    except subprocess.CalledProcessError:
-        logging.error("cset bake exited non-zero while processing.")
-        sys.exit(1)
-
-
-def collate():
+def run_recipe_steps():
     """Collate processed data together and produce output plot.
 
     If the intermediate directory doesn't exist then we are running a simple
@@ -186,25 +165,20 @@ def collate():
                 "-v",
                 "bake",
                 f"--recipe={recipe_file()}",
+                f"--input-dir={data_directory()}",
                 f"--output-dir={output_directory()}",
                 f"--style-file={os.getenv('COLORBAR_FILE', '')}",
-                "--collate-only",
             ),
             check=True,
             env=subprocess_env(),
         )
     except subprocess.CalledProcessError:
         logging.error("cset bake exited non-zero while collating.")
-        sys.exit(1)
+        raise
     create_diagnostic_archive(output_directory())
     add_to_diagnostic_index(output_directory(), recipe_id())
 
 
 def run():
     """Run workflow script."""
-    # Check if we are running in parallel or collate mode.
-    bake_mode = os.getenv("CSET_BAKE_MODE")
-    if bake_mode == "parallel":
-        parallel()
-    elif bake_mode == "collate":
-        collate()
+    run_recipe_steps()
diff --git a/src/CSET/operators/__init__.py b/src/CSET/operators/__init__.py
index cd473f176..65b419dfe 100644
--- a/src/CSET/operators/__init__.py
+++ b/src/CSET/operators/__init__.py
@@ -145,8 +145,8 @@ def _step_parser(step: dict, step_input: any) -> str:
 def _run_steps(recipe, steps, step_input, output_directory: Path, style_file: Path):
     """Execute the steps in a recipe."""
     original_working_directory = Path.cwd()
-    os.chdir(output_directory)
     try:
+        os.chdir(output_directory)
         logger = logging.getLogger()
         diagnostic_log = logging.FileHandler(
             filename="CSET.log", mode="w", encoding="UTF-8"
@@ -168,14 +168,14 @@ def _run_steps(recipe, steps, step_input, output_directory: Path, style_file: Pa
         os.chdir(original_working_directory)
 
 
-def execute_recipe_parallel(
+def execute_recipe(
     recipe_yaml: Union[Path, str],
     input_directory: Path,
     output_directory: Path,
     recipe_variables: dict = None,
     style_file: Path = None,
 ) -> None:
-    """Parse and executes the parallel steps from a recipe file.
+    """Parse and execute the steps from a recipe file.
 
     Parameters
     ----------
@@ -202,51 +202,13 @@ def execute_recipe_parallel(
     TypeError
         The provided recipe is not a stream or Path.
     """
-    if recipe_variables is None:
-        recipe_variables = {}
     recipe = parse_recipe(recipe_yaml, recipe_variables)
     step_input = Path(input_directory).absolute()
-    # Create output directory, and an inter-cycle intermediate directory.
+    # Create output directory.
     try:
-        (output_directory / "intermediate").mkdir(parents=True, exist_ok=True)
+        output_directory.mkdir(parents=True, exist_ok=True)
     except (FileExistsError, NotADirectoryError) as err:
         logging.error("Output directory is a file. %s", output_directory)
         raise err
-    steps = recipe["parallel"]
+    steps = recipe["steps"]
     _run_steps(recipe, steps, step_input, output_directory, style_file)
-
-
-def execute_recipe_collate(
-    recipe_yaml: Union[Path, str],
-    output_directory: Path,
-    recipe_variables: dict = None,
-    style_file: Path = None,
-) -> None:
-    """Parse and execute the collation steps from a recipe file.
-
-    Parameters
-    ----------
-    recipe_yaml: Path or str
-        Path to a file containing, or string of, a recipe's YAML describing the
-        operators that need running. If a Path is provided it is opened and
-        read.
-    output_directory: Path
-        Pathlike indicating desired location of output. Must already exist.
-    recipe_variables: dict
-        Dictionary of variables for the recipe.
-
-    Raises
-    ------
-    ValueError
-        The recipe is not well formed.
-    TypeError
-        The provided recipe is not a stream or Path.
-    """
-    if recipe_variables is None:
-        recipe_variables = {}
-    output_directory = Path(output_directory).resolve()
-    assert output_directory.is_dir()
-    recipe = parse_recipe(recipe_yaml, recipe_variables)
-    # If collate doesn't exist treat it as having no steps.
-    steps = recipe.get("collate", [])
-    _run_steps(recipe, steps, output_directory, output_directory, style_file)
diff --git a/src/CSET/recipes/generic_surface_spatial_plot_sequence.yaml b/src/CSET/recipes/generic_surface_spatial_plot_sequence.yaml
index 0c9311852..43fb2354b 100644
--- a/src/CSET/recipes/generic_surface_spatial_plot_sequence.yaml
+++ b/src/CSET/recipes/generic_surface_spatial_plot_sequence.yaml
@@ -2,7 +2,7 @@ category: Quick Look
 title: Surface $VARNAME
 description: Extracts and plots the surface $VARNAME from a file.
 
-parallel:
+steps:
   - operator: read.read_cube
     constraint:
       operator: constraints.combine_constraints
@@ -16,16 +16,6 @@ parallel:
         operator: constraints.generate_level_constraint
         coordinate: "pressure"
         levels: []
-      validity_time_constraint:
-        operator: constraints.generate_time_constraint
-        time_start: $VALIDITY_TIME
-
-  - operator: write.write_cube_to_nc
-    filename: intermediate/surface_field
-
-collate:
-  - operator: read.read_cube
-    filename_pattern: intermediate/*.nc
 
   - operator: plot.spatial_contour_plot
     sequence_coordinate: time