Skip to content

Commit

Permalink
Merge pull request #963 from wjsi/enh/handle_watch_req
Browse files Browse the repository at this point in the history
Add retry after receiving `too many requests` error from kubernetes
  • Loading branch information
nolar authored Feb 7, 2023
2 parents a7aa153 + a499244 commit b65aad0
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ env

# VSCode
.vscode

# Idea / PyCharm
.idea
20 changes: 17 additions & 3 deletions kopf/_cogs/clients/watching.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import aiohttp

from kopf._cogs.aiokits import aiotasks, aiotoggles
from kopf._cogs.clients import api, fetching
from kopf._cogs.clients import api, errors, fetching
from kopf._cogs.configs import configuration
from kopf._cogs.structs import bodies, references

logger = logging.getLogger(__name__)

HTTP_TOO_MANY_REQUESTS_CODE = 429
DEFAULT_RETRY_DELAY_SECONDS = 1


class WatchingError(Exception):
"""
Expand Down Expand Up @@ -79,8 +82,19 @@ async def infinite_watch(
namespace=namespace,
operator_pause_waiter=operator_pause_waiter,
)
async for raw_event in stream:
yield raw_event
try:
async for raw_event in stream:
yield raw_event
except errors.APIClientError as ex:
if ex.code != HTTP_TOO_MANY_REQUESTS_CODE:
raise

retry_wait = ex.details.get("retryAfterSeconds") or DEFAULT_RETRY_DELAY_SECONDS
logger.warning(
f"Receiving `too many requests` error from server, will retry after "
f"{retry_wait} seconds. Error details: {ex}"
)
await asyncio.sleep(retry_wait)
await asyncio.sleep(settings.watching.reconnect_backoff)
finally:
logger.debug(f"Stopping the watch-stream for {resource} {where}.")
Expand Down
27 changes: 26 additions & 1 deletion tests/k8s/test_watching_infinitely.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from kopf._cogs.clients.errors import APIError
from kopf._cogs.clients.errors import APIClientError, APIError
from kopf._cogs.clients.watching import Bookmark, infinite_watch

STREAM_WITH_UNKNOWN_EVENT = [
Expand Down Expand Up @@ -63,3 +63,28 @@ async def test_infinite_watch_never_exits_normally(
assert events[2] == Bookmark.LISTED
assert events[3]['object']['spec'] == 'a'
assert events[4]['object']['spec'] == 'b'


async def test_too_many_requests_exception(
settings, resource, stream, namespace, enforced_session, mocker):

exc = APIClientError({
"apiVersion": "v1",
"code": 429,
"status": "Failure",
"details": {
"retryAfterSeconds": 1,
}
}, status=429)
enforced_session.request = mocker.Mock(side_effect=exc)
stream.feed([], namespace=namespace)
stream.close(namespace=namespace)

events = []
async for event in infinite_watch(settings=settings,
resource=resource,
namespace=namespace,
_iterations=1):
events.append(event)

assert len(events) == 0

0 comments on commit b65aad0

Please sign in to comment.