Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] ocean core next resync #835

Merged
merged 66 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
f1c9d90
feat: add update port-api method
Jul 21, 2024
446dbb0
feat: update port-api when sync is done WIP
Jul 21, 2024
329245c
fix: remove dev file
Jul 23, 2024
d73a3ec
feat: revert execute_resync_all and calculate next_resync
Jul 23, 2024
076cbae
feat: move convert function to utils
Jul 23, 2024
383d99b
fix: use the once event listener
Jul 25, 2024
b7b9c4a
fix: remove update from the initialization itself
Jul 25, 2024
79fde73
feat: add enum
Jul 28, 2024
3f82fc2
fix: use enum value
Jul 28, 2024
d208f84
fix: don't kill integration on update state error
Jul 28, 2024
b33fbfb
refactor: move calculate_next_resync to misc
Jul 28, 2024
521c4cb
fix: no need for await for a regular function
Jul 28, 2024
44b2c68
refactor: remove unnecessary variable
Jul 28, 2024
253440a
Revert "fix: remove dev file"
Jul 28, 2024
a3b1961
refactor: remove unused imports
Jul 28, 2024
89d5e27
feat: add comment
Jul 28, 2024
b756c67
feat: wrap error bound request
Jul 28, 2024
47312d4
feat: add update state context
Jul 29, 2024
a98e22f
fix: timestamp issues
Jul 29, 2024
048b687
fix: remove unused function
Jul 29, 2024
dda5eef
refactor: change log
Jul 29, 2024
0b1cc58
fix: remove redundant type casting
Jul 29, 2024
1bb8ed1
refactor: replace type with enum
Jul 29, 2024
d38271c
refactor: update function name
Jul 30, 2024
14c4da0
feat: add logs
Jul 30, 2024
bde30d9
fix: enums bug
Jul 30, 2024
becf0b1
fix: override updatedAt eternal resync issue
Aug 6, 2024
058fd22
feat: add saas support
Aug 6, 2024
b939443
fix: remove test comments
Aug 7, 2024
baba43e
fix: datetime comparing issues in ocean saas
Aug 7, 2024
6abb7b7
fix: conflicts
Aug 7, 2024
bab26f3
fix: explain comment
Aug 7, 2024
98978ff
feat: use statusInfo updated_at on ocean saas next resync prediction
Aug 7, 2024
7aad044
feat: remove unused variable
Aug 11, 2024
996be85
fix: after once resync bug
Aug 11, 2024
b0b44d4
feat: delete unused functions
Aug 11, 2024
1325efa
feat: remove unnecessary GET integration call
Aug 11, 2024
930ea51
feat: rename variable
Aug 11, 2024
8f9a22e
feat: remove redundant spacing in imports
Aug 11, 2024
4316fde
fix: PR issues
Aug 11, 2024
c5d20d3
feat: use resync function from BaseEventListener
Aug 13, 2024
e6b3626
fix: key error
Aug 13, 2024
d9a21c8
fix: PR issues
Aug 14, 2024
4628fe4
fix: lint issue
Aug 14, 2024
425b9ec
feat: update state endpoint
Aug 15, 2024
c272e5c
fix: datetime objects to utc
Aug 15, 2024
b4d6fc5
feat: add type to state object
Aug 15, 2024
8f66686
feat: update state parameters
Aug 18, 2024
53a40c3
Merge branch 'main' into PORT-9302-ocean-core-send-data-to-integratio…
Aug 18, 2024
5595405
feat: do silent updates
Aug 18, 2024
f614823
feat: add status as constant
Aug 19, 2024
e442985
feat: add explanation comment
Aug 19, 2024
a8e022c
feat: add better logs
Aug 19, 2024
6d1e81f
feat: add should_raise and should_log
Aug 19, 2024
35c5688
refactor: rename variable
Aug 19, 2024
0ef8b52
feat: change default update state behavior
Aug 19, 2024
23d4ee9
feat: bump version
Aug 19, 2024
975f226
Merge branch 'main' into PORT-9302-ocean-core-send-data-to-integratio…
Aug 19, 2024
adb7967
feat: change changelog
Aug 19, 2024
661bea8
Merge branch 'main' into PORT-9302-ocean-core-send-data-to-integratio…
Aug 20, 2024
7b229fd
fix: scheduled resync blocked the main thread
Aug 21, 2024
0a50bff
Merge branch 'main' into PORT-9302-ocean-core-send-data-to-integratio…
Tankilevitch Aug 21, 2024
36774ea
refactor: move all update state functionality to a new class
Aug 21, 2024
e793681
feat: update resync-state new endpoint
Aug 21, 2024
19f93df
fix: lint checks
Aug 21, 2024
21848be
Merge branch 'main' into PORT-9302-ocean-core-send-data-to-integratio…
shalev007 Aug 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.10.0 (2024-08-19)

