ignore some ruff errors, fix validation logic
jcschaff committed Aug 15, 2024
1 parent ac5cf8a commit 26a0c47
Showing 11 changed files with 146 additions and 101 deletions.
8 changes: 8 additions & 0 deletions pyproject.toml
@@ -95,6 +95,14 @@ ignore = [
"E501",
# DoNotAssignLambda
"E731",
+ # avoid specifying long messages outside the exception class
+ "TRY003",
+ # avoid functions which are too complex
+ "C901",
+ # avoid uses of `tarfile.extractall()`
+ "S202",
+ # B008 Do not perform function call `typer.Argument` in argument defaults
+ "B008",
]

[tool.ruff.format]
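For context on the B008 exemption: that rule flags function calls in argument defaults, and typer's CLI API relies on exactly that pattern. A minimal sketch of the code shape that would otherwise trip the rule (hypothetical command, not from this repo):

import typer

app = typer.Typer()

@app.command()
def run(
    # B008 would flag this call in a default, but it is idiomatic typer usage
    sim_file: str = typer.Argument(..., help="path to a simulation data file"),
) -> None:
    typer.echo(f"processing {sim_file}")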
43 changes: 26 additions & 17 deletions pyvcell/simdata/mesh.py
@@ -108,9 +108,13 @@ def read(self) -> None:
# get line enumerator from f

iter_lines = iter(f.readlines())
- assert next(iter_lines) == "Version 1.2\n"
- assert next(iter_lines) == "CartesianMesh {\n"
- assert next(iter_lines) == "\t// X Y Z\n"
+
+ if next(iter_lines) != "Version 1.2\n":
+     raise RuntimeError("Expected 'Version 1.2' at the beginning of the file")
+ if next(iter_lines) != "CartesianMesh {\n":
+     raise RuntimeError("Expected 'CartesianMesh {' after version")
+ if next(iter_lines) != "\t// X Y Z\n":
+     raise RuntimeError("Expected coordinate comment line")

size_line = next(iter_lines).split()
if size_line[0] == "Size":
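The asserts above were replaced because assert statements are stripped when Python runs with -O (and are flagged by linters such as ruff's S101), so they are unreliable for validating input files. A minimal sketch of the resulting pattern, using a hypothetical helper that is not part of this commit:

from collections.abc import Iterator

def expect_line(lines: Iterator[str], expected: str, message: str) -> None:
    # unlike an assert, this check survives python -O
    if next(lines) != expected:
        raise RuntimeError(message)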
@@ -129,24 +133,25 @@ def read(self) -> None:
num_volume_regions = int(next(iter_lines))
_header_line = next(iter_lines)
self.volume_regions = []
- for i in range(num_volume_regions):
+ for _i in range(num_volume_regions):
parts = next(iter_lines).split()
- self.volume_regions.append((int(parts[0]), int(parts[1]), float(parts[2]), parts[3].strip("//")))
+ self.volume_regions.append((int(parts[0]), int(parts[1]), float(parts[2]), parts[3].strip("/")))

while next(iter_lines) != "\tMembraneRegionsMapVolumeRegion {\n":
pass
num_membrane_regions = int(next(iter_lines))
_header_line = next(iter_lines)
self.membrane_regions = []
- for i in range(num_membrane_regions):
+ for _i in range(num_membrane_regions):
parts = next(iter_lines).split()
self.membrane_regions.append((int(parts[0]), int(parts[1]), int(parts[2]), float(parts[3])))

while next(iter_lines) != "\tVolumeElementsMapVolumeRegion {\n":
pass
compressed_line = next(iter_lines).split()
num_volume_elements = int(compressed_line[0])
- assert compressed_line[1] == "Compressed"
+ if compressed_line[1] != "Compressed":
+     raise ValueError("Expected 'Compressed' in VolumeElementsMapVolumeRegion")
# read HEX lines until "}" line, and concatenate into one string, then convert to bytes and decompress
hex_lines = []
while True:
@@ -156,12 +161,14 @@ def read(self) -> None:
hex_lines.append(line.strip())
hex_string: str = "".join(hex_lines).strip()
compressed_bytes = bytes.fromhex(hex_string)
- # assert len(compressed_bytes) == num_compressed_bytes
uncompressed_bytes: bytes = zlib.decompress(compressed_bytes)
self.volume_region_map = np.frombuffer(uncompressed_bytes, dtype="<u2") # unsigned 2-byte integers
- assert self.volume_region_map.shape[0] == self.size[0] * self.size[1] * self.size[2]
- assert num_volume_elements == self.volume_region_map.shape[0]
- assert set(np.unique(self.volume_region_map)) == set([v[0] for v in self.volume_regions])
+ if self.volume_region_map.shape[0] != self.size[0] * self.size[1] * self.size[2]:
+     raise ValueError("Expected number of volume elements to match the size of volume region map")
+ if num_volume_elements != self.volume_region_map.shape[0]:
+     raise ValueError("Expected number of volume elements to match the size of volume region map")
+ if set(np.unique(self.volume_region_map)) != {v[0] for v in self.volume_regions}:
+     raise ValueError("Expected volume region map to have the same unique values as volume regions")

while next(iter_lines).strip() != "MembraneElements {":
pass
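The read path above assumes region ids are little-endian unsigned 16-bit integers, zlib-compressed and serialized as hex text. A minimal round-trip sketch of that encoding with made-up values:

import zlib
import numpy as np

region_ids = np.array([0, 0, 1, 1, 2], dtype="<u2")  # unsigned 2-byte ints
hex_string = zlib.compress(region_ids.tobytes()).hex()

# inverse of the read path: hex text -> bytes -> decompress -> array
decoded = np.frombuffer(zlib.decompress(bytes.fromhex(hex_string)), dtype="<u2")
assert (decoded == region_ids).all()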
@@ -184,8 +191,10 @@ def read(self) -> None:
mem_reg_id = int(parts[7])
self.membrane_elements[mem_index, :] = [idx, vol1, vol2, conn0, conn1, conn2, conn3, mem_reg_id]
mem_index += 1
- assert self.membrane_elements.shape == (num_membrane_elements, 8)
- assert set(np.unique(self.membrane_elements[:, 7])) == set([v[0] for v in self.membrane_regions])
+ if self.membrane_elements.shape != (num_membrane_elements, 8):
+     raise RuntimeError("Expected membrane elements to have the correct shape")
+ if set(np.unique(self.membrane_elements[:, 7])) != {v[0] for v in self.membrane_regions}:
+     raise RuntimeError("Expected membrane region ids in membrane elements to match membrane regions")

def get_volume_element_box(self, i: int, j: int, k: int) -> Box3D:
x_lo = self.origin[0] + i * self.extent[0] / self.size[0]
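The membrane checks mirror the volume ones: the region ids actually referenced by elements must exactly match the declared regions. An illustrative sketch with made-up values:

import numpy as np

membrane_elements = np.zeros((4, 8), dtype=int)
membrane_elements[:, 7] = [0, 1, 1, 0]  # column 7 holds the membrane region id
declared_ids = {0, 1}
if set(np.unique(membrane_elements[:, 7])) != declared_ids:
    raise RuntimeError("membrane element region ids do not match declared regions")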
@@ -200,16 +209,16 @@ def get_membrane_region_index(self, mem_element_index: int) -> int:
return int(self.membrane_elements[mem_element_index, 7])

def get_membrane_region_ids(self, volume_domain_name: str) -> set[int]:
- return set([
+ return {
mem_reg_id
for mem_reg_id, vol_reg1, vol_reg2, surface in self.membrane_regions
if self.volume_regions[vol_reg1][3] == volume_domain_name
or self.volume_regions[vol_reg2][3] == volume_domain_name
- ])
+ }

def get_volume_region_ids(self, volume_domain_name: str) -> set[int]:
- return set([
+ return {
vol_reg_id
for vol_reg_id, subvol_id, volume, domain_name in self.volume_regions
if domain_name == volume_domain_name
- ])
+ }
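The set([...]) rewrites in these two getters follow ruff's flake8-comprehensions rules (likely C403, an unnecessary list comprehension passed to set()); the comprehension form produces the same set without building an intermediate list. A before/after sketch with made-up data:

regions = [(0, "ec"), (1, "cyt"), (1, "cyt")]

ids_old = set([region_id for region_id, name in regions])  # throwaway list first
ids_new = {region_id for region_id, name in regions}  # direct set comprehension
assert ids_old == ids_new == {0, 1}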
24 changes: 12 additions & 12 deletions pyvcell/simdata/postprocessing.py
@@ -27,11 +27,11 @@ def __init__(self, name: str, group_path: str):
def get_dataset(self, hdf5_file: H5File, time_index: int) -> Dataset:
group_path_object = hdf5_file[self.group_path]
if not isinstance(group_path_object, Group):
raise ValueError(f"Expected a group at {self.group_path} but found {type(group_path_object)}")
raise TypeError(f"Expected a group at {self.group_path} but found {type(group_path_object)}")
image_group: Group = group_path_object
dataset_path_object = image_group[f"time{time_index:06d}"]
if not isinstance(dataset_path_object, Dataset):
- raise ValueError(
+ raise TypeError(
f"Expected a dataset at {self.group_path}/time{time_index:06d} but found {type(dataset_path_object)}"
)
image_ds: Dataset = dataset_path_object
Expand All @@ -40,7 +40,7 @@ def get_dataset(self, hdf5_file: H5File, time_index: int) -> Dataset:
def read(self, f: H5File) -> None:
group_path_object = f[self.group_path]
if not isinstance(group_path_object, Group):
raise ValueError(f"Expected a group at {self.group_path} but found {type(group_path_object)}")
raise TypeError(f"Expected a group at {self.group_path} but found {type(group_path_object)}")
image_group: Group = group_path_object

# get attributes from the group
@@ -96,11 +96,11 @@ def __init__(self, postprocessing_hdf5_path: Path):

def read(self) -> None:
# read the file as hdf5
- with H5File(name=self.postprocessing_hdf5_path, mode="r") as file: # type: ignore
+ with H5File(name=self.postprocessing_hdf5_path, mode="r") as file: # type: ignore[call-arg]
# read dataset with path /PostProcessing/Times
postprocessing_times_object = file["/PostProcessing/Times"]
if not isinstance(postprocessing_times_object, Dataset):
- raise ValueError(
+ raise TypeError(
f"Expected a dataset at /PostProcessing/Times but found {type(postprocessing_times_object)}"
)
times_ds: Dataset = postprocessing_times_object
@@ -121,7 +121,7 @@ def read(self) -> None:
#
var_stats_grp_object = file["/PostProcessing/VariableStatistics"]
if not isinstance(var_stats_grp_object, Group):
- raise ValueError(
+ raise TypeError(
f"Expected a group at /PostProcessing/VariableStatistics but found {type(var_stats_grp_object)}"
)
var_stats_grp: Group = var_stats_grp_object
Expand All @@ -132,7 +132,7 @@ def read(self) -> None:
parts = k.split("_")
channel = int(parts[1])
if not isinstance(v, bytes):
raise ValueError(f"Expected a bytes object for attribute {k} but found {type(v)}")
raise TypeError(f"Expected a bytes object for attribute {k} but found {type(v)}")
value = v.decode("utf-8")
if parts[2] == "name":
var_name_by_channel[channel] = value
@@ -162,7 +162,7 @@ def read(self) -> None:
for time_index in range(len(self.times)):
time_ds_object = var_stats_grp[f"time{time_index:06d}"]
if not isinstance(time_ds_object, Dataset):
- raise ValueError(
+ raise TypeError(
f"Expected a dataset at /PostProcessing/VariableStatistics/time{time_index:06d} "
f"but found {type(time_ds_object)}"
)
@@ -176,8 +176,8 @@ def read(self) -> None:
# e.g. /PostProcessing/fluor
postprocessing_dataset = file["/PostProcessing"]
if not isinstance(postprocessing_dataset, Group):
raise ValueError(f"Expected a group at /PostProcessing but found {type(postprocessing_dataset)}")
image_groups = [k for k in postprocessing_dataset.keys() if k not in ["Times", "VariableStatistics"]]
raise TypeError(f"Expected a group at /PostProcessing but found {type(postprocessing_dataset)}")
image_groups = [k for k in postprocessing_dataset if k not in ["Times", "VariableStatistics"]]

# for each image group, read the metadata to allow reading later
for image_group in image_groups:
@@ -186,6 +186,6 @@ def read(self) -> None:
self.image_metadata.append(metadata)

def read_image_data(self, image_metadata: ImageMetadata, time_index: int) -> np.ndarray:
- with H5File(self.postprocessing_hdf5_path, "r") as file: # type: ignore
+ with H5File(name=self.postprocessing_hdf5_path, mode="r") as file: # type: ignore[call-arg]
image_ds = image_metadata.get_dataset(hdf5_file=file, time_index=time_index)
- return image_ds[()] # type: ignore
+ return np.array(image_ds[()])
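The ValueError to TypeError switch throughout this file reflects that indexing an HDF5 file can yield a Group, a Dataset, or a Datatype, so getting the wrong kind of node is a type problem rather than a bad value. A sketch of the narrowing pattern as a standalone helper (hypothetical name, standard h5py API assumed):

import h5py

def require_dataset(file: h5py.File, path: str) -> h5py.Dataset:
    node = file[path]  # may be a Group, Dataset, or Datatype
    if not isinstance(node, h5py.Dataset):
        raise TypeError(f"Expected a dataset at {path} but found {type(node)}")
    return node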
51 changes: 24 additions & 27 deletions pyvcell/simdata/simdata_models.py
@@ -5,7 +5,7 @@
from typing import IO, Literal, Optional
from zipfile import ZipFile

- import numexpr as ne # type: ignore
+ import numexpr as ne # type: ignore[import-untyped]
import numpy
import numpy as np

@@ -137,16 +137,15 @@ def __init__(self, zip_file: Path, zip_entry: str) -> None:
self.zip_entry = zip_entry

def read(self) -> None:
- with ZipFile(self.zip_file, "r") as zip_file:
-     with zip_file.open(self.zip_entry) as f:
-         self.file_header = DataFileHeader()
-         self.file_header.read(f)
-         blocks = []
-         for _ in range(self.file_header.num_blocks):
-             data_block = DataBlockHeader()
-             data_block.read(f)
-             blocks.append(data_block)
-         self.data_blocks = blocks
+ with ZipFile(self.zip_file, "r") as zip_file, zip_file.open(self.zip_entry) as f:
+     self.file_header = DataFileHeader()
+     self.file_header.read(f)
+     blocks = []
+     for _ in range(self.file_header.num_blocks):
+         data_block = DataBlockHeader()
+         data_block.read(f)
+         blocks.append(data_block)
+     self.data_blocks = blocks

def get_data_block_header(self, variable: VariableInfo | str) -> DataBlockHeader:
for db in self.data_blocks:
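Collapsing the nested with statements matches ruff's SIM117 (use a single with statement with multiple contexts). The two forms behave identically, including close order, but the combined form drops an indentation level. A minimal sketch with a hypothetical archive:

from zipfile import ZipFile

with ZipFile("data.zip", "r") as zf, zf.open("entry.dat") as f:
    header = f.read(16)  # zf and f are closed in reverse order on exit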
@@ -206,7 +205,7 @@ def variables_block_headers(self) -> list[DataBlockHeader]:
first_zip_entry = self.first_data_zip_file_metadata()
if first_zip_entry is None:
return []
- return [db for db in first_zip_entry.data_blocks]
+ return first_zip_entry.data_blocks

def _get_data_zip_file_metadata(self, time: float) -> DataZipFileMetadata:
zip_entry = self.data_zip_file_metadata.get(time)
@@ -222,17 +221,16 @@ def get_data(self, variable: VariableInfo | str, time: float) -> numpy.ndarray:
zip_file_entry: DataZipFileMetadata = self._get_data_zip_file_metadata(time)
data_block_header: DataBlockHeader = zip_file_entry.get_data_block_header(variable)

- with ZipFile(zip_file_entry.zip_file, "r") as zip_file:
-     with zip_file.open(zip_file_entry.zip_entry, mode="r") as f:
-         f.seek(data_block_header.data_offset)
-         buffer = bytearray(0)
-         bytes_left_to_read = data_block_header.size * 8
-         while bytes_left_to_read > 0:
-             bytes_read = f.read(bytes_left_to_read)
-             buffer.extend(bytes_read)
-             bytes_left_to_read -= len(bytes_read)
-         array = np.frombuffer(buffer, dtype=NUMPY_FLOAT_DTYPE)
-         return array
+ with ZipFile(zip_file_entry.zip_file, "r") as zip_file, zip_file.open(zip_file_entry.zip_entry, mode="r") as f:
+     f.seek(data_block_header.data_offset)
+     buffer = bytearray(0)
+     bytes_left_to_read = data_block_header.size * 8
+     while bytes_left_to_read > 0:
+         bytes_read = f.read(bytes_left_to_read)
+         buffer.extend(bytes_read)
+         bytes_left_to_read -= len(bytes_read)
+     array = np.frombuffer(buffer, dtype=NUMPY_FLOAT_DTYPE)
+     return array


class NamedFunction:
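The while loop in get_data is there because read(n) on a stream may return fewer than n bytes, so the code accumulates until the block is complete. A generic sketch of that idiom as a hypothetical helper, with EOF handling added for illustration:

from typing import IO

def read_exact(f: IO[bytes], num_bytes: int) -> bytes:
    buffer = bytearray()
    while len(buffer) < num_bytes:
        chunk = f.read(num_bytes - len(buffer))
        if not chunk:  # stream ended before the block was complete
            raise EOFError("unexpected end of stream")
        buffer.extend(chunk)
    return bytes(buffer)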
@@ -256,10 +254,9 @@ def evaluate(self, variable_bindings: dict[str, np.ndarray]) -> np.ndarray:
ne.set_num_threads(1)
expression = self.python_expression
result = ne.evaluate(expression, local_dict=variable_bindings)
- if isinstance(result, np.ndarray):
-     return result
- else:
-     raise ValueError(f"Expression {expression} did not evaluate to a numpy array")
+ if not isinstance(result, np.ndarray):
+     raise TypeError(f"Expression {expression} did not evaluate to a numpy array")
+ return result

def __str__(self) -> str:
return (
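For reference, a minimal use of the evaluate path above, assuming numexpr is installed; the isinstance guard matters because ne.evaluate is untyped and is not guaranteed to return a plain ndarray for every input:

import numexpr as ne
import numpy as np

bindings = {"x": np.arange(4.0), "y": np.full(4, 2.0)}
result = ne.evaluate("x * y + 1", local_dict=bindings)
assert isinstance(result, np.ndarray)  # mirrors the TypeError guard above
print(result)  # [1. 3. 5. 7.]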
4 changes: 2 additions & 2 deletions pyvcell/simdata/vtk/fv_mesh_mapping.py
@@ -14,7 +14,7 @@ def from_mesh_data(cartesian_mesh: CartesianMesh, domain_name: str, b_volume: bo
elif dimension == 2 and not b_volume:
return from_mesh2d_membrane(cartesian_mesh, domain_name)
else:
raise Exception(f"unsupported: mesh dimension = {dimension}, volumeDomain = {b_volume}")
raise ValueError(f"unsupported: mesh dimension = {dimension}, volumeDomain = {b_volume}")


def to_string_key(vis_point: VisPoint, precision: int = 8) -> str:
@@ -112,7 +112,7 @@ def from_mesh3d_membrane(cartesian_mesh: CartesianMesh, membrane_region_ids: set
VisPoint(inside_box.x_lo, inside_box.y_hi, z),
]
else:
raise Exception("inside/outside volume indices not reconciled in membraneElement")
raise ValueError("inside/outside volume indices not reconciled in membraneElement")

indices = []
for vis_point in vis_points:
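Swapping bare Exception for ValueError (likely prompted by ruff's TRY002, which flags raising the vanilla Exception class) lets callers catch only the expected failure mode. A small sketch of the difference from the caller's side, with a hypothetical function:

def check_dimension(dimension: int) -> None:
    if dimension not in (2, 3):
        raise ValueError(f"unsupported: mesh dimension = {dimension}")

try:
    check_dimension(4)
except ValueError as err:  # narrower than catching Exception
    print(err)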
(remaining 7 changed files not loaded)
