diff --git a/tests/handling/daemons/conftest.py b/tests/handling/daemons/conftest.py
index d64e3c9a..2f3d39aa 100644
--- a/tests/handling/daemons/conftest.py
+++ b/tests/handling/daemons/conftest.py
@@ -1,10 +1,10 @@
 import asyncio
+import collections
 import contextlib
-import time
+from typing import Optional

-import freezegun
 import pytest
-from mock import MagicMock, patch
+from mock import MagicMock

 import kopf
 from kopf._cogs.aiokits.aiotoggles import ToggleSet
@@ -20,23 +20,32 @@ class DaemonDummy:

     def __init__(self):
         super().__init__()
         self.mock = MagicMock()
-        self.kwargs = {}
+        self._flag_statuses = collections.defaultdict(lambda: False)
         self.steps = {
             'called': asyncio.Event(),
             'finish': asyncio.Event(),
             'error': asyncio.Event(),
         }
+        self.called = asyncio.Condition()
+        self.failed = asyncio.Event()
+        self.finished = asyncio.Event()

-    async def wait_for_daemon_done(self):
-        stopped = self.kwargs['stopped']
+    async def wait_for_daemon_done(self) -> None:
+        stopped = self.mock.call_args[1]['stopped']
         await stopped.wait()
-        while not stopped.reason & stopped.reason.DONE:
+        while stopped.reason is None or not stopped.reason & stopped.reason.DONE:
             await asyncio.sleep(0)  # give control back to asyncio event loop


 @pytest.fixture()
-def dummy():
-    return DaemonDummy()
+async def dummy(simulate_cycle):
+    dummy = DaemonDummy()
+    yield dummy
+
+    # Cancel the background tasks, if any.
+    event_object = {'metadata': {'deletionTimestamp': '...'}}
+    await simulate_cycle(event_object)
+    await dummy.wait_for_daemon_done()


 @pytest.fixture()
@@ -52,7 +61,11 @@ def _merge_dicts(src, dst):
         else:
             dst[key] = val

-    async def _simulate_cycle(event_object: RawBody):
+    async def _simulate_cycle(
+            event_object: RawBody,
+            *,
+            stream_pressure: Optional[asyncio.Event] = None,
+    ) -> None:
         mocker.resetall()

         await process_resource_event(
@@ -65,6 +78,7 @@ async def _simulate_cycle(event_object: RawBody):
             indexers=OperatorIndexers(),
             raw_event={'type': 'irrelevant', 'object': event_object},
             event_queue=asyncio.Queue(),
+            stream_pressure=stream_pressure,
         )

         # Do the same as k8s does: merge the patches into the object.
@@ -96,33 +110,3 @@ async def background_daemon_killer(settings, memories, operator_paused):
         with contextlib.suppress(asyncio.CancelledError):
             task.cancel()
             await task
-
-
-@pytest.fixture()
-async def frozen_time():
-    """
-    A helper to simulate time movements to step over long sleeps/timeouts.
-    """
-    with freezegun.freeze_time("2020-01-01 00:00:00") as frozen:
-        # Use freezegun-supported time instead of system clocks -- for testing purposes only.
-        # NB: Patch strictly after the time is frozen -- to use fake_time(), not real time().
-        # NB: StdLib's event loops use time.monotonic(), but uvloop uses its own C-level time,
-        # so patch the loop object directly instead of its implied underlying implementation.
-        with patch.object(asyncio.get_running_loop(), 'time', time.time):
-            yield frozen
-
-
-# The time-driven tests mock the sleeps, and shift the time as much as it was requested to sleep.
-# This makes the sleep realistic for the app code, though executed instantly for the tests.
-@pytest.fixture()
-def manual_time(k8s_mocked, frozen_time):
-    async def sleep_substitute(delay, *_, **__):
-        if delay is None:
-            pass
-        elif isinstance(delay, float):
-            frozen_time.tick(delay)
-        else:
-            frozen_time.tick(min(delay))
-
-    k8s_mocked.sleep.side_effect = sleep_substitute
-    yield frozen_time
diff --git a/tests/handling/daemons/test_daemon_errors.py b/tests/handling/daemons/test_daemon_errors.py
index 1bdd7bf1..12ffb716 100644
--- a/tests/handling/daemons/test_daemon_errors.py
+++ b/tests/handling/daemons/test_daemon_errors.py
@@ -1,3 +1,4 @@
+import asyncio
 import logging

 import kopf
@@ -5,26 +6,21 @@


 async def test_daemon_stopped_on_permanent_error(
-        settings, resource, dummy, manual_time, caplog, assert_logs, k8s_mocked, simulate_cycle):
+        settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)

