Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Create webhooks only resync #1225

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.15.0 (2024-12-12)


### Features

- Added new webhooks only event listener mode. This event listener handles only webhook invocations and raises error once used for resync.


## 0.14.7 (2024-12-09)


Expand Down
15 changes: 12 additions & 3 deletions port_ocean/context/ocean.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from port_ocean.ocean import Ocean
from port_ocean.clients.port.client import PortClient

from loguru import logger


class PortOceanContext:
def __init__(self, app: Union["Ocean", None]) -> None:
Expand Down Expand Up @@ -63,14 +65,21 @@ def port_client(self) -> "PortClient":
return self.app.port_client

@property
def event_listener_type(self) -> Literal["WEBHOOK", "KAFKA", "POLLING", "ONCE"]:
def event_listener_type(
self,
) -> Literal["WEBHOOK", "KAFKA", "POLLING", "ONCE", "WEBHOOKS_ONLY"]:
return self.app.config.event_listener.type

def on_resync(
self,
kind: str | None = None,
) -> Callable[[RESYNC_EVENT_LISTENER], RESYNC_EVENT_LISTENER]:
def wrapper(function: RESYNC_EVENT_LISTENER) -> RESYNC_EVENT_LISTENER:
) -> Callable[[RESYNC_EVENT_LISTENER], RESYNC_EVENT_LISTENER | None]:
def wrapper(function: RESYNC_EVENT_LISTENER) -> RESYNC_EVENT_LISTENER | None:
if self.app.config.event_listener.type == "WEBHOOKS_ONLY":
logger.debug(
"Webhook only event listener is used, resync events are ignored"
)
return None
Comment on lines +79 to +82
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this?

return self.integration.on_resync(function, kind)

return wrapper
Expand Down
8 changes: 8 additions & 0 deletions port_ocean/core/event_listener/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
OnceEventListener,
)

from port_ocean.core.event_listener.webhooks_only import (
WebhooksOnlyEventListener,
WebhooksOnlyEventListenerSettings,
)


EventListenerSettingsType = (
HttpEventListenerSettings
| KafkaEventListenerSettings
| PollingEventListenerSettings
| OnceEventListenerSettings
| WebhooksOnlyEventListenerSettings
)

__all__ = [
Expand All @@ -34,4 +40,6 @@
"PollingEventListenerSettings",
"OnceEventListener",
"OnceEventListenerSettings",
"WebhooksOnlyEventListener",
"WebhooksOnlyEventListenerSettings",
]
10 changes: 9 additions & 1 deletion port_ocean/core/event_listener/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
BaseEventListener,
EventListenerEvents,
)
from port_ocean.core.event_listener.webhooks_only import (
WebhooksOnlyEventListener,
WebhooksOnlyEventListenerSettings,
)
from port_ocean.exceptions.core import UnsupportedEventListenerTypeException


Expand Down Expand Up @@ -88,7 +92,11 @@ async def create_event_listener(self) -> BaseEventListener:
config, OnceEventListenerSettings
), assert_message.format(type(config))
event_listener = OnceEventListener(wrapped_events, config)

case "webhooks_only":
Tankilevitch marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we manage those consts from one place?

assert isinstance(
config, WebhooksOnlyEventListenerSettings
), assert_message.format(type(config))
event_listener = WebhooksOnlyEventListener(wrapped_events, config)
case _:
raise UnsupportedEventListenerTypeException(
f"Event listener {_type} not supported"
Expand Down
43 changes: 43 additions & 0 deletions port_ocean/core/event_listener/webhooks_only.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Any, Literal

from loguru import logger

from port_ocean.core.event_listener.base import (
BaseEventListener,
EventListenerEvents,
EventListenerSettings,
)


class WebhooksOnlyEventListenerSettings(EventListenerSettings):
"""
This class inherits from `EventListenerSettings`, which provides a foundation for creating event listener settings.
"""

type: Literal["WEBHOOKS_ONLY"]

def to_request(self) -> dict[str, Any]:
return {}


matan84 marked this conversation as resolved.
Show resolved Hide resolved
class WebhooksOnlyEventListener(BaseEventListener):
"""
No resync event listener.

It is used to handle events exclusively through webhooks without supporting resync events.

Parameters:
events (EventListenerEvents): A dictionary containing event types and their corresponding event handlers.
event_listener_config (OnceEventListenerSettings): The event listener configuration settings.
"""

def __init__(
self,
events: EventListenerEvents,
event_listener_config: WebhooksOnlyEventListenerSettings,
):
super().__init__(events)
self.event_listener_config = event_listener_config

async def _start(self) -> None:
logger.info("Starting Webhooks-only event listener.")
matan84 marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 5 additions & 1 deletion port_ocean/core/integrations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@ async def start(self) -> None:
"""
Initializes handlers, establishes integration at the specified port, and starts the event listener.
"""
logger.info("Starting integration", integration_type=self.context.config.integration.type)
logger.info(
"Starting integration",
integration_type=self.context.config.integration.type,
)
if self.started:
raise IntegrationAlreadyStartedException("Integration already started")

if (
not self.event_strategy["resync"]
and self.__class__._on_resync == BaseIntegration._on_resync
and self.context.event_listener_type != "WEBHOOKS_ONLY"
matan84 marked this conversation as resolved.
Show resolved Hide resolved
):
raise NotImplementedError("on_resync is not implemented")

Expand Down
9 changes: 7 additions & 2 deletions port_ocean/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ def run(
# Override config with arguments
if initialize_port_resources is not None:
app.config.initialize_port_resources = initialize_port_resources

initialize_defaults(app.integration.AppConfigHandlerClass.CONFIG_CLASS, app.config)
if (
app.integration.event_listener_factory.context.event_listener_type
!= "WEBHOOKS_ONLY"
matan84 marked this conversation as resolved.
Show resolved Hide resolved
):
initialize_defaults(
app.integration.AppConfigHandlerClass.CONFIG_CLASS, app.config
)

uvicorn.run(app, host="0.0.0.0", port=application_settings.port)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.14.7"
version = "0.15.0"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down
Loading