From d92ca8227bf187e44c876ac9509fbf1fe5ed8147 Mon Sep 17 00:00:00 2001 From: Henrik Finsberg Date: Tue, 3 Sep 2024 17:20:30 +0200 Subject: [PATCH 1/5] Add zenkur model --- examples/zenkur.py | 15 ++++ src/circulation/zenkur.py | 176 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 191 insertions(+) create mode 100644 examples/zenkur.py create mode 100644 src/circulation/zenkur.py diff --git a/examples/zenkur.py b/examples/zenkur.py new file mode 100644 index 0000000..a998bd2 --- /dev/null +++ b/examples/zenkur.py @@ -0,0 +1,15 @@ +from circulation.zenkur import Zenkur +import matplotlib.pyplot as plt + + +circulation = Zenkur() +history = circulation.solve(T=3600.0, dt=1e-3) + +fig, ax = plt.subplots(2, 1) +ax[0].plot(history["V_LV"], history["p_LV"], label="0D") +ax[1].plot(history["V_LV"][-1000:], history["p_LV"][-1000:], label="0D") +ax[0].legend(bbox_to_anchor=(1, 1), loc=1, frameon=False) +ax[0].set_xlabel("V [mL]") +ax[0].set_ylabel("p [mmHg]") + +plt.show() diff --git a/src/circulation/zenkur.py b/src/circulation/zenkur.py new file mode 100644 index 0000000..51b48ab --- /dev/null +++ b/src/circulation/zenkur.py @@ -0,0 +1,176 @@ +import numpy as np +# from scipy.integrate import RK45, solve_ivp + + +from . import base +from . import units + +mL = units.ureg("mL") +mmHg = units.ureg("mmHg") +s = units.ureg("s") + + +class Zenkur(base.CirculationModel): + """ + 0D model of the left ventricle only. + + """ + + def __init__(self, parameters: dict[str, float] | None = None, add_units=False): + super().__init__(parameters, add_units=add_units) + + ############ Chambers + # chambers = self.parameters["chambers"] + + # self.E_LV = self.time_varying_elastance(**chambers["LV"]) + # self.p_LV_func = lambda V, t: self.E_LV(t) * (V - chambers["LV"]["V0"]) + self.var = {} + self._initialize() + + @staticmethod + def default_parameters(): + return { + "kE_LV": 0.066 * mL**-1, + "V_ED0": 7.14 * mL, + "P0_LV": 2.03 * mmHg, + "tau_Baro": 20.0 * s, + "k_width": 0.1838 * mmHg**-1, + "Pa_set": 70.0 * mmHg, + "Ca": 4.0 * mL / mmHg, + "Cv": 111.11 * mL / mmHg, + "Va0": 700 * mL, + "Vv0_min": 2_700 * mL, + "Vv0_max": 3_100 * mL, + "R_TPR_min": 0.5335 * mmHg * s / mL, + "R_TPR_max": 2.134 * mmHg * s / mL, + "T_sys": 4 / 15 * s, + "f_HR_min": 2 / 3 * 1 / s, + "f_HR_max": 3.0 * 1 / s, + "R_valve": 0.0025 * mmHg * s / mL, + "C_PRSW_min": 25.9 * mmHg, + "C_PRSW_max": 103.8 * mmHg, + "BPM": 75.0 * units.ureg("1/minutes"), + } + + @staticmethod + def default_initial_conditions() -> dict[str, float]: + V0lv = 7.144 * mL + return { + "V_ES": 2 * V0lv, + "V_ED": 3 * V0lv, + "S": 0.5, + "Va": 700 * mL, + "Vv": 3_000 * mL, + } + + def fHR(self, S): + fHR_min = self.parameters["f_HR_min"] + fHR_max = self.parameters["f_HR_max"] + return fHR_min + (fHR_max - fHR_min) * S + + def R_TPR(self, S): + R_TPR_min = self.parameters["R_TPR_min"] + R_TPR_max = self.parameters["R_TPR_max"] + return R_TPR_min + (R_TPR_max - R_TPR_min) * S + + def C_PRSW(self, S): + C_PRSW_min = self.parameters["C_PRSW_min"] + C_PRSW_max = self.parameters["C_PRSW_max"] + return C_PRSW_min + (C_PRSW_max - C_PRSW_min) * S + + def Vv0(self, S): + Vv0_min = self.parameters["Vv0_min"] + Vv0_max = self.parameters["Vv0_max"] + return Vv0_min + (Vv0_max - Vv0_min) * (1 - S) + + def update_static_variables(self, t): + ... + # self.var["p_LV"] = self.p_LV_func(self.state["V_LV"], t) + + def p_LV_func(self, V_LV, t=0.0): + P0_LV = self.parameters["P0_LV"] + kE_LV = self.parameters["kE_LV"] + V_ED0 = self.parameters["V_ED0"] + + # print(f"V_LV = {V_LV}", "V_ED0 = ", V_ED0) + # Eq 7 + return P0_LV * (np.exp(kE_LV * (V_LV - V_ED0)) - 1) + + def V_ES(self, V_ED, C_PRSW, Pa): + P_ED = self.p_LV_func(V_ED) + V_ED0 = self.parameters["V_ED0"] + + # Eq 5 + return V_ED - ((C_PRSW * (V_ED - V_ED0)) / (Pa - P_ED)) + + def V_ED(self, V_ES, fHR, Pcvp): + # Eq 13 + p_LV_ES = self.p_LV_func(V_ES) + T_sys = self.parameters["T_sys"] + + if Pcvp > p_LV_ES: + # Eq 12 + return self.V((1 / fHR) - T_sys, Pcvp) + else: + return V_ES + + def V(self, t, Pcvp): + V_ES = self.state["V_ES"] + R_Valve = self.parameters["R_valve"] + P0_LV = self.parameters["P0_LV"] + kE_LV = self.parameters["kE_LV"] + V_ED0 = self.parameters["V_ED0"] + + # Eq 9 + k1 = -(P0_LV / R_Valve) * np.exp(-kE_LV * V_ED0) + k2 = kE_LV + k3 = -(Pcvp + P0_LV) / R_Valve + + return -(1 / k2) * np.log( + (k1 / k3) * (np.exp(-k2 * k3 * t) - 1) + np.exp(-k2 * (V_ES + k3 * t)) + ) + + def step(self, t, dt): + self.update_static_variables(t) + + Ca = self.parameters["Ca"] + Cv = self.parameters["Cv"] + Va0 = self.parameters["Va0"] + tau_Baro = self.parameters["tau_Baro"] + k_width = self.parameters["k_width"] + Pa_set = self.parameters["Pa_set"] + + I_ext = 0.0 + + V_ES = self.state["V_ES"] + V_ED = self.state["V_ED"] + S = self.state["S"] + Va = self.state["Va"] + Vv = self.state["Vv"] + + fHR = self.fHR(S) + R_TPR = self.R_TPR(S) + C_PRSW = self.C_PRSW(S) + Vv0 = self.Vv0(S) + + Pa = (Va - Va0) / Ca # Eq 17 + Pcvp = (Vv - Vv0) / Cv # Eq 17 + + IC = (Pa - Pcvp) / R_TPR # Eq 18 + ICO = fHR * (V_ED - V_ES) # Eq 19 + + dVa_dt = IC - ICO # Eq 20 + + self.state["Va"] += dt * dVa_dt + self.state["Vv"] += dt * (-dVa_dt + I_ext) # Eq 20 + self.state["V_ES"] += dt * (self.V_ES(V_ED, C_PRSW, Pa) - V_ES) * fHR + self.state["V_ED"] += dt * (self.V_ED(V_ES, Vv, Pcvp) - V_ED) * fHR + self.state["S"] += ( + dt * (1 / tau_Baro) * (1 - (1 / (1 + np.exp(-k_width * (Pa - Pa_set)))) - S) + ) + + self.results["time"].append(t) + self.results["V_LV"].append(self.state["Vv"]) + self.results["p_LV"].append(self.p_LV_func(Vv, t)) + + # print(self.state) From 4b61412ce166128d01a6f936b28e56c288f6d767 Mon Sep 17 00:00:00 2001 From: Henrik Finsberg Date: Thu, 5 Sep 2024 09:11:06 +0200 Subject: [PATCH 2/5] Add example for zenkur model --- examples/zenkur.py | 40 +++++++++--- src/circulation/zenkur.py | 130 +++++++++++++++++++++++--------------- 2 files changed, 110 insertions(+), 60 deletions(-) diff --git a/examples/zenkur.py b/examples/zenkur.py index a998bd2..ccb3b93 100644 --- a/examples/zenkur.py +++ b/examples/zenkur.py @@ -1,15 +1,39 @@ from circulation.zenkur import Zenkur import matplotlib.pyplot as plt +import numpy as np circulation = Zenkur() -history = circulation.solve(T=3600.0, dt=1e-3) - -fig, ax = plt.subplots(2, 1) -ax[0].plot(history["V_LV"], history["p_LV"], label="0D") -ax[1].plot(history["V_LV"][-1000:], history["p_LV"][-1000:], label="0D") -ax[0].legend(bbox_to_anchor=(1, 1), loc=1, frameon=False) -ax[0].set_xlabel("V [mL]") -ax[0].set_ylabel("p [mmHg]") +history = circulation.solve(T=1000.0, dt=1e-3, dt_eval=0.1) +start_plot = 0 +time = history["time"][start_plot:] + +fig, ax = plt.subplots(5, 2, sharex=True, figsize=(10, 10)) +ax[0, 0].plot(time, history["Vv"][start_plot:]) +ax[0, 0].set_ylabel("Vv [mL]") +ax[1, 0].plot(time, history["Va"][start_plot:]) +ax[1, 0].set_ylabel("Va [mL]") +ax[2, 0].plot(time, history["V_ED"][start_plot:], label="V_ED") +ax[2, 0].set_ylabel("V_ED [mL]") +ax[3, 0].plot(time, history["V_ES"][start_plot:], label="V_ES") +ax[3, 0].set_ylabel("V_ES [mL]") + +SV = np.subtract(history["V_ED"],history["V_ES"]) +ax[4, 0].plot(time, SV[start_plot:], label="SV") +ax[4, 0].set_ylabel("Stroke volume [mL]") + +ax[0, 1].plot(time, history["fHR"][start_plot:]) +ax[0, 1].set_ylabel("fHR [Hz]") +CO = SV * history["fHR"] +ax[1, 1].plot(time, CO[start_plot:], label="CO") +ax[1, 1].set_ylabel("Cardiac output [mL/s]") + + +ax[2, 1].plot(time, history["Pa"][start_plot:]) +ax[2, 1].set_ylabel("Pa [mmHg]") +ax[3, 1].plot(time, history["S"][start_plot:]) +ax[3, 1].set_ylabel("S") +ax[4, 1].plot(time, history["Pcvp"][start_plot:]) +ax[4, 1].set_ylabel("Pcvp [mmHg]") plt.show() diff --git a/src/circulation/zenkur.py b/src/circulation/zenkur.py index 51b48ab..6406c73 100644 --- a/src/circulation/zenkur.py +++ b/src/circulation/zenkur.py @@ -18,13 +18,6 @@ class Zenkur(base.CirculationModel): def __init__(self, parameters: dict[str, float] | None = None, add_units=False): super().__init__(parameters, add_units=add_units) - - ############ Chambers - # chambers = self.parameters["chambers"] - - # self.E_LV = self.time_varying_elastance(**chambers["LV"]) - # self.p_LV_func = lambda V, t: self.E_LV(t) * (V - chambers["LV"]["V0"]) - self.var = {} self._initialize() @staticmethod @@ -50,17 +43,29 @@ def default_parameters(): "C_PRSW_min": 25.9 * mmHg, "C_PRSW_max": 103.8 * mmHg, "BPM": 75.0 * units.ureg("1/minutes"), + "start_withdrawal": 200.0 * s, + "end_withdrawal": 400.0 * s, + "start_infusion": 500.0 * s, + "end_infusion": 700.0 * s, + "flow_withdrawal": -1.0 * mL / s, + "flow_infusion": 1.0 * mL / s, } @staticmethod def default_initial_conditions() -> dict[str, float]: V0lv = 7.144 * mL + totalVolume = 4825 * mL + + meanVvU = (2700 + 3100) / 2 + Va = totalVolume / (700 + meanVvU) * 700 + Vv = totalVolume / (700 + meanVvU) * meanVvU + return { "V_ES": 2 * V0lv, "V_ED": 3 * V0lv, "S": 0.5, - "Va": 700 * mL, - "Vv": 3_000 * mL, + "Va": Va, + "Vv": Vv, } def fHR(self, S): @@ -84,38 +89,72 @@ def Vv0(self, S): return Vv0_min + (Vv0_max - Vv0_min) * (1 - S) def update_static_variables(self, t): - ... - # self.var["p_LV"] = self.p_LV_func(self.state["V_LV"], t) + V_ES = self.state["V_ES"] + V_ED = self.state["V_ED"] + S = self.state["S"] + Va = self.state["Va"] + Vv = self.state["Vv"] + + Ca = self.parameters["Ca"] + Cv = self.parameters["Cv"] + Va0 = self.parameters["Va0"] + Vv0 = self.Vv0(S) + fHR = self.fHR(S) + R_TPR = self.R_TPR(S) + C_PRSW = self.C_PRSW(S) + Pa = (Va - Va0) / Ca # Eq 17 + Pcvp = (Vv - Vv0) / Cv # Eq 17 + IC = (Pa - Pcvp) / R_TPR # Eq 18 + ICO = fHR * (V_ED - V_ES) # Eq 19 + + self.var["fHR"] = fHR + self.var["R_TPR"] = R_TPR + self.var["C_PRSW"] = C_PRSW + self.var["Vv0"] = Vv0 + + self.var["Pa"] = Pa # Eq 17 + self.var["Pcvp"] = Pcvp # Eq 17 + self.var["IC"] = IC # Eq 18 + self.var["ICO"] = ICO # Eq 19 + self.var["p_LV"] = self.p_LV_func(Vv, t) + # breakpoint() + + if t > self.parameters["start_withdrawal"] and t < self.parameters["end_withdrawal"]: + self.var["I_ext"] = self.parameters["flow_withdrawal"] + elif t > self.parameters["start_infusion"] and t < self.parameters["end_infusion"]: + self.var["I_ext"] = self.parameters["flow_infusion"] + else: + self.var["I_ext"] = 0.0 def p_LV_func(self, V_LV, t=0.0): P0_LV = self.parameters["P0_LV"] kE_LV = self.parameters["kE_LV"] V_ED0 = self.parameters["V_ED0"] - # print(f"V_LV = {V_LV}", "V_ED0 = ", V_ED0) # Eq 7 return P0_LV * (np.exp(kE_LV * (V_LV - V_ED0)) - 1) - def V_ES(self, V_ED, C_PRSW, Pa): - P_ED = self.p_LV_func(V_ED) + def V_ES(self, V_ED, C_PRSW, Pa, t): + P_ED = self.p_LV_func(V_ED, t) V_ED0 = self.parameters["V_ED0"] + if Pa > P_ED: + return V_ED0 # Eq 5 return V_ED - ((C_PRSW * (V_ED - V_ED0)) / (Pa - P_ED)) - def V_ED(self, V_ES, fHR, Pcvp): + def V_ED(self, V_ES, fHR, Pcvp, t): # Eq 13 - p_LV_ES = self.p_LV_func(V_ES) + p_LV_ES = self.p_LV_func(V_ES, t) T_sys = self.parameters["T_sys"] if Pcvp > p_LV_ES: # Eq 12 - return self.V((1 / fHR) - T_sys, Pcvp) + return self.V((1 / fHR) - T_sys, Pcvp, V_ES=V_ES) else: return V_ES - def V(self, t, Pcvp): - V_ES = self.state["V_ES"] + def V(self, t, Pcvp, V_ES): R_Valve = self.parameters["R_valve"] P0_LV = self.parameters["P0_LV"] kE_LV = self.parameters["kE_LV"] @@ -124,7 +163,10 @@ def V(self, t, Pcvp): # Eq 9 k1 = -(P0_LV / R_Valve) * np.exp(-kE_LV * V_ED0) k2 = kE_LV - k3 = -(Pcvp + P0_LV) / R_Valve + # REMOVE MINUS SIGN + k3 = (Pcvp + P0_LV) / R_Valve + + # breakpoint() return -(1 / k2) * np.log( (k1 / k3) * (np.exp(-k2 * k3 * t) - 1) + np.exp(-k2 * (V_ES + k3 * t)) @@ -133,44 +175,28 @@ def V(self, t, Pcvp): def step(self, t, dt): self.update_static_variables(t) - Ca = self.parameters["Ca"] - Cv = self.parameters["Cv"] - Va0 = self.parameters["Va0"] - tau_Baro = self.parameters["tau_Baro"] - k_width = self.parameters["k_width"] - Pa_set = self.parameters["Pa_set"] - - I_ext = 0.0 - V_ES = self.state["V_ES"] V_ED = self.state["V_ED"] S = self.state["S"] - Va = self.state["Va"] - Vv = self.state["Vv"] - - fHR = self.fHR(S) - R_TPR = self.R_TPR(S) - C_PRSW = self.C_PRSW(S) - Vv0 = self.Vv0(S) + tau_Baro = self.parameters["tau_Baro"] + k_width = self.parameters["k_width"] + Pa_set = self.parameters["Pa_set"] - Pa = (Va - Va0) / Ca # Eq 17 - Pcvp = (Vv - Vv0) / Cv # Eq 17 + fHR = self.var["fHR"] + C_PRSW = self.var["C_PRSW"] + Pa = self.var["Pa"] + Pcvp = self.var["Pcvp"] + IC = self.var["IC"] + ICO = self.var["ICO"] + I_ext = self.var["I_ext"] - IC = (Pa - Pcvp) / R_TPR # Eq 18 - ICO = fHR * (V_ED - V_ES) # Eq 19 + # Added minus sign to match the other code + dVa_dt = -(IC - ICO) # Eq 20 - dVa_dt = IC - ICO # Eq 20 + dS_dt = (1 / tau_Baro) * (1 - (1 / (1 + np.exp(-k_width * (Pa - Pa_set)))) - S) self.state["Va"] += dt * dVa_dt self.state["Vv"] += dt * (-dVa_dt + I_ext) # Eq 20 - self.state["V_ES"] += dt * (self.V_ES(V_ED, C_PRSW, Pa) - V_ES) * fHR - self.state["V_ED"] += dt * (self.V_ED(V_ES, Vv, Pcvp) - V_ED) * fHR - self.state["S"] += ( - dt * (1 / tau_Baro) * (1 - (1 / (1 + np.exp(-k_width * (Pa - Pa_set)))) - S) - ) - - self.results["time"].append(t) - self.results["V_LV"].append(self.state["Vv"]) - self.results["p_LV"].append(self.p_LV_func(Vv, t)) - - # print(self.state) + self.state["V_ES"] += dt * (self.V_ES(V_ED, C_PRSW, Pa, t) - V_ES) * fHR + self.state["V_ED"] += dt * (self.V_ED(V_ES, fHR, Pcvp, t) - V_ED) * fHR + self.state["S"] += dt * dS_dt From 878b0a83f1845d50aa93ceac580d917dd9679fcf Mon Sep 17 00:00:00 2001 From: Henrik Finsberg Date: Thu, 5 Sep 2024 09:23:16 +0200 Subject: [PATCH 3/5] Add some tests --- src/circulation/__init__.py | 3 +++ src/circulation/zenkur.py | 4 +--- tests/test_models.py | 28 ++++++++++++++++++++++++++-- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/src/circulation/__init__.py b/src/circulation/__init__.py index e69de29..d948a38 100644 --- a/src/circulation/__init__.py +++ b/src/circulation/__init__.py @@ -0,0 +1,3 @@ +from . import base, log, regazzoni2020, zenkur, units, time_varying_elastance + +__all__ = ["base", "log", "regazzoni2020", "zenkur", "units", "time_varying_elastance"] diff --git a/src/circulation/zenkur.py b/src/circulation/zenkur.py index 6406c73..e585c5d 100644 --- a/src/circulation/zenkur.py +++ b/src/circulation/zenkur.py @@ -63,7 +63,7 @@ def default_initial_conditions() -> dict[str, float]: return { "V_ES": 2 * V0lv, "V_ED": 3 * V0lv, - "S": 0.5, + "S": 0.5 * units.ureg("dimensionless"), "Va": Va, "Vv": Vv, } @@ -116,8 +116,6 @@ def update_static_variables(self, t): self.var["Pcvp"] = Pcvp # Eq 17 self.var["IC"] = IC # Eq 18 self.var["ICO"] = ICO # Eq 19 - self.var["p_LV"] = self.p_LV_func(Vv, t) - # breakpoint() if t > self.parameters["start_withdrawal"] and t < self.parameters["end_withdrawal"]: self.var["I_ext"] = self.parameters["flow_withdrawal"] diff --git a/tests/test_models.py b/tests/test_models.py index 82959de..20a6650 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,2 +1,26 @@ -def test_import(): - pass +import circulation + + +def test_Zenkur(): + model = circulation.zenkur.Zenkur() + results = model.solve(T=1.0, dt=1e-3, dt_eval=0.1) + + for k, v in results.items(): + assert len(v) == len(results["time"]), k + + for k, v in model.default_initial_conditions().items(): + assert k in results + print(v) + assert results[k][0] == v.magnitude + + +def test_Regazzoni2020(): + model = circulation.regazzoni2020.Regazzoni2020() + results = model.solve(T=1.0, dt=1e-3, dt_eval=0.1) + + for k, v in results.items(): + assert len(v) == len(results["time"]), k + + for k, v in model.default_initial_conditions().items(): + assert k in results + assert results[k][0] == v.magnitude From a803656e849cc2fb4cc0f83e3fc8c3fb1c020137 Mon Sep 17 00:00:00 2001 From: Henrik Finsberg Date: Thu, 5 Sep 2024 09:32:34 +0200 Subject: [PATCH 4/5] from future import annotations --- src/circulation/base.py | 1 + src/circulation/regazzoni2020.py | 3 ++- src/circulation/zenkur.py | 4 +--- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/circulation/base.py b/src/circulation/base.py index b388345..daf3e87 100644 --- a/src/circulation/base.py +++ b/src/circulation/base.py @@ -1,3 +1,4 @@ +from __future__ import annotations from typing import Callable, Any from abc import ABC, abstractmethod import json diff --git a/src/circulation/regazzoni2020.py b/src/circulation/regazzoni2020.py index 372c4de..277b0da 100644 --- a/src/circulation/regazzoni2020.py +++ b/src/circulation/regazzoni2020.py @@ -1,5 +1,6 @@ -from typing import Callable, Any +from __future__ import annotations +from typing import Callable, Any from . import base from . import units diff --git a/src/circulation/zenkur.py b/src/circulation/zenkur.py index e585c5d..ad5b959 100644 --- a/src/circulation/zenkur.py +++ b/src/circulation/zenkur.py @@ -1,7 +1,5 @@ +from __future__ import annotations import numpy as np -# from scipy.integrate import RK45, solve_ivp - - from . import base from . import units From 9ab496e8d91f4991466b79e44a00a69e140146fc Mon Sep 17 00:00:00 2001 From: Henrik Finsberg Date: Thu, 5 Sep 2024 09:42:34 +0200 Subject: [PATCH 5/5] Add matplotlib dependency --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d845438..7ea061d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ dependencies = [ [project.optional-dependencies] test = ["pytest", "pytest-cov"] demos = ["matplotlib"] -docs = ["jupyter-book", "jupytext", "circulation[plot]"] +docs = ["jupyter-book", "jupytext", "circulation[demos]"] all = [ "circulation[test]", "circulation[plot]",