Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make extract_levels preprocessor function lazy #1761

Merged
merged 9 commits into from
May 30, 2023
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dependencies:
- py-cordex
- pybtex
- python>=3.9
- python-stratify
- python-stratify>=0.3
- pyyaml
- requests
- scipy>=1.6
Expand Down
14 changes: 14 additions & 0 deletions esmvalcore/preprocessor/_other.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from collections import defaultdict

import dask.array as da
import numpy as np

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,3 +72,16 @@ def grouper(product):

grouped = _groupby(products, keyfunc=grouper)
return grouped.items()


def get_array_module(*args):
    """Return the array module that best matches the given arguments.

    If at least one of the arguments is a :class:`dask.array.Array`,
    the :mod:`dask.array` module is returned; in every other case the
    :mod:`numpy` module is returned.
    """
    # Dask takes precedence: a single lazy argument makes the whole
    # computation lazy.
    if any(isinstance(arg, da.Array) for arg in args):
        return da
    return np
Comment on lines +77 to +87
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function shouldn't be needed thanks to the dispatch mechanism from NEP-18. However, it is needed, which indicates a bug in numpy wrt dispatching in np.ma. If you have the stomach for it, it would be nice to follow up with an issue upstream to sort this out.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like it's work in progress: numpy/numpy#22913

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a TODO to remove when that'll be in, and add the link to the Numpy PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not too keen on that, it will take quite a while before we drop support for the last numpy version that doesn't have the feature.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point! We can always pin numpy - but I wouldn't do that

13 changes: 5 additions & 8 deletions esmvalcore/preprocessor/_regrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
import iris
import numpy as np
import stratify
from dask import array as da
from geopy.geocoders import Nominatim
from iris.analysis import AreaWeighted, Linear, Nearest, UnstructuredNearest
from iris.util import broadcast_to_shape

from ..cmor._fixes.shared import add_altitude_from_plev, add_plev_from_altitude
from ..cmor.table import CMOR_TABLES
from ._other import get_array_module
from ._regrid_esmpy import ESMF_REGRID_METHODS
from ._regrid_esmpy import regrid as esmpy_regrid
from ._supplementary_vars import add_ancillary_variable, add_cell_measure
Expand Down Expand Up @@ -797,22 +797,19 @@ def _vertical_interpolate(cube, src_levels, levels, interpolation,
cube.coord_dims(src_levels))

# force mask onto data as nan's
cube.data = da.ma.filled(cube.core_data(), np.nan)
npx = get_array_module(cube.core_data())
data = npx.ma.filled(cube.core_data(), np.nan)

# Now perform the actual vertical interpolation.
new_data = stratify.interpolate(levels,
src_levels_broadcast,
cube.core_data(),
data,
axis=z_axis,
interpolation=interpolation,
extrapolation=extrapolation)

# Calculate the mask based on any NaN values in the interpolated data.
mask = np.isnan(new_data)

if np.any(mask):
# Ensure that the data is masked appropriately.
new_data = np.ma.array(new_data, mask=mask, fill_value=_MDI)
new_data = npx.ma.masked_where(npx.isnan(new_data), new_data)
valeriupredoi marked this conversation as resolved.
Show resolved Hide resolved

# Construct the resulting cube with the interpolated data.
return _create_cube(cube, new_data, src_levels, levels.astype(float))
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
'scipy>=1.6',
'scitools-iris>=3.4.0',
'shapely[vectorized]',
'stratify',
'stratify>=0.3',
'yamale',
],
# Test dependencies
Expand Down
45 changes: 39 additions & 6 deletions tests/integration/preprocessor/_regrid/test_extract_levels.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@


class Test(tests.Test):

def setUp(self):
"""Prepare tests."""
shape = (3, 2, 2)
Expand Down Expand Up @@ -57,20 +58,52 @@ def test_interpolation__linear(self):
levels = [0.5, 1.5]
scheme = 'linear'
result = extract_levels(self.cube, levels, scheme)
expected = np.array([[[[2., 3.], [4., 5.]], [[6., 7.], [8., 9.]]],
[[[14., 15.], [16., 17.]], [[18., 19.],
[20., 21.]]]])
expected = np.ma.array([
[
[[2., 3.], [4., 5.]],
[[6., 7.], [8., 9.]],
],
[
[[14., 15.], [16., 17.]],
[[18., 19.], [20., 21.]],
],
])
self.assert_array_equal(result.data, expected)
self.shape[self.z_dim] = len(levels)
self.assertEqual(result.shape, tuple(self.shape))

def test_interpolation__linear_lazy(self):
    """Linear level extraction of a lazy cube keeps the result lazy."""
    lazy_cube = self.cube.copy(self.cube.lazy_data())
    result = extract_levels(lazy_cube, [0.5, 1.5], 'linear')
    # Laziness must be preserved by the preprocessor function.
    self.assertTrue(result.has_lazy_data())
    expected = np.ma.array([
        [[[2., 3.], [4., 5.]],
         [[6., 7.], [8., 9.]]],
        [[[14., 15.], [16., 17.]],
         [[18., 19.], [20., 21.]]],
    ])
    self.assert_array_equal(result.data, expected)

