From 0d04f3ef07192d04f2b69d9ea6f49f4ad8f3093f Mon Sep 17 00:00:00 2001 From: wpbonelli Date: Fri, 19 Jul 2024 14:10:21 -0400 Subject: [PATCH] initial package impl --- flopy4/block.py | 89 ++++++++++++------------- flopy4/dfn.py | 2 +- flopy4/package.py | 113 ++++++++++++++++++++++++++++++++ flopy4/parameter.py | 79 ++++++++++++----------- flopy4/scalar.py | 47 +++++--------- test/test_block.py | 9 ++- test/test_package.py | 150 +++++++++++++++++++++++++++++++++++++++++++ test/test_scalar.py | 9 +-- 8 files changed, 374 insertions(+), 124 deletions(-) create mode 100644 flopy4/package.py create mode 100644 test/test_package.py diff --git a/flopy4/block.py b/flopy4/block.py index 17a7f6d..aaad808 100644 --- a/flopy4/block.py +++ b/flopy4/block.py @@ -1,61 +1,60 @@ -from collections.abc import MutableMapping -from typing import Any, Dict +from abc import ABCMeta +from collections import UserDict +from dataclasses import asdict +from typing import Any from flopy4.parameter import MFParameter, MFParameters from flopy4.utils import strip -def get_member_params(cls) -> Dict[str, MFParameter]: - if not issubclass(cls, MFBlock): - raise ValueError(f"Expected MFBlock, got {cls}") +class MFBlockMeta(type): + def __new__(cls, clsname, bases, attrs): + new = super().__new__(cls, clsname, bases, attrs) + if clsname == "MFBlock": + return new - return { - k: v - for k, v in cls.__dict__.items() - if issubclass(type(v), MFParameter) - } + # add parameter specification as class attribute + new.params = MFParameters( + { + k: v + for k, v in attrs.items() + if issubclass(type(v), MFParameter) + } + ) + return new -class MFBlock(MutableMapping): - def __init__(self, name=None, index=None, *args, **kwargs): +class MFBlockMappingMeta(MFBlockMeta, ABCMeta): + # http://www.phyast.pitt.edu/~micheles/python/metatype.html + pass + + +class MFBlock(MFParameters, metaclass=MFBlockMappingMeta): + def __init__(self, name=None, index=None, params=None): self.name = name self.index = index - self.params = MFParameters() - self.update(dict(*args, **kwargs)) + super().__init__(params) for key, param in self.items(): setattr(self, key, param) def __getattribute__(self, name: str) -> Any: + # shortcut to parameter value for instance attribute. + # the class attribute is the full parameter instance. attr = super().__getattribute__(name) - if isinstance(attr, MFParameter): - # shortcut to parameter value for instance attributes. - # the class attribute is the full param specification. - return attr.value - else: - return attr - - def __getitem__(self, key): - return self.params[key] - - def __setitem__(self, key, value): - self.params[key] = value + return attr.value if isinstance(attr, MFParameter) else attr - def __delitem__(self, key): - del self.params[key] - - def __iter__(self): - return iter(self.params) - - def __len__(self): - return len(self.params) + @property + def params(self) -> MFParameters: + """Block parameters.""" + return self.data @classmethod - def load(cls, f, strict=False): + def load(cls, f): name = None index = None found = False params = dict() - members = get_member_params(cls) + members = cls.params while True: pos = f.tell() @@ -70,13 +69,14 @@ def load(cls, f, strict=False): elif key == "end": break elif found: - if not strict or key in members: + param = members.get(key) + if param is not None: f.seek(pos) - param = members[key] - param.block = name - params[key] = type(param).load(f, spec=param) + params[key] = type(param).load( + f, **asdict(param.with_name(key).with_block(name)) + ) - return cls(name, index, **params) + return cls(name, index, params) def write(self, f): index = self.index if self.index is not None else "" @@ -89,5 +89,8 @@ def write(self, f): f.write(end) -class MFBlocks: - pass +class MFBlocks(UserDict): + """Mapping of block names to blocks.""" + + def __init__(self, blocks=None): + super().__init__(blocks) diff --git a/flopy4/dfn.py b/flopy4/dfn.py index 80182d5..212a02e 100644 --- a/flopy4/dfn.py +++ b/flopy4/dfn.py @@ -93,7 +93,7 @@ def get(self, key): return self._dfns[key] - #def get(self, component, subcomponent): + # def get(self, component, subcomponent): # key = f"{component.lower()}-{subcomponent.lower()}" # if key not in self._dfns: # raise ValueError("DFN does not exist in container") diff --git a/flopy4/package.py b/flopy4/package.py new file mode 100644 index 0000000..0ebce30 --- /dev/null +++ b/flopy4/package.py @@ -0,0 +1,113 @@ +from abc import ABCMeta +from collections import UserDict +from itertools import groupby +from typing import Any + +from flopy4.block import MFBlock, MFBlockMeta, MFBlocks +from flopy4.parameter import MFParameter, MFParameters +from flopy4.utils import strip + + +def get_block(clsname, params): + return MFBlockMeta(clsname, (MFBlock,), params)(params=params) + + +class MFPackageMeta(type): + def __new__(cls, clsname, bases, attrs): + new = super().__new__(cls, clsname, bases, attrs) + if clsname == "MFPackage": + return new + + # add parameter and block specification as class + # attributes. subclass mfblock dynamically based + # on each block parameter specification. + pkg_name = clsname.replace("Package", "") + params = MFParameters( + { + k: v.with_name(k) + for k, v in attrs.items() + if issubclass(type(v), MFParameter) + } + ) + new.params = params + new.blocks = MFBlocks( + { + block_name: get_block( + clsname=f"{pkg_name.title()}{block_name.title()}Block", + params={p.name: p for p in block}, + ) + for block_name, block in groupby( + params.values(), lambda p: p.block + ) + } + ) + return new + + +class MFPackageMappingMeta(MFPackageMeta, ABCMeta): + # http://www.phyast.pitt.edu/~micheles/python/metatype.html + pass + + +class MFPackage(UserDict, metaclass=MFPackageMappingMeta): + """ + MF6 package base class. + + TODO: reimplement with `ChainMap`? + """ + + def __getattribute__(self, name: str) -> Any: + value = super().__getattribute__(name) + if name == "data": + return value + + # shortcut to parameter value for instance attribute. + # the class attribute is the full parameter instance. + params = { + param_name: param + for block in self.data.values() + for param_name, param in block.items() + } + param = params.get(name) + return params[name].value if param is not None else value + + @property + def params(self) -> MFParameters: + """Package parameters.""" + return MFParameters( + { + name: param + for block in self.data + for name, param in block.items() + } + ) + + @classmethod + def load(cls, f): + """Load the package from file.""" + blocks = dict() + members = cls.blocks + + while True: + pos = f.tell() + line = f.readline() + if line == "": + break + line = strip(line).lower() + words = line.split() + key = words[0] + if key == "begin": + name = words[1] + block = members.get(name) + if block is not None: + f.seek(pos) + blocks[name] = type(block).load(f) + + pkg = cls() + pkg.update(blocks) + return pkg + + def write(self, f): + """Write the package to file.""" + for block in self.data.values(): + block.write(f) diff --git a/flopy4/parameter.py b/flopy4/parameter.py index f0cc827..9f48960 100644 --- a/flopy4/parameter.py +++ b/flopy4/parameter.py @@ -1,5 +1,5 @@ from abc import abstractmethod -from collections.abc import MutableMapping +from collections import UserDict from dataclasses import dataclass, fields from enum import Enum from typing import Any, Optional @@ -44,11 +44,25 @@ class MFParamSpec: @classmethod def fields(cls): + """ + Get the MF6 input parameter field specification. + These uniquely describe the MF6 input parameter. + + Notes + ----- + This is equivalent to `dataclasses.fields(MFParamSpec)`. + """ return fields(cls) @classmethod def load(cls, f) -> "MFParamSpec": + """ + Load an MF6 input input parameter specification + from a definition file. + """ spec = dict() + members = cls.fields() + keywords = [f.name for f in members if f.type is bool] while True: line = f.readline() if not line or line == "\n": @@ -56,10 +70,7 @@ def load(cls, f) -> "MFParamSpec": words = line.strip().lower().split() key = words[0] val = " ".join(words[1:]) - # todo dynamically load properties and - # filter by type instead of hardcoding - kw_fields = [f.name for f in cls.fields() if f.type is bool] - if key in kw_fields: + if key in keywords: spec[key] = val == "true" elif key == "reader": spec[key] = MFReader.from_str(val) @@ -67,6 +78,16 @@ def load(cls, f) -> "MFParamSpec": spec[key] = val return cls(**spec) + def with_name(self, name) -> "MFParamSpec": + """Set the parameter name and return the parameter.""" + self.name = name + return self + + def with_block(self, block) -> "MFParamSpec": + """Set the parameter block and return the parameter.""" + self.block = block + return self + class MFParameter(MFParamSpec): """ @@ -77,20 +98,22 @@ class MFParameter(MFParamSpec): as a data access layer by which higher components (blocks, packages, etc) can read/write parameters. The former is a developer task (though it may be automated as classes are - generated from DFNs) while the latter are user-facing APIs. + generated from DFNs) while the latter happens at runtime, + but both APIs are user-facing; the user can first inspect + a package's specification via class attributes, then load + an input file and inspect the package data. Notes ----- Specification attributes are set at import time. A parent - block, when defining parameters as class attributes, will - supply a description, whether the parameter is mandatory, - and other information comprising the input specification. + block or package defines parameters as class attributes, + including a description, whether the parameter is optional, + and other information specifying the parameter. The parameter's value is an instance attribute that is set - at load time. The parameter's parent block will introspect - its constituent parameters, then load each parameter value - from the input file and assign an eponymous attribute with - a value property. This is akin to "hydrating" a definition + at load time. The parameter's parent component introspects + its constituent parameters then loads each parameter value + from the input file. This is like "hydrating" a definition from a data store as in single-page web applications (e.g. React, Vue) or ORM frameworks (Django). """ @@ -130,12 +153,6 @@ def __init__( default_value=default_value, ) - def __get__(self, instance, _): - if instance is None: - return self - else: - return self.value - @property @abstractmethod def value(self) -> Optional[Any]: @@ -143,24 +160,10 @@ def value(self) -> Optional[Any]: pass -class MFParameters(MutableMapping): - def __init__(self, *args, **kwargs): - self.params = dict() - self.update(dict(*args, **kwargs)) +class MFParameters(UserDict): + """Mapping of parameter names to parameters.""" + + def __init__(self, params=None): + super().__init__(params) for key, param in self.items(): setattr(self, key, param) - - def __getitem__(self, key): - return self.params[key] - - def __setitem__(self, key, value): - self.params[key] = value - - def __delitem__(self, key): - del self.params[key] - - def __iter__(self): - return iter(self.params) - - def __len__(self): - return len(self.params) diff --git a/flopy4/scalar.py b/flopy4/scalar.py index 68a5b8e..c511703 100644 --- a/flopy4/scalar.py +++ b/flopy4/scalar.py @@ -1,23 +1,13 @@ from abc import abstractmethod -from dataclasses import asdict from enum import Enum from pathlib import Path -from typing import Optional -from flopy4.parameter import MFParameter, MFParamSpec, MFReader +from flopy4.parameter import MFParameter, MFReader from flopy4.utils import strip PAD = " " -def _as_dict(spec: Optional[MFParamSpec]) -> dict: - return dict() if spec is None else asdict(spec) - - -def _or_empty(spec: Optional[MFParamSpec]) -> MFParamSpec: - return MFParamSpec() if spec is None else spec - - class MFScalar(MFParameter): @abstractmethod def __init__( @@ -99,7 +89,7 @@ def __init__( ) @classmethod - def load(cls, f, spec: Optional[MFParamSpec] = None) -> "MFKeyword": + def load(cls, f, **kwargs) -> "MFKeyword": line = strip(f.readline()).lower() if not any(line): @@ -107,9 +97,8 @@ def load(cls, f, spec: Optional[MFParamSpec] = None) -> "MFKeyword": if " " in line: raise ValueError("Keyword may not contain spaces") - spec = _or_empty(spec) - spec.name = line - return cls(value=True, **_as_dict(spec)) + kwargs["name"] = line + return cls(value=True, **kwargs) def write(self, f): if self.value: @@ -154,16 +143,15 @@ def __init__( ) @classmethod - def load(cls, f, spec: Optional[MFParamSpec] = None) -> "MFInteger": + def load(cls, f, **kwargs) -> "MFInteger": line = strip(f.readline()).lower() words = line.split() if len(words) != 2: raise ValueError("Expected space-separated: 1) keyword, 2) value") - spec = _or_empty(spec) - spec.name = words[0] - return cls(value=int(words[1]), **_as_dict(spec)) + kwargs["name"] = words[0] + return cls(value=int(words[1]), **kwargs) def write(self, f): f.write(f"{PAD}{self.name.upper()} {self.value}\n") @@ -207,16 +195,15 @@ def __init__( ) @classmethod - def load(cls, f, spec: Optional[MFParamSpec] = None) -> "MFDouble": + def load(cls, f, **kwargs) -> "MFDouble": line = strip(f.readline()).lower() words = line.split() if len(words) != 2: raise ValueError("Expected space-separated: 1) keyword, 2) value") - spec = _or_empty(spec) - spec.name = words[0] - return cls(value=float(words[1]), **_as_dict(spec)) + kwargs["name"] = words[0] + return cls(value=float(words[1]), **kwargs) def write(self, f): f.write(f"{PAD}{self.name.upper()} {self.value}\n") @@ -260,16 +247,15 @@ def __init__( ) @classmethod - def load(cls, f, spec: Optional[MFParamSpec] = None) -> "MFString": + def load(cls, f, **kwargs) -> "MFString": line = strip(f.readline()).lower() words = line.split() if len(words) != 2: raise ValueError("Expected space-separated: 1) keyword, 2) value") - spec = _or_empty(spec) - spec.name = words[0] - return cls(value=words[1], **_as_dict(spec)) + kwargs["name"] = words[0] + return cls(value=words[1], **kwargs) def write(self, f): f.write(f"{PAD}{self.name.upper()} {self.value}\n") @@ -326,7 +312,7 @@ def __init__( ) @classmethod - def load(cls, f, spec: Optional[MFParamSpec] = None) -> "MFFilename": + def load(cls, f, **kwargs) -> "MFFilename": line = strip(f.readline()) words = line.split() inout = [io.name for io in MFFileInout] @@ -339,12 +325,11 @@ def load(cls, f, spec: Optional[MFParamSpec] = None) -> "MFFilename": "3) file path" ) - spec = _or_empty(spec) - spec.name = words[0].lower() + kwargs["name"] = words[0].lower() return cls( inout=MFFileInout.from_str(words[1]), value=Path(words[2]), - **_as_dict(spec), + **kwargs, ) def write(self, f): diff --git a/test/test_block.py b/test/test_block.py index 2f66ce5..c7b37c3 100644 --- a/test/test_block.py +++ b/test/test_block.py @@ -1,4 +1,4 @@ -from flopy4.block import MFBlock, get_member_params +from flopy4.block import MFBlock from flopy4.scalar import MFDouble, MFFilename, MFInteger, MFKeyword, MFString @@ -13,8 +13,8 @@ class TestBlock(MFBlock): # a = MFArray(description="array") -def test_params(): - params = get_member_params(TestBlock) +def test_members(): + params = TestBlock.params assert len(params) == 5 k = params["k"] @@ -44,8 +44,7 @@ def test_params(): # a = params["a"] # assert isinstance(f, MFArray) - # assert f.description == "array" - # assert not f.optional + # assert a.description == "array" def test_load_write(tmp_path): diff --git a/test/test_package.py b/test/test_package.py new file mode 100644 index 0000000..529e278 --- /dev/null +++ b/test/test_package.py @@ -0,0 +1,150 @@ +from flopy4.block import MFBlock +from flopy4.package import MFPackage +from flopy4.scalar import MFDouble, MFFilename, MFInteger, MFKeyword, MFString + + +class TestPackage(MFPackage): + __test__ = False # tell pytest not to collect + + k = MFKeyword( + block="options", + description="keyword", + ) + i = MFInteger( + block="options", + description="int", + ) + d = MFDouble( + block="options", + description="double", + ) + s = MFString(block="options", description="string", optional=False) + f = MFFilename(block="options", description="filename", optional=False) + # a = MFArray(block="packagedata", description="array") + + +def test_member_params(): + params = TestPackage.params + assert len(params) == 5 + + k = params["k"] + assert isinstance(k, MFKeyword) + assert k.block == "options" + assert k.description == "keyword" + assert k.optional + + i = params["i"] + assert isinstance(i, MFInteger) + assert i.block == "options" + assert i.description == "int" + assert i.optional + + d = params["d"] + assert isinstance(d, MFDouble) + assert d.block == "options" + assert d.description == "double" + assert d.optional + + s = params["s"] + assert isinstance(s, MFString) + assert s.block == "options" + assert s.description == "string" + assert not s.optional + + f = params["f"] + assert isinstance(f, MFFilename) + assert f.block == "options" + assert f.description == "filename" + assert not f.optional + + # a = params["a"] + # assert isinstance(f, MFArray) + # assert a.block == "packagedata" + # assert a.description == "array" + + +def test_member_blocks(): + blocks = TestPackage.blocks + assert len(blocks) == 1 + + block = blocks["options"] + assert isinstance(block, MFBlock) + assert len(block.params) == 5 + + k = block["k"] + assert isinstance(k, MFKeyword) + assert k.description == "keyword" + assert k.optional + + i = block["i"] + assert isinstance(i, MFInteger) + assert i.description == "int" + assert i.optional + + d = block["d"] + assert isinstance(d, MFDouble) + assert d.description == "double" + assert d.optional + + s = block["s"] + assert isinstance(s, MFString) + assert s.description == "string" + assert not s.optional + + f = block["f"] + assert isinstance(f, MFFilename) + assert f.description == "filename" + assert not f.optional + + # a = block["a"] + # assert isinstance(f, MFArray) + # assert a.description == "array" + + +def test_load_write(tmp_path): + name = "test" + fpth = tmp_path / f"{name}.txt" + with open(fpth, "w") as f: + f.write("BEGIN OPTIONS\n") + f.write(" K\n") + f.write(" I 1\n") + f.write(" D 1.0\n") + f.write(" S value\n") + f.write(f" F FILEIN {fpth}\n") + f.write("END OPTIONS\n") + # todo another block + + # test package load + with open(fpth, "r") as f: + package = TestPackage.load(f) + + assert len(package.blocks) == 1 + assert len(package.params) == 5 + + # class attributes: param specifications + assert isinstance(TestPackage.k, MFKeyword) + assert TestPackage.k.name == "k" + assert TestPackage.k.block == "options" + assert TestPackage.k.description == "keyword" + + # instance attributes: shortcut access to param values + assert isinstance(package.k, bool) + assert package.k + assert package.i == 1 + assert package.d == 1.0 + assert package.s == "value" + assert package.f == fpth + + # test package write + fpth2 = tmp_path / f"{name}2.txt" + with open(fpth2, "w") as f: + package.write(f) + with open(fpth2, "r") as f: + lines = f.readlines() + assert "BEGIN OPTIONS \n" in lines + assert " K\n" in lines + assert " I 1\n" in lines + assert " D 1.0\n" in lines + assert " S value\n" in lines + assert f" F FILEIN {fpth}\n" in lines + assert "END OPTIONS\n" in lines diff --git a/test/test_scalar.py b/test/test_scalar.py index 3bfd76b..ee58955 100644 --- a/test/test_scalar.py +++ b/test/test_scalar.py @@ -1,6 +1,5 @@ import pytest -from flopy4.parameter import MFParamSpec from flopy4.scalar import MFDouble, MFFilename, MFInteger, MFKeyword @@ -13,11 +12,9 @@ def test_keyword_load(tmp_path): with open(fpth, "r") as f: scalar = MFKeyword.load( f, - MFParamSpec( - block="options", - description="test keyword parameter", - optional=False, - ), + block="options", + description="test keyword parameter", + optional=False, ) assert scalar.name == name assert scalar.value