diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index ec6c17d7c..c7b414d33 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -43,7 +43,7 @@ jobs:
         run: |
             MINIFORGE_INSTALL_DIR=.miniforge3
             . "$MINIFORGE_INSTALL_DIR/bin/activate" testing
-            python -m pip install types-psutil
+            python -m pip install types-psutil types-PyYAML
             ./run-mypy.sh
 
     pylint:
diff --git a/mirgecom/io.py b/mirgecom/io.py
index dfbf9d42b..628605361 100644
--- a/mirgecom/io.py
+++ b/mirgecom/io.py
@@ -3,6 +3,7 @@
 .. autofunction:: make_status_message
 .. autofunction:: make_rank_fname
 .. autofunction:: make_par_fname
+.. autofunction:: read_and_distribute_yaml_data
 """
 
 __copyright__ = """
@@ -38,7 +39,8 @@
 def make_init_message(*, dim, order, dt, t_final,
                       nstatus, nviz, cfl, constant_cfl,
                       initname, eosname, casename,
-                      nelements=0, global_nelements=0):
+                      nelements=0, global_nelements=0,
+                      t_initial=0):
     """Create a summary of some general simulation parameters and inputs."""
     return (
         f"Initialization for Case({casename})\n"
@@ -46,6 +48,7 @@ def make_init_message(*, dim, order, dt, t_final,
         f"Num {dim}d order-{order} elements: {nelements}\n"
         f"Num global elements: {global_nelements}\n"
         f"Timestep: {dt}\n"
+        f"Initial time: {t_initial}\n"
         f"Final time: {t_final}\n"
         f"CFL: {cfl}\n"
         f"Constant CFL: {constant_cfl}\n"
@@ -77,3 +80,26 @@ def make_rank_fname(basename, rank=0, step=0, t=0):
 def make_par_fname(basename, step=0, t=0):
     r"""Make parallel visualization filename."""
     return f"{basename}-{step:09d}.pvtu"
+
+
+def read_and_distribute_yaml_data(mpi_comm=None, file_path=None):
+    """Read a YAML file on one rank, broadcast result to world."""
+    import yaml
+
+    input_data = None
+    if file_path is None:
+        return input_data
+
+    rank = 0
+
+    if mpi_comm is not None:
+        rank = mpi_comm.Get_rank()
+
+    if rank == 0:
+        with open(file_path) as f:
+            input_data = yaml.load(f, Loader=yaml.FullLoader)
+
+    if mpi_comm is not None:
+        input_data = mpi_comm.bcast(input_data, root=0)
+
+    return input_data
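The new io.py helper is easiest to judge with a usage sketch. A minimal example, assuming the YAML file holds a mapping; the communicator wiring, the file name `run_params.yaml`, and the `nviz` key are hypothetical and not part of this patch:

```python
# Every rank receives the same parsed dict; only rank 0 touches the filesystem.
from mpi4py import MPI
from mirgecom.io import read_and_distribute_yaml_data

comm = MPI.COMM_WORLD
config = read_and_distribute_yaml_data(mpi_comm=comm, file_path="run_params.yaml")
if config is not None:
    nviz = config.get("nviz", 100)  # hypothetical key with a fallback
```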
diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py
index 2a08ea8bb..dc878cde7 100644
--- a/mirgecom/simutil.py
+++ b/mirgecom/simutil.py
@@ -17,13 +17,25 @@
 .. autofunction:: max_component_norm
 .. autofunction:: check_naninf_local
 .. autofunction:: check_range_local
+.. autofunction:: boundary_report
 
 Mesh and element utilities
 --------------------------
 
+.. autofunction:: geometric_mesh_partitioner
 .. autofunction:: generate_and_distribute_mesh
 .. autofunction:: get_number_of_tetrahedron_nodes
 
+Simulation support utilities
+----------------------------
+
+.. autofunction:: configurate
+
+Lazy eval utilities
+-------------------
+
+.. autofunction:: force_evaluation
+
 File comparison utilities
 -------------------------
 
@@ -57,31 +69,33 @@
 """
 import logging
 import numpy as np
-import grudge.op as op
-
-from arraycontext import map_array_container, flatten
-
 from functools import partial
 
+import grudge.op as op
+# from grudge.op import nodal_min, elementwise_min
+from arraycontext import map_array_container, flatten
 from meshmode.dof_array import DOFArray
+from mirgecom.viscous import get_viscous_timestep
 
 from typing import List, Dict, Optional
 
 from grudge.discretization import DiscretizationCollection
 from grudge.dof_desc import DD_VOLUME_ALL
 
-from mirgecom.viscous import get_viscous_timestep
-
+from mirgecom.utils import normalize_boundaries
 import pyopencl as cl
 
 logger = logging.getLogger(__name__)
 
 
-def get_number_of_tetrahedron_nodes(dim, order):
+def get_number_of_tetrahedron_nodes(dim, order, include_faces=False):
     """Get number of nodes (modes) in *dim* Tetrahedron of *order*."""
     # number of {nodes, modes} see e.g.:
     # JSH/TW Nodal DG Methods, Section 10.1
     # DOI: 10.1007/978-0-387-72067-8
-    return int(np.math.factorial(dim+order)
-               / (np.math.factorial(dim) * np.math.factorial(order)))
+    nnodes = int(np.math.factorial(dim+order)
+                 / (np.math.factorial(dim) * np.math.factorial(order)))
+    if include_faces:
+        nnodes = nnodes + (dim+1)*get_number_of_tetrahedron_nodes(dim-1, order)
+    return nnodes
 
 
 def check_step(step, interval):
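The *include_faces* extension follows the simplex node-count formula, C(dim+order, dim), plus (dim+1) faces of one lower dimension. A quick sanity check against that formula; the values below are computed by hand, not taken from the patch:

```python
from mirgecom.simutil import get_number_of_tetrahedron_nodes

# Volume nodes of a P2 tetrahedron: C(3+2, 3) = 10
assert get_number_of_tetrahedron_nodes(3, 2) == 10
# Each of the 4 triangular faces contributes C(2+2, 2) = 6 additional nodes
assert get_number_of_tetrahedron_nodes(3, 2, include_faces=True) == 10 + 4*6
```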
@@ -431,7 +445,336 @@ def max_component_norm(dcoll, fields, order=np.inf, *, dd=DD_VOLUME_ALL):
         componentwise_norms(dcoll, fields, order, dd=dd), actx)))
 
 
-def generate_and_distribute_mesh(comm, generate_mesh):
+class PartitioningError(Exception):
+    """Error raised to indicate a problem with domain decomposition."""
+
+    pass
+
+
+def geometric_mesh_partitioner(mesh, num_ranks=None, *, nranks_per_axis=None,
+                               auto_balance=False, imbalance_tolerance=.01,
+                               debug=False):
+    """Partition a mesh uniformly along the X coordinate axis.
+
+    The intent is to partition the mesh uniformly along user-specified
+    directions. In this current iteration, the capability is demonstrated
+    by splitting along the X axis.
+
+    Parameters
+    ----------
+    mesh: :class:`meshmode.mesh.Mesh`
+        The serial mesh to partition
+    num_ranks: int
+        The number of partitions to make (deprecated)
+    nranks_per_axis: numpy.ndarray
+        How many partitions per specified axis.
+    auto_balance: bool
+        Indicates whether to perform automatic balancing. If true, the
+        partitioner will try to balance the number of elements over
+        the partitions.
+    imbalance_tolerance: float
+        If *auto_balance* is True, this parameter indicates the acceptable
+        relative difference to the average number of elements per partition.
+        It defaults to balance within 1%.
+    debug: bool
+        En/disable debugging/diagnostic print reporting.
+
+    Returns
+    -------
+    elem_to_rank: numpy.ndarray
+        Array indicating the MPI rank for each element
+    """
+    mesh_dimension = mesh.dim
+    if nranks_per_axis is None or num_ranks is not None:
+        from warnings import warn
+        warn("num_ranks is deprecated, use nranks_per_axis instead.")
+        num_ranks = num_ranks or 1
+        nranks_per_axis = np.ones(mesh_dimension, dtype=np.int32)
+        nranks_per_axis[0] = num_ranks
+    if len(nranks_per_axis) != mesh_dimension:
+        raise ValueError("nranks_per_axis must match mesh dimension.")
+    num_ranks = np.prod(nranks_per_axis)
+    if np.prod(nranks_per_axis[1:]) != 1:
+        raise NotImplementedError("geometric_mesh_partitioner currently only "
+                                  "supports partitioning in the X-dimension. "
+                                  "(only nranks_per_axis[0] should be > 1).")
+    mesh_verts = mesh.vertices
+    mesh_x = mesh_verts[0]
+
+    x_min = np.min(mesh_x)
+    x_max = np.max(mesh_x)
+    x_interval = x_max - x_min
+    part_loc = np.linspace(x_min, x_max, num_ranks+1)
+
+    part_interval = x_interval / nranks_per_axis[0]
+
+    all_elem_group_centroids = []
+    for group in mesh.groups:
+        elem_group_x = mesh_verts[0, group.vertex_indices]
+        elem_group_centroids = np.sum(elem_group_x, axis=1)/elem_group_x.shape[1]
+        all_elem_group_centroids.append(elem_group_centroids)
+    elem_centroids = np.concatenate(all_elem_group_centroids)
+    global_nelements = len(elem_centroids)
+
+    aver_part_nelem = global_nelements / num_ranks
+
+    if debug:
+        print(f"Partitioning {global_nelements} elements in"
+              f" [{x_min},{x_max}]/{num_ranks}")
+        print(f"Average nelem/part: {aver_part_nelem}")
+        print(f"Initial part locs: {part_loc=}")
+
+    # Create geometrically even partitions
+    elem_to_rank = ((elem_centroids-x_min) / part_interval).astype(int)
+
+    if debug:
+        print(f"{elem_to_rank=}")
+
+    # map partition id to list of elements in that partition
+    part_to_elements = {r: set(np.where(elem_to_rank == r)[0])
+                        for r in range(num_ranks)}
+    # make an array of the geometrically even partition sizes
+    # avoids calling "len" over and over on the element sets
+    nelem_part = [len(part_to_elements[r]) for r in range(num_ranks)]
+
+    if debug:
+        print(f"Initial: {nelem_part=}")
+
+    # Automatic load-balancing
+    if auto_balance:
+
+        for r in range(num_ranks-1):
+
+            # find the element reservoir (next part with elements in it)
+            # adv_part = r + 1
+            # while nelem_part[adv_part] == 0:
+            #     adv_part = adv_part + 1
+
+            num_elem_needed = aver_part_nelem - nelem_part[r]
+            part_imbalance = np.abs(num_elem_needed) / float(aver_part_nelem)
+
+            if debug:
+                print(f"Processing part({r=})")
+                print(f"{part_loc[r]=}")
+                print(f"{num_elem_needed=}, {part_imbalance=}")
+                print(f"{nelem_part=}")
+
+            niter = 0
+            total_change = 0
+            moved_elements = set()
+
+            adv_part = r + 1
+            # while ((part_imbalance > imbalance_tolerance)
+            #        and (adv_part < num_ranks)):
+            while part_imbalance > imbalance_tolerance:
+                # This partition needs to keep changing in size until it meets the
+                # specified imbalance tolerance, or gives up trying
+
+                # seek out the element reservoir
+                while nelem_part[adv_part] == 0:
+                    adv_part = adv_part + 1
+                    if adv_part >= num_ranks:
+                        raise PartitioningError("Ran out of elements to partition.")
+
+                if debug:
+                    print(f"-{nelem_part[r]=}, adv_part({adv_part}),"
+                          f" {nelem_part[adv_part]=}")
+                    print(f"-{part_loc[r+1]=},{part_loc[adv_part+1]=}")
+                    print(f"-{num_elem_needed=},{part_imbalance=}")
+
+                if niter > 100:
+                    raise PartitioningError("Detected too many iterations in"
+                                            " partitioning.")
+
+                # The purpose of the next block is to populate the "moved_elements"
+                # data structure. Then those elements will be moved between the
+                # current partition being processed and the "reservoir,"
+                # *and* to adjust the position of the "right" side of the current
+                # partition boundary.
+                moved_elements = set()
+                num_elements_added = 0
+
+                if num_elem_needed > 0:
+
+                    # Partition is SMALLER than it should be, grab elements from
+                    # the reservoir
+                    if debug:
+                        print(f"-Grabbing elements from reservoir({adv_part})"
+                              f", {nelem_part[adv_part]=}")
+
+                    portion_needed = (float(abs(num_elem_needed))
+                                      / float(nelem_part[adv_part]))
+                    portion_needed = min(portion_needed, 1.0)
+
+                    if debug:
+                        print(f"--Chomping {portion_needed*100}% of"
+                              f" reservoir({adv_part}) [by nelem].")
+
+                    if portion_needed == 1.0:  # Chomp
+                        new_loc = part_loc[adv_part+1]
+                        moved_elements.update(part_to_elements[adv_part])
+
+                    else:  # Bite
+                        # This is the spatial size of the reservoir
+                        reserv_interval = part_loc[adv_part+1] - part_loc[r+1]
+
+                        # Find what portion of the reservoir to grab spatially
+                        # This part is needed because the elements are not
+                        # distributed uniformly in space.
+                        fine_tuned = False
+                        trial_portion_needed = portion_needed
+                        while not fine_tuned:
+                            pos_update = trial_portion_needed*reserv_interval
+                            new_loc = part_loc[r+1] + pos_update
+
+                            moved_elements = set()
+                            num_elem_mv = 0
+                            for e in part_to_elements[adv_part]:
+                                if elem_centroids[e] <= new_loc:
+                                    moved_elements.add(e)
+                                    num_elem_mv = num_elem_mv + 1
+                            if num_elem_mv < num_elem_needed:
+                                fine_tuned = True
+                            else:
+                                ovrsht = (num_elem_mv - num_elem_needed)
+                                rel_ovrsht = ovrsht/float(num_elem_needed)
+                                if rel_ovrsht > 0.8:
+                                    # bisect the space grabbed and try again
+                                    trial_portion_needed = trial_portion_needed/2.0
+                                else:
+                                    fine_tuned = True
+
+                        portion_needed = trial_portion_needed
+                        new_loc = part_loc[r+1] + pos_update
+                        if debug:
+                            print(f"--Tuned: {portion_needed=} [by spatial volume]")
+                            print(f"--Advancing part({r}) by +{pos_update}")
+
+                    num_elements_added = len(moved_elements)
+                    if debug:
+                        print(f"--Adding {num_elements_added} to part({r}).")
+
+                else:
+
+                    # Partition is LARGER than it should be
+                    # Grab the spatial size of the current partition
+                    # to estimate the portion we need to shave off
+                    # assuming uniform element density
+                    part_interval = part_loc[r+1] - part_loc[r]
+                    num_to_move = -num_elem_needed
+                    portion_needed = num_to_move/float(nelem_part[r])
+
+                    if debug:
+                        print(f"--Shaving off {portion_needed*100}% of"
+                              f" partition({r}) [by nelem].")
+
+                    # Tune the shaved portion to account for
+                    # non-uniform element density
+                    fine_tuned = False
+                    while not fine_tuned:
+                        pos_update = portion_needed*part_interval
+                        new_pos = part_loc[r+1] - pos_update
+                        moved_elements = set()
+                        num_elem_mv = 0
+                        for e in part_to_elements[r]:
+                            if elem_centroids[e] > new_pos:
+                                moved_elements.add(e)
+                                num_elem_mv = num_elem_mv + 1
+                        if num_elem_mv < num_to_move:
+                            fine_tuned = True
+                        else:
+                            ovrsht = (num_elem_mv - num_to_move)
+                            rel_ovrsht = ovrsht/float(num_to_move)
+                            if rel_ovrsht > 0.8:
+                                # bisect and try again
+                                portion_needed = portion_needed/2.0
+                            else:
+                                fine_tuned = True
+
+                    # new "right" wall location of shrunken part
+                    # and negative num_elements_added for removal
+                    new_loc = new_pos
+                    num_elements_added = -len(moved_elements)
+                    if debug:
+                        print(f"--Reducing partition size by {portion_needed*100}%"
+                              " [by nelem].")
+                        print(f"--Removing {-num_elements_added} from part({r}).")
+
+                # Now "moved_elements", "num_elements_added", and "new_loc"
+                # are computed. Update the partition, and reservoir.
+                if debug:
+                    print(f"--Number of elements to ADD: {num_elements_added}.")
+
+                if num_elements_added > 0:
+                    part_to_elements[r].update(moved_elements)
+                    part_to_elements[adv_part].difference_update(
+                        moved_elements)
+                    for e in moved_elements:
+                        elem_to_rank[e] = r
+                else:
+                    part_to_elements[r].difference_update(moved_elements)
+                    part_to_elements[adv_part].update(moved_elements)
+                    for e in moved_elements:
+                        elem_to_rank[e] = adv_part
+
+                total_change = total_change + num_elements_added
+                part_loc[r+1] = new_loc
+                if debug:
+                    print(f"--Before: {nelem_part=}")
+                nelem_part[r] = nelem_part[r] + num_elements_added
+                nelem_part[adv_part] = nelem_part[adv_part] - num_elements_added
+                if debug:
+                    print(f"--After: {nelem_part=}")
+
+                # Compute new nelem_needed and part_imbalance
+                num_elem_needed = num_elem_needed - num_elements_added
+                part_imbalance = \
+                    np.abs(num_elem_needed) / float(aver_part_nelem)
+                niter = niter + 1
+
+            # Summarize the total change and state of the partition
+            # and reservoir
+            if debug:
+                print(f"-Part({r}): {total_change=}")
+                print(f"-Part({r=}): {nelem_part[r]=}, {part_imbalance=}")
+                print(f"-Part({adv_part}): {nelem_part[adv_part]=}")
+
+    # Validate the partitioning before returning
+    total_partitioned_elements = sum([len(part_to_elements[r])
+                                      for r in range(num_ranks)])
+    total_nelem_part = sum([nelem_part[r] for r in range(num_ranks)])
+
+    if debug:
+        print("Validating mesh parts.")
+
+    if total_partitioned_elements != total_nelem_part:
+        raise PartitioningError("Validator: parted element counts don't match")
+    if total_partitioned_elements != global_nelements:
+        raise PartitioningError("Validator: global element counts don't match.")
+    if len(elem_to_rank) != global_nelements:
+        raise PartitioningError("Validator: elem-to-rank wrong size.")
+    if np.any(np.array(nelem_part) <= 0):
+        raise PartitioningError("Validator: empty partitions.")
+
+    for e in range(global_nelements):
+        part = elem_to_rank[e]
+        if e not in part_to_elements[part]:
+            raise PartitioningError("Validator: part/element/part map mismatch.")
+
+    part_counts = np.zeros(global_nelements)
+    for part_elements in part_to_elements.values():
+        for element in part_elements:
+            part_counts[element] = part_counts[element] + 1
+
+    if np.any(part_counts > 1):
+        raise PartitioningError("Validator: degenerate elements")
+    if np.any(part_counts < 1):
+        raise PartitioningError("Validator: orphaned elements")
+
+    return elem_to_rank
+
+
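For review purposes, a sketch of how the partitioner would be driven from a serial meshing step. The box mesh, resolution, and tolerance below are made up for illustration; the function itself only needs a meshmode mesh and *nranks_per_axis*:

```python
# Partition a 2D box mesh into 4 x-slabs, balancing to within 5% of the
# mean element count per rank.
import numpy as np
from meshmode.mesh.generation import generate_regular_rect_mesh
from mirgecom.simutil import geometric_mesh_partitioner

mesh = generate_regular_rect_mesh(a=(0,)*2, b=(1,)*2,
                                  nelements_per_axis=(32, 32))
elem_to_rank = geometric_mesh_partitioner(
    mesh, nranks_per_axis=np.array([4, 1]), auto_balance=True,
    imbalance_tolerance=.05)
# elem_to_rank[i] gives the destination rank of element i
```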
+def generate_and_distribute_mesh(comm, generate_mesh, **kwargs):
     """Generate a mesh and distribute it among all ranks in *comm*.
 
     Generate the mesh with the user-supplied mesh generation function
@@ -489,6 +831,66 @@ def create_parallel_grid(comm, generate_grid):
     return generate_and_distribute_mesh(comm=comm, generate_mesh=generate_grid)
 
 
+def boundary_report(dcoll, boundaries, outfile_name, *, dd=DD_VOLUME_ALL,
+                    mesh=None):
+    """Generate a report of the grid boundaries."""
+    boundaries = normalize_boundaries(boundaries)
+
+    comm = dcoll.mpi_communicator
+    nproc = 1
+    rank = 0
+    if comm is not None:
+        nproc = comm.Get_size()
+        rank = comm.Get_rank()
+
+    if mesh is not None:
+        nelem = 0
+        for grp in mesh.groups:
+            nelem = nelem + grp.nelements
+        local_header = f"nproc: {nproc}\nrank: {rank}\nnelem: {nelem}\n"
+    else:
+        local_header = f"nproc: {nproc}\nrank: {rank}\n"
+
+    from io import StringIO
+    local_report = StringIO(local_header)
+    local_report.seek(0, 2)
+
+    for bdtag in boundaries:
+        boundary_discr = dcoll.discr_from_dd(bdtag)
+        nnodes = sum([grp.ndofs for grp in boundary_discr.groups])
+        local_report.write(f"{bdtag}: {nnodes}\n")
+
+    from meshmode.mesh import BTAG_PARTITION
+    from meshmode.distributed import get_connected_parts
+    connected_part_ids = get_connected_parts(dcoll.discr_from_dd(dd).mesh)
+    local_report.write(f"num_nbr_parts: {len(connected_part_ids)}\n")
+    local_report.write(f"connected_part_ids: {connected_part_ids}\n")
+    part_nodes = []
+    for connected_part_id in connected_part_ids:
+        boundary_discr = dcoll.discr_from_dd(
+            dd.trace(BTAG_PARTITION(connected_part_id)))
+        nnodes = sum([grp.ndofs for grp in boundary_discr.groups])
+        part_nodes.append(nnodes)
+    if part_nodes:
+        local_report.write(f"nnodes_pb: {part_nodes}\n")
+
+    local_report.write("-----\n")
+    local_report.seek(0)
+
+    for irank in range(nproc):
+        if irank == rank:
+            f = open(outfile_name, "a+")
+            f.write(local_report.read())
+            f.close()
+        if comm is not None:
+            comm.barrier()
+
+
+def force_evaluation(actx, expn):
+    """Force evaluation of an expression via a freeze/thaw round trip."""
+    return actx.thaw(actx.freeze(expn))
+
+
 def get_reasonable_memory_pool(ctx: cl.Context, queue: cl.CommandQueue,
                                force_buffer: bool = False,
                                force_non_pool: bool = False):
@@ -519,7 +921,6 @@ def get_reasonable_memory_pool(ctx: cl.Context, queue: cl.CommandQueue,
             ctx, alignment=0, queue=queue))
     else:
         from warnings import warn
-
         if not has_coarse_grain_buffer_svm(queue.device):
             warn(f"No SVM support on {queue.device}, returning a CL buffer-based "
                  "memory pool. If you are running with PoCL-cuda, please update "
@@ -537,7 +938,10 @@ def configurate(config_key, config_object=None, default_value=None):
         d = config_object if isinstance(config_object, dict) else\
             config_object.__dict__
         if config_key in d:
-            return d[config_key]
+            value = d[config_key]
+            if default_value is not None:
+                return type(default_value)(value)
+            return value
     return default_value
diff --git a/requirements.txt b/requirements.txt
index 2c4213576..72e2ce930 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,7 +3,6 @@ mpi4py
 numpy
 pytest
 pytest-cov
-pyvisfile
 pymetis
 importlib-resources
 psutil
@@ -24,3 +23,4 @@ git+https://github.com/pythological/kanren.git#egg=miniKanren
 --editable git+https://github.com/pyrometheus/pyrometheus.git@tulio-transport#egg=pyrometheus
 --editable git+https://github.com/illinois-ceesd/logpyle.git#egg=logpyle
 --editable git+https://github.com/kaushikcfd/feinsum.git#egg=feinsum
+--editable git+https://github.com/inducer/pyvisfile.git#egg=pyvisfile
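One behavioral note on the `configurate()` change in mirgecom/simutil.py above: when a default is supplied, the looked-up value is now coerced to the default's type, which matters for values that arrive as strings from YAML or argv. An illustrative sketch; the keys and values are invented:

```python
from mirgecom.simutil import configurate

user_input = {"cfl": "0.5", "nviz": "100"}  # e.g. values read in as strings

cfl = configurate("cfl", user_input, default_value=1.0)    # -> 0.5 (float)
nviz = configurate("nviz", user_input, default_value=10)   # -> 100 (int)
order = configurate("order", user_input, default_value=3)  # -> 3 (default used)
```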