
Commit

Relax concatenation checks for --check_level=relax and `--check_level=ignore` (#2144)

Co-authored-by: Valeriu Predoi <[email protected]>
sloosvel and valeriupredoi authored Oct 6, 2023
1 parent 9b323aa commit 469fd09
Showing 5 changed files with 251 additions and 201 deletions.
12 changes: 7 additions & 5 deletions doc/quickstart/run.rst
@@ -78,18 +78,20 @@ or
This feature is available for projects that are hosted on the ESGF, i.e.
CMIP3, CMIP5, CMIP6, CORDEX, and obs4MIPs.

To control the strictness of the CMOR checker, use the flag ``--check_level``:
To control the strictness of the CMOR checker and the checks during concatenation
on auxiliary coordinates, supplementary variables, and derived coordinates,
use the flag ``--check_level``:

.. code:: bash

   esmvaltool run --check_level=relaxed recipe_example.yml

Possible values are:

- `ignore`: all errors will be reported as warnings
- `relaxed`: only fail if there are critical errors
- `default`: fail if there are any errors
- `strict`: fail if there are any warnings
- `ignore`: all errors will be reported as warnings. Concatenation will be performed without checks.
- `relaxed`: only fail if there are critical errors. Concatenation will be performed without checks.
- `default`: fail if there are any errors.
- `strict`: fail if there are any warnings.
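
For illustration, the relaxation can also be exercised programmatically through the ``concatenate`` preprocessor function modified in this pull request. A minimal sketch, assuming the private ``esmvalcore.preprocessor._io`` module touched by this diff; the ``make_cube`` helper and toy data are invented for the example:

import iris
import numpy as np
from cf_units import Unit

from esmvalcore.cmor.check import CheckLevels
from esmvalcore.preprocessor._io import concatenate


def make_cube(points):
    """Build a toy cube with a daily time coordinate (example helper)."""
    time = iris.coords.DimCoord(
        np.asarray(points, dtype=float),
        standard_name='time',
        units=Unit('days since 2000-01-01', calendar='standard'),
    )
    return iris.cube.Cube(
        np.zeros(len(points)),
        var_name='tas',
        units='K',
        dim_coords_and_dims=[(time, 0)],
    )


# With RELAXED (or IGNORE), the auxiliary coordinate, cell measure,
# ancillary variable and derived coordinate checks are skipped while
# concatenating the two consecutive chunks.
cube = concatenate(
    [make_cube([0, 1, 2]), make_cube([3, 4, 5])],
    check_level=CheckLevels.RELAXED,
)
print(cube.coord('time').points)  # [0. 1. 2. 3. 4. 5.]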

To re-use pre-processed files from a previous run of the same recipe, you can
use
4 changes: 3 additions & 1 deletion esmvalcore/dataset.py
@@ -736,7 +736,9 @@ def _load(self) -> Cube:
'session': self.session,
**self.facets,
}
settings['concatenate'] = {}
settings['concatenate'] = {
'check_level': self.session['check_level']
}
settings['cmor_check_metadata'] = {
'check_level': self.session['check_level'],
'cmor_table': self.facets['project'],
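
For orientation, a rough sketch of what this wiring produces, assuming the session stores an already-validated ``CheckLevels`` member (hypothetical values, e.g. for ``--check_level=relaxed``):

from esmvalcore.cmor.check import CheckLevels

# Hypothetical session value corresponding to --check_level=relaxed:
check_level = CheckLevels.RELAXED
settings = {'concatenate': {'check_level': check_level}}
# The preprocessor then applies the step roughly as:
# concatenate(cubes, **settings['concatenate'])
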
277 changes: 152 additions & 125 deletions esmvalcore/preprocessor/_io.py
Expand Up @@ -12,14 +12,17 @@
import iris
import iris.aux_factory
import iris.exceptions
import isodate
import numpy as np
import yaml
from cf_units import suppress_errors
from iris.cube import CubeList

from esmvalcore.cmor.check import CheckLevels
from esmvalcore.iris_helpers import merge_cube_attributes

from .._task import write_ncl_settings
from ._time import extract_time
from ._time import clip_timerange

logger = logging.getLogger(__name__)

@@ -95,10 +98,8 @@ def _get_attr_from_field_coord(ncfield, coord_name, attr):
def _load_callback(raw_cube, field, _):
"""Use this callback to fix anything Iris tries to break."""
# Remove attributes that cause issues with merging and concatenation
_delete_attributes(
raw_cube,
('creation_date', 'tracking_id', 'history', 'comment')
)
_delete_attributes(raw_cube,
('creation_date', 'tracking_id', 'history', 'comment'))
for coord in raw_cube.coords():
# Iris chooses to change longitude and latitude units to degrees
# regardless of value in file, so reinstating file value
@@ -184,17 +185,110 @@ def load(
return raw_cubes


def _by_two_concatenation(cubes):
"""Perform a by-2 concatenation to avoid gaps."""
concatenated = iris.cube.CubeList(cubes).concatenate()
if len(concatenated) == 1:
return concatenated[0]
def _concatenate_cubes(cubes, check_level):
"""Concatenate cubes according to the check_level."""
kwargs = {
'check_aux_coords': True,
'check_cell_measures': True,
'check_ancils': True,
'check_derived_coords': True
}

concatenated = _concatenate_overlapping_cubes(concatenated)
if len(concatenated) == 2:
_get_concatenation_error(concatenated)
else:
return concatenated[0]
if check_level > CheckLevels.DEFAULT:
kwargs = dict.fromkeys(kwargs, False)
logger.debug(
'Concatenation will be performed without checking '
'auxiliary coordinates, cell measures, ancillaries '
'and derived coordinates present in the cubes.', )

concatenated = iris.cube.CubeList(cubes).concatenate(**kwargs)

return concatenated
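
``CheckLevels`` is an ``IntEnum`` ordered from strictest to most lenient, so the comparison above selects exactly the ``relaxed`` and ``ignore`` levels. A small sketch of that ordering (assuming the member ordering implied by the ``check_level > CheckLevels.DEFAULT`` test and the documentation change above):

from esmvalcore.cmor.check import CheckLevels

assert CheckLevels.RELAXED > CheckLevels.DEFAULT
assert CheckLevels.IGNORE > CheckLevels.DEFAULT
assert not CheckLevels.STRICT > CheckLevels.DEFAULT
# Hence only --check_level=relaxed and --check_level=ignore flip all four
# iris concatenation checks to False via dict.fromkeys(kwargs, False).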


def _check_time_overlaps(cubes):
"""Handle time overlaps."""
times = [cube.coord('time').core_points() for cube in cubes]
for index, _ in enumerate(times[:-1]):
overlap = np.intersect1d(times[index], times[index + 1])
if overlap.size != 0:
overlapping_cubes = cubes[index:index + 2]
time_1 = overlapping_cubes[0].coord('time').core_points()
time_2 = overlapping_cubes[1].coord('time').core_points()

# case 1: both cubes start at the same time -> return longer cube
if time_1[0] == time_2[0]:
if time_1[-1] <= time_2[-1]:
cubes.pop(index)
discarded_cube_index = 0
used_cube_index = 1
else:
cubes.pop(index + 1)
discarded_cube_index = 1
used_cube_index = 0
logger.debug(
"Both cubes start at the same time but cube %s "
"ends before %s",
overlapping_cubes[discarded_cube_index],
overlapping_cubes[used_cube_index],
)
logger.debug(
"Cube %s contains all needed data so using it fully",
overlapping_cubes[used_cube_index],
)

# case 2: cube1 starts before cube2
# case 2.1: cube1 ends after cube2 -> return cube1
elif time_1[-1] > time_2[-1]:
cubes.pop(index + 1)
logger.debug("Using only data from %s", overlapping_cubes[0])

# case 2.2: cube1 ends before cube2 -> use full cube2
# and shorten cube1
else:
new_time = np.delete(
time_1,
np.argwhere(np.in1d(time_1, overlap)),
)
new_dates = overlapping_cubes[0].coord('time').units.num2date(
new_time)
logger.debug(
"Extracting time slice between %s and %s from cube %s "
"to use it for concatenation with cube %s",
new_dates[0],
new_dates[-1],
overlapping_cubes[0],
overlapping_cubes[1],
)

start_point = isodate.date_isoformat(
new_dates[0], format=isodate.isostrf.DATE_BAS_COMPLETE)
end_point = isodate.date_isoformat(
new_dates[-1], format=isodate.isostrf.DATE_BAS_COMPLETE)
new_cube = clip_timerange(overlapping_cubes[0],
f'{start_point}/{end_point}')

cubes[index] = new_cube
return cubes
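
A worked example of case 2.2, calling the private helper directly with a toy ``make_cube`` helper invented for the sketch: cube 1 covers days 0-5 and cube 2 covers days 4-9, so the overlapping days 4 and 5 are clipped from cube 1 and cube 2 is kept in full:

import iris
import numpy as np
from cf_units import Unit

from esmvalcore.preprocessor._io import _check_time_overlaps


def make_cube(points):
    """Build a toy cube with a daily time coordinate (example helper)."""
    time = iris.coords.DimCoord(
        np.asarray(points, dtype=float),
        standard_name='time',
        units=Unit('days since 2000-01-01', calendar='standard'),
    )
    return iris.cube.Cube(np.zeros(len(points)), var_name='tas', units='K',
                          dim_coords_and_dims=[(time, 0)])


cubes = _check_time_overlaps([make_cube(range(6)), make_cube(range(4, 10))])
print(cubes[0].coord('time').points)  # [0. 1. 2. 3.], overlap clipped
print(cubes[1].coord('time').points)  # [4. 5. 6. 7. 8. 9.], kept in full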


def _fix_calendars(cubes):
"""Check and homogenise calendars, if possible."""
calendars = [cube.coord('time').units.calendar for cube in cubes]
unique_calendars = np.unique(calendars)

calendar_ocurrences = np.array(
[calendars.count(calendar) for calendar in unique_calendars])
calendar_index = int(
np.argwhere(calendar_ocurrences == calendar_ocurrences.max()))

for cube in cubes:
time_coord = cube.coord('time')
old_calendar = time_coord.units.calendar
if old_calendar != unique_calendars[calendar_index]:
new_unit = time_coord.units.change_calendar(
unique_calendars[calendar_index])
time_coord.units = new_unit
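
An illustration of the majority vote (a sketch using the private helper; it assumes ``cf_units.Unit.change_calendar`` can convert between the real-world calendars used here):

import iris
import numpy as np
from cf_units import Unit

from esmvalcore.preprocessor._io import _fix_calendars


def time_cube(calendar):
    """Build a toy cube whose time coordinate uses the given calendar."""
    time = iris.coords.DimCoord(
        np.arange(3, dtype=float),
        standard_name='time',
        units=Unit('days since 2000-01-01', calendar=calendar),
    )
    return iris.cube.Cube(np.zeros(3), dim_coords_and_dims=[(time, 0)])


cubes = [time_cube('standard'), time_cube('standard'),
         time_cube('proleptic_gregorian')]
_fix_calendars(cubes)  # homogenises the time units in place
print({cube.coord('time').units.calendar for cube in cubes})  # {'standard'}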


def _get_concatenation_error(cubes):
@@ -214,28 +308,56 @@ def _get_concatenation_error(cubes):
raise ValueError(f'Can not concatenate cubes: {msg}')


def concatenate(cubes):
"""Concatenate all cubes after fixing metadata."""
def _sort_cubes_by_time(cubes):
"""Sort CubeList by time coordinate."""
try:
cubes = sorted(cubes, key=lambda c: c.coord("time").cell(0).point)
except iris.exceptions.CoordinateNotFoundError as exc:
msg = "One or more cubes {} are missing".format(cubes) + \
" time coordinate: {}".format(str(exc))
raise ValueError(msg)
except TypeError as error:
msg = ("Cubes cannot be sorted "
f"due to differing time units: {str(error)}")
raise TypeError(msg) from error
return cubes
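
A quick sketch of the sorting, again using the private helper with toy cubes invented for the example:

import iris
import numpy as np
from cf_units import Unit

from esmvalcore.preprocessor._io import _sort_cubes_by_time


def time_cube(points):
    """Build a toy cube with a daily time coordinate (example helper)."""
    time = iris.coords.DimCoord(
        np.asarray(points, dtype=float),
        standard_name='time',
        units=Unit('days since 2000-01-01', calendar='standard'),
    )
    return iris.cube.Cube(np.zeros(len(points)),
                          dim_coords_and_dims=[(time, 0)])


cubes = _sort_cubes_by_time([time_cube([10, 11]), time_cube([0, 1])])
print([cube.coord('time').points[0] for cube in cubes])  # [0.0, 10.0]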


def concatenate(cubes, check_level=CheckLevels.DEFAULT):
"""Concatenate all cubes after fixing metadata.

Parameters
----------
cubes: iterable of iris.cube.Cube
    Data cubes to be concatenated
check_level: CheckLevels
    Level of strictness of the checks in the concatenation.

Returns
-------
cube: iris.cube.Cube
    Resulting concatenated cube.

Raises
------
ValueError
    Concatenation was not possible.
"""
if not cubes:
return cubes
if len(cubes) == 1:
return cubes[0]

merge_cube_attributes(cubes)
cubes = _sort_cubes_by_time(cubes)
_fix_calendars(cubes)
cubes = _check_time_overlaps(cubes)
result = _concatenate_cubes(cubes, check_level=check_level)

if len(cubes) > 1:
# order cubes by first time point
try:
cubes = sorted(cubes, key=lambda c: c.coord("time").cell(0).point)
except iris.exceptions.CoordinateNotFoundError as exc:
msg = "One or more cubes {} are missing".format(cubes) + \
" time coordinate: {}".format(str(exc))
raise ValueError(msg)

# iteratively concatenate starting with first cube
result = cubes[0]
for cube in cubes[1:]:
result = _by_two_concatenation([result, cube])
if len(result) == 1:
result = result[0]
else:
_get_concatenation_error(result)

_fix_aux_factories(result)

@@ -410,98 +532,3 @@ def _write_ncl_metadata(output_dir, metadata):
write_ncl_settings(info, filename)

return filename


def _concatenate_overlapping_cubes(cubes):
"""Concatenate time-overlapping cubes (two cubes only)."""
# we arrange [cube1, cube2] so that cube1.start <= cube2.start
if cubes[0].coord('time').points[0] <= cubes[1].coord('time').points[0]:
cubes = [cubes[0], cubes[1]]
logger.debug(
"Will attempt to concatenate cubes %s "
"and %s in this order", cubes[0], cubes[1])
else:
cubes = [cubes[1], cubes[0]]
logger.debug(
"Will attempt to concatenate cubes %s "
"and %s in this order", cubes[1], cubes[0])

# get time end points
time_1 = cubes[0].coord('time')
time_2 = cubes[1].coord('time')
if time_1.units != time_2.units:
raise ValueError(
f"Cubes\n{cubes[0]}\nand\n{cubes[1]}\ncan not be concatenated: "
f"time units {time_1.units}, calendar {time_1.units.calendar} "
f"and {time_2.units}, calendar {time_2.units.calendar} differ")
data_start_1 = time_1.cell(0).point
data_start_2 = time_2.cell(0).point
data_end_1 = time_1.cell(-1).point
data_end_2 = time_2.cell(-1).point

# case 1: both cubes start at the same time -> return longer cube
if data_start_1 == data_start_2:
if data_end_1 <= data_end_2:
logger.debug(
"Both cubes start at the same time but cube %s "
"ends before %s", cubes[0], cubes[1])
logger.debug("Cube %s contains all needed data so using it fully",
cubes[1])
cubes = [cubes[1]]
else:
logger.debug(
"Both cubes start at the same time but cube %s "
"ends before %s", cubes[1], cubes[0])
logger.debug("Cube %s contains all needed data so using it fully",
cubes[0])
cubes = [cubes[0]]

# case 2: cube1 starts before cube2
else:
# find time overlap, if any
start_overlap = next((time_1.units.num2date(t)
for t in time_1.points if t in time_2.points),
None)
# case 2.0: no overlap (new iris implementation does allow
# concatenation of cubes with no overlap)
if not start_overlap:
logger.debug(
"Unable to concatenate non-overlapping cubes\n%s\nand\n%s"
"separated in time.", cubes[0], cubes[1])
# case 2.1: cube1 ends after cube2 -> return cube1
elif data_end_1 > data_end_2:
cubes = [cubes[0]]
logger.debug("Using only data from %s", cubes[0])
# case 2.2: cube1 ends before cube2 -> use full cube2 and shorten cube1
else:
logger.debug(
"Extracting time slice between %s and %s from cube %s to use "
"it for concatenation with cube %s", "-".join([
str(data_start_1.year),
str(data_start_1.month),
str(data_start_1.day)
]), "-".join([
str(start_overlap.year),
str(start_overlap.month),
str(start_overlap.day)
]), cubes[0], cubes[1])
c1_delta = extract_time(cubes[0], data_start_1.year,
data_start_1.month, data_start_1.day,
start_overlap.year, start_overlap.month,
start_overlap.day)
# convert c1_delta scalar cube to vector cube, if needed
if c1_delta.shape == ():
c1_delta = iris.util.new_axis(c1_delta, scalar_coord="time")
cubes = iris.cube.CubeList([c1_delta, cubes[1]])
logger.debug("Attempting concatenatenation of %s with %s",
c1_delta, cubes[1])
try:
cubes = [iris.cube.CubeList(cubes).concatenate_cube()]
except iris.exceptions.ConcatenateError as ex:
logger.error('Can not concatenate cubes: %s', ex)
logger.error('Cubes:')
for cube in cubes:
logger.error(cube)
raise ex

return cubes
