From e502fe1e0a535216fe1f9a0835b85f315b158d77 Mon Sep 17 00:00:00 2001 From: Connor Ward Date: Mon, 16 Dec 2024 15:22:46 +0000 Subject: [PATCH 01/11] wip --- gusto/core/coordinates.py | 91 ++++++++++++++++++++------------------- gusto/core/domain.py | 40 +++++++++-------- gusto/core/io.py | 1 + 3 files changed, 69 insertions(+), 63 deletions(-) diff --git a/gusto/core/coordinates.py b/gusto/core/coordinates.py index 28862e176..53cc2d75f 100644 --- a/gusto/core/coordinates.py +++ b/gusto/core/coordinates.py @@ -6,6 +6,7 @@ from gusto.core.coord_transforms import lonlatr_from_xyz, rotated_lonlatr_coords from gusto.core.logging import logger from firedrake import SpatialCoordinate, Function +from firedrake.utils import IntType import numpy as np import pandas as pd @@ -67,7 +68,6 @@ def __init__(self, mesh, on_sphere=False, rotated_pole=None, radius=None): # -------------------------------------------------------------------- # self.chi_coords = {} # Dict of natural coords by space - self.global_chi_coords = {} # Dict of whole coords stored on first proc self.parallel_array_lims = {} # Dict of array lengths for each proc def register_space(self, domain, space_name): @@ -87,8 +87,6 @@ def register_space(self, domain, space_name): """ comm = self.mesh.comm - comm_size = comm.Get_size() - my_rank = comm.Get_rank() topological_dimension = self.mesh.topological_dimension() if space_name in self.chi_coords.keys(): @@ -113,48 +111,53 @@ def register_space(self, domain, space_name): self.chi_coords[space_name].append(Function(space).interpolate(self.coords[i])) # -------------------------------------------------------------------- # - # Code for settings up coordinates for parallel-serial IO + # Code for settings up coordinates for parallel IO # -------------------------------------------------------------------- # - len_coords = space.dim() - my_num_dofs = len(self.chi_coords[space_name][0].dat.data_ro[:]) - - if my_rank != 0: - # Do not make full coordinate array - self.global_chi_coords[space_name] = None - self.parallel_array_lims[space_name] = None - # Find number of DoFs on this processor - comm.send(my_num_dofs, dest=0) - else: - # First processor has one large array of the global chi data - self.global_chi_coords[space_name] = np.zeros((topological_dimension, len_coords)) - # Store the limits inside this array telling us how data is partitioned - self.parallel_array_lims[space_name] = np.zeros((comm_size, 2), dtype=int) - # First processor has the first bit of data - self.parallel_array_lims[space_name][my_rank][0] = 0 - self.parallel_array_lims[space_name][my_rank][1] = my_num_dofs - 1 - # Receive number of DoFs on other processors - for procid in range(1, comm_size): - other_num_dofs = comm.recv(source=procid) - self.parallel_array_lims[space_name][procid][0] = self.parallel_array_lims[space_name][procid-1][1] + 1 - self.parallel_array_lims[space_name][procid][1] = self.parallel_array_lims[space_name][procid][0] + other_num_dofs - 1 + nlocal_dofs = len(self.chi_coords[space_name][0].dat.data_ro) + sendbuf = np.asarray([nlocal_dofs], dtype=IntType) + comm.scan(sendbuf) + stop = sendbuf[0] + start = stop - nlocal_dofs + + self.parallel_array_lims[space_name] = (start, stop) + + # if my_rank != 0: + # # Do not make full coordinate array + # # self.global_chi_coords[space_name] = None + # self.parallel_array_lims[space_name] = None + # # Find number of DoFs on this processor + # comm.send(my_num_dofs, dest=0) + # else: + # # First processor has one large array of the global chi data + # # 
self.global_chi_coords[space_name] = np.zeros((topological_dimension, len_coords)) + # # Store the limits inside this array telling us how data is partitioned + # self.parallel_array_lims[space_name] = np.zeros((comm_size, 2), dtype=int) + # # First processor has the first bit of data + # self.parallel_array_lims[space_name][my_rank][0] = 0 + # self.parallel_array_lims[space_name][my_rank][1] = my_num_dofs - 1 + # # Receive number of DoFs on other processors + # for procid in range(1, comm_size): + # other_num_dofs = comm.recv(source=procid) + # self.parallel_array_lims[space_name][procid][0] = self.parallel_array_lims[space_name][procid-1][1] + 1 + # self.parallel_array_lims[space_name][procid][1] = self.parallel_array_lims[space_name][procid][0] + other_num_dofs - 1 # Now move coordinates to first processor - for i in range(topological_dimension): - if my_rank != 0: - # Send information to rank 0 - my_tag = comm_size*i + my_rank - comm.send(self.chi_coords[space_name][i].dat.data_ro[:], dest=0, tag=my_tag) - else: - # Rank 0 -- the receiver - (low_lim, up_lim) = self.parallel_array_lims[space_name][my_rank][:] - self.global_chi_coords[space_name][i][low_lim:up_lim+1] = self.chi_coords[space_name][i].dat.data_ro[:] - # Receive coords from each processor and put them into array - for procid in range(1, comm_size): - my_tag = comm_size*i + procid - new_coords = comm.recv(source=procid, tag=my_tag) - (low_lim, up_lim) = self.parallel_array_lims[space_name][procid][:] - self.global_chi_coords[space_name][i, low_lim:up_lim+1] = new_coords + # for i in range(topological_dimension): + # if my_rank != 0: + # # Send information to rank 0 + # my_tag = comm_size*i + my_rank + # comm.send(self.chi_coords[space_name][i].dat.data_ro[:], dest=0, tag=my_tag) + # else: + # # Rank 0 -- the receiver + # (low_lim, up_lim) = self.parallel_array_lims[space_name][my_rank][:] + # # self.global_chi_coords[space_name][i][low_lim:up_lim+1] = self.chi_coords[space_name][i].dat.data_ro[:] + # # Receive coords from each processor and put them into array + # for procid in range(1, comm_size): + # my_tag = comm_size*i + procid + # new_coords = comm.recv(source=procid, tag=my_tag) + # (low_lim, up_lim) = self.parallel_array_lims[space_name][procid][:] + # self.global_chi_coords[space_name][i, low_lim:up_lim+1] = new_coords def get_column_data(self, field, domain): """ @@ -177,9 +180,9 @@ def get_column_data(self, field, domain): coords = self.chi_coords[space_name] data_is_3d = (len(coords) == 3) - coords_X = coords[0].dat.data - coords_Y = coords[1].dat.data if data_is_3d else None - coords_Z = coords[-1].dat.data + coords_X = coords[0].dat.data_ro + coords_Y = coords[1].dat.data_ro if data_is_3d else None + coords_Z = coords[-1].dat.data_ro # ------------------------------------------------------------------------ # # Round data to ensure sorting in dataframe is OK diff --git a/gusto/core/domain.py b/gusto/core/domain.py index 0cf23fb7f..e73496ab0 100644 --- a/gusto/core/domain.py +++ b/gusto/core/domain.py @@ -10,6 +10,7 @@ inner, grad, VectorFunctionSpace, Function, FunctionSpace, perp) import numpy as np +from mpi4py import MPI class Domain(object): @@ -218,25 +219,26 @@ def construct_domain_metadata(mesh, coords, on_sphere): else: raise ValueError('Unable to determine domain type') + # Determine domain properties + chi = coords.chi_coords['DG1_equispaced'] comm = mesh.comm - my_rank = comm.Get_rank() - - # Properties of domain will be determined from full coords, so need - # doing on the first processor then 
broadcasting to others - - if my_rank == 0: - chi = coords.global_chi_coords['DG1_equispaced'] - if not on_sphere: - metadata['domain_extent_x'] = np.max(chi[0, :]) - np.min(chi[0, :]) - if metadata['domain_type'] in ['plane', 'extruded_plane']: - metadata['domain_extent_y'] = np.max(chi[1, :]) - np.min(chi[1, :]) - if mesh.extruded: - metadata['domain_extent_z'] = np.max(chi[-1, :]) - np.min(chi[-1, :]) - - else: - metadata = {} - - # Send information to other processors - metadata = comm.bcast(metadata, root=0) + if not on_sphere: + _min_x = np.min(chi[0].dat.data_ro) + _max_x = np.max(chi[0].dat.data_ro) + min_x = comm.allreduce(_min_x, MPI.MIN) + max_x = comm.allreduce(_max_x, MPI.MAX) + metadata['domain_extent_x'] = max_x - min_x + if metadata['domain_type'] in ['plane', 'extruded_plane']: + _min_y = np.min(chi[1].dat.data_ro) + _max_y = np.max(chi[1].dat.data_ro) + min_y = comm.allreduce(_min_y, MPI.MIN) + max_y = comm.allreduce(_max_y, MPI.MAX) + metadata['domain_extent_y'] = max_y - min_y + if mesh.extruded: + _min_z = np.min(chi[-1].dat.data_ro) + _max_z = np.max(chi[-1].dat.data_ro) + min_z = comm.allreduce(_min_z, MPI.MIN) + max_z = comm.allreduce(_max_z, MPI.MAX) + metadata['domain_extent_z'] = max_z - min_z return metadata diff --git a/gusto/core/io.py b/gusto/core/io.py index 6f8f74712..ddd85dfb6 100644 --- a/gusto/core/io.py +++ b/gusto/core/io.py @@ -816,6 +816,7 @@ def create_nc_dump(self, filename, space_names): nc_field_file.close() + # NOT PARALLEL SAFE! def write_nc_dump(self, t): comm = self.mesh.comm From 010f94c476f7716603881ff8bac698f475b5c7ce Mon Sep 17 00:00:00 2001 From: Connor Ward Date: Tue, 17 Dec 2024 11:29:37 +0000 Subject: [PATCH 02/11] Appears to work, needs testing --- gusto/core/coordinates.py | 44 +-------- gusto/core/io.py | 200 +++++++++++++++++++++++++------------- 2 files changed, 135 insertions(+), 109 deletions(-) diff --git a/gusto/core/coordinates.py b/gusto/core/coordinates.py index 53cc2d75f..e27cb065e 100644 --- a/gusto/core/coordinates.py +++ b/gusto/core/coordinates.py @@ -110,55 +110,15 @@ def register_space(self, domain, space_name): for i in range(topological_dimension): self.chi_coords[space_name].append(Function(space).interpolate(self.coords[i])) - # -------------------------------------------------------------------- # - # Code for settings up coordinates for parallel IO - # -------------------------------------------------------------------- # - + # Determine the offsets of the local piece of data into the global array + # NOTE: Could come from a section?? getOffsetRange? 
nlocal_dofs = len(self.chi_coords[space_name][0].dat.data_ro) sendbuf = np.asarray([nlocal_dofs], dtype=IntType) comm.scan(sendbuf) stop = sendbuf[0] start = stop - nlocal_dofs - self.parallel_array_lims[space_name] = (start, stop) - # if my_rank != 0: - # # Do not make full coordinate array - # # self.global_chi_coords[space_name] = None - # self.parallel_array_lims[space_name] = None - # # Find number of DoFs on this processor - # comm.send(my_num_dofs, dest=0) - # else: - # # First processor has one large array of the global chi data - # # self.global_chi_coords[space_name] = np.zeros((topological_dimension, len_coords)) - # # Store the limits inside this array telling us how data is partitioned - # self.parallel_array_lims[space_name] = np.zeros((comm_size, 2), dtype=int) - # # First processor has the first bit of data - # self.parallel_array_lims[space_name][my_rank][0] = 0 - # self.parallel_array_lims[space_name][my_rank][1] = my_num_dofs - 1 - # # Receive number of DoFs on other processors - # for procid in range(1, comm_size): - # other_num_dofs = comm.recv(source=procid) - # self.parallel_array_lims[space_name][procid][0] = self.parallel_array_lims[space_name][procid-1][1] + 1 - # self.parallel_array_lims[space_name][procid][1] = self.parallel_array_lims[space_name][procid][0] + other_num_dofs - 1 - - # Now move coordinates to first processor - # for i in range(topological_dimension): - # if my_rank != 0: - # # Send information to rank 0 - # my_tag = comm_size*i + my_rank - # comm.send(self.chi_coords[space_name][i].dat.data_ro[:], dest=0, tag=my_tag) - # else: - # # Rank 0 -- the receiver - # (low_lim, up_lim) = self.parallel_array_lims[space_name][my_rank][:] - # # self.global_chi_coords[space_name][i][low_lim:up_lim+1] = self.chi_coords[space_name][i].dat.data_ro[:] - # # Receive coords from each processor and put them into array - # for procid in range(1, comm_size): - # my_tag = comm_size*i + procid - # new_coords = comm.recv(source=procid, tag=my_tag) - # (low_lim, up_lim) = self.parallel_array_lims[space_name][procid][:] - # self.global_chi_coords[space_name][i, low_lim:up_lim+1] = new_coords - def get_column_data(self, field, domain): """ Reshapes a field's data into columns. 
diff --git a/gusto/core/io.py b/gusto/core/io.py index ddd85dfb6..6c2cc6efe 100644 --- a/gusto/core/io.py +++ b/gusto/core/io.py @@ -393,7 +393,7 @@ def setup_dump(self, state_fields, t, pick_up=False): running_tests = '--running-tests' in sys.argv or "pytest" in self.output.dirname # Raising exceptions needs to be done in parallel - if self.mesh.comm.Get_rank() == 0: + if self.mesh.comm.rank == 0: # Create results directory if it doesn't already exist if not path.exists(self.dumpdir): try: @@ -444,7 +444,7 @@ def setup_dump(self, state_fields, t, pick_up=False): if pick_up: # Pick up t idx - if self.mesh.comm.Get_rank() == 0: + if self.mesh.comm.rank == 0: nc_field_file = Dataset(self.nc_filename, 'r') self.field_t_idx = len(nc_field_file['time'][:]) nc_field_file.close() @@ -761,75 +761,84 @@ def dump(self, state_fields, time_data): self.dumpfile_ll.write(*self.to_dump_latlon) def create_nc_dump(self, filename, space_names): - my_rank = self.mesh.comm.Get_rank() self.field_t_idx = 0 - if my_rank == 0: - nc_field_file = Dataset(filename, 'w') + comm = self.mesh.comm + nc_field_file, nc_supports_parallel = make_nc_dataset(filename, 'w', comm) + + if nc_supports_parallel or comm.rank == 0: nc_field_file.createDimension('time', None) nc_field_file.createVariable('time', float, ('time',)) + # https://unidata.github.io/netcdf4-python/#parallel-io + nc_field_file['time'].set_collective(True) # Add mesh metadata for metadata_key, metadata_value in self.domain.metadata.items(): # If the metadata is None or a Boolean, try converting to string # This is because netCDF can't take these types as options - if type(metadata_value) in [type(None), type(True)]: + if metadata_value is None or isinstance(metadata_value, bool): output_value = str(metadata_value) else: output_value = metadata_value # Get the type from the metadata itself - nc_field_file.createVariable(metadata_key, type(output_value), []) - nc_field_file.variables[metadata_key][0] = output_value + nc_field_file.createDimension(metadata_key, 1) + nc_field_file.createVariable(metadata_key, type(output_value), (metadata_key,)) + nc_field_file.variables[metadata_key] = output_value - # Add coordinates if they are not already in the file - for space_name in space_names: - if space_name not in self.domain.coords.chi_coords.keys(): - # Space not registered - # TODO: we should fail here, but currently there are some spaces - # that we can't output for so instead just skip outputting - pass - else: - coord_fields = self.domain.coords.global_chi_coords[space_name] - num_points = len(self.domain.coords.global_chi_coords[space_name][0]) - - nc_field_file.createDimension('coords_'+space_name, num_points) - - for (coord_name, coord_field) in zip(self.domain.coords.coords_name, coord_fields): - nc_field_file.createVariable(coord_name+'_'+space_name, float, 'coords_'+space_name) - nc_field_file.variables[coord_name+'_'+space_name][:] = coord_field[:] - - # Create variable for storing the field values - for field in self.to_dump: - field_name = field.name() - space_name = field.function_space().name - if space_name not in self.domain.coords.chi_coords.keys(): - # Space not registered - # TODO: we should fail here, but currently there are some spaces - # that we can't output for so instead just skip outputting - logger.warning(f'netCDF outputting for space {space_name} ' - + 'not yet implemented, so unable to output ' - + f'{field_name} field') + # Add coordinates if they are not already in the file + for space_name in space_names: + if space_name not in 
self.domain.coords.chi_coords.keys(): + # Space not registered + # TODO: we should fail here, but currently there are some spaces + # that we can't output for so instead just skip outputting + pass + else: + coord_fields = self.domain.coords.chi_coords[space_name] + ndofs = coord_fields[0].function_space().dim() + + if nc_supports_parallel or comm.rank == 0: + nc_field_file.createDimension(f'coords_{space_name}', ndofs) + + for i, (coord_name, coord_field) in enumerate(zip(self.domain.coords.coords_name, coord_fields)): + if nc_supports_parallel or comm.rank == 0: + nc_field_file.createVariable(f'{coord_name}_{space_name}', float, f'coords_{space_name}') + + if nc_supports_parallel: + start, stop = self.domain.coords.parallel_array_lims[space_name] + nc_field_file.variables[f'{coord_name}_{space_name}'][start:stop] = coord_field.dat.data_ro else: - nc_field_file.createGroup(field_name) - nc_field_file[field_name].createVariable('field_values', float, ('coords_'+space_name, 'time')) + global_coord_fields = gather_field_data(coord_field, i, self.domain) + if comm.rank == 0: + nc_field_file.variables[f'{coord_name}_{space_name}'][...] = global_coord_field + # Create variable for storing the field values + for field in self.to_dump: + field_name = field.name() + space_name = field.function_space().name + if space_name not in self.domain.coords.chi_coords.keys(): + # Space not registered + # TODO: we should fail here, but currently there are some spaces + # that we can't output for so instead just skip outputting + logger.warning(f'netCDF outputting for space {space_name} ' + + 'not yet implemented, so unable to output ' + + f'{field_name} field') + else: + if nc_supports_parallel or comm.rank == 0: + nc_field_file.createGroup(field_name) + nc_field_file[field_name].createVariable('field_values', float, (f'coords_{space_name}', 'time')) + if nc_supports_parallel or comm.rank == 0: nc_field_file.close() - # NOT PARALLEL SAFE! 
def write_nc_dump(self, t): - comm = self.mesh.comm - my_rank = comm.Get_rank() - comm_size = comm.Get_size() + nc_field_file, nc_supports_parallel = make_nc_dataset(self.nc_filename, 'a', comm) # Open file to add time - if my_rank == 0: - nc_field_file = Dataset(self.nc_filename, 'a') + if nc_supports_parallel or comm.rank == 0: nc_field_file['time'][self.field_t_idx] = t # Loop through output field data here - num_fields = len(self.to_dump) for i, field in enumerate(self.to_dump): field_name = field.name() space_name = field.function_space().name @@ -844,29 +853,17 @@ def write_nc_dump(self, t): # Scalar elements # -------------------------------------------------------- # else: - j = 0 - # For most processors send data to first processor - if my_rank != 0: - # Make a tag to uniquely identify this call - my_tag = comm_size*(num_fields*j + i) + my_rank - comm.send(field.dat.data_ro[:], dest=0, tag=my_tag) + nc_field_file[field_name]['field_values'].set_collective(True) + + if nc_supports_parallel: + start, stop = self.domain.coords.parallel_array_lims[space_name] + nc_field_file[field_name]['field_values'][start:stop, self.field_t_idx] = field.dat.data_ro else: - # Set up array to store full data in - total_data_size = self.domain.coords.parallel_array_lims[space_name][comm_size-1][1]+1 - single_proc_data = np.zeros(total_data_size) - # Get data for this processor first - (low_lim, up_lim) = self.domain.coords.parallel_array_lims[space_name][my_rank][:] - single_proc_data[low_lim:up_lim+1] = field.dat.data_ro[:] - # Receive data from other processors - for procid in range(1, comm_size): - my_tag = comm_size*(num_fields*j + i) + procid - incoming_data = comm.recv(source=procid, tag=my_tag) - (low_lim, up_lim) = self.domain.coords.parallel_array_lims[space_name][procid][:] - single_proc_data[low_lim:up_lim+1] = incoming_data[:] - # Store whole field data - nc_field_file[field_name].variables['field_values'][:, self.field_t_idx] = single_proc_data[:] - - if my_rank == 0: + global_field_data = gather_field_data(field, i, self.domain) + if comm.rank == 0: + nc_field_file[field_name]['field_values'][:, self.field_t_idx] = global_field_data + + if nc_supports_parallel or comm.rank == 0: nc_field_file.close() self.field_t_idx += 1 @@ -916,3 +913,72 @@ def topo_sort(field_deps): if f not in schedule) raise RuntimeError("Field dependencies have a cycle:\n\n%s" % cycle) return list(map(name2field.__getitem__, schedule)) + + +def make_nc_dataset(filename, access, comm): + """Create a netCDF data set, possibly in parallel. + + Args: + filename (str): The filename. + access (str): The access descriptor - ``r``, ``w`` or ``a``. + comm: The communicator. + + Returns: + tuple: 2-tuple of :class:`netCDF4_netCDF4.Dataset` (or `None`) and `bool` + indicating whether the + + """ + try: + nc_field_file = Dataset(filename, access, parallel=True) + nc_supports_parallel = True + + # https://unidata.github.io/netcdf4-python/#parallel-io + if 'time' in nc_field_file.variables.keys(): + nc_field_file['time'].set_collective(True) + except ValueError: + # parallel netCDF not available, use the serial version instead + if comm.size > 1: + import warnings + warnings.warn( + "Serial netCDF in use even though you are running in parallel. This " + "is especially inefficient at high core counts. 
Please refer to the " + "documentation for information about installing a parallel version " + "of netCDF.", + UserWarning, + ) + + if comm.rank == 0: + nc_field_file = Dataset(filename, access, parallel=False) + else: + nc_field_file = None + nc_supports_parallel = False + return nc_field_file, nc_supports_parallel + + +def gather_field_data(field, field_index, domain): + """TODO.""" + if domain.comm.size == 1: + return field.dat.data_ro + + comm = domain.comm + space_name = field.function_space().name() + + if comm.rank == 0: + # Set up array to store full data in + global_data = np.zeros(field.function_space().dim(), dtype=field.dat.dtype) + + # Store data for this processor first + (start, stop) = domain.coords.parallel_array_lims[space_name] + global_data[start:stop] = field.dat.data_ro[...] + + # Receive data from other processors + for rank in range(1, comm.size): + incoming_data = comm.recv(source=rank, tag=comm.size*field_index + rank) + start, stop = stop, stop + incoming_data.size + global_data[start:stop] = incoming_data + + else: + comm.send(field.dat.data_ro, dest=0, tag=comm.size*field_index + comm.rank) + global_data = None + + return global_data From fa02bc7a441853d9238fb194c830d603f5e59fff Mon Sep 17 00:00:00 2001 From: Connor Ward Date: Tue, 17 Dec 2024 11:42:31 +0000 Subject: [PATCH 03/11] DO NOT MERGE, tweak CI --- .github/workflows/build.yml | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index a94805e68..40a64e6ba 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -33,12 +33,33 @@ jobs: python -m pip install -e . python -m pip install \ pytest-timeout pytest-xdist - - name: Gusto tests + - name: Gusto tests (new netcdf) run: | . /home/firedrake/firedrake/bin/activate firedrake-clean export GUSTO_PARALLEL_LOG=FILE export PYOP2_CFLAGS=-O0 + python -m pip uninstall netCDF4 + export HDF5_DIR=$PETSC_DIR/packages + export NETCDF4_DIR=$PETSC_DIR/packages + python -m pip install --no-binary netCDF4 netCDF4 + python -m pytest \ + -n 12 --dist worksteal \ + --durations=100 \ + --timeout=3600 \ + --timeout-method=thread \ + -o faulthandler_timeout=3660 \ + -v unit-tests integration-tests examples + timeout-minutes: 120 + - name: Gusto tests (old netcdf) + run: | + . 
/home/firedrake/firedrake/bin/activate + python -m pip uninstall netCDF4 + python -m pip cache remove netCDF4 + python -m pip install --only-binary netCDF4 netCDF4 + firedrake-clean + export GUSTO_PARALLEL_LOG=FILE + export PYOP2_CFLAGS=-O0 python -m pytest \ -n 12 --dist worksteal \ --durations=100 \ From 58c44396abf24e33fe5bae4ef64ba27d8afc3a5e Mon Sep 17 00:00:00 2001 From: Connor Ward Date: Tue, 17 Dec 2024 11:45:44 +0000 Subject: [PATCH 04/11] fixup --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 40a64e6ba..874d94273 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -42,7 +42,7 @@ jobs: python -m pip uninstall netCDF4 export HDF5_DIR=$PETSC_DIR/packages export NETCDF4_DIR=$PETSC_DIR/packages - python -m pip install --no-binary netCDF4 netCDF4 + python -m pip install --no-binary netCDF4 --no-build-isolation netCDF4 python -m pytest \ -n 12 --dist worksteal \ --durations=100 \ From bc78d0c8c4377451c6066577e2788faa9c334702 Mon Sep 17 00:00:00 2001 From: Connor Ward Date: Tue, 17 Dec 2024 14:20:38 +0000 Subject: [PATCH 05/11] fixups --- .github/workflows/build.yml | 2 +- gusto/core/io.py | 65 +++++++++++++++++++++++-------------- 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 874d94273..25ccc9f96 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -39,7 +39,7 @@ jobs: firedrake-clean export GUSTO_PARALLEL_LOG=FILE export PYOP2_CFLAGS=-O0 - python -m pip uninstall netCDF4 + python -m pip uninstall -y netCDF4 export HDF5_DIR=$PETSC_DIR/packages export NETCDF4_DIR=$PETSC_DIR/packages python -m pip install --no-binary netCDF4 --no-build-isolation netCDF4 diff --git a/gusto/core/io.py b/gusto/core/io.py index 6c2cc6efe..80fd072c7 100644 --- a/gusto/core/io.py +++ b/gusto/core/io.py @@ -2,7 +2,7 @@ from os import path, makedirs import itertools -from netCDF4 import Dataset +from netCDF4 import Dataset, stringtochar import sys import time from gusto.diagnostics import Diagnostics, CourantNumber @@ -769,10 +769,10 @@ def create_nc_dump(self, filename, space_names): if nc_supports_parallel or comm.rank == 0: nc_field_file.createDimension('time', None) nc_field_file.createVariable('time', float, ('time',)) - # https://unidata.github.io/netcdf4-python/#parallel-io - nc_field_file['time'].set_collective(True) # Add mesh metadata + nc_field_file.createDimension("dim_one", 1) + nc_field_file.createDimension("dim_string", 256) for metadata_key, metadata_value in self.domain.metadata.items(): # If the metadata is None or a Boolean, try converting to string # This is because netCDF can't take these types as options @@ -781,10 +781,15 @@ def create_nc_dump(self, filename, space_names): else: output_value = metadata_value - # Get the type from the metadata itself - nc_field_file.createDimension(metadata_key, 1) - nc_field_file.createVariable(metadata_key, type(output_value), (metadata_key,)) - nc_field_file.variables[metadata_key] = output_value + # TODO: Add comment explaining things + if isinstance(output_value, str): + nc_field_file.createVariable(metadata_key, 'S1', ('dim_one', 'dim_string')) + nc_field_file[metadata_key].set_collective(True) + output_char_array = np.array([output_value], dtype='S256') + nc_field_file[metadata_key][:] = stringtochar(output_char_array) + else: + nc_field_file.createVariable(metadata_key, type(output_value), ('dim_one',)) + 
nc_field_file[metadata_key][0] = output_value # Add coordinates if they are not already in the file for space_name in space_names: @@ -804,13 +809,13 @@ def create_nc_dump(self, filename, space_names): if nc_supports_parallel or comm.rank == 0: nc_field_file.createVariable(f'{coord_name}_{space_name}', float, f'coords_{space_name}') - if nc_supports_parallel: - start, stop = self.domain.coords.parallel_array_lims[space_name] - nc_field_file.variables[f'{coord_name}_{space_name}'][start:stop] = coord_field.dat.data_ro - else: - global_coord_fields = gather_field_data(coord_field, i, self.domain) - if comm.rank == 0: - nc_field_file.variables[f'{coord_name}_{space_name}'][...] = global_coord_field + if nc_supports_parallel: + start, stop = self.domain.coords.parallel_array_lims[space_name] + nc_field_file.variables[f'{coord_name}_{space_name}'][start:stop] = coord_field.dat.data_ro + else: + global_coord_field = gather_field_data(coord_field, i, self.domain) + if comm.rank == 0: + nc_field_file.variables[f'{coord_name}_{space_name}'][...] = global_coord_field # Create variable for storing the field values for field in self.to_dump: @@ -829,15 +834,21 @@ def create_nc_dump(self, filename, space_names): nc_field_file[field_name].createVariable('field_values', float, (f'coords_{space_name}', 'time')) if nc_supports_parallel or comm.rank == 0: nc_field_file.close() + print("X", flush=True) def write_nc_dump(self, t): comm = self.mesh.comm + print(comm.rank, "AA", flush=True) nc_field_file, nc_supports_parallel = make_nc_dataset(self.nc_filename, 'a', comm) - # Open file to add time - if nc_supports_parallel or comm.rank == 0: + if nc_field_file and 'time' in nc_field_file.variables.keys(): + # https://unidata.github.io/netcdf4-python/#parallel-io + if nc_supports_parallel: + nc_field_file['time'].set_collective(True) nc_field_file['time'][self.field_t_idx] = t + print(comm.rank, "A", flush=True) + # Loop through output field data here for i, field in enumerate(self.to_dump): field_name = field.name() @@ -853,18 +864,23 @@ def write_nc_dump(self, t): # Scalar elements # -------------------------------------------------------- # else: - nc_field_file[field_name]['field_values'].set_collective(True) - + print(comm.rank, "AB", flush=True) if nc_supports_parallel: + nc_field_file[field_name]['field_values'].set_collective(True) start, stop = self.domain.coords.parallel_array_lims[space_name] nc_field_file[field_name]['field_values'][start:stop, self.field_t_idx] = field.dat.data_ro else: + print("BA", flush=True) global_field_data = gather_field_data(field, i, self.domain) if comm.rank == 0: + print("BB", flush=True) nc_field_file[field_name]['field_values'][:, self.field_t_idx] = global_field_data + print("BC", flush=True) + print("C", flush=True) if nc_supports_parallel or comm.rank == 0: nc_field_file.close() + print("D", flush=True) self.field_t_idx += 1 @@ -931,10 +947,6 @@ def make_nc_dataset(filename, access, comm): try: nc_field_file = Dataset(filename, access, parallel=True) nc_supports_parallel = True - - # https://unidata.github.io/netcdf4-python/#parallel-io - if 'time' in nc_field_file.variables.keys(): - nc_field_file['time'].set_collective(True) except ValueError: # parallel netCDF not available, use the serial version instead if comm.size > 1: @@ -957,11 +969,12 @@ def make_nc_dataset(filename, access, comm): def gather_field_data(field, field_index, domain): """TODO.""" - if domain.comm.size == 1: + comm = domain.mesh.comm + + if comm.size == 1: return field.dat.data_ro - comm = 
domain.comm - space_name = field.function_space().name() + space_name = field.function_space().name if comm.rank == 0: # Set up array to store full data in @@ -973,11 +986,13 @@ def gather_field_data(field, field_index, domain): # Receive data from other processors for rank in range(1, comm.size): + print("receiving on ",comm.size*field_index + rank) incoming_data = comm.recv(source=rank, tag=comm.size*field_index + rank) start, stop = stop, stop + incoming_data.size global_data[start:stop] = incoming_data else: + print("sending on ",comm.size*field_index + comm.rank) comm.send(field.dat.data_ro, dest=0, tag=comm.size*field_index + comm.rank) global_data = None From 9aaee9adaf503d8533949ec29f9bce6755dc7f59 Mon Sep 17 00:00:00 2001 From: Connor Ward Date: Tue, 17 Dec 2024 15:48:22 +0000 Subject: [PATCH 06/11] fixups --- gusto/core/coordinates.py | 7 ++----- gusto/core/io.py | 12 ------------ 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/gusto/core/coordinates.py b/gusto/core/coordinates.py index e27cb065e..b4e1b7135 100644 --- a/gusto/core/coordinates.py +++ b/gusto/core/coordinates.py @@ -111,12 +111,9 @@ def register_space(self, domain, space_name): self.chi_coords[space_name].append(Function(space).interpolate(self.coords[i])) # Determine the offsets of the local piece of data into the global array - # NOTE: Could come from a section?? getOffsetRange? nlocal_dofs = len(self.chi_coords[space_name][0].dat.data_ro) - sendbuf = np.asarray([nlocal_dofs], dtype=IntType) - comm.scan(sendbuf) - stop = sendbuf[0] - start = stop - nlocal_dofs + start = comm.exscan(nlocal_dofs) or 0 + stop = start + nlocal_dofs self.parallel_array_lims[space_name] = (start, stop) def get_column_data(self, field, domain): diff --git a/gusto/core/io.py b/gusto/core/io.py index 80fd072c7..ea64865d8 100644 --- a/gusto/core/io.py +++ b/gusto/core/io.py @@ -834,11 +834,9 @@ def create_nc_dump(self, filename, space_names): nc_field_file[field_name].createVariable('field_values', float, (f'coords_{space_name}', 'time')) if nc_supports_parallel or comm.rank == 0: nc_field_file.close() - print("X", flush=True) def write_nc_dump(self, t): comm = self.mesh.comm - print(comm.rank, "AA", flush=True) nc_field_file, nc_supports_parallel = make_nc_dataset(self.nc_filename, 'a', comm) if nc_field_file and 'time' in nc_field_file.variables.keys(): @@ -847,8 +845,6 @@ def write_nc_dump(self, t): nc_field_file['time'].set_collective(True) nc_field_file['time'][self.field_t_idx] = t - print(comm.rank, "A", flush=True) - # Loop through output field data here for i, field in enumerate(self.to_dump): field_name = field.name() @@ -864,23 +860,17 @@ def write_nc_dump(self, t): # Scalar elements # -------------------------------------------------------- # else: - print(comm.rank, "AB", flush=True) if nc_supports_parallel: nc_field_file[field_name]['field_values'].set_collective(True) start, stop = self.domain.coords.parallel_array_lims[space_name] nc_field_file[field_name]['field_values'][start:stop, self.field_t_idx] = field.dat.data_ro else: - print("BA", flush=True) global_field_data = gather_field_data(field, i, self.domain) if comm.rank == 0: - print("BB", flush=True) nc_field_file[field_name]['field_values'][:, self.field_t_idx] = global_field_data - print("BC", flush=True) - print("C", flush=True) if nc_supports_parallel or comm.rank == 0: nc_field_file.close() - print("D", flush=True) self.field_t_idx += 1 @@ -986,13 +976,11 @@ def gather_field_data(field, field_index, domain): # Receive data from other processors for 
rank in range(1, comm.size): - print("receiving on ",comm.size*field_index + rank) incoming_data = comm.recv(source=rank, tag=comm.size*field_index + rank) start, stop = stop, stop + incoming_data.size global_data[start:stop] = incoming_data else: - print("sending on ",comm.size*field_index + comm.rank) comm.send(field.dat.data_ro, dest=0, tag=comm.size*field_index + comm.rank) global_data = None From cf9ce3948f328d1cf0346ed314e305745b0f4aaf Mon Sep 17 00:00:00 2001 From: Connor Ward Date: Tue, 17 Dec 2024 16:49:08 +0000 Subject: [PATCH 07/11] linting and docstrings --- gusto/core/coordinates.py | 1 - gusto/core/io.py | 22 +++++++++++++++++-- integration-tests/model/test_nc_outputting.py | 6 ++--- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/gusto/core/coordinates.py b/gusto/core/coordinates.py index b4e1b7135..a2eedd488 100644 --- a/gusto/core/coordinates.py +++ b/gusto/core/coordinates.py @@ -6,7 +6,6 @@ from gusto.core.coord_transforms import lonlatr_from_xyz, rotated_lonlatr_coords from gusto.core.logging import logger from firedrake import SpatialCoordinate, Function -from firedrake.utils import IntType import numpy as np import pandas as pd diff --git a/gusto/core/io.py b/gusto/core/io.py index ea64865d8..98ac10f5a 100644 --- a/gusto/core/io.py +++ b/gusto/core/io.py @@ -931,7 +931,11 @@ def make_nc_dataset(filename, access, comm): Returns: tuple: 2-tuple of :class:`netCDF4_netCDF4.Dataset` (or `None`) and `bool` - indicating whether the + indicating whether netCDF supports parallel usage. If parallel is not + supported then the dataset will be `None` on all but rank 0. + + A warning will be thrown if this function is called in parallel and a + non-parallel netCDF4 is used. """ try: @@ -958,7 +962,21 @@ def make_nc_dataset(filename, access, comm): def gather_field_data(field, field_index, domain): - """TODO.""" + """Gather global field data into a single array on rank 0. + + Args: + field (:class:`firedrake.Function`): The field to gather. + field_index (int): Index used to identify the field. + domain (:class:`Domain`): The domain. + + Returns: + :class:`numpy.ndarray` that is the concatenation of all DoFs on + all ranks. + + Note that this operation is *extremely inefficient* when run with large + amounts of parallelism. Avoid calling if at all possible. 
+ + """ comm = domain.mesh.comm if comm.size == 1: diff --git a/integration-tests/model/test_nc_outputting.py b/integration-tests/model/test_nc_outputting.py index 0f8d7f112..3462fd4be 100644 --- a/integration-tests/model/test_nc_outputting.py +++ b/integration-tests/model/test_nc_outputting.py @@ -11,7 +11,7 @@ ForwardEuler, OutputParameters, XComponent, YComponent, ZComponent, MeridionalComponent, ZonalComponent, RadialComponent, DGUpwind) -from netCDF4 import Dataset +from netCDF4 import Dataset, chartostring import pytest @@ -139,7 +139,7 @@ def test_nc_outputting(tmpdir, geometry, domain_and_mesh_details): output_data = Dataset(f'{dirname}/field_output.nc', 'r') for metadata_key, metadata_value in mesh_details.items(): # Convert None or booleans to strings - if type(metadata_value) in [type(None), type(True)]: + if metadata_value is None or isinstance(metadata_value, bool): output_value = str(metadata_value) else: output_value = metadata_value @@ -148,4 +148,4 @@ def test_nc_outputting(tmpdir, geometry, domain_and_mesh_details): if type(output_value) == float: assert output_data[metadata_key][0] - output_value < 1e-14, error_message else: - assert output_data[metadata_key][0] == output_value, error_message + assert str(chartostring(output_data[metadata_key][0])) == output_value, error_message From 60833601f8cf00d876b50aaec5a0754751a33d06 Mon Sep 17 00:00:00 2001 From: Connor Ward Date: Tue, 17 Dec 2024 17:28:34 +0000 Subject: [PATCH 08/11] fixup --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 25ccc9f96..06f68b3cd 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -54,7 +54,7 @@ jobs: - name: Gusto tests (old netcdf) run: | . /home/firedrake/firedrake/bin/activate - python -m pip uninstall netCDF4 + python -m pip uninstall -y netCDF4 python -m pip cache remove netCDF4 python -m pip install --only-binary netCDF4 netCDF4 firedrake-clean From 6de65ad78e909a646ac8c33ae9245c168c8b0d92 Mon Sep 17 00:00:00 2001 From: Connor Ward Date: Wed, 18 Dec 2024 10:19:12 +0000 Subject: [PATCH 09/11] Add netCDF4 info to website --- docs/about_gusto.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/docs/about_gusto.md b/docs/about_gusto.md index bf1719fd2..3b301211c 100644 --- a/docs/about_gusto.md +++ b/docs/about_gusto.md @@ -20,6 +20,30 @@ cd firedrake/src/gusto make test ``` +#### Parallel output with netCDF + +The [`netCDF4`](https://pypi.org/project/netCDF4/) package installed by Gusto does not support parallel usage. +This means that, when Gusto is run in parallel, distributed data structures must first be gathered onto rank 0 before they can be output. +This is *extremely inefficient* at high levels of parallelism. + +To avoid this it is possible to build a parallel-aware version of `netCDF4`. +The steps to do this are as follows: + +1. Activate the Firedrake virtual environment. +2. Uninstall the existing `netCDF4` package: + ``` + $ pip uninstall netCDF4 + ``` +3. Set necessary environment variables (note that this assumes that PETSc was built by Firedrake): + ``` + $ export HDF5_DIR=$VIRTUAL_ENV/src/petsc/default + $ export NETCDF4_DIR=$HDF5_DIR + ``` +4. 
Install the parallel version of `netCDF4`: + ``` + $ pip install --no-build-isolation --no-binary netCDF4 netCDF4 + ``` + ### Getting Started Once you have a working installation, the best way to get started with Gusto is to play with some of the examples in the `gusto/examples` directory. From 5e885cdb7e8e02eb7bebe44ef40573344f576e98 Mon Sep 17 00:00:00 2001 From: Connor Ward Date: Wed, 18 Dec 2024 10:20:45 +0000 Subject: [PATCH 10/11] Apply suggestions from code review --- gusto/core/io.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/gusto/core/io.py b/gusto/core/io.py index 98ac10f5a..3b3772312 100644 --- a/gusto/core/io.py +++ b/gusto/core/io.py @@ -766,7 +766,7 @@ def create_nc_dump(self, filename, space_names): comm = self.mesh.comm nc_field_file, nc_supports_parallel = make_nc_dataset(filename, 'w', comm) - if nc_supports_parallel or comm.rank == 0: + if nc_field_file: nc_field_file.createDimension('time', None) nc_field_file.createVariable('time', float, ('time',)) @@ -781,10 +781,10 @@ def create_nc_dump(self, filename, space_names): else: output_value = metadata_value - # TODO: Add comment explaining things + # At present parallel netCDF crashes when saving strings. To get around this + # we instead save string metadata as char arrays of fixed length. if isinstance(output_value, str): nc_field_file.createVariable(metadata_key, 'S1', ('dim_one', 'dim_string')) - nc_field_file[metadata_key].set_collective(True) output_char_array = np.array([output_value], dtype='S256') nc_field_file[metadata_key][:] = stringtochar(output_char_array) else: @@ -802,11 +802,11 @@ def create_nc_dump(self, filename, space_names): coord_fields = self.domain.coords.chi_coords[space_name] ndofs = coord_fields[0].function_space().dim() - if nc_supports_parallel or comm.rank == 0: + if nc_field_file: nc_field_file.createDimension(f'coords_{space_name}', ndofs) for i, (coord_name, coord_field) in enumerate(zip(self.domain.coords.coords_name, coord_fields)): - if nc_supports_parallel or comm.rank == 0: + if nc_field_file: nc_field_file.createVariable(f'{coord_name}_{space_name}', float, f'coords_{space_name}') if nc_supports_parallel: @@ -829,10 +829,10 @@ def create_nc_dump(self, filename, space_names): + 'not yet implemented, so unable to output ' + f'{field_name} field') else: - if nc_supports_parallel or comm.rank == 0: + if nc_field_file: nc_field_file.createGroup(field_name) nc_field_file[field_name].createVariable('field_values', float, (f'coords_{space_name}', 'time')) - if nc_supports_parallel or comm.rank == 0: + if nc_field_file: nc_field_file.close() def write_nc_dump(self, t): @@ -840,7 +840,8 @@ def write_nc_dump(self, t): nc_field_file, nc_supports_parallel = make_nc_dataset(self.nc_filename, 'a', comm) if nc_field_file and 'time' in nc_field_file.variables.keys(): - # https://unidata.github.io/netcdf4-python/#parallel-io + # Unbounded variables need to be accessed collectively + # (https://unidata.github.io/netcdf4-python/#parallel-io) if nc_supports_parallel: nc_field_file['time'].set_collective(True) nc_field_file['time'][self.field_t_idx] = t @@ -869,7 +870,7 @@ def write_nc_dump(self, t): if comm.rank == 0: nc_field_file[field_name]['field_values'][:, self.field_t_idx] = global_field_data - if nc_supports_parallel or comm.rank == 0: + if nc_field_file: nc_field_file.close() self.field_t_idx += 1 @@ -935,7 +936,7 @@ def make_nc_dataset(filename, access, comm): supported then the dataset will be `None` on all but rank 0. 
A warning will be thrown if this function is called in parallel and a - non-parallel netCDF4 is used. + non-parallel netCDF4 is installed. """ try: From ba92b94fc38ee22069c3ff8b3f87110818c00a0d Mon Sep 17 00:00:00 2001 From: Connor Ward Date: Wed, 18 Dec 2024 14:59:47 +0000 Subject: [PATCH 11/11] Parallel netCDF testing --- .github/workflows/build.yml | 6 ++-- integration-tests/model/test_nc_outputting.py | 35 +++++++++++++++---- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 06f68b3cd..468809a33 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -33,7 +33,7 @@ jobs: python -m pip install -e . python -m pip install \ pytest-timeout pytest-xdist - - name: Gusto tests (new netcdf) + - name: Gusto tests run: | . /home/firedrake/firedrake/bin/activate firedrake-clean @@ -51,7 +51,7 @@ jobs: -o faulthandler_timeout=3660 \ -v unit-tests integration-tests examples timeout-minutes: 120 - - name: Gusto tests (old netcdf) + - name: Test serial netCDF run: | . /home/firedrake/firedrake/bin/activate python -m pip uninstall -y netCDF4 @@ -66,7 +66,7 @@ jobs: --timeout=3600 \ --timeout-method=thread \ -o faulthandler_timeout=3660 \ - -v unit-tests integration-tests examples + -v integration-tests/models/test_nc_outputting.py timeout-minutes: 120 - name: Prepare logs if: always() diff --git a/integration-tests/model/test_nc_outputting.py b/integration-tests/model/test_nc_outputting.py index 3462fd4be..7a03396c2 100644 --- a/integration-tests/model/test_nc_outputting.py +++ b/integration-tests/model/test_nc_outputting.py @@ -11,8 +11,17 @@ ForwardEuler, OutputParameters, XComponent, YComponent, ZComponent, MeridionalComponent, ZonalComponent, RadialComponent, DGUpwind) +from mpi4py import MPI from netCDF4 import Dataset, chartostring import pytest +from pytest_mpi import parallel_assert + + +def make_dirname(test_name): + if MPI.COMM_WORLD.size > 1: + return f'pytest_{test_name}_parallel' + else: + return f'pytest_{test_name}' @pytest.fixture @@ -53,17 +62,19 @@ def domain_and_mesh_details(geometry): return (domain, mesh_details) -# TODO: make parallel configurations of this test +@pytest.mark.parallel([1, 2]) @pytest.mark.parametrize("geometry", ["interval", "vertical_slice", "plane", "extruded_plane", "spherical_shell", "extruded_spherical_shell"]) -def test_nc_outputting(tmpdir, geometry, domain_and_mesh_details): +def test_nc_outputting(geometry, domain_and_mesh_details): # ------------------------------------------------------------------------ # # Make model objects # ------------------------------------------------------------------------ # - dirname = str(tmpdir) + # Make sure all ranks use the same file + dirname = make_dirname("nc_outputting") + domain, mesh_details = domain_and_mesh_details V = domain.spaces('DG') if geometry == "interval": @@ -136,7 +147,15 @@ def test_nc_outputting(tmpdir, geometry, domain_and_mesh_details): # ------------------------------------------------------------------------ # # Check that metadata is correct - output_data = Dataset(f'{dirname}/field_output.nc', 'r') + try: + output_data = Dataset(f'results/{dirname}/field_output.nc', 'r', parallel=True) + except ValueError: + # serial netCDF4, do everything on rank 0 + if MPI.COMM_WORLD.rank == 0: + output_data = Dataset(f'results/{dirname}/field_output.nc', 'r', parallel=False) + else: + output_data = None + for metadata_key, metadata_value in mesh_details.items(): # Convert None or booleans to strings if 
metadata_value is None or isinstance(metadata_value, bool): @@ -146,6 +165,10 @@ def test_nc_outputting(tmpdir, geometry, domain_and_mesh_details): error_message = f'Metadata {metadata_key} for geometry {geometry} is incorrect' if type(output_value) == float: - assert output_data[metadata_key][0] - output_value < 1e-14, error_message + def assertion(): + return output_data[metadata_key][0] - output_value < 1e-14 else: - assert str(chartostring(output_data[metadata_key][0])) == output_value, error_message + def assertion(): + return str(chartostring(output_data[metadata_key][0])) == output_value + + parallel_assert(assertion, participating=output_data is not None, msg=error_message)
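
Taken together, the series replaces the old send/recv-everything-to-rank-0 output path with parallel netCDF where the installed `netCDF4` supports it, keeping a rank-0 gather as a fallback (see `make_nc_dataset`, `gather_field_data` and `parallel_array_lims` above). The sketch below is a minimal, self-contained illustration of that pattern, not Gusto's actual API: the file name, the 10-DoF-per-rank field and the variable names are invented for the example, and it assumes `mpi4py` and `netCDF4` are importable.

```python
# Minimal sketch of the parallel/serial netCDF fallback pattern used in this
# patch series. Names, sizes and the output file are illustrative only.
import numpy as np
from mpi4py import MPI
from netCDF4 import Dataset

comm = MPI.COMM_WORLD

# Each rank owns a local chunk of a 1D field (10 DoFs per rank here).
local = np.full(10, comm.rank, dtype=float)
ndofs_global = comm.allreduce(local.size, MPI.SUM)

# Offsets of the local chunk into the global array, as computed for
# parallel_array_lims: exscan returns None on rank 0, hence the "or 0".
start = comm.exscan(local.size) or 0
stop = start + local.size

try:
    # Only works if netCDF4 was built against a parallel netcdf-c/HDF5;
    # otherwise a ValueError is raised on every rank.
    nc = Dataset("example.nc", "w", parallel=True, comm=comm)
    nc_supports_parallel = True
except ValueError:
    nc = Dataset("example.nc", "w") if comm.rank == 0 else None
    nc_supports_parallel = False

if nc is not None:
    # In parallel mode these calls are made collectively by all ranks.
    nc.createDimension("dofs", ndofs_global)
    nc.createVariable("field", float, ("dofs",))

if nc_supports_parallel:
    # Every rank writes its own slice of the global variable.
    nc["field"][start:stop] = local
else:
    # Serial fallback: gather everything onto rank 0 (inefficient at scale).
    chunks = comm.gather(local, root=0)
    if comm.rank == 0:
        nc["field"][:] = np.concatenate(chunks)

if nc is not None:
    nc.close()
```

Run under MPI (e.g. `mpiexec -n 4 python sketch.py`, command illustrative) this writes the same file whichever branch is taken; the parallel branch simply avoids concentrating all data and I/O on rank 0, which is the motivation for the changes above.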