Skip to content

Commit

Permalink
File based simulations (#256)
Browse files Browse the repository at this point in the history
  • Loading branch information
prisae authored Oct 27, 2021
1 parent e3aba42 commit 5c1e132
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 76 deletions.
19 changes: 13 additions & 6 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,30 @@ Changelog
""""""""""


latest
------
v1.3.0: File-based computations
-------------------------------

**2021-10-27**

- ``electrodes``:

- New source ``TxMagneticPoint`` (requires ``discretize``; mainly used as
adjoint source for magnetic receivers; does not work in the presence of
magnetic permeabilities in the vincinity of the source).
magnetic permeabilities in the vicinity of the source).
- Both receivers (``Rx{Electric;Magnetic}Point``) can now produce their
proper adjoint (thanks to @sgkang!).

- Changes in Simulation and parallel execution.

- Parallel computation is not sharing the simulation any longer.
- ``get_model`` and ``get_hfield`` are now done on the fly, not stored in a
dict anymore (``simulation._dict_model`` and ``simulation._dict_hfield``
do not exist any longer).
- Parallel computation can new be done both file-based or all in memory.
The new possibility for file-based computation should make it possible
to compute responses for any amount of source-frequency pairs. See
parameter ``file_dir`` in the Simulation class (or corresponding parameter
in the CLI parameter file).
- ``get_model`` and ``get_hfield`` are now done on the fly, they are not
stored in a dict; ``simulation._dict_model`` and
``simulation._dict_hfield`` do not exist any longer.
- New methods ``jvec`` (sensitivity times a vector) and ``jtvec``
(sensitivity transpose times a vector). These methods are currently
experimental; documentation and examples are lacking behind.
Expand Down
2 changes: 2 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,6 @@
'https://doi.org/10.1111/j.1365-246X.2010.04544.x',
'https://doi.org/10.1088/0266-5611/24/3/034012',
'https://doi.org/10.1093/gji/ggab171',
'https://www.terrasysgeo.com',
'https://www.pygimli.org',
]
5 changes: 4 additions & 1 deletion docs/manual/cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ remove the comment signs to use them.
# gridding = single
# min_offset = 0.0 # Only relevant if `observed=True` (r<r_min set to NaN).
# name = MyTestSimulation
# file_dir = None # For file-based comp; absolute or relative path.
# receiver_interpolation = cubic # Set it to <linear> for the gradient.

# Solver options
# --------------
Expand All @@ -69,7 +71,7 @@ remove the comment signs to use them.
# sslsolver = # bool
# semicoarsening = # bool
# linerelaxation = # bool
# cycl = # string
# cycle = # string
# tol = # float
# verb = # int
# maxit = # int
Expand Down Expand Up @@ -97,6 +99,7 @@ remove the comment signs to use them.
# min_width_pps = # list, e.g.: 5, 3, 3
# expand = # list, e.g.: 0.3, 1e8
# domain = # list of lists, e.g.: -10000, 10000; None; None
# distance = # list of lists, e.g., None; None; -10000, 10000
# stretching = # list of lists, e.g.: None; None; 1.05, 1.5
# min_width_limits = # list of lists, e.g.: 10, 100; None; 50
# mapping = # string, e.g.: Resistivity
Expand Down
5 changes: 5 additions & 0 deletions emg3d/cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ def parse_config_file(args_dict):
_ = all_sim.pop(key)
simulation[key] = cfg.getfloat('simulation', key)

key = 'file_dir'
if cfg.has_option('simulation', key):
_ = all_sim.pop(key)
simulation[key] = cfg.get('simulation', key)

key = 'receiver_interpolation'
if cfg.has_option('simulation', key):
_ = all_sim.pop(key)
Expand Down
141 changes: 110 additions & 31 deletions emg3d/simulations.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
# License for the specific language governing permissions and limitations under
# the License.

import os
import warnings
import itertools
from pathlib import Path
from copy import deepcopy

import numpy as np
Expand Down Expand Up @@ -164,6 +166,21 @@ class Simulation:
proper adjoint for the gradient the receiver has to be interpolated
linearly too.
file_dir : str, default: None
Absolute or relative path, where temporary files should be stored. By
default, everything is done in memory. However, for large models with
many sources and frequencies this can become memory consuming.
Providing a ``file_dir`` can help with this. Fields and models are then
stored to disk, and each process accesses the files it needs. There is
only a gain if there are more source-frequency pairs than concurrent
running processes.
Note that the directory is created if it does not exist. However, the
parent directory must exist.
Also note that the files are stored as .h5-files, and you need to have
``h5py`` installed to use this feature.
"""

# Gridding descriptions (for repr's).
Expand Down Expand Up @@ -206,6 +223,13 @@ def __init__(self, survey, model, max_workers=4, gridding='single',
self._gradient = None
self._misfit = None

# Initiate file_dir
self.file_dir = kwargs.pop('file_dir', None)
if self.file_dir:
self.file_dir = os.path.abspath(self.file_dir)
# Create directory if it doesn't exist yet.
Path(self.file_dir).mkdir(exist_ok=True)

# Get model taking gridding_opts into account.
# Sets self.model and self.gridding_opts.
self._set_model(model, kwargs)
Expand All @@ -218,10 +242,11 @@ def __init__(self, survey, model, max_workers=4, gridding='single',
# `tqdm`-options; undocumented.
# Can be used to, e.g., disable tqdm completely via:
# > emg3d.Simulation(args, kwargs, tqdm_opts={'disable': True})
self._tqdm_opts = kwargs.pop(
'tqdm_opts',
{'bar_format': '{desc}: {bar}{n_fmt}/{total_fmt} [{elapsed}]'}
)
tqdm_opts = kwargs.pop('tqdm_opts', {})
self._tqdm_opts = {
**tqdm_opts,
**{'bar_format': '{desc}: {bar}{n_fmt}/{total_fmt} [{elapsed}]'},
}

# Ensure no kwargs left.
if kwargs:
Expand Down Expand Up @@ -306,6 +331,11 @@ def clean(self, what='computed'):
if hasattr(self, name):
delattr(self, name)

# Remove files if they exist.
if self.file_dir:
for p in Path(self.file_dir).glob('[ebg]field_*.h5'):
p.unlink()

# Clean data.
if what in ['computed', 'all']:
for key in ['residual', 'weight']:
Expand Down Expand Up @@ -352,6 +382,9 @@ def to_dict(self, what='computed', copy=False):
'verb': self.verb,
'name': self.name,
'info': self.info,
'tqdm_opts': self._tqdm_opts,
'receiver_interpolation': self.receiver_interpolation,
'file_dir': self.file_dir,
'_input_sc2': self._input_sc2,
}

Expand Down Expand Up @@ -412,6 +445,10 @@ def from_dict(cls, inp):
input_sc2 = inp.pop('_input_sc2', False)
if input_sc2:
cls_inp['_input_sc2'] = input_sc2
cls_inp['receiver_interpolation'] = inp.pop(
'receiver_interpolation', 'cubic')
cls_inp['file_dir'] = inp.pop('file_dir', None)
cls_inp['tqdm_opts'] = inp.pop('tqdm_opts', {})

# Instantiate the class.
out = cls(**cls_inp)
Expand Down Expand Up @@ -455,7 +492,8 @@ def to_file(self, fname, what='computed', name='simulation', **kwargs):
- '``results'``:
Stores only the response at receiver locations.
- ``'all'``:
Stores everything.
Stores everything. Note that if ``file_dir`` is set, these files
will remain there.
- ``'plain'``:
Only stores the plain Simulation (as initiated).
Expand Down Expand Up @@ -599,28 +637,55 @@ def get_efield(self, source, frequency):
freq = self._freq_inp2key(frequency)

# If it doesn't exist yet, compute it.
if self._dict_efield[source][freq] is None:
if self._dict_get('efield', source, freq) is None:
self.compute(source=source, frequency=freq)

return self._dict_efield[source][freq]
return self._dict_get('efield', source, freq)

def get_hfield(self, source, frequency):
"""Return magnetic field for given source and frequency."""
freq = self._freq_inp2key(frequency)

# If electric field not computed yet compute it.
if self._dict_efield[source][freq] is None:
if self._dict_get('efield', source, freq) is None:
self.compute(source=source, frequency=freq)

# Return magnetic field.
return fields.get_magnetic_field(
self.get_model(source, freq),
self._dict_efield[source][freq],
self._dict_get('efield', source, freq),
)

def get_efield_info(self, source, frequency):
"""Return the solver information of the corresponding computation."""
return self._dict_efield_info[source][self._freq_inp2key(frequency)]
freq = self._freq_inp2key(frequency)
return self._dict_get('efield_info', source, freq)

def _dict_get(self, which, source, frequency):
"""Return source-frequency pair from dictionary `which`.
Thin wrapper for ``self._dict_{which}[{source}][{frequency}]``, that
works as well for file-based computations.
"""
value = getattr(self, f"_dict_{which}")[source][frequency]
return self._load(value, ['efield', 'info']['info' in which])

def _load(self, value, what):
"""Returns `value` (memory) or loads `value[what]` (files)."""
if self.file_dir and value is not None:
return io.load(value, verb=0)[what]
else:
return value

def _data_or_file(self, what, source, frequency, data):
"""Return data or file-name for given what, source, and frequency."""
if self.file_dir:
fname = os.path.join(
self.file_dir, f"{what}_{source}_{frequency}.h5")
io.save(fname, data=data, verb=0)
return fname
else:
return data

def _get_responses(self, source, frequency, efield=None):
"""Return electric and magnetic fields at receiver locations."""
Expand All @@ -634,7 +699,7 @@ def _get_responses(self, source, frequency, efield=None):

# efield of this source/frequency if not provided.
if efield is None:
efield = self._dict_efield[source][frequency]
efield = self._dict_get('efield', source, frequency)

if erec.size:

Expand Down Expand Up @@ -688,12 +753,17 @@ def compute(self, observed=False, **kwargs):
def collect_efield_inputs(inp):
"""Collect inputs."""
source, freq = inp
grid = self.get_grid(source, freq)
src = self.survey.sources[source]
frequency = self.survey.frequencies[freq]
# efield is None if not computed yet; otherwise it is the solution.
efield = self._dict_efield[source][freq]
return self.model, grid, src, frequency, efield, self.solver_opts

data = {
'model': self.model,
'grid': self.get_grid(source, freq),
'source': self.survey.sources[source],
'frequency': self.survey.frequencies[freq],
# efield is None if not comp. yet; else it is the solution.
'efield': self._dict_get('efield', source, freq),
'solver_opts': self.solver_opts,
}
return self._data_or_file('efield', source, freq, data)

# Use process_map to compute fields in parallel.
out = utils._process_map(
Expand Down Expand Up @@ -820,8 +890,8 @@ def gradient(self):
# This is the actual Equation (10), with:
# del S / del p = iwu0 V sigma / sigma,
# where lambda and E are already volume averaged.
efield = self._dict_efield[src][freq] # Forward electric field
bfield = self._dict_bfield[src][freq] # Conj. backprop. field
efield = self._dict_get('efield', src, freq)
bfield = self._dict_get('bfield', src, freq)
gfield = fields.Field(
grid=efield.grid,
data=-np.real(bfield.field * efield.smu0 * efield.field),
Expand Down Expand Up @@ -970,10 +1040,15 @@ def _bcompute(self):
def collect_bfield_inputs(inp):
"""Collect inputs."""
source, freq = inp
rfield = self._get_rfield(source, freq)
# bfield is None unless it was explicitly set.
bfield = self._dict_bfield[source][freq]
return self.model, rfield, bfield, self.solver_opts

data = {
'model': self.model,
'sfield': self._get_rfield(source, freq),
# bfield is None unless it was explicitly set.
'efield': self._dict_get('bfield', source, freq),
'solver_opts': self.solver_opts
}
return self._data_or_file('bfield', source, freq, data)

# Use process_map to compute fields in parallel.
out = utils._process_map(
Expand Down Expand Up @@ -1059,10 +1134,10 @@ def jvec(self, vector):
# Create iterable from src/freq-list to call the process_map.
def collect_gfield_inputs(inp, vector=vector):
"""Collect inputs."""
source, frequency = inp
source, freq = inp

# Forward electric field
efield = self._dict_efield[source][frequency]
efield = self._dict_get('efield', source, freq)

# Compute gvec = G * vector (using discretize)
gvec = efield.grid.get_edge_inner_product_deriv(
Expand All @@ -1078,7 +1153,13 @@ def collect_gfield_inputs(inp, vector=vector):
frequency=efield.frequency
)

return self.model, gfield, None, self.solver_opts
data = {
'model': self.model,
'sfield': gfield,
'efield': None,
'solver_opts': self.solver_opts,
}
return self._data_or_file('gfield', source, freq, data)

# Compute and return A^-1 * G * vector
out = utils._process_map(
Expand All @@ -1095,9 +1176,8 @@ def collect_gfield_inputs(inp, vector=vector):

# Loop over src-freq combinations to extract and store.
for i, (src, freq) in enumerate(self._srcfreq):

# Store responses at receivers.
resp = self._get_responses(src, freq, out[i][0])
gfield = self._load(out[i][0], 'efield')
resp = self._get_responses(src, freq, gfield)
self.data['jvec'].loc[src, :, freq] = resp

return self.data['jvec'].data
Expand Down Expand Up @@ -1260,12 +1340,11 @@ def print_solver_info(self, field='efield', verb=1, return_info=False):
return

# Get info dict.
info = getattr(self, f"_dict_{field}_info", {})
out = ""

# Loop over sources and frequencies.
for src, freq in self._srcfreq:
cinfo = info[src][freq]
cinfo = self._dict_get(f"{field}_info", src, freq)

# Print if verbose or not converged.
if cinfo is not None and (verb > 0 or cinfo['exit'] != 0):
Expand Down
Loading

0 comments on commit 5c1e132

Please sign in to comment.