-    @kopf.daemon(*resource, id='fn', backoff=0.01)
+    @kopf.daemon(*resource, id='fn', backoff=1.23)
     async def fn(**kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
+        dummy.mock(**kwargs)
         raise PermanentError("boo!")

     finalizer = settings.persistence.finalizer
     event_object = {'metadata': {'finalizers': [finalizer]}}
     await simulate_cycle(event_object)
+    await asyncio.sleep(123)  # give it enough opportunities to misbehave (e.g. restart)

-    await dummy.steps['called'].wait()
-    await dummy.wait_for_daemon_done()
-
+    assert looptime == 123
     assert dummy.mock.call_count == 1
-    assert k8s_mocked.patch.call_count == 0
-    assert k8s_mocked.sleep.call_count == 0

     assert_logs([
         "Daemon 'fn' failed permanently: boo!",
@@ -35,25 +31,21 @@ async def fn(**kwargs):


 async def test_daemon_stopped_on_arbitrary_errors_with_mode_permanent(
-        settings, resource, dummy, manual_time, caplog, assert_logs, k8s_mocked, simulate_cycle):
+        settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)

-    @kopf.daemon(*resource, id='fn', errors=ErrorsMode.PERMANENT, backoff=0.01)
+    @kopf.daemon(*resource, id='fn', errors=ErrorsMode.PERMANENT, backoff=1.23)
     async def fn(**kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
+        dummy.mock(**kwargs)
         raise Exception("boo!")

     finalizer = settings.persistence.finalizer
     event_object = {'metadata': {'finalizers': [finalizer]}}
     await simulate_cycle(event_object)
+    await asyncio.sleep(123)  # give it enough opportunities to misbehave (e.g. restart)

-    await dummy.steps['called'].wait()
-    await dummy.wait_for_daemon_done()
-
+    assert looptime == 123
     assert dummy.mock.call_count == 1
-    assert k8s_mocked.sleep.call_count == 0

     assert_logs([
         "Daemon 'fn' failed with an exception and will stop now: boo!",
@@ -64,31 +56,25 @@ async def fn(**kwargs):


 async def test_daemon_retried_on_temporary_error(
-        registry, settings, resource, dummy, manual_time,
-        caplog, assert_logs, k8s_mocked, simulate_cycle):
+        registry, settings, resource, dummy,
+        caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)
+    finished = asyncio.Event()

-    @kopf.daemon(*resource, id='fn', backoff=1.0)
+    @kopf.daemon(*resource, id='fn', backoff=1.23)
     async def fn(retry, **kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
+        dummy.mock(**kwargs)
         if not retry:
-            raise TemporaryError("boo!", delay=1.0)
+            raise TemporaryError("boo!", delay=3.45)
         else:
-            dummy.steps['finish'].set()
+            finished.set()

     finalizer = settings.persistence.finalizer
     event_object = {'metadata': {'finalizers': [finalizer]}}
     await simulate_cycle(event_object)
+    await finished.wait()

-    await dummy.steps['called'].wait()
-    await dummy.steps['finish'].wait()
-    await dummy.wait_for_daemon_done()
-
-    assert k8s_mocked.sleep.call_count == 1  # one for each retry
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0  # [call#][args/kwargs][arg#]
-
+    assert looptime == 3.45
     assert_logs([
         "Daemon 'fn' failed temporarily: boo!",
         "Daemon 'fn' succeeded.",
@@ -97,70 +83,66 @@ async def fn(retry, **kwargs):


 async def test_daemon_retried_on_arbitrary_error_with_mode_temporary(
-        settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time):
+        settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)
+    finished = asyncio.Event()

-    @kopf.daemon(*resource, id='fn', errors=ErrorsMode.TEMPORARY, backoff=1.0)
+    @kopf.daemon(*resource, id='fn', errors=ErrorsMode.TEMPORARY, backoff=1.23)
     async def fn(retry, **kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
+        dummy.mock(**kwargs)
         if not retry:
             raise Exception("boo!")
         else:
-            dummy.steps['finish'].set()
+            finished.set()

     finalizer = settings.persistence.finalizer
     event_object = {'metadata': {'finalizers': [finalizer]}}
     await simulate_cycle(event_object)
+    await finished.wait()

-    await dummy.steps['called'].wait()
-    await dummy.steps['finish'].wait()
-    await dummy.wait_for_daemon_done()
-
-    assert k8s_mocked.sleep.call_count == 1  # one for each retry
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0  # [call#][args/kwargs][arg#]
-
+    assert looptime == 1.23
     assert_logs([
-        "Daemon 'fn' failed with an exception and will try again in 1.0 seconds: boo!",
+        "Daemon 'fn' failed with an exception and will try again in 1.23 seconds: boo!",
         "Daemon 'fn' succeeded.",
         "Daemon 'fn' has exited on its own",
     ])


 async def test_daemon_retried_until_retries_limit(
-        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time):
+        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)
+    trigger = asyncio.Condition()

     @kopf.daemon(*resource, id='fn', retries=3)
     async def fn(**kwargs):
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
-        raise TemporaryError("boo!", delay=1.0)
+        dummy.mock(**kwargs)
+        async with trigger:
+            trigger.notify_all()
+        raise TemporaryError("boo!", delay=1.23)

     await simulate_cycle({})
-    await dummy.steps['called'].wait()
-    await dummy.wait_for_daemon_done()
+    async with trigger:
+        await trigger.wait_for(lambda: any("but will" in m for m in caplog.messages))

-    assert k8s_mocked.sleep.call_count == 2  # one between each retry (3 attempts - 2 sleeps)
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0  # [call#][args/kwargs][arg#]
-    assert k8s_mocked.sleep.call_args_list[1][0][0] == 1.0  # [call#][args/kwargs][arg#]
+    assert looptime == 2.46
+    assert dummy.mock.call_count == 3


 async def test_daemon_retried_until_timeout(
-        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time):
+        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)
+    trigger = asyncio.Condition()

-    @kopf.daemon(*resource, id='fn', timeout=3.0)
+    @kopf.daemon(*resource, id='fn', timeout=4)
     async def fn(**kwargs):
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
-        raise TemporaryError("boo!", delay=1.0)
+        dummy.mock(**kwargs)
+        async with trigger:
+            trigger.notify_all()
+        raise TemporaryError("boo!", delay=1.23)

     await simulate_cycle({})
-    await dummy.steps['called'].wait()
-    await dummy.wait_for_daemon_done()
+    async with trigger:
+        await trigger.wait_for(lambda: any("but will" in m for m in caplog.messages))

-    assert k8s_mocked.sleep.call_count == 2  # one between each retry (3 attempts - 2 sleeps)
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0  # [call#][args/kwargs][arg#]
-    assert k8s_mocked.sleep.call_args_list[1][0][0] == 1.0  # [call#][args/kwargs][arg#]
+    assert looptime == 3.69
+    assert dummy.mock.call_count == 4
diff --git a/tests/handling/daemons/test_daemon_filtration.py b/tests/handling/daemons/test_daemon_filtration.py
index b7a48c73..ea5bdf71 100644
--- a/tests/handling/daemons/test_daemon_filtration.py
+++ b/tests/handling/daemons/test_daemon_filtration.py
@@ -1,3 +1,4 @@
+import asyncio
 import logging

 import pytest
@@ -11,22 +12,23 @@
 async def test_daemon_filtration_satisfied(
         settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle):
     caplog.set_level(logging.DEBUG)
+    executed = asyncio.Event()

     @kopf.daemon(*resource, id='fn',
                  labels={'a': 'value', 'b': kopf.PRESENT, 'c': kopf.ABSENT},
                  annotations={'x': 'value', 'y': kopf.PRESENT, 'z': kopf.ABSENT})
     async def fn(**kwargs):
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
+        dummy.mock(**kwargs)
+        executed.set()

     finalizer = settings.persistence.finalizer
     event_body = {'metadata': {'labels': {'a': 'value', 'b': '...'},
                                'annotations': {'x': 'value', 'y': '...'},
                                'finalizers': [finalizer]}}
     await simulate_cycle(event_body)
+    await executed.wait()

-    await dummy.steps['called'].wait()
-    await dummy.wait_for_daemon_done()
+    assert dummy.mock.call_count == 1


 @pytest.mark.parametrize('labels, annotations', [
@@ -56,6 +58,7 @@ async def fn(**kwargs):
                                'annotations': annotations,
                                'finalizers': [finalizer]}}
     await simulate_cycle(event_body)
+    await asyncio.sleep(123)  # give it enough time to do something when nothing is expected

     assert spawn_daemons.called
     assert spawn_daemons.call_args_list[0][1]['handlers'] == []
diff --git a/tests/handling/daemons/test_daemon_rematching.py b/tests/handling/daemons/test_daemon_rematching.py
index 4442f743..36bf39a9 100644
--- a/tests/handling/daemons/test_daemon_rematching.py
+++ b/tests/handling/daemons/test_daemon_rematching.py
@@ -1,3 +1,4 @@
+import asyncio
 import logging

 import kopf
@@ -7,19 +8,19 @@

 async def test_running_daemon_is_stopped_when_mismatches(
         resource, dummy, looptime, mocker, caplog, assert_logs, k8s_mocked, simulate_cycle):
     caplog.set_level(logging.DEBUG)
+    executed = asyncio.Event()

     @kopf.daemon(*resource, id='fn', when=lambda **_: is_matching)
     async def fn(**kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
+        dummy.mock(**kwargs)
+        executed.set()
         await kwargs['stopped'].wait()

     # Ensure it is spawned while it is matching. (The same as the spawning tests.)
     mocker.resetall()
     is_matching = True
     await simulate_cycle({})
-    await dummy.steps['called'].wait()
+    await executed.wait()
     assert dummy.mock.call_count == 1

     # Ensure it is stopped once it stops matching. (The same as the termination tests.)
@@ -29,5 +30,5 @@ async def fn(**kwargs):
     await dummy.wait_for_daemon_done()

     assert looptime == 0
-    stopped = dummy.kwargs['stopped']
+    stopped = dummy.mock.call_args[1]['stopped']
     assert DaemonStoppingReason.FILTERS_MISMATCH in stopped.reason
diff --git a/tests/handling/daemons/test_daemon_spawning.py b/tests/handling/daemons/test_daemon_spawning.py
index 46b7d959..d341981a 100644
--- a/tests/handling/daemons/test_daemon_spawning.py
+++ b/tests/handling/daemons/test_daemon_spawning.py
@@ -1,41 +1,37 @@
+import asyncio
 import logging

 import kopf


 async def test_daemon_is_spawned_at_least_once(
-        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle):
+        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)
+    executed = asyncio.Event()

     @kopf.daemon(*resource, id='fn')
     async def fn(**kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
+        dummy.mock(**kwargs)
+        executed.set()

     await simulate_cycle({})
+    await executed.wait()

-    await dummy.steps['called'].wait()
-    await dummy.wait_for_daemon_done()
-
+    assert looptime == 0
     assert dummy.mock.call_count == 1  # not restarted


 async def test_daemon_initial_delay_obeyed(
-        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle):
+        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)
+    executed = asyncio.Event()

-    @kopf.daemon(*resource, id='fn', initial_delay=1.0)
+    @kopf.daemon(*resource, id='fn', initial_delay=5.0)
     async def fn(**kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
+        dummy.mock(**kwargs)
+        executed.set()

     await simulate_cycle({})
+    await executed.wait()

-    await dummy.steps['called'].wait()
-    await dummy.wait_for_daemon_done()
-
-    assert k8s_mocked.sleep.call_count >= 1
-    assert k8s_mocked.sleep.call_count <= 2  # one optional extra call for sleep(None)
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0  # [call#][args/kwargs][arg#]
+    assert looptime == 5.0
diff --git a/tests/handling/daemons/test_daemon_termination.py b/tests/handling/daemons/test_daemon_termination.py
index 3a0e97e1..6228198a 100644
--- a/tests/handling/daemons/test_daemon_termination.py
+++ b/tests/handling/daemons/test_daemon_termination.py
@@ -9,13 +9,13 @@

 async def test_daemon_exits_gracefully_and_instantly_on_resource_deletion(
         settings, resource, dummy, simulate_cycle,
-        caplog, assert_logs, k8s_mocked, frozen_time, mocker, looptime):
+        looptime, caplog, assert_logs, k8s_mocked, mocker):
     caplog.set_level(logging.DEBUG)

     # A daemon-under-test.
     @kopf.daemon(*resource, id='fn')
     async def fn(**kwargs):
-        dummy.kwargs = kwargs
+        dummy.mock(**kwargs)
         dummy.steps['called'].set()
         await kwargs['stopped'].wait()

@@ -34,20 +34,19 @@ async def fn(**kwargs):
     await dummy.wait_for_daemon_done()

     assert looptime == 0
-    assert k8s_mocked.sleep.call_count == 0
     assert k8s_mocked.patch.call_count == 1
     assert k8s_mocked.patch.call_args_list[0][1]['payload']['metadata']['finalizers'] == []


 async def test_daemon_exits_gracefully_and_instantly_on_operator_exiting(
         settings, resource, dummy, simulate_cycle, background_daemon_killer,
-        caplog, assert_logs, k8s_mocked, frozen_time, mocker, looptime):
+        looptime, caplog, assert_logs, k8s_mocked, mocker):
     caplog.set_level(logging.DEBUG)

     # A daemon-under-test.
     @kopf.daemon(*resource, id='fn')
     async def fn(**kwargs):
-        dummy.kwargs = kwargs
+        dummy.mock(**kwargs)
         dummy.steps['called'].set()
         await kwargs['stopped'].wait()

@@ -65,7 +64,6 @@ async def fn(**kwargs):
     await dummy.wait_for_daemon_done()

     assert looptime == 0
-    assert k8s_mocked.sleep.call_count == 0
     assert k8s_mocked.patch.call_count == 0

     # To prevent double-cancelling of the scheduler's system tasks in the fixture, let them finish:
@@ -76,13 +74,13 @@ async def fn(**kwargs):
 @pytest.mark.usefixtures('background_daemon_killer')
 async def test_daemon_exits_gracefully_and_instantly_on_operator_pausing(
         settings, memories, resource, dummy, simulate_cycle, conflicts_found,
-        caplog, assert_logs, k8s_mocked, frozen_time, mocker, looptime):
+        looptime, caplog, assert_logs, k8s_mocked, mocker):
     caplog.set_level(logging.DEBUG)

     # A daemon-under-test.
     @kopf.daemon(*resource, id='fn')
     async def fn(**kwargs):
-        dummy.kwargs = kwargs
+        dummy.mock(**kwargs)
         dummy.steps['called'].set()
         await kwargs['stopped'].wait()

@@ -107,21 +105,17 @@ async def fn(**kwargs):
     assert not memory.daemons_memory.forever_stopped


-async def test_daemon_exits_instantly_via_cancellation_with_backoff(
+async def test_daemon_exits_instantly_on_cancellation_with_backoff(
         settings, resource, dummy, simulate_cycle,
-        caplog, assert_logs, k8s_mocked, frozen_time, mocker):
+        looptime, caplog, assert_logs, k8s_mocked, mocker):
     caplog.set_level(logging.DEBUG)
-    dummy.steps['finish'].set()

     # A daemon-under-test.
-    @kopf.daemon(*resource, id='fn', cancellation_backoff=5, cancellation_timeout=10)
+    @kopf.daemon(*resource, id='fn', cancellation_backoff=1.23, cancellation_timeout=10)
     async def fn(**kwargs):
-        dummy.kwargs = kwargs
+        dummy.mock(**kwargs)
         dummy.steps['called'].set()
-        try:
-            await asyncio.Event().wait()  # this one is cancelled.
-        except asyncio.CancelledError:
-            await dummy.steps['finish'].wait()  # simulated slow (non-instant) exiting.
+        await asyncio.Event().wait()  # this one is cancelled.

     # Trigger spawning and wait until ready. Assume the finalizers are already added.
     finalizer = settings.persistence.finalizer
@@ -134,17 +128,15 @@ async def fn(**kwargs):
     event_object.setdefault('metadata', {}).update({'deletionTimestamp': '...'})
     await simulate_cycle(event_object)

-    assert k8s_mocked.sleep.call_count == 1
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == 5.0
+    assert looptime == 1.23  # i.e. it slept through the whole backoff time
     assert k8s_mocked.patch.call_count == 1
     assert k8s_mocked.patch.call_args_list[0][1]['payload']['status']['kopf']['dummy']

     # 2nd cycle: cancelling after the backoff is reached. Wait for cancellation timeout.
     mocker.resetall()
-    frozen_time.tick(5)  # backoff time or slightly above it
     await simulate_cycle(event_object)

-    assert k8s_mocked.sleep.call_count == 0
+    assert looptime == 1.23  # i.e. no additional sleeps happened
     assert k8s_mocked.patch.call_count == 1
     assert k8s_mocked.patch.call_args_list[0][1]['payload']['metadata']['finalizers'] == []

@@ -152,15 +144,15 @@ async def fn(**kwargs):
     await dummy.wait_for_daemon_done()


-async def test_daemon_exits_slowly_via_cancellation_with_backoff(
+async def test_daemon_exits_slowly_on_cancellation_with_backoff(
         settings, resource, dummy, simulate_cycle,
-        caplog, assert_logs, k8s_mocked, frozen_time, mocker):
+        looptime, caplog, assert_logs, k8s_mocked, mocker):
     caplog.set_level(logging.DEBUG)

     # A daemon-under-test.
-    @kopf.daemon(*resource, id='fn', cancellation_backoff=5, cancellation_timeout=10)
+    @kopf.daemon(*resource, id='fn', cancellation_backoff=1.23, cancellation_timeout=4.56)
     async def fn(**kwargs):
-        dummy.kwargs = kwargs
+        dummy.mock(**kwargs)
         dummy.steps['called'].set()
         try:
             await asyncio.Event().wait()  # this one is cancelled.
@@ -178,43 +170,39 @@ async def fn(**kwargs):
     event_object.setdefault('metadata', {}).update({'deletionTimestamp': '...'})
     await simulate_cycle(event_object)

-    assert k8s_mocked.sleep.call_count == 1
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == 5.0
+    assert looptime == 1.23
     assert k8s_mocked.patch.call_count == 1
     assert k8s_mocked.patch.call_args_list[0][1]['payload']['status']['kopf']['dummy']

     # 2nd cycle: cancelling after the backoff is reached. Wait for cancellation timeout.
     mocker.resetall()
-    frozen_time.tick(5)  # backoff time or slightly above it
     await simulate_cycle(event_object)

-    assert k8s_mocked.sleep.call_count == 1
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == 10.0
+    assert looptime == 1.23 + 4.56  # i.e. it really spent all the timeout
     assert k8s_mocked.patch.call_count == 1
     assert k8s_mocked.patch.call_args_list[0][1]['payload']['status']['kopf']['dummy']

     # 3rd cycle: the daemon has exited, the resource should be unblocked from actual deletion.
     mocker.resetall()
-    frozen_time.tick(1)  # any time below timeout
     dummy.steps['finish'].set()
-    await asyncio.sleep(0)
+    await asyncio.sleep(0)  # let the daemon exit and all the routines trigger
     await simulate_cycle(event_object)
     await dummy.wait_for_daemon_done()

-    assert k8s_mocked.sleep.call_count == 0
+    assert looptime == 1.23 + 4.56  # i.e. no additional sleeps happened
     assert k8s_mocked.patch.call_count == 1
     assert k8s_mocked.patch.call_args_list[0][1]['payload']['metadata']['finalizers'] == []


 async def test_daemon_is_abandoned_due_to_cancellation_timeout_reached(
         settings, resource, dummy, simulate_cycle,
-        caplog, assert_logs, k8s_mocked, frozen_time, mocker):
+        looptime, caplog, assert_logs, k8s_mocked, mocker):
     caplog.set_level(logging.DEBUG)

     # A daemon-under-test.
-    @kopf.daemon(*resource, id='fn', cancellation_timeout=10)
+    @kopf.daemon(*resource, id='fn', cancellation_timeout=4.56)
     async def fn(**kwargs):
-        dummy.kwargs = kwargs
+        dummy.mock(**kwargs)
         dummy.steps['called'].set()
         try:
             await dummy.steps['finish'].wait()  # this one is cancelled.
@@ -232,18 +220,17 @@ async def fn(**kwargs):
     event_object.setdefault('metadata', {}).update({'deletionTimestamp': '...'})
     await simulate_cycle(event_object)

-    assert k8s_mocked.sleep.call_count == 1
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == 10.0
+    assert looptime == 4.56
     assert k8s_mocked.patch.call_count == 1
     assert k8s_mocked.patch.call_args_list[0][1]['payload']['status']['kopf']['dummy']

     # 2rd cycle: the daemon has exited, the resource should be unblocked from actual deletion.
     mocker.resetall()
-    frozen_time.tick(50)
+    await asyncio.sleep(1000)  # unnecessary, but let's fast-forward time just in case
     with pytest.warns(ResourceWarning, match=r"Daemon .+ did not exit in time"):
         await simulate_cycle(event_object)

-    assert k8s_mocked.sleep.call_count == 0
+    assert looptime == 1000 + 4.56
     assert k8s_mocked.patch.call_count == 1
     assert k8s_mocked.patch.call_args_list[0][1]['payload']['metadata']['finalizers'] == []
     assert_logs(["Daemon 'fn' did not exit in time. Leaving it orphaned."])
diff --git a/tests/handling/daemons/test_timer_errors.py b/tests/handling/daemons/test_timer_errors.py
index 934ff4fe..bd69e737 100644
--- a/tests/handling/daemons/test_timer_errors.py
+++ b/tests/handling/daemons/test_timer_errors.py
@@ -1,3 +1,4 @@
+import asyncio
 import logging

 import kopf
@@ -5,26 +6,20 @@


 async def test_timer_stopped_on_permanent_error(
-        settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle):
+        settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)

-    @kopf.timer(*resource, id='fn', backoff=0.01, interval=1.0)
+    @kopf.timer(*resource, id='fn', backoff=1.23, interval=999)
     async def fn(**kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
-        kwargs['stopped']._setter.set()  # to exit the cycle
+        dummy.mock(**kwargs)
         raise PermanentError("boo!")

     event_object = {'metadata': {'finalizers': [settings.persistence.finalizer]}}
     await simulate_cycle(event_object)
+    await asyncio.sleep(123)  # give it enough opportunities to misbehave (e.g. retry)

-    await dummy.steps['called'].wait()
-    await dummy.wait_for_daemon_done()
-
+    assert looptime == 123  # no intervals used, as there were no retries
     assert dummy.mock.call_count == 1
-    assert k8s_mocked.sleep.call_count == 1  # one for each retry
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0

     assert_logs([
         "Timer 'fn' failed permanently: boo!",
@@ -34,26 +29,20 @@ async def fn(**kwargs):


 async def test_timer_stopped_on_arbitrary_errors_with_mode_permanent(
-        settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle):
+        settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)

-    @kopf.timer(*resource, id='fn', errors=ErrorsMode.PERMANENT, backoff=0.01, interval=1.0)
+    @kopf.timer(*resource, id='fn', errors=ErrorsMode.PERMANENT, backoff=1.23, interval=999)
     async def fn(**kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
-        kwargs['stopped']._setter.set()  # to exit the cycle
+        dummy.mock(**kwargs)
         raise Exception("boo!")

     event_object = {'metadata': {'finalizers': [settings.persistence.finalizer]}}
     await simulate_cycle(event_object)
+    await asyncio.sleep(123)  # give it enough opportunities to misbehave (e.g. retry)

-    await dummy.steps['called'].wait()
-    await dummy.wait_for_daemon_done()
-
+    assert looptime == 123  # no intervals used, as there were no retries
     assert dummy.mock.call_count == 1
-    assert k8s_mocked.sleep.call_count == 1  # one for each retry
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0

     assert_logs([
         "Timer 'fn' failed with an exception and will stop now: boo!",
@@ -63,32 +52,23 @@ async def fn(**kwargs):


 async def test_timer_retried_on_temporary_error(
-        settings, resource, dummy, manual_time,
-        caplog, assert_logs, k8s_mocked, simulate_cycle):
+        settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)
+    finished = asyncio.Event()

-    @kopf.timer(*resource, id='fn', backoff=1.0, interval=1.0)
+    @kopf.timer(*resource, id='fn', backoff=1.23, interval=2.34)
     async def fn(retry, **kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
+        dummy.mock(**kwargs)
         if not retry:
-            raise TemporaryError("boo!", delay=1.0)
+            raise TemporaryError("boo!", delay=3.45)
         else:
-            kwargs['stopped']._setter.set()  # to exit the cycle
-            dummy.steps['finish'].set()
+            finished.set()

     event_object = {'metadata': {'finalizers': [settings.persistence.finalizer]}}
     await simulate_cycle(event_object)
+    await finished.wait()

-    await dummy.steps['called'].wait()
-    await dummy.steps['finish'].wait()
-    await dummy.wait_for_daemon_done()
-
-    assert k8s_mocked.sleep.call_count == 2  # one for each retry
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == [1.0]  # delays
-    assert k8s_mocked.sleep.call_args_list[1][0][0] == 1.0  # interval
-
+    assert looptime == 3.45
     assert_logs([
         "Timer 'fn' failed temporarily: boo!",
         "Timer 'fn' succeeded.",
@@ -96,78 +76,73 @@ async def fn(retry, **kwargs):


 async def test_timer_retried_on_arbitrary_error_with_mode_temporary(
-        settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time):
+        settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)
+    finished = asyncio.Event()

-    @kopf.timer(*resource, id='fn', errors=ErrorsMode.TEMPORARY, backoff=1.0, interval=1.0)
+    @kopf.timer(*resource, id='fn', errors=ErrorsMode.TEMPORARY, backoff=1.23, interval=2.34)
     async def fn(retry, **kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
+        dummy.mock(**kwargs)
         if not retry:
             raise Exception("boo!")
         else:
-            kwargs['stopped']._setter.set()  # to exit the cycle
-            dummy.steps['finish'].set()
+            finished.set()

     event_object = {'metadata': {'finalizers': [settings.persistence.finalizer]}}
     await simulate_cycle(event_object)
+    await finished.wait()

-    await dummy.steps['called'].wait()
-    await dummy.steps['finish'].wait()
-    await dummy.wait_for_daemon_done()
-
-    assert k8s_mocked.sleep.call_count == 2  # one for each retry
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == [1.0]  # delays
-    assert k8s_mocked.sleep.call_args_list[1][0][0] == 1.0  # interval
-
+    assert looptime == 1.23
     assert_logs([
-        "Timer 'fn' failed with an exception and will try again in 1.0 seconds: boo!",
+        "Timer 'fn' failed with an exception and will try again in 1.23 seconds: boo!",
         "Timer 'fn' succeeded.",
     ])


 async def test_timer_retried_until_retries_limit(
-        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time):
+        settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)
+    trigger = asyncio.Condition()

-    @kopf.timer(*resource, id='fn', retries=3, interval=1.0)
+    @kopf.timer(*resource, id='fn', retries=3, interval=2.34)
     async def fn(**kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
-        if dummy.mock.call_count >= 5:
-            kwargs['stopped']._setter.set()  # to exit the cycle
-        raise TemporaryError("boo!", delay=1.0)
+        dummy.mock(**kwargs)
+        async with trigger:
+            trigger.notify_all()
+        raise TemporaryError("boo!", delay=3.45)

-    await simulate_cycle({})
-    await dummy.steps['called'].wait()
-    await dummy.wait_for_daemon_done()
+    event_object = {'metadata': {'finalizers': [settings.persistence.finalizer]}}
+    await simulate_cycle(event_object)
+    async with trigger:
+        await trigger.wait_for(lambda: any("but will" in m for m in caplog.messages))

-    assert k8s_mocked.sleep.call_count >= 3  # one between each retry (3 attempts - 2 sleeps)
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == [1.0]  # delays
-    assert k8s_mocked.sleep.call_args_list[1][0][0] == [1.0]  # delays
-    assert k8s_mocked.sleep.call_args_list[2][0][0] == 1.0  # interval
+    assert looptime == 6.9  # 2*3.45 -- 2 sleeps between 3 attempts
+    assert dummy.mock.call_count == 3


 async def test_timer_retried_until_timeout(
-        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time):
+        settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)
+    trigger = asyncio.Condition()

-    @kopf.timer(*resource, id='fn', timeout=3.0, interval=1.0)
+    @kopf.timer(*resource, id='fn', timeout=10.0, interval=1.23)
     async def fn(**kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
-        if dummy.mock.call_count >= 5:
-            kwargs['stopped']._setter.set()  # to exit the cycle
-        raise TemporaryError("boo!", delay=1.0)
+        dummy.mock(**kwargs)
+        async with trigger:
+            trigger.notify_all()
+        raise TemporaryError("boo!", delay=3.45)

-    await simulate_cycle({})
-    await dummy.steps['called'].wait()
-    await dummy.wait_for_daemon_done()
+    event_object = {'metadata': {'finalizers': [settings.persistence.finalizer]}}
+    await simulate_cycle(event_object)
+    async with trigger:
+        await trigger.wait_for(lambda: any("but will" in m for m in caplog.messages))

-    assert k8s_mocked.sleep.call_count >= 3  # one between each retry (3 attempts - 2 sleeps)
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == [1.0]  # delays
-    assert k8s_mocked.sleep.call_args_list[1][0][0] == [1.0]  # delays
-    assert k8s_mocked.sleep.call_args_list[2][0][0] == 1.0  # interval
+    assert looptime == 6.9  # 2*3.45 -- 2 sleeps between 3 attempts
+    assert dummy.mock.call_count == 3
+
+
+# TODO: next steps:
+#  extract the PR with test refactoring, where it is all simplified:
+#  - "dummy" is converted to Condition, not just Events.
+#  - in all daemons & timers, not only here!
diff --git a/tests/handling/daemons/test_timer_filtration.py b/tests/handling/daemons/test_timer_filtration.py
index c2979188..84b09ca8 100644
--- a/tests/handling/daemons/test_timer_filtration.py
+++ b/tests/handling/daemons/test_timer_filtration.py
@@ -1,3 +1,4 @@
+import asyncio
 import logging

 import pytest
@@ -11,21 +12,22 @@
 async def test_timer_filtration_satisfied(
         settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle):
     caplog.set_level(logging.DEBUG)
+    executed = asyncio.Event()

     @kopf.timer(*resource, id='fn',
                 labels={'a': 'value', 'b': kopf.PRESENT, 'c': kopf.ABSENT},
                 annotations={'x': 'value', 'y': kopf.PRESENT, 'z': kopf.ABSENT})
     async def fn(**kwargs):
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
+        dummy.mock(**kwargs)
+        executed.set()

     event_body = {'metadata': {'labels': {'a': 'value', 'b': '...'},
                                'annotations': {'x': 'value', 'y': '...'},
                                'finalizers': [settings.persistence.finalizer]}}
     await simulate_cycle(event_body)
+    await executed.wait()

-    await dummy.steps['called'].wait()
-    await dummy.wait_for_daemon_done()
+    assert dummy.mock.call_count == 1


 @pytest.mark.parametrize('labels, annotations', [
@@ -54,6 +56,7 @@ async def fn(**kwargs):
                                'annotations': annotations,
                                'finalizers': [settings.persistence.finalizer]}}
     await simulate_cycle(event_body)
+    await asyncio.sleep(123)  # give it enough time to do something when nothing is expected

     assert spawn_daemons.called
     assert spawn_daemons.call_args_list[0][1]['handlers'] == []
diff --git a/tests/handling/daemons/test_timer_intervals.py b/tests/handling/daemons/test_timer_intervals.py
index 637234c5..bd064a4d 100644
--- a/tests/handling/daemons/test_timer_intervals.py
+++ b/tests/handling/daemons/test_timer_intervals.py
@@ -1,3 +1,4 @@
+import asyncio
 import logging

 import kopf
@@ -6,49 +7,41 @@


 async def test_timer_regular_interval(
-        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, frozen_time):
+        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)
+    trigger = asyncio.Condition()

     @kopf.timer(*resource, id='fn', interval=1.0, sharp=False)
     async def fn(**kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
-        frozen_time.tick(0.3)
-        if dummy.mock.call_count >= 2:
-            dummy.steps['finish'].set()
-            kwargs['stopped']._setter.set()  # to exit the cycle
+        dummy.mock(**kwargs)
+        async with trigger:
+            trigger.notify_all()
+        await asyncio.sleep(0.3)  # simulate a slow operation

     await simulate_cycle({})
-    await dummy.steps['called'].wait()
-    await dummy.wait_for_daemon_done()
+    async with trigger:
+        await trigger.wait()
+        await trigger.wait()

     assert dummy.mock.call_count == 2
-    assert k8s_mocked.sleep.call_count == 2
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0
-    assert k8s_mocked.sleep.call_args_list[1][0][0] == 1.0
+    assert looptime == 1.3


 async def test_timer_sharp_interval(
-        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, frozen_time):
+        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)
+    trigger = asyncio.Condition()

     @kopf.timer(*resource, id='fn', interval=1.0, sharp=True)
     async def fn(**kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
-        frozen_time.tick(0.3)
-        if dummy.mock.call_count >= 2:
-            dummy.steps['finish'].set()
-            kwargs['stopped']._setter.set()  # to exit the cycle
+        dummy.mock(**kwargs)
+        async with trigger:
+            trigger.notify_all()
+        await asyncio.sleep(0.3)  # simulate a slow operation

     await simulate_cycle({})
-    await dummy.steps['called'].wait()
-    await dummy.steps['finish'].wait()
-    await dummy.wait_for_daemon_done()
+    async with trigger:
+        await trigger.wait_for(lambda: dummy.mock.call_count >= 2)

     assert dummy.mock.call_count == 2
-    assert k8s_mocked.sleep.call_count == 2
-    assert 0.7 <= k8s_mocked.sleep.call_args_list[0][0][0] < 0.71
-    assert 0.7 <= k8s_mocked.sleep.call_args_list[1][0][0] < 0.71
+    assert looptime == 1  # not 1.3!
diff --git a/tests/handling/daemons/test_timer_triggering.py b/tests/handling/daemons/test_timer_triggering.py
index ed074979..c203c4af 100644
--- a/tests/handling/daemons/test_timer_triggering.py
+++ b/tests/handling/daemons/test_timer_triggering.py
@@ -1,48 +1,44 @@
+import asyncio
 import logging

 import kopf


 async def test_timer_is_spawned_at_least_once(
-        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle):
+        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)
+    trigger = asyncio.Condition()

     @kopf.timer(*resource, id='fn', interval=1.0)
     async def fn(**kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
-        kwargs['stopped']._setter.set()  # to exit the cycle
+        dummy.mock(**kwargs)
+        async with trigger:
+            trigger.notify_all()

     await simulate_cycle({})
-    await dummy.steps['called'].wait()
+    async with trigger:
+        await trigger.wait()
+        await trigger.wait()

-    assert dummy.mock.call_count == 1
-    assert dummy.kwargs['retry'] == 0
-    assert k8s_mocked.sleep.call_count == 1
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0
-
-    await dummy.wait_for_daemon_done()
+    assert looptime == 1
+    assert dummy.mock.call_count == 2


 async def test_timer_initial_delay_obeyed(
-        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle):
+        resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
     caplog.set_level(logging.DEBUG)
+    trigger = asyncio.Condition()

     @kopf.timer(*resource, id='fn', initial_delay=5.0, interval=1.0)
     async def fn(**kwargs):
-        dummy.mock()
-        dummy.kwargs = kwargs
-        dummy.steps['called'].set()
-        kwargs['stopped']._setter.set()  # to exit the cycle
+        dummy.mock(**kwargs)
+        async with trigger:
+            trigger.notify_all()

     await simulate_cycle({})
-    await dummy.steps['called'].wait()
-
-    assert dummy.mock.call_count == 1
-    assert dummy.kwargs['retry'] == 0
-    assert k8s_mocked.sleep.call_count == 2
-    assert k8s_mocked.sleep.call_args_list[0][0][0] == 5.0
-    assert k8s_mocked.sleep.call_args_list[1][0][0] == 1.0
+    async with trigger:
+        await trigger.wait()
+        await trigger.wait()

-    await dummy.wait_for_daemon_done()
+    assert looptime == 6
+    assert dummy.mock.call_count == 2