From 47212a6336537a73cca9ffadf65c05d6b8b3860f Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Sat, 2 Nov 2024 18:21:42 +0500 Subject: [PATCH 1/2] fix: set adapter.connected = False on channel closing --- pyzeebe/grpc_internals/zeebe_adapter_base.py | 8 +++++++- tests/unit/worker/job_poller_test.py | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pyzeebe/grpc_internals/zeebe_adapter_base.py b/pyzeebe/grpc_internals/zeebe_adapter_base.py index 4269099f..a343bc41 100644 --- a/pyzeebe/grpc_internals/zeebe_adapter_base.py +++ b/pyzeebe/grpc_internals/zeebe_adapter_base.py @@ -21,11 +21,15 @@ class ZeebeAdapterBase: def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = -1): self._channel = grpc_channel self._gateway_stub = GatewayStub(grpc_channel) - self.connected = True + self._connected = True self.retrying_connection = False self._max_connection_retries = max_connection_retries self._current_connection_retries = 0 + @property + def connected(self) -> bool: + return self._connected + def _should_retry(self) -> bool: return self._max_connection_retries == -1 or self._current_connection_retries < self._max_connection_retries @@ -44,6 +48,8 @@ async def _close(self) -> None: await self._channel.close() except Exception as exception: logger.exception("Failed to close channel, %s exception was raised", type(exception).__name__) + finally: + self._connected = False def _create_pyzeebe_error_from_grpc_error(grpc_error: grpc.aio.AioRpcError) -> PyZeebeError: diff --git a/tests/unit/worker/job_poller_test.py b/tests/unit/worker/job_poller_test.py index 7d8cf69e..1ba247cb 100644 --- a/tests/unit/worker/job_poller_test.py +++ b/tests/unit/worker/job_poller_test.py @@ -46,13 +46,13 @@ async def test_job_is_added_to_task_state( class TestShouldPoll: def test_should_poll_returns_expected_result_when_disconnected(self, job_poller: JobPoller): - job_poller.zeebe_adapter.connected = False + job_poller.zeebe_adapter._connected = False job_poller.zeebe_adapter.retrying_connection = False assert not job_poller.should_poll() def test_continues_polling_when_retrying_connection(self, job_poller: JobPoller): - job_poller.zeebe_adapter.connected = False + job_poller.zeebe_adapter._connected = False job_poller.zeebe_adapter.retrying_connection = True assert job_poller.should_poll() From a595db8729e9af8e7406178490fb9683014f4612 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Mon, 4 Nov 2024 11:54:00 +0500 Subject: [PATCH 2/2] add test --- tests/unit/grpc_internals/zeebe_adapter_base_test.py | 11 +++++++---- tests/unit/worker/worker_test.py | 7 ++++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/tests/unit/grpc_internals/zeebe_adapter_base_test.py b/tests/unit/grpc_internals/zeebe_adapter_base_test.py index ed30bda8..11bf59f2 100644 --- a/tests/unit/grpc_internals/zeebe_adapter_base_test.py +++ b/tests/unit/grpc_internals/zeebe_adapter_base_test.py @@ -69,18 +69,21 @@ async def test_raises_unkown_grpc_status_code_on_unkown_status_code( async def test_closes_after_retries_exceeded(self, zeebe_adapter: ZeebeAdapterBase): error = grpc.aio.AioRpcError(grpc.StatusCode.UNAVAILABLE, None, None) - zeebe_adapter._close = AsyncMock() + zeebe_adapter._channel.close = AsyncMock() zeebe_adapter._max_connection_retries = 1 with pytest.raises(ZeebeGatewayUnavailableError): await zeebe_adapter._handle_grpc_error(error) - zeebe_adapter._close.assert_called_once() + assert zeebe_adapter.connected is False + zeebe_adapter._channel.close.assert_awaited_once() async def test_closes_after_internal_error(self, zeebe_adapter: ZeebeAdapterBase): error = grpc.aio.AioRpcError(grpc.StatusCode.INTERNAL, None, None) - zeebe_adapter._close = AsyncMock() + + zeebe_adapter._channel.close = AsyncMock() zeebe_adapter._max_connection_retries = 1 with pytest.raises(ZeebeInternalError): await zeebe_adapter._handle_grpc_error(error) - zeebe_adapter._close.assert_called_once() + assert zeebe_adapter.connected is False + zeebe_adapter._channel.close.assert_awaited_once() diff --git a/tests/unit/worker/worker_test.py b/tests/unit/worker/worker_test.py index ef77d820..9f234604 100644 --- a/tests/unit/worker/worker_test.py +++ b/tests/unit/worker/worker_test.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from unittest.mock import AsyncMock, Mock from uuid import uuid4 @@ -277,12 +278,12 @@ async def test_poller_failed(self, zeebe_worker: ZeebeWorker): async def test_second_poller_should_cancel(self, zeebe_worker: ZeebeWorker): zeebe_worker._init_tasks = Mock() - poller2_cancel_event = anyio.Event() + poller2_cancel_event = asyncio.Event() async def poll2(): try: - await anyio.Event().wait() - except anyio.get_cancelled_exc_class(): + await asyncio.Event().wait() + except asyncio.CancelledError: poller2_cancel_event.set() poller_mock = AsyncMock(spec_set=JobPoller, poll=AsyncMock(side_effect=[Exception("test_exception")]))