[Core] ocean core next resync #835

Merged
shalev007 merged 66 commits into main from PORT-9302-ocean-core-send-data-to-integration-service
Aug 21, 2024

Contributor

@shalev007 shalev007 commented Jul 23, 2024

Description

What:

  • Integrated functionality to send the state of the integration on each sync.
  • Implemented prediction of the next sync date.

Why:

  • To ensure that the integration state is consistently updated and monitored.
  • To enhance the accuracy and reliability of the next sync date prediction by utilizing server data for SaaS applications, rather than relying solely on environment variables.

How:

  • Updated the sync logic to include the current integration state in the payload sent to our monitoring system.
  • Modified the sync prediction mechanism for SaaS applications to use data from our servers, providing more accurate and context-aware predictions.

Type of change

Please leave one option from the following and delete the rest:

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • New Integration (non-breaking change which adds a new integration)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Non-breaking change (fix of existing functionality that will not change current behavior)
  • Documentation (added/updated documentation)

@shalev007 shalev007 changed the title PORT 9302 ocean core send data to integration service [Core] ocean core send data to integration service Jul 23, 2024
Contributor

@Tankilevitch Tankilevitch left a comment

CI doesn't pass, and I left some more comments.

.python-version (outdated, resolved)
port_ocean/ocean.py (outdated, resolved)
Comment on lines 83 to 87
interval_str = (
    integration.get("spec", {})
    .get("appSpec", {})
    .get("scheduledResyncInterval")
)
Contributor

I think we already have code that turns the integration into an instance of an integration class, so we don't need to use dictionary lookups to get that key.

Contributor Author

I think you're referring to ocean.config.integration, but it only contains data from the environment-variable config and the integration identifier.

Comment on lines 90 to 91
next_resync_date = now + datetime.timedelta(minutes=float(interval or 0))
next_resync = next_resync_date.now(datetime.timezone.utc).timestamp()
Contributor

Let's leave it in seconds and make the UI show it in minutes if we want.

Contributor Author

WDYM?
I changed it to timestamp to have better control over how we show it, and changed it to UTC so we won't have any timezone issues
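A side note on the two quoted lines: `next_resync_date.now(...)` calls the `datetime.now` classmethod, so it returns the current time rather than the shifted date, and the added interval is lost. Below is a minimal sketch of producing the next resync as a UTC timestamp, assuming a timezone-aware `now` and an interval in minutes. The name `calculate_next_resync` mirrors the helper referenced later in this review; its body here is an assumption, not the PR's code.

```python
import datetime


def calculate_next_resync(now: datetime.datetime, interval_minutes: float) -> float:
    """Return the next resync time as a POSIX timestamp (seconds since the epoch)."""
    if now.tzinfo is None:
        # A naive datetime would be interpreted in local time by .timestamp().
        raise ValueError("now must be timezone-aware")
    next_resync_date = now + datetime.timedelta(minutes=interval_minutes)
    # Call .timestamp() on the shifted instance, not .now() on it.
    return next_resync_date.timestamp()


now = datetime.datetime(2024, 7, 23, 12, 0, tzinfo=datetime.timezone.utc)
next_resync = calculate_next_resync(now, 60)
```

Because the timestamp is in seconds, a UI can render it in whatever unit or timezone it prefers.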

port_ocean/ocean.py (outdated, resolved)
@shalev007 shalev007 force-pushed the PORT-9302-ocean-core-send-data-to-integration-service branch from c52cdde to 22146da Compare July 28, 2024 15:05
@shalev007 shalev007 marked this pull request as ready for review July 28, 2024 15:12
Collaborator

@yairsimantov20 yairsimantov20 left a comment

Let's talk about the implementation.

Comment on lines 50 to 75
def should_update_resync_state(self) -> bool:
    return ocean.config.runtime == RuntimeType.Saas.value

async def before_resync(self) -> None:
    if not self.should_update_resync_state():
        return None

    now = datetime.datetime.now()
    try:
        integration = await ocean.port_client.get_current_integration()
        interval_str = (
            integration.get("spec", {})
            .get("appSpec", {})
            .get("scheduledResyncInterval")
        )
        interval = convert_time_to_minutes(interval_str)
        self.resync_state["next_resync"] = calculate_next_resync(now, interval)
    except Exception:
        logger.exception("Error occurred while calculating next resync")
        return None

async def after_resync(self) -> None:
    if not self.should_update_resync_state():
        return None

    await ocean.port_client.update_resync_state(self.resync_state)
Collaborator

I want this implemented a bit differently,
with a lifecycle context manager.

Contributor Author

Seems like a great idea!
But I think it's out of this PR's scope and would inflate this PR more than it should. WDYT?

Comment on lines 72 to 96
def convert_time_to_minutes(time_str: str) -> int:
    """
    Convert a string representing time to minutes.
    :param time_str: a string representing time in the format "1h" or "1m"
    """
    if time_str.endswith("h"):
        hours = int(time_str[:-1])
        return hours * 60
    elif time_str.endswith("m"):
        minutes = int(time_str[:-1])
        return minutes
    else:
        raise ValueError("Invalid format. Expected a string ending with 'h' or 'm'.")
Collaborator

Why? Just send the time to port and let it handle it

Contributor Author

It's inconsistent with the way we send out time in non-SaaS integrations.

port_ocean/utils/misc.py (outdated, resolved)
Comment on lines 100 to 160
)(
    lambda: threading.Thread(
        target=asyncio.run(execute_resync_all())
    ).start()
)
Collaborator

Try to implement this with an asyncio task instead.

Contributor Author

But tasks need to be awaited, which is not compatible with the target type Callable.

Collaborator

FYI, that's incorrect: a task is a background task, and we are using it in multiple places in Ocean.

For now it's not important enough.
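To illustrate the reviewer's point, here is a minimal sketch of scheduling a coroutine as a background task. It assumes code already running inside an event loop; `execute_resync_all` below is a hypothetical stand-in with a dummy body, not the PR's function. Note also that in the quoted snippet `asyncio.run(execute_resync_all())` is evaluated before the `Thread` is constructed, so the thread's `target` receives the coroutine's return value rather than a callable.

```python
import asyncio


async def execute_resync_all(results: list[str]) -> None:
    # Hypothetical stand-in for the real resync coroutine.
    await asyncio.sleep(0)
    results.append("resynced")


async def main() -> list[str]:
    results: list[str] = []
    # create_task schedules the coroutine on the running loop and returns at once.
    task = asyncio.create_task(execute_resync_all(results))
    seen_before_await = list(results)  # nothing has run yet at this point
    # Awaiting is optional for a background task; it is done here only so the
    # sketch finishes deterministically.
    await task
    return seen_before_await + results


outcome = asyncio.run(main())
```

In a long-lived application the task reference should be kept somewhere, since the event loop holds only a weak reference to running tasks.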

Saas = "Saas"
OnPrem = "OnPrem"


Runtime = Literal["OnPrem", "Saas"]
Collaborator

Why is that needed anymore?

Contributor Author

great point

port_ocean/core/event_listener/polling.py (outdated, resolved)
@yairsimantov20 yairsimantov20 changed the title [Core] ocean core send data to integration service [Core] ocean core next resync Jul 29, 2024
port_ocean/clients/port/client.py (outdated, resolved)
port_ocean/clients/port/client.py (outdated, resolved)
port_ocean/utils/misc.py (outdated, resolved)
@github-actions github-actions bot added size/L and removed size/M labels Aug 6, 2024
@shalev007 shalev007 force-pushed the PORT-9302-ocean-core-send-data-to-integration-service branch from 9092c8b to 1434dcf Compare August 7, 2024 07:13
@shalev007 shalev007 requested a review from Tankilevitch August 7, 2024 07:21
Comment on lines 80 to 88
async def update_integration_state(self, state: dict[str, Any]) -> dict[str, Any]:
    logger.debug(f"Updating integration state with: {state}")
    response = await self.client.patch(
        f"{self.api_url}/integration/{self.integration_identifier}",
        headers=await self.auth.headers(),
        json={"state": state},
    )
    handle_status_code(response)
    return response.json().get("integration", {})
Contributor

Let's give it an endpoint of its own; that way, modifying the implementation in the future will be easier.
It would also mean that a patch with state = null won't override the last state (as it is a computed thing by the integration).

Contributor Author

Is this a blocker?
Is it mandatory for this PR? If we do have time to invest in this infra, then I'd rather use a whole other infra set to save this data in.

port_ocean/core/event_listener/once.py (outdated, resolved)
port_ocean/core/event_listener/once.py (outdated, resolved)
port_ocean/core/event_listener/once.py (outdated, resolved)
port_ocean/core/event_listener/once.py (outdated, resolved)
port_ocean/utils/misc.py (outdated, resolved)
port_ocean/utils/misc.py (outdated, resolved)
port_ocean/core/event_listener/polling.py (outdated, resolved)
running_task: Task[Any] = get_event_loop().create_task(
    self.events["on_resync"]({})  # type: ignore
)
signal_handler.register(running_task.cancel)

await running_task
await self._after_resync()
Contributor

What happens when it fails? Will it stay running?

Contributor Author

Yes it will not stay up and running, I think whatever error that will come up to this it will not exit the app.
But we need to take into account that if this request fails in the polling event listener, we might run an uncalled-for resync, since the latest updatedAt will not be in line with the current one.

port_ocean/core/event_listener/kafka.py (outdated, resolved)
@shalev007 shalev007 force-pushed the PORT-9302-ocean-core-send-data-to-integration-service branch from 83e5342 to 373e9e5 Compare August 13, 2024 08:20
port_ocean/core/event_listener/once.py (outdated, resolved)
port_ocean/ocean.py (outdated, resolved)
port_ocean/ocean.py (outdated, resolved)
port_ocean/core/event_listener/base.py (outdated, resolved)
port_ocean/core/event_listener/once.py (outdated, resolved)
@shalev007 shalev007 force-pushed the PORT-9302-ocean-core-send-data-to-integration-service branch 2 times, most recently from 997cae6 to 6958ad1 Compare August 14, 2024 14:00
@shalev007 shalev007 requested a review from a team as a code owner August 18, 2024 13:02
Comment on lines 70 to 71
# TODO: remove this once we separate the state from the integration
self.last_integration_updated_at: str = ""
Contributor

Explain why this is needed.

Contributor Author

@shalev007 shalev007 Aug 19, 2024

Added a comment, but this is purely technical; I do think the code is self-explanatory once you look it up.

Comment on lines 88 to 91
# we use the last updated time of the integration config as the start time since in saas application the interval is configured by the user from the portal
if not last_updated_saas_integration_config_str:
    logger.error("updatedAt not found for integration")
    return (None, None)
Contributor

What is the effect, and are there any action items?

Contributor

For example, a more detailed log stating whether this was expected or not.

Contributor Author

Added

Comment on lines 79 to 81

async def update_integration_state(self, state: dict[str, Any]) -> dict[str, Any]:
    logger.debug(f"Updating integration state with: {state}")
Contributor

Make sure that should_log and should_raise are parameters that can be passed.

Contributor Author

Why?
We don't need this to be changed; these parameters stay constant across event listeners.

Contributor

This is code in the client class, therefore it should be an interface for other uses as well, and therefore each call to the client should decide whether it wants to raise the error or not.
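The request above can be sketched as follows: the client method takes `should_raise` and `should_log` from its caller and forwards them to the status handler. This is a self-contained illustration under stated assumptions: `FakeResponse`, `update_state`, and the body of `handle_status_code` are hypothetical stand-ins, not Ocean's actual implementation.

```python
import logging
from dataclasses import dataclass

logger = logging.getLogger("client_sketch")


@dataclass
class FakeResponse:
    """Hypothetical stand-in for an HTTP response object."""

    status_code: int

    @property
    def is_error(self) -> bool:
        return self.status_code >= 400


def handle_status_code(
    response: FakeResponse, should_raise: bool = True, should_log: bool = True
) -> None:
    # Assumed behavior: log and/or raise only when the response is an error.
    if not response.is_error:
        return
    if should_log:
        logger.error("Request failed with status code %s", response.status_code)
    if should_raise:
        raise RuntimeError(f"request failed: {response.status_code}")


def update_state(
    response: FakeResponse, should_raise: bool = True, should_log: bool = True
) -> bool:
    # The caller decides both flags; the client only forwards them.
    handle_status_code(response, should_raise, should_log)
    if not response.is_error and should_log:
        logger.info("Integration state updated successfully")
    return not response.is_error
```

A caller that treats the state update as best-effort would pass `should_raise=False`, while other callers keep the stricter default.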

Comment on lines 86 to 89
)
handle_status_code(response, should_raise=False, should_log=True)
if not response.is_error:
    logger.info("Integration state updated successfully")
Contributor

And use should_log here as well.

Contributor Author

WDYM?

Contributor

Once you add should_raise and should_log, decide whether to log based on the parameter passed.

Contributor

@Tankilevitch Tankilevitch left a comment

LGTM

if response.is_success and should_log:
    logger.info("Integration state updated successfully")

return response.json().get("integration", {})
Collaborator

Suggested change
return response.json().get("integration", {})
return response.json()["integration"]
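The thread does not state the motivation for this suggestion. One plausible reading is that indexing fails loudly when the key is missing, whereas `.get` with a default silently returns an empty dict. A small sketch of the difference, using made-up payloads:

```python
# Made-up response bodies for illustration only.
payload_ok = {"integration": {"state": {"updatedAt": "2024-08-20T11:32:00Z"}}}
payload_missing = {"ok": False}

# Lenient lookup: a missing key is hidden behind the default value.
lenient = payload_missing.get("integration", {})

# Strict lookup: a missing key surfaces immediately as a KeyError.
strict_error = None
try:
    strict = payload_missing["integration"]
except KeyError as error:
    strict_error = error

# Both styles behave the same when the key is present.
same_when_present = payload_ok.get("integration", {}) == payload_ok["integration"]
```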

Comment on lines 41 to 72
async def _before_resync(self) -> None:
    """
    Can be used for event listeners that need to perform some action before resync.
    """
    await ocean.app.update_state_before_scheduled_sync()

async def _after_resync(self) -> None:
    """
    Can be used for event listeners that need to perform some action after resync.
    """
    await ocean.app.update_state_after_scheduled_sync()

async def _on_resync_failure(self, e: Exception) -> None:
    """
    Can be used for event listeners that need to handle resync failures.
    """
    await ocean.app.update_state_after_scheduled_sync(IntegrationStateStatus.Failed)

async def _resync(
    self,
    resync_args: dict[Any, Any],
) -> None:
    """
    Triggers the "on_resync" event.
    """
    await self._before_resync()
    try:
        await self.events["on_resync"](resync_args)
        await self._after_resync()
    except Exception as e:
        await self._on_resync_failure(e)
        raise e
Collaborator

@yairsimantov20 yairsimantov20 Aug 20, 2024

Can we use context manager syntax instead?

@asynccontextmanager
async def resync_life_cycle():
    await ocean.app.update_state_before_scheduled_sync()
    try:
        yield
    except Exception as e:
        await ocean.app.update_state_after_scheduled_sync(IntegrationStateStatus.Failed)
        raise e
    else:
        await ocean.app.update_state_after_scheduled_sync()
Suggested change
async def _before_resync(self) -> None:
    """
    Can be used for event listeners that need to perform some action before resync.
    """
    await ocean.app.update_state_before_scheduled_sync()

async def _after_resync(self) -> None:
    """
    Can be used for event listeners that need to perform some action after resync.
    """
    await ocean.app.update_state_after_scheduled_sync()

async def _on_resync_failure(self, e: Exception) -> None:
    """
    Can be used for event listeners that need to handle resync failures.
    """
    await ocean.app.update_state_after_scheduled_sync(IntegrationStateStatus.Failed)

async def _resync(
    self,
    resync_args: dict[Any, Any],
) -> None:
    """
    Triggers the "on_resync" event.
    """
    await self._before_resync()
    try:
        await self.events["on_resync"](resync_args)
        await self._after_resync()
    except Exception as e:
        await self._on_resync_failure(e)
        raise e

async def _resync(
    self,
    resync_args: dict[Any, Any],
) -> None:
    """
    Triggers the "on_resync" event.
    """
    async with resync_life_cycle():
        await self.events["on_resync"](resync_args)
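The lifecycle idea in this suggestion can be tried in isolation. The sketch below is a runnable approximation, assuming the state updates are replaced by appends to a plain list; `events` and `run_resync` are hypothetical names used only for this illustration.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


@asynccontextmanager
async def resync_life_cycle(events: list[str]) -> AsyncIterator[None]:
    events.append("running")  # stands in for the "before resync" state update
    try:
        yield
    except Exception:
        events.append("failed")  # stands in for the failure state update
        raise
    else:
        events.append("completed")  # stands in for the success state update


async def run_resync(events: list[str], should_fail: bool) -> None:
    async with resync_life_cycle(events):
        if should_fail:
            raise RuntimeError("resync failed")


ok_events: list[str] = []
asyncio.run(run_resync(ok_events, should_fail=False))

failed_events: list[str] = []
try:
    asyncio.run(run_resync(failed_events, should_fail=True))
except RuntimeError:
    pass  # the context manager re-raises after recording the failure
```

The appeal of this shape is that the before, success, and failure steps live in one place and every caller gets them by wrapping the resync in `async with`.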

Collaborator

@yairsimantov20 yairsimantov20 left a comment

A lot of code here is irrelevant to ocean core and can be calculated on the port side

Comment on lines +10 to +13
aware_date = datetime.datetime.fromisoformat(time_str)
if time_str.endswith("Z"):
    aware_date = datetime.datetime.fromisoformat(time_str.replace("Z", "+00:00"))
return aware_date.astimezone(datetime.timezone.utc)
Collaborator

Suggested change
aware_date = datetime.datetime.fromisoformat(time_str)
if time_str.endswith("Z"):
    aware_date = datetime.datetime.fromisoformat(time_str.replace("Z", "+00:00"))
return aware_date.astimezone(datetime.timezone.utc)
return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ").astimezone(datetime.timezone.utc)
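Two details are worth keeping in mind when comparing the two versions. `strptime` with a literal `Z` in the format returns a naive datetime, and `astimezone` on a naive datetime assumes local time, so the one-line suggestion can shift the result on machines that are not set to UTC. Also, `fromisoformat` only accepts a trailing `Z` from Python 3.11 onward. A minimal sketch that avoids both issues follows; the function name is hypothetical, and treating naive input as UTC is an assumption.

```python
import datetime


def convert_str_to_utc_datetime(time_str: str) -> datetime.datetime:
    """Parse an ISO 8601 string and return a timezone-aware UTC datetime."""
    if time_str.endswith("Z"):
        # Normalize the suffix first so older Python versions can parse it.
        time_str = time_str[:-1] + "+00:00"
    parsed = datetime.datetime.fromisoformat(time_str)
    if parsed.tzinfo is None:
        # Assumption: input without an offset is already expressed in UTC.
        parsed = parsed.replace(tzinfo=datetime.timezone.utc)
    return parsed.astimezone(datetime.timezone.utc)


parsed_z = convert_str_to_utc_datetime("2024-08-20T11:32:00Z")
parsed_offset = convert_str_to_utc_datetime("2024-08-20T13:32:00+02:00")
```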

Comment on lines +14 to +16
Running = "running"
Failed = "failed"
Completed = "completed"
Collaborator

Any reason those are lowercase?

Comment on lines 69 to 155
self.initiated_at = datetime.datetime.now(tz=datetime.timezone.utc)

async def _setup_scheduled_resync(
# This is used to differ between integration changes that require a full resync and state changes
# So that the polling event-listener can decide whether to perform a full resync or not
# TODO: remove this once we separate the state from the integration
self.last_integration_state_updated_at: str = ""

def is_saas(self) -> bool:
return self.config.runtime == Runtime.Saas

def _calculate_next_scheduled_resync(
self,
interval: int | None = None,
custom_start_time: datetime.datetime | None = None,
) -> str | None:
if interval is None:
return None
return get_next_occurrence(
interval * 60, custom_start_time or self.initiated_at
).isoformat()

async def update_state_before_scheduled_sync(
self,
interval: int | None = None,
custom_start_time: datetime.datetime | None = None,
) -> None:
def execute_resync_all() -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
_interval = interval or self.config.scheduled_resync_interval
nest_resync = self._calculate_next_scheduled_resync(
_interval, custom_start_time
)
state: dict[str, Any] = {
"status": IntegrationStateStatus.Running.value,
"lastResyncEnd": None,
"lastResyncStart": datetime.datetime.now(
tz=datetime.timezone.utc
).isoformat(),
"nextResync": nest_resync,
"intervalInMinuets": _interval,
}

integration = await self.port_client.update_integration_state(
state, should_raise=False
)
if integration:
self.last_integration_state_updated_at = integration["state"]["updatedAt"]

async def update_state_after_scheduled_sync(
self,
status: Literal[
IntegrationStateStatus.Completed, IntegrationStateStatus.Failed
] = IntegrationStateStatus.Completed,
interval: int | None = None,
custom_start_time: datetime.datetime | None = None,
) -> None:
_interval = interval or self.config.scheduled_resync_interval
nest_resync = self._calculate_next_scheduled_resync(
_interval, custom_start_time
)
state: dict[str, Any] = {
"status": status.value,
"lastResyncEnd": datetime.datetime.now(
tz=datetime.timezone.utc
).isoformat(),
"nextResync": nest_resync,
"intervalInMinuets": _interval,
}

integration = await self.port_client.update_integration_state(
state, should_raise=False
)
if integration:
self.last_integration_state_updated_at = integration["state"]["updatedAt"]

async def _setup_scheduled_resync(
self,
) -> None:
async def execute_resync_all() -> None:
await self.update_state_before_scheduled_sync()
logger.info("Starting a new scheduled resync")
loop.run_until_complete(self.integration.sync_raw_all())
loop.close()
try:
await self.integration.sync_raw_all()
await self.update_state_after_scheduled_sync()
except Exception as e:
await self.update_state_after_scheduled_sync(
IntegrationStateStatus.Failed
)
raise e
Collaborator

This logic does not belong in this class.
It is irrelevant for the core.
This feature is only a side effect of the resync, not the main core feature.
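The quoted code calls `get_next_occurrence(interval * 60, start_time)`, but the helper's body is not shown in this conversation. The sketch below is one plausible way such a helper could work, assuming it returns the first point on the `start_time + k * interval` grid that lies strictly after the current time. The optional `now` parameter is an addition for testability and is not part of the quoted call.

```python
import datetime
import math
from typing import Optional


def get_next_occurrence(
    interval_seconds: int,
    start_time: datetime.datetime,
    now: Optional[datetime.datetime] = None,
) -> datetime.datetime:
    """Return the next scheduled time on the interval grid anchored at start_time."""
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    if now is None:
        now = datetime.datetime.now(tz=datetime.timezone.utc)
    elapsed = (now - start_time).total_seconds()
    if elapsed < 0:
        # The schedule has not started yet, so the first run is the start itself.
        return start_time
    # Number of whole intervals already passed, plus one for the upcoming run.
    intervals = math.floor(elapsed / interval_seconds) + 1
    return start_time + datetime.timedelta(seconds=intervals * interval_seconds)


start = datetime.datetime(2024, 8, 20, 10, 0, tzinfo=datetime.timezone.utc)
current = datetime.datetime(2024, 8, 20, 10, 25, tzinfo=datetime.timezone.utc)
next_run = get_next_occurrence(600, start, now=current)
```

With a 10-minute interval anchored at 10:00 and a current time of 10:25, the next run lands at 10:30.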

@shalev007 shalev007 dismissed yairsimantov20’s stale review August 20, 2024 11:32

All remaining changes can be added after this feature is deployed

Comment on lines 80 to 92
async def update_integration_state(
    self, state: dict[str, Any], should_raise: bool = True, should_log: bool = True
) -> dict[str, Any]:
    if should_log:
        logger.debug(f"Updating integration state with: {state}")
    response = await self.client.patch(
        f"{self.api_url}/integration/{self.integration_identifier}/state",
        headers=await self.auth.headers(),
        json=state,
    )
    handle_status_code(response, should_raise, should_log)
    if response.is_success and should_log:
        logger.info("Integration state updated successfully")
Contributor

Suggested change
async def update_integration_state(
    self, state: dict[str, Any], should_raise: bool = True, should_log: bool = True
) -> dict[str, Any]:
    if should_log:
        logger.debug(f"Updating integration state with: {state}")
    response = await self.client.patch(
        f"{self.api_url}/integration/{self.integration_identifier}/state",
        headers=await self.auth.headers(),
        json=state,
    )
    handle_status_code(response, should_raise, should_log)
    if response.is_success and should_log:
        logger.info("Integration state updated successfully")

async def update_integration_resync_state(
    self, state: dict[str, Any], should_raise: bool = True, should_log: bool = True
) -> dict[str, Any]:
    if should_log:
        logger.debug(f"Updating integration state with: {state}")
    response = await self.client.patch(
        f"{self.api_url}/integration/{self.integration_identifier}/resync-state",
        headers=await self.auth.headers(),
        json=state,
    )
    handle_status_code(response, should_raise, should_log)
    if response.is_success and should_log:
        logger.info("Integration resync state updated successfully")

@shalev007 shalev007 merged commit d54b97b into main Aug 21, 2024
12 checks passed
@shalev007 shalev007 deleted the PORT-9302-ocean-core-send-data-to-integration-service branch August 21, 2024 15:37
3 participants