Skip to content

Commit

Permalink
Add stdin for parameters (#501)
Browse files Browse the repository at this point in the history
* add stdin for parameters

* add filter to stdout for other modules

* add tests


---------

Co-authored-by: anikaweinmann <[email protected]>
  • Loading branch information
anikaweinmann and anikaweinmann authored Feb 23, 2024
1 parent 5371cb4 commit 19cc57f
Show file tree
Hide file tree
Showing 5 changed files with 299 additions and 15 deletions.
99 changes: 88 additions & 11 deletions src/actinia_core/core/common/process_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# performance processing of geographical data that uses GRASS GIS for
# computational tasks. For details, see https://actinia.mundialis.de/
#
# Copyright (c) 2016-2021 Sören Gebbert and mundialis GmbH & Co. KG
# Copyright (c) 2016-2024 Sören Gebbert and mundialis GmbH & Co. KG
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -56,11 +56,18 @@
" Anika Weinmann"
)
__copyright__ = (
"Copyright 2016-2022, Sören Gebbert and mundialis GmbH & Co. KG"
"Copyright 2016-2024, Sören Gebbert and mundialis GmbH & Co. KG"
)
__maintainer__ = "mundialis"


def get_param_stdin_part(text):
"""Function to get method and filter from parameter value"""
for delimiter in ["::", " ", "+", "-", "*", ":", "(", ")"]:
text = text.split(delimiter, 1)[0]
return text


class ProcessChainConverter(object):
"""
Convert the process chain description into a process list that can be
Expand Down Expand Up @@ -143,6 +150,7 @@ def __init__(
self.webhook_finished = None
self.webhook_update = None
self.webhook_auth = None
self.stdin_num = 0

def process_chain_to_process_list(self, process_chain):
if not process_chain:
Expand Down Expand Up @@ -614,6 +622,7 @@ def _create_module_process(self, module_descr):
if self.message_logger:
self.message_logger.info(str(module_descr))
params = []
param_stdin_funcs = {}

if "id" not in module_descr:
raise AsyncProcessError(
Expand All @@ -640,12 +649,12 @@ def _create_module_process(self, module_descr):

if "inputs" in module_descr:
self._add_grass_module_input_parameter_to_list(
module_descr, params, id
module_descr, params, param_stdin_funcs, id
)

if "outputs" in module_descr:
self._add_grass_module_output_parameter_to_list(
module_descr, params, id, module_name
module_descr, params, param_stdin_funcs, id, module_name
)

if "flags" in module_descr:
Expand Down Expand Up @@ -688,6 +697,7 @@ def _create_module_process(self, module_descr):
executable=module_name,
executable_params=params,
stdin_source=stdin_func,
param_stdin_sources=param_stdin_funcs,
id=id,
)

Expand All @@ -714,16 +724,63 @@ def _create_stdin_process(self, module_descr, id):
"The stdin option in id %s misses the ::" % str(id)
)
object_id, method = module_descr["stdin"].split("::")
if "stdout" == method is True:
stdin_func = self.process_dict[object_id].stdout
elif "stderr" == method is True:
stdin_func = self.process_dict[object_id].stderr
if "stdout" == method:
stdin_func = self.process_dict[object_id].get_stdout
elif "stderr" == method:
stdin_func = self.process_dict[object_id].get_stderr
else:
raise AsyncProcessError(
"The stdout or stderr flag in id %s is missing" % str(id)
)
return stdin_func

def _create_param_stdin_process(self, param_stdin_funcs, param_val, param):
"""Helper methods to create parameter stdin process.
Args:
module_descr (dict): The module description
param_stdin_funcs(dict): The dictionary with the stdout/stderr
functions
param_val(str): The value of parameter of the module
param(str): The parameter name of the module
Returns:
stdin_func(Process): An object of type Process that
contains the module name, the
parameter list and stdin definitions
filter(str): A string to filter stdout e.g. "max" for r.univar
"""
for mod in self.process_dict:
p_splitted = param_val.split(f"{mod}::")
p_len = len(p_splitted)
for i in range(1, p_len):
object_id = mod
method = get_param_stdin_part(p_splitted[i])
rest_str = p_splitted[i].replace(method, "")
filter = None
if rest_str.startswith("::"):
filter = get_param_stdin_part(rest_str)

if "stdout" == method:
stdin_func = self.process_dict[object_id].get_stdout
elif "stderr" == method:
stdin_func = self.process_dict[object_id].get_stderr
else:
raise AsyncProcessError(
f"The stdout or stderr flag in id {id} is missing"
)
func_str = f"{object_id}::{method}"
func_name = f"PARAM_STDIN_FUNC_{self.stdin_num}"
if filter:
func_str += f"::{filter}"
param += f"::{filter}"

param_stdin_funcs[self.stdin_num] = stdin_func
param_val = param_val.replace(func_str, func_name)
self.stdin_num += 1

return param_val

def _create_exec_process(self, module_descr):
"""Analyse a grass process description dict and create a Process
that is used to execute a common Linux binary.
Expand Down Expand Up @@ -770,7 +827,6 @@ def _create_exec_process(self, module_descr):
executable = module_descr["exe"]

