fix(imscsv): fix issue with csv solver output in parallel
* add function to append processor id to file names (`append_processor_id()`) — see the illustrative sketch after this list
* update mf6core to use `append_processor_id()` to append processor id to `mfsim.lst`
* add test for inner and outer solver csv output
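
A minimal Python sketch of the file-naming scheme described above, for illustration only. The real `append_processor_id()` added by this commit lives in the Fortran source, which is not part of this diff, and the exact tag format (a `.p<rank>` inserted before the file extension, matching the `mfsim.p0.lst` names written by parallel runs) is an assumption here.

```python
from pathlib import Path


def append_processor_id(path: str, proc_id: int) -> str:
    """Hypothetical sketch: insert a processor tag before the extension,
    e.g. 'mfsim.lst' -> 'mfsim.p0.lst' for rank 0 (assumed '.p<rank>' format)."""
    p = Path(path)
    return str(p.with_suffix(f".p{proc_id}{p.suffix}"))


# With per-rank names, each MPI process writes its own solver csv output,
# e.g. rank 1 of the new test would produce 'par_gwf_csv.inner.p1.csv'.
print(append_processor_id("mfsim.lst", 0))              # mfsim.p0.lst
print(append_processor_id("par_gwf_csv.inner.csv", 1))  # par_gwf_csv.inner.p1.csv
```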
jdhughes-usgs committed Aug 4, 2023
1 parent b2ee6cf commit 48a19cc
Showing 7 changed files with 379 additions and 36 deletions.
39 changes: 30 additions & 9 deletions autotest/simulation.py
@@ -17,7 +17,7 @@
from flopy.utils.compare import compare_heads
from modflow_devtools.misc import is_in_ci

DNODATA = 3.e+30
DNODATA = 3.0e30
sfmt = "{:25s} - {}"
extdict = {
"hds": "head",
@@ -164,7 +164,6 @@ def setup(self, src, dst):
return

def setup_comparison(self, src, dst, testModel=True):

# evaluate if comparison should be made
if not self.make_comparison:
return
@@ -221,7 +220,7 @@ def run(self):
success, buff = self.run_parallel(
exe,
)
except Exception as exc:
except Exception as exc:
msg = sfmt.format("MODFLOW 6 run", self.name)
print(msg)
print(exc)
@@ -332,12 +331,37 @@ def run(self):
return

def run_parallel(self, exe):
normal_msg="normal termination"
physical_cpus = os.cpu_count()
print(f"CPUs: {physical_cpus}")
if self.ncpus > physical_cpus:
print(
f"simulation is oversubscribed to {self.ncpus} CPUs "
+ f"but there are only {physical_cpus} CPUs. "
+ "Expect degraded performance."
)
is_oversubscribed = True
with open(f"{self.simpath}/localhost", "w") as f:
f.write(f"localhost slots={self.ncpus}\n")
else:
is_oversubscribed = False

normal_msg = "normal termination"
success = False
nr_success = 0
buff = []

mpiexec_cmd = ["mpiexec", "--oversubscribe", "-np", str(self.ncpus), exe, "-p"]
# add initial parallel commands
mpiexec_cmd = ["mpiexec", "-np", str(self.ncpus)]

# add oversubscribed commands
if is_oversubscribed:
mpiexec_cmd.append("--hostfile")
mpiexec_cmd.append("localhost")

# add remainder of parallel commands
mpiexec_cmd.append(exe)
mpiexec_cmd.append("-p")

proc = Popen(mpiexec_cmd, stdout=PIPE, stderr=STDOUT, cwd=self.simpath)

while True:
@@ -348,7 +372,7 @@ def run_parallel(self, exe):
# success is when the success message appears
# in every process of the parallel simulation
if normal_msg in line.lower():
nr_success = nr_success + 1
nr_success += 1
if nr_success == self.ncpus:
success = True
line = line.rstrip("\r\n")
@@ -359,7 +383,6 @@ def run_parallel(self, exe):

return success, buff


def compare(self):
"""
Compare the model results
@@ -411,7 +434,6 @@ def compare(self):
ext = os.path.splitext(file1)[1][1:]

if ext.lower() in head_extensions:

# simulation file
pth = os.path.join(self.simpath, file1)
files1.append(pth)
@@ -425,7 +447,6 @@ def compare(self):

# Check to see if there is a corresponding compare file
if files_cmp is not None:

if file1 + ".cmp" in files_cmp:
# compare file
idx = files_cmp.index(file1 + ".cmp")
3 changes: 2 additions & 1 deletion autotest/test_par_gwf02.py
@@ -8,7 +8,8 @@
from simulation import TestSimulation

# Test for parallel MODFLOW running a simple
# multi-model setup on different partitionings
# multi-model setup with different numbers
# of partitions
#
#
# [M1ny] | ... | ... | [Mnxny]
263 changes: 263 additions & 0 deletions autotest/test_par_gwf_ims_csv.py
@@ -0,0 +1,263 @@
import os

import flopy
import numpy as np
import pytest
from framework import TestFramework
from simulation import TestSimulation

# Test for parallel MODFLOW running on two cpus.
# It contains two coupled models with
#
# 1d: (nlay,nrow,ncol) = (1,1,5),
#
# constant head boundaries left=1.0, right=10.0.
# The result should be a uniform flow field.

ex = ["par_gwf_csv"]
dis_shape = [(1, 1, 5)]

# global convenience...
name_left = "leftmodel"
name_right = "rightmodel"

# solver data
nouter, ninner = 100, 300
hclose, rclose, relax = 10e-6, 1e-3, 0.97


def get_model(idx, dir):
name = ex[idx]

# parameters and spd
# tdis
nper = 1
tdis_rc = []
for i in range(nper):
tdis_rc.append((1.0, 1, 1))

# model spatial discretization
nlay = dis_shape[idx][0]
nrow = dis_shape[idx][1]
ncol = dis_shape[idx][2]

# cell spacing
delr = 100.0
delc = 100.0
area = delr * delc

# shift
shift_x = 5 * delr
shift_y = 0.0

# top/bot of the aquifer
tops = [0.0, -100.0, -200.0, -300.0, -400.0, -500.0]

# hydraulic conductivity
k11 = 1.0

# boundary stress period data
h_left = 1.0
h_right = 10.0

# initial head
h_start = 0.0

sim = flopy.mf6.MFSimulation(
sim_name=name,
version="mf6",
exe_name="mf6",
sim_ws=dir,
)

tdis = flopy.mf6.ModflowTdis(
sim, time_units="DAYS", nper=nper, perioddata=tdis_rc
)

ims = flopy.mf6.ModflowIms(
sim,
print_option="ALL",
csv_outer_output_filerecord=f"{name}.outer.csv",
csv_inner_output_filerecord=f"{name}.inner.csv",
outer_dvclose=hclose,
outer_maximum=nouter,
inner_maximum=ninner,
inner_dvclose=hclose,
rcloserecord=rclose,
linear_acceleration="BICGSTAB",
relaxation_factor=relax,
)

# submodel on the left:
left_chd = [
[(ilay, irow, 0), h_left]
for irow in range(nrow)
for ilay in range(nlay)
]
chd_spd_left = {0: left_chd}

gwf = flopy.mf6.ModflowGwf(
sim,
modelname=name_left,
)
dis = flopy.mf6.ModflowGwfdis(
gwf,
nlay=nlay,
nrow=nrow,
ncol=ncol,
delr=delr,
delc=delc,
top=tops[0],
botm=tops[1 : nlay + 1],
)
ic = flopy.mf6.ModflowGwfic(
gwf,
strt=h_start,
)
npf = flopy.mf6.ModflowGwfnpf(
gwf,
icelltype=0,
k=k11,
)
chd = flopy.mf6.ModflowGwfchd(
gwf,
stress_period_data=chd_spd_left,
)
oc = flopy.mf6.ModflowGwfoc(
gwf,
head_filerecord=f"{name_left}.hds",
printrecord=[("HEAD", "LAST"), ("BUDGET", "LAST")],
saverecord=[("HEAD", "LAST")],
)

# submodel on the right:
right_chd = [
[(ilay, irow, ncol - 1), h_right]
for irow in range(nrow)
for ilay in range(nlay)
]
chd_spd_right = {0: right_chd}

gwf = flopy.mf6.ModflowGwf(
sim,
modelname=name_right,
)
dis = flopy.mf6.ModflowGwfdis(
gwf,
nlay=nlay,
nrow=nrow,
ncol=ncol,
delr=delr,
delc=delc,
xorigin=shift_x,
yorigin=shift_y,
top=tops[0],
botm=tops[1 : nlay + 1],
)
ic = flopy.mf6.ModflowGwfic(
gwf,
strt=h_start,
)
npf = flopy.mf6.ModflowGwfnpf(
gwf,
icelltype=0,
k=k11,
)
chd = flopy.mf6.ModflowGwfchd(
gwf,
stress_period_data=chd_spd_right,
)
oc = flopy.mf6.ModflowGwfoc(
gwf,
head_filerecord=f"{name_right}.hds",
printrecord=[("HEAD", "LAST"), ("BUDGET", "LAST")],
saverecord=[("HEAD", "LAST")],
)

# exchangedata
angldegx = 0.0
cdist = delr
gwfgwf_data = [
[
(ilay, irow, ncol - 1),
(ilay, irow, 0),
1,
delr / 2.0,
delr / 2.0,
delc,
angldegx,
cdist,
]
for irow in range(nrow)
for ilay in range(nlay)
]
gwfgwf = flopy.mf6.ModflowGwfgwf(
sim,
exgtype="GWF6-GWF6",
nexg=len(gwfgwf_data),
exgmnamea=name_left,
exgmnameb=name_right,
exchangedata=gwfgwf_data,
auxiliary=["ANGLDEGX", "CDIST"],
)

return sim


def build_petsc_db(exdir):
petsc_db_file = os.path.join(exdir, ".petscrc")
with open(petsc_db_file, "w") as petsc_file:
petsc_file.write("-ksp_type cg\n")
petsc_file.write("-pc_type bjacobi\n")
petsc_file.write("-sub_pc_type ilu\n")
petsc_file.write("-sub_pc_factor_levels 2\n")
petsc_file.write(f"-dvclose {hclose}\n")
petsc_file.write(f"-ksp_max_it {nouter}\n")
petsc_file.write("-options_left no\n")
# petsc_file.write("-log_view\n")


def build_model(idx, exdir):
sim = get_model(idx, exdir)
build_petsc_db(exdir)
return sim, None


def eval_model(sim):
# two coupled models with a uniform flow field,
# here we assert the known head values at the
# cell centers
fpth = os.path.join(sim.simpath, f"{name_left}.hds")
hds = flopy.utils.HeadFile(fpth)
heads_left = hds.get_data().flatten()
fpth = os.path.join(sim.simpath, f"{name_right}.hds")
hds = flopy.utils.HeadFile(fpth)
heads_right = hds.get_data().flatten()
np.testing.assert_array_almost_equal(
heads_left[0:5], [1.0, 2.0, 3.0, 4.0, 5.0]
)
np.testing.assert_array_almost_equal(
heads_right[0:5], [6.0, 7.0, 8.0, 9.0, 10.0]
)


@pytest.mark.parallel
@pytest.mark.parametrize(
"idx, name",
list(enumerate(ex)),
)
def test_mf6model(idx, name, function_tmpdir, targets):
test = TestFramework()
test.build(build_model, idx, str(function_tmpdir))
test.run(
TestSimulation(
name=name,
exe_dict=targets,
exfunc=eval_model,
idxsim=idx,
make_comparison=False,
parallel=True,
ncpus=2,
),
str(function_tmpdir),
)
7 changes: 6 additions & 1 deletion meson.build
@@ -42,7 +42,12 @@ if fc_id == 'gcc'
'-Wno-maybe-uninitialized', # "Uninitialized" flags produce false positives with allocatables
'-Wno-uninitialized',
]

if not get_option('parallel')
link_args += [
'-static-libgfortran',
]
endif

# Options specific to profile
if profile == 'release'
compile_args += ['-ffpe-summary=overflow', '-ffpe-trap=overflow,zero,invalid']