Skip to content

Commit

Permalink
[Core] Create webhooks only resync (#1225)
Browse files Browse the repository at this point in the history
# Description

What - Added new event listener type- Webhooks only

Why - In order to enable 2 different integration types

How - Added new event listener without resync logic

## Type of change

Please leave one option from the following and delete the rest:

- [ ] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

<h4> All tests should be run against the port production
environment(using a testing org). </h4>

### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector


### Integration testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [ ] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.

---------

Co-authored-by: Tom Tankilevitch <[email protected]>
  • Loading branch information
matan84 and Tankilevitch authored Dec 31, 2024
1 parent 504e3a6 commit 82a0c7d
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 7 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

<!-- towncrier release notes start -->
## 0.17.0 (2024-12-31)


### Features

- Added new webhooks only event listener mode. This event listener handles only webhook invocations and raises error once used for resync.


## 0.16.1 (2024-12-25)

### Bug Fixes
Expand Down
15 changes: 12 additions & 3 deletions port_ocean/context/ocean.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from port_ocean.ocean import Ocean
from port_ocean.clients.port.client import PortClient

from loguru import logger


class PortOceanContext:
def __init__(self, app: Union["Ocean", None]) -> None:
Expand Down Expand Up @@ -63,14 +65,21 @@ def port_client(self) -> "PortClient":
return self.app.port_client

@property
def event_listener_type(self) -> Literal["WEBHOOK", "KAFKA", "POLLING", "ONCE"]:
def event_listener_type(
self,
) -> Literal["WEBHOOK", "KAFKA", "POLLING", "ONCE", "WEBHOOKS_ONLY"]:
return self.app.config.event_listener.type

def on_resync(
self,
kind: str | None = None,
) -> Callable[[RESYNC_EVENT_LISTENER], RESYNC_EVENT_LISTENER]:
def wrapper(function: RESYNC_EVENT_LISTENER) -> RESYNC_EVENT_LISTENER:
) -> Callable[[RESYNC_EVENT_LISTENER], RESYNC_EVENT_LISTENER | None]:
def wrapper(function: RESYNC_EVENT_LISTENER) -> RESYNC_EVENT_LISTENER | None:
if not self.app.config.event_listener.should_resync:
logger.debug(
"Webhook only event listener is used, resync events are ignored"
)
return None
return self.integration.on_resync(function, kind)

return wrapper
Expand Down
8 changes: 8 additions & 0 deletions port_ocean/core/event_listener/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
OnceEventListener,
)

from port_ocean.core.event_listener.webhooks_only import (
WebhooksOnlyEventListener,
WebhooksOnlyEventListenerSettings,
)


EventListenerSettingsType = (
HttpEventListenerSettings
| KafkaEventListenerSettings
| PollingEventListenerSettings
| OnceEventListenerSettings
| WebhooksOnlyEventListenerSettings
)

__all__ = [
Expand All @@ -34,4 +40,6 @@
"PollingEventListenerSettings",
"OnceEventListener",
"OnceEventListenerSettings",
"WebhooksOnlyEventListener",
"WebhooksOnlyEventListenerSettings",
]
1 change: 1 addition & 0 deletions port_ocean/core/event_listener/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ async def _resync(

class EventListenerSettings(BaseOceanModel, extra=Extra.allow):
type: str
should_resync: bool = True

def to_request(self) -> dict[str, Any]:
"""
Expand Down
10 changes: 9 additions & 1 deletion port_ocean/core/event_listener/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
BaseEventListener,
EventListenerEvents,
)
from port_ocean.core.event_listener.webhooks_only import (
WebhooksOnlyEventListener,
WebhooksOnlyEventListenerSettings,
)
from port_ocean.exceptions.core import UnsupportedEventListenerTypeException


Expand Down Expand Up @@ -88,7 +92,11 @@ async def create_event_listener(self) -> BaseEventListener:
config, OnceEventListenerSettings
), assert_message.format(type(config))
event_listener = OnceEventListener(wrapped_events, config)

case "webhooks_only":
assert isinstance(
config, WebhooksOnlyEventListenerSettings
), assert_message.format(type(config))
event_listener = WebhooksOnlyEventListener(wrapped_events, config)
case _:
raise UnsupportedEventListenerTypeException(
f"Event listener {_type} not supported"
Expand Down
41 changes: 41 additions & 0 deletions port_ocean/core/event_listener/webhooks_only.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from typing import Literal

from loguru import logger

from port_ocean.core.event_listener.base import (
BaseEventListener,
EventListenerEvents,
EventListenerSettings,
)


class WebhooksOnlyEventListenerSettings(EventListenerSettings):
"""
This class inherits from `EventListenerSettings`, which provides a foundation for creating event listener settings.
"""

type: Literal["WEBHOOKS_ONLY"]
should_resync: bool = False


class WebhooksOnlyEventListener(BaseEventListener):
"""
No resync event listener.
It is used to handle events exclusively through webhooks without supporting resync events.
Parameters:
events (EventListenerEvents): A dictionary containing event types and their corresponding event handlers.
event_listener_config (OnceEventListenerSettings): The event listener configuration settings.
"""

def __init__(
self,
events: EventListenerEvents,
event_listener_config: WebhooksOnlyEventListenerSettings,
):
super().__init__(events)
self.event_listener_config = event_listener_config

async def _start(self) -> None:
logger.info("Starting Webhooks-only event listener")
6 changes: 5 additions & 1 deletion port_ocean/core/integrations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@ async def start(self) -> None:
"""
Initializes handlers, establishes integration at the specified port, and starts the event listener.
"""
logger.info("Starting integration", integration_type=self.context.config.integration.type)
logger.info(
"Starting integration",
integration_type=self.context.config.integration.type,
)
if self.started:
raise IntegrationAlreadyStartedException("Integration already started")

if (
not self.event_strategy["resync"]
and self.__class__._on_resync == BaseIntegration._on_resync
and self.context.config.event_listener.should_resync
):
raise NotImplementedError("on_resync is not implemented")

Expand Down
1 change: 0 additions & 1 deletion port_ocean/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def run(
# Override config with arguments
if initialize_port_resources is not None:
app.config.initialize_port_resources = initialize_port_resources

initialize_defaults(app.integration.AppConfigHandlerClass.CONFIG_CLASS, app.config)

uvicorn.run(app, host="0.0.0.0", port=application_settings.port)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.16.1"
version = "0.17.0"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down

0 comments on commit 82a0c7d

Please sign in to comment.