params = []

if "params" in module_descr:
for search_string in module_descr["params"]:
# Search for file identifiers and generate the temporary file
Expand Down Expand Up @@ -1166,7 +1222,7 @@ def _create_exec_process_legacy(self, id, module_descr):
return p

def _add_grass_module_input_parameter_to_list(
self, module_descr, params, id
self, module_descr, params, param_stdin_funcs, id
):
"""Helper method to set the input parameters of a grass module and add
them to the params list.
Expand All @@ -1176,6 +1232,8 @@ def _add_grass_module_input_parameter_to_list(
params (list): The list of the grass module inputs parameters with
param=value entries (here the input parameter are
added)
param_stdin_funcs (dict): The dictonary with the stdin parameter
functions
id (str): The id of this process in the process chain
"""
if isinstance(module_descr["inputs"], list) is False:
Expand Down Expand Up @@ -1210,6 +1268,14 @@ def _add_grass_module_input_parameter_to_list(
file_id
] = self.generate_temp_file_path()
param = "%s=%s" % (param, self.temporary_pc_files[file_id])
elif "::" in value and value.split("::")[1] in [
"stdout",
"stderr",
]:
param_val = self._create_param_stdin_process(
param_stdin_funcs, value, param
)
param = f"{param}={param_val}"
else:
param = "%s=%s" % (param, value)
# Check for mapset in input name and append it,
Expand Down Expand Up @@ -1255,7 +1321,7 @@ def _add_grass_module_input_parameter_to_list(
params.append(param)

def _add_grass_module_output_parameter_to_list(
self, module_descr, params, id, module_name
self, module_descr, params, param_stdin_funcs, id, module_name
):
"""Helper method to set the output parameters of a grass module and add
them to the params list. If export is in the output parameter the
Expand All @@ -1266,6 +1332,8 @@ def _add_grass_module_output_parameter_to_list(
params (list): The list of the grass module parameters with
param=value entries (here the output parameter are
added)
param_stdin_funcs (dict): The dictonary with the stdin parameter
functions
id (str): The id of this process in the process chain
module_name (str): The name of the grass module
Expand Down Expand Up @@ -1341,6 +1409,15 @@ def _add_grass_module_output_parameter_to_list(
file_id,
output["export"]["format"].lower(),
)
elif "::" in value and value.split("::")[1] in [
"stdout",
"stderr",
]:
id = module_descr["id"]
param_val = self._create_param_stdin_process(
param_stdin_funcs, value, param
)
param = f"{param}={param_val}"
else:
param = "%s=%s" % (param, value)
params.append(param)
Expand Down
4 changes: 4 additions & 0 deletions src/actinia_core/core/common/process_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(
executable,
executable_params,
stdin_source=None,
param_stdin_sources=None,
skip_permission_check=False,
id=None,
):
Expand All @@ -58,6 +59,8 @@ def __init__(
for the executable
stdin_source (str): The get_stdout or get_stderr method of a
Process
param_stdin_sources (dict): The get_stdout or get_stderr methods of
a process parameter
skip_permission_check(boolean): Skip permission check for the
module or executable, this is
meaningful for internal process
Expand All @@ -75,6 +78,7 @@ def __init__(

self.executable_params = executable_params
self.stdin_source = stdin_source
self.param_stdin_sources = param_stdin_sources
self.stdout = None
self.stderr = None
self.skip_permission_check = skip_permission_check
Expand Down
6 changes: 5 additions & 1 deletion src/actinia_core/models/process_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,11 @@ class Executable(Schema):
"params": {
"type": "array",
"items": {"type": "string"},
"description": "A list of input parameters of a GRASS GIS module.",
"description": "A list of input parameters of a GRASS GIS module."
"By setting module_id::stdout(::filter) the stdout of another "
"module can be used as input for the current module. E.g. "
"'r_univar_module_id::stdout::max' can be used to do a rescaling "
"of a raster.",
},
"stdin": {
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# performance processing of geographical data that uses GRASS GIS for
# computational tasks. For details, see https://actinia.mundialis.de/
#
# Copyright (c) 2016-2023 Sören Gebbert and mundialis GmbH & Co. KG
# Copyright (c) 2016-2024 Sören Gebbert and mundialis GmbH & Co. KG
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -47,7 +47,10 @@
from actinia_core.core.redis_lock import RedisLockingInterface
from actinia_core.core.resources_logger import ResourceLogger
from actinia_core.core.mapset_merge_utils import change_mapsetname
from actinia_core.core.common.process_chain import ProcessChainConverter
from actinia_core.core.common.process_chain import (
get_param_stdin_part,
ProcessChainConverter,
)
from actinia_core.core.common.exceptions import (
AsyncProcessError,
AsyncProcessTermination,
Expand All @@ -71,7 +74,7 @@
__license__ = "GPLv3"
__author__ = "Sören Gebbert, Anika Weinmann, Lina Krisztian"
__copyright__ = (
"Copyright 2016-2023, Sören Gebbert and mundialis GmbH & Co. KG"
"Copyright 2016-2024, Sören Gebbert and mundialis GmbH & Co. KG"
)
__maintainer__ = "mundialis GmbH & Co. KG"

Expand Down Expand Up @@ -1685,6 +1688,42 @@ def _run_executable(self, process, poll_time=0.005):
)
stdin_file = None

if process.param_stdin_sources:
for num, func in process.param_stdin_sources.items():
func_name = f"PARAM_STDIN_FUNC_{num}"
for i in range(len(process.executable_params)):
param = process.executable_params[i]
if func_name in param:
par, val = param.split("=", 1)
par_val = func().strip()
val_splitted = val.split(func_name)
for j in range(1, len(val_splitted)):
filtered_par_value = par_val
filtered_func_name = func_name
# filter stdout/stderr
if "::" in val_splitted[j]:
filter = get_param_stdin_part(
val_splitted[j][2:]
)
if "=" not in par_val:
raise AsyncProcessError(
"Error while running executable "
f"<{process.executable}>: <{filter}> "
"cannot be selected. Maybe you have to "
"set the '-g' flag for the stdout/stderr "
"module."
)
filtered_par_value = {
x.split("=")[0]: x.split("=")[1]
for x in par_val.split()
}[filter]
filtered_func_name += f"::{filter}"
process.executable_params[
i
] = process.executable_params[i].replace(
filtered_func_name, filtered_par_value
)

if process.stdin_source is not None:
tmp_file = self.proc_chain_converter.generate_temp_file_path()
stdin_file = open(tmp_file, "w")
Expand Down
Loading

0 comments on commit 19cc57f

Please sign in to comment.