[Core] Add retry for failed upserts and handle circular dependencies #1241
e152ca7
to
6ef411a
Compare
1. Register callbacks of failed entities. 2. When done with upserts, try a topological sort on the failed entities. 3. If the retry fails because of the topological sort, try an unsorted upsert. 4. Update the topological sort tree creation so an entity cannot be its own dependency. 5. Test upsert with dependencies, with a self circular dependency and with an external entity dependency.
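The ordering step described in this commit can be sketched with the standard library's `graphlib`. This is only an illustration under stated assumptions: `build_dependency_tree` and the `failed` mapping are hypothetical stand-ins, not the actual `port_ocean` code, and entities are reduced to plain string identifiers.

```python
from graphlib import TopologicalSorter


def build_dependency_tree(relations: dict[str, set[str]]) -> dict[str, set[str]]:
    """Map each failed entity to the failed entities it depends on.

    Self-references and targets outside the failed batch are dropped.
    """
    return {
        node: {dep for dep in deps if dep != node and dep in relations}
        for node, deps in relations.items()
    }


# Hypothetical batch of failed entities, keyed by identifier.
failed = {
    "service-a": {"team-x", "service-a"},  # self-reference is ignored
    "team-x": {"org-1"},  # "org-1" is external to the batch, so it is ignored
    "deployment-1": {"service-a"},
}

tree = build_dependency_tree(failed)
# static_order() yields dependencies before the entities that need them.
order = list(TopologicalSorter(tree).static_order())
print(order)
```

Filtering out the self-reference before sorting matters because a node that lists itself as a dependency would otherwise be treated as a cycle by the sorter.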
5ab5b5b
to
c89b550
Compare
nice job!! left a few comments
if upsertedEntity is False:
    event.register_failed_upsert_call_arguments(
maybe worth adding a log message that we will try it at the end of the sync
CHANGELOG.md
Outdated
- Register callbacks of failed entities.
- Test upsert with dependencies, with self circular dependency and external entity dependency.
- Update topologicals sort tree creation so an entity cannot be its own dependency.
- When done with upserts, try topological sort on failed entities.
- - When done with upserts, try topological sort on failed entities.
+ - When `createMissingRelatedEntities` is set to `false` and upserting entity failed on not existing entity, the entity will be gathered to the end of the resync and will try sorting all the failed entities through a topological sort and upsert them as well
CHANGELOG.md
Outdated
- On fail of retry because of topological sort - try unsorted upsert.
- Register callbacks of failed entities.
- Test upsert with dependencies, with self circular dependency and external entity dependency.
- Update topologicals sort tree creation so an entity cannot be its own dependency.
- - Update topologicals sort tree creation so an entity cannot be its own dependency.
+ - Fix topologicals sort tree creation so an entity cannot be its own dependency
CHANGELOG.md
Outdated
### Bug Fixes

- On fail of retry because of topological sort - try unsorted upsert.
- - On fail of retry because of topological sort - try unsorted upsert.
+ - When experiencing cyclic error on topological sort try unsorted upsert of the entities
CHANGELOG.md
Outdated
### Bug Fixes

- On fail of retry because of topological sort - try unsorted upsert.
- Register callbacks of failed entities.
can be removed
result = response.json()
if (
    response.status_code == 404
    and result.get("ok") is False
-     and result.get("ok") is False
+     and not result.get("ok")
port_ocean/context/event.py
Outdated
async def handle_failed(self) -> None:
    entity_map: dict[
        str, Callable[[], Coroutine[Any, Any, Entity | Literal[False] | None]]
    ] = {
        f"{obj.identifier}-{obj.blueprint}": func
        for obj, func in self._failed_entity_callback_list
    }
    entity_list: list[Entity] = [
        obj for obj, func in self._failed_entity_callback_list
    ]
I wonder if this is actually a method that should be in the `event` (and maybe the whole handling stuff). I am leaning towards having a class for managing all of this and moving the instance for it to the event instance.
e.g.
EntityTopologicalHandler:
    def register_entities
    async def sort_and_upsert():
EventContext:
    entity_topological_handler = EntityTopologicalHandler()
  ordered_created_entities = reversed(
-     entities_with_search_identifier
-     + order_by_entities_dependencies(entities_without_search_identifier)
+     entities_with_search_identifier + entities_without_search_identifier
what happens with search relation and identifier? do we want to register them as well?
1. Add robust tests. 2. Change the implementation of registering failed entities.
eac0300
to
b7b88ff
Compare
@@ -57,6 +57,13 @@ async def upsert_entity(
        f"entity: {entity.identifier} of "
        f"blueprint: {entity.blueprint}"
    )
result = response.json()
if (
    response.status_code == 404
lets use const of httpstatus.code
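One way to act on this comment is sketched below using the standard library's `http.HTTPStatus`. The helper name `is_missing_related_entity` and the `"not_found"` error string are assumptions made for illustration; the real error value returned by the API is not shown in this thread.

```python
from http import HTTPStatus

# Assumed error string for illustration only; the real API value may differ.
NOT_FOUND_ERROR = "not_found"


def is_missing_related_entity(status_code: int, body: dict) -> bool:
    """Return True when a response looks like a 'related entity not found' failure."""
    return (
        status_code == HTTPStatus.NOT_FOUND  # named constant instead of a bare 404
        and not body.get("ok")
        and body.get("error") == NOT_FOUND_ERROR
    )


print(is_missing_related_entity(404, {"ok": False, "error": "not_found"}))
```

`HTTPStatus` members compare equal to plain integers, so the check works with the integer status code that HTTP clients usually expose.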
@@ -127,6 +126,13 @@ async def upsert(
)
if upsertedEntity:
    modified_entities.append(upsertedEntity)
if upsertedEntity is False:
let's add a comment explaining why we check `is False` and not `if not upsertedEntity`
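The distinction matters because the upsert call has three possible outcomes, and two of them are falsy. A minimal sketch, assuming a hypothetical `classify` helper and using a plain `dict` in place of the real entity type:

```python
from typing import Literal, Union

# Stand-in for the three-way result: an entity (a dict here), None, or False.
UpsertResult = Union[dict, None, Literal[False]]


def classify(result: UpsertResult) -> str:
    """Separate the two falsy outcomes that `if not result` would merge."""
    if result is False:
        # Upsert failed on an unmet dependency: register it for a later retry.
        return "retry-later"
    if result is None:
        # Nothing came back (for example, a search identifier was used): no retry.
        return "skipped"
    return "upserted"


print(classify(False), classify(None), classify({"identifier": "svc"}))
```

With `if not upsertedEntity`, the `None` case would also be registered for a retry, which is why the identity check `is False` is used.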
except:
    await event.failed_entity_handler.handle_failed_no_sort()
lets except specific error type
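A sketch of what catching a specific error could look like, assuming the sort is backed by `graphlib` and raises `graphlib.CycleError` on a circular dependency. The `upsert_order` helper is hypothetical, and entities are reduced to string identifiers.

```python
from graphlib import CycleError, TopologicalSorter


def upsert_order(tree: dict[str, set[str]]) -> list[str]:
    """Return a dependency-first order, or the original order if the graph has a cycle."""
    try:
        return list(TopologicalSorter(tree).static_order())
    except CycleError:
        # Only a dependency cycle triggers the fallback; any other error propagates.
        return list(tree)


print(upsert_order({"a": {"b"}, "b": set()}))
print(upsert_order({"a": {"b"}, "b": {"a"}}))
```

A bare `except:` would also swallow unrelated failures such as network errors or cancellation, so the unsorted fallback could run for the wrong reason.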
event.failed_entity_handler.register_failed_upsert_call_arguments(
    entity,
    event.port_app_config.get_port_request_options(),
    user_agent_type,
    self.context.port_client.upsert_entity,
let's only pass `entity` to it
@dataclass
class FailedEntityHandler:
I think this can be called `EntityTopologicalSorter`
try:
    await event.failed_entity_handler.handle_failed()
except:
    await event.failed_entity_handler.handle_failed_no_sort()
maybe something like this
- try:
-     await event.failed_entity_handler.handle_failed()
- except:
-     await event.failed_entity_handler.handle_failed_no_sort()
+ try:
+     for entity in event.failed_entity_handler.get_sorted()
+         await self.upsert_entity(
+             entity,
+             request_options,
+             user_agent_type,
+             should_raise=should_raise,
+         )
+         for entity in entities
+     ),
+ except:
+     await event.failed_entity_handler.handle_failed_no_sort()
1. Extract logic of topological sort into a class. 2. Modify tests.
c63bd2a
to
209e64b
Compare
if (
    response.status_code == status.HTTP_404_NOT_FOUND
    and not result.get("ok")
    and result.get("error") == PortApiStatus.NOT_FOUND.value
):
    return False
let's add a comment explaining why we return `False` and what it means, and add it to the method description as well
port_ocean/core/models.py
Outdated
@@ -27,6 +27,10 @@ def is_installation_type_compatible(self, installation_type: str) -> bool:
        ) or installation_type == self.value


class PortApiStatus(Enum):
- class PortApiStatus(Enum):
+ class PortAPIErrorMessage(Enum):
I think this should be in the `core/utils` rather than in the `utils` exposed to the integrations clients
I guess we can squeeze the `entity_topological_sorter` together with this?
@@ -29,7 +30,7 @@
)
from port_ocean.core.utils import zip_and_sum, gather_and_split_errors_from_results
from port_ocean.exceptions.core import OceanAbortException

import json
not used
def get_entities(self) -> Generator[Entity, Any, None]:
    entity_map: dict[str, Entity] = {
        f"{entity.identifier}-{entity.blueprint}": entity
        for entity in self.entities
    }
    sorted_and_mapped = order_by_entities_dependencies(self.entities)
    for obj in sorted_and_mapped:
        entity = entity_map.get(f"{obj.identifier}-{obj.blueprint}")
        if entity is not None:
            yield entity
- def get_entities(self) -> Generator[Entity, Any, None]:
-     entity_map: dict[str, Entity] = {
-         f"{entity.identifier}-{entity.blueprint}": entity
-         for entity in self.entities
-     }
-     sorted_and_mapped = order_by_entities_dependencies(self.entities)
-     for obj in sorted_and_mapped:
-         entity = entity_map.get(f"{obj.identifier}-{obj.blueprint}")
-         if entity is not None:
-             yield entity
+ def get_entities(self, sorted: bool = True) -> Generator[Entity, Any, None]:
+     if not sorted:
+         for entity in self.entities:
+             yield entity
+     entity_map: dict[str, Entity] = {
+         f"{entity.identifier}-{entity.blueprint}": entity
+         for entity in self.entities
+     }
+     sorted_and_mapped = order_by_entities_dependencies(self.entities)
+     for obj in sorted_and_mapped:
+         entity = entity_map.get(f"{obj.identifier}-{obj.blueprint}")
+         if entity is not None:
+             yield entity
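One detail worth checking in the suggestion as it appears here: the unsorted branch has no `return`, so a generator written this way would keep going into the sorted path after yielding every entity. A minimal sketch of the intended behavior follows, assuming string identifiers and a `graphlib`-backed sort. The flag is named `sort` here (an assumption) to avoid shadowing the built-in `sorted`.

```python
from collections.abc import Iterator
from graphlib import TopologicalSorter


def get_entities(tree: dict[str, set[str]], sort: bool = True) -> Iterator[str]:
    """Yield identifiers in registration order, or dependency-first when sort=True."""
    if not sort:
        yield from tree
        return  # stop here so the sorted path below is never reached
    yield from TopologicalSorter(tree).static_order()


print(list(get_entities({"a": {"b"}, "b": set()})))
print(list(get_entities({"a": {"b"}, "b": {"a"}}, sort=False)))
```

With the early `return`, the unsorted path can be used safely as the fallback for a cyclic graph, because the sorter is never invoked on it.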
try:
    for entity in event.entity_topological_sorter.get_entities():
        await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)

except OceanAbortException as ocean_abort:
    if isinstance(ocean_abort.__cause__,CycleError):
        for entity in event.entity_topological_sorter.entities:
            await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)
- try:
-     for entity in event.entity_topological_sorter.get_entities():
-         await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)
- except OceanAbortException as ocean_abort:
-     if isinstance(ocean_abort.__cause__,CycleError):
-         for entity in event.entity_topological_sorter.entities:
-             await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)
+ try:
+     for entity in event.entity_topological_sorter.get_entities():
+         await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)
+ except OceanAbortException as ocean_abort:
+     if isinstance(ocean_abort.__cause__,CycleError):
+         for entity in event.entity_topological_sorter.get_entities(sorted=False):
+             await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)
try:
    for entity in event.entity_topological_sorter.get_entities():
        await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)
let's add a log saying that x entities failed due to non-existing relations and that we are trying to re-ingest them, and also a log when there are none
7041bd7
to
b9cf2c3
Compare
1. Change location of files. 2. Extract logic of handle failed into a function. 3. Update get_entities.
b9cf2c3
to
3d49fa7
Compare
…ecting-argo-cd-integration
) -> Entity | None | Literal[False]:
    """
    [Entity] will be returned on happy flow
    [None] will be returned if entity is using search identifier
    [False] will be returned if upsert failed because of unmet dependency
    """
lets follow the function description format like in process_in_queue
@@ -115,8 +114,7 @@ async def upsert(
        entities_without_search_identifier.append(entity)

ordered_created_entities = reversed(
there is no ordering needed any more, lets just use entities
1. Update `upsert_entity` function description. 2. Remove unused logic from `upsert`.
…ecting-argo-cd-integration
1. Fix lint 2. change order of assert in test
1. Update `entity_topological_sorter` to be created separately for each class instance. 2. Remove data class from EntityTopologicalSorter. 3. Update `entity_topological_sorter` to `_entity_topological_sorter`.
ffe2c27
to
5099a80
Compare
Rename _entity_topological_sorter to entity_topological_sorter
1. Modify logs and pass count as param. 2. Verify `create_missing_related_entities` is false before `sort_and_upsert_failed_entities` execution.
lgtm
async def sort_and_upsert_failed_entities(self,user_agent_type: UserAgentType)->None:
    try:
        if not event.entity_topological_sorter.is_to_execute():
- if not event.entity_topological_sorter.is_to_execute():
+ if not event.entity_topological_sorter.should_execute():
update `is_to_execute` to `should_execute`
…ecting-argo-cd-integration
Description
What -
Why -
Users are hitting circular dependency errors, and the affected entities are not upserted.