diff --git a/examples/scripts/spme_max_energy.py b/examples/scripts/spme_max_energy.py index f5b7c827..db521b68 100644 --- a/examples/scripts/spme_max_energy.py +++ b/examples/scripts/spme_max_energy.py @@ -45,13 +45,15 @@ model, parameters, experiment, signal=signal, init_soc=init_soc ) -# Generate cost function and optimisation class: -cost = pybop.GravimetricEnergyDensity(problem) +# Generate multiple cost functions and combine them. +cost1 = pybop.GravimetricEnergyDensity(problem, update_capacity=True) +cost2 = pybop.VolumetricEnergyDensity(problem, update_capacity=True) +cost = pybop.WeightedCost(cost1, cost2, weights=[1, 1]) + +# Run optimisation optim = pybop.PSO( cost, verbose=True, allow_infeasible_solutions=False, max_iterations=15 ) - -# Run optimisation x, final_cost = optim.run() print("Estimated parameters:", x) print(f"Initial gravimetric energy density: {cost(optim.x0):.2f} Wh.kg-1") diff --git a/pybop/__init__.py b/pybop/__init__.py index 4a99ebc4..922a6480 100644 --- a/pybop/__init__.py +++ b/pybop/__init__.py @@ -81,7 +81,7 @@ # # Cost function class # -from .costs.base_cost import BaseCost, WeightedCost +from .costs.base_cost import BaseCost from .costs.fitting_costs import ( RootMeanSquaredError, SumSquaredError, @@ -100,6 +100,7 @@ GaussianLogLikelihoodKnownSigma, MAP, ) +from .costs._weighted_cost import WeightedCost # # Optimiser class diff --git a/pybop/costs/_weighted_cost.py b/pybop/costs/_weighted_cost.py new file mode 100644 index 00000000..9555ece8 --- /dev/null +++ b/pybop/costs/_weighted_cost.py @@ -0,0 +1,137 @@ +import copy +from typing import Optional + +import numpy as np + +from pybop import BaseCost, BaseLikelihood, DesignCost +from pybop.parameters.parameter import Inputs + + +class WeightedCost(BaseCost): + """ + A subclass for constructing a linear combination of cost functions as + a single weighted cost function. + + Inherits all parameters and attributes from ``BaseCost``. + + Attributes + --------------------- + costs : list[pybop.BaseCost] + A list of PyBOP cost objects. + weights : list[float] + A list of values with which to weight the cost values. + _has_different_problems : bool + If True, the problem for each cost is evaluated independently during + each evaluation of the cost (default: False). + """ + + def __init__(self, *costs, weights: Optional[list[float]] = None): + if not all(isinstance(cost, BaseCost) for cost in costs): + raise TypeError("All costs must be instances of BaseCost.") + self.costs = [copy.copy(cost) for cost in costs] + self._has_different_problems = False + self.minimising = not any( + isinstance(cost, (BaseLikelihood, DesignCost)) for cost in self.costs + ) + if len(set(type(cost.problem) for cost in self.costs)) > 1: + raise TypeError("All problems must be of the same class type.") + + # Check if weights are provided + if weights is not None: + try: + self.weights = np.asarray(weights, dtype=float) + except ValueError: + raise ValueError("Weights must be numeric values.") from None + + if self.weights.size != len(self.costs): + raise ValueError("Number of weights must match number of costs.") + else: + self.weights = np.ones(len(self.costs)) + + # Check if all costs depend on the same problem + self._has_different_problems = any( + hasattr(cost, "problem") and cost.problem is not self.costs[0].problem + for cost in self.costs[1:] + ) + + if self._has_different_problems: + super().__init__() + for cost in self.costs: + self.parameters.join(cost.parameters) + else: + super().__init__(self.costs[0].problem) + self._predict = False + for cost in self.costs: + cost._predict = False + + # Check if any cost function requires capacity update + if any(cost.update_capacity for cost in self.costs): + self.update_capacity = True + + def _evaluate(self, inputs: Inputs, grad=None): + """ + Calculate the weighted cost for a given set of parameters. + + Parameters + ---------- + inputs : Inputs + The parameters for which to compute the cost. + grad : array-like, optional + An array to store the gradient of the cost function with respect + to the parameters. + + Returns + ------- + float + The weighted cost value. + """ + e = np.empty_like(self.costs) + + if not self._predict: + if self._has_different_problems: + self.parameters.update(values=list(inputs.values())) + else: + self.y = self.problem.evaluate( + inputs, update_capacity=self.update_capacity + ) + + for i, cost in enumerate(self.costs): + if not self._has_different_problems: + cost.y = self.y + e[i] = cost.evaluate(inputs) + + return np.dot(e, self.weights) + + def _evaluateS1(self, inputs: Inputs): + """ + Compute the weighted cost and its gradient with respect to the parameters. + + Parameters + ---------- + inputs : Inputs + The parameters for which to compute the cost and gradient. + + Returns + ------- + tuple + A tuple containing the cost and the gradient. The cost is a float, + and the gradient is an array-like of the same length as `x`. + """ + e = np.empty_like(self.costs) + de = np.empty((len(self.parameters), len(self.costs))) + + if not self._predict: + if self._has_different_problems: + self.parameters.update(values=list(inputs.values())) + else: + self.y, self.dy = self.problem.evaluateS1(inputs) + + for i, cost in enumerate(self.costs): + if not self._has_different_problems: + cost.y, cost.dy = (self.y, self.dy) + e[i], de[:, i] = cost.evaluateS1(inputs) + + e = np.dot(e, self.weights) + de = np.dot(de, self.weights) + + return e, de diff --git a/pybop/costs/base_cost.py b/pybop/costs/base_cost.py index 974227f9..3ba818f5 100644 --- a/pybop/costs/base_cost.py +++ b/pybop/costs/base_cost.py @@ -1,10 +1,6 @@ -import copy -import warnings from typing import Optional, Union -import numpy as np - -from pybop import BaseProblem, DesignProblem +from pybop import BaseProblem from pybop.parameters.parameter import Inputs, Parameters @@ -36,6 +32,7 @@ def __init__(self, problem: Optional[BaseProblem] = None): self.problem = problem self.verbose = False self._predict = False + self.update_capacity = False self.y = None self.dy = None self.set_fail_gradient() @@ -210,137 +207,3 @@ def verify_prediction(self, y): return False return True - - -class WeightedCost(BaseCost): - """ - A subclass for constructing a linear combination of cost functions as - a single weighted cost function. - - Inherits all parameters and attributes from ``BaseCost``. - - Attributes - --------------------- - costs : list[pybop.BaseCost] - A list of PyBOP cost objects. - weights : list[float] - A list of values with which to weight the cost values. - _has_different_problems : bool - If True, the problem for each cost is evaluated independently during - each evaluation of the cost (default: False). - """ - - def __init__(self, *costs, weights: Optional[list[float]] = None): - if not all(isinstance(cost, BaseCost) for cost in costs): - raise TypeError("All costs must be instances of BaseCost.") - self.costs = [copy.copy(cost) for cost in costs] - self._has_different_problems = False - self.minimising = not any( - isinstance(cost.problem, DesignProblem) for cost in self.costs - ) - if len(set(type(cost.problem) for cost in self.costs)) > 1: - raise TypeError("All problems must be of the same class type.") - - # Check if weights are provided - if weights is not None: - try: - self.weights = np.asarray(weights, dtype=float) - except ValueError: - raise ValueError("Weights must be numeric values.") from None - - if self.weights.size != len(self.costs): - raise ValueError("Number of weights must match number of costs.") - else: - self.weights = np.ones(len(self.costs)) - - # Check if all costs depend on the same problem - self._has_different_problems = any( - hasattr(cost, "problem") and cost.problem is not self.costs[0].problem - for cost in self.costs[1:] - ) - - if self._has_different_problems: - super().__init__() - for cost in self.costs: - self.parameters.join(cost.parameters) - else: - super().__init__(self.costs[0].problem) - self._predict = False - for cost in self.costs: - cost._predict = False - - # Catch UserWarnings as exceptions - if not self.minimising: - warnings.filterwarnings("error", category=UserWarning) - - def _evaluate(self, inputs: Inputs, grad=None): - """ - Calculate the weighted cost for a given set of parameters. - - Parameters - ---------- - inputs : Inputs - The parameters for which to compute the cost. - grad : array-like, optional - An array to store the gradient of the cost function with respect - to the parameters. - - Returns - ------- - float - The weighted cost value. - """ - e = np.empty_like(self.costs) - - if not self._predict: - if self._has_different_problems: - self.parameters.update(values=list(inputs.values())) - else: - try: - with warnings.catch_warnings(): - self.y = self.problem.evaluate(inputs) - except UserWarning as e: - if self.verbose: - print(f"Ignoring this sample due to: {e}") - return -np.inf - - for i, cost in enumerate(self.costs): - if not self._has_different_problems: - cost.y = self.y - e[i] = cost.evaluate(inputs) - - return np.dot(e, self.weights) - - def _evaluateS1(self, inputs: Inputs): - """ - Compute the weighted cost and its gradient with respect to the parameters. - - Parameters - ---------- - inputs : Inputs - The parameters for which to compute the cost and gradient. - - Returns - ------- - tuple - A tuple containing the cost and the gradient. The cost is a float, - and the gradient is an array-like of the same length as `x`. - """ - e = np.empty_like(self.costs) - de = np.empty((len(self.parameters), len(self.costs))) - - if not self._predict: - if self._has_different_problems: - self.parameters.update(values=list(inputs.values())) - else: - self.y, self.dy = self.problem.evaluateS1(inputs) - - for i, cost in enumerate(self.costs): - if not self._has_different_problems: - cost.y, cost.dy = (self.y, self.dy) - e[i], de[:, i] = cost.evaluateS1(inputs) - - e = np.dot(e, self.weights) - de = np.dot(de, self.weights) - - return e, de diff --git a/pybop/costs/design_costs.py b/pybop/costs/design_costs.py index 06649cb6..290f1efb 100644 --- a/pybop/costs/design_costs.py +++ b/pybop/costs/design_costs.py @@ -91,32 +91,17 @@ def evaluate(self, inputs: Union[Inputs, list], grad=None): inputs = self.parameters.verify(inputs) try: - with warnings.catch_warnings(): - # Convert UserWarning to an exception - warnings.filterwarnings("error", category=UserWarning) - if self._predict: - if self.update_capacity: - self.problem.model.approximate_capacity(inputs) - self.y = self.problem.evaluate(inputs) + if self._predict: + self.y = self.problem.evaluate( + inputs, update_capacity=self.update_capacity + ) - return self._evaluate(inputs, grad) + return self._evaluate(inputs, grad) # Catch NotImplementedError and raise it except NotImplementedError as e: raise e - # Catch infeasible solutions and return infinity - except UserWarning as e: - if self.verbose: - print(f"Ignoring this sample due to: {e}") - return -np.inf - - # Catch any other exception and return infinity - except Exception as e: - if self.verbose: - print(f"An error occurred during the evaluation: {e}") - return -np.inf - class GravimetricEnergyDensity(DesignCost): """ @@ -147,6 +132,9 @@ def _evaluate(self, inputs: Inputs, grad=None): float The gravimetric energy density or -infinity in case of infeasible parameters. """ + if not any(np.isfinite(self.y[signal][0]) for signal in self.signal): + return -np.inf + voltage, current = self.y["Voltage [V]"], self.y["Current [A]"] energy_density = np.trapz(voltage * current, dx=self.dt) / ( 3600 * self.problem.model.cell_mass(self.parameter_set) @@ -184,6 +172,9 @@ def _evaluate(self, inputs: Inputs, grad=None): float The volumetric energy density or -infinity in case of infeasible parameters. """ + if not any(np.isfinite(self.y[signal][0]) for signal in self.signal): + return -np.inf + voltage, current = self.y["Voltage [V]"], self.y["Current [A]"] energy_density = np.trapz(voltage * current, dx=self.dt) / ( 3600 * self.problem.model.cell_volume(self.parameter_set) diff --git a/pybop/problems/base_problem.py b/pybop/problems/base_problem.py index ee36a6bb..cb3bc597 100644 --- a/pybop/problems/base_problem.py +++ b/pybop/problems/base_problem.py @@ -64,6 +64,7 @@ def __init__( self.n_outputs = len(self.signal) self._time_data = None self._target = None + self.verbose = False if isinstance(model, BaseModel): self.additional_variables = additional_variables diff --git a/pybop/problems/design_problem.py b/pybop/problems/design_problem.py index 30e2d9c3..74b8ae71 100644 --- a/pybop/problems/design_problem.py +++ b/pybop/problems/design_problem.py @@ -1,3 +1,5 @@ +import warnings + import numpy as np from pybop import BaseProblem @@ -45,7 +47,10 @@ def __init__( signal = ["Voltage [V]"] additional_variables.extend(["Time [s]", "Current [A]"]) additional_variables = list(set(additional_variables)) - + self.warning_patterns = [ + "Ah is greater than", + "Non-physical point encountered", + ] super().__init__( parameters, model, @@ -75,7 +80,7 @@ def __init__( self._target = {key: sol[key] for key in self.signal} self._dataset = None - def evaluate(self, inputs: Inputs): + def evaluate(self, inputs: Inputs, update_capacity=False): """ Evaluate the model with the given parameters and return the signal. @@ -90,19 +95,33 @@ def evaluate(self, inputs: Inputs): The model output y(t) simulated with inputs. """ inputs = self.parameters.verify(inputs) - - sol = self._model.predict( - inputs=inputs, - experiment=self.experiment, - init_soc=self.init_soc, - ) - - if sol == [np.inf]: - return sol - - else: - predictions = {} - for signal in self.signal + self.additional_variables: - predictions[signal] = sol[signal].data + if update_capacity: + self.model.approximate_capacity(inputs) + + try: + with warnings.catch_warnings(): + for pattern in self.warning_patterns: + warnings.filterwarnings( + "error", category=UserWarning, message=pattern + ) + + sol = self._model.predict( + inputs=inputs, + experiment=self.experiment, + init_soc=self.init_soc, + ) + + # Catch infeasible solutions and return infinity + except (UserWarning, Exception) as e: + if self.verbose: + print(f"Ignoring this sample due to: {e}") + return { + signal: np.asarray(np.ones(2) * -np.inf) + for signal in [*self.signal, *self.additional_variables] + } + + predictions = {} + for signal in self.signal + self.additional_variables: + predictions[signal] = sol[signal].data return predictions diff --git a/pybop/problems/fitting_problem.py b/pybop/problems/fitting_problem.py index 58d59e9e..0ad636ce 100644 --- a/pybop/problems/fitting_problem.py +++ b/pybop/problems/fitting_problem.py @@ -79,7 +79,7 @@ def __init__( init_soc=self.init_soc, ) - def evaluate(self, inputs: Inputs): + def evaluate(self, inputs: Inputs, update_capacity=False): """ Evaluate the model with the given parameters and return the signal. diff --git a/tests/integration/test_weighted_cost.py b/tests/integration/test_weighted_cost.py new file mode 100644 index 00000000..a533bd98 --- /dev/null +++ b/tests/integration/test_weighted_cost.py @@ -0,0 +1,176 @@ +import numpy as np +import pytest + +import pybop + + +class TestWeightedCost: + """ + A class to test the weighted cost function. + """ + + @pytest.fixture(autouse=True) + def setup(self): + self.sigma0 = 0.002 + self.ground_truth = np.asarray([0.55, 0.55]) + np.random.normal( + loc=0.0, scale=0.05, size=2 + ) + + @pytest.fixture + def model(self): + parameter_set = pybop.ParameterSet.pybamm("Chen2020") + return pybop.lithium_ion.SPM(parameter_set=parameter_set) + + @pytest.fixture + def parameters(self): + return pybop.Parameters( + pybop.Parameter( + "Negative electrode active material volume fraction", + prior=pybop.Uniform(0.4, 0.75), + bounds=[0.375, 0.75], + ), + pybop.Parameter( + "Positive electrode active material volume fraction", + prior=pybop.Uniform(0.4, 0.75), + # no bounds + ), + ) + + @pytest.fixture(params=[0.4]) + def init_soc(self, request): + return request.param + + @pytest.fixture( + params=[ + ( + pybop.GaussianLogLikelihoodKnownSigma, + pybop.RootMeanSquaredError, + pybop.SumSquaredError, + pybop.MAP, + ) + ] + ) + def cost_class(self, request): + return request.param + + def noise(self, sigma, values): + return np.random.normal(0, sigma, values) + + @pytest.fixture + def weighted_fitting_cost(self, model, parameters, cost_class, init_soc): + # Form dataset + solution = self.get_data(model, parameters, self.ground_truth, init_soc) + dataset = pybop.Dataset( + { + "Time [s]": solution["Time [s]"].data, + "Current function [A]": solution["Current [A]"].data, + "Voltage [V]": solution["Voltage [V]"].data + + self.noise(self.sigma0, len(solution["Time [s]"].data)), + } + ) + + # Define the cost to optimise + problem = pybop.FittingProblem(model, parameters, dataset, init_soc=init_soc) + costs = [] + for cost in cost_class: + if issubclass(cost, pybop.MAP): + costs.append( + cost( + problem, + pybop.GaussianLogLikelihoodKnownSigma, + sigma0=self.sigma0, + ) + ) + elif issubclass(cost, pybop.BaseLikelihood): + costs.append(cost(problem, sigma0=self.sigma0)) + else: + costs.append(cost(problem)) + + return pybop.WeightedCost(*costs, weights=[0.1, 1, 0.5, 0.6]) + + @pytest.mark.integration + def test_fitting_costs(self, weighted_fitting_cost): + x0 = weighted_fitting_cost.parameters.initial_value() + optim = pybop.CuckooSearch( + cost=weighted_fitting_cost, + sigma0=0.03, + max_iterations=250, + max_unchanged_iterations=35, + ) + + initial_cost = optim.cost(optim.parameters.initial_value()) + x, final_cost = optim.run() + + # Assertions + if not np.allclose(x0, self.ground_truth, atol=1e-5): + if optim.minimising: + assert initial_cost > final_cost + else: + assert initial_cost < final_cost + np.testing.assert_allclose(x, self.ground_truth, atol=1.5e-2) + + @pytest.fixture( + params=[ + ( + pybop.GravimetricEnergyDensity, + pybop.VolumetricEnergyDensity, + ) + ] + ) + def design_cost(self, request): + return request.param + + @pytest.fixture + def weighted_design_cost(self, model, design_cost): + init_soc = 1.0 + parameters = pybop.Parameters( + pybop.Parameter( + "Positive electrode thickness [m]", + prior=pybop.Gaussian(5e-05, 0.1e-05), + bounds=[50e-06, 10e-05], + ), + pybop.Parameter( + "Positive particle radius [m]", + prior=pybop.Gaussian(5.22e-06, 0.1e-06), + bounds=[2e-06, 9e-06], + ), + ) + experiment = pybop.Experiment( + ["Discharge at 1C until 3.3 V (5 seconds period)"], + ) + + problem = pybop.DesignProblem( + model, parameters, experiment=experiment, init_soc=init_soc + ) + costs = [] + for cost in design_cost: + costs.append(cost(problem, update_capacity=True)) + + return pybop.WeightedCost(*costs, weights=[100, 1]) + + @pytest.mark.integration + def test_design_costs(self, weighted_design_cost): + optim = pybop.CuckooSearch( + weighted_design_cost, + max_iterations=15, + allow_infeasible_solutions=False, + ) + + initial_cost = optim.cost(optim.parameters.initial_value()) + _, final_cost = optim.run() + + # Assertions + assert initial_cost < final_cost + + def get_data(self, model, parameters, x, init_soc): + model.classify_and_update_parameters(parameters) + experiment = pybop.Experiment( + [ + ( + "Discharge at 0.5C for 3 minutes (4 second period)", + "Charge at 0.5C for 3 minutes (4 second period)", + ), + ] + ) + sim = model.predict(init_soc=init_soc, experiment=experiment, inputs=x) + return sim