Skip to content

Commit

Permalink
[Core] Reduce port system load by comparing entities and upserting ch…
Browse files Browse the repository at this point in the history
…anged entities for all types of integrations (#1336)

# Description

What - Introduced a new entity diff resolver to reduce port system load
by comparing entities and upserting changed entities only for all types
of integrations (not only saas)

Why - after a poc , we see that the fix works well and we want to have
it for all the integrations, not only saas

How - removed the if ocean saas check

## Type of change
- [ ] New feature (non-breaking change which adds functionality)


### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector
  • Loading branch information
yaelibarg authored Jan 23, 2025
1 parent be90ed4 commit 65b7033
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 240 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

<!-- towncrier release notes start -->
## 0.18.4 (2025-01-22)

### Improvements

- added check diff entitites to reduce load from port to all integrations

## 0.18.3 (2025-01-22)

### Improvements
Expand Down
4 changes: 3 additions & 1 deletion port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ def __init__(self, auth: PortAuthentication, client: httpx.AsyncClient):
# Semaphore is used to limit the number of concurrent requests to port, to avoid overloading it.
# The number of concurrent requests is set to 90% of the max connections limit, to leave some room for other
# requests that are not related to entities.
self.semaphore = asyncio.Semaphore(round(0.9 * PORT_HTTP_MAX_CONNECTIONS_LIMIT))
self.semaphore = asyncio.Semaphore(
round(0.5 * PORT_HTTP_MAX_CONNECTIONS_LIMIT)
) # 50% of the max connections limit in order to avoid overloading port

async def upsert_entity(
self,
Expand Down
42 changes: 18 additions & 24 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,33 +220,27 @@ async def _register_resource_raw(
)
modified_objects = []

if ocean.app.is_saas():
try:
changed_entities = await self._map_entities_compared_with_port(
objects_diff[0].entity_selector_diff.passed,
resource,
user_agent_type
try:
changed_entities = await self._map_entities_compared_with_port(
objects_diff[0].entity_selector_diff.passed,
resource,
user_agent_type
)
if changed_entities:
logger.info("Upserting changed entities", changed_entities=len(changed_entities),
total_entities=len(objects_diff[0].entity_selector_diff.passed))
await self.entities_state_applier.upsert(
changed_entities, user_agent_type
)

if changed_entities:
logger.info("Upserting changed entities", changed_entities=len(changed_entities),
total_entities=len(objects_diff[0].entity_selector_diff.passed))
await self.entities_state_applier.upsert(
changed_entities, user_agent_type
)
else:
logger.info("Entities in batch didn't changed since last sync, skipping", total_entities=len(objects_diff[0].entity_selector_diff.passed))

modified_objects = [ocean.port_client._reduce_entity(entity) for entity in objects_diff[0].entity_selector_diff.passed]
except Exception as e:
logger.warning(f"Failed to resolve batch entities with Port, falling back to upserting all entities: {str(e)}")
modified_objects = await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
)
else:
else:
logger.info("Entities in batch didn't changed since last sync, skipping", total_entities=len(objects_diff[0].entity_selector_diff.passed))
modified_objects = [ocean.port_client._reduce_entity(entity) for entity in objects_diff[0].entity_selector_diff.passed]
except Exception as e:
logger.warning(f"Failed to resolve batch entities with Port, falling back to upserting all entities: {str(e)}")
modified_objects = await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
)
)

return CalculationResult(
objects_diff[0].entity_selector_diff._replace(passed=modified_objects),
errors=objects_diff[0].errors,
Expand Down
Loading

0 comments on commit 65b7033

Please sign in to comment.