### Improvements

- Add support for reporting the integration resync state to expose more information about the integration state in the portal


## 0.9.13 (2024-08-13)

### Improvements
Expand Down
17 changes: 17 additions & 0 deletions port_ocean/clients/port/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
get_internal_http_client,
)
from port_ocean.exceptions.clients import KafkaCredentialsNotFound
from typing import Any


class PortClient(
Expand Down Expand Up @@ -75,3 +76,19 @@ async def get_org_id(self) -> str:
handle_status_code(response)

return response.json()["organization"]["id"]

async def update_integration_state(
self, state: dict[str, Any], should_raise: bool = True, should_log: bool = True
) -> dict[str, Any]:
if should_log:
logger.debug(f"Updating integration state with: {state}")
response = await self.client.patch(
f"{self.api_url}/integration/{self.integration_identifier}/state",
headers=await self.auth.headers(),
json=state,
)
handle_status_code(response, should_raise, should_log)
if response.is_success and should_log:
logger.info("Integration state updated successfully")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async def update_integration_state(
self, state: dict[str, Any], should_raise: bool = True, should_log: bool = True
) -> dict[str, Any]:
if should_log:
logger.debug(f"Updating integration state with: {state}")
response = await self.client.patch(
f"{self.api_url}/integration/{self.integration_identifier}/state",
headers=await self.auth.headers(),
json=state,
)
handle_status_code(response, should_raise, should_log)
if response.is_success and should_log:
logger.info("Integration state updated successfully")
async def update_integration_resync_state(
self, state: dict[str, Any], should_raise: bool = True, should_log: bool = True
) -> dict[str, Any]:
if should_log:
logger.debug(f"Updating integration state with: {state}")
response = await self.client.patch(
f"{self.api_url}/integration/{self.integration_identifier}/resync-state",
headers=await self.auth.headers(),
json=state,
)
handle_status_code(response, should_raise, should_log)
if response.is_success and should_log:
logger.info("Integration resync state updated successfully")


return response.json().get("integration", {})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return response.json().get("integration", {})
return response.json()["integration"]

6 changes: 3 additions & 3 deletions port_ocean/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow):
event_listener: EventListenerSettingsType
# If an identifier or type is not provided, it will be generated based on the integration name
integration: IntegrationSettings = IntegrationSettings(type="", identifier="")
runtime: Runtime = "OnPrem"
runtime: Runtime = Runtime.OnPrem

@root_validator()
def validate_integration_config(cls, values: dict[str, Any]) -> dict[str, Any]:
Expand All @@ -98,8 +98,8 @@ def parse_config(model: Type[BaseModel], config: Any) -> BaseModel:
return values

