Skip to content

Commit

Permalink
feat: use resync function from BaseEventListener
Browse files Browse the repository at this point in the history
  • Loading branch information
Shalev Avhar committed Aug 14, 2024
1 parent d97228e commit 54b4dcb
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 12 deletions.
11 changes: 11 additions & 0 deletions port_ocean/core/event_listener/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ async def _after_resync(self) -> None:
"""
await ocean.app.update_state_after_scheduled_sync()

async def _resync(
self,
resync_args: dict[Any, Any],
) -> None:
"""
Triggers the "on_resync" event.
"""
await self._before_resync()
await self.events["on_resync"](resync_args)
await self._after_resync()


class EventListenerSettings(BaseOceanModel, extra=Extra.allow):
type: str
Expand Down
4 changes: 1 addition & 3 deletions port_ocean/core/event_listener/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ async def _start(self) -> None:

@target_channel_router.post("/resync")
async def resync() -> None:
await self._before_resync()
await self.events["on_resync"]({})
await self._after_resync()
await self._resync({})

ocean.app.fast_api_app.include_router(target_channel_router)
4 changes: 1 addition & 3 deletions port_ocean/core/event_listener/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,7 @@ async def _handle_message(self, raw_msg: Message) -> None:

if "change.log" in topic and message is not None:
try:
await self._before_resync()
await self.events["on_resync"](message)
await self._after_resync()
await self._resync(message)
except Exception as e:
_type, _, tb = sys.exc_info()
logger.opt(exception=(_type, None, tb)).error(
Expand Down
4 changes: 1 addition & 3 deletions port_ocean/core/event_listener/once.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ async def _start(self) -> None:
async def resync_and_exit() -> None:
logger.info("Once event listener started")
try:
await self._before_resync()
await self.events["on_resync"]({})
await self._after_resync()
await self._resync({})
except Exception:
# we catch all exceptions here to make sure the application will exit gracefully
logger.exception("Error occurred while resyncing")
Expand Down
4 changes: 1 addition & 3 deletions port_ocean/core/event_listener/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,14 @@ async def resync() -> None:
last_updated_at = integration["updatedAt"]

if self.should_resync(last_updated_at):
await self._before_resync()
logger.info("Detected change in integration, resyncing")
ocean.app.last_integration_updated_at = last_updated_at
running_task: Task[Any] = get_event_loop().create_task(
self.events["on_resync"]({}) # type: ignore
self._resync({}) # type: ignore
)
signal_handler.register(running_task.cancel)

await running_task
await self._after_resync()

# Execute resync repeatedly task
await resync()

0 comments on commit 54b4dcb

Please sign in to comment.