Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Added reading integration configuration from file #1254

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.15.3 (2024-12-19)

### Features

- Added capability to read configurations from a file, And periodically keep the integration's configuration updated with the file's configuration.


## 0.15.2 (2024-12-15)

### Improvements
Expand Down
23 changes: 20 additions & 3 deletions port_ocean/config/settings.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from typing import Any, Literal, Type, cast

from pydantic import Extra, AnyHttpUrl, parse_obj_as, parse_raw_as
import yaml
from loguru import logger
from pydantic import AnyHttpUrl, Extra, parse_obj_as, parse_raw_as
from pydantic.class_validators import root_validator, validator
from pydantic.env_settings import InitSettingsSource, EnvSettingsSource, BaseSettings
from pydantic.env_settings import BaseSettings, EnvSettingsSource, InitSettingsSource
from pydantic.fields import Field
from pydantic.main import BaseModel

from port_ocean.config.base import BaseOceanSettings, BaseOceanModel
from port_ocean.config.base import BaseOceanModel, BaseOceanSettings
from port_ocean.core.event_listener import EventListenerSettingsType
from port_ocean.core.models import Runtime
from port_ocean.utils.misc import get_integration_name, get_spec_file
Expand Down Expand Up @@ -60,6 +62,19 @@ def root_validator(cls, values: dict[str, Any]) -> dict[str, Any]:

return values

async def load_config_from_file(self, config_file_path: str) -> None:
logger.debug(f"Loading configuration from file: {config_file_path}")
try:
with open(config_file_path, "r") as f:
file_config = yaml.safe_load(f)

if isinstance(self.config, dict):
self.config = {**self.config, **file_config}
else:
self.config = {**self.config.dict(), **file_config}
except Exception as e:
raise ValueError(f"Failed to load configuration from file: {e}")


class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow):
_integration_config_model: BaseModel | None = None
Expand All @@ -69,6 +84,8 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow):
scheduled_resync_interval: int | None = None
client_timeout: int = 60
send_raw_data_examples: bool = True
config_file_path: str | None = None
config_reload_interval: int = 10
port: PortSettings
event_listener: EventListenerSettingsType = Field(
default=cast(EventListenerSettingsType, {"type": "POLLING"})
Expand Down
40 changes: 19 additions & 21 deletions port_ocean/ocean.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,30 @@
import asyncio
import sys
import threading
from contextlib import asynccontextmanager
from typing import Callable, Any, Dict, AsyncIterator, Type
from typing import Any, AsyncIterator, Callable, Dict, Type

from fastapi import FastAPI, APIRouter
from fastapi import APIRouter, FastAPI
from loguru import logger
from pydantic import BaseModel
from starlette.types import Scope, Receive, Send
from starlette.types import Receive, Scope, Send

from port_ocean.core.handlers.resync_state_updater import ResyncStateUpdater
from port_ocean.clients.port.client import PortClient
from port_ocean.config.settings import (
IntegrationConfiguration,
)
from port_ocean.context.ocean import (
PortOceanContext,
ocean,
initialize_port_ocean_context,
ocean,
)
from port_ocean.core.handlers.resync_state_updater import ResyncStateUpdater
from port_ocean.core.integrations.base import BaseIntegration
from port_ocean.log.sensetive import sensitive_log_filter
from port_ocean.middlewares import request_handler
from port_ocean.utils.repeat import repeat_every
from port_ocean.utils.misc import IntegrationStateStatus
from port_ocean.utils.repeat import schedule_repeated_task
from port_ocean.utils.signal import signal_handler
from port_ocean.version import __integration_version__
from port_ocean.utils.misc import IntegrationStateStatus


class Ocean:
Expand Down Expand Up @@ -94,24 +93,21 @@ async def execute_resync_all() -> None:
raise e

interval = self.config.scheduled_resync_interval
loop = asyncio.get_event_loop()
if interval is not None:
logger.info(
f"Setting up scheduled resync, the integration will automatically perform a full resync every {interval} minutes)",
scheduled_interval=interval,
)
repeated_function = repeat_every(
seconds=interval * 60,
# Not running the resync immediately because the event listener should run resync on startup
wait_first=True,
)(
lambda: threading.Thread(
target=lambda: asyncio.run_coroutine_threadsafe(
execute_resync_all(), loop
)
).start()
)
await repeated_function()
await schedule_repeated_task(execute_resync_all, interval * 60)

async def _setup_scheduled_config_loading(self) -> None:
seconds = self.config.config_reload_interval
config_file_path = self.config.config_file_path
await schedule_repeated_task(
self.config.integration.load_config_from_file,
seconds,
config_file_path=config_file_path,
)

def initialize_app(self) -> None:
self.fast_api_app.include_router(self.integration_router, prefix="/integration")
Expand All @@ -121,6 +117,8 @@ async def lifecycle(_: FastAPI) -> AsyncIterator[None]:
try:
await self.integration.start()
await self._setup_scheduled_resync()
if self.config.config_file_path:
await self._setup_scheduled_config_loading()
yield None
except Exception:
logger.exception("Integration had a fatal error. Shutting down.")
Expand Down
26 changes: 25 additions & 1 deletion port_ocean/utils/repeat.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import threading
from asyncio import ensure_future
from functools import wraps
from traceback import format_exception
from typing import Callable, Coroutine, Any
from typing import Any, Callable, Coroutine

from loguru import logger
from starlette.concurrency import run_in_threadpool
Expand Down Expand Up @@ -80,3 +81,26 @@ async def loop() -> None:
return wrapped

return decorator


async def schedule_repeated_task(
function: Callable[..., Coroutine[Any, Any, None]],
interval: int,
*args: Any,
**kwargs: Any,
) -> None:
"""
Schedule a repeated task that will run the given function every `interval` seconds
"""
loop = asyncio.get_event_loop()
repeated_function = repeat_every(
seconds=interval,
wait_first=True,
)(
lambda: threading.Thread(
target=lambda: asyncio.run_coroutine_threadsafe(
function(*args, **kwargs), loop
)
).start()
)
await repeated_function()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.15.2"
version = "0.15.3"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down
Loading