diff --git a/autotest/regression/test_mf6.py b/autotest/regression/test_mf6.py index f46e47193..a8be56ac5 100644 --- a/autotest/regression/test_mf6.py +++ b/autotest/regression/test_mf6.py @@ -918,7 +918,10 @@ def test021_twri(function_tmpdir, example_data_path): version="mf6", exe_name="mf6", sim_ws=data_folder, + memory_print_option="SUMMARY", ) + sim.nocheck = True + sim.set_sim_path(function_tmpdir) tdis_rc = [(86400.0, 1, 1.0)] tdis_package = ModflowTdis( @@ -927,6 +930,8 @@ def test021_twri(function_tmpdir, example_data_path): model = ModflowGwf( sim, modelname=model_name, model_nam_file=f"{model_name}.nam" ) + model.print_input = True + ims_package = ModflowIms( sim, print_option="SUMMARY", @@ -1098,6 +1103,9 @@ def test021_twri(function_tmpdir, example_data_path): strt2 = ic2.strt.get_data() drn2 = model2.get_package("drn") drn_spd = drn2.stress_period_data.get_data() + assert sim2.memory_print_option.get_data().lower() == "summary" + assert sim2.nocheck.get_data() is True + assert model2.print_input.get_data() is True assert strt2[0, 0, 0] == 0.0 assert strt2[1, 0, 0] == 1.0 assert strt2[2, 0, 0] == 2.0 diff --git a/autotest/test_copy.py b/autotest/test_copy.py index 10668bbdd..ee8cdf926 100644 --- a/autotest/test_copy.py +++ b/autotest/test_copy.py @@ -10,7 +10,8 @@ from flopy.mbase import ModelInterface from flopy.mf6.data.mfdatalist import MFList, MFTransientList from flopy.mf6.mfpackage import MFChildPackages, MFPackage -from flopy.mf6.modflow.mfsimulation import MFSimulation, MFSimulationData +from flopy.mf6.mfsimbase import MFSimulationData +from flopy.mf6.modflow.mfsimulation import MFSimulation from flopy.modflow import Modflow from flopy.utils import TemporalReference diff --git a/autotest/test_mf6.py b/autotest/test_mf6.py index 39207dfee..f5c40834b 100644 --- a/autotest/test_mf6.py +++ b/autotest/test_mf6.py @@ -1,6 +1,5 @@ import os import platform -from os.path import join from pathlib import Path from shutil import copytree, which @@ -58,7 +57,7 @@ ) from flopy.mf6.data.mffileaccess import MFFileAccessArray from flopy.mf6.data.mfstructure import MFDataItemStructure, MFDataStructure -from flopy.mf6.mfbase import MFFileMgmt +from flopy.mf6.mfsimbase import MFSimulationData from flopy.mf6.modflow import ( mfgwf, mfgwfdis, @@ -72,7 +71,6 @@ mfims, mftdis, ) -from flopy.mf6.modflow.mfsimulation import MFSimulationData from flopy.utils import ( CellBudgetFile, HeadFile, diff --git a/flopy/mf6/data/mfdatalist.py b/flopy/mf6/data/mfdatalist.py index c9635b458..a62916c32 100644 --- a/flopy/mf6/data/mfdatalist.py +++ b/flopy/mf6/data/mfdatalist.py @@ -1944,6 +1944,8 @@ def _set_data_record( self._simulation_data.debug, ) if key is None: + if data is None: + return # search for a key new_key_index = self.structure.first_non_keyword_index() if new_key_index is not None and len(data) > new_key_index: diff --git a/flopy/mf6/data/mfdatascalar.py b/flopy/mf6/data/mfdatascalar.py index 0e625cf33..7c8421167 100644 --- a/flopy/mf6/data/mfdatascalar.py +++ b/flopy/mf6/data/mfdatascalar.py @@ -822,6 +822,8 @@ def set_data(self, data, key=None): if `data` is a dictionary. """ + if data is None and key is None: + return if isinstance(data, dict): # each item in the dictionary is a list for one stress period # the dictionary key is the stress period the list is for diff --git a/flopy/mf6/data/mfdatautil.py b/flopy/mf6/data/mfdatautil.py index dfe847060..390903ebe 100644 --- a/flopy/mf6/data/mfdatautil.py +++ b/flopy/mf6/data/mfdatautil.py @@ -103,33 +103,34 @@ def convert_data(data, data_dimensions, data_type, data_item=None, sub_amt=1): False, ) elif data_type == DatumType.integer: - if data_item is not None and data_item.numeric_index: - return int(PyListUtil.clean_numeric(data)) - sub_amt - try: - return int(data) - except (ValueError, TypeError): + if data is not None: + if data_item is not None and data_item.numeric_index: + return int(PyListUtil.clean_numeric(data)) - sub_amt try: - return int(PyListUtil.clean_numeric(data)) + return int(data) except (ValueError, TypeError): - message = ( - 'Data "{}" with value "{}" can not be ' - "converted to int" - ".".format(data_dimensions.structure.name, data) - ) - type_, value_, traceback_ = sys.exc_info() - raise MFDataException( - data_dimensions.structure.get_model(), - data_dimensions.structure.get_package(), - data_dimensions.structure.path, - "converting data", - data_dimensions.structure.name, - inspect.stack()[0][3], - type_, - value_, - traceback_, - message, - False, - ) + try: + return int(PyListUtil.clean_numeric(data)) + except (ValueError, TypeError): + message = ( + 'Data "{}" with value "{}" can not be ' + "converted to int" + ".".format(data_dimensions.structure.name, data) + ) + type_, value_, traceback_ = sys.exc_info() + raise MFDataException( + data_dimensions.structure.get_model(), + data_dimensions.structure.get_package(), + data_dimensions.structure.path, + "converting data", + data_dimensions.structure.name, + inspect.stack()[0][3], + type_, + value_, + traceback_, + message, + False, + ) elif data_type == DatumType.string and data is not None: if data_item is None or not data_item.preserve_case: # keep strings lower case @@ -892,7 +893,7 @@ def add_parameter( if model_parameter: self.model_parameters.append(param_descr) - def get_doc_string(self, model_doc_string=False): + def get_doc_string(self, model_doc_string=False, sim_doc_string=False): doc_string = '{}"""\n{}{}\n\n{}\n'.format( self.indent, self.indent, self.description, self.parameter_header ) @@ -912,7 +913,17 @@ def get_doc_string(self, model_doc_string=False): else: param_list = self.parameters for parameter in param_list: + if sim_doc_string: + pclean = parameter.strip() + if ( + pclean.startswith("simulation") + or pclean.startswith("loading_package") + or pclean.startswith("filename") + or pclean.startswith("pname") + or pclean.startswith("parent_file") + ): + continue doc_string += f"{parameter}\n" - if not model_doc_string: + if not (model_doc_string or sim_doc_string): doc_string += f'\n{self.indent}"""' return doc_string diff --git a/flopy/mf6/mfmodel.py b/flopy/mf6/mfmodel.py index a47cf106c..be20e0c84 100644 --- a/flopy/mf6/mfmodel.py +++ b/flopy/mf6/mfmodel.py @@ -14,7 +14,7 @@ from ..utils import datautil from ..utils.check import mf6check from .coordinates import modeldimensions -from .data import mfstructure +from .data import mfdata, mfdatalist, mfstructure from .data.mfdatautil import DataSearchOutput, iterable from .mfbase import ( ExtFileAction, @@ -182,6 +182,24 @@ def __getattr__(self, item): return package raise AttributeError(item) + def __setattr__(self, name, value): + if hasattr(self, name) and getattr(self, name) is not None: + attribute = object.__getattribute__(self, name) + if attribute is not None and isinstance(attribute, mfdata.MFData): + try: + if isinstance(attribute, mfdatalist.MFList): + attribute.set_data(value, autofill=True) + else: + attribute.set_data(value) + except MFDataException as mfde: + raise MFDataException( + mfdata_except=mfde, + model=self.name, + package="", + ) + return + super().__setattr__(name, value) + def __repr__(self): return self._get_data_str(True) @@ -677,9 +695,9 @@ def check(self, f=None, verbose=True, level=1): return self._check(chk, level) - @classmethod + @staticmethod def load_base( - cls, + cls_child, simulation, structure, modelname="NewModel", @@ -729,9 +747,8 @@ def load_base( Examples -------- """ - instance = cls( + instance = cls_child( simulation, - mtype, modelname, model_nam_file=model_nam_file, version=version, @@ -863,7 +880,7 @@ def inspect_cells( -------- >>> import flopy - >>> sim = flopy.mf6.MFSimulation.load("name", "mf6", "mf6", ".") + >>> sim = flopy.mf6.MFSimulationBase.load("name", "mf6", "mf6", ".") >>> model = sim.get_model() >>> inspect_list = [(2, 3, 2), (0, 4, 2), (0, 2, 4)] >>> out_file = os.path.join("temp", "inspect_AdvGW_tidal.csv") diff --git a/flopy/mf6/mfsimbase.py b/flopy/mf6/mfsimbase.py new file mode 100644 index 000000000..dc2da65c8 --- /dev/null +++ b/flopy/mf6/mfsimbase.py @@ -0,0 +1,2530 @@ +import errno +import inspect +import os.path +import sys +import warnings +from pathlib import Path +from typing import List, Optional, Union + +import numpy as np + +from flopy.mbase import run_model +from flopy.mf6.data import mfdata, mfdatalist, mfstructure +from flopy.mf6.data.mfdatautil import MFComment +from flopy.mf6.data.mfstructure import DatumType +from flopy.mf6.mfbase import ( + ExtFileAction, + FlopyException, + MFDataException, + MFFileMgmt, + PackageContainer, + PackageContainerType, + VerbosityLevel, +) +from flopy.mf6.mfpackage import MFPackage +from flopy.mf6.modflow import mfnam, mftdis +from flopy.mf6.utils import binaryfile_utils, mfobservation + + +class SimulationDict(dict): + """ + Class containing custom dictionary for MODFLOW simulations. Dictionary + contains model data. Dictionary keys are "paths" to the data that include + the model and package containing the data. + + Behaves as an dict with some additional features described below. + + Parameters + ---------- + path : MFFileMgmt + Object containing path information for the simulation + + """ + + def __init__(self, path=None): + dict.__init__(self) + self._path = path + + def __getitem__(self, key): + """ + Define the __getitem__ magic method. + + Parameters + ---------- + key (string): Part or all of a dictionary key + + Returns: + MFData or numpy.ndarray + + """ + if key == "_path" or not hasattr(self, "_path"): + raise AttributeError(key) + + # FIX: Transport - Include transport output files + if key[1] in ("CBC", "HDS", "DDN", "UCN"): + val = binaryfile_utils.MFOutput(self, self._path, key) + return val.data + + elif key[-1] == "Observations": + val = mfobservation.MFObservation(self, self._path, key) + return val.data + + if key in self: + val = dict.__getitem__(self, key) + return val + return AttributeError(key) + + def __setitem__(self, key, val): + """ + Define the __setitem__ magic method. + + Parameters + ---------- + key : str + Dictionary key + val : MFData + MFData to store in dictionary + + """ + dict.__setitem__(self, key, val) + + def find_in_path(self, key_path, key_leaf): + """ + Attempt to find key_leaf in a partial key path key_path. + + Parameters + ---------- + key_path : str + partial path to the data + key_leaf : str + name of the data + + Returns + ------- + Data: MFData, + index: int + + """ + key_path_size = len(key_path) + for key, item in self.items(): + if key[:key_path_size] == key_path: + if key[-1] == key_leaf: + # found key_leaf as a key in the dictionary + return item, None + if not isinstance(item, MFComment): + data_item_index = 0 + data_item_structures = item.structure.data_item_structures + for data_item_struct in data_item_structures: + if data_item_struct.name == key_leaf: + # found key_leaf as a data item name in the data + # in the dictionary + return item, data_item_index + if data_item_struct.type != DatumType.keyword: + data_item_index += 1 + return None, None + + def output_keys(self, print_keys=True): + """ + Return a list of output data keys supported by the dictionary. + + Parameters + ---------- + print_keys : bool + print keys to console + + Returns + ------- + output keys : list + + """ + # get keys to request binary output + x = binaryfile_utils.MFOutputRequester.getkeys( + self, self._path, print_keys=print_keys + ) + return [key for key in x.dataDict] + + def input_keys(self): + """ + Return a list of input data keys. + + Returns + ------- + input keys : list + + """ + # get keys to request input ie. package data + for key in self: + print(key) + + def observation_keys(self): + """ + Return a list of observation keys. + + Returns + ------- + observation keys : list + + """ + # get keys to request observation file output + mfobservation.MFObservationRequester.getkeys(self, self._path) + + def keys(self): + """ + Return a list of all keys. + + Returns + ------- + all keys : list + + """ + # overrides the built in keys to print all keys, input and output + self.input_keys() + try: + self.output_keys() + except OSError as e: + if e.errno == errno.EEXIST: + pass + try: + self.observation_keys() + except KeyError: + pass + + +class MFSimulationData: + """ + Class containing MODFLOW simulation data and file formatting data. Use + MFSimulationData to set simulation-wide settings which include data + formatting and file location settings. + + Parameters + ---------- + path : str + path on disk to the simulation + + Attributes + ---------- + indent_string : str + String used to define how much indent to use (file formatting) + internal_formatting : list + List defining string to use for internal formatting + external_formatting : list + List defining string to use for external formatting + open_close_formatting : list + List defining string to use for open/close + max_columns_of_data : int + Maximum columns of data before line wraps. For structured grids this + is set to ncol by default. For all other grids the default is 20. + wrap_multidim_arrays : bool + Whether to wrap line for multi-dimensional arrays at the end of a + row/column/layer + _float_precision : int + Number of decimal points to write for a floating point number + _float_characters : int + Number of characters a floating point number takes up + write_headers: bool + When true flopy writes a header to each package file indicating that + it was created by flopy + sci_note_upper_thres : float + Numbers greater than this threshold are written in scientific notation + sci_note_lower_thres : float + Numbers less than this threshold are written in scientific notation + mfpath : MFFileMgmt + File path location information for the simulation + model_dimensions : dict + Dictionary containing discretization information for each model + mfdata : SimulationDict + Custom dictionary containing all model data for the simulation + + """ + + def __init__(self, path: Union[str, os.PathLike], mfsim): + # --- formatting variables --- + self.indent_string = " " + self.constant_formatting = ["constant", ""] + self._max_columns_of_data = 20 + self.wrap_multidim_arrays = True + self._float_precision = 8 + self._float_characters = 15 + self.write_headers = True + self._sci_note_upper_thres = 100000 + self._sci_note_lower_thres = 0.001 + self.fast_write = True + self.comments_on = False + self.auto_set_sizes = True + self.verify_data = True + self.debug = False + self.verbose = True + self.verbosity_level = VerbosityLevel.normal + self.max_columns_user_set = False + self.max_columns_auto_set = False + + self._update_str_format() + + # --- file path --- + self.mfpath = MFFileMgmt(path, mfsim) + + # --- ease of use variables to make working with modflow input and + # output data easier --- model dimension class for each model + self.model_dimensions = {} + + # --- model data --- + self.mfdata = SimulationDict(self.mfpath) + + # --- temporary variables --- + # other external files referenced + self.referenced_files = {} + + @property + def lazy_io(self): + if not self.auto_set_sizes and not self.verify_data: + return True + return False + + @lazy_io.setter + def lazy_io(self, val): + if val: + self.auto_set_sizes = False + self.verify_data = False + else: + self.auto_set_sizes = True + self.verify_data = True + + @property + def max_columns_of_data(self): + return self._max_columns_of_data + + @max_columns_of_data.setter + def max_columns_of_data(self, val): + if not self.max_columns_user_set and ( + not self.max_columns_auto_set or val > self._max_columns_of_data + ): + self._max_columns_of_data = val + self.max_columns_user_set = True + + @property + def float_precision(self): + """ + Gets precision of floating point numbers. + """ + return self._float_precision + + @float_precision.setter + def float_precision(self, value): + """ + Sets precision of floating point numbers. + + Parameters + ---------- + value: float + floating point precision + + """ + self._float_precision = value + self._update_str_format() + + @property + def float_characters(self): + """ + Gets max characters used in floating point numbers. + """ + return self._float_characters + + @float_characters.setter + def float_characters(self, value): + """ + Sets max characters used in floating point numbers. + + Parameters + ---------- + value: float + floating point max characters + + """ + self._float_characters = value + self._update_str_format() + + def set_sci_note_upper_thres(self, value): + """ + Sets threshold number where any number larger than threshold + is represented in scientific notation. + + Parameters + ---------- + value: float + threshold value + + """ + self._sci_note_upper_thres = value + self._update_str_format() + + def set_sci_note_lower_thres(self, value): + """ + Sets threshold number where any number smaller than threshold + is represented in scientific notation. + + Parameters + ---------- + value: float + threshold value + + """ + self._sci_note_lower_thres = value + self._update_str_format() + + def _update_str_format(self): + """ + Update floating point formatting strings.""" + self.reg_format_str = f"{{:.{self._float_precision}E}}" + self.sci_format_str = ( + f"{{:{self._float_characters}.{self._float_precision}f}}" + ) + + +class MFSimulationBase(PackageContainer): + """ + Entry point into any MODFLOW simulation. + + MFSimulation is used to load, build, and/or save a MODFLOW 6 simulation. + A MFSimulation object must be created before creating any of the MODFLOW 6 + model objects. + + Parameters + ---------- + sim_name : str + Name of the simulation. + version : str + Version of MODFLOW 6 executable + exe_name : str + Path to MODFLOW 6 executable + sim_ws : str + Path to MODFLOW 6 simulation working folder. This is the folder + containing the simulation name file. + verbosity_level : int + Verbosity level of standard output from 0 to 2. When 0 is specified no + standard output is written. When 1 is specified standard + error/warning messages with some informational messages are written. + When 2 is specified full error/warning/informational messages are + written (this is ideal for debugging). + continue_ : bool + Sets the continue option in the simulation name file. The continue + option is a keyword flag to indicate that the simulation should + continue even if one or more solutions do not converge. + nocheck : bool + Sets the nocheck option in the simulation name file. The nocheck + option is a keyword flag to indicate that the model input check + routines should not be called prior to each time step. Checks + are performed by default. + memory_print_option : str + Sets memory_print_option in the simulation name file. + Memory_print_option is a flag that controls printing of detailed + memory manager usage to the end of the simulation list file. NONE + means do not print detailed information. SUMMARY means print only + the total memory for each simulation component. ALL means print + information for each variable stored in the memory manager. NONE is + default if memory_print_option is not specified. + write_headers: bool + When true flopy writes a header to each package file indicating that + it was created by flopy. + lazy_io: bool + When true flopy only reads external data when the data is requested + and only writes external data if the data has changed. This option + automatically overrides the verify_data and auto_set_sizes, turning + both off. + Examples + -------- + >>> s = MFSimulationBase.load('my simulation', 'simulation.nam') + + Attributes + ---------- + sim_name : str + Name of the simulation + name_file : MFPackage + Simulation name file package + + """ + + def __init__( + self, + sim_name="sim", + version="mf6", + exe_name: Union[str, os.PathLike] = "mf6", + sim_ws: Union[str, os.PathLike] = os.curdir, + verbosity_level=1, + continue_=None, + nocheck=None, + memory_print_option=None, + write_headers=True, + lazy_io=False, + ): + super().__init__(MFSimulationData(sim_ws, self), sim_name) + self.simulation_data.verbosity_level = self._resolve_verbosity_level( + verbosity_level + ) + self.simulation_data.write_headers = write_headers + if lazy_io: + self.simulation_data.lazy_io = True + + # verify metadata + fpdata = mfstructure.MFStructure() + if not fpdata.valid: + excpt_str = ( + "Invalid package metadata. Unable to load MODFLOW " + "file structure metadata." + ) + raise FlopyException(excpt_str) + + # initialize + self.dimensions = None + self.type = "Simulation" + + self.version = version + self.exe_name = exe_name + self._models = {} + self._tdis_file = None + self._exchange_files = {} + self._solution_files = {} + self._other_files = {} + self.structure = fpdata.sim_struct + self.model_type = None + + self._exg_file_num = {} + + self.simulation_data.mfpath.set_last_accessed_path() + + # build simulation name file + self.name_file = mfnam.ModflowNam( + self, + filename="mfsim.nam", + continue_=continue_, + nocheck=nocheck, + memory_print_option=memory_print_option, + _internal_package=True, + ) + + # try to build directory structure + sim_path = self.simulation_data.mfpath.get_sim_path() + if not os.path.isdir(sim_path): + try: + os.makedirs(sim_path) + except OSError as e: + if ( + self.simulation_data.verbosity_level.value + >= VerbosityLevel.quiet.value + ): + print( + "An error occurred when trying to create the " + "directory {}: {}".format(sim_path, e.strerror) + ) + + # set simulation validity initially to false since the user must first + # add at least one model to the simulation and fill out the name and + # tdis files + self.valid = False + + def __getattr__(self, item): + """ + Override __getattr__ to allow retrieving models. + + __getattr__ is used to allow for getting models and packages as if + they are attributes + + Parameters + ---------- + item : str + model or package name + + + Returns + ------- + md : Model or package object + Model or package object of type :class:flopy6.mfmodel or + :class:flopy6.mfpackage + + """ + if item == "valid" or not hasattr(self, "valid"): + raise AttributeError(item) + + models = [] + if item in self.structure.model_types: + # get all models of this type + for model in self._models.values(): + if model.model_type == item or model.model_type[:-1] == item: + models.append(model) + + if len(models) > 0: + return models + elif item in self._models: + model = self.get_model(item) + if model is not None: + return model + raise AttributeError(item) + else: + package = self.get_package(item) + if package is not None: + return package + raise AttributeError(item) + + def __setattr__(self, name, value): + if hasattr(self, name) and getattr(self, name) is not None: + attribute = object.__getattribute__(self, name) + if attribute is not None and isinstance(attribute, mfdata.MFData): + try: + if isinstance(attribute, mfdatalist.MFList): + attribute.set_data(value, autofill=True) + else: + attribute.set_data(value) + except MFDataException as mfde: + raise MFDataException( + mfdata_except=mfde, + model="", + package="", + ) + return + super().__setattr__(name, value) + + def __repr__(self): + """ + Override __repr__ to print custom string. + + Returns + -------- + repr string : str + string describing object + + """ + return self._get_data_str(True) + + def __str__(self): + """ + Override __str__ to print custom string. + + Returns + -------- + str string : str + string describing object + + """ + return self._get_data_str(False) + + def _get_data_str(self, formal): + file_mgt = self.simulation_data.mfpath + data_str = ( + "sim_name = {}\nsim_path = {}\nexe_name = " + "{}\n" + "\n".format(self.name, file_mgt.get_sim_path(), self.exe_name) + ) + + for package in self._packagelist: + pk_str = package._get_data_str(formal, False) + if formal: + if len(pk_str.strip()) > 0: + data_str = ( + "{}###################\nPackage {}\n" + "###################\n\n" + "{}\n".format(data_str, package._get_pname(), pk_str) + ) + else: + if len(pk_str.strip()) > 0: + data_str = ( + "{}###################\nPackage {}\n" + "###################\n\n" + "{}\n".format(data_str, package._get_pname(), pk_str) + ) + for model in self._models.values(): + if formal: + mod_repr = repr(model) + if len(mod_repr.strip()) > 0: + data_str = ( + "{}@@@@@@@@@@@@@@@@@@@@\nModel {}\n" + "@@@@@@@@@@@@@@@@@@@@\n\n" + "{}\n".format(data_str, model.name, mod_repr) + ) + else: + mod_str = str(model) + if len(mod_str.strip()) > 0: + data_str = ( + "{}@@@@@@@@@@@@@@@@@@@@\nModel {}\n" + "@@@@@@@@@@@@@@@@@@@@\n\n" + "{}\n".format(data_str, model.name, mod_str) + ) + return data_str + + @property + def model_names(self): + """ + Return a list of model names associated with this simulation. + + Returns + -------- + list: list of model names + + """ + return list(self._models.keys()) + + @property + def exchange_files(self): + """ + Return list of exchange files associated with this simulation. + + Returns + -------- + list: list of exchange names + + """ + return self._exchange_files.values() + + @staticmethod + def load( + cls_child, + sim_name="modflowsim", + version="mf6", + exe_name: Union[str, os.PathLike] = "mf6", + sim_ws: Union[str, os.PathLike] = os.curdir, + strict=True, + verbosity_level=1, + load_only=None, + verify_data=False, + write_headers=True, + lazy_io=False, + ): + """ + Load an existing model. Do not call this method directly. Should only + be called by child class. + + Parameters + ---------- + cls_child : + cls object of child class calling load + sim_name : str + Name of the simulation. + version : str + MODFLOW version + exe_name : str or PathLike + Path to MODFLOW executable (relative to the simulation workspace or absolute) + sim_ws : str or PathLike + Path to simulation workspace + strict : bool + Strict enforcement of file formatting + verbosity_level : int + Verbosity level of standard output + 0: No standard output + 1: Standard error/warning messages with some informational + messages + 2: Verbose mode with full error/warning/informational + messages. This is ideal for debugging. + load_only : list + List of package abbreviations or package names corresponding to + packages that flopy will load. default is None, which loads all + packages. the discretization packages will load regardless of this + setting. subpackages, like time series and observations, will also + load regardless of this setting. + example list: ['ic', 'maw', 'npf', 'oc', 'ims', 'gwf6-gwf6'] + verify_data : bool + Verify data when it is loaded. this can slow down loading + write_headers: bool + When true flopy writes a header to each package file indicating + that it was created by flopy + lazy_io: bool + When true flopy only reads external data when the data is requested + and only writes external data if the data has changed. This option + automatically overrides the verify_data and auto_set_sizes, turning + both off. + Returns + ------- + sim : MFSimulation object + + Examples + -------- + >>> s = flopy.mf6.mfsimulation.load('my simulation') + + """ + # initialize + instance = cls_child( + sim_name, + version, + exe_name, + sim_ws, + verbosity_level, + write_headers=write_headers, + ) + verbosity_level = instance.simulation_data.verbosity_level + + instance.simulation_data.verify_data = verify_data + if lazy_io: + instance.simulation_data.lazy_io = True + + if verbosity_level.value >= VerbosityLevel.normal.value: + print("loading simulation...") + + # build case consistent load_only dictionary for quick lookups + load_only = instance._load_only_dict(load_only) + + # load simulation name file + if verbosity_level.value >= VerbosityLevel.normal.value: + print(" loading simulation name file...") + instance.name_file.load(strict) + + # load TDIS file + tdis_pkg = f"tdis{mfstructure.MFStructure().get_version_string()}" + tdis_attr = getattr(instance.name_file, tdis_pkg) + instance._tdis_file = mftdis.ModflowTdis( + instance, filename=tdis_attr.get_data() + ) + + instance._tdis_file._filename = instance.simulation_data.mfdata[ + ("nam", "timing", tdis_pkg) + ].get_data() + if verbosity_level.value >= VerbosityLevel.normal.value: + print(" loading tdis package...") + instance._tdis_file.load(strict) + + # load models + try: + model_recarray = instance.simulation_data.mfdata[ + ("nam", "models", "models") + ] + models = model_recarray.get_data() + except MFDataException as mfde: + message = ( + "Error occurred while loading model names from the " + "simulation name file." + ) + raise MFDataException( + mfdata_except=mfde, + model=instance.name, + package="nam", + message=message, + ) + for item in models: + # resolve model working folder and name file + path, name_file = os.path.split(item[1]) + model_obj = PackageContainer.model_factory(item[0][:-1].lower()) + # load model + if verbosity_level.value >= VerbosityLevel.normal.value: + print(f" loading model {item[0].lower()}...") + instance._models[item[2]] = model_obj.load( + instance, + instance.structure.model_struct_objs[item[0].lower()], + item[2], + name_file, + version, + exe_name, + strict, + path, + load_only, + ) + + # load exchange packages and dependent packages + try: + exchange_recarray = instance.name_file.exchanges + has_exch_data = exchange_recarray.has_data() + except MFDataException as mfde: + message = ( + "Error occurred while loading exchange names from the " + "simulation name file." + ) + raise MFDataException( + mfdata_except=mfde, + model=instance.name, + package="nam", + message=message, + ) + if has_exch_data: + try: + exch_data = exchange_recarray.get_data() + except MFDataException as mfde: + message = ( + "Error occurred while loading exchange names from " + "the simulation name file." + ) + raise MFDataException( + mfdata_except=mfde, + model=instance.name, + package="nam", + message=message, + ) + for exgfile in exch_data: + if load_only is not None and not instance._in_pkg_list( + load_only, exgfile[0], exgfile[2] + ): + if ( + instance.simulation_data.verbosity_level.value + >= VerbosityLevel.normal.value + ): + print(f" skipping package {exgfile[0].lower()}...") + continue + # get exchange type by removing numbers from exgtype + exchange_type = "".join( + [char for char in exgfile[0] if not char.isdigit()] + ).upper() + # get exchange number for this type + if exchange_type not in instance._exg_file_num: + exchange_file_num = 0 + instance._exg_file_num[exchange_type] = 1 + else: + exchange_file_num = instance._exg_file_num[exchange_type] + instance._exg_file_num[exchange_type] += 1 + + exchange_name = f"{exchange_type}_EXG_{exchange_file_num}" + # find package class the corresponds to this exchange type + package_obj = instance.package_factory( + exchange_type.replace("-", "").lower(), "" + ) + if not package_obj: + message = ( + "An error occurred while loading the " + "simulation name file. Invalid exchange type " + '"{}" specified.'.format(exchange_type) + ) + type_, value_, traceback_ = sys.exc_info() + raise MFDataException( + instance.name, + "nam", + "nam", + "loading simulation name file", + exchange_recarray.structure.name, + inspect.stack()[0][3], + type_, + value_, + traceback_, + message, + instance._simulation_data.debug, + ) + + # build and load exchange package object + exchange_file = package_obj( + instance, + exgtype=exgfile[0], + exgmnamea=exgfile[2], + exgmnameb=exgfile[3], + filename=exgfile[1], + pname=exchange_name, + loading_package=True, + ) + if verbosity_level.value >= VerbosityLevel.normal.value: + print( + f" loading exchange package {exchange_file._get_pname()}..." + ) + exchange_file.load(strict) + instance._exchange_files[exgfile[1]] = exchange_file + + # load simulation packages + solution_recarray = instance.simulation_data.mfdata[ + ("nam", "solutiongroup", "solutiongroup") + ] + + try: + solution_group_dict = solution_recarray.get_data() + except MFDataException as mfde: + message = ( + "Error occurred while loading solution groups from " + "the simulation name file." + ) + raise MFDataException( + mfdata_except=mfde, + model=instance.name, + package="nam", + message=message, + ) + for solution_group in solution_group_dict.values(): + for solution_info in solution_group: + if load_only is not None and not instance._in_pkg_list( + load_only, solution_info[0], solution_info[2] + ): + if ( + instance.simulation_data.verbosity_level.value + >= VerbosityLevel.normal.value + ): + print( + f" skipping package {solution_info[0].lower()}..." + ) + continue + # create solution package + sln_package_obj = instance.package_factory( + solution_info[0][:-1].lower(), "" + ) + sln_package = sln_package_obj( + instance, + filename=solution_info[1], + pname=solution_info[2], + ) + + if verbosity_level.value >= VerbosityLevel.normal.value: + print( + f" loading solution package " + f"{sln_package._get_pname()}..." + ) + sln_package.load(strict) + + instance.simulation_data.mfpath.set_last_accessed_path() + if verify_data: + instance.check() + return instance + + def check( + self, + f: Optional[Union[str, os.PathLike]] = None, + verbose=True, + level=1, + ): + """ + Check model data for common errors. + + Parameters + ---------- + f : str or PathLike, optional + String defining file name or file handle for summary file + of check method output. If str or pathlike, a file handle + is created. If None, the method does not write results to + a summary file. (default is None) + verbose : bool + Boolean flag used to determine if check method results are + written to the screen + level : int + Check method analysis level. If level=0, summary checks are + performed. If level=1, full checks are performed. + + Returns + ------- + check list: list + Python list containing simulation check results + + Examples + -------- + + >>> import flopy + >>> m = flopy.modflow.Modflow.load('model.nam') + >>> m.check() + """ + # check instance for simulation-level check + chk_list = [] + + # check models + for model in self._models.values(): + print(f'Checking model "{model.name}"...') + chk_list.append(model.check(f, verbose, level)) + + print("Checking for missing simulation packages...") + if self._tdis_file is None: + if chk_list: + chk_list[0]._add_to_summary( + "Error", desc="\r No tdis package", package="model" + ) + print("Error: no tdis package") + if len(self._solution_files) == 0: + if chk_list: + chk_list[0]._add_to_summary( + "Error", desc="\r No solver package", package="model" + ) + print("Error: no solution package") + return chk_list + + @property + def sim_path(self) -> Path: + return Path(self.simulation_data.mfpath.get_sim_path()) + + @property + def sim_package_list(self): + """List of all "simulation level" packages""" + package_list = [] + if self._tdis_file is not None: + package_list.append(self._tdis_file) + for sim_package in self._solution_files.values(): + package_list.append(sim_package) + for sim_package in self._exchange_files.values(): + package_list.append(sim_package) + for sim_package in self._other_files.values(): + package_list.append(sim_package) + return package_list + + def load_package( + self, + ftype, + fname: Union[str, os.PathLike], + pname, + strict, + ref_path: Union[str, os.PathLike], + dict_package_name=None, + parent_package=None, + ): + """ + Load a package from a file. + + Parameters + ---------- + ftype : str + the file type + fname : str or PathLike + the path of the file containing the package input + pname : str + the user-defined name for the package + strict : bool + strict mode when loading the file + ref_path : str + path to the file. uses local path if set to None + dict_package_name : str + package name for dictionary lookup + parent_package : MFPackage + parent package + + """ + if ( + ftype in self.structure.package_struct_objs + and self.structure.package_struct_objs[ftype].multi_package_support + ) or ( + ftype in self.structure.utl_struct_objs + and self.structure.utl_struct_objs[ftype].multi_package_support + ): + # resolve dictionary name for package + if dict_package_name is not None: + if parent_package is not None: + dict_package_name = f"{parent_package.path[-1]}_{ftype}" + else: + # use dict_package_name as the base name + if ftype in self._ftype_num_dict: + self._ftype_num_dict[dict_package_name] += 1 + else: + self._ftype_num_dict[dict_package_name] = 0 + dict_package_name = "{}_{}".format( + dict_package_name, + self._ftype_num_dict[dict_package_name], + ) + else: + # use ftype as the base name + if ftype in self._ftype_num_dict: + self._ftype_num_dict[ftype] += 1 + else: + self._ftype_num_dict[ftype] = 0 + if pname is not None: + dict_package_name = pname + else: + dict_package_name = ( + f"{ftype}_{self._ftype_num_dict[ftype]}" + ) + else: + dict_package_name = ftype + + # get package object from file type + package_obj = self.package_factory(ftype, "") + # create package + package = package_obj( + self, + filename=fname, + pname=dict_package_name, + parent_file=parent_package, + loading_package=True, + ) + package.load(strict) + self._other_files[package.filename] = package + # register child package with the simulation + self._add_package(package, package.path) + if parent_package is not None: + # register child package with the parent package + parent_package._add_package(package, package.path) + return package + + def register_ims_package( + self, solution_file: MFPackage, model_list: Union[str, List[str]] + ): + self.register_solution_package(solution_file, model_list) + + def register_solution_package( + self, solution_file: MFPackage, model_list: Union[str, List[str]] + ): + """ + Register a solution package with the simulation. + + Parameters + solution_file : MFPackage + solution package to register + model_list : list of strings + list of models using the solution package to be registered + + """ + if isinstance(model_list, str): + model_list = [model_list] + + if ( + solution_file.package_type + not in mfstructure.MFStructure().flopy_dict["solution_packages"] + ): + comment = ( + 'Parameter "solution_file" is not a valid solution file. ' + 'Expected solution file, but type "{}" was given' + ".".format(type(solution_file)) + ) + type_, value_, traceback_ = sys.exc_info() + raise MFDataException( + None, + "solution", + "", + "registering solution package", + "", + inspect.stack()[0][3], + type_, + value_, + traceback_, + comment, + self.simulation_data.debug, + ) + valid_model_types = mfstructure.MFStructure().flopy_dict[ + "solution_packages" + ][solution_file.package_type] + # remove models from existing solution groups + if model_list is not None: + for model in model_list: + md = self.get_model(model) + if md is not None and ( + md.model_type not in valid_model_types + and "*" not in valid_model_types + ): + comment = ( + f"Model type {md.model_type} is not a valid type " + f"for solution file {solution_file.filename} solution " + f"file type {solution_file.package_type}. Valid model " + f"types are {valid_model_types}" + ) + type_, value_, traceback_ = sys.exc_info() + raise MFDataException( + None, + "solution", + "", + "registering solution package", + "", + inspect.stack()[0][3], + type_, + value_, + traceback_, + comment, + self.simulation_data.debug, + ) + self._remove_from_all_solution_groups(model) + + # register solution package with model list + in_simulation = False + pkg_with_same_name = None + for file in self._solution_files.values(): + if file is solution_file: + in_simulation = True + if ( + file.package_name == solution_file.package_name + and file != solution_file + ): + pkg_with_same_name = file + if ( + self.simulation_data.verbosity_level.value + >= VerbosityLevel.normal.value + ): + print( + "WARNING: solution package with name {} already exists. " + "New solution package will replace old package" + ".".format(file.package_name) + ) + self._remove_package(self._solution_files[file.filename]) + del self._solution_files[file.filename] + break + # register solution package + if not in_simulation: + self._add_package( + solution_file, self._get_package_path(solution_file) + ) + # do not allow a solution package to be registered twice with the + # same simulation + if not in_simulation: + # create unique file/package name + if solution_file.package_name is None: + file_num = len(self._solution_files) - 1 + solution_file.package_name = ( + f"{solution_file.package_type}_{file_num}" + ) + if solution_file.filename in self._solution_files: + solution_file.filename = MFFileMgmt.unique_file_name( + solution_file.filename, self._solution_files + ) + # add solution package to simulation + self._solution_files[solution_file.filename] = solution_file + + # If solution file is being replaced, replace solution filename in + # solution group + if pkg_with_same_name is not None and self._is_in_solution_group( + pkg_with_same_name.filename, 1 + ): + # change existing solution group to reflect new solution file + self._replace_solution_in_solution_group( + pkg_with_same_name.filename, 1, solution_file.filename + ) + # only allow solution package to be registered to one solution group + elif model_list is not None: + sln_file_in_group = self._is_in_solution_group( + solution_file.filename, 1 + ) + # add solution group to the simulation name file + solution_recarray = self.name_file.solutiongroup + solution_group_list = solution_recarray.get_active_key_list() + if len(solution_group_list) == 0: + solution_group_num = 0 + else: + solution_group_num = solution_group_list[-1][0] + + if sln_file_in_group: + self._append_to_solution_group( + solution_file.filename, model_list + ) + else: + if self.name_file.mxiter.get_data(solution_group_num) is None: + self.name_file.mxiter.add_transient_key(solution_group_num) + + # associate any models in the model list to this + # simulation file + version_string = mfstructure.MFStructure().get_version_string() + solution_pkg = f"{solution_file.package_abbr}{version_string}" + new_record = [solution_pkg, solution_file.filename] + for model in model_list: + new_record.append(model) + try: + solution_recarray.append_list_as_record( + new_record, solution_group_num + ) + except MFDataException as mfde: + message = ( + "Error occurred while updating the " + "simulation name file with the solution package " + 'file "{}".'.format(solution_file.filename) + ) + raise MFDataException( + mfdata_except=mfde, package="nam", message=message + ) + + @staticmethod + def _rename_package_group(group_dict, name): + package_type_count = {} + # first build an array to avoid key modification errors + package_array = [] + for package in group_dict.values(): + package_array.append(package) + # update package file names and count + for package in package_array: + if package.package_type not in package_type_count: + file_name = f"{name}.{package.package_type}" + package_type_count[package.package_type] = 1 + else: + package_type_count[package.package_type] += 1 + ptc = package_type_count[package.package_type] + file_name = f"{name}_{ptc}.{package.package_type}" + base_filepath = os.path.split(package.filename)[0] + if base_filepath != "": + # include existing relative path in new file name + file_name = os.path.join(base_filepath, file_name) + package.filename = file_name + + def _rename_exchange_file(self, package, new_filename): + self._exchange_files[package.filename] = package + try: + exchange_recarray_data = self.name_file.exchanges.get_data() + except MFDataException as mfde: + message = ( + "An error occurred while retrieving exchange " + "data from the simulation name file. The error " + "occurred while registering exchange file " + f'"{package.filename}".' + ) + raise MFDataException( + mfdata_except=mfde, + package=package._get_pname(), + message=message, + ) + if exchange_recarray_data is not None: + for index, exchange in zip( + range(0, len(exchange_recarray_data)), + exchange_recarray_data, + ): + if exchange[1] == package.filename: + # update existing exchange + exchange_recarray_data[index][1] = new_filename + ex_recarray = self.name_file.exchanges + try: + ex_recarray.set_data(exchange_recarray_data) + except MFDataException as mfde: + message = ( + "An error occurred while setting " + "exchange data in the simulation name " + "file. The error occurred while " + "registering the following " + "values (exgtype, filename, " + f'exgmnamea, exgmnameb): "{package.exgtype} ' + f"{package.filename} {package.exgmnamea}" + f'{package.exgmnameb}".' + ) + raise MFDataException( + mfdata_except=mfde, + package=package._get_pname(), + message=message, + ) + return + + def _set_timing_block(self, file_name): + struct_root = mfstructure.MFStructure() + tdis_pkg = f"tdis{struct_root.get_version_string()}" + tdis_attr = getattr(self.name_file, tdis_pkg) + try: + tdis_attr.set_data(file_name) + except MFDataException as mfde: + message = ( + "An error occurred while setting the tdis package " + f'file name "{file_name}". The error occurred while ' + "registering the tdis package with the " + "simulation" + ) + raise MFDataException( + mfdata_except=mfde, + package=file_name, + message=message, + ) + + def update_package_filename(self, package, new_name): + """ + Updates internal arrays to be consistent with a new file name. + This is for internal flopy library use only. + + Parameters + ---------- + package: MFPackage + Package with new name + new_name: str + Package's new name + + """ + if ( + self._tdis_file is not None + and package.filename == self._tdis_file.filename + ): + self._set_timing_block(new_name) + elif package.filename in self._exchange_files: + self._exchange_files[new_name] = self._exchange_files.pop( + package.filename + ) + self._rename_exchange_file(package, new_name) + elif package.filename in self._solution_files: + self._solution_files[new_name] = self._solution_files.pop( + package.filename + ) + self._update_solution_group(package.filename, new_name) + else: + self._other_files[new_name] = self._other_files.pop( + package.filename + ) + + def rename_all_packages(self, name): + """ + Rename all packages with name as prefix. + + Parameters + ---------- + name: str + Prefix of package names + + """ + if self._tdis_file is not None: + self._tdis_file.filename = f"{name}.{self._tdis_file.package_type}" + + self._rename_package_group(self._exchange_files, name) + self._rename_package_group(self._solution_files, name) + self._rename_package_group(self._other_files, name) + for model in self._models.values(): + model.rename_all_packages(name) + + def set_all_data_external( + self, check_data=True, external_data_folder=None + ): + """Sets the simulation's list and array data to be stored externally. + + Parameters + ---------- + check_data: bool + Determines if data error checking is enabled during this + process. Data error checking can be slow on large datasets. + external_data_folder: str or PathLike + Path relative to the simulation path or model relative path + (see use_model_relative_path parameter), where external data + will be stored + """ + # copy any files whose paths have changed + self.simulation_data.mfpath.copy_files() + # set data external for all packages in all models + for model in self._models.values(): + model.set_all_data_external(check_data, external_data_folder) + # set data external for solution packages + for package in self._solution_files.values(): + package.set_all_data_external(check_data, external_data_folder) + # set data external for other packages + for package in self._other_files.values(): + package.set_all_data_external(check_data, external_data_folder) + for package in self._exchange_files.values(): + package.set_all_data_external(check_data, external_data_folder) + + def set_all_data_internal(self, check_data=True): + # set data external for all packages in all models + for model in self._models.values(): + model.set_all_data_internal(check_data) + # set data external for solution packages + for package in self._solution_files.values(): + package.set_all_data_internal(check_data) + # set data external for other packages + for package in self._other_files.values(): + package.set_all_data_internal(check_data) + # set data external for exchange packages + for package in self._exchange_files.values(): + package.set_all_data_internal(check_data) + + def write_simulation( + self, ext_file_action=ExtFileAction.copy_relative_paths, silent=False + ): + """ + Write the simulation to files. + + Parameters + ext_file_action : ExtFileAction + Defines what to do with external files when the simulation + path has changed. Defaults to copy_relative_paths which + copies only files with relative paths, leaving files defined + by absolute paths fixed. + silent : bool + Writes out the simulation in silent mode (verbosity_level = 0) + + """ + sim_data = self.simulation_data + if not sim_data.max_columns_user_set: + # search for dis packages + for model in self._models.values(): + dis = model.get_package("dis") + if dis is not None and hasattr(dis, "ncol"): + sim_data.max_columns_of_data = dis.ncol.get_data() + sim_data.max_columns_user_set = False + sim_data.max_columns_auto_set = True + + saved_verb_lvl = self.simulation_data.verbosity_level + if silent: + self.simulation_data.verbosity_level = VerbosityLevel.quiet + + # write simulation name file + if ( + self.simulation_data.verbosity_level.value + >= VerbosityLevel.normal.value + ): + print("writing simulation...") + print(" writing simulation name file...") + self.name_file.write(ext_file_action=ext_file_action) + + # write TDIS file + if ( + self.simulation_data.verbosity_level.value + >= VerbosityLevel.normal.value + ): + print(" writing simulation tdis package...") + self._tdis_file.write(ext_file_action=ext_file_action) + + # write solution files + for solution_file in self._solution_files.values(): + if ( + self.simulation_data.verbosity_level.value + >= VerbosityLevel.normal.value + ): + print( + f" writing solution package " + f"{solution_file._get_pname()}..." + ) + solution_file.write(ext_file_action=ext_file_action) + + # write exchange files + for exchange_file in self._exchange_files.values(): + exchange_file.write() + + # write other packages + for pp in self._other_files.values(): + if ( + self.simulation_data.verbosity_level.value + >= VerbosityLevel.normal.value + ): + print(f" writing package {pp._get_pname()}...") + pp.write(ext_file_action=ext_file_action) + + # FIX: model working folder should be model name file folder + + # write models + for model in self._models.values(): + if ( + self.simulation_data.verbosity_level.value + >= VerbosityLevel.normal.value + ): + print(f" writing model {model.name}...") + model.write(ext_file_action=ext_file_action) + + self.simulation_data.mfpath.set_last_accessed_path() + + if silent: + self.simulation_data.verbosity_level = saved_verb_lvl + + def set_sim_path(self, path: Union[str, os.PathLike]): + """Return a list of output data keys. + + Parameters + ---------- + path : str + Relative or absolute path to simulation root folder. + + """ + # set all data internal + self.set_all_data_internal() + + # set simulation path + self.simulation_data.mfpath.set_sim_path(path, True) + + if not os.path.exists(path): + # create new simulation folder + os.makedirs(path) + + def run_simulation( + self, + silent=None, + pause=False, + report=False, + processors=None, + normal_msg="normal termination", + use_async=False, + cargs=None, + ): + """ + Run the simulation. + + Parameters + ---------- + silent: bool + Run in silent mode + pause: bool + Pause at end of run + report: bool + Save stdout lines to a list (buff) + processors: int + Number of processors. Parallel simulations are only supported + for MODFLOW 6 simulations. (default is None) + normal_msg: str or list + Normal termination message used to determine if the run + terminated normally. More than one message can be provided + using a list. (default is 'normal termination') + use_async : bool + Asynchronously read model stdout and report with timestamps. + good for models that take long time to run. not good for + models that run really fast + cargs : str or list of strings + Additional command line arguments to pass to the executable. + default is None + + Returns + -------- + success : bool + buff : list of lines of stdout + + """ + if silent is None: + if ( + self.simulation_data.verbosity_level.value + >= VerbosityLevel.normal.value + ): + silent = False + else: + silent = True + return run_model( + self.exe_name, + None, + self.simulation_data.mfpath.get_sim_path(), + silent=silent, + pause=pause, + report=report, + processors=processors, + normal_msg=normal_msg, + use_async=use_async, + cargs=cargs, + ) + + def delete_output_files(self): + """Deletes simulation output files.""" + output_req = binaryfile_utils.MFOutputRequester + output_file_keys = output_req.getkeys( + self.simulation_data.mfdata, self.simulation_data.mfpath, False + ) + for path in output_file_keys.binarypathdict.values(): + if os.path.isfile(path): + os.remove(path) + + def remove_package(self, package_name): + """ + Removes package from the simulation. `package_name` can be the + package's name, type, or package object to be removed from the model. + + Parameters + ---------- + package_name : str + Name of package to be removed + + """ + if isinstance(package_name, MFPackage): + packages = [package_name] + else: + packages = self.get_package(package_name) + if not isinstance(packages, list): + packages = [packages] + for package in packages: + if ( + self._tdis_file is not None + and package.path == self._tdis_file.path + ): + self._tdis_file = None + if package.filename in self._exchange_files: + del self._exchange_files[package.filename] + if package.filename in self._solution_files: + del self._solution_files[package.filename] + self._update_solution_group(package.filename) + if package.filename in self._other_files: + del self._other_files[package.filename] + + self._remove_package(package) + + @property + def model_dict(self): + """ + Return a dictionary of models associated with this simulation. + + Returns + -------- + model dict : dict + dictionary of models + + """ + return self._models.copy() + + def get_model(self, model_name=None): + """ + Returns the models in the simulation with a given model name, name + file name, or model type. + + Parameters + ---------- + model_name : str + Name of the model to get. Passing in None or an empty list + will get the first model. + + Returns + -------- + model : MFModel + + """ + if len(self._models) == 0: + return None + + if model_name is None: + for model in self._models.values(): + return model + if model_name in self._models: + return self._models[model_name] + # do case-insensitive lookup + for name, model in self._models.items(): + if model_name.lower() == name.lower(): + return model + return None + + def get_exchange_file(self, filename): + """ + Get a specified exchange file. + + Parameters + ---------- + filename : str + Name of exchange file to get + + Returns + -------- + exchange package : MFPackage + + """ + if filename in self._exchange_files: + return self._exchange_files[filename] + else: + excpt_str = f'Exchange file "{filename}" can not be found.' + raise FlopyException(excpt_str) + + def get_file(self, filename): + """ + Get a specified file. + + Parameters + ---------- + filename : str + Name of mover file to get + + Returns + -------- + mover package : MFPackage + + """ + if filename in self._other_files: + return self._other_files[filename] + else: + excpt_str = f'file "{filename}" can not be found.' + raise FlopyException(excpt_str) + + def get_mvr_file(self, filename): + """ + Get a specified mover file. + + Parameters + ---------- + filename : str + Name of mover file to get + + Returns + -------- + mover package : MFPackage + + """ + warnings.warn( + "get_mvr_file will be deprecated and will be removed in version " + "3.3.6. Use get_file", + PendingDeprecationWarning, + ) + if filename in self._other_files: + return self._other_files[filename] + else: + excpt_str = f'MVR file "{filename}" can not be found.' + raise FlopyException(excpt_str) + + def get_mvt_file(self, filename): + """ + Get a specified mvt file. + + Parameters + ---------- + filename : str + Name of mover transport file to get + + Returns + -------- + mover transport package : MFPackage + + """ + warnings.warn( + "get_mvt_file will be deprecated and will be removed in version " + "3.3.6. Use get_file", + PendingDeprecationWarning, + ) + if filename in self._other_files: + return self._other_files[filename] + else: + excpt_str = f'MVT file "{filename}" can not be found.' + raise FlopyException(excpt_str) + + def get_gnc_file(self, filename): + """ + Get a specified gnc file. + + Parameters + ---------- + filename : str + Name of gnc file to get + + Returns + -------- + gnc package : MFPackage + + """ + warnings.warn( + "get_gnc_file will be deprecated and will be removed in version " + "3.3.6. Use get_file", + PendingDeprecationWarning, + ) + if filename in self._other_files: + return self._other_files[filename] + else: + excpt_str = f'GNC file "{filename}" can not be found.' + raise FlopyException(excpt_str) + + def remove_exchange_file(self, package): + """ + Removes the exchange file "package". This is for internal flopy + library use only. + + Parameters + ---------- + package: MFPackage + Exchange package to be removed + + """ + self._exchange_files[package.filename] = package + try: + exchange_recarray_data = self.name_file.exchanges.get_data() + except MFDataException as mfde: + message = ( + "An error occurred while retrieving exchange " + "data from the simulation name file. The error " + "occurred while registering exchange file " + f'"{package.filename}".' + ) + raise MFDataException( + mfdata_except=mfde, + package=package._get_pname(), + message=message, + ) + remove_indices = [] + if exchange_recarray_data is not None: + for index, exchange in zip( + range(0, len(exchange_recarray_data)), + exchange_recarray_data, + ): + if ( + package.filename is not None + and exchange[1] == package.filename + ): + remove_indices.append(index) + if len(remove_indices) > 0: + self.name_file.exchanges.set_data( + np.delete(exchange_recarray_data, remove_indices) + ) + + def register_exchange_file(self, package): + """ + Register an exchange package file with the simulation. This is a + call-back method made from the package and should not be called + directly. + + Parameters + ---------- + package : MFPackage + Exchange package object to register + + """ + if package.filename not in self._exchange_files: + exgtype = package.exgtype + exgmnamea = package.exgmnamea + exgmnameb = package.exgmnameb + + if exgtype is None or exgmnamea is None or exgmnameb is None: + excpt_str = ( + "Exchange packages require that exgtype, " + "exgmnamea, and exgmnameb are specified." + ) + raise FlopyException(excpt_str) + + self._exchange_files[package.filename] = package + try: + exchange_recarray_data = self.name_file.exchanges.get_data() + except MFDataException as mfde: + message = ( + "An error occurred while retrieving exchange " + "data from the simulation name file. The error " + "occurred while registering exchange file " + f'"{package.filename}".' + ) + raise MFDataException( + mfdata_except=mfde, + package=package._get_pname(), + message=message, + ) + if exchange_recarray_data is not None: + for index, exchange in zip( + range(0, len(exchange_recarray_data)), + exchange_recarray_data, + ): + if exchange[1] == package.filename: + # update existing exchange + exchange_recarray_data[index][0] = exgtype + exchange_recarray_data[index][2] = exgmnamea + exchange_recarray_data[index][3] = exgmnameb + ex_recarray = self.name_file.exchanges + try: + ex_recarray.set_data(exchange_recarray_data) + except MFDataException as mfde: + message = ( + "An error occurred while setting " + "exchange data in the simulation name " + "file. The error occurred while " + "registering the following " + "values (exgtype, filename, " + f'exgmnamea, exgmnameb): "{exgtype} ' + f"{package.filename} {exgmnamea}" + f'{exgmnameb}".' + ) + raise MFDataException( + mfdata_except=mfde, + package=package._get_pname(), + message=message, + ) + return + try: + # add new exchange + self.name_file.exchanges.append_data( + [(exgtype, package.filename, exgmnamea, exgmnameb)] + ) + except MFDataException as mfde: + message = ( + "An error occurred while setting exchange data " + "in the simulation name file. The error occurred " + "while registering the following values (exgtype, " + f'filename, exgmnamea, exgmnameb): "{exgtype} ' + f'{package.filename} {exgmnamea} {exgmnameb}".' + ) + raise MFDataException( + mfdata_except=mfde, + package=package._get_pname(), + message=message, + ) + if ( + package.dimensions is None + or package.dimensions.model_dim is None + ): + # resolve exchange package dimensions object + package.dimensions = package.create_package_dimensions() + + def _remove_package_by_type(self, package): + pname = None + if package.package_name is not None: + pname = package.package_name.lower() + if ( + package.package_type.lower() == "tdis" + and self._tdis_file is not None + and self._tdis_file in self._packagelist + ): + # tdis package already exists. there can be only one tdis + # package. remove existing tdis package + if ( + self.simulation_data.verbosity_level.value + >= VerbosityLevel.normal.value + ): + print( + "WARNING: tdis package already exists. Replacing " + "existing tdis package." + ) + self._remove_package(self._tdis_file) + elif ( + package.package_type.lower() + in mfstructure.MFStructure().flopy_dict["solution_packages"] + and pname in self.package_name_dict + ): + if ( + self.simulation_data.verbosity_level.value + >= VerbosityLevel.normal.value + ): + print( + "WARNING: Package with name " + f"{package.package_name.lower()} already exists. " + "Replacing existing package." + ) + self._remove_package(self.package_name_dict[pname]) + else: + if ( + package.filename in self._other_files + and self._other_files[package.filename] in self._packagelist + ): + # other package with same file name already exists. remove old + # package + if ( + self.simulation_data.verbosity_level.value + >= VerbosityLevel.normal.value + ): + print( + f"WARNING: package with name {pname} already exists. " + "Replacing existing package." + ) + self._remove_package(self._other_files[package.filename]) + del self._other_files[package.filename] + + def register_package( + self, + package, + add_to_package_list=True, + set_package_name=True, + set_package_filename=True, + ): + """ + Register a package file with the simulation. This is a + call-back method made from the package and should not be called + directly. + + Parameters + ---------- + package : MFPackage + Package to register + add_to_package_list : bool + Add package to lookup list + set_package_name : bool + Produce a package name for this package + set_package_filename : bool + Produce a filename for this package + + Returns + -------- + (path : tuple, package structure : MFPackageStructure) + + """ + if set_package_filename: + # set initial package filename + base_name = os.path.basename(os.path.normpath(self.name)) + package._filename = f"{base_name}.{package.package_type}" + + package.container_type = [PackageContainerType.simulation] + path = self._get_package_path(package) + if add_to_package_list and package.package_type.lower != "nam": + if ( + package.package_type.lower() + not in mfstructure.MFStructure().flopy_dict[ + "solution_packages" + ] + ): + # all but solution packages get added here. solution packages + # are added during solution package registration + self._remove_package_by_type(package) + self._add_package(package, path) + sln_dict = mfstructure.MFStructure().flopy_dict["solution_packages"] + if package.package_type.lower() == "nam": + if not package.internal_package: + excpt_str = ( + "Unable to register nam file. Do not create your own nam " + "files. Nam files are automatically created and managed " + "for you by FloPy." + ) + print(excpt_str) + raise FlopyException(excpt_str) + return path, self.structure.name_file_struct_obj + elif package.package_type.lower() == "tdis": + self._tdis_file = package + self._set_timing_block(package.quoted_filename) + return ( + path, + self.structure.package_struct_objs[ + package.package_type.lower() + ], + ) + elif package.package_type.lower() in sln_dict: + supported_packages = sln_dict[package.package_type.lower()] + # default behavior is to register the solution package with the + # first unregistered model + unregistered_models = [] + for model_name, model in self._models.items(): + model_registered = self._is_in_solution_group( + model_name, 2, True + ) + if not model_registered and ( + model.model_type in supported_packages + or "*" in supported_packages + ): + unregistered_models.append(model_name) + if unregistered_models: + self.register_solution_package(package, unregistered_models) + else: + self.register_solution_package(package, None) + return ( + path, + self.structure.package_struct_objs[ + package.package_type.lower() + ], + ) + else: + if package.filename not in self._other_files: + self._other_files[package.filename] = package + else: + # auto generate a unique file name and register it + file_name = MFFileMgmt.unique_file_name( + package.filename, self._other_files + ) + package.filename = file_name + self._other_files[file_name] = package + + if package.package_type.lower() in self.structure.package_struct_objs: + return ( + path, + self.structure.package_struct_objs[ + package.package_type.lower() + ], + ) + elif package.package_type.lower() in self.structure.utl_struct_objs: + return ( + path, + self.structure.utl_struct_objs[package.package_type.lower()], + ) + else: + excpt_str = ( + 'Invalid package type "{}". Unable to register ' + "package.".format(package.package_type) + ) + print(excpt_str) + raise FlopyException(excpt_str) + + def rename_model_namefile(self, model, new_namefile): + """ + Rename a model's namefile. For internal flopy library use only. + + Parameters + ---------- + model : MFModel + Model object whose namefile to rename + new_namefile : str + Name of the new namefile + + """ + # update simulation name file + models = self.name_file.models.get_data() + for mdl in models: + path, name_file_name = os.path.split(mdl[1]) + if name_file_name == model.name_file.filename: + mdl[1] = os.path.join(path, new_namefile) + self.name_file.models.set_data(models) + + def register_model(self, model, model_type, model_name, model_namefile): + """ + Add a model to the simulation. This is a call-back method made + from the package and should not be called directly. + + Parameters + ---------- + model : MFModel + Model object to add to simulation + sln_group : str + Solution group of model + + Returns + -------- + model_structure_object : MFModelStructure + """ + + # get model structure from model type + if model_type not in self.structure.model_struct_objs: + message = f'Invalid model type: "{model_type}".' + type_, value_, traceback_ = sys.exc_info() + raise MFDataException( + model.name, + "", + model.name, + "registering model", + "sim", + inspect.stack()[0][3], + type_, + value_, + traceback_, + message, + self.simulation_data.debug, + ) + + # add model + self._models[model_name] = model + + # update simulation name file + self.name_file.models.append_list_as_record( + [model_type, model_namefile, model_name] + ) + + if len(self._solution_files) > 0: + # register model with first solution file found + first_solution_key = next(iter(self._solution_files)) + self.register_solution_package( + self._solution_files[first_solution_key], model_name + ) + + return self.structure.model_struct_objs[model_type] + + def get_ims_package(self, key): + warnings.warn( + "get_ims_package() has been deprecated and will be " + "removed in version 3.3.7. Use " + "get_solution_package() instead.", + DeprecationWarning, + ) + return self.get_solution_package(key) + + def get_solution_package(self, key): + """ + Get the solution package with the specified `key`. + + Parameters + ---------- + key : str + solution package file name + + Returns + -------- + solution_package : MFPackage + + """ + if key in self._solution_files: + return self._solution_files[key] + return None + + def remove_model(self, model_name): + """ + Remove model with name `model_name` from the simulation + + Parameters + ---------- + model_name : str + Model name to remove from simulation + + """ + # Remove model + del self._models[model_name] + + # TODO: Fully implement this + # Update simulation name file + + def is_valid(self): + """ + Checks the validity of the solution and all of its models and + packages. Returns true if the solution is valid, false if it is not. + + + Returns + -------- + valid : bool + Whether this is a valid simulation + + """ + # name file valid + if not self.name_file.is_valid(): + return False + + # tdis file valid + if not self._tdis_file.is_valid(): + return False + + # exchanges valid + for exchange in self._exchange_files: + if not exchange.is_valid(): + return False + + # solution files valid + for solution_file in self._solution_files.values(): + if not solution_file.is_valid(): + return False + + # a model exists + if not self._models: + return False + + # models valid + for key in self._models: + if not self._models[key].is_valid(): + return False + + # each model has a solution file + + return True + + @staticmethod + def _resolve_verbosity_level(verbosity_level): + if verbosity_level == 0: + return VerbosityLevel.quiet + elif verbosity_level == 1: + return VerbosityLevel.normal + elif verbosity_level == 2: + return VerbosityLevel.verbose + else: + return verbosity_level + + @staticmethod + def _get_package_path(package): + if package.parent_file is not None: + return (package.parent_file.path) + (package.package_type,) + else: + return (package.package_type,) + + def _update_solution_group(self, solution_file, new_name=None): + solution_recarray = self.name_file.solutiongroup + for solution_group_num in solution_recarray.get_active_key_list(): + try: + rec_array = solution_recarray.get_data(solution_group_num[0]) + except MFDataException as mfde: + message = ( + "An error occurred while getting solution group" + '"{}" from the simulation name file' + ".".format(solution_group_num[0]) + ) + raise MFDataException( + mfdata_except=mfde, package="nam", message=message + ) + + new_array = [] + for record in rec_array: + if record.slnfname == solution_file: + if new_name is not None: + record.slnfname = new_name + new_array.append(tuple(record)) + else: + continue + else: + new_array.append(record) + + if not new_array: + new_array = None + + solution_recarray.set_data(new_array, solution_group_num[0]) + + def _remove_from_all_solution_groups(self, modelname): + solution_recarray = self.name_file.solutiongroup + for solution_group_num in solution_recarray.get_active_key_list(): + try: + rec_array = solution_recarray.get_data(solution_group_num[0]) + except MFDataException as mfde: + message = ( + "An error occurred while getting solution group" + '"{}" from the simulation name file' + ".".format(solution_group_num[0]) + ) + raise MFDataException( + mfdata_except=mfde, package="nam", message=message + ) + new_array = ["no_check"] + for index, record in enumerate(rec_array): + new_record = [] + new_record.append(record[0]) + new_record.append(record[1]) + for item in list(record)[2:]: + if item is not None and item.lower() != modelname.lower(): + new_record.append(item) + new_array.append(tuple(new_record)) + solution_recarray.set_data(new_array, solution_group_num[0]) + + def _append_to_solution_group(self, solution_file, new_models): + # clear models out of solution groups + if new_models is not None: + for model in new_models: + self._remove_from_all_solution_groups(model) + + # append models to solution_file + solution_recarray = self.name_file.solutiongroup + for solution_group_num in solution_recarray.get_active_key_list(): + try: + rec_array = solution_recarray.get_data(solution_group_num[0]) + except MFDataException as mfde: + message = ( + "An error occurred while getting solution group" + '"{}" from the simulation name file' + ".".format(solution_group_num[0]) + ) + raise MFDataException( + mfdata_except=mfde, package="nam", message=message + ) + new_array = [] + for index, record in enumerate(rec_array): + new_record = [] + rec_model_dict = {} + for index, item in enumerate(record): + if ( + record[1] == solution_file or item not in new_models + ) and item is not None: + new_record.append(item) + if index > 1 and item is not None: + rec_model_dict[item.lower()] = 1 + + if record[1] == solution_file: + for model in new_models: + if model.lower() not in rec_model_dict: + new_record.append(model) + + new_array.append(tuple(new_record)) + solution_recarray.set_data(new_array, solution_group_num[0]) + + def _replace_solution_in_solution_group(self, item, index, new_item): + solution_recarray = self.name_file.solutiongroup + for solution_group_num in solution_recarray.get_active_key_list(): + try: + rec_array = solution_recarray.get_data(solution_group_num[0]) + except MFDataException as mfde: + message = ( + "An error occurred while getting solution group" + '"{}" from the simulation name file. The error ' + 'occurred while replacing solution file "{}" with "{}"' + 'at index "{}"'.format( + solution_group_num[0], item, new_item, index + ) + ) + raise MFDataException( + mfdata_except=mfde, package="nam", message=message + ) + if rec_array is not None: + for rec_item in rec_array: + if rec_item[index] == item: + rec_item[index] = new_item + + def _is_in_solution_group(self, item, index, any_idx_after=False): + solution_recarray = self.name_file.solutiongroup + for solution_group_num in solution_recarray.get_active_key_list(): + try: + rec_array = solution_recarray.get_data(solution_group_num[0]) + except MFDataException as mfde: + message = ( + "An error occurred while getting solution group" + '"{}" from the simulation name file. The error ' + 'occurred while verifying file "{}" at index "{}" ' + "is in the simulation name file" + ".".format(solution_group_num[0], item, index) + ) + raise MFDataException( + mfdata_except=mfde, package="nam", message=message + ) + + if rec_array is not None: + for rec_item in rec_array: + if any_idx_after: + for idx in range(index, len(rec_item)): + if rec_item[idx] == item: + return True + else: + if rec_item[index] == item: + return True + return False + + def plot( + self, + model_list: Optional[Union[str, List[str]]] = None, + SelPackList=None, + **kwargs, + ): + """ + Plot simulation or models. + + Method to plot a whole simulation or a series of models + that are part of a simulation. + + Parameters + ---------- + model_list: list, optional + List of model names to plot, if none all models will be plotted + SelPackList: list, optional + List of package names to plot, if none all packages will be + plotted + kwargs: + filename_base : str + Base file name that will be used to automatically + generate file names for output image files. Plots will be + exported as image files if file_name_base is not None. + (default is None) + file_extension : str + Valid matplotlib.pyplot file extension for savefig(). + Only used if filename_base is not None. (default is 'png') + mflay : int + MODFLOW zero-based layer number to return. If None, then + all layers will be included. (default is None) + kper : int + MODFLOW zero-based stress period number to return. + (default is zero) + key : str + MFList dictionary key. (default is None) + + Returns + -------- + axes: (list) + matplotlib.pyplot.axes objects + + """ + from ...plot.plotutil import PlotUtilities + + axes = PlotUtilities._plot_simulation_helper( + self, model_list=model_list, SelPackList=SelPackList, **kwargs + ) + return axes diff --git a/flopy/mf6/modflow/mfgwf.py b/flopy/mf6/modflow/mfgwf.py index 9601fb38c..eb88d83e6 100644 --- a/flopy/mf6/modflow/mfgwf.py +++ b/flopy/mf6/modflow/mfgwf.py @@ -90,7 +90,6 @@ def __init__( print_flows=None, save_flows=None, newtonoptions=None, - packages=None, **kwargs, ): super().__init__( @@ -109,7 +108,12 @@ def __init__( self.name_file.print_flows.set_data(print_flows) self.name_file.save_flows.set_data(save_flows) self.name_file.newtonoptions.set_data(newtonoptions) - self.name_file.packages.set_data(packages) + + self.list = self.name_file.list + self.print_input = self.name_file.print_input + self.print_flows = self.name_file.print_flows + self.save_flows = self.name_file.save_flows + self.newtonoptions = self.name_file.newtonoptions @classmethod def load( @@ -125,6 +129,7 @@ def load( load_only=None, ): return mfmodel.MFModel.load_base( + cls, simulation, structure, modelname, diff --git a/flopy/mf6/modflow/mfgwt.py b/flopy/mf6/modflow/mfgwt.py index afd4d7f9c..d182e3d64 100644 --- a/flopy/mf6/modflow/mfgwt.py +++ b/flopy/mf6/modflow/mfgwt.py @@ -84,7 +84,6 @@ def __init__( print_input=None, print_flows=None, save_flows=None, - packages=None, **kwargs, ): super().__init__( @@ -102,7 +101,11 @@ def __init__( self.name_file.print_input.set_data(print_input) self.name_file.print_flows.set_data(print_flows) self.name_file.save_flows.set_data(save_flows) - self.name_file.packages.set_data(packages) + + self.list = self.name_file.list + self.print_input = self.name_file.print_input + self.print_flows = self.name_file.print_flows + self.save_flows = self.name_file.save_flows @classmethod def load( @@ -118,6 +121,7 @@ def load( load_only=None, ): return mfmodel.MFModel.load_base( + cls, simulation, structure, modelname, diff --git a/flopy/mf6/modflow/mfsimulation.py b/flopy/mf6/modflow/mfsimulation.py index 3e9304ffc..cde15d3a4 100644 --- a/flopy/mf6/modflow/mfsimulation.py +++ b/flopy/mf6/modflow/mfsimulation.py @@ -1,389 +1,14 @@ -import errno -import inspect -import os.path -import sys -import warnings -from pathlib import Path -from typing import List, Optional, Union +# DO NOT MODIFY THIS FILE DIRECTLY. THIS FILE MUST BE CREATED BY +# mf6/utils/createpackages.py +# FILE created on June 22, 2023 21:13:41 UTC +import os +from typing import Union -import numpy as np +from .. import mfsimbase -from ...mbase import run_model -from ..data import mfstructure -from ..data.mfdatautil import MFComment -from ..data.mfstructure import DatumType -from ..mfbase import ( - ExtFileAction, - FlopyException, - MFDataException, - MFFileMgmt, - PackageContainer, - PackageContainerType, - VerbosityLevel, -) -from ..mfpackage import MFPackage -from ..modflow import mfnam, mftdis -from ..utils import binaryfile_utils, mfobservation - -class SimulationDict(dict): - """ - Class containing custom dictionary for MODFLOW simulations. Dictionary - contains model data. Dictionary keys are "paths" to the data that include - the model and package containing the data. - - Behaves as an dict with some additional features described below. - - Parameters - ---------- - path : MFFileMgmt - Object containing path information for the simulation - - """ - - def __init__(self, path=None): - dict.__init__(self) - self._path = path - - def __getitem__(self, key): - """ - Define the __getitem__ magic method. - - Parameters - ---------- - key (string): Part or all of a dictionary key - - Returns: - MFData or numpy.ndarray - - """ - if key == "_path" or not hasattr(self, "_path"): - raise AttributeError(key) - - # FIX: Transport - Include transport output files - if key[1] in ("CBC", "HDS", "DDN", "UCN"): - val = binaryfile_utils.MFOutput(self, self._path, key) - return val.data - - elif key[-1] == "Observations": - val = mfobservation.MFObservation(self, self._path, key) - return val.data - - if key in self: - val = dict.__getitem__(self, key) - return val - return AttributeError(key) - - def __setitem__(self, key, val): - """ - Define the __setitem__ magic method. - - Parameters - ---------- - key : str - Dictionary key - val : MFData - MFData to store in dictionary - - """ - dict.__setitem__(self, key, val) - - def find_in_path(self, key_path, key_leaf): - """ - Attempt to find key_leaf in a partial key path key_path. - - Parameters - ---------- - key_path : str - partial path to the data - key_leaf : str - name of the data - - Returns - ------- - Data: MFData, - index: int - - """ - key_path_size = len(key_path) - for key, item in self.items(): - if key[:key_path_size] == key_path: - if key[-1] == key_leaf: - # found key_leaf as a key in the dictionary - return item, None - if not isinstance(item, MFComment): - data_item_index = 0 - data_item_structures = item.structure.data_item_structures - for data_item_struct in data_item_structures: - if data_item_struct.name == key_leaf: - # found key_leaf as a data item name in the data - # in the dictionary - return item, data_item_index - if data_item_struct.type != DatumType.keyword: - data_item_index += 1 - return None, None - - def output_keys(self, print_keys=True): - """ - Return a list of output data keys supported by the dictionary. - - Parameters - ---------- - print_keys : bool - print keys to console - - Returns - ------- - output keys : list - - """ - # get keys to request binary output - x = binaryfile_utils.MFOutputRequester.getkeys( - self, self._path, print_keys=print_keys - ) - return [key for key in x.dataDict] - - def input_keys(self): - """ - Return a list of input data keys. - - Returns - ------- - input keys : list - - """ - # get keys to request input ie. package data - for key in self: - print(key) - - def observation_keys(self): - """ - Return a list of observation keys. - - Returns - ------- - observation keys : list - - """ - # get keys to request observation file output - mfobservation.MFObservationRequester.getkeys(self, self._path) - - def keys(self): - """ - Return a list of all keys. - - Returns - ------- - all keys : list - - """ - # overrides the built in keys to print all keys, input and output - self.input_keys() - try: - self.output_keys() - except OSError as e: - if e.errno == errno.EEXIST: - pass - try: - self.observation_keys() - except KeyError: - pass - - -class MFSimulationData: +class MFSimulation(mfsimbase.MFSimulationBase): """ - Class containing MODFLOW simulation data and file formatting data. Use - MFSimulationData to set simulation-wide settings which include data - formatting and file location settings. - - Parameters - ---------- - path : str - path on disk to the simulation - - Attributes - ---------- - indent_string : str - String used to define how much indent to use (file formatting) - internal_formatting : list - List defining string to use for internal formatting - external_formatting : list - List defining string to use for external formatting - open_close_formatting : list - List defining string to use for open/close - max_columns_of_data : int - Maximum columns of data before line wraps. For structured grids this - is set to ncol by default. For all other grids the default is 20. - wrap_multidim_arrays : bool - Whether to wrap line for multi-dimensional arrays at the end of a - row/column/layer - _float_precision : int - Number of decimal points to write for a floating point number - _float_characters : int - Number of characters a floating point number takes up - write_headers: bool - When true flopy writes a header to each package file indicating that - it was created by flopy - sci_note_upper_thres : float - Numbers greater than this threshold are written in scientific notation - sci_note_lower_thres : float - Numbers less than this threshold are written in scientific notation - mfpath : MFFileMgmt - File path location information for the simulation - model_dimensions : dict - Dictionary containing discretization information for each model - mfdata : SimulationDict - Custom dictionary containing all model data for the simulation - - """ - - def __init__(self, path: Union[str, os.PathLike], mfsim): - # --- formatting variables --- - self.indent_string = " " - self.constant_formatting = ["constant", ""] - self._max_columns_of_data = 20 - self.wrap_multidim_arrays = True - self._float_precision = 8 - self._float_characters = 15 - self.write_headers = True - self._sci_note_upper_thres = 100000 - self._sci_note_lower_thres = 0.001 - self.fast_write = True - self.comments_on = False - self.auto_set_sizes = True - self.verify_data = True - self.debug = False - self.verbose = True - self.verbosity_level = VerbosityLevel.normal - self.max_columns_user_set = False - self.max_columns_auto_set = False - - self._update_str_format() - - # --- file path --- - self.mfpath = MFFileMgmt(path, mfsim) - - # --- ease of use variables to make working with modflow input and - # output data easier --- model dimension class for each model - self.model_dimensions = {} - - # --- model data --- - self.mfdata = SimulationDict(self.mfpath) - - # --- temporary variables --- - # other external files referenced - self.referenced_files = {} - - @property - def lazy_io(self): - if not self.auto_set_sizes and not self.verify_data: - return True - return False - - @lazy_io.setter - def lazy_io(self, val): - if val: - self.auto_set_sizes = False - self.verify_data = False - else: - self.auto_set_sizes = True - self.verify_data = True - - @property - def max_columns_of_data(self): - return self._max_columns_of_data - - @max_columns_of_data.setter - def max_columns_of_data(self, val): - if not self.max_columns_user_set and ( - not self.max_columns_auto_set or val > self._max_columns_of_data - ): - self._max_columns_of_data = val - self.max_columns_user_set = True - - @property - def float_precision(self): - """ - Gets precision of floating point numbers. - """ - return self._float_precision - - @float_precision.setter - def float_precision(self, value): - """ - Sets precision of floating point numbers. - - Parameters - ---------- - value: float - floating point precision - - """ - self._float_precision = value - self._update_str_format() - - @property - def float_characters(self): - """ - Gets max characters used in floating point numbers. - """ - return self._float_characters - - @float_characters.setter - def float_characters(self, value): - """ - Sets max characters used in floating point numbers. - - Parameters - ---------- - value: float - floating point max characters - - """ - self._float_characters = value - self._update_str_format() - - def set_sci_note_upper_thres(self, value): - """ - Sets threshold number where any number larger than threshold - is represented in scientific notation. - - Parameters - ---------- - value: float - threshold value - - """ - self._sci_note_upper_thres = value - self._update_str_format() - - def set_sci_note_lower_thres(self, value): - """ - Sets threshold number where any number smaller than threshold - is represented in scientific notation. - - Parameters - ---------- - value: float - threshold value - - """ - self._sci_note_lower_thres = value - self._update_str_format() - - def _update_str_format(self): - """ - Update floating point formatting strings.""" - self.reg_format_str = f"{{:.{self._float_precision}E}}" - self.sci_format_str = ( - f"{{:{self._float_characters}.{self._float_precision}f}}" - ) - - -class MFSimulation(PackageContainer): - """ - Entry point into any MODFLOW simulation. - MFSimulation is used to load, build, and/or save a MODFLOW 6 simulation. A MFSimulation object must be created before creating any of the MODFLOW 6 model objects. @@ -391,56 +16,60 @@ class MFSimulation(PackageContainer): Parameters ---------- sim_name : str - Name of the simulation. - version : str - Version of MODFLOW 6 executable - exe_name : str - Path to MODFLOW 6 executable - sim_ws : str - Path to MODFLOW 6 simulation working folder. This is the folder - containing the simulation name file. - verbosity_level : int - Verbosity level of standard output from 0 to 2. When 0 is specified no - standard output is written. When 1 is specified standard - error/warning messages with some informational messages are written. - When 2 is specified full error/warning/informational messages are - written (this is ideal for debugging). - continue_ : bool - Sets the continue option in the simulation name file. The continue - option is a keyword flag to indicate that the simulation should - continue even if one or more solutions do not converge. - nocheck : bool - Sets the nocheck option in the simulation name file. The nocheck - option is a keyword flag to indicate that the model input check - routines should not be called prior to each time step. Checks - are performed by default. - memory_print_option : str - Sets memory_print_option in the simulation name file. - Memory_print_option is a flag that controls printing of detailed - memory manager usage to the end of the simulation list file. NONE - means do not print detailed information. SUMMARY means print only - the total memory for each simulation component. ALL means print - information for each variable stored in the memory manager. NONE is - default if memory_print_option is not specified. - write_headers: bool - When true flopy writes a header to each package file indicating that - it was created by flopy. - lazy_io: bool - When true flopy only reads external data when the data is requested - and only writes external data if the data has changed. This option - automatically overrides the verify_data and auto_set_sizes, turning - both off. - Examples - -------- - >>> s = MFSimulation.load('my simulation', 'simulation.nam') - - Attributes - ---------- - sim_name : str - Name of the simulation - name_file : MFPackage - Simulation name file package - + Name of the simulation + continue_ : boolean + * continue (boolean) keyword flag to indicate that the simulation + should continue even if one or more solutions do not converge. + nocheck : boolean + * nocheck (boolean) keyword flag to indicate that the model input check + routines should not be called prior to each time step. Checks are + performed by default. + memory_print_option : string + * memory_print_option (string) is a flag that controls printing of + detailed memory manager usage to the end of the simulation list file. + NONE means do not print detailed information. SUMMARY means print + only the total memory for each simulation component. ALL means print + information for each variable stored in the memory manager. NONE is + default if MEMORY_PRINT_OPTION is not specified. + maxerrors : integer + * maxerrors (integer) maximum number of errors that will be stored and + printed. + tdis6 : string + * tdis6 (string) is the name of the Temporal Discretization (TDIS) + Input File. + models : [mtype, mfname, mname] + * mtype (string) is the type of model to add to simulation. + * mfname (string) is the file name of the model name file. + * mname (string) is the user-assigned name of the model. The model name + cannot exceed 16 characters and must not have blanks within the name. + The model name is case insensitive; any lowercase letters are + converted and stored as upper case letters. + exchanges : [exgtype, exgfile, exgmnamea, exgmnameb] + * exgtype (string) is the exchange type. + * exgfile (string) is the input file for the exchange. + * exgmnamea (string) is the name of the first model that is part of + this exchange. + * exgmnameb (string) is the name of the second model that is part of + this exchange. + mxiter : integer + * mxiter (integer) is the maximum number of outer iterations for this + solution group. The default value is 1. If there is only one solution + in the solution group, then MXITER must be 1. + solutiongroup : [slntype, slnfname, slnmnames] + * slntype (string) is the type of solution. The Integrated Model + Solution (IMS6) is the only supported option in this version. + * slnfname (string) name of file containing solution input. + * slnmnames (string) is the array of model names to add to this + solution. The number of model names is determined by the number of + model names the user provides on this line. + + Methods + ------- + load : (sim_name : str, version : string, + exe_name : str or PathLike, sim_ws : str or PathLike, strict : bool, + verbosity_level : int, load_only : list, verify_data : bool, + write_headers : bool, lazy_io : bool) : MFSimulation + a class method that loads a simulation from files """ def __init__( @@ -450,210 +79,32 @@ def __init__( exe_name: Union[str, os.PathLike] = "mf6", sim_ws: Union[str, os.PathLike] = os.curdir, verbosity_level=1, + write_headers=True, + lazy_io=False, continue_=None, nocheck=None, memory_print_option=None, - write_headers=True, - lazy_io=False, + maxerrors=None, ): - super().__init__(MFSimulationData(sim_ws, self), sim_name) - self.simulation_data.verbosity_level = self._resolve_verbosity_level( - verbosity_level - ) - self.simulation_data.write_headers = write_headers - if lazy_io: - self.simulation_data.lazy_io = True - - # verify metadata - fpdata = mfstructure.MFStructure() - if not fpdata.valid: - excpt_str = ( - "Invalid package metadata. Unable to load MODFLOW " - "file structure metadata." - ) - raise FlopyException(excpt_str) - - # initialize - self.dimensions = None - self.type = "Simulation" - - self.version = version - self.exe_name = exe_name - self._models = {} - self._tdis_file = None - self._exchange_files = {} - self._solution_files = {} - self._other_files = {} - self.structure = fpdata.sim_struct - self.model_type = None - - self._exg_file_num = {} - - self.simulation_data.mfpath.set_last_accessed_path() - - # build simulation name file - self.name_file = mfnam.ModflowNam( - self, - filename="mfsim.nam", - continue_=continue_, - nocheck=nocheck, - memory_print_option=memory_print_option, - _internal_package=True, - ) - - # try to build directory structure - sim_path = self.simulation_data.mfpath.get_sim_path() - if not os.path.isdir(sim_path): - try: - os.makedirs(sim_path) - except OSError as e: - if ( - self.simulation_data.verbosity_level.value - >= VerbosityLevel.quiet.value - ): - print( - "An error occurred when trying to create the " - "directory {}: {}".format(sim_path, e.strerror) - ) - - # set simulation validity initially to false since the user must first - # add at least one model to the simulation and fill out the name and - # tdis files - self.valid = False - - def __getattr__(self, item): - """ - Override __getattr__ to allow retrieving models. - - __getattr__ is used to allow for getting models and packages as if - they are attributes - - Parameters - ---------- - item : str - model or package name - - - Returns - ------- - md : Model or package object - Model or package object of type :class:flopy6.mfmodel or - :class:flopy6.mfpackage - - """ - if item == "valid" or not hasattr(self, "valid"): - raise AttributeError(item) - - models = [] - if item in self.structure.model_types: - # get all models of this type - for model in self._models.values(): - if model.model_type == item or model.model_type[:-1] == item: - models.append(model) - - if len(models) > 0: - return models - elif item in self._models: - model = self.get_model(item) - if model is not None: - return model - raise AttributeError(item) - else: - package = self.get_package(item) - if package is not None: - return package - raise AttributeError(item) - - def __repr__(self): - """ - Override __repr__ to print custom string. - - Returns - -------- - repr string : str - string describing object - - """ - return self._get_data_str(True) - - def __str__(self): - """ - Override __str__ to print custom string. - - Returns - -------- - str string : str - string describing object - - """ - return self._get_data_str(False) - - def _get_data_str(self, formal): - file_mgt = self.simulation_data.mfpath - data_str = ( - "sim_name = {}\nsim_path = {}\nexe_name = " - "{}\n" - "\n".format(self.name, file_mgt.get_sim_path(), self.exe_name) + super().__init__( + sim_name=sim_name, + version=version, + exe_name=exe_name, + sim_ws=sim_ws, + verbosity_level=verbosity_level, + write_headers=write_headers, + lazy_io=lazy_io, ) - for package in self._packagelist: - pk_str = package._get_data_str(formal, False) - if formal: - if len(pk_str.strip()) > 0: - data_str = ( - "{}###################\nPackage {}\n" - "###################\n\n" - "{}\n".format(data_str, package._get_pname(), pk_str) - ) - else: - if len(pk_str.strip()) > 0: - data_str = ( - "{}###################\nPackage {}\n" - "###################\n\n" - "{}\n".format(data_str, package._get_pname(), pk_str) - ) - for model in self._models.values(): - if formal: - mod_repr = repr(model) - if len(mod_repr.strip()) > 0: - data_str = ( - "{}@@@@@@@@@@@@@@@@@@@@\nModel {}\n" - "@@@@@@@@@@@@@@@@@@@@\n\n" - "{}\n".format(data_str, model.name, mod_repr) - ) - else: - mod_str = str(model) - if len(mod_str.strip()) > 0: - data_str = ( - "{}@@@@@@@@@@@@@@@@@@@@\nModel {}\n" - "@@@@@@@@@@@@@@@@@@@@\n\n" - "{}\n".format(data_str, model.name, mod_str) - ) - return data_str - - @property - def model_names(self): - """ - Return a list of model names associated with this simulation. - - Returns - -------- - list: list of model names - - """ - return list(self._models.keys()) - - @property - def exchange_files(self): - """ - Return list of exchange files associated with this simulation. + self.name_file.continue_.set_data(continue_) + self.name_file.nocheck.set_data(nocheck) + self.name_file.memory_print_option.set_data(memory_print_option) + self.name_file.maxerrors.set_data(maxerrors) - Returns - -------- - list: list of exchange names - - """ - return self._exchange_files.values() + self.continue_ = self.name_file.continue_ + self.nocheck = self.name_file.nocheck + self.memory_print_option = self.name_file.memory_print_option + self.maxerrors = self.name_file.maxerrors @classmethod def load( @@ -669,1841 +120,16 @@ def load( write_headers=True, lazy_io=False, ): - """ - Load an existing model. - - Parameters - ---------- - sim_name : str - Name of the simulation. - version : str - MODFLOW version - exe_name : str or PathLike - Path to MODFLOW executable (relative to the simulation workspace or absolute) - sim_ws : str or PathLike - Path to simulation workspace - strict : bool - Strict enforcement of file formatting - verbosity_level : int - Verbosity level of standard output - 0: No standard output - 1: Standard error/warning messages with some informational - messages - 2: Verbose mode with full error/warning/informational - messages. This is ideal for debugging. - load_only : list - List of package abbreviations or package names corresponding to - packages that flopy will load. default is None, which loads all - packages. the discretization packages will load regardless of this - setting. subpackages, like time series and observations, will also - load regardless of this setting. - example list: ['ic', 'maw', 'npf', 'oc', 'ims', 'gwf6-gwf6'] - verify_data : bool - Verify data when it is loaded. this can slow down loading - write_headers: bool - When true flopy writes a header to each package file indicating - that it was created by flopy - lazy_io: bool - When true flopy only reads external data when the data is requested - and only writes external data if the data has changed. This option - automatically overrides the verify_data and auto_set_sizes, turning - both off. - Returns - ------- - sim : MFSimulation object - - Examples - -------- - >>> s = flopy.mf6.mfsimulation.load('my simulation') - - """ - # initialize - instance = cls( + return mfsimbase.MFSimulationBase.load( + cls, sim_name, version, exe_name, sim_ws, + strict, verbosity_level, - write_headers=write_headers, - ) - verbosity_level = instance.simulation_data.verbosity_level - - instance.simulation_data.verify_data = verify_data - if lazy_io: - instance.simulation_data.lazy_io = True - - if verbosity_level.value >= VerbosityLevel.normal.value: - print("loading simulation...") - - # build case consistent load_only dictionary for quick lookups - load_only = instance._load_only_dict(load_only) - - # load simulation name file - if verbosity_level.value >= VerbosityLevel.normal.value: - print(" loading simulation name file...") - instance.name_file.load(strict) - - # load TDIS file - tdis_pkg = f"tdis{mfstructure.MFStructure().get_version_string()}" - tdis_attr = getattr(instance.name_file, tdis_pkg) - instance._tdis_file = mftdis.ModflowTdis( - instance, filename=tdis_attr.get_data() - ) - - instance._tdis_file._filename = instance.simulation_data.mfdata[ - ("nam", "timing", tdis_pkg) - ].get_data() - if verbosity_level.value >= VerbosityLevel.normal.value: - print(" loading tdis package...") - instance._tdis_file.load(strict) - - # load models - try: - model_recarray = instance.simulation_data.mfdata[ - ("nam", "models", "models") - ] - models = model_recarray.get_data() - except MFDataException as mfde: - message = ( - "Error occurred while loading model names from the " - "simulation name file." - ) - raise MFDataException( - mfdata_except=mfde, - model=instance.name, - package="nam", - message=message, - ) - for item in models: - # resolve model working folder and name file - path, name_file = os.path.split(item[1]) - model_obj = PackageContainer.model_factory(item[0][:-1].lower()) - # load model - if verbosity_level.value >= VerbosityLevel.normal.value: - print(f" loading model {item[0].lower()}...") - instance._models[item[2]] = model_obj.load( - instance, - instance.structure.model_struct_objs[item[0].lower()], - item[2], - name_file, - version, - exe_name, - strict, - path, - load_only, - ) - - # load exchange packages and dependent packages - try: - exchange_recarray = instance.name_file.exchanges - has_exch_data = exchange_recarray.has_data() - except MFDataException as mfde: - message = ( - "Error occurred while loading exchange names from the " - "simulation name file." - ) - raise MFDataException( - mfdata_except=mfde, - model=instance.name, - package="nam", - message=message, - ) - if has_exch_data: - try: - exch_data = exchange_recarray.get_data() - except MFDataException as mfde: - message = ( - "Error occurred while loading exchange names from " - "the simulation name file." - ) - raise MFDataException( - mfdata_except=mfde, - model=instance.name, - package="nam", - message=message, - ) - for exgfile in exch_data: - if load_only is not None and not instance._in_pkg_list( - load_only, exgfile[0], exgfile[2] - ): - if ( - instance.simulation_data.verbosity_level.value - >= VerbosityLevel.normal.value - ): - print(f" skipping package {exgfile[0].lower()}...") - continue - # get exchange type by removing numbers from exgtype - exchange_type = "".join( - [char for char in exgfile[0] if not char.isdigit()] - ).upper() - # get exchange number for this type - if exchange_type not in instance._exg_file_num: - exchange_file_num = 0 - instance._exg_file_num[exchange_type] = 1 - else: - exchange_file_num = instance._exg_file_num[exchange_type] - instance._exg_file_num[exchange_type] += 1 - - exchange_name = f"{exchange_type}_EXG_{exchange_file_num}" - # find package class the corresponds to this exchange type - package_obj = instance.package_factory( - exchange_type.replace("-", "").lower(), "" - ) - if not package_obj: - message = ( - "An error occurred while loading the " - "simulation name file. Invalid exchange type " - '"{}" specified.'.format(exchange_type) - ) - type_, value_, traceback_ = sys.exc_info() - raise MFDataException( - instance.name, - "nam", - "nam", - "loading simulation name file", - exchange_recarray.structure.name, - inspect.stack()[0][3], - type_, - value_, - traceback_, - message, - instance._simulation_data.debug, - ) - - # build and load exchange package object - exchange_file = package_obj( - instance, - exgtype=exgfile[0], - exgmnamea=exgfile[2], - exgmnameb=exgfile[3], - filename=exgfile[1], - pname=exchange_name, - loading_package=True, - ) - if verbosity_level.value >= VerbosityLevel.normal.value: - print( - f" loading exchange package {exchange_file._get_pname()}..." - ) - exchange_file.load(strict) - instance._exchange_files[exgfile[1]] = exchange_file - - # load simulation packages - solution_recarray = instance.simulation_data.mfdata[ - ("nam", "solutiongroup", "solutiongroup") - ] - - try: - solution_group_dict = solution_recarray.get_data() - except MFDataException as mfde: - message = ( - "Error occurred while loading solution groups from " - "the simulation name file." - ) - raise MFDataException( - mfdata_except=mfde, - model=instance.name, - package="nam", - message=message, - ) - for solution_group in solution_group_dict.values(): - for solution_info in solution_group: - if load_only is not None and not instance._in_pkg_list( - load_only, solution_info[0], solution_info[2] - ): - if ( - instance.simulation_data.verbosity_level.value - >= VerbosityLevel.normal.value - ): - print( - f" skipping package {solution_info[0].lower()}..." - ) - continue - # create solution package - sln_package_obj = instance.package_factory( - solution_info[0][:-1].lower(), "" - ) - sln_package = sln_package_obj( - instance, - filename=solution_info[1], - pname=solution_info[2], - ) - - if verbosity_level.value >= VerbosityLevel.normal.value: - print( - f" loading solution package " - f"{sln_package._get_pname()}..." - ) - sln_package.load(strict) - - instance.simulation_data.mfpath.set_last_accessed_path() - if verify_data: - instance.check() - return instance - - def check( - self, - f: Optional[Union[str, os.PathLike]] = None, - verbose=True, - level=1, - ): - """ - Check model data for common errors. - - Parameters - ---------- - f : str or PathLike, optional - String defining file name or file handle for summary file - of check method output. If str or pathlike, a file handle - is created. If None, the method does not write results to - a summary file. (default is None) - verbose : bool - Boolean flag used to determine if check method results are - written to the screen - level : int - Check method analysis level. If level=0, summary checks are - performed. If level=1, full checks are performed. - - Returns - ------- - check list: list - Python list containing simulation check results - - Examples - -------- - - >>> import flopy - >>> m = flopy.modflow.Modflow.load('model.nam') - >>> m.check() - """ - # check instance for simulation-level check - chk_list = [] - - # check models - for model in self._models.values(): - print(f'Checking model "{model.name}"...') - chk_list.append(model.check(f, verbose, level)) - - print("Checking for missing simulation packages...") - if self._tdis_file is None: - if chk_list: - chk_list[0]._add_to_summary( - "Error", desc="\r No tdis package", package="model" - ) - print("Error: no tdis package") - if len(self._solution_files) == 0: - if chk_list: - chk_list[0]._add_to_summary( - "Error", desc="\r No solver package", package="model" - ) - print("Error: no solution package") - return chk_list - - @property - def sim_path(self) -> Path: - return Path(self.simulation_data.mfpath.get_sim_path()) - - @property - def sim_package_list(self): - """List of all "simulation level" packages""" - package_list = [] - if self._tdis_file is not None: - package_list.append(self._tdis_file) - for sim_package in self._solution_files.values(): - package_list.append(sim_package) - for sim_package in self._exchange_files.values(): - package_list.append(sim_package) - for sim_package in self._other_files.values(): - package_list.append(sim_package) - return package_list - - def load_package( - self, - ftype, - fname: Union[str, os.PathLike], - pname, - strict, - ref_path: Union[str, os.PathLike], - dict_package_name=None, - parent_package=None, - ): - """ - Load a package from a file. - - Parameters - ---------- - ftype : str - the file type - fname : str or PathLike - the path of the file containing the package input - pname : str - the user-defined name for the package - strict : bool - strict mode when loading the file - ref_path : str - path to the file. uses local path if set to None - dict_package_name : str - package name for dictionary lookup - parent_package : MFPackage - parent package - - """ - if ( - ftype in self.structure.package_struct_objs - and self.structure.package_struct_objs[ftype].multi_package_support - ) or ( - ftype in self.structure.utl_struct_objs - and self.structure.utl_struct_objs[ftype].multi_package_support - ): - # resolve dictionary name for package - if dict_package_name is not None: - if parent_package is not None: - dict_package_name = f"{parent_package.path[-1]}_{ftype}" - else: - # use dict_package_name as the base name - if ftype in self._ftype_num_dict: - self._ftype_num_dict[dict_package_name] += 1 - else: - self._ftype_num_dict[dict_package_name] = 0 - dict_package_name = "{}_{}".format( - dict_package_name, - self._ftype_num_dict[dict_package_name], - ) - else: - # use ftype as the base name - if ftype in self._ftype_num_dict: - self._ftype_num_dict[ftype] += 1 - else: - self._ftype_num_dict[ftype] = 0 - if pname is not None: - dict_package_name = pname - else: - dict_package_name = ( - f"{ftype}_{self._ftype_num_dict[ftype]}" - ) - else: - dict_package_name = ftype - - # get package object from file type - package_obj = self.package_factory(ftype, "") - # create package - package = package_obj( - self, - filename=fname, - pname=dict_package_name, - parent_file=parent_package, - loading_package=True, - ) - package.load(strict) - self._other_files[package.filename] = package - # register child package with the simulation - self._add_package(package, package.path) - if parent_package is not None: - # register child package with the parent package - parent_package._add_package(package, package.path) - return package - - def register_ims_package( - self, solution_file: MFPackage, model_list: Union[str, List[str]] - ): - self.register_solution_package(solution_file, model_list) - - def register_solution_package( - self, solution_file: MFPackage, model_list: Union[str, List[str]] - ): - """ - Register a solution package with the simulation. - - Parameters - solution_file : MFPackage - solution package to register - model_list : list of strings - list of models using the solution package to be registered - - """ - if isinstance(model_list, str): - model_list = [model_list] - - if ( - solution_file.package_type - not in mfstructure.MFStructure().flopy_dict["solution_packages"] - ): - comment = ( - 'Parameter "solution_file" is not a valid solution file. ' - 'Expected solution file, but type "{}" was given' - ".".format(type(solution_file)) - ) - type_, value_, traceback_ = sys.exc_info() - raise MFDataException( - None, - "solution", - "", - "registering solution package", - "", - inspect.stack()[0][3], - type_, - value_, - traceback_, - comment, - self.simulation_data.debug, - ) - valid_model_types = mfstructure.MFStructure().flopy_dict[ - "solution_packages" - ][solution_file.package_type] - # remove models from existing solution groups - if model_list is not None: - for model in model_list: - md = self.get_model(model) - if md is not None and ( - md.model_type not in valid_model_types - and "*" not in valid_model_types - ): - comment = ( - f"Model type {md.model_type} is not a valid type " - f"for solution file {solution_file.filename} solution " - f"file type {solution_file.package_type}. Valid model " - f"types are {valid_model_types}" - ) - type_, value_, traceback_ = sys.exc_info() - raise MFDataException( - None, - "solution", - "", - "registering solution package", - "", - inspect.stack()[0][3], - type_, - value_, - traceback_, - comment, - self.simulation_data.debug, - ) - self._remove_from_all_solution_groups(model) - - # register solution package with model list - in_simulation = False - pkg_with_same_name = None - for file in self._solution_files.values(): - if file is solution_file: - in_simulation = True - if ( - file.package_name == solution_file.package_name - and file != solution_file - ): - pkg_with_same_name = file - if ( - self.simulation_data.verbosity_level.value - >= VerbosityLevel.normal.value - ): - print( - "WARNING: solution package with name {} already exists. " - "New solution package will replace old package" - ".".format(file.package_name) - ) - self._remove_package(self._solution_files[file.filename]) - del self._solution_files[file.filename] - break - # register solution package - if not in_simulation: - self._add_package( - solution_file, self._get_package_path(solution_file) - ) - # do not allow a solution package to be registered twice with the - # same simulation - if not in_simulation: - # create unique file/package name - if solution_file.package_name is None: - file_num = len(self._solution_files) - 1 - solution_file.package_name = ( - f"{solution_file.package_type}_{file_num}" - ) - if solution_file.filename in self._solution_files: - solution_file.filename = MFFileMgmt.unique_file_name( - solution_file.filename, self._solution_files - ) - # add solution package to simulation - self._solution_files[solution_file.filename] = solution_file - - # If solution file is being replaced, replace solution filename in - # solution group - if pkg_with_same_name is not None and self._is_in_solution_group( - pkg_with_same_name.filename, 1 - ): - # change existing solution group to reflect new solution file - self._replace_solution_in_solution_group( - pkg_with_same_name.filename, 1, solution_file.filename - ) - # only allow solution package to be registered to one solution group - elif model_list is not None: - sln_file_in_group = self._is_in_solution_group( - solution_file.filename, 1 - ) - # add solution group to the simulation name file - solution_recarray = self.name_file.solutiongroup - solution_group_list = solution_recarray.get_active_key_list() - if len(solution_group_list) == 0: - solution_group_num = 0 - else: - solution_group_num = solution_group_list[-1][0] - - if sln_file_in_group: - self._append_to_solution_group( - solution_file.filename, model_list - ) - else: - if self.name_file.mxiter.get_data(solution_group_num) is None: - self.name_file.mxiter.add_transient_key(solution_group_num) - - # associate any models in the model list to this - # simulation file - version_string = mfstructure.MFStructure().get_version_string() - solution_pkg = f"{solution_file.package_abbr}{version_string}" - new_record = [solution_pkg, solution_file.filename] - for model in model_list: - new_record.append(model) - try: - solution_recarray.append_list_as_record( - new_record, solution_group_num - ) - except MFDataException as mfde: - message = ( - "Error occurred while updating the " - "simulation name file with the solution package " - 'file "{}".'.format(solution_file.filename) - ) - raise MFDataException( - mfdata_except=mfde, package="nam", message=message - ) - - @staticmethod - def _rename_package_group(group_dict, name): - package_type_count = {} - # first build an array to avoid key modification errors - package_array = [] - for package in group_dict.values(): - package_array.append(package) - # update package file names and count - for package in package_array: - if package.package_type not in package_type_count: - file_name = f"{name}.{package.package_type}" - package_type_count[package.package_type] = 1 - else: - package_type_count[package.package_type] += 1 - ptc = package_type_count[package.package_type] - file_name = f"{name}_{ptc}.{package.package_type}" - base_filepath = os.path.split(package.filename)[0] - if base_filepath != "": - # include existing relative path in new file name - file_name = os.path.join(base_filepath, file_name) - package.filename = file_name - - def _rename_exchange_file(self, package, new_filename): - self._exchange_files[package.filename] = package - try: - exchange_recarray_data = self.name_file.exchanges.get_data() - except MFDataException as mfde: - message = ( - "An error occurred while retrieving exchange " - "data from the simulation name file. The error " - "occurred while registering exchange file " - f'"{package.filename}".' - ) - raise MFDataException( - mfdata_except=mfde, - package=package._get_pname(), - message=message, - ) - if exchange_recarray_data is not None: - for index, exchange in zip( - range(0, len(exchange_recarray_data)), - exchange_recarray_data, - ): - if exchange[1] == package.filename: - # update existing exchange - exchange_recarray_data[index][1] = new_filename - ex_recarray = self.name_file.exchanges - try: - ex_recarray.set_data(exchange_recarray_data) - except MFDataException as mfde: - message = ( - "An error occurred while setting " - "exchange data in the simulation name " - "file. The error occurred while " - "registering the following " - "values (exgtype, filename, " - f'exgmnamea, exgmnameb): "{package.exgtype} ' - f"{package.filename} {package.exgmnamea}" - f'{package.exgmnameb}".' - ) - raise MFDataException( - mfdata_except=mfde, - package=package._get_pname(), - message=message, - ) - return - - def _set_timing_block(self, file_name): - struct_root = mfstructure.MFStructure() - tdis_pkg = f"tdis{struct_root.get_version_string()}" - tdis_attr = getattr(self.name_file, tdis_pkg) - try: - tdis_attr.set_data(file_name) - except MFDataException as mfde: - message = ( - "An error occurred while setting the tdis package " - f'file name "{file_name}". The error occurred while ' - "registering the tdis package with the " - "simulation" - ) - raise MFDataException( - mfdata_except=mfde, - package=file_name, - message=message, - ) - - def update_package_filename(self, package, new_name): - """ - Updates internal arrays to be consistent with a new file name. - This is for internal flopy library use only. - - Parameters - ---------- - package: MFPackage - Package with new name - new_name: str - Package's new name - - """ - if ( - self._tdis_file is not None - and package.filename == self._tdis_file.filename - ): - self._set_timing_block(new_name) - elif package.filename in self._exchange_files: - self._exchange_files[new_name] = self._exchange_files.pop( - package.filename - ) - self._rename_exchange_file(package, new_name) - elif package.filename in self._solution_files: - self._solution_files[new_name] = self._solution_files.pop( - package.filename - ) - self._update_solution_group(package.filename, new_name) - else: - self._other_files[new_name] = self._other_files.pop( - package.filename - ) - - def rename_all_packages(self, name): - """ - Rename all packages with name as prefix. - - Parameters - ---------- - name: str - Prefix of package names - - """ - if self._tdis_file is not None: - self._tdis_file.filename = f"{name}.{self._tdis_file.package_type}" - - self._rename_package_group(self._exchange_files, name) - self._rename_package_group(self._solution_files, name) - self._rename_package_group(self._other_files, name) - for model in self._models.values(): - model.rename_all_packages(name) - - def set_all_data_external( - self, check_data=True, external_data_folder=None - ): - """Sets the simulation's list and array data to be stored externally. - - Parameters - ---------- - check_data: bool - Determines if data error checking is enabled during this - process. Data error checking can be slow on large datasets. - external_data_folder: str or PathLike - Path relative to the simulation path or model relative path - (see use_model_relative_path parameter), where external data - will be stored - """ - # copy any files whose paths have changed - self.simulation_data.mfpath.copy_files() - # set data external for all packages in all models - for model in self._models.values(): - model.set_all_data_external(check_data, external_data_folder) - # set data external for solution packages - for package in self._solution_files.values(): - package.set_all_data_external(check_data, external_data_folder) - # set data external for other packages - for package in self._other_files.values(): - package.set_all_data_external(check_data, external_data_folder) - for package in self._exchange_files.values(): - package.set_all_data_external(check_data, external_data_folder) - - def set_all_data_internal(self, check_data=True): - # set data external for all packages in all models - for model in self._models.values(): - model.set_all_data_internal(check_data) - # set data external for solution packages - for package in self._solution_files.values(): - package.set_all_data_internal(check_data) - # set data external for other packages - for package in self._other_files.values(): - package.set_all_data_internal(check_data) - # set data external for exchange packages - for package in self._exchange_files.values(): - package.set_all_data_internal(check_data) - - def write_simulation( - self, ext_file_action=ExtFileAction.copy_relative_paths, silent=False - ): - """ - Write the simulation to files. - - Parameters - ext_file_action : ExtFileAction - Defines what to do with external files when the simulation - path has changed. Defaults to copy_relative_paths which - copies only files with relative paths, leaving files defined - by absolute paths fixed. - silent : bool - Writes out the simulation in silent mode (verbosity_level = 0) - - """ - sim_data = self.simulation_data - if not sim_data.max_columns_user_set: - # search for dis packages - for model in self._models.values(): - dis = model.get_package("dis") - if dis is not None and hasattr(dis, "ncol"): - sim_data.max_columns_of_data = dis.ncol.get_data() - sim_data.max_columns_user_set = False - sim_data.max_columns_auto_set = True - - saved_verb_lvl = self.simulation_data.verbosity_level - if silent: - self.simulation_data.verbosity_level = VerbosityLevel.quiet - - # write simulation name file - if ( - self.simulation_data.verbosity_level.value - >= VerbosityLevel.normal.value - ): - print("writing simulation...") - print(" writing simulation name file...") - self.name_file.write(ext_file_action=ext_file_action) - - # write TDIS file - if ( - self.simulation_data.verbosity_level.value - >= VerbosityLevel.normal.value - ): - print(" writing simulation tdis package...") - self._tdis_file.write(ext_file_action=ext_file_action) - - # write solution files - for solution_file in self._solution_files.values(): - if ( - self.simulation_data.verbosity_level.value - >= VerbosityLevel.normal.value - ): - print( - f" writing solution package " - f"{solution_file._get_pname()}..." - ) - solution_file.write(ext_file_action=ext_file_action) - - # write exchange files - for exchange_file in self._exchange_files.values(): - exchange_file.write() - - # write other packages - for pp in self._other_files.values(): - if ( - self.simulation_data.verbosity_level.value - >= VerbosityLevel.normal.value - ): - print(f" writing package {pp._get_pname()}...") - pp.write(ext_file_action=ext_file_action) - - # FIX: model working folder should be model name file folder - - # write models - for model in self._models.values(): - if ( - self.simulation_data.verbosity_level.value - >= VerbosityLevel.normal.value - ): - print(f" writing model {model.name}...") - model.write(ext_file_action=ext_file_action) - - self.simulation_data.mfpath.set_last_accessed_path() - - if silent: - self.simulation_data.verbosity_level = saved_verb_lvl - - def set_sim_path(self, path: Union[str, os.PathLike]): - """Return a list of output data keys. - - Parameters - ---------- - path : str - Relative or absolute path to simulation root folder. - - """ - # set all data internal - self.set_all_data_internal() - - # set simulation path - self.simulation_data.mfpath.set_sim_path(path, True) - - if not os.path.exists(path): - # create new simulation folder - os.makedirs(path) - - def run_simulation( - self, - silent=None, - pause=False, - report=False, - processors=None, - normal_msg="normal termination", - use_async=False, - cargs=None, - ): - """ - Run the simulation. - - Parameters - ---------- - silent: bool - Run in silent mode - pause: bool - Pause at end of run - report: bool - Save stdout lines to a list (buff) - processors: int - Number of processors. Parallel simulations are only supported - for MODFLOW 6 simulations. (default is None) - normal_msg: str or list - Normal termination message used to determine if the run - terminated normally. More than one message can be provided - using a list. (default is 'normal termination') - use_async : bool - Asynchronously read model stdout and report with timestamps. - good for models that take long time to run. not good for - models that run really fast - cargs : str or list of strings - Additional command line arguments to pass to the executable. - default is None - - Returns - -------- - success : bool - buff : list of lines of stdout - - """ - if silent is None: - if ( - self.simulation_data.verbosity_level.value - >= VerbosityLevel.normal.value - ): - silent = False - else: - silent = True - return run_model( - self.exe_name, - None, - self.simulation_data.mfpath.get_sim_path(), - silent=silent, - pause=pause, - report=report, - processors=processors, - normal_msg=normal_msg, - use_async=use_async, - cargs=cargs, - ) - - def delete_output_files(self): - """Deletes simulation output files.""" - output_req = binaryfile_utils.MFOutputRequester - output_file_keys = output_req.getkeys( - self.simulation_data.mfdata, self.simulation_data.mfpath, False - ) - for path in output_file_keys.binarypathdict.values(): - if os.path.isfile(path): - os.remove(path) - - def remove_package(self, package_name): - """ - Removes package from the simulation. `package_name` can be the - package's name, type, or package object to be removed from the model. - - Parameters - ---------- - package_name : str - Name of package to be removed - - """ - if isinstance(package_name, MFPackage): - packages = [package_name] - else: - packages = self.get_package(package_name) - if not isinstance(packages, list): - packages = [packages] - for package in packages: - if ( - self._tdis_file is not None - and package.path == self._tdis_file.path - ): - self._tdis_file = None - if package.filename in self._exchange_files: - del self._exchange_files[package.filename] - if package.filename in self._solution_files: - del self._solution_files[package.filename] - self._update_solution_group(package.filename) - if package.filename in self._other_files: - del self._other_files[package.filename] - - self._remove_package(package) - - @property - def model_dict(self): - """ - Return a dictionary of models associated with this simulation. - - Returns - -------- - model dict : dict - dictionary of models - - """ - return self._models.copy() - - def get_model(self, model_name=None): - """ - Returns the models in the simulation with a given model name, name - file name, or model type. - - Parameters - ---------- - model_name : str - Name of the model to get. Passing in None or an empty list - will get the first model. - - Returns - -------- - model : MFModel - - """ - if len(self._models) == 0: - return None - - if model_name is None: - for model in self._models.values(): - return model - if model_name in self._models: - return self._models[model_name] - # do case-insensitive lookup - for name, model in self._models.items(): - if model_name.lower() == name.lower(): - return model - return None - - def get_exchange_file(self, filename): - """ - Get a specified exchange file. - - Parameters - ---------- - filename : str - Name of exchange file to get - - Returns - -------- - exchange package : MFPackage - - """ - if filename in self._exchange_files: - return self._exchange_files[filename] - else: - excpt_str = f'Exchange file "{filename}" can not be found.' - raise FlopyException(excpt_str) - - def get_file(self, filename): - """ - Get a specified file. - - Parameters - ---------- - filename : str - Name of mover file to get - - Returns - -------- - mover package : MFPackage - - """ - if filename in self._other_files: - return self._other_files[filename] - else: - excpt_str = f'file "{filename}" can not be found.' - raise FlopyException(excpt_str) - - def get_mvr_file(self, filename): - """ - Get a specified mover file. - - Parameters - ---------- - filename : str - Name of mover file to get - - Returns - -------- - mover package : MFPackage - - """ - warnings.warn( - "get_mvr_file will be deprecated and will be removed in version " - "3.3.6. Use get_file", - PendingDeprecationWarning, - ) - if filename in self._other_files: - return self._other_files[filename] - else: - excpt_str = f'MVR file "{filename}" can not be found.' - raise FlopyException(excpt_str) - - def get_mvt_file(self, filename): - """ - Get a specified mvt file. - - Parameters - ---------- - filename : str - Name of mover transport file to get - - Returns - -------- - mover transport package : MFPackage - - """ - warnings.warn( - "get_mvt_file will be deprecated and will be removed in version " - "3.3.6. Use get_file", - PendingDeprecationWarning, - ) - if filename in self._other_files: - return self._other_files[filename] - else: - excpt_str = f'MVT file "{filename}" can not be found.' - raise FlopyException(excpt_str) - - def get_gnc_file(self, filename): - """ - Get a specified gnc file. - - Parameters - ---------- - filename : str - Name of gnc file to get - - Returns - -------- - gnc package : MFPackage - - """ - warnings.warn( - "get_gnc_file will be deprecated and will be removed in version " - "3.3.6. Use get_file", - PendingDeprecationWarning, - ) - if filename in self._other_files: - return self._other_files[filename] - else: - excpt_str = f'GNC file "{filename}" can not be found.' - raise FlopyException(excpt_str) - - def remove_exchange_file(self, package): - """ - Removes the exchange file "package". This is for internal flopy - library use only. - - Parameters - ---------- - package: MFPackage - Exchange package to be removed - - """ - self._exchange_files[package.filename] = package - try: - exchange_recarray_data = self.name_file.exchanges.get_data() - except MFDataException as mfde: - message = ( - "An error occurred while retrieving exchange " - "data from the simulation name file. The error " - "occurred while registering exchange file " - f'"{package.filename}".' - ) - raise MFDataException( - mfdata_except=mfde, - package=package._get_pname(), - message=message, - ) - remove_indices = [] - if exchange_recarray_data is not None: - for index, exchange in zip( - range(0, len(exchange_recarray_data)), - exchange_recarray_data, - ): - if ( - package.filename is not None - and exchange[1] == package.filename - ): - remove_indices.append(index) - if len(remove_indices) > 0: - self.name_file.exchanges.set_data( - np.delete(exchange_recarray_data, remove_indices) - ) - - def register_exchange_file(self, package): - """ - Register an exchange package file with the simulation. This is a - call-back method made from the package and should not be called - directly. - - Parameters - ---------- - package : MFPackage - Exchange package object to register - - """ - if package.filename not in self._exchange_files: - exgtype = package.exgtype - exgmnamea = package.exgmnamea - exgmnameb = package.exgmnameb - - if exgtype is None or exgmnamea is None or exgmnameb is None: - excpt_str = ( - "Exchange packages require that exgtype, " - "exgmnamea, and exgmnameb are specified." - ) - raise FlopyException(excpt_str) - - self._exchange_files[package.filename] = package - try: - exchange_recarray_data = self.name_file.exchanges.get_data() - except MFDataException as mfde: - message = ( - "An error occurred while retrieving exchange " - "data from the simulation name file. The error " - "occurred while registering exchange file " - f'"{package.filename}".' - ) - raise MFDataException( - mfdata_except=mfde, - package=package._get_pname(), - message=message, - ) - if exchange_recarray_data is not None: - for index, exchange in zip( - range(0, len(exchange_recarray_data)), - exchange_recarray_data, - ): - if exchange[1] == package.filename: - # update existing exchange - exchange_recarray_data[index][0] = exgtype - exchange_recarray_data[index][2] = exgmnamea - exchange_recarray_data[index][3] = exgmnameb - ex_recarray = self.name_file.exchanges - try: - ex_recarray.set_data(exchange_recarray_data) - except MFDataException as mfde: - message = ( - "An error occurred while setting " - "exchange data in the simulation name " - "file. The error occurred while " - "registering the following " - "values (exgtype, filename, " - f'exgmnamea, exgmnameb): "{exgtype} ' - f"{package.filename} {exgmnamea}" - f'{exgmnameb}".' - ) - raise MFDataException( - mfdata_except=mfde, - package=package._get_pname(), - message=message, - ) - return - try: - # add new exchange - self.name_file.exchanges.append_data( - [(exgtype, package.filename, exgmnamea, exgmnameb)] - ) - except MFDataException as mfde: - message = ( - "An error occurred while setting exchange data " - "in the simulation name file. The error occurred " - "while registering the following values (exgtype, " - f'filename, exgmnamea, exgmnameb): "{exgtype} ' - f'{package.filename} {exgmnamea} {exgmnameb}".' - ) - raise MFDataException( - mfdata_except=mfde, - package=package._get_pname(), - message=message, - ) - if ( - package.dimensions is None - or package.dimensions.model_dim is None - ): - # resolve exchange package dimensions object - package.dimensions = package.create_package_dimensions() - - def _remove_package_by_type(self, package): - pname = None - if package.package_name is not None: - pname = package.package_name.lower() - if ( - package.package_type.lower() == "tdis" - and self._tdis_file is not None - and self._tdis_file in self._packagelist - ): - # tdis package already exists. there can be only one tdis - # package. remove existing tdis package - if ( - self.simulation_data.verbosity_level.value - >= VerbosityLevel.normal.value - ): - print( - "WARNING: tdis package already exists. Replacing " - "existing tdis package." - ) - self._remove_package(self._tdis_file) - elif ( - package.package_type.lower() - in mfstructure.MFStructure().flopy_dict["solution_packages"] - and pname in self.package_name_dict - ): - if ( - self.simulation_data.verbosity_level.value - >= VerbosityLevel.normal.value - ): - print( - "WARNING: Package with name " - f"{package.package_name.lower()} already exists. " - "Replacing existing package." - ) - self._remove_package(self.package_name_dict[pname]) - else: - if ( - package.filename in self._other_files - and self._other_files[package.filename] in self._packagelist - ): - # other package with same file name already exists. remove old - # package - if ( - self.simulation_data.verbosity_level.value - >= VerbosityLevel.normal.value - ): - print( - f"WARNING: package with name {pname} already exists. " - "Replacing existing package." - ) - self._remove_package(self._other_files[package.filename]) - del self._other_files[package.filename] - - def register_package( - self, - package, - add_to_package_list=True, - set_package_name=True, - set_package_filename=True, - ): - """ - Register a package file with the simulation. This is a - call-back method made from the package and should not be called - directly. - - Parameters - ---------- - package : MFPackage - Package to register - add_to_package_list : bool - Add package to lookup list - set_package_name : bool - Produce a package name for this package - set_package_filename : bool - Produce a filename for this package - - Returns - -------- - (path : tuple, package structure : MFPackageStructure) - - """ - if set_package_filename: - # set initial package filename - base_name = os.path.basename(os.path.normpath(self.name)) - package._filename = f"{base_name}.{package.package_type}" - - package.container_type = [PackageContainerType.simulation] - path = self._get_package_path(package) - if add_to_package_list and package.package_type.lower != "nam": - if ( - package.package_type.lower() - not in mfstructure.MFStructure().flopy_dict[ - "solution_packages" - ] - ): - # all but solution packages get added here. solution packages - # are added during solution package registration - self._remove_package_by_type(package) - self._add_package(package, path) - sln_dict = mfstructure.MFStructure().flopy_dict["solution_packages"] - if package.package_type.lower() == "nam": - if not package.internal_package: - excpt_str = ( - "Unable to register nam file. Do not create your own nam " - "files. Nam files are automatically created and managed " - "for you by FloPy." - ) - print(excpt_str) - raise FlopyException(excpt_str) - return path, self.structure.name_file_struct_obj - elif package.package_type.lower() == "tdis": - self._tdis_file = package - self._set_timing_block(package.quoted_filename) - return ( - path, - self.structure.package_struct_objs[ - package.package_type.lower() - ], - ) - elif package.package_type.lower() in sln_dict: - supported_packages = sln_dict[package.package_type.lower()] - # default behavior is to register the solution package with the - # first unregistered model - unregistered_models = [] - for model_name, model in self._models.items(): - model_registered = self._is_in_solution_group( - model_name, 2, True - ) - if not model_registered and ( - model.model_type in supported_packages - or "*" in supported_packages - ): - unregistered_models.append(model_name) - if unregistered_models: - self.register_solution_package(package, unregistered_models) - else: - self.register_solution_package(package, None) - return ( - path, - self.structure.package_struct_objs[ - package.package_type.lower() - ], - ) - else: - if package.filename not in self._other_files: - self._other_files[package.filename] = package - else: - # auto generate a unique file name and register it - file_name = MFFileMgmt.unique_file_name( - package.filename, self._other_files - ) - package.filename = file_name - self._other_files[file_name] = package - - if package.package_type.lower() in self.structure.package_struct_objs: - return ( - path, - self.structure.package_struct_objs[ - package.package_type.lower() - ], - ) - elif package.package_type.lower() in self.structure.utl_struct_objs: - return ( - path, - self.structure.utl_struct_objs[package.package_type.lower()], - ) - else: - excpt_str = ( - 'Invalid package type "{}". Unable to register ' - "package.".format(package.package_type) - ) - print(excpt_str) - raise FlopyException(excpt_str) - - def rename_model_namefile(self, model, new_namefile): - """ - Rename a model's namefile. For internal flopy library use only. - - Parameters - ---------- - model : MFModel - Model object whose namefile to rename - new_namefile : str - Name of the new namefile - - """ - # update simulation name file - models = self.name_file.models.get_data() - for mdl in models: - path, name_file_name = os.path.split(mdl[1]) - if name_file_name == model.name_file.filename: - mdl[1] = os.path.join(path, new_namefile) - self.name_file.models.set_data(models) - - def register_model(self, model, model_type, model_name, model_namefile): - """ - Add a model to the simulation. This is a call-back method made - from the package and should not be called directly. - - Parameters - ---------- - model : MFModel - Model object to add to simulation - sln_group : str - Solution group of model - - Returns - -------- - model_structure_object : MFModelStructure - """ - - # get model structure from model type - if model_type not in self.structure.model_struct_objs: - message = f'Invalid model type: "{model_type}".' - type_, value_, traceback_ = sys.exc_info() - raise MFDataException( - model.name, - "", - model.name, - "registering model", - "sim", - inspect.stack()[0][3], - type_, - value_, - traceback_, - message, - self.simulation_data.debug, - ) - - # add model - self._models[model_name] = model - - # update simulation name file - self.name_file.models.append_list_as_record( - [model_type, model_namefile, model_name] - ) - - if len(self._solution_files) > 0: - # register model with first solution file found - first_solution_key = next(iter(self._solution_files)) - self.register_solution_package( - self._solution_files[first_solution_key], model_name - ) - - return self.structure.model_struct_objs[model_type] - - def get_ims_package(self, key): - warnings.warn( - "get_ims_package() has been deprecated and will be " - "removed in version 3.3.7. Use " - "get_solution_package() instead.", - DeprecationWarning, - ) - return self.get_solution_package(key) - - def get_solution_package(self, key): - """ - Get the solution package with the specified `key`. - - Parameters - ---------- - key : str - solution package file name - - Returns - -------- - solution_package : MFPackage - - """ - if key in self._solution_files: - return self._solution_files[key] - return None - - def remove_model(self, model_name): - """ - Remove model with name `model_name` from the simulation - - Parameters - ---------- - model_name : str - Model name to remove from simulation - - """ - # Remove model - del self._models[model_name] - - # TODO: Fully implement this - # Update simulation name file - - def is_valid(self): - """ - Checks the validity of the solution and all of its models and - packages. Returns true if the solution is valid, false if it is not. - - - Returns - -------- - valid : bool - Whether this is a valid simulation - - """ - # name file valid - if not self.name_file.is_valid(): - return False - - # tdis file valid - if not self._tdis_file.is_valid(): - return False - - # exchanges valid - for exchange in self._exchange_files: - if not exchange.is_valid(): - return False - - # solution files valid - for solution_file in self._solution_files.values(): - if not solution_file.is_valid(): - return False - - # a model exists - if not self._models: - return False - - # models valid - for key in self._models: - if not self._models[key].is_valid(): - return False - - # each model has a solution file - - return True - - @staticmethod - def _resolve_verbosity_level(verbosity_level): - if verbosity_level == 0: - return VerbosityLevel.quiet - elif verbosity_level == 1: - return VerbosityLevel.normal - elif verbosity_level == 2: - return VerbosityLevel.verbose - else: - return verbosity_level - - @staticmethod - def _get_package_path(package): - if package.parent_file is not None: - return (package.parent_file.path) + (package.package_type,) - else: - return (package.package_type,) - - def _update_solution_group(self, solution_file, new_name=None): - solution_recarray = self.name_file.solutiongroup - for solution_group_num in solution_recarray.get_active_key_list(): - try: - rec_array = solution_recarray.get_data(solution_group_num[0]) - except MFDataException as mfde: - message = ( - "An error occurred while getting solution group" - '"{}" from the simulation name file' - ".".format(solution_group_num[0]) - ) - raise MFDataException( - mfdata_except=mfde, package="nam", message=message - ) - - new_array = [] - for record in rec_array: - if record.slnfname == solution_file: - if new_name is not None: - record.slnfname = new_name - new_array.append(tuple(record)) - else: - continue - else: - new_array.append(record) - - if not new_array: - new_array = None - - solution_recarray.set_data(new_array, solution_group_num[0]) - - def _remove_from_all_solution_groups(self, modelname): - solution_recarray = self.name_file.solutiongroup - for solution_group_num in solution_recarray.get_active_key_list(): - try: - rec_array = solution_recarray.get_data(solution_group_num[0]) - except MFDataException as mfde: - message = ( - "An error occurred while getting solution group" - '"{}" from the simulation name file' - ".".format(solution_group_num[0]) - ) - raise MFDataException( - mfdata_except=mfde, package="nam", message=message - ) - new_array = ["no_check"] - for index, record in enumerate(rec_array): - new_record = [] - new_record.append(record[0]) - new_record.append(record[1]) - for item in list(record)[2:]: - if item is not None and item.lower() != modelname.lower(): - new_record.append(item) - new_array.append(tuple(new_record)) - solution_recarray.set_data(new_array, solution_group_num[0]) - - def _append_to_solution_group(self, solution_file, new_models): - # clear models out of solution groups - if new_models is not None: - for model in new_models: - self._remove_from_all_solution_groups(model) - - # append models to solution_file - solution_recarray = self.name_file.solutiongroup - for solution_group_num in solution_recarray.get_active_key_list(): - try: - rec_array = solution_recarray.get_data(solution_group_num[0]) - except MFDataException as mfde: - message = ( - "An error occurred while getting solution group" - '"{}" from the simulation name file' - ".".format(solution_group_num[0]) - ) - raise MFDataException( - mfdata_except=mfde, package="nam", message=message - ) - new_array = [] - for index, record in enumerate(rec_array): - new_record = [] - rec_model_dict = {} - for index, item in enumerate(record): - if ( - record[1] == solution_file or item not in new_models - ) and item is not None: - new_record.append(item) - if index > 1 and item is not None: - rec_model_dict[item.lower()] = 1 - - if record[1] == solution_file: - for model in new_models: - if model.lower() not in rec_model_dict: - new_record.append(model) - - new_array.append(tuple(new_record)) - solution_recarray.set_data(new_array, solution_group_num[0]) - - def _replace_solution_in_solution_group(self, item, index, new_item): - solution_recarray = self.name_file.solutiongroup - for solution_group_num in solution_recarray.get_active_key_list(): - try: - rec_array = solution_recarray.get_data(solution_group_num[0]) - except MFDataException as mfde: - message = ( - "An error occurred while getting solution group" - '"{}" from the simulation name file. The error ' - 'occurred while replacing solution file "{}" with "{}"' - 'at index "{}"'.format( - solution_group_num[0], item, new_item, index - ) - ) - raise MFDataException( - mfdata_except=mfde, package="nam", message=message - ) - if rec_array is not None: - for rec_item in rec_array: - if rec_item[index] == item: - rec_item[index] = new_item - - def _is_in_solution_group(self, item, index, any_idx_after=False): - solution_recarray = self.name_file.solutiongroup - for solution_group_num in solution_recarray.get_active_key_list(): - try: - rec_array = solution_recarray.get_data(solution_group_num[0]) - except MFDataException as mfde: - message = ( - "An error occurred while getting solution group" - '"{}" from the simulation name file. The error ' - 'occurred while verifying file "{}" at index "{}" ' - "is in the simulation name file" - ".".format(solution_group_num[0], item, index) - ) - raise MFDataException( - mfdata_except=mfde, package="nam", message=message - ) - - if rec_array is not None: - for rec_item in rec_array: - if any_idx_after: - for idx in range(index, len(rec_item)): - if rec_item[idx] == item: - return True - else: - if rec_item[index] == item: - return True - return False - - def plot( - self, - model_list: Optional[Union[str, List[str]]] = None, - SelPackList=None, - **kwargs, - ): - """ - Plot simulation or models. - - Method to plot a whole simulation or a series of models - that are part of a simulation. - - Parameters - ---------- - model_list: list, optional - List of model names to plot, if none all models will be plotted - SelPackList: list, optional - List of package names to plot, if none all packages will be - plotted - kwargs: - filename_base : str - Base file name that will be used to automatically - generate file names for output image files. Plots will be - exported as image files if file_name_base is not None. - (default is None) - file_extension : str - Valid matplotlib.pyplot file extension for savefig(). - Only used if filename_base is not None. (default is 'png') - mflay : int - MODFLOW zero-based layer number to return. If None, then - all layers will be included. (default is None) - kper : int - MODFLOW zero-based stress period number to return. - (default is zero) - key : str - MFList dictionary key. (default is None) - - Returns - -------- - axes: (list) - matplotlib.pyplot.axes objects - - """ - from ...plot.plotutil import PlotUtilities - - axes = PlotUtilities._plot_simulation_helper( - self, model_list=model_list, SelPackList=SelPackList, **kwargs + load_only, + verify_data, + write_headers, + lazy_io, ) - return axes diff --git a/flopy/mf6/utils/createpackages.py b/flopy/mf6/utils/createpackages.py index 4e5379da6..32b721b3d 100644 --- a/flopy/mf6/utils/createpackages.py +++ b/flopy/mf6/utils/createpackages.py @@ -295,6 +295,7 @@ def create_package_init_var( def add_var( init_vars, class_vars, + options_param_list, init_param_list, package_properties, doc_string, @@ -324,6 +325,8 @@ def add_var( if default_value is None: default_value = "None" init_param_list.append(f"{clean_ds_name}={default_value}") + if path is not None and "options" in path: + options_param_list.append(f"{clean_ds_name}={default_value}") # add to set parameter list set_param_list.append(f"{clean_ds_name}={clean_ds_name}") else: @@ -404,7 +407,7 @@ def build_model_load(model_type): " exe_name='mf6', strict=True, " "model_rel_path='.',\n" " load_only=None):\n " - "return mfmodel.MFModel.load_base(simulation, structure, " + "return mfmodel.MFModel.load_base(cls, simulation, structure, " "modelname,\n " "model_nam_file, '{}6', version,\n" " exe_name, strict, " @@ -415,13 +418,54 @@ def build_model_load(model_type): return model_load, model_load_c +def build_sim_load(): + sim_load_c = ( + " Methods\n -------\n" + " load : (sim_name : str, version : " + "string,\n exe_name : str or PathLike, " + "sim_ws : str or PathLike, strict : bool,\n verbosity_level : " + "int, load_only : list, verify_data : bool,\n " + "write_headers : bool, lazy_io : bool) : MFSimulation\n" + " a class method that loads a simulation from files" + '\n """' + ) + + sim_load = ( + " @classmethod\n def load(cls, sim_name='modflowsim', " + "version='mf6',\n " + "exe_name: Union[str, os.PathLike] = 'mf6',\n " + "sim_ws: Union[str, os.PathLike] = os.curdir,\n " + "strict=True, verbosity_level=1, load_only=None,\n " + "verify_data=False, write_headers=True,\n " + "lazy_io=False,):\n " + "return mfsimbase.MFSimulationBase.load(cls, sim_name, version, " + "\n " + "exe_name, sim_ws, strict,\n" + " verbosity_level, " + "load_only,\n " + "verify_data, write_headers, " + "\n lazy_io)" + "\n" + ) + return sim_load, sim_load_c + + def build_model_init_vars(param_list): init_var_list = [] + # build set data calls for param in param_list: param_parts = param.split("=") init_var_list.append( f" self.name_file.{param_parts[0]}.set_data({param_parts[0]})" ) + init_var_list.append("") + # build attributes + for param in param_list: + param_parts = param.split("=") + init_var_list.append( + f" self.{param_parts[0]} = self.name_file.{param_parts[0]}" + ) + return "\n".join(init_var_list) @@ -513,6 +557,7 @@ def create_packages(): package_properties = [] init_vars = [] init_param_list = [] + options_param_list = [] set_param_list = [] class_vars = [] template_gens = [] @@ -569,6 +614,7 @@ def create_packages(): add_var( init_vars, None, + options_param_list, init_param_list, package_properties, doc_string, @@ -589,6 +635,7 @@ def create_packages(): add_var( init_vars, None, + options_param_list, init_param_list, package_properties, doc_string, @@ -610,6 +657,7 @@ def create_packages(): add_var( init_vars, None, + options_param_list, init_param_list, package_properties, doc_string, @@ -640,6 +688,7 @@ def create_packages(): tg = add_var( init_vars, class_vars, + options_param_list, init_param_list, package_properties, doc_string, @@ -698,7 +747,7 @@ def create_packages(): ) ) init_string_full = init_string_def - init_string_model = f"{init_string_def}, simulation" + init_string_sim = f"{init_string_def}, simulation" # add variables to init string doc_string.add_parameter( " loading_package : bool\n " @@ -871,21 +920,20 @@ def create_packages(): if package[0].dfn_type == mfstructure.DfnType.model_name_file: # build model file - model_param_list = init_param_list[:-3] - init_vars = build_model_init_vars(model_param_list) - - model_param_list.insert(0, "model_rel_path='.'") - model_param_list.insert(0, "exe_name='mf6'") - model_param_list.insert(0, "version='mf6'") - model_param_list.insert(0, "model_nam_file=None") - model_param_list.insert(0, "modelname='model'") - model_param_list.append("**kwargs,") - init_string_model = build_init_string( - init_string_model, model_param_list - ) - model_name = clean_class_string(package[2]) + init_vars = build_model_init_vars(options_param_list) + + options_param_list.insert(0, "model_rel_path='.'") + options_param_list.insert(0, "exe_name='mf6'") + options_param_list.insert(0, "version='mf6'") + options_param_list.insert(0, "model_nam_file=None") + options_param_list.insert(0, "modelname='model'") + options_param_list.append("**kwargs,") + init_string_sim = build_init_string( + init_string_sim, options_param_list + ) + sim_name = clean_class_string(package[2]) class_def_string = "class Modflow{}(mfmodel.MFModel):\n".format( - model_name.capitalize() + sim_name.capitalize() ) class_def_string = class_def_string.replace("-", "_") doc_string.add_parameter( @@ -898,9 +946,9 @@ def create_packages(): model_parameter=True, ) doc_string.description = ( - f"Modflow{model_name} defines a {model_name} model" + f"Modflow{sim_name} defines a {sim_name} model" ) - class_var_string = f" model_type = '{model_name}'\n" + class_var_string = f" model_type = '{sim_name}'\n" mparent_init_string = " super().__init__(" spaces = " " * len(mparent_init_string) mparent_init_string = ( @@ -912,7 +960,7 @@ def create_packages(): "**kwargs," ")\n".format( mparent_init_string, - model_name, + sim_name, spaces, spaces, spaces, @@ -920,7 +968,7 @@ def create_packages(): spaces, ) ) - load_txt, doc_text = build_model_load(model_name) + load_txt, doc_text = build_model_load(sim_name) package_string = "{}\n{}\n\n\n{}{}\n{}\n{}\n{}{}\n{}\n\n{}".format( comment_string, nam_import_string, @@ -928,21 +976,103 @@ def create_packages(): doc_string.get_doc_string(True), doc_text, class_var_string, - init_string_model, + init_string_sim, mparent_init_string, init_vars, load_txt, ) md_file = open( - os.path.join(util_path, "..", "modflow", f"mf{model_name}.py"), + os.path.join(util_path, "..", "modflow", f"mf{sim_name}.py"), "w", newline="\n", ) md_file.write(package_string) md_file.close() init_file_imports.append( - f"from .mf{model_name} import Modflow{model_name.capitalize()}\n" + f"from .mf{sim_name} import Modflow{sim_name.capitalize()}\n" + ) + elif package[0].dfn_type == mfstructure.DfnType.sim_name_file: + # build simulation file + init_vars = build_model_init_vars(options_param_list) + + options_param_list.insert(0, "lazy_io=False") + options_param_list.insert(0, "write_headers=True") + options_param_list.insert(0, "verbosity_level=1") + options_param_list.insert( + 0, "sim_ws: Union[str, os.PathLike] = " "os.curdir" + ) + options_param_list.insert( + 0, "exe_name: Union[str, os.PathLike] " '= "mf6"' + ) + options_param_list.insert(0, "version='mf6'") + options_param_list.insert(0, "sim_name='sim'") + init_string_sim = " def __init__(self" + init_string_sim = build_init_string( + init_string_sim, options_param_list + ) + class_def_string = ( + "class MFSimulation(mfsimbase." "MFSimulationBase):\n" + ) + doc_string.add_parameter( + " sim_name : str\n" " Name of the simulation", + beginning_of_list=True, + model_parameter=True, ) + doc_string.description = ( + "MFSimulation is used to load, build, and/or save a MODFLOW " + "6 simulation. \n A MFSimulation object must be created " + "before creating any of the MODFLOW 6 \n model objects." + ) + sparent_init_string = " super().__init__(" + spaces = " " * len(sparent_init_string) + sparent_init_string = ( + "{}sim_name=sim_name,\n{}" + "version=version,\n{}" + "exe_name=exe_name,\n{}" + "sim_ws=sim_ws,\n{}" + "verbosity_level=verbosity_level,\n{}" + "write_headers=write_headers,\n{}" + "lazy_io=lazy_io,\n{}" + ")\n".format( + sparent_init_string, + spaces, + spaces, + spaces, + spaces, + spaces, + spaces, + spaces, + ) + ) + sim_import_string = ( + "import os\n" + "from typing import Union\n" + "from .. import mfsimbase" + ) + + load_txt, doc_text = build_sim_load() + package_string = "{}\n{}\n\n\n{}{}\n{}\n{}{}\n{}\n\n{}".format( + comment_string, + sim_import_string, + class_def_string, + doc_string.get_doc_string(False, True), + doc_text, + init_string_sim, + sparent_init_string, + init_vars, + load_txt, + ) + sim_file = open( + os.path.join(util_path, "..", "modflow", f"mfsimulation.py"), + "w", + newline="\n", + ) + sim_file.write(package_string) + sim_file.close() + init_file_imports.append( + "from .mfsimulation import MFSimulation\n" + ) + # Sort the imports for line in sorted(init_file_imports, key=lambda x: x.split()[3]): init_file.write(line) diff --git a/flopy/mf6/utils/generate_classes.py b/flopy/mf6/utils/generate_classes.py index d3872f437..ab078e33b 100644 --- a/flopy/mf6/utils/generate_classes.py +++ b/flopy/mf6/utils/generate_classes.py @@ -100,7 +100,7 @@ def delete_mf6_classes(): for entry in os.listdir(pth) if os.path.isfile(os.path.join(pth, entry)) ] - delete_files(files, pth, exclude="mfsimulation.py") + delete_files(files, pth) def generate_classes(