Skip to content

Commit

Permalink
Merge pull request #45 from openweathermap/feature/exception_group
Browse files Browse the repository at this point in the history
DekerVSubsetError is an exception group
  • Loading branch information
SerGeRybakov authored Apr 4, 2024
2 parents e1df41d + e30580e commit b992e58
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 21 deletions.
34 changes: 33 additions & 1 deletion deker/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import List


class DekerBaseApplicationError(Exception):
Expand Down Expand Up @@ -107,7 +108,38 @@ class DekerSubsetError(DekerArrayError):


class DekerVSubsetError(DekerSubsetError):
"""If something goes wrong while VSubset managing."""
"""If something goes wrong while VSubset managing.
Regarding that VSubset's inner Subsets' processing
is made in an Executor, this exception actually is
an `exceptions messages group`.
If one or more threads finished with any exception,
name, message and reasonable tracebacks of all
of these exceptions shall be collected in a list
and passed to this class along with some message.
```
futures = [executor.submit(func, arg) for arg in iterable]
exceptions = []
for future in futures:
try:
future.result()
except Exception as e:
exceptions.append(repr(e) + "\n" + traceback.format_exc(-1))
```
"""

def __init__(self, message: str, exceptions: List[str]):
self.message = message
self.exceptions = exceptions
super().__init__(message)

def __str__(self) -> str:
enumerated = [f"{num}) {e}" for num, e in enumerate(self.exceptions, start=1)]
joined = "\n".join(str(e) for e in enumerated)
return f"{self.message}; exceptions:\n\n{joined} "


class DekerMemoryError(DekerBaseApplicationError, MemoryError):
Expand Down
38 changes: 21 additions & 17 deletions deker/subset.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import builtins
import traceback

from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple, Union

Expand All @@ -24,7 +25,7 @@
from numpy import ndarray

from deker.ABC.base_subset import BaseSubset
from deker.errors import DekerArrayError, DekerLockError, DekerVSubsetError
from deker.errors import DekerArrayError, DekerVSubsetError
from deker.locks import WriteVarrayLock
from deker.schemas import TimeDimensionSchema
from deker.tools import not_deleted
Expand Down Expand Up @@ -621,20 +622,23 @@ def _update(array_data: ArrayPositionedData) -> None:
self.__array.dtype, self.__array.shape, data, self.__bounds
)

results = self.__adapter.executor.map(
_update,
[
ArrayPositionedData(vpos, array_bounds, data[data_bounds])
for vpos, array_bounds, data_bounds in self.__arrays
],
)
try:
list(results)
except Exception as e:
if isinstance(e, DekerLockError):
raise e
else:
raise DekerVSubsetError(
f"ATTENTION: Data in {self!s} IS NOW CORRUPTED due to the exception above"
).with_traceback(e.__traceback__)
positions = [
ArrayPositionedData(vpos, array_bounds, data[data_bounds])
for vpos, array_bounds, data_bounds in self.__arrays
]
futures = [self.__adapter.executor.submit(_update, position) for position in positions]

exceptions = []
for future in futures:
try:
future.result()
except Exception as e:
exceptions.append(repr(e) + "\n" + traceback.format_exc(-1))

if exceptions:
raise DekerVSubsetError(
f"ATTENTION: Data in {self!s} MAY BE NOW CORRUPTED due to the exceptions occurred in threads",
exceptions,
)

self.logger.info(f"{self!s} data updated OK")
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ per-file-ignores = [
"deker/private/types/shell.py: D205, D400",
"deker/collection.py: D401",
"deker/subset.py: DAR101, E203",
"deker/errors.py: D301",
]
ignore = [
"B012",
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cases/test_concurrency/test_in_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def test_intersecting_vsubsets_update_fail(
(np.index_exp[:5, 3:, :], 500), # shall be blocked
(np.index_exp[8:, 8:, 8:], blocking_value), # shall proceed as non-locked
)
# Call read process to lock arrays for reading
# Call read process to lock arrays
proc = Process(
target=call_array_method,
args=(
Expand Down
41 changes: 39 additions & 2 deletions tests/test_cases/test_subsets/test_varray_subset_methods.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import time
import traceback

from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from random import randint
from typing import Tuple

import hdf5plugin
Expand All @@ -10,15 +15,15 @@

from tests.parameters.index_exp_params import valid_index_exp_params

from deker import Scale
from deker.arrays import VArray
from deker.client import Client
from deker.collection import Collection
from deker.errors import DekerArrayError, DekerSubsetError
from deker.errors import DekerArrayError, DekerSubsetError, DekerVSubsetError
from deker.schemas import AttributeSchema, DimensionSchema, TimeDimensionSchema, VArraySchema
from deker.subset import VSubset
from deker.types.private.classes import ArrayPosition
from deker.types.private.typings import Slice
from deker.types.public.classes import Scale


class TestVArraySubset:
Expand Down Expand Up @@ -523,6 +528,38 @@ def test_vsubset_arrays_calc_3dim(self, client):

coll.delete()

def test_DekerVSubsetError_exception_group(
self, inserted_varray: VArray, root_path, varray_collection: Collection
):
"""Test raise of DekerVSubsetError."""
excs = [ValueError, TypeError, FileExistsError, FileNotFoundError, EOFError]
sleep_time = randint(1, 5)

def raise_exc(exc):
time.sleep(sleep_time)
raise exc(f"This is {exc.__name__}")

def catch_errors(threads_results):
exceptions = []
for result in threads_results:
try:
result.result()
except Exception as e:
exceptions.append(repr(e) + "\n" + traceback.format_exc(-1))
if exceptions:
raise DekerVSubsetError("ExceptionGroup works", exceptions)

timer = time.monotonic()
with ThreadPoolExecutor(len(excs)) as pool:
futures = [pool.submit(raise_exc, exc) for exc in excs]

with pytest.raises(DekerVSubsetError):
assert catch_errors(threads_results=futures)
# catch_errors(threads_results=futures)

test_time = int(time.monotonic() - timer)
assert test_time <= sleep_time


class TestVSubsetForXArray:
@pytest.fixture(scope="class")
Expand Down

0 comments on commit b992e58

Please sign in to comment.