From 67c9ca903c0f346c6b706ef9f7394d577df01585 Mon Sep 17 00:00:00 2001 From: Daniel Weindl Date: Tue, 21 Nov 2023 20:47:28 +0100 Subject: [PATCH] Refactor: PEtab code to sub-package (#2205) Move PEtab-related code to a new `amici.petab` subpackage. An attempt to organize the growing codebase a bit better. More to be done. Unless anybody is using private functions, this should not break anything. More parts to be moved separately. No change in functionality. --- python/sdist/amici/__init__.py | 3 +- python/sdist/amici/parameter_mapping.py | 474 +----------- python/sdist/amici/petab/__init__.py | 5 + python/sdist/amici/petab/cli/__init__.py | 0 python/sdist/amici/petab/cli/import_petab.py | 160 ++++ python/sdist/amici/petab/conditions.py | 515 +++++++++++++ python/sdist/amici/petab/parameter_mapping.py | 696 +++++++++++++++++ python/sdist/amici/petab/util.py | 103 +++ python/sdist/amici/petab_import.py | 160 +--- python/sdist/amici/petab_import_pysb.py | 3 +- python/sdist/amici/petab_objective.py | 729 +----------------- python/sdist/amici/petab_util.py | 110 +-- python/sdist/setup.cfg | 4 +- 13 files changed, 1525 insertions(+), 1437 deletions(-) create mode 100644 python/sdist/amici/petab/__init__.py create mode 100644 python/sdist/amici/petab/cli/__init__.py create mode 100644 python/sdist/amici/petab/cli/import_petab.py create mode 100644 python/sdist/amici/petab/conditions.py create mode 100644 python/sdist/amici/petab/parameter_mapping.py create mode 100644 python/sdist/amici/petab/util.py diff --git a/python/sdist/amici/__init__.py b/python/sdist/amici/__init__.py index bcb7387fbf..4ee6f05637 100644 --- a/python/sdist/amici/__init__.py +++ b/python/sdist/amici/__init__.py @@ -6,7 +6,6 @@ models and turning them into C++ Python extensions. """ - import contextlib import importlib import os @@ -132,6 +131,8 @@ def get_model(self) -> amici.Model: """Create a model instance.""" ... + AmiciModel = Union[amici.Model, amici.ModelPtr] + class add_path: """Context manager for temporarily changing PYTHONPATH""" diff --git a/python/sdist/amici/parameter_mapping.py b/python/sdist/amici/parameter_mapping.py index f1cf75a150..dd961f43c1 100644 --- a/python/sdist/amici/parameter_mapping.py +++ b/python/sdist/amici/parameter_mapping.py @@ -1,457 +1,17 @@ -""" -Parameter mapping ------------------ - -When performing parameter inference, often parameters need to be mapped from -simulation to estimation parameters, and parameters can differ between -conditions. This can be handled using the `ParameterMapping`. - -Note -~~~~ - -While the parameter mapping can be used directly with AMICI, it was developed -for usage together with PEtab, for which the whole workflow of generating -the mapping is automatized. -""" -from __future__ import annotations - -import numbers -import warnings -from collections.abc import Sequence -from itertools import chain -from typing import Any, Dict, List, Set, Union - -import amici -import numpy as np -from petab.C import * # noqa: F403 - -SingleParameterMapping = Dict[str, Union[numbers.Number, str]] -SingleScaleMapping = Dict[str, str] -AmiciModel = Union[amici.Model, amici.ModelPtr] - - -class ParameterMappingForCondition: - """Parameter mapping for condition. - - Contains mappings for free parameters, fixed parameters, and fixed - preequilibration parameters, both for parameters and scales. - - In the scale mappings, for each simulation parameter the scale - on which the value is passed (and potentially gradients are to be - returned) is given. In the parameter mappings, for each simulation - parameter a corresponding optimization parameter (or a numeric value) - is given. - - If a mapping is not passed, the parameter mappings are assumed to be empty, - and if a scale mapping is not passed, all scales are set to linear. - - :param map_sim_var: - Mapping for free simulation parameters. - :param scale_map_sim_var: - Scales for free simulation parameters. - :param map_preeq_fix: - Mapping for fixed preequilibration parameters. - :param scale_map_preeq_fix: - Scales for fixed preequilibration parameters. - :param map_sim_fix: - Mapping for fixed simulation parameters. - :param scale_map_sim_fix: - Scales for fixed simulation parameters. - """ - - def __init__( - self, - map_sim_var: SingleParameterMapping = None, - scale_map_sim_var: SingleScaleMapping = None, - map_preeq_fix: SingleParameterMapping = None, - scale_map_preeq_fix: SingleScaleMapping = None, - map_sim_fix: SingleParameterMapping = None, - scale_map_sim_fix: SingleScaleMapping = None, - ): - if map_sim_var is None: - map_sim_var = {} - self.map_sim_var = map_sim_var - - if scale_map_sim_var is None: - scale_map_sim_var = {key: LIN for key in map_sim_var} - self.scale_map_sim_var = scale_map_sim_var - - if map_preeq_fix is None: - map_preeq_fix = {} - self.map_preeq_fix = map_preeq_fix - - if scale_map_preeq_fix is None: - scale_map_preeq_fix = {key: LIN for key in map_preeq_fix} - self.scale_map_preeq_fix = scale_map_preeq_fix - - if map_sim_fix is None: - map_sim_fix = {} - self.map_sim_fix = map_sim_fix - - if scale_map_sim_fix is None: - scale_map_sim_fix = {key: LIN for key in map_sim_fix} - self.scale_map_sim_fix = scale_map_sim_fix - - def __repr__(self): - return ( - f"{self.__class__.__name__}(" - f"map_sim_var={repr(self.map_sim_var)}," - f"scale_map_sim_var={repr(self.scale_map_sim_var)}," - f"map_preeq_fix={repr(self.map_preeq_fix)}," - f"scale_map_preeq_fix={repr(self.scale_map_preeq_fix)}," - f"map_sim_fix={repr(self.map_sim_fix)}," - f"scale_map_sim_fix={repr(self.scale_map_sim_fix)})" - ) - - @property - def free_symbols(self) -> Set[str]: - """Get IDs of all (symbolic) parameters present in this mapping""" - return { - p - for p in chain( - self.map_sim_var.values(), - self.map_preeq_fix.values(), - self.map_sim_fix.values(), - ) - if isinstance(p, str) - } - - -class ParameterMapping(Sequence): - r"""Parameter mapping for multiple conditions. - - This can be used like a list of :class:`ParameterMappingForCondition`\ s. - - :param parameter_mappings: - List of parameter mappings for specific conditions. - """ - - def __init__( - self, parameter_mappings: List[ParameterMappingForCondition] = None - ): - super().__init__() - if parameter_mappings is None: - parameter_mappings = [] - self.parameter_mappings = parameter_mappings - - def __iter__(self): - yield from self.parameter_mappings - - def __getitem__( - self, item - ) -> Union[ParameterMapping, ParameterMappingForCondition]: - result = self.parameter_mappings[item] - if isinstance(result, ParameterMappingForCondition): - return result - return ParameterMapping(result) - - def __len__(self): - return len(self.parameter_mappings) - - def append( - self, parameter_mapping_for_condition: ParameterMappingForCondition - ): - """Append a condition specific parameter mapping.""" - self.parameter_mappings.append(parameter_mapping_for_condition) - - def __repr__(self): - return f"{self.__class__.__name__}({repr(self.parameter_mappings)})" - - @property - def free_symbols(self) -> Set[str]: - """Get IDs of all (symbolic) parameters present in this mapping""" - return set.union(*(mapping.free_symbols for mapping in self)) - - -def fill_in_parameters( - edatas: List[amici.ExpData], - problem_parameters: Dict[str, numbers.Number], - scaled_parameters: bool, - parameter_mapping: ParameterMapping, - amici_model: AmiciModel, -) -> None: - """Fill fixed and dynamic parameters into the edatas (in-place). - - :param edatas: - List of experimental datas :class:`amici.amici.ExpData` with - everything except parameters filled. - :param problem_parameters: - Problem parameters as parameterId=>value dict. Only - parameters included here will be set. Remaining parameters will - be used as currently set in `amici_model`. - :param scaled_parameters: - If True, problem_parameters are assumed to be on the scale provided - in the parameter mapping. If False, they are assumed - to be in linear scale. - :param parameter_mapping: - Parameter mapping for all conditions. - :param amici_model: - AMICI model. - """ - if unused_parameters := ( - set(problem_parameters.keys()) - parameter_mapping.free_symbols - ): - warnings.warn( - "The following problem parameters were not used: " - + str(unused_parameters), - RuntimeWarning, - ) - - for edata, mapping_for_condition in zip(edatas, parameter_mapping): - fill_in_parameters_for_condition( - edata, - problem_parameters, - scaled_parameters, - mapping_for_condition, - amici_model, - ) - - -def fill_in_parameters_for_condition( - edata: amici.ExpData, - problem_parameters: Dict[str, numbers.Number], - scaled_parameters: bool, - parameter_mapping: ParameterMappingForCondition, - amici_model: AmiciModel, -) -> None: - """Fill fixed and dynamic parameters into the edata for condition - (in-place). - - :param edata: - Experimental data object to fill parameters into. - :param problem_parameters: - Problem parameters as parameterId=>value dict. Only - parameters included here will be set. Remaining parameters will - be used as already set in `amici_model` and `edata`. - :param scaled_parameters: - If True, problem_parameters are assumed to be on the scale provided - in the parameter mapping. If False, they - are assumed to be in linear scale. - :param parameter_mapping: - Parameter mapping for current condition. - :param amici_model: - AMICI model - """ - map_sim_var = parameter_mapping.map_sim_var - scale_map_sim_var = parameter_mapping.scale_map_sim_var - map_preeq_fix = parameter_mapping.map_preeq_fix - scale_map_preeq_fix = parameter_mapping.scale_map_preeq_fix - map_sim_fix = parameter_mapping.map_sim_fix - scale_map_sim_fix = parameter_mapping.scale_map_sim_fix - - # Parameter mapping may contain parameter_ids as values, these *must* - # be replaced - - def _get_par(model_par, value, mapping): - """Replace parameter IDs in mapping dicts by values from - problem_parameters where necessary""" - if isinstance(value, str): - try: - # estimated parameter - return problem_parameters[value] - except KeyError: - # condition table overrides must have been handled already, - # e.g. by the PEtab parameter mapping, but parameters from - # InitialAssignments may still be present. - if mapping[value] == model_par: - # prevent infinite recursion - raise - return _get_par(value, mapping[value], mapping) - if model_par in problem_parameters: - # user-provided - return problem_parameters[model_par] - # prevent nan-propagation in derivative - if np.isnan(value): - return 0.0 - # constant value - return value - - map_preeq_fix = { - key: _get_par(key, val, map_preeq_fix) - for key, val in map_preeq_fix.items() - } - map_sim_fix = { - key: _get_par(key, val, map_sim_fix) - for key, val in map_sim_fix.items() - } - map_sim_var = { - key: _get_par(key, val, dict(map_sim_fix, **map_sim_var)) - for key, val in map_sim_var.items() - } - - # If necessary, (un)scale parameters - if scaled_parameters: - unscale_parameters_dict(map_preeq_fix, scale_map_preeq_fix) - unscale_parameters_dict(map_sim_fix, scale_map_sim_fix) - if not scaled_parameters: - # We scale all parameters to the scale they are estimated on, and pass - # that information to amici via edata.{parameters,pscale}. - # The scaling is necessary to obtain correct derivatives. - scale_parameters_dict(map_sim_var, scale_map_sim_var) - # We can skip preequilibration parameters, because they are identical - # with simulation parameters, and only the latter are used from here - # on. - - ########################################################################## - # variable parameters and parameter scale - - # parameter list from mapping dict - parameters = [ - map_sim_var[par_id] for par_id in amici_model.getParameterIds() - ] - - # scales list from mapping dict - scales = [ - petab_to_amici_scale(scale_map_sim_var[par_id]) - for par_id in amici_model.getParameterIds() - ] - - # plist - plist = [ - ip - for ip, par_id in enumerate(amici_model.getParameterIds()) - if isinstance(parameter_mapping.map_sim_var[par_id], str) - ] - - if parameters: - edata.parameters = np.asarray(parameters, dtype=float) - - if scales: - edata.pscale = amici.parameterScalingFromIntVector(scales) - - if plist: - edata.plist = plist - - ########################################################################## - # fixed parameters preequilibration - if map_preeq_fix: - fixed_pars_preeq = [ - map_preeq_fix[par_id] - for par_id in amici_model.getFixedParameterIds() - ] - edata.fixedParametersPreequilibration = fixed_pars_preeq - - ########################################################################## - # fixed parameters simulation - if map_sim_fix: - fixed_pars_sim = [ - map_sim_fix[par_id] - for par_id in amici_model.getFixedParameterIds() - ] - edata.fixedParameters = fixed_pars_sim - - -def petab_to_amici_scale(petab_scale: str) -> int: - """Convert petab scale id to amici scale id.""" - if petab_scale == LIN: - return amici.ParameterScaling_none - if petab_scale == LOG10: - return amici.ParameterScaling_log10 - if petab_scale == LOG: - return amici.ParameterScaling_ln - raise ValueError(f"PEtab scale not recognized: {petab_scale}") - - -def amici_to_petab_scale(amici_scale: int) -> str: - """Convert amici scale id to petab scale id.""" - if amici_scale == amici.ParameterScaling_none: - return LIN - if amici_scale == amici.ParameterScaling_log10: - return LOG10 - if amici_scale == amici.ParameterScaling_ln: - return LOG - raise ValueError(f"AMICI scale not recognized: {amici_scale}") - - -def scale_parameter(value: numbers.Number, petab_scale: str) -> numbers.Number: - """Bring parameter from linear scale to target scale. - - :param value: - Value to scale - :param petab_scale: - Target scale of ``value`` - - :return: - ``value`` on target scale - """ - if petab_scale == LIN: - return value - if petab_scale == LOG10: - return np.log10(value) - if petab_scale == LOG: - return np.log(value) - raise ValueError( - f"Unknown parameter scale {petab_scale}. " - f"Must be from {(LIN, LOG, LOG10)}" - ) - - -def unscale_parameter( - value: numbers.Number, petab_scale: str -) -> numbers.Number: - """Bring parameter from scale to linear scale. - - :param value: - Value to scale - :param petab_scale: - Target scale of ``value`` - - :return: - ``value`` on linear scale - """ - if petab_scale == LIN: - return value - if petab_scale == LOG10: - return np.power(10, value) - if petab_scale == LOG: - return np.exp(value) - raise ValueError( - f"Unknown parameter scale {petab_scale}. " - f"Must be from {(LIN, LOG, LOG10)}" - ) - - -def scale_parameters_dict( - value_dict: Dict[Any, numbers.Number], petab_scale_dict: Dict[Any, str] -) -> None: - """ - Bring parameters from linear scale to target scale. - - Bring values in ``value_dict`` from linear scale to the scale - provided in ``petab_scale_dict`` (in-place). - Both arguments are expected to have the same length and matching keys. - - :param value_dict: - Values to scale - - :param petab_scale_dict: - Target scales of ``values`` - """ - if value_dict.keys() != petab_scale_dict.keys(): - raise AssertionError("Keys don't match.") - - for key, value in value_dict.items(): - value_dict[key] = scale_parameter(value, petab_scale_dict[key]) - - -def unscale_parameters_dict( - value_dict: Dict[Any, numbers.Number], petab_scale_dict: Dict[Any, str] -) -> None: - """ - Bring parameters from target scale to linear scale. - - Bring values in ``value_dict`` from linear scale to the scale - provided in ``petab_scale_dict`` (in-place). - Both arguments are expected to have the same length and matching keys. - - :param value_dict: - Values to scale - - :param petab_scale_dict: - Target scales of ``values`` - """ - if value_dict.keys() != petab_scale_dict.keys(): - raise AssertionError("Keys don't match.") - - for key, value in value_dict.items(): - value_dict[key] = unscale_parameter(value, petab_scale_dict[key]) +# some extra imports for backward-compatibility +from .petab.conditions import ( # noqa # pylint: disable=unused-import + fill_in_parameters, + fill_in_parameters_for_condition, +) +from .petab.parameter_mapping import ( # noqa # pylint: disable=unused-import + ParameterMapping, + ParameterMappingForCondition, + SingleParameterMapping, + SingleScaleMapping, + amici_to_petab_scale, + petab_to_amici_scale, + scale_parameter, + scale_parameters_dict, + unscale_parameter, + unscale_parameters_dict, +) diff --git a/python/sdist/amici/petab/__init__.py b/python/sdist/amici/petab/__init__.py new file mode 100644 index 0000000000..e1ea6b4633 --- /dev/null +++ b/python/sdist/amici/petab/__init__.py @@ -0,0 +1,5 @@ +"""PEtab import related code.""" + +# ID of model parameter that is to be added to SBML model to indicate +# preequilibration +PREEQ_INDICATOR_ID = "preequilibration_indicator" diff --git a/python/sdist/amici/petab/cli/__init__.py b/python/sdist/amici/petab/cli/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/python/sdist/amici/petab/cli/import_petab.py b/python/sdist/amici/petab/cli/import_petab.py new file mode 100644 index 0000000000..dfa2eb0aca --- /dev/null +++ b/python/sdist/amici/petab/cli/import_petab.py @@ -0,0 +1,160 @@ +import argparse + +import petab +from amici.petab_import import import_model + + +def _parse_cli_args(): + """ + Parse command line arguments + + :return: + Parsed CLI arguments from :mod:`argparse`. + """ + parser = argparse.ArgumentParser( + description="Import PEtab-format model into AMICI." + ) + + # General options: + parser.add_argument( + "-v", + "--verbose", + dest="verbose", + action="store_true", + help="More verbose output", + ) + parser.add_argument( + "-o", + "--output-dir", + dest="model_output_dir", + help="Name of the model directory to create", + ) + parser.add_argument( + "--no-compile", + action="store_false", + dest="compile", + help="Only generate model code, do not compile", + ) + parser.add_argument( + "--no-validate", + action="store_false", + dest="validate", + help="Skip validation of PEtab files", + ) + parser.add_argument( + "--flatten", + dest="flatten", + default=False, + action="store_true", + help="Flatten measurement specific overrides of " + "observable and noise parameters", + ) + parser.add_argument( + "--no-sensitivities", + dest="generate_sensitivity_code", + default=True, + action="store_false", + help="Skip generation of sensitivity code", + ) + + # Call with set of files + parser.add_argument( + "-s", "--sbml", dest="sbml_file_name", help="SBML model filename" + ) + parser.add_argument( + "-m", + "--measurements", + dest="measurement_file_name", + help="Measurement table", + ) + parser.add_argument( + "-c", + "--conditions", + dest="condition_file_name", + help="Conditions table", + ) + parser.add_argument( + "-p", + "--parameters", + dest="parameter_file_name", + help="Parameter table", + ) + parser.add_argument( + "-b", + "--observables", + dest="observable_file_name", + help="Observable table", + ) + + parser.add_argument( + "-y", + "--yaml", + dest="yaml_file_name", + help="PEtab YAML problem filename", + ) + + parser.add_argument( + "-n", + "--model-name", + dest="model_name", + help="Name of the python module generated for the " "model", + ) + + args = parser.parse_args() + + if not args.yaml_file_name and not all( + ( + args.sbml_file_name, + args.condition_file_name, + args.observable_file_name, + ) + ): + parser.error( + "When not specifying a model name or YAML file, then " + "SBML, condition and observable file must be specified" + ) + + return args + + +def _main(): + """ + Command line interface to import a model in the PEtab + (https://github.com/PEtab-dev/PEtab/) format into AMICI. + """ + args = _parse_cli_args() + + if args.yaml_file_name: + pp = petab.Problem.from_yaml(args.yaml_file_name) + else: + pp = petab.Problem.from_files( + sbml_file=args.sbml_file_name, + condition_file=args.condition_file_name, + measurement_file=args.measurement_file_name, + parameter_file=args.parameter_file_name, + observable_files=args.observable_file_name, + ) + + # Check for valid PEtab before potentially modifying it + if args.validate: + petab.lint_problem(pp) + + if args.flatten: + petab.flatten_timepoint_specific_output_overrides(pp) + + import_model( + model_name=args.model_name, + sbml_model=pp.sbml_model, + condition_table=pp.condition_df, + observable_table=pp.observable_df, + measurement_table=pp.measurement_df, + model_output_dir=args.model_output_dir, + compile=args.compile, + generate_sensitivity_code=args.generate_sensitivity_code, + verbose=args.verbose, + validate=False, + ) + + +if __name__ == "__main__": + _main() diff --git a/python/sdist/amici/petab/conditions.py b/python/sdist/amici/petab/conditions.py new file mode 100644 index 0000000000..5b9d87e5cf --- /dev/null +++ b/python/sdist/amici/petab/conditions.py @@ -0,0 +1,515 @@ +"""PEtab conditions to AMICI ExpDatas.""" +import logging +import numbers +import warnings +from typing import Dict, Sequence, Union + +import amici +import numpy as np +import pandas as pd +import petab +from amici import AmiciModel +from petab.C import ( + MEASUREMENT, + NOISE_PARAMETERS, + OBSERVABLE_ID, + PREEQUILIBRATION_CONDITION_ID, + SIMULATION_CONDITION_ID, + TIME, +) + +from .parameter_mapping import ( + ParameterMapping, + ParameterMappingForCondition, + petab_to_amici_scale, + scale_parameters_dict, + unscale_parameters_dict, +) +from .util import get_states_in_condition_table + +logger = logging.getLogger(__name__) + +SingleParameterMapping = Dict[str, Union[numbers.Number, str]] +SingleScaleMapping = Dict[str, str] + + +def fill_in_parameters( + edatas: list[amici.ExpData], + problem_parameters: dict[str, numbers.Number], + scaled_parameters: bool, + parameter_mapping: ParameterMapping, + amici_model: AmiciModel, +) -> None: + """Fill fixed and dynamic parameters into the edatas (in-place). + + :param edatas: + List of experimental datas :class:`amici.amici.ExpData` with + everything except parameters filled. + :param problem_parameters: + Problem parameters as parameterId=>value dict. Only + parameters included here will be set. Remaining parameters will + be used as currently set in `amici_model`. + :param scaled_parameters: + If True, problem_parameters are assumed to be on the scale provided + in the parameter mapping. If False, they are assumed + to be in linear scale. + :param parameter_mapping: + Parameter mapping for all conditions. + :param amici_model: + AMICI model. + """ + if unused_parameters := ( + set(problem_parameters.keys()) - parameter_mapping.free_symbols + ): + warnings.warn( + "The following problem parameters were not used: " + + str(unused_parameters), + RuntimeWarning, + ) + + for edata, mapping_for_condition in zip(edatas, parameter_mapping): + fill_in_parameters_for_condition( + edata, + problem_parameters, + scaled_parameters, + mapping_for_condition, + amici_model, + ) + + +def fill_in_parameters_for_condition( + edata: amici.ExpData, + problem_parameters: Dict[str, numbers.Number], + scaled_parameters: bool, + parameter_mapping: ParameterMappingForCondition, + amici_model: AmiciModel, +) -> None: + """Fill fixed and dynamic parameters into the edata for condition + (in-place). + + :param edata: + Experimental data object to fill parameters into. + :param problem_parameters: + Problem parameters as parameterId=>value dict. Only + parameters included here will be set. Remaining parameters will + be used as already set in `amici_model` and `edata`. + :param scaled_parameters: + If True, problem_parameters are assumed to be on the scale provided + in the parameter mapping. If False, they + are assumed to be in linear scale. + :param parameter_mapping: + Parameter mapping for current condition. + :param amici_model: + AMICI model + """ + map_sim_var = parameter_mapping.map_sim_var + scale_map_sim_var = parameter_mapping.scale_map_sim_var + map_preeq_fix = parameter_mapping.map_preeq_fix + scale_map_preeq_fix = parameter_mapping.scale_map_preeq_fix + map_sim_fix = parameter_mapping.map_sim_fix + scale_map_sim_fix = parameter_mapping.scale_map_sim_fix + + # Parameter mapping may contain parameter_ids as values, these *must* + # be replaced + + def _get_par(model_par, value, mapping): + """Replace parameter IDs in mapping dicts by values from + problem_parameters where necessary""" + if isinstance(value, str): + try: + # estimated parameter + return problem_parameters[value] + except KeyError: + # condition table overrides must have been handled already, + # e.g. by the PEtab parameter mapping, but parameters from + # InitialAssignments may still be present. + if mapping[value] == model_par: + # prevent infinite recursion + raise + return _get_par(value, mapping[value], mapping) + if model_par in problem_parameters: + # user-provided + return problem_parameters[model_par] + # prevent nan-propagation in derivative + if np.isnan(value): + return 0.0 + # constant value + return value + + map_preeq_fix = { + key: _get_par(key, val, map_preeq_fix) + for key, val in map_preeq_fix.items() + } + map_sim_fix = { + key: _get_par(key, val, map_sim_fix) + for key, val in map_sim_fix.items() + } + map_sim_var = { + key: _get_par(key, val, dict(map_sim_fix, **map_sim_var)) + for key, val in map_sim_var.items() + } + + # If necessary, (un)scale parameters + if scaled_parameters: + unscale_parameters_dict(map_preeq_fix, scale_map_preeq_fix) + unscale_parameters_dict(map_sim_fix, scale_map_sim_fix) + if not scaled_parameters: + # We scale all parameters to the scale they are estimated on, and pass + # that information to amici via edata.{parameters,pscale}. + # The scaling is necessary to obtain correct derivatives. + scale_parameters_dict(map_sim_var, scale_map_sim_var) + # We can skip preequilibration parameters, because they are identical + # with simulation parameters, and only the latter are used from here + # on. + + ########################################################################## + # variable parameters and parameter scale + + # parameter list from mapping dict + parameters = [ + map_sim_var[par_id] for par_id in amici_model.getParameterIds() + ] + + # scales list from mapping dict + scales = [ + petab_to_amici_scale(scale_map_sim_var[par_id]) + for par_id in amici_model.getParameterIds() + ] + + # plist + plist = [ + ip + for ip, par_id in enumerate(amici_model.getParameterIds()) + if isinstance(parameter_mapping.map_sim_var[par_id], str) + ] + + if parameters: + edata.parameters = np.asarray(parameters, dtype=float) + + if scales: + edata.pscale = amici.parameterScalingFromIntVector(scales) + + if plist: + edata.plist = plist + + ########################################################################## + # fixed parameters preequilibration + if map_preeq_fix: + fixed_pars_preeq = [ + map_preeq_fix[par_id] + for par_id in amici_model.getFixedParameterIds() + ] + edata.fixedParametersPreequilibration = fixed_pars_preeq + + ########################################################################## + # fixed parameters simulation + if map_sim_fix: + fixed_pars_sim = [ + map_sim_fix[par_id] + for par_id in amici_model.getFixedParameterIds() + ] + edata.fixedParameters = fixed_pars_sim + + +def create_parameterized_edatas( + amici_model: AmiciModel, + petab_problem: petab.Problem, + problem_parameters: Dict[str, numbers.Number], + scaled_parameters: bool = False, + parameter_mapping: ParameterMapping = None, + simulation_conditions: Union[pd.DataFrame, Dict] = None, +) -> list[amici.ExpData]: + """Create list of :class:amici.ExpData objects with parameters filled in. + + :param amici_model: + AMICI Model assumed to be compatible with ``petab_problem``. + :param petab_problem: + PEtab problem to work on. + :param problem_parameters: + Run simulation with these parameters. If ``None``, PEtab + ``nominalValues`` will be used. To be provided as dict, mapping PEtab + problem parameters to SBML IDs. + :param scaled_parameters: + If ``True``, ``problem_parameters`` are assumed to be on the scale + provided in the PEtab parameter table and will be unscaled. + If ``False``, they are assumed to be in linear scale. + :param parameter_mapping: + Optional precomputed PEtab parameter mapping for efficiency, as + generated by :func:`create_parameter_mapping`. + :param simulation_conditions: + Result of :func:`petab.get_simulation_conditions`. Can be provided to + save time if this has been obtained before. + + :return: + List with one :class:`amici.amici.ExpData` per simulation condition, + with filled in timepoints, data and parameters. + """ + # number of amici simulations will be number of unique + # (preequilibrationConditionId, simulationConditionId) pairs. + # Can be optimized by checking for identical condition vectors. + if simulation_conditions is None: + simulation_conditions = ( + petab_problem.get_simulation_conditions_from_measurement_df() + ) + + # Get parameter mapping + if parameter_mapping is None: + from .parameter_mapping import create_parameter_mapping + + parameter_mapping = create_parameter_mapping( + petab_problem=petab_problem, + simulation_conditions=simulation_conditions, + scaled_parameters=scaled_parameters, + amici_model=amici_model, + ) + + # Generate ExpData with all condition-specific information + edatas = create_edatas( + amici_model=amici_model, + petab_problem=petab_problem, + simulation_conditions=simulation_conditions, + ) + + # Fill parameters in ExpDatas (in-place) + fill_in_parameters( + edatas=edatas, + problem_parameters=problem_parameters, + scaled_parameters=scaled_parameters, + parameter_mapping=parameter_mapping, + amici_model=amici_model, + ) + + return edatas + + +def create_edata_for_condition( + condition: Union[Dict, pd.Series], + measurement_df: pd.DataFrame, + amici_model: AmiciModel, + petab_problem: petab.Problem, + observable_ids: list[str], +) -> amici.ExpData: + """Get :class:`amici.amici.ExpData` for the given PEtab condition. + + Sets timepoints, observed data and sigmas. + + :param condition: + :class:`pandas.DataFrame` row with ``preequilibrationConditionId`` and + ``simulationConditionId``. + :param measurement_df: + :class:`pandas.DataFrame` with measurements for the given condition. + :param amici_model: + AMICI model + :param petab_problem: + Underlying PEtab problem + :param observable_ids: + List of observable IDs + + :return: + ExpData instance. + """ + if amici_model.nytrue != len(observable_ids): + raise AssertionError( + "Number of AMICI model observables does not " + "match number of PEtab observables." + ) + + # create an ExpData object + edata = amici.ExpData(amici_model) + edata.id = condition[SIMULATION_CONDITION_ID] + if condition.get(PREEQUILIBRATION_CONDITION_ID): + edata.id += "+" + condition.get(PREEQUILIBRATION_CONDITION_ID) + ########################################################################## + # enable initial parameters reinitialization + + states_in_condition_table = get_states_in_condition_table( + petab_problem, condition=condition + ) + if ( + condition.get(PREEQUILIBRATION_CONDITION_ID) + and states_in_condition_table + ): + state_ids = amici_model.getStateIds() + state_idx_reinitalization = [ + state_ids.index(s) + for s, (v, v_preeq) in states_in_condition_table.items() + if not np.isnan(v) + ] + edata.reinitialization_state_idxs_sim = state_idx_reinitalization + logger.debug( + "Enabling state reinitialization for condition " + f"{condition.get(PREEQUILIBRATION_CONDITION_ID, '')} - " + f"{condition.get(SIMULATION_CONDITION_ID)} " + f"{states_in_condition_table}" + ) + + ########################################################################## + # timepoints + + # find replicate numbers of time points + timepoints_w_reps = _get_timepoints_with_replicates( + df_for_condition=measurement_df + ) + edata.setTimepoints(timepoints_w_reps) + + ########################################################################## + # measurements and sigmas + y, sigma_y = _get_measurements_and_sigmas( + df_for_condition=measurement_df, + timepoints_w_reps=timepoints_w_reps, + observable_ids=observable_ids, + ) + edata.setObservedData(y.flatten()) + edata.setObservedDataStdDev(sigma_y.flatten()) + + return edata + + +def create_edatas( + amici_model: AmiciModel, + petab_problem: petab.Problem, + simulation_conditions: Union[pd.DataFrame, Dict] = None, +) -> list[amici.ExpData]: + """Create list of :class:`amici.amici.ExpData` objects for PEtab problem. + + :param amici_model: + AMICI model. + :param petab_problem: + Underlying PEtab problem. + :param simulation_conditions: + Result of :func:`petab.get_simulation_conditions`. Can be provided to + save time if this has be obtained before. + + :return: + List with one :class:`amici.amici.ExpData` per simulation condition, + with filled in timepoints and data. + """ + if simulation_conditions is None: + simulation_conditions = ( + petab_problem.get_simulation_conditions_from_measurement_df() + ) + + observable_ids = amici_model.getObservableIds() + + measurement_groupvar = [SIMULATION_CONDITION_ID] + if PREEQUILIBRATION_CONDITION_ID in simulation_conditions: + measurement_groupvar.append(petab.PREEQUILIBRATION_CONDITION_ID) + measurement_dfs = dict( + list(petab_problem.measurement_df.groupby(measurement_groupvar)) + ) + + edatas = [] + for _, condition in simulation_conditions.iterrows(): + # Create amici.ExpData for each simulation + if PREEQUILIBRATION_CONDITION_ID in condition: + measurement_index = ( + condition.get(SIMULATION_CONDITION_ID), + condition.get(PREEQUILIBRATION_CONDITION_ID), + ) + else: + measurement_index = (condition.get(SIMULATION_CONDITION_ID),) + edata = create_edata_for_condition( + condition=condition, + amici_model=amici_model, + measurement_df=measurement_dfs[measurement_index], + petab_problem=petab_problem, + observable_ids=observable_ids, + ) + edatas.append(edata) + + return edatas + + +def _get_timepoints_with_replicates( + df_for_condition: pd.DataFrame, +) -> list[numbers.Number]: + """ + Get list of timepoints including replicate measurements + + :param df_for_condition: + PEtab measurement table subset for a single condition. + + :return: + Sorted list of timepoints, including multiple timepoints accounting + for replicate measurements. + """ + # create sorted list of all timepoints for which measurements exist + timepoints = sorted(df_for_condition[TIME].unique().astype(float)) + + # find replicate numbers of time points + timepoints_w_reps = [] + for time in timepoints: + # subselect for time + df_for_time = df_for_condition[ + df_for_condition.time.astype(float) == time + ] + # rep number is maximum over rep numbers for observables + n_reps = max(df_for_time.groupby([OBSERVABLE_ID, TIME]).size()) + # append time point n_rep times + timepoints_w_reps.extend([time] * n_reps) + + return timepoints_w_reps + + +def _get_measurements_and_sigmas( + df_for_condition: pd.DataFrame, + timepoints_w_reps: Sequence[numbers.Number], + observable_ids: Sequence[str], +) -> tuple[np.array, np.array]: + """ + Get measurements and sigmas + + Generate arrays with measurements and sigmas in AMICI format from a + PEtab measurement table subset for a single condition. + + :param df_for_condition: + Subset of PEtab measurement table for one condition + + :param timepoints_w_reps: + Timepoints for which there exist measurements, including replicates + + :param observable_ids: + List of observable IDs for mapping IDs to indices. + + :return: + arrays for measurement and sigmas + """ + # prepare measurement matrix + y = np.full( + shape=(len(timepoints_w_reps), len(observable_ids)), fill_value=np.nan + ) + # prepare sigma matrix + sigma_y = y.copy() + + timepoints = sorted(df_for_condition[TIME].unique().astype(float)) + + for time in timepoints: + # subselect for time + df_for_time = df_for_condition[df_for_condition[TIME] == time] + time_ix_0 = timepoints_w_reps.index(time) + + # remember used time indices for each observable + time_ix_for_obs_ix = {} + + # iterate over measurements + for _, measurement in df_for_time.iterrows(): + # extract observable index + observable_ix = observable_ids.index(measurement[OBSERVABLE_ID]) + + # update time index for observable + if observable_ix in time_ix_for_obs_ix: + time_ix_for_obs_ix[observable_ix] += 1 + else: + time_ix_for_obs_ix[observable_ix] = time_ix_0 + + # fill observable and possibly noise parameter + y[time_ix_for_obs_ix[observable_ix], observable_ix] = measurement[ + MEASUREMENT + ] + if isinstance( + measurement.get(NOISE_PARAMETERS, None), numbers.Number + ): + sigma_y[ + time_ix_for_obs_ix[observable_ix], observable_ix + ] = measurement[NOISE_PARAMETERS] + return y, sigma_y diff --git a/python/sdist/amici/petab/parameter_mapping.py b/python/sdist/amici/petab/parameter_mapping.py new file mode 100644 index 0000000000..a5a5d6e867 --- /dev/null +++ b/python/sdist/amici/petab/parameter_mapping.py @@ -0,0 +1,696 @@ +from __future__ import annotations + +""" +Parameter mapping +----------------- + +When performing parameter inference, often parameters need to be mapped from +simulation to estimation parameters, and parameters can differ between +conditions. This can be handled using the `ParameterMapping`. + +Note +~~~~ + +While the parameter mapping can be used directly with AMICI, it was developed +for usage together with PEtab, for which the whole workflow of generating +the mapping is automatized. +""" + +import logging +import numbers +import re +from collections.abc import Sequence +from itertools import chain +from typing import Any, Collection, Iterator, Union + +import amici +import numpy as np +import pandas as pd +import petab +import sympy as sp +from amici.sbml_import import get_species_initial +from petab.C import * # noqa: F403 +from petab.C import ( + LIN, + PARAMETER_SCALE, + PREEQUILIBRATION_CONDITION_ID, + SIMULATION_CONDITION_ID, +) +from petab.models import MODEL_TYPE_PYSB, MODEL_TYPE_SBML +from sympy.abc import _clash + +from .. import AmiciModel +from . import PREEQ_INDICATOR_ID +from .util import get_states_in_condition_table + +try: + import pysb +except ImportError: + pysb = None + + +logger = logging.getLogger(__name__) + +SingleParameterMapping = dict[str, Union[numbers.Number, str]] +SingleScaleMapping = dict[str, str] + + +class ParameterMappingForCondition: + """Parameter mapping for condition. + + Contains mappings for free parameters, fixed parameters, and fixed + preequilibration parameters, both for parameters and scales. + + In the scale mappings, for each simulation parameter the scale + on which the value is passed (and potentially gradients are to be + returned) is given. In the parameter mappings, for each simulation + parameter a corresponding optimization parameter (or a numeric value) + is given. + + If a mapping is not passed, the parameter mappings are assumed to be empty, + and if a scale mapping is not passed, all scales are set to linear. + + :param map_sim_var: + Mapping for free simulation parameters. + :param scale_map_sim_var: + Scales for free simulation parameters. + :param map_preeq_fix: + Mapping for fixed preequilibration parameters. + :param scale_map_preeq_fix: + Scales for fixed preequilibration parameters. + :param map_sim_fix: + Mapping for fixed simulation parameters. + :param scale_map_sim_fix: + Scales for fixed simulation parameters. + """ + + def __init__( + self, + map_sim_var: SingleParameterMapping = None, + scale_map_sim_var: SingleScaleMapping = None, + map_preeq_fix: SingleParameterMapping = None, + scale_map_preeq_fix: SingleScaleMapping = None, + map_sim_fix: SingleParameterMapping = None, + scale_map_sim_fix: SingleScaleMapping = None, + ): + if map_sim_var is None: + map_sim_var = {} + self.map_sim_var = map_sim_var + + if scale_map_sim_var is None: + scale_map_sim_var = {key: LIN for key in map_sim_var} + self.scale_map_sim_var = scale_map_sim_var + + if map_preeq_fix is None: + map_preeq_fix = {} + self.map_preeq_fix = map_preeq_fix + + if scale_map_preeq_fix is None: + scale_map_preeq_fix = {key: LIN for key in map_preeq_fix} + self.scale_map_preeq_fix = scale_map_preeq_fix + + if map_sim_fix is None: + map_sim_fix = {} + self.map_sim_fix = map_sim_fix + + if scale_map_sim_fix is None: + scale_map_sim_fix = {key: LIN for key in map_sim_fix} + self.scale_map_sim_fix = scale_map_sim_fix + + def __repr__(self): + return ( + f"{self.__class__.__name__}(" + f"map_sim_var={repr(self.map_sim_var)}," + f"scale_map_sim_var={repr(self.scale_map_sim_var)}," + f"map_preeq_fix={repr(self.map_preeq_fix)}," + f"scale_map_preeq_fix={repr(self.scale_map_preeq_fix)}," + f"map_sim_fix={repr(self.map_sim_fix)}," + f"scale_map_sim_fix={repr(self.scale_map_sim_fix)})" + ) + + @property + def free_symbols(self) -> set[str]: + """Get IDs of all (symbolic) parameters present in this mapping""" + return { + p + for p in chain( + self.map_sim_var.values(), + self.map_preeq_fix.values(), + self.map_sim_fix.values(), + ) + if isinstance(p, str) + } + + +class ParameterMapping(Sequence): + r"""Parameter mapping for multiple conditions. + + This can be used like a list of :class:`ParameterMappingForCondition`\ s. + + :param parameter_mappings: + List of parameter mappings for specific conditions. + """ + + def __init__( + self, parameter_mappings: list[ParameterMappingForCondition] = None + ): + super().__init__() + if parameter_mappings is None: + parameter_mappings = [] + self.parameter_mappings = parameter_mappings + + def __iter__(self): + yield from self.parameter_mappings + + def __getitem__( + self, item + ) -> Union[ParameterMapping, ParameterMappingForCondition]: + result = self.parameter_mappings[item] + if isinstance(result, ParameterMappingForCondition): + return result + return ParameterMapping(result) + + def __len__(self): + return len(self.parameter_mappings) + + def append( + self, parameter_mapping_for_condition: ParameterMappingForCondition + ): + """Append a condition specific parameter mapping.""" + self.parameter_mappings.append(parameter_mapping_for_condition) + + def __repr__(self): + return f"{self.__class__.__name__}({repr(self.parameter_mappings)})" + + @property + def free_symbols(self) -> set[str]: + """Get IDs of all (symbolic) parameters present in this mapping""" + return set.union(*(mapping.free_symbols for mapping in self)) + + +def petab_to_amici_scale(petab_scale: str) -> int: + """Convert petab scale id to amici scale id.""" + if petab_scale == LIN: + return amici.ParameterScaling_none + if petab_scale == LOG10: + return amici.ParameterScaling_log10 + if petab_scale == LOG: + return amici.ParameterScaling_ln + raise ValueError(f"PEtab scale not recognized: {petab_scale}") + + +def amici_to_petab_scale(amici_scale: int) -> str: + """Convert amici scale id to petab scale id.""" + if amici_scale == amici.ParameterScaling_none: + return LIN + if amici_scale == amici.ParameterScaling_log10: + return LOG10 + if amici_scale == amici.ParameterScaling_ln: + return LOG + raise ValueError(f"AMICI scale not recognized: {amici_scale}") + + +def scale_parameter(value: numbers.Number, petab_scale: str) -> numbers.Number: + """Bring parameter from linear scale to target scale. + + :param value: + Value to scale + :param petab_scale: + Target scale of ``value`` + + :return: + ``value`` on target scale + """ + if petab_scale == LIN: + return value + if petab_scale == LOG10: + return np.log10(value) + if petab_scale == LOG: + return np.log(value) + raise ValueError( + f"Unknown parameter scale {petab_scale}. " + f"Must be from {(LIN, LOG, LOG10)}" + ) + + +def unscale_parameter( + value: numbers.Number, petab_scale: str +) -> numbers.Number: + """Bring parameter from scale to linear scale. + + :param value: + Value to scale + :param petab_scale: + Target scale of ``value`` + + :return: + ``value`` on linear scale + """ + if petab_scale == LIN: + return value + if petab_scale == LOG10: + return np.power(10, value) + if petab_scale == LOG: + return np.exp(value) + raise ValueError( + f"Unknown parameter scale {petab_scale}. " + f"Must be from {(LIN, LOG, LOG10)}" + ) + + +def scale_parameters_dict( + value_dict: dict[Any, numbers.Number], petab_scale_dict: dict[Any, str] +) -> None: + """ + Bring parameters from linear scale to target scale. + + Bring values in ``value_dict`` from linear scale to the scale + provided in ``petab_scale_dict`` (in-place). + Both arguments are expected to have the same length and matching keys. + + :param value_dict: + Values to scale + + :param petab_scale_dict: + Target scales of ``values`` + """ + if value_dict.keys() != petab_scale_dict.keys(): + raise AssertionError("Keys don't match.") + + for key, value in value_dict.items(): + value_dict[key] = scale_parameter(value, petab_scale_dict[key]) + + +def unscale_parameters_dict( + value_dict: dict[Any, numbers.Number], petab_scale_dict: dict[Any, str] +) -> None: + """ + Bring parameters from target scale to linear scale. + + Bring values in ``value_dict`` from linear scale to the scale + provided in ``petab_scale_dict`` (in-place). + Both arguments are expected to have the same length and matching keys. + + :param value_dict: + Values to scale + + :param petab_scale_dict: + Target scales of ``values`` + """ + if value_dict.keys() != petab_scale_dict.keys(): + raise AssertionError("Keys don't match.") + + for key, value in value_dict.items(): + value_dict[key] = unscale_parameter(value, petab_scale_dict[key]) + + +def create_parameter_mapping( + petab_problem: petab.Problem, + simulation_conditions: Union[pd.DataFrame, list[dict]], + scaled_parameters: bool, + amici_model: AmiciModel, + **parameter_mapping_kwargs, +) -> ParameterMapping: + """Generate AMICI specific parameter mapping. + + :param petab_problem: + PEtab problem + :param simulation_conditions: + Result of :func:`petab.get_simulation_conditions`. Can be provided to + save time if this has been obtained before. + :param scaled_parameters: + If ``True``, problem_parameters are assumed to be on the scale provided + in the PEtab parameter table and will be unscaled. If ``False``, they + are assumed to be in linear scale. + :param amici_model: + AMICI model. + :param parameter_mapping_kwargs: + Optional keyword arguments passed to + :func:`petab.get_optimization_to_simulation_parameter_mapping`. + To allow changing fixed PEtab problem parameters (``estimate=0``), + use ``fill_fixed_parameters=False``. + :return: + List of the parameter mappings. + """ + if simulation_conditions is None: + simulation_conditions = ( + petab_problem.get_simulation_conditions_from_measurement_df() + ) + if isinstance(simulation_conditions, list): + simulation_conditions = pd.DataFrame(data=simulation_conditions) + + # Because AMICI globalizes all local parameters during model import, + # we need to do that here as well to prevent parameter mapping errors + # (PEtab does currently not care about SBML LocalParameters) + if petab_problem.model.type_id == MODEL_TYPE_SBML: + import libsbml + + if petab_problem.sbml_document: + converter_config = ( + libsbml.SBMLLocalParameterConverter().getDefaultProperties() + ) + petab_problem.sbml_document.convert(converter_config) + else: + logger.debug( + "No petab_problem.sbml_document is set. Cannot " + "convert SBML LocalParameters. If the model contains " + "LocalParameters, parameter mapping will fail." + ) + + default_parameter_mapping_kwargs = { + "warn_unmapped": False, + "scaled_parameters": scaled_parameters, + "allow_timepoint_specific_numeric_noise_parameters": not petab.lint.observable_table_has_nontrivial_noise_formula( + petab_problem.observable_df + ), + } + if parameter_mapping_kwargs is None: + parameter_mapping_kwargs = {} + + prelim_parameter_mapping = ( + petab.get_optimization_to_simulation_parameter_mapping( + condition_df=petab_problem.condition_df, + measurement_df=petab_problem.measurement_df, + parameter_df=petab_problem.parameter_df, + observable_df=petab_problem.observable_df, + mapping_df=petab_problem.mapping_df, + model=petab_problem.model, + simulation_conditions=simulation_conditions, + **dict( + default_parameter_mapping_kwargs, **parameter_mapping_kwargs + ), + ) + ) + + parameter_mapping = ParameterMapping() + for (_, condition), prelim_mapping_for_condition in zip( + simulation_conditions.iterrows(), prelim_parameter_mapping + ): + mapping_for_condition = create_parameter_mapping_for_condition( + prelim_mapping_for_condition, condition, petab_problem, amici_model + ) + parameter_mapping.append(mapping_for_condition) + + return parameter_mapping + + +def create_parameter_mapping_for_condition( + parameter_mapping_for_condition: petab.ParMappingDictQuadruple, + condition: Union[pd.Series, dict], + petab_problem: petab.Problem, + amici_model: AmiciModel, +) -> ParameterMappingForCondition: + """Generate AMICI specific parameter mapping for condition. + + :param parameter_mapping_for_condition: + Preliminary parameter mapping for condition. + :param condition: + :class:`pandas.DataFrame` row with ``preequilibrationConditionId`` and + ``simulationConditionId``. + :param petab_problem: + Underlying PEtab problem. + :param amici_model: + AMICI model. + + :return: + The parameter and parameter scale mappings, for fixed + preequilibration, fixed simulation, and variable simulation + parameters, and then the respective scalings. + """ + ( + condition_map_preeq, + condition_map_sim, + condition_scale_map_preeq, + condition_scale_map_sim, + ) = parameter_mapping_for_condition + logger.debug(f"PEtab mapping: {parameter_mapping_for_condition}") + + if len(condition_map_preeq) != len(condition_scale_map_preeq) or len( + condition_map_sim + ) != len(condition_scale_map_sim): + raise AssertionError( + "Number of parameters and number of parameter " + "scales do not match." + ) + if len(condition_map_preeq) and len(condition_map_preeq) != len( + condition_map_sim + ): + logger.debug(f"Preequilibration parameter map: {condition_map_preeq}") + logger.debug(f"Simulation parameter map: {condition_map_sim}") + raise AssertionError( + "Number of parameters for preequilbration " + "and simulation do not match." + ) + + ########################################################################## + # initial states + # Initial states have been set during model import based on the SBML model. + # If initial states were overwritten in the PEtab condition table, they are + # applied here. + # During model generation, parameters for initial concentrations and + # respective initial assignments have been created for the + # relevant species, here we add these parameters to the parameter mapping. + # In absence of preequilibration this could also be handled via + # ExpData.x0, but in the case of preequilibration this would not allow for + # resetting initial states. + + if states_in_condition_table := get_states_in_condition_table( + petab_problem, condition + ): + # set indicator fixed parameter for preeq + # (we expect here, that this parameter was added during import and + # that it was not added by the user with a different meaning...) + if condition_map_preeq: + condition_map_preeq[PREEQ_INDICATOR_ID] = 1.0 + condition_scale_map_preeq[PREEQ_INDICATOR_ID] = LIN + + condition_map_sim[PREEQ_INDICATOR_ID] = 0.0 + condition_scale_map_sim[PREEQ_INDICATOR_ID] = LIN + + for element_id, ( + value, + preeq_value, + ) in states_in_condition_table.items(): + # for preequilibration + init_par_id = f"initial_{element_id}_preeq" + if ( + condition_id := condition.get(PREEQUILIBRATION_CONDITION_ID) + ) is not None: + _set_initial_state( + petab_problem, + condition_id, + element_id, + init_par_id, + condition_map_preeq, + condition_scale_map_preeq, + preeq_value, + ) + else: + # need to set dummy value for preeq parameter anyways, as it + # is expected below (set to 0, not nan, because will be + # multiplied with indicator variable in initial assignment) + condition_map_sim[init_par_id] = 0.0 + condition_scale_map_sim[init_par_id] = LIN + + # for simulation + condition_id = condition[SIMULATION_CONDITION_ID] + init_par_id = f"initial_{element_id}_sim" + _set_initial_state( + petab_problem, + condition_id, + element_id, + init_par_id, + condition_map_sim, + condition_scale_map_sim, + value, + ) + + ########################################################################## + # separate fixed and variable AMICI parameters, because we may have + # different fixed parameters for preeq and sim condition, but we cannot + # have different variable parameters. without splitting, + # merge_preeq_and_sim_pars_condition below may fail. + # TODO: This can be done already in parameter mapping creation. + variable_par_ids = amici_model.getParameterIds() + fixed_par_ids = amici_model.getFixedParameterIds() + + condition_map_preeq_var, condition_map_preeq_fix = _subset_dict( + condition_map_preeq, variable_par_ids, fixed_par_ids + ) + + ( + condition_scale_map_preeq_var, + condition_scale_map_preeq_fix, + ) = _subset_dict( + condition_scale_map_preeq, variable_par_ids, fixed_par_ids + ) + + condition_map_sim_var, condition_map_sim_fix = _subset_dict( + condition_map_sim, variable_par_ids, fixed_par_ids + ) + + condition_scale_map_sim_var, condition_scale_map_sim_fix = _subset_dict( + condition_scale_map_sim, variable_par_ids, fixed_par_ids + ) + + logger.debug( + "Fixed parameters preequilibration: " f"{condition_map_preeq_fix}" + ) + logger.debug("Fixed parameters simulation: " f"{condition_map_sim_fix}") + logger.debug( + "Variable parameters preequilibration: " f"{condition_map_preeq_var}" + ) + logger.debug("Variable parameters simulation: " f"{condition_map_sim_var}") + + petab.merge_preeq_and_sim_pars_condition( + condition_map_preeq_var, + condition_map_sim_var, + condition_scale_map_preeq_var, + condition_scale_map_sim_var, + condition, + ) + logger.debug(f"Merged: {condition_map_sim_var}") + + parameter_mapping_for_condition = ParameterMappingForCondition( + map_preeq_fix=condition_map_preeq_fix, + map_sim_fix=condition_map_sim_fix, + map_sim_var=condition_map_sim_var, + scale_map_preeq_fix=condition_scale_map_preeq_fix, + scale_map_sim_fix=condition_scale_map_sim_fix, + scale_map_sim_var=condition_scale_map_sim_var, + ) + + return parameter_mapping_for_condition + + +def _set_initial_state( + petab_problem, + condition_id, + element_id, + init_par_id, + par_map, + scale_map, + value, +): + value = petab.to_float_if_float(value) + if pd.isna(value): + if petab_problem.model.type_id == MODEL_TYPE_SBML: + value = _get_initial_state_sbml(petab_problem, element_id) + elif petab_problem.model.type_id == MODEL_TYPE_PYSB: + value = _get_initial_state_pysb(petab_problem, element_id) + + try: + value = float(value) + except (ValueError, TypeError): + if sp.nsimplify(value).is_Atom and ( + pysb is None or not isinstance(value, pysb.Component) + ): + # Get rid of multiplication with one + value = sp.nsimplify(value) + else: + raise NotImplementedError( + "Cannot handle non-trivial initial state " + f"expression for {element_id}: {value}" + ) + # this should be a parameter ID + value = str(value) + logger.debug( + f"The species {element_id} has no initial value " + f"defined for the condition {condition_id} in " + "the PEtab conditions table. The initial value is " + f"now set to {value}, which is the initial value " + "defined in the SBML model." + ) + par_map[init_par_id] = value + if isinstance(value, float): + # numeric initial state + scale_map[init_par_id] = petab.LIN + else: + # parametric initial state + scale_map[init_par_id] = petab_problem.parameter_df[ + PARAMETER_SCALE + ].get(value, petab.LIN) + + +def _subset_dict( + full: dict[Any, Any], *args: Collection[Any] +) -> Iterator[dict[Any, Any]]: + """Get subset of dictionary based on provided keys + + :param full: + Dictionary to subset + :param args: + Collections of keys to be contained in the different subsets + + :return: + subsetted dictionary + """ + for keys in args: + yield {key: val for (key, val) in full.items() if key in keys} + + +def _get_initial_state_sbml( + petab_problem: petab.Problem, element_id: str +) -> Union[float, sp.Basic]: + import libsbml + + element = petab_problem.sbml_model.getElementBySId(element_id) + type_code = element.getTypeCode() + initial_assignment = petab_problem.sbml_model.getInitialAssignmentBySymbol( + element_id + ) + if initial_assignment: + initial_assignment = sp.sympify( + libsbml.formulaToL3String(initial_assignment.getMath()), + locals=_clash, + ) + if type_code == libsbml.SBML_SPECIES: + value = ( + get_species_initial(element) + if initial_assignment is None + else initial_assignment + ) + elif type_code == libsbml.SBML_PARAMETER: + value = ( + element.getValue() + if initial_assignment is None + else initial_assignment + ) + elif type_code == libsbml.SBML_COMPARTMENT: + value = ( + element.getSize() + if initial_assignment is None + else initial_assignment + ) + else: + raise NotImplementedError( + f"Don't know what how to handle {element_id} in " + "condition table." + ) + return value + + +def _get_initial_state_pysb( + petab_problem: petab.Problem, element_id: str +) -> Union[float, sp.Symbol]: + species_idx = int(re.match(r"__s(\d+)$", element_id)[1]) + species_pattern = petab_problem.model.model.species[species_idx] + from pysb.pattern import match_complex_pattern + + value = next( + ( + initial.value + for initial in petab_problem.model.model.initials + if match_complex_pattern( + initial.pattern, species_pattern, exact=True + ) + ), + 0.0, + ) + if isinstance(value, pysb.Parameter): + if value.name in petab_problem.parameter_df.index: + value = value.name + else: + value = value.value + + return value diff --git a/python/sdist/amici/petab/util.py b/python/sdist/amici/petab/util.py new file mode 100644 index 0000000000..d30c1a6e9b --- /dev/null +++ b/python/sdist/amici/petab/util.py @@ -0,0 +1,103 @@ +"""Various helper functions for working with PEtab problems.""" +import re +from typing import Dict, Tuple, Union + +import libsbml +import pandas as pd +import petab +from petab.C import PREEQUILIBRATION_CONDITION_ID, SIMULATION_CONDITION_ID +from petab.mapping import resolve_mapping +from petab.models import MODEL_TYPE_PYSB, MODEL_TYPE_SBML + + +def get_states_in_condition_table( + petab_problem: petab.Problem, + condition: Union[Dict, pd.Series] = None, + return_patterns: bool = False, +) -> Dict[str, Tuple[Union[float, str, None], Union[float, str, None]]]: + """Get states and their initial condition as specified in the condition table. + + Returns: Dictionary: ``stateId -> (initial condition simulation, initial condition preequilibration)`` + """ + if petab_problem.model.type_id not in (MODEL_TYPE_SBML, MODEL_TYPE_PYSB): + raise NotImplementedError() + + species_check_funs = { + MODEL_TYPE_SBML: lambda x: _element_is_sbml_state( + petab_problem.sbml_model, x + ), + MODEL_TYPE_PYSB: lambda x: _element_is_pysb_pattern( + petab_problem.model.model, x + ), + } + states = { + resolve_mapping(petab_problem.mapping_df, col): (None, None) + if condition is None + else ( + petab_problem.condition_df.loc[ + condition[SIMULATION_CONDITION_ID], col + ], + petab_problem.condition_df.loc[ + condition[PREEQUILIBRATION_CONDITION_ID], col + ] + if PREEQUILIBRATION_CONDITION_ID in condition + else None, + ) + for col in petab_problem.condition_df.columns + if species_check_funs[petab_problem.model.type_id]( + resolve_mapping(petab_problem.mapping_df, col) + ) + } + + if petab_problem.model.type_id == MODEL_TYPE_PYSB: + if return_patterns: + return states + import pysb.pattern + + if not petab_problem.model.model.species: + import pysb.bng + + pysb.bng.generate_equations(petab_problem.model.model) + + try: + spm = pysb.pattern.SpeciesPatternMatcher( + model=petab_problem.model.model + ) + except NotImplementedError as e: + raise NotImplementedError( + "Requires https://github.com/pysb/pysb/pull/570. " + "To use this functionality, update pysb via " + "`pip install git+https://github.com/FFroehlich/pysb@fix_pattern_matching`" + ) + + # expose model components as variables so we can evaluate patterns + for c in petab_problem.model.model.components: + globals()[c.name] = c + + states = { + f"__s{ix}": value + for pattern, value in states.items() + for ix in spm.match(eval(pattern), index=True, exact=True) + } + return states + + +def _element_is_pysb_pattern(model: "pysb.Model", element: str) -> bool: + """Check if element is a pysb pattern""" + if match := re.match(r"[a-zA-Z_][\w_]*\(", element): + return match[0][:-1] in [m.name for m in model.monomers] + return False + + +def _element_is_sbml_state(sbml_model: libsbml.Model, sbml_id: str) -> bool: + """Does the element with ID `sbml_id` correspond to a state variable?""" + if sbml_model.getCompartment(sbml_id) is not None: + return True + if sbml_model.getSpecies(sbml_id) is not None: + return True + if ( + rule := sbml_model.getRuleByVariable(sbml_id) + ) is not None and rule.getTypeCode() == libsbml.SBML_RATE_RULE: + return True + + return False diff --git a/python/sdist/amici/petab_import.py b/python/sdist/amici/petab_import.py index 23fe4394f0..a33b419345 100644 --- a/python/sdist/amici/petab_import.py +++ b/python/sdist/amici/petab_import.py @@ -4,7 +4,6 @@ Import a model in the :mod:`petab` (https://github.com/PEtab-dev/PEtab) format into AMICI. """ -import argparse import importlib import logging import math @@ -29,7 +28,8 @@ from petab.parameters import get_valid_parameters_for_parameter_table from sympy.abc import _clash -from .petab_util import PREEQ_INDICATOR_ID, get_states_in_condition_table +from .petab import PREEQ_INDICATOR_ID +from .petab.util import get_states_in_condition_table try: from amici.petab_import_pysb import import_model_pysb @@ -896,159 +896,3 @@ def show_model_info(sbml_model: "libsbml.Model"): "Global parameters: " + str(len(sbml_model.getListOfParameters())) ) logger.info(f"Reactions: {len(sbml_model.getListOfReactions())}") - - -def _parse_cli_args(): - """ - Parse command line arguments - - :return: - Parsed CLI arguments from :mod:`argparse`. - """ - parser = argparse.ArgumentParser( - description="Import PEtab-format model into AMICI." - ) - - # General options: - parser.add_argument( - "-v", - "--verbose", - dest="verbose", - action="store_true", - help="More verbose output", - ) - parser.add_argument( - "-o", - "--output-dir", - dest="model_output_dir", - help="Name of the model directory to create", - ) - parser.add_argument( - "--no-compile", - action="store_false", - dest="compile", - help="Only generate model code, do not compile", - ) - parser.add_argument( - "--no-validate", - action="store_false", - dest="validate", - help="Skip validation of PEtab files", - ) - parser.add_argument( - "--flatten", - dest="flatten", - default=False, - action="store_true", - help="Flatten measurement specific overrides of " - "observable and noise parameters", - ) - parser.add_argument( - "--no-sensitivities", - dest="generate_sensitivity_code", - default=True, - action="store_false", - help="Skip generation of sensitivity code", - ) - - # Call with set of files - parser.add_argument( - "-s", "--sbml", dest="sbml_file_name", help="SBML model filename" - ) - parser.add_argument( - "-m", - "--measurements", - dest="measurement_file_name", - help="Measurement table", - ) - parser.add_argument( - "-c", - "--conditions", - dest="condition_file_name", - help="Conditions table", - ) - parser.add_argument( - "-p", - "--parameters", - dest="parameter_file_name", - help="Parameter table", - ) - parser.add_argument( - "-b", - "--observables", - dest="observable_file_name", - help="Observable table", - ) - - parser.add_argument( - "-y", - "--yaml", - dest="yaml_file_name", - help="PEtab YAML problem filename", - ) - - parser.add_argument( - "-n", - "--model-name", - dest="model_name", - help="Name of the python module generated for the " "model", - ) - - args = parser.parse_args() - - if not args.yaml_file_name and not all( - ( - args.sbml_file_name, - args.condition_file_name, - args.observable_file_name, - ) - ): - parser.error( - "When not specifying a model name or YAML file, then " - "SBML, condition and observable file must be specified" - ) - - return args - - -def _main(): - """ - Command line interface to import a model in the PEtab - (https://github.com/PEtab-dev/PEtab/) format into AMICI. - """ - args = _parse_cli_args() - - if args.yaml_file_name: - pp = petab.Problem.from_yaml(args.yaml_file_name) - else: - pp = petab.Problem.from_files( - sbml_file=args.sbml_file_name, - condition_file=args.condition_file_name, - measurement_file=args.measurement_file_name, - parameter_file=args.parameter_file_name, - observable_files=args.observable_file_name, - ) - - # Check for valid PEtab before potentially modifying it - if args.validate: - petab.lint_problem(pp) - - if args.flatten: - petab.flatten_timepoint_specific_output_overrides(pp) - - import_model( - model_name=args.model_name, - sbml_model=pp.sbml_model, - condition_table=pp.condition_df, - observable_table=pp.observable_df, - measurement_table=pp.measurement_df, - model_output_dir=args.model_output_dir, - compile=args.compile, - generate_sensitivity_code=args.generate_sensitivity_code, - verbose=args.verbose, - validate=False, - ) - - -if __name__ == "__main__": - _main() diff --git a/python/sdist/amici/petab_import_pysb.py b/python/sdist/amici/petab_import_pysb.py index 8036d1358d..ccff072261 100644 --- a/python/sdist/amici/petab_import_pysb.py +++ b/python/sdist/amici/petab_import_pysb.py @@ -18,7 +18,8 @@ from petab.models.pysb_model import PySBModel from .logging import get_logger, log_execution_time, set_log_level -from .petab_util import PREEQ_INDICATOR_ID, get_states_in_condition_table +from .petab import PREEQ_INDICATOR_ID +from .petab.util import get_states_in_condition_table logger = get_logger(__name__, logging.WARNING) diff --git a/python/sdist/amici/petab_objective.py b/python/sdist/amici/petab_objective.py index e3111d3b68..8fbb191bae 100644 --- a/python/sdist/amici/petab_objective.py +++ b/python/sdist/amici/petab_objective.py @@ -4,43 +4,34 @@ Functionality related to running simulations or evaluating the objective function as defined by a PEtab problem """ - import copy import logging -import numbers -import re -from typing import ( - Any, - Collection, - Dict, - Iterator, - List, - Optional, - Sequence, - Tuple, - Union, -) +from typing import Any, Dict, List, Optional, Sequence, Union import amici -import libsbml import numpy as np import pandas as pd import petab -import sympy as sp -from amici.sbml_import import get_species_initial from petab.C import * # noqa: F403 -from petab.models import MODEL_TYPE_PYSB, MODEL_TYPE_SBML -from sympy.abc import _clash from . import AmiciExpData, AmiciModel from .logging import get_logger, log_execution_time -from .parameter_mapping import ( - ParameterMapping, - ParameterMappingForCondition, +from .parameter_mapping import ParameterMapping + +# some extra imports for backward-compatibility +from .petab.conditions import ( # noqa # pylint: disable=unused-import + create_edata_for_condition, + create_edatas, + create_parameterized_edatas, fill_in_parameters, ) -from .petab_import import PREEQ_INDICATOR_ID -from .petab_util import get_states_in_condition_table +from .petab.parameter_mapping import ( # noqa # pylint: disable=unused-import + create_parameter_mapping, + create_parameter_mapping_for_condition, +) +from .petab.util import ( # noqa # pylint: disable=unused-import + get_states_in_condition_table, +) try: import pysb @@ -378,696 +369,6 @@ def rescale_sensitivity( return scale[(old_scale, new_scale)](sensitivity) -def create_parameterized_edatas( - amici_model: AmiciModel, - petab_problem: petab.Problem, - problem_parameters: Dict[str, numbers.Number], - scaled_parameters: bool = False, - parameter_mapping: ParameterMapping = None, - simulation_conditions: Union[pd.DataFrame, Dict] = None, -) -> List[amici.ExpData]: - """Create list of :class:amici.ExpData objects with parameters filled in. - - :param amici_model: - AMICI Model assumed to be compatible with ``petab_problem``. - :param petab_problem: - PEtab problem to work on. - :param problem_parameters: - Run simulation with these parameters. If ``None``, PEtab - ``nominalValues`` will be used. To be provided as dict, mapping PEtab - problem parameters to SBML IDs. - :param scaled_parameters: - If ``True``, ``problem_parameters`` are assumed to be on the scale - provided in the PEtab parameter table and will be unscaled. - If ``False``, they are assumed to be in linear scale. - :param parameter_mapping: - Optional precomputed PEtab parameter mapping for efficiency, as - generated by :func:`create_parameter_mapping`. - :param simulation_conditions: - Result of :func:`petab.get_simulation_conditions`. Can be provided to - save time if this has been obtained before. - - :return: - List with one :class:`amici.amici.ExpData` per simulation condition, - with filled in timepoints, data and parameters. - """ - # number of amici simulations will be number of unique - # (preequilibrationConditionId, simulationConditionId) pairs. - # Can be optimized by checking for identical condition vectors. - if simulation_conditions is None: - simulation_conditions = ( - petab_problem.get_simulation_conditions_from_measurement_df() - ) - - # Get parameter mapping - if parameter_mapping is None: - parameter_mapping = create_parameter_mapping( - petab_problem=petab_problem, - simulation_conditions=simulation_conditions, - scaled_parameters=scaled_parameters, - amici_model=amici_model, - ) - - # Generate ExpData with all condition-specific information - edatas = create_edatas( - amici_model=amici_model, - petab_problem=petab_problem, - simulation_conditions=simulation_conditions, - ) - - # Fill parameters in ExpDatas (in-place) - fill_in_parameters( - edatas=edatas, - problem_parameters=problem_parameters, - scaled_parameters=scaled_parameters, - parameter_mapping=parameter_mapping, - amici_model=amici_model, - ) - - return edatas - - -def create_parameter_mapping( - petab_problem: petab.Problem, - simulation_conditions: Union[pd.DataFrame, List[Dict]], - scaled_parameters: bool, - amici_model: AmiciModel, - **parameter_mapping_kwargs, -) -> ParameterMapping: - """Generate AMICI specific parameter mapping. - - :param petab_problem: - PEtab problem - :param simulation_conditions: - Result of :func:`petab.get_simulation_conditions`. Can be provided to - save time if this has been obtained before. - :param scaled_parameters: - If ``True``, problem_parameters are assumed to be on the scale provided - in the PEtab parameter table and will be unscaled. If ``False``, they - are assumed to be in linear scale. - :param amici_model: - AMICI model. - :param parameter_mapping_kwargs: - Optional keyword arguments passed to - :func:`petab.get_optimization_to_simulation_parameter_mapping`. - To allow changing fixed PEtab problem parameters (``estimate=0``), - use ``fill_fixed_parameters=False``. - :return: - List of the parameter mappings. - """ - if simulation_conditions is None: - simulation_conditions = ( - petab_problem.get_simulation_conditions_from_measurement_df() - ) - if isinstance(simulation_conditions, list): - simulation_conditions = pd.DataFrame(data=simulation_conditions) - - # Because AMICI globalizes all local parameters during model import, - # we need to do that here as well to prevent parameter mapping errors - # (PEtab does currently not care about SBML LocalParameters) - if petab_problem.model.type_id == MODEL_TYPE_SBML: - if petab_problem.sbml_document: - converter_config = ( - libsbml.SBMLLocalParameterConverter().getDefaultProperties() - ) - petab_problem.sbml_document.convert(converter_config) - else: - logger.debug( - "No petab_problem.sbml_document is set. Cannot " - "convert SBML LocalParameters. If the model contains " - "LocalParameters, parameter mapping will fail." - ) - - default_parameter_mapping_kwargs = { - "warn_unmapped": False, - "scaled_parameters": scaled_parameters, - "allow_timepoint_specific_numeric_noise_parameters": not petab.lint.observable_table_has_nontrivial_noise_formula( - petab_problem.observable_df - ), - } - if parameter_mapping_kwargs is None: - parameter_mapping_kwargs = {} - - prelim_parameter_mapping = ( - petab.get_optimization_to_simulation_parameter_mapping( - condition_df=petab_problem.condition_df, - measurement_df=petab_problem.measurement_df, - parameter_df=petab_problem.parameter_df, - observable_df=petab_problem.observable_df, - mapping_df=petab_problem.mapping_df, - model=petab_problem.model, - simulation_conditions=simulation_conditions, - **dict( - default_parameter_mapping_kwargs, **parameter_mapping_kwargs - ), - ) - ) - - parameter_mapping = ParameterMapping() - for (_, condition), prelim_mapping_for_condition in zip( - simulation_conditions.iterrows(), prelim_parameter_mapping - ): - mapping_for_condition = create_parameter_mapping_for_condition( - prelim_mapping_for_condition, condition, petab_problem, amici_model - ) - parameter_mapping.append(mapping_for_condition) - - return parameter_mapping - - -def _get_initial_state_sbml( - petab_problem: petab.Problem, element_id: str -) -> Union[float, sp.Basic]: - element = petab_problem.sbml_model.getElementBySId(element_id) - type_code = element.getTypeCode() - initial_assignment = petab_problem.sbml_model.getInitialAssignmentBySymbol( - element_id - ) - if initial_assignment: - initial_assignment = sp.sympify( - libsbml.formulaToL3String(initial_assignment.getMath()), - locals=_clash, - ) - if type_code == libsbml.SBML_SPECIES: - value = ( - get_species_initial(element) - if initial_assignment is None - else initial_assignment - ) - elif type_code == libsbml.SBML_PARAMETER: - value = ( - element.getValue() - if initial_assignment is None - else initial_assignment - ) - elif type_code == libsbml.SBML_COMPARTMENT: - value = ( - element.getSize() - if initial_assignment is None - else initial_assignment - ) - else: - raise NotImplementedError( - f"Don't know what how to handle {element_id} in " - "condition table." - ) - return value - - -def _get_initial_state_pysb( - petab_problem: petab.Problem, element_id: str -) -> Union[float, sp.Symbol]: - species_idx = int(re.match(r"__s(\d+)$", element_id)[1]) - species_pattern = petab_problem.model.model.species[species_idx] - from pysb.pattern import match_complex_pattern - - value = next( - ( - initial.value - for initial in petab_problem.model.model.initials - if match_complex_pattern( - initial.pattern, species_pattern, exact=True - ) - ), - 0.0, - ) - if isinstance(value, pysb.Parameter): - if value.name in petab_problem.parameter_df.index: - value = value.name - else: - value = value.value - - return value - - -def _set_initial_state( - petab_problem, - condition_id, - element_id, - init_par_id, - par_map, - scale_map, - value, -): - value = petab.to_float_if_float(value) - if pd.isna(value): - if petab_problem.model.type_id == MODEL_TYPE_SBML: - value = _get_initial_state_sbml(petab_problem, element_id) - elif petab_problem.model.type_id == MODEL_TYPE_PYSB: - value = _get_initial_state_pysb(petab_problem, element_id) - - try: - value = float(value) - except (ValueError, TypeError): - if sp.nsimplify(value).is_Atom and ( - pysb is None or not isinstance(value, pysb.Component) - ): - # Get rid of multiplication with one - value = sp.nsimplify(value) - else: - raise NotImplementedError( - "Cannot handle non-trivial initial state " - f"expression for {element_id}: {value}" - ) - # this should be a parameter ID - value = str(value) - logger.debug( - f"The species {element_id} has no initial value " - f"defined for the condition {condition_id} in " - "the PEtab conditions table. The initial value is " - f"now set to {value}, which is the initial value " - "defined in the SBML model." - ) - par_map[init_par_id] = value - if isinstance(value, float): - # numeric initial state - scale_map[init_par_id] = petab.LIN - else: - # parametric initial state - scale_map[init_par_id] = petab_problem.parameter_df[ - PARAMETER_SCALE - ].get(value, petab.LIN) - - -def create_parameter_mapping_for_condition( - parameter_mapping_for_condition: petab.ParMappingDictQuadruple, - condition: Union[pd.Series, Dict], - petab_problem: petab.Problem, - amici_model: AmiciModel, -) -> ParameterMappingForCondition: - """Generate AMICI specific parameter mapping for condition. - - :param parameter_mapping_for_condition: - Preliminary parameter mapping for condition. - :param condition: - :class:`pandas.DataFrame` row with ``preequilibrationConditionId`` and - ``simulationConditionId``. - :param petab_problem: - Underlying PEtab problem. - :param amici_model: - AMICI model. - - :return: - The parameter and parameter scale mappings, for fixed - preequilibration, fixed simulation, and variable simulation - parameters, and then the respective scalings. - """ - ( - condition_map_preeq, - condition_map_sim, - condition_scale_map_preeq, - condition_scale_map_sim, - ) = parameter_mapping_for_condition - logger.debug(f"PEtab mapping: {parameter_mapping_for_condition}") - - if len(condition_map_preeq) != len(condition_scale_map_preeq) or len( - condition_map_sim - ) != len(condition_scale_map_sim): - raise AssertionError( - "Number of parameters and number of parameter " - "scales do not match." - ) - if len(condition_map_preeq) and len(condition_map_preeq) != len( - condition_map_sim - ): - logger.debug(f"Preequilibration parameter map: {condition_map_preeq}") - logger.debug(f"Simulation parameter map: {condition_map_sim}") - raise AssertionError( - "Number of parameters for preequilbration " - "and simulation do not match." - ) - - ########################################################################## - # initial states - # Initial states have been set during model import based on the SBML model. - # If initial states were overwritten in the PEtab condition table, they are - # applied here. - # During model generation, parameters for initial concentrations and - # respective initial assignments have been created for the - # relevant species, here we add these parameters to the parameter mapping. - # In absence of preequilibration this could also be handled via - # ExpData.x0, but in the case of preequilibration this would not allow for - # resetting initial states. - - if states_in_condition_table := get_states_in_condition_table( - petab_problem, condition - ): - # set indicator fixed parameter for preeq - # (we expect here, that this parameter was added during import and - # that it was not added by the user with a different meaning...) - if condition_map_preeq: - condition_map_preeq[PREEQ_INDICATOR_ID] = 1.0 - condition_scale_map_preeq[PREEQ_INDICATOR_ID] = LIN - - condition_map_sim[PREEQ_INDICATOR_ID] = 0.0 - condition_scale_map_sim[PREEQ_INDICATOR_ID] = LIN - - for element_id, ( - value, - preeq_value, - ) in states_in_condition_table.items(): - # for preequilibration - init_par_id = f"initial_{element_id}_preeq" - if ( - condition_id := condition.get(PREEQUILIBRATION_CONDITION_ID) - ) is not None: - _set_initial_state( - petab_problem, - condition_id, - element_id, - init_par_id, - condition_map_preeq, - condition_scale_map_preeq, - preeq_value, - ) - else: - # need to set dummy value for preeq parameter anyways, as it - # is expected below (set to 0, not nan, because will be - # multiplied with indicator variable in initial assignment) - condition_map_sim[init_par_id] = 0.0 - condition_scale_map_sim[init_par_id] = LIN - - # for simulation - condition_id = condition[SIMULATION_CONDITION_ID] - init_par_id = f"initial_{element_id}_sim" - _set_initial_state( - petab_problem, - condition_id, - element_id, - init_par_id, - condition_map_sim, - condition_scale_map_sim, - value, - ) - - ########################################################################## - # separate fixed and variable AMICI parameters, because we may have - # different fixed parameters for preeq and sim condition, but we cannot - # have different variable parameters. without splitting, - # merge_preeq_and_sim_pars_condition below may fail. - # TODO: This can be done already in parameter mapping creation. - variable_par_ids = amici_model.getParameterIds() - fixed_par_ids = amici_model.getFixedParameterIds() - - condition_map_preeq_var, condition_map_preeq_fix = _subset_dict( - condition_map_preeq, variable_par_ids, fixed_par_ids - ) - - ( - condition_scale_map_preeq_var, - condition_scale_map_preeq_fix, - ) = _subset_dict( - condition_scale_map_preeq, variable_par_ids, fixed_par_ids - ) - - condition_map_sim_var, condition_map_sim_fix = _subset_dict( - condition_map_sim, variable_par_ids, fixed_par_ids - ) - - condition_scale_map_sim_var, condition_scale_map_sim_fix = _subset_dict( - condition_scale_map_sim, variable_par_ids, fixed_par_ids - ) - - logger.debug( - "Fixed parameters preequilibration: " f"{condition_map_preeq_fix}" - ) - logger.debug("Fixed parameters simulation: " f"{condition_map_sim_fix}") - logger.debug( - "Variable parameters preequilibration: " f"{condition_map_preeq_var}" - ) - logger.debug("Variable parameters simulation: " f"{condition_map_sim_var}") - - petab.merge_preeq_and_sim_pars_condition( - condition_map_preeq_var, - condition_map_sim_var, - condition_scale_map_preeq_var, - condition_scale_map_sim_var, - condition, - ) - logger.debug(f"Merged: {condition_map_sim_var}") - - parameter_mapping_for_condition = ParameterMappingForCondition( - map_preeq_fix=condition_map_preeq_fix, - map_sim_fix=condition_map_sim_fix, - map_sim_var=condition_map_sim_var, - scale_map_preeq_fix=condition_scale_map_preeq_fix, - scale_map_sim_fix=condition_scale_map_sim_fix, - scale_map_sim_var=condition_scale_map_sim_var, - ) - - return parameter_mapping_for_condition - - -def create_edatas( - amici_model: AmiciModel, - petab_problem: petab.Problem, - simulation_conditions: Union[pd.DataFrame, Dict] = None, -) -> List[amici.ExpData]: - """Create list of :class:`amici.amici.ExpData` objects for PEtab problem. - - :param amici_model: - AMICI model. - :param petab_problem: - Underlying PEtab problem. - :param simulation_conditions: - Result of :func:`petab.get_simulation_conditions`. Can be provided to - save time if this has be obtained before. - - :return: - List with one :class:`amici.amici.ExpData` per simulation condition, - with filled in timepoints and data. - """ - if simulation_conditions is None: - simulation_conditions = ( - petab_problem.get_simulation_conditions_from_measurement_df() - ) - - observable_ids = amici_model.getObservableIds() - - measurement_groupvar = [SIMULATION_CONDITION_ID] - if PREEQUILIBRATION_CONDITION_ID in simulation_conditions: - measurement_groupvar.append(petab.PREEQUILIBRATION_CONDITION_ID) - measurement_dfs = dict( - list(petab_problem.measurement_df.groupby(measurement_groupvar)) - ) - - edatas = [] - for _, condition in simulation_conditions.iterrows(): - # Create amici.ExpData for each simulation - if PREEQUILIBRATION_CONDITION_ID in condition: - measurement_index = ( - condition.get(SIMULATION_CONDITION_ID), - condition.get(PREEQUILIBRATION_CONDITION_ID), - ) - else: - measurement_index = (condition.get(SIMULATION_CONDITION_ID),) - edata = create_edata_for_condition( - condition=condition, - amici_model=amici_model, - measurement_df=measurement_dfs[measurement_index], - petab_problem=petab_problem, - observable_ids=observable_ids, - ) - edatas.append(edata) - - return edatas - - -def create_edata_for_condition( - condition: Union[Dict, pd.Series], - measurement_df: pd.DataFrame, - amici_model: AmiciModel, - petab_problem: petab.Problem, - observable_ids: List[str], -) -> amici.ExpData: - """Get :class:`amici.amici.ExpData` for the given PEtab condition. - - Sets timepoints, observed data and sigmas. - - :param condition: - :class:`pandas.DataFrame` row with ``preequilibrationConditionId`` and - ``simulationConditionId``. - :param measurement_df: - :class:`pandas.DataFrame` with measurements for the given condition. - :param amici_model: - AMICI model - :param petab_problem: - Underlying PEtab problem - :param observable_ids: - List of observable IDs - - :return: - ExpData instance. - """ - if amici_model.nytrue != len(observable_ids): - raise AssertionError( - "Number of AMICI model observables does not " - "match number of PEtab observables." - ) - - # create an ExpData object - edata = amici.ExpData(amici_model) - edata.id = condition[SIMULATION_CONDITION_ID] - if condition.get(PREEQUILIBRATION_CONDITION_ID): - edata.id += "+" + condition.get(PREEQUILIBRATION_CONDITION_ID) - ########################################################################## - # enable initial parameters reinitialization - - states_in_condition_table = get_states_in_condition_table( - petab_problem, condition=condition - ) - if ( - condition.get(PREEQUILIBRATION_CONDITION_ID) - and states_in_condition_table - ): - state_ids = amici_model.getStateIds() - state_idx_reinitalization = [ - state_ids.index(s) - for s, (v, v_preeq) in states_in_condition_table.items() - if not np.isnan(v) - ] - edata.reinitialization_state_idxs_sim = state_idx_reinitalization - logger.debug( - "Enabling state reinitialization for condition " - f"{condition.get(PREEQUILIBRATION_CONDITION_ID, '')} - " - f"{condition.get(SIMULATION_CONDITION_ID)} " - f"{states_in_condition_table}" - ) - - ########################################################################## - # timepoints - - # find replicate numbers of time points - timepoints_w_reps = _get_timepoints_with_replicates( - df_for_condition=measurement_df - ) - edata.setTimepoints(timepoints_w_reps) - - ########################################################################## - # measurements and sigmas - y, sigma_y = _get_measurements_and_sigmas( - df_for_condition=measurement_df, - timepoints_w_reps=timepoints_w_reps, - observable_ids=observable_ids, - ) - edata.setObservedData(y.flatten()) - edata.setObservedDataStdDev(sigma_y.flatten()) - - return edata - - -def _subset_dict( - full: Dict[Any, Any], *args: Collection[Any] -) -> Iterator[Dict[Any, Any]]: - """Get subset of dictionary based on provided keys - - :param full: - Dictionary to subset - :param args: - Collections of keys to be contained in the different subsets - - :return: - subsetted dictionary - """ - for keys in args: - yield {key: val for (key, val) in full.items() if key in keys} - - -def _get_timepoints_with_replicates( - df_for_condition: pd.DataFrame, -) -> List[numbers.Number]: - """ - Get list of timepoints including replicate measurements - - :param df_for_condition: - PEtab measurement table subset for a single condition. - - :return: - Sorted list of timepoints, including multiple timepoints accounting - for replicate measurements. - """ - # create sorted list of all timepoints for which measurements exist - timepoints = sorted(df_for_condition[TIME].unique().astype(float)) - - # find replicate numbers of time points - timepoints_w_reps = [] - for time in timepoints: - # subselect for time - df_for_time = df_for_condition[ - df_for_condition.time.astype(float) == time - ] - # rep number is maximum over rep numbers for observables - n_reps = max(df_for_time.groupby([OBSERVABLE_ID, TIME]).size()) - # append time point n_rep times - timepoints_w_reps.extend([time] * n_reps) - - return timepoints_w_reps - - -def _get_measurements_and_sigmas( - df_for_condition: pd.DataFrame, - timepoints_w_reps: Sequence[numbers.Number], - observable_ids: Sequence[str], -) -> Tuple[np.array, np.array]: - """ - Get measurements and sigmas - - Generate arrays with measurements and sigmas in AMICI format from a - PEtab measurement table subset for a single condition. - - :param df_for_condition: - Subset of PEtab measurement table for one condition - - :param timepoints_w_reps: - Timepoints for which there exist measurements, including replicates - - :param observable_ids: - List of observable IDs for mapping IDs to indices. - - :return: - arrays for measurement and sigmas - """ - # prepare measurement matrix - y = np.full( - shape=(len(timepoints_w_reps), len(observable_ids)), fill_value=np.nan - ) - # prepare sigma matrix - sigma_y = y.copy() - - timepoints = sorted(df_for_condition[TIME].unique().astype(float)) - - for time in timepoints: - # subselect for time - df_for_time = df_for_condition[df_for_condition[TIME] == time] - time_ix_0 = timepoints_w_reps.index(time) - - # remember used time indices for each observable - time_ix_for_obs_ix = {} - - # iterate over measurements - for _, measurement in df_for_time.iterrows(): - # extract observable index - observable_ix = observable_ids.index(measurement[OBSERVABLE_ID]) - - # update time index for observable - if observable_ix in time_ix_for_obs_ix: - time_ix_for_obs_ix[observable_ix] += 1 - else: - time_ix_for_obs_ix[observable_ix] = time_ix_0 - - # fill observable and possibly noise parameter - y[time_ix_for_obs_ix[observable_ix], observable_ix] = measurement[ - MEASUREMENT - ] - if isinstance( - measurement.get(NOISE_PARAMETERS, None), numbers.Number - ): - sigma_y[ - time_ix_for_obs_ix[observable_ix], observable_ix - ] = measurement[NOISE_PARAMETERS] - return y, sigma_y - - def rdatas_to_measurement_df( rdatas: Sequence[amici.ReturnData], model: AmiciModel, diff --git a/python/sdist/amici/petab_util.py b/python/sdist/amici/petab_util.py index 9108b108bc..6baf41d455 100644 --- a/python/sdist/amici/petab_util.py +++ b/python/sdist/amici/petab_util.py @@ -1,107 +1,9 @@ """Various helper functions for working with PEtab problems.""" -import re -from typing import Dict, Tuple, Union +import warnings -import libsbml -import pandas as pd -import petab -from petab.C import PREEQUILIBRATION_CONDITION_ID, SIMULATION_CONDITION_ID -from petab.mapping import resolve_mapping -from petab.models import MODEL_TYPE_PYSB, MODEL_TYPE_SBML +from .petab.util import PREEQ_INDICATOR_ID, get_states_in_condition_table -# ID of model parameter that is to be added to SBML model to indicate -# preequilibration -PREEQ_INDICATOR_ID = "preequilibration_indicator" - - -def get_states_in_condition_table( - petab_problem: petab.Problem, - condition: Union[Dict, pd.Series] = None, - return_patterns: bool = False, -) -> Dict[str, Tuple[Union[float, str, None], Union[float, str, None]]]: - """Get states and their initial condition as specified in the condition table. - - Returns: Dictionary: ``stateId -> (initial condition simulation, initial condition preequilibration)`` - """ - if petab_problem.model.type_id not in (MODEL_TYPE_SBML, MODEL_TYPE_PYSB): - raise NotImplementedError() - - species_check_funs = { - MODEL_TYPE_SBML: lambda x: _element_is_sbml_state( - petab_problem.sbml_model, x - ), - MODEL_TYPE_PYSB: lambda x: _element_is_pysb_pattern( - petab_problem.model.model, x - ), - } - states = { - resolve_mapping(petab_problem.mapping_df, col): (None, None) - if condition is None - else ( - petab_problem.condition_df.loc[ - condition[SIMULATION_CONDITION_ID], col - ], - petab_problem.condition_df.loc[ - condition[PREEQUILIBRATION_CONDITION_ID], col - ] - if PREEQUILIBRATION_CONDITION_ID in condition - else None, - ) - for col in petab_problem.condition_df.columns - if species_check_funs[petab_problem.model.type_id]( - resolve_mapping(petab_problem.mapping_df, col) - ) - } - - if petab_problem.model.type_id == MODEL_TYPE_PYSB: - if return_patterns: - return states - import pysb.pattern - - if not petab_problem.model.model.species: - import pysb.bng - - pysb.bng.generate_equations(petab_problem.model.model) - - try: - spm = pysb.pattern.SpeciesPatternMatcher( - model=petab_problem.model.model - ) - except NotImplementedError as e: - raise NotImplementedError( - "Requires https://github.com/pysb/pysb/pull/570. " - "To use this functionality, update pysb via " - "`pip install git+https://github.com/FFroehlich/pysb@fix_pattern_matching`" - ) - - # expose model components as variables so we can evaluate patterns - for c in petab_problem.model.model.components: - globals()[c.name] = c - - states = { - f"__s{ix}": value - for pattern, value in states.items() - for ix in spm.match(eval(pattern), index=True, exact=True) - } - return states - - -def _element_is_pysb_pattern(model: "pysb.Model", element: str) -> bool: - """Check if element is a pysb pattern""" - if match := re.match(r"[a-zA-Z_][\w_]*\(", element): - return match[0][:-1] in [m.name for m in model.monomers] - return False - - -def _element_is_sbml_state(sbml_model: libsbml.Model, sbml_id: str) -> bool: - """Does the element with ID `sbml_id` correspond to a state variable?""" - if sbml_model.getCompartment(sbml_id) is not None: - return True - if sbml_model.getSpecies(sbml_id) is not None: - return True - if ( - rule := sbml_model.getRuleByVariable(sbml_id) - ) is not None and rule.getTypeCode() == libsbml.SBML_RATE_RULE: - return True - - return False +warnings.warn( + f"Importing {__name__} is deprecated. Use `amici.petab.util` instead.", + DeprecationWarning, +) diff --git a/python/sdist/setup.cfg b/python/sdist/setup.cfg index f419578754..b0945eca6e 100644 --- a/python/sdist/setup.cfg +++ b/python/sdist/setup.cfg @@ -74,5 +74,5 @@ amici = ; amici_import_petab.py is kept for backwards compatibility console_scripts = - amici_import_petab = amici.petab_import:_main - amici_import_petab.py = amici.petab_import:_main + amici_import_petab = amici.petab.cli.import_petab:_main + amici_import_petab.py = amici.petab.cli.import_petab:_main