Skip to content

Commit

Permalink
Refactor of concentration code (#1929)
Browse files Browse the repository at this point in the history
Fixes #1905

---------

Co-authored-by: Maarten Pronk <[email protected]>
  • Loading branch information
SouthEndMusic and evetion authored Nov 25, 2024
1 parent 06fe982 commit 4e9f479
Show file tree
Hide file tree
Showing 12 changed files with 295 additions and 248 deletions.
3 changes: 2 additions & 1 deletion core/src/Ribasim.jl
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ using MetaGraphsNext:
using EnumX: EnumX, @enumx

# Easily change an immutable field of an object.
using Accessors: @set
using Accessors: @set, @reset

# Iteration utilities, used to partition and group tables.
import IterTools
Expand Down Expand Up @@ -167,6 +167,7 @@ include("read.jl")
include("write.jl")
include("bmi.jl")
include("callback.jl")
include("concentration.jl")
include("main.jl")
include("libribasim.jl")

Expand Down
201 changes: 66 additions & 135 deletions core/src/callback.jl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ function create_callbacks(
FunctionCallingCallback(update_cumulative_flows!; func_start = false)
push!(callbacks, cumulative_flows_cb)

# Update concentrations
concentrations_cb = FunctionCallingCallback(update_concentrations!; func_start = false)
push!(callbacks, concentrations_cb)

# Update Basin forcings
tstops = get_tstops(basin.time.time, starttime)
basin_cb = PresetTimeCallback(tstops, update_basin!; save_positions = (false, false))
Expand Down Expand Up @@ -113,30 +117,16 @@ Specifically, we first use all the inflows to update the mass of the Basins, rec
the Basin concentration(s) and then remove the mass that is being lost to the outflows.
"""
function update_cumulative_flows!(u, t, integrator)::Nothing
(; p, uprev, tprev, dt) = integrator
(;
basin,
state_inflow_edge,
state_outflow_edge,
user_demand,
level_boundary,
flow_boundary,
allocation,
) = p
(; p, tprev, dt) = integrator
(; basin, flow_boundary, allocation) = p
(; vertical_flux) = basin

# Update tprev
p.tprev[] = t

# Reset cumulative flows, used to calculate the concentration
# of the basins after processing inflows only
basin.cumulative_in .= 0.0

# Update cumulative forcings which are integrated exactly
@. basin.cumulative_drainage += vertical_flux.drainage * dt
@. basin.cumulative_drainage_saveat += vertical_flux.drainage * dt
basin.mass .+= basin.concentration[1, :, :] .* vertical_flux.drainage * dt
basin.cumulative_in .= vertical_flux.drainage * dt

# Precipitation depends on fixed area
for node_id in basin.node_id
Expand All @@ -145,9 +135,6 @@ function update_cumulative_flows!(u, t, integrator)::Nothing

basin.cumulative_precipitation[node_id.idx] += added_precipitation
basin.cumulative_precipitation_saveat[node_id.idx] += added_precipitation
basin.mass[node_id.idx, :] .+=
basin.concentration[2, node_id.idx, :] .* added_precipitation
basin.cumulative_in[node_id.idx] += added_precipitation
end

# Exact boundary flow over time step
Expand All @@ -162,9 +149,6 @@ function update_cumulative_flows!(u, t, integrator)::Nothing
volume = integral(flow_rate, tprev, t)
flow_boundary.cumulative_flow[id.idx] += volume
flow_boundary.cumulative_flow_saveat[id.idx] += volume
basin.mass[outflow_id.idx, :] .+=
flow_boundary.concentration[id.idx, :] .* volume
basin.cumulative_in[outflow_id.idx] += volume
end
end

Expand All @@ -189,138 +173,83 @@ function update_cumulative_flows!(u, t, integrator)::Nothing
end
end
end
return nothing
end

# Process mass updates for UserDemand separately
# as the inflow and outflow are decoupled in the states
for (inflow_edge, outflow_edge) in
zip(user_demand.inflow_edge, user_demand.outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
userdemand_idx = outflow_edge.edge[1].idx
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow < 0
basin.mass[from_node.idx, :] .-=
basin.concentration_state[to_node.idx, :] .* flow
basin.mass[from_node.idx, :] .-=
user_demand.concentration[userdemand_idx, :] .* flow
end
end
if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow > 0
basin.mass[to_node.idx, :] .+=
basin.concentration_state[from_node.idx, :] .* flow
basin.mass[to_node.idx, :] .+=
user_demand.concentration[userdemand_idx, :] .* flow
end
end
end
function update_concentrations!(u, t, integrator)::Nothing
(; uprev, p, tprev, dt) = integrator
(; basin, flow_boundary) = p
(; vertical_flux, concentration_data) = basin
(; evaporate_mass, cumulative_in, concentration_state, concentration, mass) =
concentration_data

# Process all mass inflows to basins
for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow < 0
basin.cumulative_in[from_node.idx] -= flow
if to_node.type == NodeType.Basin
basin.mass[from_node.idx, :] .-=
basin.concentration_state[to_node.idx, :] .* flow
elseif to_node.type == NodeType.LevelBoundary
basin.mass[from_node.idx, :] .-=
level_boundary.concentration[to_node.idx, :] .* flow
elseif to_node.type == NodeType.UserDemand
basin.mass[from_node.idx, :] .-=
user_demand.concentration[to_node.idx, :] .* flow
elseif to_node.type == NodeType.Terminal && to_node.value == 0
# UserDemand inflow is discoupled from its outflow,
# and the unset flow edge defaults to Terminal #0
nothing
else
@warn "Unsupported outflow from $(to_node.type) #$(to_node.value) to $(from_node.type) #$(from_node.value) with flow $flow"
end
end
end
# Reset cumulative flows, used to calculate the concentration
# of the basins after processing inflows only
cumulative_in .= 0.0

if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow > 0
basin.cumulative_in[to_node.idx] += flow
if from_node.type == NodeType.Basin
basin.mass[to_node.idx, :] .+=
basin.concentration_state[from_node.idx, :] .* flow
elseif from_node.type == NodeType.LevelBoundary
basin.mass[to_node.idx, :] .+=
level_boundary.concentration[from_node.idx, :] .* flow
elseif from_node.type == NodeType.UserDemand
basin.mass[to_node.idx, :] .+=
user_demand.concentration[from_node.idx, :] .* flow
elseif from_node.type == NodeType.Terminal && from_node.value == 0
# UserDemand outflow is discoupled from its inflow,
# and the unset flow edge defaults to Terminal #0
nothing
else
@warn "Unsupported outflow from $(from_node.type) #$(from_node.value) to $(to_node.type) #$(to_node.value) with flow $flow"
end
end
end
mass .+= concentration[1, :, :] .* vertical_flux.drainage * dt
basin.concentration_data.cumulative_in .= vertical_flux.drainage * dt

# Precipitation depends on fixed area
for node_id in basin.node_id
fixed_area = basin_areas(basin, node_id.idx)[end]
added_precipitation = fixed_area * vertical_flux.precipitation[node_id.idx] * dt

mass[node_id.idx, :] .+= concentration[2, node_id.idx, :] .* added_precipitation
cumulative_in[node_id.idx] += added_precipitation
end

# Update the Basin concentrations based on the added mass and flows
basin.concentration_state .= basin.mass ./ (basin.storage_prev .+ basin.cumulative_in)

# Process all mass outflows from Basins
for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow > 0
basin.mass[from_node.idx, :] .-=
basin.concentration_state[from_node.idx, :] .* flow
end
end
if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow < 0
basin.mass[to_node.idx, :] .+=
basin.concentration_state[to_node.idx, :] .* flow
end
# Exact boundary flow over time step
for (id, flow_rate, active, edge) in zip(
flow_boundary.node_id,
flow_boundary.flow_rate,
flow_boundary.active,
flow_boundary.outflow_edges,
)
if active
outflow_id = edge[1].edge[2]
volume = integral(flow_rate, tprev, t)
mass[outflow_id.idx, :] .+= flow_boundary.concentration[id.idx, :] .* volume
cumulative_in[outflow_id.idx] += volume
end
end

mass_updates_user_demand!(integrator)
mass_inflows_basin!(integrator)

# Update the Basin concentrations based on the added mass and flows
concentration_state .= mass ./ (basin.storage_prev .+ cumulative_in)

mass_outflows_basin!(integrator)

# Evaporate mass to keep the mass balance, if enabled in model config
if basin.evaporate_mass
basin.mass .-= basin.concentration_state .* (u.evaporation - uprev.evaporation)
if evaporate_mass
mass .-= concentration_state .* (u.evaporation - uprev.evaporation)
end
basin.mass .-= basin.concentration_state .* (u.infiltration - uprev.infiltration)
mass .-= concentration_state .* (u.infiltration - uprev.infiltration)

# Take care of infinitely small masses, possibly becoming negative due to truncation.
for I in eachindex(basin.mass)
if (-eps(Float64)) < basin.mass[I] < (eps(Float64))
basin.mass[I] = 0.0
for I in eachindex(basin.concentration_data.mass)
if (-eps(Float64)) < mass[I] < (eps(Float64))
mass[I] = 0.0
end
end

# Check for negative masses
if any(<(0), basin.mass)
R = CartesianIndices(basin.mass)
locations = findall(<(0), basin.mass)
if any(<(0), mass)
R = CartesianIndices(mass)
locations = findall(<(0), mass)
for I in locations
basin_idx, substance_idx = Tuple(R[I])
@error "$(basin.node_id[basin_idx]) has negative mass $(basin.mass[I]) for substance $(basin.substances[substance_idx])"
@error "$(basin.node_id[basin_idx]) has negative mass $(basin.concentration_data.mass[I]) for substance $(basin.concentration_data.substances[substance_idx])"
end
error("Negative mass(es) detected")
end

# Update the Basin concentrations again based on the removed mass
basin.concentration_state .=
basin.mass ./ basin.current_properties.current_storage[parent(u)]
concentration_state .= mass ./ basin.current_properties.current_storage[parent(u)]
basin.storage_prev .= basin.current_properties.current_storage[parent(u)]
basin.level_prev .= basin.current_properties.current_level[parent(u)]

return nothing
end

Expand Down Expand Up @@ -427,7 +356,7 @@ function save_flow(u, t, integrator)
@. basin.cumulative_precipitation_saveat = 0.0
@. basin.cumulative_drainage_saveat = 0.0

concentration = copy(basin.concentration_state)
concentration = copy(basin.concentration_data.concentration_state)
saved_flow = SavedFlow(;
flow = flow_mean,
inflow = inflow_mean,
Expand Down Expand Up @@ -682,11 +611,12 @@ function get_value(subvariable::NamedTuple, p::Parameters, du::AbstractVector, t
end

elseif startswith(variable, "concentration_external.")
value = basin.concentration_external[listen_node_id.idx][variable](t)
value =
basin.concentration_data.concentration_external[listen_node_id.idx][variable](t)
elseif startswith(variable, "concentration.")
substance = Symbol(last(split(variable, ".")))
var_idx = findfirst(==(substance), basin.substances)
value = basin.concentration_state[listen_node_id.idx, var_idx]
var_idx = findfirst(==(substance), basin.concentration_data.substances)
value = basin.concentration_data.concentration_state[listen_node_id.idx, var_idx]
else
error("Unsupported condition variable $variable.")
end
Expand Down Expand Up @@ -781,7 +711,8 @@ end
function update_basin_conc!(integrator)::Nothing
(; p) = integrator
(; basin) = p
(; node_id, concentration, concentration_time, substances) = basin
(; node_id, concentration_data, concentration_time) = basin
(; concentration, substances) = concentration_data
t = datetime_since(integrator.t, integrator.p.starttime)

rows = searchsorted(concentration_time.time, t)
Expand All @@ -802,7 +733,7 @@ function update_conc!(integrator, parameter, nodetype)::Nothing
node = getproperty(p, parameter)
(; basin) = p
(; node_id, concentration, concentration_time) = node
(; substances) = basin
(; substances) = basin.concentration_data
t = datetime_since(integrator.t, integrator.p.starttime)

rows = searchsorted(concentration_time.time, t)
Expand Down
Loading

0 comments on commit 4e9f479

Please sign in to comment.