@validator("runtime")
def validate_runtime(cls, runtime: Literal["OnPrem", "Saas"]) -> Runtime:
if runtime == "Saas":
def validate_runtime(cls, runtime: Runtime) -> Runtime:
if runtime == Runtime.Saas:
spec = get_spec_file()
if spec is None:
raise ValueError(
Expand Down
35 changes: 35 additions & 0 deletions port_ocean/core/event_listener/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from port_ocean.config.base import BaseOceanModel
from port_ocean.utils.signal import signal_handler
from port_ocean.context.ocean import ocean
from port_ocean.utils.misc import IntegrationStateStatus


class EventListenerEvents(TypedDict):
Expand Down Expand Up @@ -36,6 +38,39 @@ def _stop(self) -> None:
"""
pass

async def _before_resync(self) -> None:
"""
Can be used for event listeners that need to perform some action before resync.
"""
await ocean.app.update_state_before_scheduled_sync()

async def _after_resync(self) -> None:
"""
Can be used for event listeners that need to perform some action after resync.
"""
await ocean.app.update_state_after_scheduled_sync()

async def _on_resync_failure(self, e: Exception) -> None:
"""
Can be used for event listeners that need to handle resync failures.
"""
await ocean.app.update_state_after_scheduled_sync(IntegrationStateStatus.Failed)

async def _resync(
self,
resync_args: dict[Any, Any],
) -> None:
"""
Triggers the "on_resync" event.
"""
await self._before_resync()
try:
await self.events["on_resync"](resync_args)
await self._after_resync()
except Exception as e:
await self._on_resync_failure(e)
raise e
Copy link
Collaborator

@yairsimantov20 yairsimantov20 Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we instead use a context manager syntax?

@asynccontextmanager
async resync_life_cycle():
    await ocean.app.update_state_before_scheduled_sync()
    try:
        yield
    except Exception as e:
            await ocean.app.update_state_after_scheduled_sync(IntegrationStateStatus.Failed)
            raise e
    else:
        await ocean.app.update_state_after_scheduled_sync()
Suggested change
async def _before_resync(self) -> None:
"""
Can be used for event listeners that need to perform some action before resync.
"""
await ocean.app.update_state_before_scheduled_sync()
async def _after_resync(self) -> None:
"""
Can be used for event listeners that need to perform some action after resync.
"""
await ocean.app.update_state_after_scheduled_sync()
async def _on_resync_failure(self, e: Exception) -> None:
"""
Can be used for event listeners that need to handle resync failures.
"""
await ocean.app.update_state_after_scheduled_sync(IntegrationStateStatus.Failed)
async def _resync(
self,
resync_args: dict[Any, Any],
) -> None:
"""
Triggers the "on_resync" event.
"""
await self._before_resync()
try:
await self.events["on_resync"](resync_args)
await self._after_resync()
except Exception as e:
await self._on_resync_failure(e)
raise e
async def _resync(
self,
resync_args: dict[Any, Any],
) -> None:
"""
Triggers the "on_resync" event.
"""
async with resync_life_cycle():
await self.events["on_resync"](resync_args)



class EventListenerSettings(BaseOceanModel, extra=Extra.allow):
type: str
Expand Down
2 changes: 1 addition & 1 deletion port_ocean/core/event_listener/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ async def _start(self) -> None:

@target_channel_router.post("/resync")
async def resync() -> None:
await self.events["on_resync"]({})
await self._resync({})

ocean.app.fast_api_app.include_router(target_channel_router)
2 changes: 1 addition & 1 deletion port_ocean/core/event_listener/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async def _handle_message(self, raw_msg: Message) -> None:

if "change.log" in topic and message is not None:
try:
await self.events["on_resync"](message)
await self._resync(message)
except Exception as e:
_type, _, tb = sys.exc_info()
logger.opt(exception=(_type, None, tb)).error(
Expand Down
97 changes: 96 additions & 1 deletion port_ocean/core/event_listener/once.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import signal
from typing import Literal, Any

Expand All @@ -9,6 +10,9 @@
EventListenerSettings,
)
from port_ocean.utils.repeat import repeat_every
from port_ocean.context.ocean import ocean
from port_ocean.utils.time import convert_str_to_utc_datetime, convert_to_minutes
from port_ocean.utils.misc import IntegrationStateStatus


class OnceEventListenerSettings(EventListenerSettings):
Expand Down Expand Up @@ -41,6 +45,97 @@ def __init__(
):
super().__init__(events)
self.event_listener_config = event_listener_config
self.cached_integration: dict[str, Any] | None = None

async def get_current_integration_cached(self) -> dict[str, Any]:
if self.cached_integration:
return self.cached_integration

self.cached_integration = await ocean.port_client.get_current_integration()
return self.cached_integration

async def get_saas_resync_initialization_and_interval(
self,
) -> tuple[int | None, datetime.datetime | None]:
"""
Get the scheduled resync interval and the last updated time of the integration config for the saas application.
interval is the saas configured resync interval time.
start_time is the last updated time of the integration config.
return: (interval, start_time)
"""
if not ocean.app.is_saas():
return (None, None)

try:
integration = await self.get_current_integration_cached()
except Exception as e:
logger.exception(f"Error occurred while getting current integration {e}")
return (None, None)

interval_str = (
integration.get("spec", {})
.get("appSpec", {})
.get("scheduledResyncInterval")
)

if not interval_str:
logger.error(
"Unexpected: scheduledResyncInterval not found for Saas integration, Cannot predict the next resync"
)
return (None, None)

last_updated_saas_integration_config_str = integration.get(
"statusInfo", {}
).get("updatedAt")

# we use the last updated time of the integration config as the start time since in saas application the interval is configured by the user from the portal
if not last_updated_saas_integration_config_str:
logger.error(
"Unexpected: updatedAt not found for Saas integration, Cannot predict the next resync"
)
return (None, None)

return (
convert_to_minutes(interval_str),
convert_str_to_utc_datetime(last_updated_saas_integration_config_str),
)

async def _before_resync(self) -> None:
if not ocean.app.is_saas():
# in case of non-saas, we still want to update the state before and after the resync
await super()._before_resync()
return

(interval, start_time) = (
await self.get_saas_resync_initialization_and_interval()
)
await ocean.app.update_state_before_scheduled_sync(interval, start_time)

async def _after_resync(self) -> None:
if not ocean.app.is_saas():
# in case of non-saas, we still want to update the state before and after the resync
await super()._after_resync()
return

(interval, start_time) = (
await self.get_saas_resync_initialization_and_interval()
)
await ocean.app.update_state_after_scheduled_sync(
IntegrationStateStatus.Completed, interval, start_time
)

async def _on_resync_failure(self, e: Exception) -> None:
if not ocean.app.is_saas():
# in case of non-saas, we still want to update the state before and after the resync
await super()._after_resync()
return

(interval, start_time) = (
await self.get_saas_resync_initialization_and_interval()
)
await ocean.app.update_state_after_scheduled_sync(
IntegrationStateStatus.Failed, interval, start_time
)

async def _start(self) -> None:
"""
Expand All @@ -53,7 +148,7 @@ async def _start(self) -> None:
async def resync_and_exit() -> None:
logger.info("Once event listener started")
try:
await self.events["on_resync"]({})
await self._resync({})
except Exception:
# we catch all exceptions here to make sure the application will exit gracefully
logger.exception("Error occurred while resyncing")
Expand Down
22 changes: 11 additions & 11 deletions port_ocean/core/event_listener/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,14 @@ def __init__(
):
super().__init__(events)
self.event_listener_config = event_listener_config
self._last_updated_at = None

def should_resync(self, last_updated_at: str) -> bool:
_last_updated_at = ocean.app.last_integration_state_updated_at

if _last_updated_at is None:
return self.event_listener_config.resync_on_start

return _last_updated_at != last_updated_at

async def _start(self) -> None:
"""
Expand All @@ -69,17 +76,10 @@ async def resync() -> None:
integration = await ocean.app.port_client.get_current_integration()
last_updated_at = integration["updatedAt"]

should_resync = (
self._last_updated_at is not None
or self.event_listener_config.resync_on_start
) and self._last_updated_at != last_updated_at

if should_resync:
if self.should_resync(last_updated_at):
logger.info("Detected change in integration, resyncing")
self._last_updated_at = last_updated_at
running_task: Task[Any] = get_event_loop().create_task(
self.events["on_resync"]({}) # type: ignore
)
ocean.app.last_integration_state_updated_at = last_updated_at
running_task: Task[Any] = get_event_loop().create_task(self._resync({}))
signal_handler.register(running_task.cancel)

await running_task
Expand Down
7 changes: 5 additions & 2 deletions port_ocean/core/models.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from dataclasses import dataclass, field
from typing import Any, Literal
from enum import Enum
from typing import Any

from pydantic import BaseModel
from pydantic.fields import Field


Runtime = Literal["OnPrem", "Saas"]
class Runtime(Enum):
Saas = "Saas"
OnPrem = "OnPrem"


class Entity(BaseModel):
Expand Down
5 changes: 3 additions & 2 deletions port_ocean/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ def is_same_entity(first_entity: Entity, second_entity: Entity) -> bool:


async def validate_integration_runtime(
port_client: PortClient, requested_runtime: Runtime
port_client: PortClient,
requested_runtime: Runtime,
) -> None:
logger.debug("Validating integration runtime")
current_integration = await port_client.get_current_integration(should_raise=False)
current_runtime = current_integration.get("installationType", "OnPrem")
if current_integration and current_runtime != requested_runtime:
if current_integration and current_runtime != requested_runtime.value:
raise IntegrationRuntimeException(
f"Invalid Runtime! Requested to run existing {current_runtime} integration in {requested_runtime} runtime."
)
Expand Down
Loading