Skip to content

Commit

Permalink
[Integration][Gitlab] Handle cases where handling event takes more th…
Browse files Browse the repository at this point in the history
…an the allowed timeout (#1326)
  • Loading branch information
No0b1t0 authored Jan 15, 2025
1 parent d3fabb1 commit d21ea6c
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 12 deletions.
7 changes: 7 additions & 0 deletions integrations/gitlab/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

0.2.19 (2025-01-15)
===================

### Improvements

- Event handler now retries handling events if it takes too long to complete

0.2.18 (2025-01-15)
===================

Expand Down
47 changes: 36 additions & 11 deletions integrations/gitlab/gitlab_integration/events/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
event_context,
EventContext,
)
import time

Observer = Callable[[str, dict[str, Any]], Awaitable[Any]]

Expand All @@ -29,7 +30,10 @@ async def _start_event_processor(self) -> None:
logger.info(f"Started {self.__class__.__name__} worker")
while True:
event_ctx, event_id, body = await self.webhook_tasks_queue.get()
logger.debug(f"Retrieved event: {event_id} from Queue, notifying observers")
logger.debug(
f"Retrieved event: {event_id} from Queue, notifying observers",
queue_size=self.webhook_tasks_queue.qsize(),
)
try:
async with event_context(
"gitlab_http_event_async_worker", parent_override=event_ctx
Expand Down Expand Up @@ -69,6 +73,9 @@ async def notify(self, event_id: str, body: dict[str, Any]) -> None:


class EventHandler(BaseEventHandler):
MAXIMUM_RETRIES = 3
TIMEOUT = 90

def __init__(self) -> None:
super().__init__()
self._observers: dict[str, list[Observer]] = defaultdict(list)
Expand All @@ -85,20 +92,38 @@ async def _notify(self, event_id: str, body: dict[str, Any]) -> None:
)
return
for observer in observers_list:
try:
if asyncio.iscoroutinefunction(observer):
if inspect.ismethod(observer):
handler = observer.__self__.__class__.__name__
retries_left = self.MAXIMUM_RETRIES
observer_time = time.time()
while retries_left > 0:
try:
if asyncio.iscoroutinefunction(observer):
if inspect.ismethod(observer):
handler = observer.__self__.__class__.__name__
logger.debug(
f"Notifying observer: {handler}, for event: {event_id} at {observer_time}",
event_id=event_id,
handler=handler,
)
await asyncio.wait_for(
observer(event_id, body), self.TIMEOUT
) # Sequentially call each observer
logger.debug(
f"Notifying observer: {handler}, for event: {event_id}",
f"Observer {handler} completed work at {time.time() - observer_time}",
event_id=event_id,
handler=handler,
)
await observer(event_id, body) # Sequentially call each observer
except Exception as e:
logger.error(
f"Error processing event {event_id} with observer {observer}: {str(e)}"
)
break
except asyncio.TimeoutError:
logger.error(
f"{handler} started work at {observer_time}, did not complete handling event {event_id} within {self.TIMEOUT} seconds, retrying"
)
retries_left -= 1
except Exception as e:
logger.error(
f"Error processing event {event_id} with observer {observer}: {e}",
exc_info=True,
)
break


class SystemEventHandler(BaseEventHandler):
Expand Down
2 changes: 1 addition & 1 deletion integrations/gitlab/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "gitlab"
version = "0.2.18"
version = "0.2.19"
description = "Gitlab integration for Port using Port-Ocean Framework"
authors = ["Yair Siman-Tov <[email protected]>"]

Expand Down

0 comments on commit d21ea6c

Please sign in to comment.