diff --git a/CHANGELOG.md b/CHANGELOG.md index 5926a9c8af..d640ceabea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm +## 0.10.0 (2024-08-19) + +### Improvements + +- Add support for reporting the integration resync state to expose more information about the integration state in the portal + + ## 0.9.14 (2024-08-19) diff --git a/port_ocean/clients/port/client.py b/port_ocean/clients/port/client.py index 1a805bfd79..2b323f139a 100644 --- a/port_ocean/clients/port/client.py +++ b/port_ocean/clients/port/client.py @@ -13,6 +13,7 @@ get_internal_http_client, ) from port_ocean.exceptions.clients import KafkaCredentialsNotFound +from typing import Any class PortClient( @@ -75,3 +76,19 @@ async def get_org_id(self) -> str: handle_status_code(response) return response.json()["organization"]["id"] + + async def update_integration_state( + self, state: dict[str, Any], should_raise: bool = True, should_log: bool = True + ) -> dict[str, Any]: + if should_log: + logger.debug(f"Updating integration resync state with: {state}") + response = await self.client.patch( + f"{self.api_url}/integration/{self.integration_identifier}/resync-state", + headers=await self.auth.headers(), + json=state, + ) + handle_status_code(response, should_raise, should_log) + if response.is_success and should_log: + logger.info("Integration resync state updated successfully") + + return response.json().get("integration", {}) diff --git a/port_ocean/config/settings.py b/port_ocean/config/settings.py index 1955ed7364..1de4829211 100644 --- a/port_ocean/config/settings.py +++ b/port_ocean/config/settings.py @@ -76,7 +76,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow): integration: IntegrationSettings = Field( default_factory=lambda: IntegrationSettings(type="", identifier="") ) - runtime: Runtime = "OnPrem" + runtime: Runtime = Runtime.OnPrem @root_validator() def validate_integration_config(cls, values: dict[str, Any]) -> dict[str, Any]: @@ -100,8 +100,8 @@ def parse_config(model: Type[BaseModel], config: Any) -> BaseModel: return values @validator("runtime") - def validate_runtime(cls, runtime: Literal["OnPrem", "Saas"]) -> Runtime: - if runtime == "Saas": + def validate_runtime(cls, runtime: Runtime) -> Runtime: + if runtime == Runtime.Saas: spec = get_spec_file() if spec is None: raise ValueError( diff --git a/port_ocean/core/event_listener/base.py b/port_ocean/core/event_listener/base.py index cf3934764e..04c43d87f1 100644 --- a/port_ocean/core/event_listener/base.py +++ b/port_ocean/core/event_listener/base.py @@ -5,6 +5,8 @@ from port_ocean.config.base import BaseOceanModel from port_ocean.utils.signal import signal_handler +from port_ocean.context.ocean import ocean +from port_ocean.utils.misc import IntegrationStateStatus class EventListenerEvents(TypedDict): @@ -36,6 +38,41 @@ def _stop(self) -> None: """ pass + async def _before_resync(self) -> None: + """ + Can be used for event listeners that need to perform some action before resync. + """ + await ocean.app.resync_state_updater.update_before_resync() + + async def _after_resync(self) -> None: + """ + Can be used for event listeners that need to perform some action after resync. + """ + await ocean.app.resync_state_updater.update_after_resync() + + async def _on_resync_failure(self, e: Exception) -> None: + """ + Can be used for event listeners that need to handle resync failures. + """ + await ocean.app.resync_state_updater.update_after_resync( + IntegrationStateStatus.Failed + ) + + async def _resync( + self, + resync_args: dict[Any, Any], + ) -> None: + """ + Triggers the "on_resync" event. + """ + await self._before_resync() + try: + await self.events["on_resync"](resync_args) + await self._after_resync() + except Exception as e: + await self._on_resync_failure(e) + raise e + class EventListenerSettings(BaseOceanModel, extra=Extra.allow): type: str diff --git a/port_ocean/core/event_listener/http.py b/port_ocean/core/event_listener/http.py index 69083daa00..ca9bd4a3cb 100644 --- a/port_ocean/core/event_listener/http.py +++ b/port_ocean/core/event_listener/http.py @@ -64,6 +64,6 @@ async def _start(self) -> None: @target_channel_router.post("/resync") async def resync() -> None: - await self.events["on_resync"]({}) + await self._resync({}) ocean.app.fast_api_app.include_router(target_channel_router) diff --git a/port_ocean/core/event_listener/kafka.py b/port_ocean/core/event_listener/kafka.py index ba93915266..f9c749e767 100644 --- a/port_ocean/core/event_listener/kafka.py +++ b/port_ocean/core/event_listener/kafka.py @@ -122,7 +122,7 @@ async def _handle_message(self, raw_msg: Message) -> None: if "change.log" in topic and message is not None: try: - await self.events["on_resync"](message) + await self._resync(message) except Exception as e: _type, _, tb = sys.exc_info() logger.opt(exception=(_type, None, tb)).error( diff --git a/port_ocean/core/event_listener/once.py b/port_ocean/core/event_listener/once.py index 15154a4c35..9952b19f32 100644 --- a/port_ocean/core/event_listener/once.py +++ b/port_ocean/core/event_listener/once.py @@ -1,3 +1,4 @@ +import datetime import signal from typing import Literal, Any @@ -9,6 +10,9 @@ EventListenerSettings, ) from port_ocean.utils.repeat import repeat_every +from port_ocean.context.ocean import ocean +from port_ocean.utils.time import convert_str_to_utc_datetime, convert_to_minutes +from port_ocean.utils.misc import IntegrationStateStatus class OnceEventListenerSettings(EventListenerSettings): @@ -41,6 +45,97 @@ def __init__( ): super().__init__(events) self.event_listener_config = event_listener_config + self.cached_integration: dict[str, Any] | None = None + + async def get_current_integration_cached(self) -> dict[str, Any]: + if self.cached_integration: + return self.cached_integration + + self.cached_integration = await ocean.port_client.get_current_integration() + return self.cached_integration + + async def get_saas_resync_initialization_and_interval( + self, + ) -> tuple[int | None, datetime.datetime | None]: + """ + Get the scheduled resync interval and the last updated time of the integration config for the saas application. + interval is the saas configured resync interval time. + start_time is the last updated time of the integration config. + return: (interval, start_time) + """ + if not ocean.app.is_saas(): + return (None, None) + + try: + integration = await self.get_current_integration_cached() + except Exception as e: + logger.exception(f"Error occurred while getting current integration {e}") + return (None, None) + + interval_str = ( + integration.get("spec", {}) + .get("appSpec", {}) + .get("scheduledResyncInterval") + ) + + if not interval_str: + logger.error( + "Unexpected: scheduledResyncInterval not found for Saas integration, Cannot predict the next resync" + ) + return (None, None) + + last_updated_saas_integration_config_str = integration.get( + "statusInfo", {} + ).get("updatedAt") + + # we use the last updated time of the integration config as the start time since in saas application the interval is configured by the user from the portal + if not last_updated_saas_integration_config_str: + logger.error( + "Unexpected: updatedAt not found for Saas integration, Cannot predict the next resync" + ) + return (None, None) + + return ( + convert_to_minutes(interval_str), + convert_str_to_utc_datetime(last_updated_saas_integration_config_str), + ) + + async def _before_resync(self) -> None: + if not ocean.app.is_saas(): + # in case of non-saas, we still want to update the state before and after the resync + await super()._before_resync() + return + + (interval, start_time) = ( + await self.get_saas_resync_initialization_and_interval() + ) + await ocean.app.resync_state_updater.update_before_resync(interval, start_time) + + async def _after_resync(self) -> None: + if not ocean.app.is_saas(): + # in case of non-saas, we still want to update the state before and after the resync + await super()._after_resync() + return + + (interval, start_time) = ( + await self.get_saas_resync_initialization_and_interval() + ) + await ocean.app.resync_state_updater.update_after_resync( + IntegrationStateStatus.Completed, interval, start_time + ) + + async def _on_resync_failure(self, e: Exception) -> None: + if not ocean.app.is_saas(): + # in case of non-saas, we still want to update the state before and after the resync + await super()._after_resync() + return + + (interval, start_time) = ( + await self.get_saas_resync_initialization_and_interval() + ) + await ocean.app.resync_state_updater.update_after_resync( + IntegrationStateStatus.Failed, interval, start_time + ) async def _start(self) -> None: """ @@ -53,7 +148,7 @@ async def _start(self) -> None: async def resync_and_exit() -> None: logger.info("Once event listener started") try: - await self.events["on_resync"]({}) + await self._resync({}) except Exception: # we catch all exceptions here to make sure the application will exit gracefully logger.exception("Error occurred while resyncing") diff --git a/port_ocean/core/event_listener/polling.py b/port_ocean/core/event_listener/polling.py index 867c1aa439..c7fef4b8e0 100644 --- a/port_ocean/core/event_listener/polling.py +++ b/port_ocean/core/event_listener/polling.py @@ -49,7 +49,16 @@ def __init__( ): super().__init__(events) self.event_listener_config = event_listener_config - self._last_updated_at = None + + def should_resync(self, last_updated_at: str) -> bool: + _last_updated_at = ( + ocean.app.resync_state_updater.last_integration_state_updated_at + ) + + if _last_updated_at is None: + return self.event_listener_config.resync_on_start + + return _last_updated_at != last_updated_at async def _start(self) -> None: """ @@ -69,17 +78,12 @@ async def resync() -> None: integration = await ocean.app.port_client.get_current_integration() last_updated_at = integration["updatedAt"] - should_resync = ( - self._last_updated_at is not None - or self.event_listener_config.resync_on_start - ) and self._last_updated_at != last_updated_at - - if should_resync: + if self.should_resync(last_updated_at): logger.info("Detected change in integration, resyncing") - self._last_updated_at = last_updated_at - running_task: Task[Any] = get_event_loop().create_task( - self.events["on_resync"]({}) # type: ignore + ocean.app.resync_state_updater.last_integration_state_updated_at = ( + last_updated_at ) + running_task: Task[Any] = get_event_loop().create_task(self._resync({})) signal_handler.register(running_task.cancel) await running_task diff --git a/port_ocean/core/handlers/resync_state_updater/__init__.py b/port_ocean/core/handlers/resync_state_updater/__init__.py new file mode 100644 index 0000000000..162ff8a61b --- /dev/null +++ b/port_ocean/core/handlers/resync_state_updater/__init__.py @@ -0,0 +1,5 @@ +from .updater import ResyncStateUpdater + +__all__ = [ + "ResyncStateUpdater", +] diff --git a/port_ocean/core/handlers/resync_state_updater/updater.py b/port_ocean/core/handlers/resync_state_updater/updater.py new file mode 100644 index 0000000000..0e4c8b011c --- /dev/null +++ b/port_ocean/core/handlers/resync_state_updater/updater.py @@ -0,0 +1,84 @@ +import datetime +from typing import Any, Literal +from port_ocean.clients.port.client import PortClient +from port_ocean.utils.misc import IntegrationStateStatus +from port_ocean.utils.time import get_next_occurrence + + +class ResyncStateUpdater: + def __init__(self, port_client: PortClient, scheduled_resync_interval: int | None): + self.port_client = port_client + self.initiated_at = datetime.datetime.now(tz=datetime.timezone.utc) + self.scheduled_resync_interval = scheduled_resync_interval + + # This is used to differ between integration changes that require a full resync and state changes + # So that the polling event-listener can decide whether to perform a full resync or not + # TODO: remove this once we separate the state from the integration + self.last_integration_state_updated_at: str = "" + + def _calculate_next_scheduled_resync( + self, + interval: int | None = None, + custom_start_time: datetime.datetime | None = None, + ) -> str | None: + if interval is None: + return None + return get_next_occurrence( + interval * 60, custom_start_time or self.initiated_at + ).isoformat() + + async def update_before_resync( + self, + interval: int | None = None, + custom_start_time: datetime.datetime | None = None, + ) -> None: + _interval = interval or self.scheduled_resync_interval + nest_resync = self._calculate_next_scheduled_resync( + _interval, custom_start_time + ) + state: dict[str, Any] = { + "status": IntegrationStateStatus.Running.value, + "lastResyncEnd": None, + "lastResyncStart": datetime.datetime.now( + tz=datetime.timezone.utc + ).isoformat(), + "nextResync": nest_resync, + "intervalInMinuets": _interval, + } + + integration = await self.port_client.update_integration_state( + state, should_raise=False + ) + if integration: + self.last_integration_state_updated_at = integration["resyncState"][ + "updatedAt" + ] + + async def update_after_resync( + self, + status: Literal[ + IntegrationStateStatus.Completed, IntegrationStateStatus.Failed + ] = IntegrationStateStatus.Completed, + interval: int | None = None, + custom_start_time: datetime.datetime | None = None, + ) -> None: + _interval = interval or self.scheduled_resync_interval + nest_resync = self._calculate_next_scheduled_resync( + _interval, custom_start_time + ) + state: dict[str, Any] = { + "status": status.value, + "lastResyncEnd": datetime.datetime.now( + tz=datetime.timezone.utc + ).isoformat(), + "nextResync": nest_resync, + "intervalInMinuets": _interval, + } + + integration = await self.port_client.update_integration_state( + state, should_raise=False + ) + if integration: + self.last_integration_state_updated_at = integration["resyncState"][ + "updatedAt" + ] diff --git a/port_ocean/core/models.py b/port_ocean/core/models.py index 9535fa6c9d..8da040c912 100644 --- a/port_ocean/core/models.py +++ b/port_ocean/core/models.py @@ -1,11 +1,14 @@ from dataclasses import dataclass, field -from typing import Any, Literal +from enum import Enum +from typing import Any from pydantic import BaseModel from pydantic.fields import Field -Runtime = Literal["OnPrem", "Saas"] +class Runtime(Enum): + Saas = "Saas" + OnPrem = "OnPrem" class Entity(BaseModel): diff --git a/port_ocean/core/utils.py b/port_ocean/core/utils.py index ce26318d97..4adc576412 100644 --- a/port_ocean/core/utils.py +++ b/port_ocean/core/utils.py @@ -35,12 +35,13 @@ def is_same_entity(first_entity: Entity, second_entity: Entity) -> bool: async def validate_integration_runtime( - port_client: PortClient, requested_runtime: Runtime + port_client: PortClient, + requested_runtime: Runtime, ) -> None: logger.debug("Validating integration runtime") current_integration = await port_client.get_current_integration(should_raise=False) current_runtime = current_integration.get("installationType", "OnPrem") - if current_integration and current_runtime != requested_runtime: + if current_integration and current_runtime != requested_runtime.value: raise IntegrationRuntimeException( f"Invalid Runtime! Requested to run existing {current_runtime} integration in {requested_runtime} runtime." ) diff --git a/port_ocean/ocean.py b/port_ocean/ocean.py index 6c157a7b89..3c6827552a 100644 --- a/port_ocean/ocean.py +++ b/port_ocean/ocean.py @@ -9,6 +9,8 @@ from pydantic import BaseModel from starlette.types import Scope, Receive, Send +from port_ocean.core.handlers.resync_state_updater import ResyncStateUpdater +from port_ocean.core.models import Runtime from port_ocean.clients.port.client import PortClient from port_ocean.config.settings import ( IntegrationConfiguration, @@ -24,6 +26,7 @@ from port_ocean.utils.repeat import repeat_every from port_ocean.utils.signal import signal_handler from port_ocean.version import __integration_version__ +from port_ocean.utils.misc import IntegrationStateStatus class Ocean: @@ -63,16 +66,27 @@ def __init__( integration_class(ocean) if integration_class else BaseIntegration(ocean) ) + self.resync_state_updater = ResyncStateUpdater( + self.port_client, self.config.scheduled_resync_interval + ) + + def is_saas(self) -> bool: + return self.config.runtime == Runtime.Saas + async def _setup_scheduled_resync( self, ) -> None: - def execute_resync_all() -> None: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - + async def execute_resync_all() -> None: + await self.resync_state_updater.update_before_resync() logger.info("Starting a new scheduled resync") - loop.run_until_complete(self.integration.sync_raw_all()) - loop.close() + try: + await self.integration.sync_raw_all() + await self.resync_state_updater.update_after_resync() + except Exception as e: + await self.resync_state_updater.update_after_resync( + IntegrationStateStatus.Failed + ) + raise e interval = self.config.scheduled_resync_interval if interval is not None: @@ -83,7 +97,11 @@ def execute_resync_all() -> None: seconds=interval * 60, # Not running the resync immediately because the event listener should run resync on startup wait_first=True, - )(lambda: threading.Thread(target=execute_resync_all).start()) + )( + lambda: threading.Thread( + target=lambda: asyncio.run(execute_resync_all()) + ).start() + ) await repeated_function() async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: diff --git a/port_ocean/utils/misc.py b/port_ocean/utils/misc.py index c21cccc577..59029144b1 100644 --- a/port_ocean/utils/misc.py +++ b/port_ocean/utils/misc.py @@ -1,3 +1,4 @@ +from enum import Enum import inspect from importlib.util import spec_from_file_location, module_from_spec from pathlib import Path @@ -5,11 +6,16 @@ from types import ModuleType from typing import Callable, Any from uuid import uuid4 - import tomli import yaml +class IntegrationStateStatus(Enum): + Running = "running" + Failed = "failed" + Completed = "completed" + + def get_time(seconds_precision: bool = True) -> float: """Return current time as Unix/Epoch timestamp, in seconds. :param seconds_precision: if True, return with seconds precision as integer (default). diff --git a/port_ocean/utils/time.py b/port_ocean/utils/time.py new file mode 100644 index 0000000000..4b3553aeb4 --- /dev/null +++ b/port_ocean/utils/time.py @@ -0,0 +1,54 @@ +import datetime +from loguru import logger + + +def convert_str_to_utc_datetime(time_str: str) -> datetime.datetime | None: + """ + Convert a string representing time to a datetime object. + :param time_str: a string representing time in the format "2021-09-01T12:00:00Z" + """ + aware_date = datetime.datetime.fromisoformat(time_str) + if time_str.endswith("Z"): + aware_date = datetime.datetime.fromisoformat(time_str.replace("Z", "+00:00")) + return aware_date.astimezone(datetime.timezone.utc) + + +def convert_to_minutes(s: str) -> int: + minutes_per_unit = {"s": 1 / 60, "m": 1, "h": 60, "d": 1440, "w": 10080} + try: + return int(int(s[:-1]) * minutes_per_unit[s[-1]]) + except Exception: + logger.error(f"Failed converting string to minutes, {s}") + raise ValueError( + f"Invalid format. Expected a string ending with {minutes_per_unit.keys()}" + ) + + +def get_next_occurrence( + interval_seconds: int, + start_time: datetime.datetime, + now: datetime.datetime | None = None, +) -> datetime.datetime: + """ + Predict the next occurrence of an event based on interval, start time, and current time. + + :param interval_minutes: Interval between occurrences in minutes. + :param start_time: Start time of the event as a datetime object. + :param now: Current time as a datetime object. + :return: The next occurrence time as a datetime object. + """ + + if now is None: + now = datetime.datetime.now(tz=datetime.timezone.utc) + # Calculate the total seconds elapsed since the start time + elapsed_seconds = (now - start_time).total_seconds() + + # Calculate the number of intervals that have passed + intervals_passed = int(elapsed_seconds // interval_seconds) + + # Calculate the next occurrence time + next_occurrence = start_time + datetime.timedelta( + seconds=(intervals_passed + 1) * interval_seconds + ) + + return next_occurrence diff --git a/pyproject.toml b/pyproject.toml index 638b265ea1..c0bf6bab0f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.9.14" +version = "0.10.0" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"