Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel netcdf #596

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ jobs:
firedrake-clean
export GUSTO_PARALLEL_LOG=FILE
export PYOP2_CFLAGS=-O0
python -m pip uninstall -y netCDF4
export HDF5_DIR=$PETSC_DIR/packages
export NETCDF4_DIR=$PETSC_DIR/packages
python -m pip install --no-binary netCDF4 --no-build-isolation netCDF4
python -m pytest \
-n 12 --dist worksteal \
--durations=100 \
Expand All @@ -47,6 +51,23 @@ jobs:
-o faulthandler_timeout=3660 \
-v unit-tests integration-tests examples
timeout-minutes: 120
- name: Test serial netCDF
run: |
. /home/firedrake/firedrake/bin/activate
python -m pip uninstall -y netCDF4
python -m pip cache remove netCDF4
python -m pip install --only-binary netCDF4 netCDF4
firedrake-clean
export GUSTO_PARALLEL_LOG=FILE
export PYOP2_CFLAGS=-O0
python -m pytest \
-n 12 --dist worksteal \
--durations=100 \
--timeout=3600 \
--timeout-method=thread \
-o faulthandler_timeout=3660 \
-v integration-tests/models/test_nc_outputting.py
timeout-minutes: 120
- name: Prepare logs
if: always()
run: |
Expand Down
24 changes: 24 additions & 0 deletions docs/about_gusto.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,30 @@ cd firedrake/src/gusto
make test
```

#### Parallel output with netCDF

The [`netCDF4`](https://pypi.org/project/netCDF4/) package installed by Gusto does not support parallel usage.
This means that, when Gusto is run in parallel, distributed data structures must first be gathered onto rank 0 before they can be output.
This is *extremely inefficient* at high levels of parallelism.

To avoid this it is possible to build a parallel-aware version of `netCDF4`.
The steps to do this are as follows:

1. Activate the Firedrake virtual environment.
2. Uninstall the existing `netCDF4` package:
```
$ pip uninstall netCDF4
```
3. Set necessary environment variables (note that this assumes that PETSc was built by Firedrake):
```
$ export HDF5_DIR=$VIRTUAL_ENV/src/petsc/default
$ export NETCDF4_DIR=$HDF5_DIR
```
4. Install the parallel version of `netCDF4`:
```
$ pip install --no-build-isolation --no-binary netCDF4 netCDF4
```

### Getting Started

Once you have a working installation, the best way to get started with Gusto is to play with some of the examples in the `gusto/examples` directory.
Expand Down
57 changes: 8 additions & 49 deletions gusto/core/coordinates.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ def __init__(self, mesh, on_sphere=False, rotated_pole=None, radius=None):
# -------------------------------------------------------------------- #

self.chi_coords = {} # Dict of natural coords by space
self.global_chi_coords = {} # Dict of whole coords stored on first proc
self.parallel_array_lims = {} # Dict of array lengths for each proc

def register_space(self, domain, space_name):
Expand All @@ -87,8 +86,6 @@ def register_space(self, domain, space_name):
"""

comm = self.mesh.comm
comm_size = comm.Get_size()
my_rank = comm.Get_rank()
topological_dimension = self.mesh.topological_dimension()

if space_name in self.chi_coords.keys():
Expand All @@ -112,49 +109,11 @@ def register_space(self, domain, space_name):
for i in range(topological_dimension):
self.chi_coords[space_name].append(Function(space).interpolate(self.coords[i]))

# -------------------------------------------------------------------- #
# Code for settings up coordinates for parallel-serial IO
# -------------------------------------------------------------------- #

len_coords = space.dim()
my_num_dofs = len(self.chi_coords[space_name][0].dat.data_ro[:])

if my_rank != 0:
# Do not make full coordinate array
self.global_chi_coords[space_name] = None
self.parallel_array_lims[space_name] = None
# Find number of DoFs on this processor
comm.send(my_num_dofs, dest=0)
else:
# First processor has one large array of the global chi data
self.global_chi_coords[space_name] = np.zeros((topological_dimension, len_coords))
# Store the limits inside this array telling us how data is partitioned
self.parallel_array_lims[space_name] = np.zeros((comm_size, 2), dtype=int)
# First processor has the first bit of data
self.parallel_array_lims[space_name][my_rank][0] = 0
self.parallel_array_lims[space_name][my_rank][1] = my_num_dofs - 1
# Receive number of DoFs on other processors
for procid in range(1, comm_size):
other_num_dofs = comm.recv(source=procid)
self.parallel_array_lims[space_name][procid][0] = self.parallel_array_lims[space_name][procid-1][1] + 1
self.parallel_array_lims[space_name][procid][1] = self.parallel_array_lims[space_name][procid][0] + other_num_dofs - 1

# Now move coordinates to first processor
for i in range(topological_dimension):
if my_rank != 0:
# Send information to rank 0
my_tag = comm_size*i + my_rank
comm.send(self.chi_coords[space_name][i].dat.data_ro[:], dest=0, tag=my_tag)
else:
# Rank 0 -- the receiver
(low_lim, up_lim) = self.parallel_array_lims[space_name][my_rank][:]
self.global_chi_coords[space_name][i][low_lim:up_lim+1] = self.chi_coords[space_name][i].dat.data_ro[:]
# Receive coords from each processor and put them into array
for procid in range(1, comm_size):
my_tag = comm_size*i + procid
new_coords = comm.recv(source=procid, tag=my_tag)
(low_lim, up_lim) = self.parallel_array_lims[space_name][procid][:]
self.global_chi_coords[space_name][i, low_lim:up_lim+1] = new_coords
# Determine the offsets of the local piece of data into the global array
nlocal_dofs = len(self.chi_coords[space_name][0].dat.data_ro)
start = comm.exscan(nlocal_dofs) or 0
stop = start + nlocal_dofs
self.parallel_array_lims[space_name] = (start, stop)

def get_column_data(self, field, domain):
"""
Expand All @@ -177,9 +136,9 @@ def get_column_data(self, field, domain):
coords = self.chi_coords[space_name]

data_is_3d = (len(coords) == 3)
coords_X = coords[0].dat.data
coords_Y = coords[1].dat.data if data_is_3d else None
coords_Z = coords[-1].dat.data
coords_X = coords[0].dat.data_ro
coords_Y = coords[1].dat.data_ro if data_is_3d else None
coords_Z = coords[-1].dat.data_ro

# ------------------------------------------------------------------------ #
# Round data to ensure sorting in dataframe is OK
Expand Down
40 changes: 21 additions & 19 deletions gusto/core/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
inner, grad, VectorFunctionSpace, Function, FunctionSpace,
perp)
import numpy as np
from mpi4py import MPI


class Domain(object):
Expand Down Expand Up @@ -218,25 +219,26 @@ def construct_domain_metadata(mesh, coords, on_sphere):
else:
raise ValueError('Unable to determine domain type')

# Determine domain properties
chi = coords.chi_coords['DG1_equispaced']
comm = mesh.comm
my_rank = comm.Get_rank()

# Properties of domain will be determined from full coords, so need
# doing on the first processor then broadcasting to others

if my_rank == 0:
chi = coords.global_chi_coords['DG1_equispaced']
if not on_sphere:
metadata['domain_extent_x'] = np.max(chi[0, :]) - np.min(chi[0, :])
if metadata['domain_type'] in ['plane', 'extruded_plane']:
metadata['domain_extent_y'] = np.max(chi[1, :]) - np.min(chi[1, :])
if mesh.extruded:
metadata['domain_extent_z'] = np.max(chi[-1, :]) - np.min(chi[-1, :])

else:
metadata = {}

# Send information to other processors
metadata = comm.bcast(metadata, root=0)
if not on_sphere:
_min_x = np.min(chi[0].dat.data_ro)
_max_x = np.max(chi[0].dat.data_ro)
min_x = comm.allreduce(_min_x, MPI.MIN)
max_x = comm.allreduce(_max_x, MPI.MAX)
metadata['domain_extent_x'] = max_x - min_x
if metadata['domain_type'] in ['plane', 'extruded_plane']:
_min_y = np.min(chi[1].dat.data_ro)
_max_y = np.max(chi[1].dat.data_ro)
min_y = comm.allreduce(_min_y, MPI.MIN)
max_y = comm.allreduce(_max_y, MPI.MAX)
metadata['domain_extent_y'] = max_y - min_y
if mesh.extruded:
_min_z = np.min(chi[-1].dat.data_ro)
_max_z = np.max(chi[-1].dat.data_ro)
min_z = comm.allreduce(_min_z, MPI.MIN)
max_z = comm.allreduce(_max_z, MPI.MAX)
metadata['domain_extent_z'] = max_z - min_z
connorjward marked this conversation as resolved.
Show resolved Hide resolved

return metadata
Loading
Loading