def test_interpolation__nearest(self):
    """Nearest-neighbour level extraction on realized data.

    NOTE(review): this span is rendered from a diff, so BOTH the old
    (plain ``np.array``) and new (``np.ma.array``) ``expected`` values
    appear below; the first assignment is dead and immediately shadowed
    by the second.
    """
    levels = [0.49, 1.51]
    scheme = 'nearest'
    result = extract_levels(self.cube, levels, scheme)
    # Pre-change expected value (removed diff line); shadowed below.
    expected = np.array([[[[0., 1.], [2., 3.]], [[8., 9.], [10., 11.]]],
                        [[[12., 13.], [14., 15.]], [[20., 21.],
                         [22., 23.]]]])
    # Post-change expected value: a masked array, matching the new
    # masked-where-NaN handling in _vertical_interpolate.
    expected = np.ma.array([
        [
            [[0., 1.], [2., 3.]],
            [[8., 9.], [10., 11.]],
        ],
        [
            [[12., 13.], [14., 15.]],
            [[20., 21.], [22., 23.]],
        ],
    ])
    self.assert_array_equal(result.data, expected)
    # Output shape equals the input shape with the z dimension replaced
    # by the number of requested levels.
    self.shape[self.z_dim] = len(levels)
    self.assertEqual(result.shape, tuple(self.shape))
Expand Down
26 changes: 25 additions & 1 deletion tests/unit/preprocessor/_other/test_other.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import unittest

import dask.array as da
import iris.coord_categorisation
import iris.coords
import numpy as np
Expand All @@ -10,11 +11,16 @@
from numpy.testing import assert_array_equal

from esmvalcore.preprocessor import PreprocessorFile
from esmvalcore.preprocessor._other import _group_products, clip
from esmvalcore.preprocessor._other import (
_group_products,
clip,
get_array_module,
)


class TestOther(unittest.TestCase):
"""Test class for _other."""

def test_clip(self):
"""Test clip function."""
cube = Cube(np.array([-10, 0, 10]))
Expand Down Expand Up @@ -67,5 +73,23 @@ def test_group_products_string_list():
assert grouped_by_list == grouped_by_string


def test_get_array_module_da():
    """A dask array argument selects the dask.array module."""
    module = get_array_module(da.array([1, 2]))
    assert module is da


def test_get_array_module_np():
    """A numpy array argument selects the numpy module."""
    module = get_array_module(np.array([1, 2]))
    assert module is np


def test_get_array_module_mixed():
    """Dask wins over numpy when both kinds of arrays are passed."""
    module = get_array_module(da.array([1]), np.array([1]))
    assert module is da


# Allow running this test module directly (outside a pytest invocation).
if __name__ == '__main__':
    unittest.main()
15 changes: 2 additions & 13 deletions tests/unit/preprocessor/_regrid/test_extract_levels.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""Unit tests for :func:`esmvalcore.preprocessor.regrid.extract_levels`."""
import os
import tempfile
import unittest
from unittest import mock

Expand Down Expand Up @@ -35,8 +33,6 @@ def setUp(self):
self.schemes = [
'linear', 'nearest', 'linear_extrapolate', 'nearest_extrapolate',
]
descriptor, self.filename = tempfile.mkstemp('.nc')
os.close(descriptor)

def test_invalid_scheme__unknown(self):
levels = mock.sentinel.levels
Expand Down Expand Up @@ -160,7 +156,7 @@ def test_interpolation(self):
# Check the _create_cube args ...
self.assertEqual(len(args), 4)
self.assertEqual(args[0], self.cube)
self.assert_array_equal(args[1], new_data)
self.assert_array_equal(args[1], np.ma.array(new_data))
self.assert_array_equal(args[2],
self.cube.coord(axis='z', dim_coords=True))
self.assert_array_equal(args[3], levels)
Expand Down Expand Up @@ -281,15 +277,8 @@ def test_interpolation__masked(self):
masked = ma.empty(self.shape)
masked.mask = mask
cube = _make_cube(masked, dtype=self.dtype)
# save cube to test the lazy data interpolation too
iris.save(cube, self.filename)
with mock.patch('stratify.interpolate',
return_value=new_data) as mocker:
# first test lazy
loaded_cube = iris.load_cube(self.filename)
result_from_lazy = extract_levels(loaded_cube, levels, scheme)
self.assertEqual(result_from_lazy, self.created_cube)
# then test realized
result = extract_levels(cube, levels, scheme)
self.assertEqual(result, self.created_cube)
args, kwargs = mocker.call_args
Expand All @@ -300,7 +289,7 @@ def test_interpolation__masked(self):
src_levels_broadcast = np.broadcast_to(pts.reshape(self.z, 1, 1),
cube.shape)
self.assert_array_equal(args[1], src_levels_broadcast)
self.assert_array_equal(args[2], cube.data)
self.assert_array_equal(args[2], np.ma.filled(masked, np.nan))
# Check the stratify.interpolate kwargs ...
self.assertEqual(
kwargs, dict(axis=0, interpolation=scheme,
Expand Down