diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index a94805e68..468809a33 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -39,6 +39,10 @@ jobs: firedrake-clean export GUSTO_PARALLEL_LOG=FILE export PYOP2_CFLAGS=-O0 + python -m pip uninstall -y netCDF4 + export HDF5_DIR=$PETSC_DIR/packages + export NETCDF4_DIR=$PETSC_DIR/packages + python -m pip install --no-binary netCDF4 --no-build-isolation netCDF4 python -m pytest \ -n 12 --dist worksteal \ --durations=100 \ @@ -47,6 +51,23 @@ jobs: -o faulthandler_timeout=3660 \ -v unit-tests integration-tests examples timeout-minutes: 120 + - name: Test serial netCDF + run: | + . /home/firedrake/firedrake/bin/activate + python -m pip uninstall -y netCDF4 + python -m pip cache remove netCDF4 + python -m pip install --only-binary netCDF4 netCDF4 + firedrake-clean + export GUSTO_PARALLEL_LOG=FILE + export PYOP2_CFLAGS=-O0 + python -m pytest \ + -n 12 --dist worksteal \ + --durations=100 \ + --timeout=3600 \ + --timeout-method=thread \ + -o faulthandler_timeout=3660 \ + -v integration-tests/models/test_nc_outputting.py + timeout-minutes: 120 - name: Prepare logs if: always() run: | diff --git a/docs/about_gusto.md b/docs/about_gusto.md index bf1719fd2..3b301211c 100644 --- a/docs/about_gusto.md +++ b/docs/about_gusto.md @@ -20,6 +20,30 @@ cd firedrake/src/gusto make test ``` +#### Parallel output with netCDF + +The [`netCDF4`](https://pypi.org/project/netCDF4/) package installed by Gusto does not support parallel usage. +This means that, when Gusto is run in parallel, distributed data structures must first be gathered onto rank 0 before they can be output. +This is *extremely inefficient* at high levels of parallelism. + +To avoid this it is possible to build a parallel-aware version of `netCDF4`. +The steps to do this are as follows: + +1. Activate the Firedrake virtual environment. +2. Uninstall the existing `netCDF4` package: + ``` + $ pip uninstall netCDF4 + ``` +3. Set necessary environment variables (note that this assumes that PETSc was built by Firedrake): + ``` + $ export HDF5_DIR=$VIRTUAL_ENV/src/petsc/default + $ export NETCDF4_DIR=$HDF5_DIR + ``` +4. Install the parallel version of `netCDF4`: + ``` + $ pip install --no-build-isolation --no-binary netCDF4 netCDF4 + ``` + ### Getting Started Once you have a working installation, the best way to get started with Gusto is to play with some of the examples in the `gusto/examples` directory. diff --git a/gusto/core/coordinates.py b/gusto/core/coordinates.py index 28862e176..a2eedd488 100644 --- a/gusto/core/coordinates.py +++ b/gusto/core/coordinates.py @@ -67,7 +67,6 @@ def __init__(self, mesh, on_sphere=False, rotated_pole=None, radius=None): # -------------------------------------------------------------------- # self.chi_coords = {} # Dict of natural coords by space - self.global_chi_coords = {} # Dict of whole coords stored on first proc self.parallel_array_lims = {} # Dict of array lengths for each proc def register_space(self, domain, space_name): @@ -87,8 +86,6 @@ def register_space(self, domain, space_name): """ comm = self.mesh.comm - comm_size = comm.Get_size() - my_rank = comm.Get_rank() topological_dimension = self.mesh.topological_dimension() if space_name in self.chi_coords.keys(): @@ -112,49 +109,11 @@ def register_space(self, domain, space_name): for i in range(topological_dimension): self.chi_coords[space_name].append(Function(space).interpolate(self.coords[i])) - # -------------------------------------------------------------------- # - # Code for settings up coordinates for parallel-serial IO - # -------------------------------------------------------------------- # - - len_coords = space.dim() - my_num_dofs = len(self.chi_coords[space_name][0].dat.data_ro[:]) - - if my_rank != 0: - # Do not make full coordinate array - self.global_chi_coords[space_name] = None - self.parallel_array_lims[space_name] = None - # Find number of DoFs on this processor - comm.send(my_num_dofs, dest=0) - else: - # First processor has one large array of the global chi data - self.global_chi_coords[space_name] = np.zeros((topological_dimension, len_coords)) - # Store the limits inside this array telling us how data is partitioned - self.parallel_array_lims[space_name] = np.zeros((comm_size, 2), dtype=int) - # First processor has the first bit of data - self.parallel_array_lims[space_name][my_rank][0] = 0 - self.parallel_array_lims[space_name][my_rank][1] = my_num_dofs - 1 - # Receive number of DoFs on other processors - for procid in range(1, comm_size): - other_num_dofs = comm.recv(source=procid) - self.parallel_array_lims[space_name][procid][0] = self.parallel_array_lims[space_name][procid-1][1] + 1 - self.parallel_array_lims[space_name][procid][1] = self.parallel_array_lims[space_name][procid][0] + other_num_dofs - 1 - - # Now move coordinates to first processor - for i in range(topological_dimension): - if my_rank != 0: - # Send information to rank 0 - my_tag = comm_size*i + my_rank - comm.send(self.chi_coords[space_name][i].dat.data_ro[:], dest=0, tag=my_tag) - else: - # Rank 0 -- the receiver - (low_lim, up_lim) = self.parallel_array_lims[space_name][my_rank][:] - self.global_chi_coords[space_name][i][low_lim:up_lim+1] = self.chi_coords[space_name][i].dat.data_ro[:] - # Receive coords from each processor and put them into array - for procid in range(1, comm_size): - my_tag = comm_size*i + procid - new_coords = comm.recv(source=procid, tag=my_tag) - (low_lim, up_lim) = self.parallel_array_lims[space_name][procid][:] - self.global_chi_coords[space_name][i, low_lim:up_lim+1] = new_coords + # Determine the offsets of the local piece of data into the global array + nlocal_dofs = len(self.chi_coords[space_name][0].dat.data_ro) + start = comm.exscan(nlocal_dofs) or 0 + stop = start + nlocal_dofs + self.parallel_array_lims[space_name] = (start, stop) def get_column_data(self, field, domain): """ @@ -177,9 +136,9 @@ def get_column_data(self, field, domain): coords = self.chi_coords[space_name] data_is_3d = (len(coords) == 3) - coords_X = coords[0].dat.data - coords_Y = coords[1].dat.data if data_is_3d else None - coords_Z = coords[-1].dat.data + coords_X = coords[0].dat.data_ro + coords_Y = coords[1].dat.data_ro if data_is_3d else None + coords_Z = coords[-1].dat.data_ro # ------------------------------------------------------------------------ # # Round data to ensure sorting in dataframe is OK diff --git a/gusto/core/domain.py b/gusto/core/domain.py index 0cf23fb7f..e73496ab0 100644 --- a/gusto/core/domain.py +++ b/gusto/core/domain.py @@ -10,6 +10,7 @@ inner, grad, VectorFunctionSpace, Function, FunctionSpace, perp) import numpy as np +from mpi4py import MPI class Domain(object): @@ -218,25 +219,26 @@ def construct_domain_metadata(mesh, coords, on_sphere): else: raise ValueError('Unable to determine domain type') + # Determine domain properties + chi = coords.chi_coords['DG1_equispaced'] comm = mesh.comm - my_rank = comm.Get_rank() - - # Properties of domain will be determined from full coords, so need - # doing on the first processor then broadcasting to others - - if my_rank == 0: - chi = coords.global_chi_coords['DG1_equispaced'] - if not on_sphere: - metadata['domain_extent_x'] = np.max(chi[0, :]) - np.min(chi[0, :]) - if metadata['domain_type'] in ['plane', 'extruded_plane']: - metadata['domain_extent_y'] = np.max(chi[1, :]) - np.min(chi[1, :]) - if mesh.extruded: - metadata['domain_extent_z'] = np.max(chi[-1, :]) - np.min(chi[-1, :]) - - else: - metadata = {} - - # Send information to other processors - metadata = comm.bcast(metadata, root=0) + if not on_sphere: + _min_x = np.min(chi[0].dat.data_ro) + _max_x = np.max(chi[0].dat.data_ro) + min_x = comm.allreduce(_min_x, MPI.MIN) + max_x = comm.allreduce(_max_x, MPI.MAX) + metadata['domain_extent_x'] = max_x - min_x + if metadata['domain_type'] in ['plane', 'extruded_plane']: + _min_y = np.min(chi[1].dat.data_ro) + _max_y = np.max(chi[1].dat.data_ro) + min_y = comm.allreduce(_min_y, MPI.MIN) + max_y = comm.allreduce(_max_y, MPI.MAX) + metadata['domain_extent_y'] = max_y - min_y + if mesh.extruded: + _min_z = np.min(chi[-1].dat.data_ro) + _max_z = np.max(chi[-1].dat.data_ro) + min_z = comm.allreduce(_min_z, MPI.MIN) + max_z = comm.allreduce(_max_z, MPI.MAX) + metadata['domain_extent_z'] = max_z - min_z return metadata diff --git a/gusto/core/io.py b/gusto/core/io.py index 6f8f74712..3b3772312 100644 --- a/gusto/core/io.py +++ b/gusto/core/io.py @@ -2,7 +2,7 @@ from os import path, makedirs import itertools -from netCDF4 import Dataset +from netCDF4 import Dataset, stringtochar import sys import time from gusto.diagnostics import Diagnostics, CourantNumber @@ -393,7 +393,7 @@ def setup_dump(self, state_fields, t, pick_up=False): running_tests = '--running-tests' in sys.argv or "pytest" in self.output.dirname # Raising exceptions needs to be done in parallel - if self.mesh.comm.Get_rank() == 0: + if self.mesh.comm.rank == 0: # Create results directory if it doesn't already exist if not path.exists(self.dumpdir): try: @@ -444,7 +444,7 @@ def setup_dump(self, state_fields, t, pick_up=False): if pick_up: # Pick up t idx - if self.mesh.comm.Get_rank() == 0: + if self.mesh.comm.rank == 0: nc_field_file = Dataset(self.nc_filename, 'r') self.field_t_idx = len(nc_field_file['time'][:]) nc_field_file.close() @@ -761,74 +761,92 @@ def dump(self, state_fields, time_data): self.dumpfile_ll.write(*self.to_dump_latlon) def create_nc_dump(self, filename, space_names): - my_rank = self.mesh.comm.Get_rank() self.field_t_idx = 0 - if my_rank == 0: - nc_field_file = Dataset(filename, 'w') + comm = self.mesh.comm + nc_field_file, nc_supports_parallel = make_nc_dataset(filename, 'w', comm) + + if nc_field_file: nc_field_file.createDimension('time', None) nc_field_file.createVariable('time', float, ('time',)) # Add mesh metadata + nc_field_file.createDimension("dim_one", 1) + nc_field_file.createDimension("dim_string", 256) for metadata_key, metadata_value in self.domain.metadata.items(): # If the metadata is None or a Boolean, try converting to string # This is because netCDF can't take these types as options - if type(metadata_value) in [type(None), type(True)]: + if metadata_value is None or isinstance(metadata_value, bool): output_value = str(metadata_value) else: output_value = metadata_value - # Get the type from the metadata itself - nc_field_file.createVariable(metadata_key, type(output_value), []) - nc_field_file.variables[metadata_key][0] = output_value - - # Add coordinates if they are not already in the file - for space_name in space_names: - if space_name not in self.domain.coords.chi_coords.keys(): - # Space not registered - # TODO: we should fail here, but currently there are some spaces - # that we can't output for so instead just skip outputting - pass + # At present parallel netCDF crashes when saving strings. To get around this + # we instead save string metadata as char arrays of fixed length. + if isinstance(output_value, str): + nc_field_file.createVariable(metadata_key, 'S1', ('dim_one', 'dim_string')) + output_char_array = np.array([output_value], dtype='S256') + nc_field_file[metadata_key][:] = stringtochar(output_char_array) else: - coord_fields = self.domain.coords.global_chi_coords[space_name] - num_points = len(self.domain.coords.global_chi_coords[space_name][0]) - - nc_field_file.createDimension('coords_'+space_name, num_points) - - for (coord_name, coord_field) in zip(self.domain.coords.coords_name, coord_fields): - nc_field_file.createVariable(coord_name+'_'+space_name, float, 'coords_'+space_name) - nc_field_file.variables[coord_name+'_'+space_name][:] = coord_field[:] - - # Create variable for storing the field values - for field in self.to_dump: - field_name = field.name() - space_name = field.function_space().name - if space_name not in self.domain.coords.chi_coords.keys(): - # Space not registered - # TODO: we should fail here, but currently there are some spaces - # that we can't output for so instead just skip outputting - logger.warning(f'netCDF outputting for space {space_name} ' - + 'not yet implemented, so unable to output ' - + f'{field_name} field') - else: - nc_field_file.createGroup(field_name) - nc_field_file[field_name].createVariable('field_values', float, ('coords_'+space_name, 'time')) + nc_field_file.createVariable(metadata_key, type(output_value), ('dim_one',)) + nc_field_file[metadata_key][0] = output_value + + # Add coordinates if they are not already in the file + for space_name in space_names: + if space_name not in self.domain.coords.chi_coords.keys(): + # Space not registered + # TODO: we should fail here, but currently there are some spaces + # that we can't output for so instead just skip outputting + pass + else: + coord_fields = self.domain.coords.chi_coords[space_name] + ndofs = coord_fields[0].function_space().dim() + + if nc_field_file: + nc_field_file.createDimension(f'coords_{space_name}', ndofs) + for i, (coord_name, coord_field) in enumerate(zip(self.domain.coords.coords_name, coord_fields)): + if nc_field_file: + nc_field_file.createVariable(f'{coord_name}_{space_name}', float, f'coords_{space_name}') + + if nc_supports_parallel: + start, stop = self.domain.coords.parallel_array_lims[space_name] + nc_field_file.variables[f'{coord_name}_{space_name}'][start:stop] = coord_field.dat.data_ro + else: + global_coord_field = gather_field_data(coord_field, i, self.domain) + if comm.rank == 0: + nc_field_file.variables[f'{coord_name}_{space_name}'][...] = global_coord_field + + # Create variable for storing the field values + for field in self.to_dump: + field_name = field.name() + space_name = field.function_space().name + if space_name not in self.domain.coords.chi_coords.keys(): + # Space not registered + # TODO: we should fail here, but currently there are some spaces + # that we can't output for so instead just skip outputting + logger.warning(f'netCDF outputting for space {space_name} ' + + 'not yet implemented, so unable to output ' + + f'{field_name} field') + else: + if nc_field_file: + nc_field_file.createGroup(field_name) + nc_field_file[field_name].createVariable('field_values', float, (f'coords_{space_name}', 'time')) + if nc_field_file: nc_field_file.close() def write_nc_dump(self, t): - comm = self.mesh.comm - my_rank = comm.Get_rank() - comm_size = comm.Get_size() + nc_field_file, nc_supports_parallel = make_nc_dataset(self.nc_filename, 'a', comm) - # Open file to add time - if my_rank == 0: - nc_field_file = Dataset(self.nc_filename, 'a') + if nc_field_file and 'time' in nc_field_file.variables.keys(): + # Unbounded variables need to be accessed collectively + # (https://unidata.github.io/netcdf4-python/#parallel-io) + if nc_supports_parallel: + nc_field_file['time'].set_collective(True) nc_field_file['time'][self.field_t_idx] = t # Loop through output field data here - num_fields = len(self.to_dump) for i, field in enumerate(self.to_dump): field_name = field.name() space_name = field.function_space().name @@ -843,29 +861,16 @@ def write_nc_dump(self, t): # Scalar elements # -------------------------------------------------------- # else: - j = 0 - # For most processors send data to first processor - if my_rank != 0: - # Make a tag to uniquely identify this call - my_tag = comm_size*(num_fields*j + i) + my_rank - comm.send(field.dat.data_ro[:], dest=0, tag=my_tag) + if nc_supports_parallel: + nc_field_file[field_name]['field_values'].set_collective(True) + start, stop = self.domain.coords.parallel_array_lims[space_name] + nc_field_file[field_name]['field_values'][start:stop, self.field_t_idx] = field.dat.data_ro else: - # Set up array to store full data in - total_data_size = self.domain.coords.parallel_array_lims[space_name][comm_size-1][1]+1 - single_proc_data = np.zeros(total_data_size) - # Get data for this processor first - (low_lim, up_lim) = self.domain.coords.parallel_array_lims[space_name][my_rank][:] - single_proc_data[low_lim:up_lim+1] = field.dat.data_ro[:] - # Receive data from other processors - for procid in range(1, comm_size): - my_tag = comm_size*(num_fields*j + i) + procid - incoming_data = comm.recv(source=procid, tag=my_tag) - (low_lim, up_lim) = self.domain.coords.parallel_array_lims[space_name][procid][:] - single_proc_data[low_lim:up_lim+1] = incoming_data[:] - # Store whole field data - nc_field_file[field_name].variables['field_values'][:, self.field_t_idx] = single_proc_data[:] - - if my_rank == 0: + global_field_data = gather_field_data(field, i, self.domain) + if comm.rank == 0: + nc_field_file[field_name]['field_values'][:, self.field_t_idx] = global_field_data + + if nc_field_file: nc_field_file.close() self.field_t_idx += 1 @@ -915,3 +920,87 @@ def topo_sort(field_deps): if f not in schedule) raise RuntimeError("Field dependencies have a cycle:\n\n%s" % cycle) return list(map(name2field.__getitem__, schedule)) + + +def make_nc_dataset(filename, access, comm): + """Create a netCDF data set, possibly in parallel. + + Args: + filename (str): The filename. + access (str): The access descriptor - ``r``, ``w`` or ``a``. + comm: The communicator. + + Returns: + tuple: 2-tuple of :class:`netCDF4_netCDF4.Dataset` (or `None`) and `bool` + indicating whether netCDF supports parallel usage. If parallel is not + supported then the dataset will be `None` on all but rank 0. + + A warning will be thrown if this function is called in parallel and a + non-parallel netCDF4 is installed. + + """ + try: + nc_field_file = Dataset(filename, access, parallel=True) + nc_supports_parallel = True + except ValueError: + # parallel netCDF not available, use the serial version instead + if comm.size > 1: + import warnings + warnings.warn( + "Serial netCDF in use even though you are running in parallel. This " + "is especially inefficient at high core counts. Please refer to the " + "documentation for information about installing a parallel version " + "of netCDF.", + UserWarning, + ) + + if comm.rank == 0: + nc_field_file = Dataset(filename, access, parallel=False) + else: + nc_field_file = None + nc_supports_parallel = False + return nc_field_file, nc_supports_parallel + + +def gather_field_data(field, field_index, domain): + """Gather global field data into a single array on rank 0. + + Args: + field (:class:`firedrake.Function`): The field to gather. + field_index (int): Index used to identify the field. + domain (:class:`Domain`): The domain. + + Returns: + :class:`numpy.ndarray` that is the concatenation of all DoFs on + all ranks. + + Note that this operation is *extremely inefficient* when run with large + amounts of parallelism. Avoid calling if at all possible. + + """ + comm = domain.mesh.comm + + if comm.size == 1: + return field.dat.data_ro + + space_name = field.function_space().name + + if comm.rank == 0: + # Set up array to store full data in + global_data = np.zeros(field.function_space().dim(), dtype=field.dat.dtype) + + # Store data for this processor first + (start, stop) = domain.coords.parallel_array_lims[space_name] + global_data[start:stop] = field.dat.data_ro[...] + + # Receive data from other processors + for rank in range(1, comm.size): + incoming_data = comm.recv(source=rank, tag=comm.size*field_index + rank) + start, stop = stop, stop + incoming_data.size + global_data[start:stop] = incoming_data + + else: + comm.send(field.dat.data_ro, dest=0, tag=comm.size*field_index + comm.rank) + global_data = None + + return global_data diff --git a/integration-tests/model/test_nc_outputting.py b/integration-tests/model/test_nc_outputting.py index 0f8d7f112..7a03396c2 100644 --- a/integration-tests/model/test_nc_outputting.py +++ b/integration-tests/model/test_nc_outputting.py @@ -11,8 +11,17 @@ ForwardEuler, OutputParameters, XComponent, YComponent, ZComponent, MeridionalComponent, ZonalComponent, RadialComponent, DGUpwind) -from netCDF4 import Dataset +from mpi4py import MPI +from netCDF4 import Dataset, chartostring import pytest +from pytest_mpi import parallel_assert + + +def make_dirname(test_name): + if MPI.COMM_WORLD.size > 1: + return f'pytest_{test_name}_parallel' + else: + return f'pytest_{test_name}' @pytest.fixture @@ -53,17 +62,19 @@ def domain_and_mesh_details(geometry): return (domain, mesh_details) -# TODO: make parallel configurations of this test +@pytest.mark.parallel([1, 2]) @pytest.mark.parametrize("geometry", ["interval", "vertical_slice", "plane", "extruded_plane", "spherical_shell", "extruded_spherical_shell"]) -def test_nc_outputting(tmpdir, geometry, domain_and_mesh_details): +def test_nc_outputting(geometry, domain_and_mesh_details): # ------------------------------------------------------------------------ # # Make model objects # ------------------------------------------------------------------------ # - dirname = str(tmpdir) + # Make sure all ranks use the same file + dirname = make_dirname("nc_outputting") + domain, mesh_details = domain_and_mesh_details V = domain.spaces('DG') if geometry == "interval": @@ -136,16 +147,28 @@ def test_nc_outputting(tmpdir, geometry, domain_and_mesh_details): # ------------------------------------------------------------------------ # # Check that metadata is correct - output_data = Dataset(f'{dirname}/field_output.nc', 'r') + try: + output_data = Dataset(f'results/{dirname}/field_output.nc', 'r', parallel=True) + except ValueError: + # serial netCDF4, do everything on rank 0 + if MPI.COMM_WORLD.rank == 0: + output_data = Dataset(f'results/{dirname}/field_output.nc', 'r', parallel=False) + else: + output_data = None + for metadata_key, metadata_value in mesh_details.items(): # Convert None or booleans to strings - if type(metadata_value) in [type(None), type(True)]: + if metadata_value is None or isinstance(metadata_value, bool): output_value = str(metadata_value) else: output_value = metadata_value error_message = f'Metadata {metadata_key} for geometry {geometry} is incorrect' if type(output_value) == float: - assert output_data[metadata_key][0] - output_value < 1e-14, error_message + def assertion(): + return output_data[metadata_key][0] - output_value < 1e-14 else: - assert output_data[metadata_key][0] == output_value, error_message + def assertion(): + return str(chartostring(output_data[metadata_key][0])) == output_value + + parallel_assert(assertion, participating=output_data is not None, msg=error_message)