From 39fc8be040dfe153767c9f2fffb4fb41973d3879 Mon Sep 17 00:00:00 2001 From: "Michael T. Campbell" Date: Tue, 11 Oct 2022 16:21:07 -0700 Subject: [PATCH 01/12] Instrument to get some more details in python init phase. --- examples/combozzle-mpi.py | 49 +++++++++++++++++++++++++++++---------- mirgecom/simutil.py | 17 +++++++++++++- 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/examples/combozzle-mpi.py b/examples/combozzle-mpi.py index 0957d9465..f610630cc 100644 --- a/examples/combozzle-mpi.py +++ b/examples/combozzle-mpi.py @@ -96,8 +96,10 @@ def _get_box_mesh(dim, a, b, n, t=None, periodic=None): dim_names = ["x", "y", "z"] bttf = {} for i in range(dim): - bttf["-"+str(i+1)] = ["-"+dim_names[i]] - bttf["+"+str(i+1)] = ["+"+dim_names[i]] + if not periodic[i]: + bttf["-"+str(i+1)] = ["-"+dim_names[i]] + bttf["+"+str(i+1)] = ["+"+dim_names[i]] + from meshmode.mesh.generation import generate_regular_rect_mesh as gen return gen(a=a, b=b, n=n, boundary_tag_to_face=bttf, mesh_type=t, periodic=periodic) @@ -180,7 +182,7 @@ def main(ctx_factory=cl.create_some_context, use_logmgr=True, comm.Barrier() if rank == 0: - print(f"Main start: {time.ctime(time.time())}") + print(f"Hello: {time.ctime(time.time())}") comm.Barrier() from mirgecom.simutil import global_reduce as _global_reduce @@ -225,7 +227,7 @@ def main(ctx_factory=cl.create_some_context, use_logmgr=True, nhealth = 100 nrestart = 1000 do_checkpoint = 0 - boundary_report = 0 + boundary_report = 1 do_callbacks = 0 # }}} Time stepping control @@ -597,7 +599,7 @@ def main(ctx_factory=cl.create_some_context, use_logmgr=True, comm.Barrier() if rank == 0: - print(f"ACTX setup start: {time.ctime(time.time())}") + print(f"Queue/ACTX setup start: {time.ctime(time.time())}") comm.Barrier() if use_profiling: @@ -614,6 +616,11 @@ def main(ctx_factory=cl.create_some_context, use_logmgr=True, else: actx = actx_class(comm, queue, allocator=alloc, force_device_scalars=True) + comm.Barrier() + if rank == 0: + print(f"Queue/ACTX setup done: {time.ctime(time.time())}") + comm.Barrier() + rst_path = "restart_data/" rst_pattern = ( rst_path + "{cname}-{step:04d}-{rank:04d}.pkl" @@ -638,6 +645,11 @@ def main(ctx_factory=cl.create_some_context, use_logmgr=True, generate_mesh) local_nelements = local_mesh.nelements + comm.Barrier() + if rank == 0: + print(f"Mesh setup done: {time.ctime(time.time())}") + comm.Barrier() + print(f"{rank=},{dim=},{order=},{local_nelements=},{global_nelements=}") if grid_only: return 0 @@ -648,7 +660,7 @@ def main(ctx_factory=cl.create_some_context, use_logmgr=True, comm.Barrier() if rank == 0: - print(f"ACTX Setup end -> Solution init start: {time.ctime(time.time())}") + print(f"Discretization Setup end: {time.ctime(time.time())}") comm.Barrier() def _compiled_stepper_wrapper(state, t, dt, rhs): @@ -733,6 +745,11 @@ def vol_max(x): ("min_temperature", "------- T (min, max) (K) = ({value:7g}, "), ("max_temperature", "{value:7g})\n")]) + comm.Barrier() + if rank == 0: + print(f"Solution init start: {time.ctime(time.time())}") + comm.Barrier() + if single_gas_only: nspecies = 0 init_y = 0 @@ -959,6 +976,11 @@ def _sponge(cv): f" {eq_pressure=}, {eq_temperature=}," f" {eq_density=}, {eq_mass_fractions=}") + comm.Barrier() + if rank == 0: + print(f"Solution init done: {time.ctime(time.time())}") + comm.Barrier() + def my_write_status(dt, cfl, dv=None): status_msg = f"------ {dt=}" if constant_cfl else f"----- {cfl=}" if ((dv is not None) and (not log_dependent)): @@ -1241,6 +1263,10 @@ def dummy_rhs(t, state): 
comm.Barrier() + if rank == 0: + print(f"Simulation end time: {time.ctime(time.time())}") + + finish_tol = 1e-16 assert np.abs(current_t - t_final) < finish_tol @@ -1251,10 +1277,9 @@ def dummy_rhs(t, state): logger.info("Fluid solution failed health check.") raise MyRuntimeError("Failed simulation health check.") - if rank == 0: - print(f"Simulation end time: {time.ctime(time.time())}") - comm.Barrier() + if rank == 0: + print(f"Goodbye: {time.ctime(time.time())}") if __name__ == "__main__": @@ -1278,8 +1303,8 @@ def dummy_rhs(t, state): parser.add_argument("--restart_file", help="root name of restart file") parser.add_argument("--casename", help="casename to use for i/o") args = parser.parse_args() - from warnings import warn - warn("Automatically turning off DV logging. MIRGE-Com Issue(578)") + # from warnings import warn + # warn("Automatically turning off DV logging. MIRGE-Com Issue(578)") lazy = args.lazy log_dependent = False force_eval = not args.no_force @@ -1304,7 +1329,7 @@ def dummy_rhs(t, state): else: print("No user input file, using default values") - print(f"Calling main: {time.ctime(time.time())}") + # print(f"Calling main: {time.ctime(time.time())}") main(use_logmgr=args.log, use_leap=args.leap, input_file=input_file, use_overintegration=args.overintegration, diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index 024ef75ba..7977d2a60 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -457,6 +457,7 @@ def generate_and_distribute_mesh(comm, generate_mesh): global_nelements : :class:`int` The number of elements in the serial mesh """ + import time from warnings import warn warn( "generate_and_distribute_mesh is deprecated and will go away Q4 2022. " @@ -500,16 +501,19 @@ def distribute_mesh(comm, get_mesh_data, partition_generator_func=None): global_nelements: :class:`int` The number of elements in the global mesh """ + import time from meshmode.distributed import mpi_distribute num_ranks = comm.Get_size() + rank = comm.Get_rank() if partition_generator_func is None: def partition_generator_func(mesh, tag_to_elements, num_ranks): from meshmode.distributed import get_partition_by_pymetis return get_partition_by_pymetis(mesh, num_ranks) - if comm.Get_rank() == 0: + if rank == 0: + print(f"Mesh generation begin: {time.ctime(time.time())}") global_data = get_mesh_data() from meshmode.mesh import Mesh @@ -522,16 +526,21 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): else: raise TypeError("Unexpected result from get_mesh_data") + print(f"Mesh generation end: {time.ctime(time.time())}") + print(f"Mesh partition begin: {time.ctime(time.time())}") from meshmode.mesh.processing import partition_mesh rank_per_element = partition_generator_func(mesh, tag_to_elements, num_ranks) + print(f"Mesh partition_generator_func done: {time.ctime(time.time())}") if tag_to_elements is None: rank_to_elements = { rank: np.where(rank_per_element == rank)[0] for rank in range(num_ranks)} + print(f"Mesh partition e-conn inverted: {time.ctime(time.time())}") rank_to_mesh_data = partition_mesh(mesh, rank_to_elements) + print(f"Mesh partition_mesh done: {time.ctime(time.time())}") else: tag_to_volume = { @@ -561,6 +570,7 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): part_id_to_part_index = { part_id: part_index for part_index, part_id in enumerate(part_id_to_elements.keys())} + from meshmode.mesh.processing import _compute_global_elem_to_part_elem global_elem_to_part_elem = _compute_global_elem_to_part_elem( mesh.nelements, part_id_to_elements, 
part_id_to_part_index, @@ -589,6 +599,8 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): for vol in volumes} for rank in range(num_ranks)} + print(f"Mesh partitioning done: {time.ctime(time.time())}") + local_mesh_data = mpi_distribute( comm, source_rank=0, source_data=rank_to_mesh_data) @@ -599,6 +611,9 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): global_nelements = comm.bcast(None, root=0) + comm.Barrier() + if rank == 0: + print(f"Mesh distribution done: {time.ctime(time.time())}") return local_mesh_data, global_nelements From bd3cee6c8e9e49919becb2d8609fa46924449118 Mon Sep 17 00:00:00 2001 From: Michael Campbell Date: Wed, 12 Oct 2022 07:13:41 -0500 Subject: [PATCH 02/12] tweak instrumentation a bit --- mirgecom/simutil.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index 7977d2a60..ae455247d 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -457,7 +457,6 @@ def generate_and_distribute_mesh(comm, generate_mesh): global_nelements : :class:`int` The number of elements in the serial mesh """ - import time from warnings import warn warn( "generate_and_distribute_mesh is deprecated and will go away Q4 2022. " @@ -513,7 +512,7 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): return get_partition_by_pymetis(mesh, num_ranks) if rank == 0: - print(f"Mesh generation begin: {time.ctime(time.time())}") + print(f"distribute_mesh:generate begin: {time.ctime(time.time())}") global_data = get_mesh_data() from meshmode.mesh import Mesh @@ -526,21 +525,23 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): else: raise TypeError("Unexpected result from get_mesh_data") - print(f"Mesh generation end: {time.ctime(time.time())}") - print(f"Mesh partition begin: {time.ctime(time.time())}") + print(f"distribute_mesh partitioning begin: {time.ctime(time.time())}") from meshmode.mesh.processing import partition_mesh - rank_per_element = partition_generator_func(mesh, tag_to_elements, num_ranks) - print(f"Mesh partition_generator_func done: {time.ctime(time.time())}") + print(f"distribute_mesh partitioning done: {time.ctime(time.time())}") + print(f"distribute_mesh: mesh data struct start: {time.ctime(time.time())}") + + from pyinstrument import Profiler + + profiler = Profiler() + profiler.start() if tag_to_elements is None: rank_to_elements = { rank: np.where(rank_per_element == rank)[0] for rank in range(num_ranks)} - print(f"Mesh partition e-conn inverted: {time.ctime(time.time())}") rank_to_mesh_data = partition_mesh(mesh, rank_to_elements) - print(f"Mesh partition_mesh done: {time.ctime(time.time())}") else: tag_to_volume = { @@ -599,7 +600,10 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): for vol in volumes} for rank in range(num_ranks)} - print(f"Mesh partitioning done: {time.ctime(time.time())}") + profiler.stop() + profiler.print() + + print(f"distribute_mesh: mesh data struct done: {time.ctime(time.time())}") local_mesh_data = mpi_distribute( comm, source_rank=0, source_data=rank_to_mesh_data) @@ -613,7 +617,7 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): comm.Barrier() if rank == 0: - print(f"Mesh distribution done: {time.ctime(time.time())}") + print(f"distrubute_mesh distribution done: {time.ctime(time.time())}") return local_mesh_data, global_nelements From df3f0aee2daa7f224879703fbbde7ce613ee6c34 Mon Sep 17 00:00:00 2001 From: Michael Campbell Date: Wed, 12 Oct 2022 10:30:02 -0500 
Subject: [PATCH 03/12] Replace nozzle production test with y2-prediction --- .ci-support/production-drivers-install.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.ci-support/production-drivers-install.sh b/.ci-support/production-drivers-install.sh index dbc2a5ec6..4ffdf5b03 100755 --- a/.ci-support/production-drivers-install.sh +++ b/.ci-support/production-drivers-install.sh @@ -12,7 +12,7 @@ # The default values result in an install of the Y1 nozzle driver and # Wyatt Hagen's isolator driver that work with current MIRGE-Com # production branch: mirgecom@y1-production. -PRODUCTION_DRIVERS=${PRODUCTION_DRIVERS:-"illinois-ceesd/drivers_y1-nozzle@main:illinois-ceesd/drivers_y2-isolator@main:illinois-ceesd/drivers_flame1d@main"} +PRODUCTION_DRIVERS=${PRODUCTION_DRIVERS:-"illinois-ceesd/drivers_y2-prediction@main:illinois-ceesd/drivers_y2-isolator@main:illinois-ceesd/drivers_flame1d@main"} # Loop over the production drivers, clone them, and prepare for execution set -x OIFS="$IFS" From 762cdc662abdea6b55688869e2d5ad5cf56ca413 Mon Sep 17 00:00:00 2001 From: Michael Campbell Date: Wed, 12 Oct 2022 15:44:23 -0500 Subject: [PATCH 04/12] Add prediction driver to test suite --- .ci-support/production-drivers-install.sh | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.ci-support/production-drivers-install.sh b/.ci-support/production-drivers-install.sh index 4ffdf5b03..3801fe4f4 100755 --- a/.ci-support/production-drivers-install.sh +++ b/.ci-support/production-drivers-install.sh @@ -24,7 +24,11 @@ do PRODUCTION_DRIVER_DIR="production_driver_$PRODUCTION_DRIVER_NAME" git clone -b "$PRODUCTION_DRIVER_BRANCH" https\://github.com/"$PRODUCTION_DRIVER_REPO" "$PRODUCTION_DRIVER_DIR" cd "$PRODUCTION_DRIVER_DIR"/smoke_test - ln -s *.py driver.py # name the driver generically + if [ -f prediction.py ]; then + ln -s prediction.py driver.py + else + ln -s *.py driver.py # name the driver generically + fi cd ../.. done IFS="$OIFS" From 52ec05c519a216667de5f93def9fced945a65d96 Mon Sep 17 00:00:00 2001 From: Michael Campbell Date: Wed, 12 Oct 2022 15:45:16 -0500 Subject: [PATCH 05/12] Add geometric mesh partitioning (x-only atm). 
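
This partitions by binning element centroids into num_ranks uniform
x-intervals: rank(e) = int((centroid_x(e) - x_min) / part_dx). A minimal
usage sketch (illustrative only; "mesh" is assumed to be a meshmode Mesh
already built on rank 0, and the hookup follows the existing
partition_generator_func convention in distribute_mesh):

    from mirgecom.simutil import geometric_mesh_partitioner

    def my_partitioner(mesh, tag_to_elements, num_ranks):
        # one x-slab of elements per rank, assigned by element centroid
        return geometric_mesh_partitioner(mesh, num_ranks,
                                          tag_to_elements=tag_to_elements)

    # pass partition_generator_func=my_partitioner to distribute_mesh
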
--- mirgecom/simutil.py | 43 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index ae455247d..8f1e25c31 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -464,6 +464,33 @@ def generate_and_distribute_mesh(comm, generate_mesh): return distribute_mesh(comm, generate_mesh) +def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, + nranks_per_axis=None): + """Partition a mesh uniformly along the major coordinate axes.""" + mesh_dimension = mesh.dim + if nranks_per_axis is None: + nranks_per_axis = np.ones(mesh_dimension) + nranks_per_axis[0] = num_ranks + if len(nranks_per_axis) != mesh_dimension: + raise ValueError("nranks_per_axis must match mesh dimension.") + nranks_test = 1 + for i in range(mesh_dimension): + nranks_test = nranks_test * nranks_per_axis[i] + if nranks_test != num_ranks: + raise ValueError("nranks_per_axis must match num_ranks.") + + mesh_groups, = mesh.groups + mesh_verts = mesh.vertices + mesh_x = mesh_verts[0] + x_min = np.min(mesh_x) + x_max = np.max(mesh_x) + x_interval = x_max - x_min + part_dx = x_interval / nranks_per_axis[0] + elem_x = mesh_verts[0, mesh_groups.vertex_indices] + elem_centroids = np.sum(elem_x, axis=1)/elem_x.shape[1] + return ((elem_centroids-x_min) / part_dx).astype(int) + + def distribute_mesh(comm, get_mesh_data, partition_generator_func=None): r"""Distribute a mesh among all ranks in *comm*. @@ -527,7 +554,9 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): print(f"distribute_mesh partitioning begin: {time.ctime(time.time())}") from meshmode.mesh.processing import partition_mesh - rank_per_element = partition_generator_func(mesh, tag_to_elements, num_ranks) + rank_per_element = \ + partition_generator_func(mesh, tag_to_elements=tag_to_elements, + num_ranks=num_ranks) print(f"distribute_mesh partitioning done: {time.ctime(time.time())}") print(f"distribute_mesh: mesh data struct start: {time.ctime(time.time())}") @@ -621,7 +650,8 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): return local_mesh_data, global_nelements -def boundary_report(dcoll, boundaries, outfile_name, *, dd=DD_VOLUME_ALL): +def boundary_report(dcoll, boundaries, outfile_name, *, dd=DD_VOLUME_ALL, + mesh=None): """Generate a report of the grid boundaries.""" boundaries = normalize_boundaries(boundaries) @@ -632,7 +662,14 @@ def boundary_report(dcoll, boundaries, outfile_name, *, dd=DD_VOLUME_ALL): nproc = comm.Get_size() rank = comm.Get_rank() - local_header = f"nproc: {nproc}\nrank: {rank}\n" + if mesh is not None: + nelem = 0 + for grp in mesh.groups: + nelem = nelem + grp.nelements + local_header = f"nproc: {nproc}\nrank: {rank}\nnelem: {nelem}" + else: + local_header = f"nproc: {nproc}\nrank: {rank}" + from io import StringIO local_report = StringIO(local_header) local_report.seek(0, 2) From cc66edc2aceb1df2f9b40daae03d8956b9107137 Mon Sep 17 00:00:00 2001 From: Michael Campbell Date: Thu, 13 Oct 2022 13:49:26 -0500 Subject: [PATCH 06/12] Add automatic balancing in partitioning func. 
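
With auto_balance=True the partitioner sweeps the parts left to right and
keeps moving each part's right boundary, borrowing elements from (or
returning them to) the next non-empty part (the "reservoir"), until
abs(aver_part_nelem - nelem_part[r]) / aver_part_nelem falls below 1%.
Worked numbers, as a hedged illustration of the balance criterion only
(the values are made up, not taken from a real mesh):

    # 1000 elements split over 4 ranks
    aver_part_nelem = 1000 / 4                                # 250.0
    nelem_part_r = 180                           # this x-slab came up short
    num_elem_needed = aver_part_nelem - nelem_part_r          # 70.0
    part_imbalance = abs(num_elem_needed) / aver_part_nelem   # 0.28
    # 0.28 > .01, so part r advances its boundary into the reservoir
    # and re-checks until the imbalance drops below the tolerance.
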
--- mirgecom/simutil.py | 124 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 121 insertions(+), 3 deletions(-) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index 8f1e25c31..8885f8778 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -465,7 +465,7 @@ def generate_and_distribute_mesh(comm, generate_mesh): def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, - nranks_per_axis=None): + nranks_per_axis=None, auto_balance=False): """Partition a mesh uniformly along the major coordinate axes.""" mesh_dimension = mesh.dim if nranks_per_axis is None: @@ -482,13 +482,131 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, mesh_groups, = mesh.groups mesh_verts = mesh.vertices mesh_x = mesh_verts[0] + x_min = np.min(mesh_x) x_max = np.max(mesh_x) x_interval = x_max - x_min - part_dx = x_interval / nranks_per_axis[0] + part_loc = np.linspace(x_min, x_max, num_ranks+1) + + part_interval = x_interval / nranks_per_axis[0] elem_x = mesh_verts[0, mesh_groups.vertex_indices] elem_centroids = np.sum(elem_x, axis=1)/elem_x.shape[1] - return ((elem_centroids-x_min) / part_dx).astype(int) + global_nelements = len(elem_centroids) + aver_part_nelem = global_nelements / num_ranks + + elem_to_rank = ((elem_centroids-x_min) / part_interval).astype(int) + + if auto_balance: + + part_to_elements = {r: set((np.where(elem_to_rank == r))[0].flat) + for r in range(num_ranks)} + nelem_part = [len(part_to_elements[r]) for r in range(num_ranks)] + adv_part = 1 + + for r in range(num_ranks-1): + + adv_part = max(int((part_loc[r+1]-x_min) / part_interval), 1) + niter = 0 + num_elem_needed = aver_part_nelem - nelem_part[r] + part_imbalance = np.abs(num_elem_needed) / float(aver_part_nelem) + # while nelem_part[adv_part] == 0: + # adv_part = adv_part + 1 + + print(f"Processing part({r=})") + print(f"{part_loc[r]=}") + print(f"{adv_part=}") + + while ((part_imbalance > .01) and (adv_part < num_ranks)): + + print(f"-{nelem_part[r]=}") + print(f"-{part_loc[r+1]=},{part_loc[adv_part+1]=}") + print(f"-{num_elem_needed=},{part_imbalance=}") + print(f"-{nelem_part[adv_part]=}") + + if niter > 10: + raise ValueError("Detected too many iterations in partitioning.") + + if num_elem_needed > 0: + + # Partition is SMALLER than it should be + nelem_bag = nelem_part[adv_part] + print(f"-{nelem_bag=}") + + portion_needed = (float(num_elem_needed) + / float(nelem_part[adv_part])) + + print(f"--Chomping {portion_needed*100}% of next" + f" partition({adv_part}).") + + if portion_needed == 1.0: # Chomp + part_loc[r+1] = part_loc[r+2] + # add all elements of next part to this part + num_elems_added = nelem_part[adv_part] + part_to_elements[r].update(part_to_elements[adv_part]) + # update elem_to_partition for moved elements + for e in part_to_elements[adv_part]: + elem_to_rank[e] = r + nelem_part[r] = nelem_part[r] + num_elems_added + part_to_elements[adv_part].difference_update( + part_to_elements[adv_part]) + # next_pos = sliding_pos + part_interval + nelem_part[adv_part] = 0 + adv_part = adv_part + 1 + print(f"--{adv_part=}") + # next_rank = next_rank + 1 + + else: # Bite + if portion_needed > .05: + portion_needed = .9*portion_needed + sliding_interval = part_loc[adv_part+1] - part_loc[r+1] + pos_update = portion_needed*sliding_interval + print(f"--Advancing part({r}) by +{pos_update}") + part_loc[r+1] = part_loc[r+1] + pos_update + # loop over next part's elements and move those + # that fall into the new part's bounding box + moved_elements = set() + for e in 
part_to_elements[adv_part]: + if elem_centroids[e] <= part_loc[r+1]: + moved_elements.add(e) + part_to_elements[r].update(moved_elements) + part_to_elements[adv_part].difference_update(moved_elements) + # update elem_to_partition for moved elements + for e in moved_elements: + elem_to_rank[e] = r + num_elements_added = len(moved_elements) + + print(f"--Number of elements added: {num_elements_added}") + + else: + + # Partition is LARGER than it should be + portion_needed = abs(num_elem_needed)/float(nelem_part[r]) + if portion_needed > .05: + portion_needed = .9*portion_needed + print(f"--Reducing partition size by {portion_needed*100}%.") + # Move the partition location + sliding_interval = part_loc[r+1] - part_loc[r] + pos_update = portion_needed*sliding_interval + part_loc[r+1] = part_loc[r+1] - pos_update + # Move *this* part's elements if they fell out + moved_elements = set() + for e in part_to_elements[r]: + if elem_centroids[e] > part_loc[r+1]: + moved_elements.add(e) + elem_to_rank[e] = r+1 + part_to_elements[r].difference_update(moved_elements) + part_to_elements[adv_part].update(moved_elements) + num_elements_added = -len(moved_elements) + + nelem_part[r] = nelem_part[r] + num_elements_added + nelem_part[adv_part] = nelem_part[adv_part] - num_elements_added + num_elem_needed = num_elem_needed - num_elements_added + part_imbalance = \ + np.abs(num_elem_needed) / float(aver_part_nelem) + niter = niter + 1 + + print(f"-Part({r=}): {nelem_part[r]=}, {part_imbalance=}") + return elem_to_rank def distribute_mesh(comm, get_mesh_data, partition_generator_func=None): From e522368d7dc3fd25d579f54a03b80e783fedd741 Mon Sep 17 00:00:00 2001 From: Michael Campbell Date: Thu, 13 Oct 2022 15:56:47 -0500 Subject: [PATCH 07/12] Add a little fine tuning for partitioning --- mirgecom/simutil.py | 83 +++++++++++++++++++++++++++++++++------------ 1 file changed, 61 insertions(+), 22 deletions(-) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index 8885f8778..be24a1934 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -494,6 +494,11 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, global_nelements = len(elem_centroids) aver_part_nelem = global_nelements / num_ranks + print(f"Partitioning {global_nelements} elements in" + f" [{x_min},{x_max}]/{num_ranks}") + print(f"Average nelem/part: {aver_part_nelem}") + print(f"Initial part locs: {part_loc=}") + elem_to_rank = ((elem_centroids-x_min) / part_interval).astype(int) if auto_balance: @@ -509,8 +514,6 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, niter = 0 num_elem_needed = aver_part_nelem - nelem_part[r] part_imbalance = np.abs(num_elem_needed) / float(aver_part_nelem) - # while nelem_part[adv_part] == 0: - # adv_part = adv_part + 1 print(f"Processing part({r=})") print(f"{part_loc[r]=}") @@ -523,7 +526,7 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, print(f"-{num_elem_needed=},{part_imbalance=}") print(f"-{nelem_part[adv_part]=}") - if niter > 10: + if niter > 100: raise ValueError("Detected too many iterations in partitioning.") if num_elem_needed > 0: @@ -532,7 +535,7 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, nelem_bag = nelem_part[adv_part] print(f"-{nelem_bag=}") - portion_needed = (float(num_elem_needed) + portion_needed = (float(abs(num_elem_needed)) / float(nelem_part[adv_part])) print(f"--Chomping {portion_needed*100}% of next" @@ -556,18 +559,35 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, 
*, tag_to_elements=None, # next_rank = next_rank + 1 else: # Bite - if portion_needed > .05: - portion_needed = .9*portion_needed sliding_interval = part_loc[adv_part+1] - part_loc[r+1] - pos_update = portion_needed*sliding_interval + fine_tuned = False + while not fine_tuned: + pos_update = portion_needed*sliding_interval + new_loc = part_loc[r+1] + pos_update + moved_elements = set() + num_elem_mv = 0 + for e in part_to_elements[adv_part]: + if elem_centroids[e] <= new_loc: + moved_elements.add(e) + num_elem_mv = num_elem_mv + 1 + if num_elem_mv < num_elem_needed: + fine_tuned = True + else: + ovrsht = (num_elem_mv - num_elem_needed) + rel_ovrsht = ovrsht/float(num_elem_needed) + if rel_ovrsht > 1.0: + portion_needed = portion_needed/2.0 + else: + fine_tuned = True + print(f"--Tuned: {portion_needed=}") print(f"--Advancing part({r}) by +{pos_update}") part_loc[r+1] = part_loc[r+1] + pos_update # loop over next part's elements and move those # that fall into the new part's bounding box - moved_elements = set() - for e in part_to_elements[adv_part]: - if elem_centroids[e] <= part_loc[r+1]: - moved_elements.add(e) + # moved_elements = set() + # for e in part_to_elements[adv_part]: + # if elem_centroids[e] <= part_loc[r+1]: + # moved_elements.add(e) part_to_elements[r].update(moved_elements) part_to_elements[adv_part].difference_update(moved_elements) # update elem_to_partition for moved elements @@ -580,20 +600,39 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, else: # Partition is LARGER than it should be - portion_needed = abs(num_elem_needed)/float(nelem_part[r]) - if portion_needed > .05: - portion_needed = .9*portion_needed + sliding_interval = part_loc[r+1] - part_loc[r] + num_to_move = -num_elem_needed + portion_needed = num_to_move/float(nelem_part[r]) + + fine_tuned = False + while not fine_tuned: + # if portion_needed > .05: + # portion_needed = .9*portion_needed + pos_update = portion_needed*sliding_interval + new_pos = part_loc[r+1] - pos_update + moved_elements = set() + num_elem_mv = 0 + for e in part_to_elements[r]: + if elem_centroids[e] > new_pos: + moved_elements.add(e) + num_elem_mv = num_elem_mv + 1 + if num_elem_mv < num_to_move: + fine_tuned = True + else: + ovrsht = (num_elem_mv - num_to_move) + rel_ovrsht = ovrsht/float(num_to_move) + if rel_ovrsht > 1.0: + portion_needed = portion_needed/2.0 + else: + fine_tuned = True + print(f"--Reducing partition size by {portion_needed*100}%.") # Move the partition location - sliding_interval = part_loc[r+1] - part_loc[r] - pos_update = portion_needed*sliding_interval - part_loc[r+1] = part_loc[r+1] - pos_update + part_loc[r+1] = new_pos # Move *this* part's elements if they fell out - moved_elements = set() - for e in part_to_elements[r]: - if elem_centroids[e] > part_loc[r+1]: - moved_elements.add(e) - elem_to_rank[e] = r+1 + for e in moved_elements: + elem_to_rank[e] = r+1 + part_to_elements[r].difference_update(moved_elements) part_to_elements[adv_part].update(moved_elements) num_elements_added = -len(moved_elements) From c8de6d7c2476406ca1e4470e610f320693565db8 Mon Sep 17 00:00:00 2001 From: "Michael T. Campbell" Date: Fri, 14 Oct 2022 11:20:03 -0700 Subject: [PATCH 08/12] Tweak partitioning util. 
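
The balancing loop now takes imbalance_tolerance and debug keywords, builds
one moved_elements set per iteration before updating the part, the
reservoir, and the boundary location, and bisects the grabbed/shaved
spatial portion whenever the moved-element count overshoots the target by
more than 80%. A hedged call sketch with the new keywords (argument values
are illustrative, not recommendations; "mesh" is assumed to be a meshmode
Mesh built on rank 0):

    from mirgecom.simutil import geometric_mesh_partitioner

    rank_per_element = geometric_mesh_partitioner(
        mesh, num_ranks=16, auto_balance=True,
        imbalance_tolerance=.01, debug=False)
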
--- mirgecom/simutil.py | 206 ++++++++++++++++++++++++++++---------------- 1 file changed, 134 insertions(+), 72 deletions(-) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index be24a1934..ebd2ff0ef 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -465,7 +465,8 @@ def generate_and_distribute_mesh(comm, generate_mesh): def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, - nranks_per_axis=None, auto_balance=False): + nranks_per_axis=None, auto_balance=False, + imbalance_tolerance=.01, debug=False): """Partition a mesh uniformly along the major coordinate axes.""" mesh_dimension = mesh.dim if nranks_per_axis is None: @@ -493,12 +494,14 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, elem_centroids = np.sum(elem_x, axis=1)/elem_x.shape[1] global_nelements = len(elem_centroids) aver_part_nelem = global_nelements / num_ranks - - print(f"Partitioning {global_nelements} elements in" - f" [{x_min},{x_max}]/{num_ranks}") - print(f"Average nelem/part: {aver_part_nelem}") - print(f"Initial part locs: {part_loc=}") - + debug = True + + if debug: + print(f"Partitioning {global_nelements} elements in" + f" [{x_min},{x_max}]/{num_ranks}") + print(f"Average nelem/part: {aver_part_nelem}") + print(f"Initial part locs: {part_loc=}") + elem_to_rank = ((elem_centroids-x_min) / part_interval).astype(int) if auto_balance: @@ -508,61 +511,84 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, nelem_part = [len(part_to_elements[r]) for r in range(num_ranks)] adv_part = 1 + if debug: + print(f"Initial: {nelem_part=}") + for r in range(num_ranks-1): + + # find the element reservoir (next part with elements in it) + adv_part = r + 1 + while nelem_part[adv_part] == 0: + adv_part = adv_part + 1 - adv_part = max(int((part_loc[r+1]-x_min) / part_interval), 1) - niter = 0 num_elem_needed = aver_part_nelem - nelem_part[r] part_imbalance = np.abs(num_elem_needed) / float(aver_part_nelem) - print(f"Processing part({r=})") - print(f"{part_loc[r]=}") - print(f"{adv_part=}") - - while ((part_imbalance > .01) and (adv_part < num_ranks)): + if debug: + print(f"Processing part({r=})") + print(f"{part_loc[r]=}, {adv_part=}") + print(f"{num_elem_needed=}, {part_imbalance=}") + print(f"{nelem_part=}") - print(f"-{nelem_part[r]=}") - print(f"-{part_loc[r+1]=},{part_loc[adv_part+1]=}") - print(f"-{num_elem_needed=},{part_imbalance=}") - print(f"-{nelem_part[adv_part]=}") + niter = 0 + total_change = 0 + moved_elements = set() + + while ((part_imbalance > imbalance_tolerance) and (adv_part < num_ranks)): + # This partition needs to keep changing in size until it meets the + # specified imbalance tolerance, or gives up trying + + while nelem_part[adv_part] == 0: + adv_part = adv_part + 1 + if adv_part >= num_ranks: + raise ValueError("Ran out of elements to partition.") + if debug: + print(f"-{nelem_part[r]=}, adv_part({adv_part})," + f" {nelem_part[adv_part]=}") + print(f"-{part_loc[r+1]=},{part_loc[adv_part+1]=}") + print(f"-{num_elem_needed=},{part_imbalance=}") if niter > 100: raise ValueError("Detected too many iterations in partitioning.") + # The purpose of the next block is to populate the "moved_elements" + # data structure. Then those elements will be moved between the + # current partition being processed and the "reservoir," + # *and* to adjust the position of the "right" side of the current + # partition boundary. 
+ moved_elements = set() + num_elements_added = 0 + if num_elem_needed > 0: - # Partition is SMALLER than it should be - nelem_bag = nelem_part[adv_part] - print(f"-{nelem_bag=}") + # Partition is SMALLER than it should be, grab elements from + # the reservoir + if debug: + print(f"-Grabbing elements from reservoir({adv_part})" + f", {nelem_part[adv_part]=}") portion_needed = (float(abs(num_elem_needed)) / float(nelem_part[adv_part])) + portion_needed = min(portion_needed, 1.0) - print(f"--Chomping {portion_needed*100}% of next" - f" partition({adv_part}).") + if debug: + print(f"--Chomping {portion_needed*100}% of" + f" reservoir({adv_part}) [by nelem].") if portion_needed == 1.0: # Chomp - part_loc[r+1] = part_loc[r+2] - # add all elements of next part to this part - num_elems_added = nelem_part[adv_part] - part_to_elements[r].update(part_to_elements[adv_part]) - # update elem_to_partition for moved elements - for e in part_to_elements[adv_part]: - elem_to_rank[e] = r - nelem_part[r] = nelem_part[r] + num_elems_added - part_to_elements[adv_part].difference_update( - part_to_elements[adv_part]) - # next_pos = sliding_pos + part_interval - nelem_part[adv_part] = 0 - adv_part = adv_part + 1 - print(f"--{adv_part=}") - # next_rank = next_rank + 1 + new_loc = part_loc[adv_part+1] + moved_elements.update(part_to_elements[adv_part]) else: # Bite - sliding_interval = part_loc[adv_part+1] - part_loc[r+1] + # This is the spatial size of the reservoir + reserv_interval = part_loc[adv_part+1] - part_loc[r+1] + + # Find what portion of the reservoir to grab spatially + # This part is needed because the elements are not + # distributed uniformly in space. fine_tuned = False while not fine_tuned: - pos_update = portion_needed*sliding_interval + pos_update = portion_needed*reserv_interval new_loc = part_loc[r+1] + pos_update moved_elements = set() num_elem_mv = 0 @@ -575,40 +601,39 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, else: ovrsht = (num_elem_mv - num_elem_needed) rel_ovrsht = ovrsht/float(num_elem_needed) - if rel_ovrsht > 1.0: + if rel_ovrsht > 0.8: + # bisect the space grabbed and try again portion_needed = portion_needed/2.0 else: fine_tuned = True - print(f"--Tuned: {portion_needed=}") - print(f"--Advancing part({r}) by +{pos_update}") - part_loc[r+1] = part_loc[r+1] + pos_update - # loop over next part's elements and move those - # that fall into the new part's bounding box - # moved_elements = set() - # for e in part_to_elements[adv_part]: - # if elem_centroids[e] <= part_loc[r+1]: - # moved_elements.add(e) - part_to_elements[r].update(moved_elements) - part_to_elements[adv_part].difference_update(moved_elements) - # update elem_to_partition for moved elements - for e in moved_elements: - elem_to_rank[e] = r - num_elements_added = len(moved_elements) - - print(f"--Number of elements added: {num_elements_added}") + + new_loc = part_loc[r+1] + pos_update + if debug: + print(f"--Tuned: {portion_needed=} [by spatial volume]") + print(f"--Advancing part({r}) by +{pos_update}") + + num_elements_added = len(moved_elements) + if debug: + print(f"--Adding {num_elements_added} to part({r}).") else: # Partition is LARGER than it should be - sliding_interval = part_loc[r+1] - part_loc[r] + # Grab the spatial size of the current partition + # to compute the portion we need to shave off + part_interval = part_loc[r+1] - part_loc[r] num_to_move = -num_elem_needed portion_needed = num_to_move/float(nelem_part[r]) + if debug: + print(f"--Shaving off {portion_needed*100}% 
of" + f" partition({r}) [by nelem].") + + # Tune the shaved portion to account for + # non-uniform element density fine_tuned = False while not fine_tuned: - # if portion_needed > .05: - # portion_needed = .9*portion_needed - pos_update = portion_needed*sliding_interval + pos_update = portion_needed*part_interval new_pos = part_loc[r+1] - pos_update moved_elements = set() num_elem_mv = 0 @@ -621,30 +646,67 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, else: ovrsht = (num_elem_mv - num_to_move) rel_ovrsht = ovrsht/float(num_to_move) - if rel_ovrsht > 1.0: + if rel_ovrsht > 0.8: + # bisect and try again portion_needed = portion_needed/2.0 else: fine_tuned = True - - print(f"--Reducing partition size by {portion_needed*100}%.") - # Move the partition location - part_loc[r+1] = new_pos - # Move *this* part's elements if they fell out + + new_loc = new_pos + num_elements_added = -len(moved_elements) + if debug: + print(f"--Reducing partition size by {portion_needed*100}%" + " [by nelem].") + print(f"--Removing {-num_elements_added} from part({r}).") + + # Now "moved_elements", "num_elements_added", and "new_loc" + # are computed. Update the partition, reservoir. + if debug: + print(f"--Number of elements to ADD: {num_elements_added}.") + + if num_elements_added > 0: + part_to_elements[r].update(moved_elements) + part_to_elements[adv_part].difference_update( + moved_elements) for e in moved_elements: - elem_to_rank[e] = r+1 - + elem_to_rank[e] = r + else: part_to_elements[r].difference_update(moved_elements) part_to_elements[adv_part].update(moved_elements) - num_elements_added = -len(moved_elements) + for e in moved_elements: + elem_to_rank[e] = adv_part + total_change = total_change + num_elements_added + part_loc[r+1] = new_loc + if debug: + print(f"--Before: {nelem_part=}") nelem_part[r] = nelem_part[r] + num_elements_added nelem_part[adv_part] = nelem_part[adv_part] - num_elements_added + if debug: + print(f"--After: {nelem_part=}") + + # Compute new nelem_needed and part_imbalance num_elem_needed = num_elem_needed - num_elements_added part_imbalance = \ np.abs(num_elem_needed) / float(aver_part_nelem) niter = niter + 1 - print(f"-Part({r=}): {nelem_part[r]=}, {part_imbalance=}") + # Summarize the total change and state of the partition + # and reservoir + if debug: + print(f"-Part({r}): {total_change=}") + print(f"-Part({r=}): {nelem_part[r]=}, {part_imbalance=}") + print(f"-Part({adv_part}): {nelem_part[adv_part]=}") + + # Validate the partitioning before returning + total_partitioned_elements = sum([len(part_to_elements[r]) + for r in range(num_ranks)]) + total_nelem_part = sum([nelem_part[r] for r in range(num_ranks)]) + if total_partitioned_elements != total_nelem_part: + raise ValueError("Element counts dont match") + if total_partitioned_elements != global_nelements: + raise ValueError("Total elements dont match.") + return elem_to_rank From 350fba6688df64a4b78464a689dc77722980d717 Mon Sep 17 00:00:00 2001 From: Michael Campbell Date: Mon, 17 Oct 2022 13:02:06 -0500 Subject: [PATCH 09/12] Clean up, better comments --- mirgecom/simutil.py | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index ebd2ff0ef..ab396dca2 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -494,28 +494,31 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, elem_centroids = np.sum(elem_x, axis=1)/elem_x.shape[1] global_nelements = len(elem_centroids) 
aver_part_nelem = global_nelements / num_ranks - debug = True if debug: print(f"Partitioning {global_nelements} elements in" f" [{x_min},{x_max}]/{num_ranks}") print(f"Average nelem/part: {aver_part_nelem}") print(f"Initial part locs: {part_loc=}") - + + # Create geometrically even partitions elem_to_rank = ((elem_centroids-x_min) / part_interval).astype(int) + # Automatic load-balancing if auto_balance: + # maps partition id to list of elements in that partition part_to_elements = {r: set((np.where(elem_to_rank == r))[0].flat) for r in range(num_ranks)} + # make an array of the geometrically even partition sizes + # avoids calling "len" over and over on the element sets nelem_part = [len(part_to_elements[r]) for r in range(num_ranks)] - adv_part = 1 if debug: print(f"Initial: {nelem_part=}") for r in range(num_ranks-1): - + # find the element reservoir (next part with elements in it) adv_part = r + 1 while nelem_part[adv_part] == 0: @@ -534,14 +537,17 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, total_change = 0 moved_elements = set() - while ((part_imbalance > imbalance_tolerance) and (adv_part < num_ranks)): - # This partition needs to keep changing in size until it meets the + while ((part_imbalance > imbalance_tolerance) + and (adv_part < num_ranks)): + # This partition needs to keep changing in size until it meets the # specified imbalance tolerance, or gives up trying + # seek out the element reservoir while nelem_part[adv_part] == 0: adv_part = adv_part + 1 if adv_part >= num_ranks: raise ValueError("Ran out of elements to partition.") + if debug: print(f"-{nelem_part[r]=}, adv_part({adv_part})," f" {nelem_part[adv_part]=}") @@ -552,7 +558,7 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, raise ValueError("Detected too many iterations in partitioning.") # The purpose of the next block is to populate the "moved_elements" - # data structure. Then those elements will be moved between the + # data structure. Then those elements will be moved between the # current partition being processed and the "reservoir," # *and* to adjust the position of the "right" side of the current # partition boundary. @@ -620,7 +626,8 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, # Partition is LARGER than it should be # Grab the spatial size of the current partition - # to compute the portion we need to shave off + # to estimate the portion we need to shave off + # assuming uniform element density part_interval = part_loc[r+1] - part_loc[r] num_to_move = -num_elem_needed portion_needed = num_to_move/float(nelem_part[r]) @@ -651,7 +658,9 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, portion_needed = portion_needed/2.0 else: fine_tuned = True - + + # new "right" wall location of shranken part + # and negative num_elements_added for removal new_loc = new_pos num_elements_added = -len(moved_elements) if debug: @@ -660,7 +669,7 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, print(f"--Removing {-num_elements_added} from part({r}).") # Now "moved_elements", "num_elements_added", and "new_loc" - # are computed. Update the partition, reservoir. + # are computed. Update the partition, and reservoir. 
if debug: print(f"--Number of elements to ADD: {num_elements_added}.") @@ -697,7 +706,7 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, print(f"-Part({r}): {total_change=}") print(f"-Part({r=}): {nelem_part[r]=}, {part_imbalance=}") print(f"-Part({adv_part}): {nelem_part[adv_part]=}") - + # Validate the partitioning before returning total_partitioned_elements = sum([len(part_to_elements[r]) for r in range(num_ranks)]) From 1b683aeb540ad59874dcd8cdf39bb833e24378d6 Mon Sep 17 00:00:00 2001 From: "Michael T. Campbell" Date: Tue, 25 Oct 2022 14:34:27 -0700 Subject: [PATCH 10/12] My changes --- examples/combozzle-mpi.py | 119 ++++++++++++++++++ examples/vortex-mpi.py | 7 ++ mirgecom/diffusion.py | 3 +- .../thermally_coupled_fluid_wall.py | 30 ++++- mirgecom/navierstokes.py | 6 +- mirgecom/simutil.py | 16 ++- 6 files changed, 170 insertions(+), 11 deletions(-) diff --git a/examples/combozzle-mpi.py b/examples/combozzle-mpi.py index f610630cc..9ff160a4b 100644 --- a/examples/combozzle-mpi.py +++ b/examples/combozzle-mpi.py @@ -105,6 +105,121 @@ def _get_box_mesh(dim, a, b, n, t=None, periodic=None): periodic=periodic) +def get_elements_in_box(box_a, box_b, mesh, elements=None): + + mesh_groups, = mesh.groups + mesh_verts = mesh.vertices + + mesh_verts = mesh.vertices + elem_verts = mesh_verts[:, mesh_groups.vertex_indices] + elem_centroids = np.sum(elem_verts, axis=2) / elem_verts.shape[2] + num_elements = elem_centroids.shape[1] + dim = mesh_verts.shape[0] + box_els = set() + if elements is None: + elements = range(num_elements) + + for e in elements: + elem_centroid = elem_centroids[:,e] + in_box = True + for d in range(dim): + in_box = (in_box and (elem_centroid[d] >= box_a[d] + and elem_centroid[d] <= box_b[d])) + if in_box: + box_els.add(e) + + + return box_els + + +def test_mesh(dim, a, b, mesh, axis_fractions_wall=None): + + if axis_fractions_wall is None: + axis_fractions_wall = [1/4, 1/2, 1/2] + + if len(axis_fractions_wall) != dim: + raise ValueError("Wall axis fractions must be of size(dimension).") + + mesh_lens = [b[i] - a[i] for i in range(dim)] + print(f"{mesh_lens=}") + + wall_box_lens = [mesh_lens[i]*axis_fractions_wall[i] for i in range(dim)] + + wall_b = [b[i] for i in range(dim)] + wall_a = [b[i] - wall_box_lens[i] for i in range(dim)] + + print(f"{wall_box_lens=},{wall_a=},{wall_b=}") + + domain_elements = get_elements_in_box( + box_a=a, box_b=b, mesh=mesh) + + structures_elements = get_elements_in_box( + box_a=wall_a, box_b=wall_b, mesh=mesh) + + print(f"{len(structures_elements)=}") + + fluid_elements = domain_elements - structures_elements + print(f"{len(fluid_elements)=}") + + axis_fractions_wall[0] = axis_fractions_wall[0]/2 + surround_box_lens = [mesh_lens[i]*axis_fractions_wall[i] for i in range(dim)] + surround_b = [b[i] for i in range(dim)] + surround_a = [b[i] - surround_box_lens[i] for i in range(dim)] + print(f"{surround_box_lens=},{surround_a=},{surround_b=}") + + surround_elements = get_elements_in_box( + box_a=surround_a, box_b=surround_b, mesh=mesh, + elements=structures_elements) + print(f"{len(surround_elements)=}") + + wall_elements = structures_elements - surround_elements + print(f"{len(wall_elements)=}") + + volume_to_tags = { + "fluid": ["fluid"], + "wall": ["wall_insert", "wall_surround"]} + + tag_to_elements = { + "fluid": np.array(list(fluid_elements)), + "wall_insert": np.array(list(wall_elements)), + "wall_surround": np.array(list(surround_elements))} + + return mesh, tag_to_elements, volume_to_tags + +def 
mesh_testing(mesh, box_a, box_b): + + mesh_groups, = mesh.groups + + mesh_verts = mesh.vertices + print(f"{mesh_verts.shape=}") + + elem_verts = mesh_verts[:, mesh_groups.vertex_indices] + print(f"{elem_verts.shape=}") + + elem_centroids = np.sum(elem_verts, axis=2) / elem_verts.shape[2] + print(f"{elem_centroids.shape=}") + + num_elements = elem_centroids.shape[1] + dim = mesh_verts.shape[0] + print(f"{num_elements=},{dim=}") + box_els = [] + for e in range(num_elements): + elem_centroid = elem_centroids[:,e] + in_box = True + for d in range(dim): + in_box = (in_box and (elem_centroid[d] >= box_a[d] + and elem_centroid[d] <= box_b[d])) + if in_box: + box_els.append(e) + + print(f"nelem in box: {len(box_els)=}") + # elem_x = mesh_verts[0, mesh_groups.vertex_indices] + # elem_centroids = np.sum(elem_x, axis=1)/elem_x.shape[1] + # global_nelements = len(elem_centroids) + # aver_part_nelem = global_nelements / num_ranks + + + class InitSponge: r"""Solution initializer for flow in the ACT-II facility. @@ -654,6 +769,10 @@ def main(ctx_factory=cl.create_some_context, use_logmgr=True, if grid_only: return 0 + mesh, tag_to_els, vols_to_tags = test_mesh(dim, box_ll, box_ur, local_mesh) + + return 0 + dcoll = create_discretization_collection(actx, local_mesh, order) nodes = actx.thaw(dcoll.nodes()) ones = dcoll.zeros(actx) + 1.0 diff --git a/examples/vortex-mpi.py b/examples/vortex-mpi.py index 47b69d933..66be48964 100644 --- a/examples/vortex-mpi.py +++ b/examples/vortex-mpi.py @@ -82,6 +82,9 @@ def main(actx_class, ctx_factory=cl.create_some_context, use_logmgr=True, rank = comm.Get_rank() num_parts = comm.Get_size() + from communicator import Communicator + comm = Communicator(comm) + from mirgecom.simutil import global_reduce as _global_reduce global_reduce = partial(_global_reduce, comm=comm) @@ -289,6 +292,7 @@ def my_health_check(pressure, component_errors): return health_error def my_pre_step(step, t, dt, state): + comm.comm_profile.reset() fluid_state = make_fluid_state(state, gas_model) cv = fluid_state.cv dv = fluid_state.dv @@ -348,6 +352,9 @@ def my_pre_step(step, t, dt, state): def my_post_step(step, t, dt, state): # Logmgr needs to know about EOS, dt, dim? 
# imo this is a design/scope flaw + comm.comm_profile.finalize() + comm.comm_profile.print_profile() + comm.comm_profile.print_msg_sizes() if logmgr: set_dt(logmgr, dt) set_sim_state(logmgr, dim, state, eos) diff --git a/mirgecom/diffusion.py b/mirgecom/diffusion.py index 9e9b79cdc..5bc202eae 100644 --- a/mirgecom/diffusion.py +++ b/mirgecom/diffusion.py @@ -458,7 +458,8 @@ def diffusion_operator( # FIXME: Do something similar to make_operator_fluid_states to avoid # communicating kappa and u multiple times grad_u = grad_operator( - dcoll, kappa, boundaries, u, quadrature_tag=quadrature_tag, dd=dd_vol) + dcoll, kappa, boundaries, u, quadrature_tag=quadrature_tag, dd=dd_vol, + comm_tag=comm_tag) kappa_quad = op.project(dcoll, dd_vol, dd_vol_quad, kappa) grad_u_quad = op.project(dcoll, dd_vol, dd_vol_quad, grad_u) diff --git a/mirgecom/multiphysics/thermally_coupled_fluid_wall.py b/mirgecom/multiphysics/thermally_coupled_fluid_wall.py index 7cdb02638..65bae81f4 100644 --- a/mirgecom/multiphysics/thermally_coupled_fluid_wall.py +++ b/mirgecom/multiphysics/thermally_coupled_fluid_wall.py @@ -71,6 +71,25 @@ ) +class _FluidOpStatesTag: + pass + + +class _FluidGradTag: + pass + + +class _FluidOperatorTag: + pass + + +class _WallGradTag: + pass + + +class _WallOperatorTag: + pass + class _TemperatureInterVolTag: pass @@ -565,10 +584,11 @@ def coupled_grad_t_operator( dcoll, gas_model, fluid_all_boundaries_no_grad, fluid_state, time=time, numerical_flux_func=fluid_numerical_flux_func, quadrature_tag=quadrature_tag, dd=fluid_dd, - operator_states_quad=_fluid_operator_states_quad), + operator_states_quad=_fluid_operator_states_quad, + comm_tag=_FluidGradTag), wall_grad_t_operator( dcoll, wall_kappa, wall_all_boundaries_no_grad, wall_temperature, - quadrature_tag=quadrature_tag, dd=wall_dd)) + quadrature_tag=quadrature_tag, dd=wall_dd, comm_tag=_WallGradTag)) def coupled_ns_heat_operator( @@ -626,7 +646,7 @@ def coupled_ns_heat_operator( fluid_operator_states_quad = make_operator_fluid_states( dcoll, fluid_state, gas_model, fluid_all_boundaries_no_grad, - quadrature_tag, dd=fluid_dd) + quadrature_tag, dd=fluid_dd, comm_tag=_FluidOpStatesTag) fluid_grad_temperature, wall_grad_temperature = coupled_grad_t_operator( dcoll, @@ -665,7 +685,7 @@ def coupled_ns_heat_operator( time=time, quadrature_tag=quadrature_tag, dd=fluid_dd, viscous_numerical_flux_func=viscous_facial_flux_harmonic, operator_states_quad=fluid_operator_states_quad, - grad_t=fluid_grad_temperature, comm_tag=_NSOperatorTag) + grad_t=fluid_grad_temperature, comm_tag=_FluidOperatorTag) if use_av: if av_kwargs is None: @@ -677,6 +697,6 @@ def coupled_ns_heat_operator( wall_rhs = diffusion_operator( dcoll, wall_kappa, wall_all_boundaries, wall_temperature, penalty_amount=wall_penalty_amount, quadrature_tag=quadrature_tag, - dd=wall_dd, grad_u=wall_grad_temperature, comm_tag=_DiffusionOperatorTag) + dd=wall_dd, grad_u=wall_grad_temperature, comm_tag=_WallOperatorTag) return fluid_rhs, wall_rhs diff --git a/mirgecom/navierstokes.py b/mirgecom/navierstokes.py index 91a01259a..d552be84b 100644 --- a/mirgecom/navierstokes.py +++ b/mirgecom/navierstokes.py @@ -450,7 +450,8 @@ def ns_operator(dcoll, gas_model, state, boundaries, *, time=0.0, dcoll, gas_model, boundaries, state, time=time, numerical_flux_func=gradient_numerical_flux_func, quadrature_tag=quadrature_tag, dd=dd_vol, - operator_states_quad=operator_states_quad) + operator_states_quad=operator_states_quad, + comm_tag=comm_tag) # Communicate grad(CV) and put it on the quadrature domain 
grad_cv_interior_pairs = [ @@ -470,7 +471,8 @@ def ns_operator(dcoll, gas_model, state, boundaries, *, time=0.0, dcoll, gas_model, boundaries, state, time=time, numerical_flux_func=gradient_numerical_flux_func, quadrature_tag=quadrature_tag, dd=dd_vol, - operator_states_quad=operator_states_quad) + operator_states_quad=operator_states_quad, + comm_tag=comm_tag) # Create the interior face trace pairs, perform MPI exchange, interp to quad grad_t_interior_pairs = [ diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index ab396dca2..534b03b4e 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -711,10 +711,18 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, total_partitioned_elements = sum([len(part_to_elements[r]) for r in range(num_ranks)]) total_nelem_part = sum([nelem_part[r] for r in range(num_ranks)]) + if total_partitioned_elements != total_nelem_part: raise ValueError("Element counts dont match") if total_partitioned_elements != global_nelements: raise ValueError("Total elements dont match.") + if len(elem_to_rank) != global_nelements: + raise ValueError("Elem-to-rank wrong size.") + + for e in range(global_nelements): + part = elem_to_rank[e] + if e not in part_to_elements[part]: + raise ValueError("part/element/part map mismatch.") return elem_to_rank @@ -894,9 +902,9 @@ def boundary_report(dcoll, boundaries, outfile_name, *, dd=DD_VOLUME_ALL, nelem = 0 for grp in mesh.groups: nelem = nelem + grp.nelements - local_header = f"nproc: {nproc}\nrank: {rank}\nnelem: {nelem}" + local_header = f"nproc: {nproc}\nrank: {rank}\nnelem: {nelem}\n" else: - local_header = f"nproc: {nproc}\nrank: {rank}" + local_header = f"nproc: {nproc}\nrank: {rank}\n" from io import StringIO local_report = StringIO(local_header) @@ -910,10 +918,12 @@ def boundary_report(dcoll, boundaries, outfile_name, *, dd=DD_VOLUME_ALL, from meshmode.mesh import BTAG_PARTITION from meshmode.distributed import get_connected_parts connected_part_ids = get_connected_parts(dcoll.discr_from_dd(dd).mesh) + local_report.write(f"num_nbr_parts: {len(connected_part_ids)}\n") local_report.write(f"connected_part_ids: {connected_part_ids}\n") part_nodes = [] for connected_part_id in connected_part_ids: - boundary_discr = dcoll.discr_from_dd(BTAG_PARTITION(connected_part_id)) + boundary_discr = dcoll.discr_from_dd( + dd.trace(BTAG_PARTITION(connected_part_id))) nnodes = sum([grp.ndofs for grp in boundary_discr.groups]) part_nodes.append(nnodes) if part_nodes: From 3d8c9cb8eacacb3f7cc1455d1871d188d53a8ac3 Mon Sep 17 00:00:00 2001 From: "Michael T. Campbell" Date: Wed, 26 Oct 2022 13:12:59 -0700 Subject: [PATCH 11/12] Add more validation, validate untuned parts too. --- mirgecom/simutil.py | 78 ++++++++++++++++++++++++++++----------------- 1 file changed, 49 insertions(+), 29 deletions(-) diff --git a/mirgecom/simutil.py b/mirgecom/simutil.py index 534b03b4e..adadcac06 100644 --- a/mirgecom/simutil.py +++ b/mirgecom/simutil.py @@ -432,7 +432,7 @@ def max_component_norm(dcoll, fields, order=np.inf, *, dd=DD_VOLUME_ALL): componentwise_norms(dcoll, fields, order, dd=dd), actx))) -def generate_and_distribute_mesh(comm, generate_mesh): +def generate_and_distribute_mesh(comm, generate_mesh, **kwargs): """Generate a mesh and distribute it among all ranks in *comm*. Generate the mesh with the user-supplied mesh generation function @@ -461,7 +461,7 @@ def generate_and_distribute_mesh(comm, generate_mesh): warn( "generate_and_distribute_mesh is deprecated and will go away Q4 2022. 
" "Use distribute_mesh instead.", DeprecationWarning, stacklevel=2) - return distribute_mesh(comm, generate_mesh) + return distribute_mesh(comm, generate_mesh, **kwargs) def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, @@ -504,18 +504,18 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, # Create geometrically even partitions elem_to_rank = ((elem_centroids-x_min) / part_interval).astype(int) - # Automatic load-balancing - if auto_balance: + # map partition id to list of elements in that partition + part_to_elements = {r: set((np.where(elem_to_rank == r))[0].flat) + for r in range(num_ranks)} + # make an array of the geometrically even partition sizes + # avoids calling "len" over and over on the element sets + nelem_part = [len(part_to_elements[r]) for r in range(num_ranks)] - # maps partition id to list of elements in that partition - part_to_elements = {r: set((np.where(elem_to_rank == r))[0].flat) - for r in range(num_ranks)} - # make an array of the geometrically even partition sizes - # avoids calling "len" over and over on the element sets - nelem_part = [len(part_to_elements[r]) for r in range(num_ranks)] + if debug: + print(f"Initial: {nelem_part=}") - if debug: - print(f"Initial: {nelem_part=}") + # Automatic load-balancing + if auto_balance: for r in range(num_ranks-1): @@ -593,9 +593,11 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, # This part is needed because the elements are not # distributed uniformly in space. fine_tuned = False + trial_portion_needed = portion_needed while not fine_tuned: - pos_update = portion_needed*reserv_interval + pos_update = trial_portion_needed*reserv_interval new_loc = part_loc[r+1] + pos_update + moved_elements = set() num_elem_mv = 0 for e in part_to_elements[adv_part]: @@ -609,10 +611,11 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, rel_ovrsht = ovrsht/float(num_elem_needed) if rel_ovrsht > 0.8: # bisect the space grabbed and try again - portion_needed = portion_needed/2.0 + trial_portion_needed = trial_portion_needed/2.0 else: fine_tuned = True + portion_needed = trial_portion_needed new_loc = part_loc[r+1] + pos_update if debug: print(f"--Tuned: {portion_needed=} [by spatial volume]") @@ -707,24 +710,39 @@ def geometric_mesh_partitioner(mesh, num_ranks=1, *, tag_to_elements=None, print(f"-Part({r=}): {nelem_part[r]=}, {part_imbalance=}") print(f"-Part({adv_part}): {nelem_part[adv_part]=}") - # Validate the partitioning before returning - total_partitioned_elements = sum([len(part_to_elements[r]) - for r in range(num_ranks)]) - total_nelem_part = sum([nelem_part[r] for r in range(num_ranks)]) + # Validate the partitioning before returning + total_partitioned_elements = sum([len(part_to_elements[r]) + for r in range(num_ranks)]) + total_nelem_part = sum([nelem_part[r] for r in range(num_ranks)]) + + if debug: + print("Validating mesh parts.") + + if total_partitioned_elements != total_nelem_part: + raise ValueError("Validator: parted element counts dont match") + if total_partitioned_elements != global_nelements: + raise ValueError("Validator: global element counts dont match.") + if len(elem_to_rank) != global_nelements: + raise ValueError("Validator: elem-to-rank wrong size.") + if np.any(nelem_part) <= 0: + raise ValueError("Validator: empty partitions.") + + for e in range(global_nelements): + part = elem_to_rank[e] + if e not in part_to_elements[part]: + raise ValueError("Validator: part/element/part map mismatch.") - if 
total_partitioned_elements != total_nelem_part: - raise ValueError("Element counts dont match") - if total_partitioned_elements != global_nelements: - raise ValueError("Total elements dont match.") - if len(elem_to_rank) != global_nelements: - raise ValueError("Elem-to-rank wrong size.") + part_counts = np.zeros(global_nelements) + for part_elements in part_to_elements.values(): + for element in part_elements: + part_counts[element] = part_counts[element] + 1 - for e in range(global_nelements): - part = elem_to_rank[e] - if e not in part_to_elements[part]: - raise ValueError("part/element/part map mismatch.") + if np.any(part_counts > 1): + raise ValueError("Validator: degenerate elements") + if np.any(part_counts < 1): + raise ValueError("Validator: orphaned elements") - return elem_to_rank + return elem_to_rank def distribute_mesh(comm, get_mesh_data, partition_generator_func=None): @@ -789,10 +807,12 @@ def partition_generator_func(mesh, tag_to_elements, num_ranks): raise TypeError("Unexpected result from get_mesh_data") print(f"distribute_mesh partitioning begin: {time.ctime(time.time())}") + from meshmode.mesh.processing import partition_mesh rank_per_element = \ partition_generator_func(mesh, tag_to_elements=tag_to_elements, num_ranks=num_ranks) + print(f"distribute_mesh partitioning done: {time.ctime(time.time())}") print(f"distribute_mesh: mesh data struct start: {time.ctime(time.time())}") From 12620392e746fe58b3207ef4625a700e9f96b8b1 Mon Sep 17 00:00:00 2001 From: "Michael T. Campbell" Date: Wed, 26 Oct 2022 13:15:18 -0700 Subject: [PATCH 12/12] Add Shelbys MPI Comm profiler --- mirgecom/mpi.py | 358 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 358 insertions(+) diff --git a/mirgecom/mpi.py b/mirgecom/mpi.py index de292ba1d..bbe2fb0f3 100644 --- a/mirgecom/mpi.py +++ b/mirgecom/mpi.py @@ -202,3 +202,361 @@ def rank_print(*args, **kwargs): if "oldprint" not in __builtins__: __builtins__["oldprint"] = __builtins__["print"] __builtins__["print"] = rank_print + + +################################################ +# +# CPU helper functions for sending and receiving +# +################################################ +def _isend_cpu(mpi_communicator, data_ary, receiver_rank, Tag, data_ary_size, profiler): + """ + Returns : + MPI send request + Inputs : + data_ary : data to be communicated -- in a format that the array context understands + receiver_rank : MPI rank receiving data + Tag : MPI communication tag + """ + if profiler: + profiler.init_start(data_ary_size, receiver=receiver_rank, sender=None) + Return_Request = mpi_communicator.Isend(data_ary, receiver_rank, tag=Tag) + if profiler: + profiler.init_stop(receiver_rank) + + return Return_Request + +def _irecv_cpu(mpi_communicator, data_ary, sender_rank, Tag, data_ary_size, profiler): + """ + Returns mpi recv request + """ + if profiler: + profiler.init_start(data_ary_size, receiver=None, sender=sender_rank) + Return_Request = mpi_communicator.Irecv(data_ary, sender_rank, tag=Tag) + if profiler: + profiler.init_stop(sender_rank) + + return Return_Request + +def _wait_cpu(mpi_req, profiler): + """ + Returns nothing + """ + if profiler: + profiler.finish_start() + mpi_req.Wait() + if profiler: + profiler.finish_stop() + +def _waitsome_cpu(mpi_reqs, profiler): + """ + Returns indices + """ + from mpi4py import MPI + if profiler: + profiler.finish_start() + indices = MPI.Request.Waitsome(mpi_reqs) + if profiler: + profiler.finish_stop() + + return indices + +def _waitall_cpu(mpi_reqs, profiler): + """ + Returns 
+
+def _waitall_cpu(mpi_reqs, profiler):
+    """
+    Returns nothing.
+    """
+    from mpi4py import MPI
+    if profiler:
+        profiler.finish_start()
+    MPI.Request.Waitall(mpi_reqs)
+    if profiler:
+        profiler.finish_stop()
+
+################################################
+#
+# Profiling object to contain all communication
+# data
+#
+################################################
+class CommunicationProfile:
+    """
+    Holds all communication profile data.
+    """
+
+    def __init__(self):
+        """
+        Initializes the communication profile object.
+        """
+        self.ppn = 4
+
+        # Variables to hold total times for sends and receives
+        # as well as the data transfer costs incurred for communication
+        self.init_t = 0.0
+        self.finish_t = 0.0
+        self.dev_cpy_t = 0.0
+
+        self.init_t_start = 0.0
+        self.finish_t_start = 0.0
+        self.dev_cpy_t_start = 0.0
+
+        self.init_inter_t = 0.0
+        self.init_intra_t = 0.0
+        self.finish_inter_t = 0.0
+        self.finish_intra_t = 0.0
+
+        self.init_inter_t_start = 0.0
+        self.init_intra_t_start = 0.0
+        self.finish_inter_t_start = 0.0
+        self.finish_intra_t_start = 0.0
+
+        # Variables to hold numbers of messages initialized and received
+        # as well as the number of data copies to and from device
+        self.init_m = 0
+        self.finish_m = 0
+        self.dev_cpy_m = 0
+
+        # Lists to contain ALL message sizes for initialized and
+        # received messages per rank
+        self.send_msg_sizes = []
+        self.recv_msg_sizes = []
+
+    def reset(self):
+        self.init_t = 0.0
+        self.finish_t = 0.0
+        self.dev_cpy_t = 0.0
+
+        self.init_m = 0
+        self.finish_m = 0
+        self.dev_cpy_m = 0
+
+        self.send_msg_sizes = []
+        self.recv_msg_sizes = []
+
+    def init_start(self, msg_size=None, receiver=None, sender=None):
+        from mpi4py import MPI
+        # if receiver:
+        #     my_rank = MPI.COMM_WORLD.Get_rank()
+        #     remainder = my_rank % self.ppn
+        #     highest_local = self.ppn - remainder + my_rank
+        #     lowest_local = my_rank - remainder
+        #     if (receiver < lowest_local) or (receiver > highest_local):
+        #         self.init_inter_t_start = MPI.Wtime()
+        #     else:
+        #         self.init_intra_t_start = MPI.Wtime()
+        # else:
+        self.init_t_start = MPI.Wtime()
+
+        self.init_m += 1
+        if msg_size is not None and receiver is not None:
+            # print('to ', receiver, 'sz ', msg_size)
+            self.send_msg_sizes.append([receiver, msg_size])
+        elif msg_size is not None and sender is not None:
+            # print('from ', sender, 'sz ', msg_size)
+            self.recv_msg_sizes.append([sender, msg_size])
+
+    def init_stop(self, receiver=None):
+        from mpi4py import MPI
+        # if receiver:
+        #     my_rank = MPI.COMM_WORLD.Get_rank()
+        #     remainder = my_rank % self.ppn
+        #     highest_local = self.ppn - remainder + my_rank
+        #     lowest_local = my_rank - remainder
+        #     if (receiver < lowest_local) or (receiver > highest_local):
+        #         self.init_inter_t += (MPI.Wtime() - self.init_inter_t_start)
+        #     else:
+        #         self.init_intra_t += (MPI.Wtime() - self.init_intra_t_start)
+        # else:
+        self.init_t += (MPI.Wtime() - self.init_t_start)
+
+    def finish_start(self, msg_size=None, receiver=None):
+        from mpi4py import MPI
+        if receiver is not None:
+            my_rank = MPI.COMM_WORLD.Get_rank()
+            remainder = my_rank % self.ppn
+            highest_local = self.ppn - remainder + my_rank
+            lowest_local = my_rank - remainder
+            if (receiver < lowest_local) or (receiver > highest_local):
+                self.finish_inter_t_start = MPI.Wtime()
+            else:
+                self.finish_intra_t_start = MPI.Wtime()
+        else:
+            self.finish_t_start = MPI.Wtime()
+
+        self.finish_m += 1
+    def finish_stop(self, receiver=None):
+        from mpi4py import MPI
+        if receiver is not None:
+            my_rank = MPI.COMM_WORLD.Get_rank()
+            remainder = my_rank % self.ppn
+            highest_local = self.ppn - remainder + my_rank
+            lowest_local = my_rank - remainder
+            if (receiver < lowest_local) or (receiver > highest_local):
+                self.finish_inter_t += (MPI.Wtime() - self.finish_inter_t_start)
+            else:
+                self.finish_intra_t += (MPI.Wtime() - self.finish_intra_t_start)
+        else:
+            self.finish_t += (MPI.Wtime() - self.finish_t_start)
+
+    def dev_copy_start(self):
+        from mpi4py import MPI
+        self.dev_cpy_t_start = MPI.Wtime()
+        self.dev_cpy_m += 1
+
+    def dev_copy_stop(self):
+        from mpi4py import MPI
+        self.dev_cpy_t += (MPI.Wtime() - self.dev_cpy_t_start)
+
+    def average(self):
+        """
+        Returns profiling data averages in a tuple of the form:
+            (init_avg, finish_avg, dev_avg)
+
+        init_avg : initialization time average
+        finish_avg : finishing communication time average
+        dev_avg : average amount of time spent copying data to/from device
+        """
+        init_avg = self.init_t / self.init_m if self.init_m else 0.0
+        finish_avg = self.finish_t / self.finish_m if self.finish_m else 0.0
+        dev_copy_avg = self.dev_cpy_t / self.dev_cpy_m if self.dev_cpy_m else 0.0
+
+        return (init_avg, finish_avg, dev_copy_avg)
+
+    def finalize(self):
+        """
+        Finalizes profiling data.
+        """
+        self.print_profile()
+        self.print_msg_sizes()
+
+        return
+
+    def print_profile(self):
+        """
+        Formatted print of profiling data.
+        """
+        from mpi4py import MPI
+        rank = MPI.COMM_WORLD.Get_rank()
+        num_procs = MPI.COMM_WORLD.Get_size()
+
+        for i in range(num_procs):
+            if i == rank:
+                print(F'------------------Process {rank:4d}------------')
+                print(F'Init Total Time        {self.init_t:.5f}')
+                print(F'Init Intra Time        {self.init_intra_t:.5f}')
+                print(F'Init Inter Time        {self.init_inter_t:.5f}')
+                print(F'Init Messages          {self.init_m:4d}')
+                print(F'Finish Total Time      {self.finish_t:.5f}')
+                print(F'Finish Messages        {self.finish_m:4d}')
+                print(F'Device Copy Total Time {self.dev_cpy_t:.5f}')
+                print(F'Device Copies          {self.dev_cpy_m:4d}')
+            MPI.COMM_WORLD.Barrier()
+
+        return
+
+    def print_msg_sizes(self):
+        import numpy as np
+        from mpi4py import MPI
+        p = MPI.COMM_WORLD.Get_rank()
+        np.save('initialized_send_msg_sizes_p'+str(p), np.array(self.send_msg_sizes))
+        np.save('initialized_recv_msg_sizes_p'+str(p), np.array(self.recv_msg_sizes))
+        return
+
+################################################
+#
+# Communicator object to hold which sending
+# and receiving functions to use
+#
+################################################
+class ProfilingCommunicator:
+    """
+    Communicator wrapper that mirrors the mpi4py interface and profiles calls.
+
+    comm : mpi4py communicator to wrap (defaults to MPI.COMM_WORLD)
+    cflag : whether the MPI is CUDA-aware and running on an Nvidia GPU
+    profile : whether to collect a CommunicationProfile
+    """
+
+    def __init__(self, comm=None, cflag=False, profile=True):
+        """
+        Initialization function
+        """
+        from mpi4py import MPI
+        self.mpi_communicator = comm
+        if comm is None:
+            self.mpi_communicator = MPI.COMM_WORLD
+
+        self.d_type = MPI.DOUBLE  # The MPI datatype being communicated
+        self.cuda_flag = cflag  # Whether the MPI is CUDA-aware and running on an Nvidia GPU
+        self.comm_profile = None  # Communication profile is not initialized unless the profile
+                                  # flag is set
+
+        self.rank = self.mpi_communicator.Get_rank()
+        self.size = self.mpi_communicator.Get_size()
+
+        # Initialize communication routines to be performed via CPU communication
+        self.isend = _isend_cpu
+        self.irecv = _irecv_cpu
+        self.wait = _wait_cpu
+        self.waitall = _waitall_cpu
+        self.waitsome = _waitsome_cpu
+
+        # Create CommunicationProfile object
+        if profile:
+            self.comm_profile = CommunicationProfile()
+
+    def Isend(self, data_ary, dest=None, tag=None, size=0):
+        """
+        data_ary : data to be communicated -- in a format that the array context understands
+        dest : MPI rank receiving the data
+        tag : MPI communication tag
+        """
+        return self.isend(self.mpi_communicator, data_ary, dest, tag, size, self.comm_profile)
+
+    def Irecv(self, buf=None, source=None, tag=None, size=0):
+        """
+        buf : array for the data to be received into -- in a format that the array context understands
+        source : MPI rank sending the data
+        tag : MPI communication tag
+        """
+        return self.irecv(self.mpi_communicator, buf, source, tag, size, self.comm_profile)
+
+    def Wait(self, mpi_req):
+        self.wait(mpi_req, self.comm_profile)
+
+    def Waitsome(self, mpi_reqs):
+        return self.waitsome(mpi_reqs, self.comm_profile)
+
+    def Waitall(self, mpi_reqs):
+        self.waitall(mpi_reqs, self.comm_profile)
+
+    def allreduce(self, array, op=None):
+        from mpi4py import MPI
+        if op is None:
+            op = MPI.MAX
+        return self.mpi_communicator.allreduce(array, op=op)
+
+    def reduce(self, obj, op=None, root=0):
+        from mpi4py import MPI
+        if op is None:
+            op = MPI.MAX
+        return self.mpi_communicator.reduce(obj, op=op, root=root)
+
+    def barrier(self):
+        return self.mpi_communicator.barrier()
+
+    def Barrier(self):
+        return self.mpi_communicator.Barrier()
+
+    def bcast(self, obj, root=0):
+        return self.mpi_communicator.bcast(obj, root=root)
+
+    def gather(self, obj, root=0):
+        return self.mpi_communicator.gather(obj, root=root)
+
+    def Get_rank(self):
+        return self.mpi_communicator.Get_rank()
+
+    def Get_size(self):
+        return self.mpi_communicator.Get_size()
+
+    def Dup(self):
+        return self.mpi_communicator.Dup()
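
Usage note (not part of the patch): a minimal sketch of how the profiling communicator added above might be driven. It assumes mpi4py and a mirgecom checkout with this patch applied are importable, and that the script runs under at least two MPI ranks (for example, mpirun -n 2 python demo.py). The buffer length, the tag value 42, and the peer_send/peer_recv helper names are illustrative choices, not part of the patch.

import numpy as np
from mirgecom.mpi import ProfilingCommunicator

comm = ProfilingCommunicator(profile=True)  # wraps MPI.COMM_WORLD by default
rank = comm.Get_rank()
size = comm.Get_size()

# Each rank exchanges a small buffer with its neighbors in a ring.
sendbuf = np.full(10, rank, dtype=np.float64)
recvbuf = np.empty(10, dtype=np.float64)
peer_send = (rank + 1) % size
peer_recv = (rank - 1) % size

# Isend/Irecv record message sizes and initialization times in the attached
# CommunicationProfile; Waitall accumulates the finishing time.
reqs = [comm.Isend(sendbuf, dest=peer_send, tag=42, size=sendbuf.nbytes),
        comm.Irecv(recvbuf, source=peer_recv, tag=42, size=recvbuf.nbytes)]
comm.Waitall(reqs)

# Per-rank timing report plus .npy dumps of the recorded message sizes.
comm.comm_profile.finalize()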