From 65b70339f7546e0f82a84da1458b38491925d7cf Mon Sep 17 00:00:00 2001 From: Yaeli Gimelshtein Date: Thu, 23 Jan 2025 08:54:07 -0800 Subject: [PATCH] [Core] Reduce port system load by comparing entities and upserting changed entities for all types of integrations (#1336) # Description What - Introduced a new entity diff resolver to reduce port system load by comparing entities and upserting changed entities only for all types of integrations (not only saas) Why - after a poc , we see that the fix works well and we want to have it for all the integrations, not only saas How - removed the if ocean saas check ## Type of change - [ ] New feature (non-breaking change which adds functionality) ### Core testing checklist - [ ] Integration able to create all default resources from scratch - [ ] Resync finishes successfully - [ ] Resync able to create entities - [ ] Resync able to update entities - [ ] Resync able to detect and delete entities - [ ] Scheduled resync able to abort existing resync and start a new one - [ ] Tested with at least 2 integrations from scratch - [ ] Tested with Kafka and Polling event listeners - [ ] Tested deletion of entities that don't pass the selector --- CHANGELOG.md | 6 + port_ocean/clients/port/mixins/entities.py | 4 +- .../core/integrations/mixins/sync_raw.py | 42 +-- .../core/handlers/mixins/test_sync_raw.py | 345 +++++++----------- pyproject.toml | 2 +- 5 files changed, 159 insertions(+), 240 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 366c62f239..d2d2cdf8fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 0.18.4 (2025-01-22) + +### Improvements + +- added check diff entitites to reduce load from port to all integrations + ## 0.18.3 (2025-01-22) ### Improvements diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index 521863bcdd..e1900289ea 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -22,7 +22,9 @@ def __init__(self, auth: PortAuthentication, client: httpx.AsyncClient): # Semaphore is used to limit the number of concurrent requests to port, to avoid overloading it. # The number of concurrent requests is set to 90% of the max connections limit, to leave some room for other # requests that are not related to entities. - self.semaphore = asyncio.Semaphore(round(0.9 * PORT_HTTP_MAX_CONNECTIONS_LIMIT)) + self.semaphore = asyncio.Semaphore( + round(0.5 * PORT_HTTP_MAX_CONNECTIONS_LIMIT) + ) # 50% of the max connections limit in order to avoid overloading port async def upsert_entity( self, diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index c579a87153..c91754f869 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -220,33 +220,27 @@ async def _register_resource_raw( ) modified_objects = [] - if ocean.app.is_saas(): - try: - changed_entities = await self._map_entities_compared_with_port( - objects_diff[0].entity_selector_diff.passed, - resource, - user_agent_type + try: + changed_entities = await self._map_entities_compared_with_port( + objects_diff[0].entity_selector_diff.passed, + resource, + user_agent_type + ) + if changed_entities: + logger.info("Upserting changed entities", changed_entities=len(changed_entities), + total_entities=len(objects_diff[0].entity_selector_diff.passed)) + await self.entities_state_applier.upsert( + changed_entities, user_agent_type ) - - if changed_entities: - logger.info("Upserting changed entities", changed_entities=len(changed_entities), - total_entities=len(objects_diff[0].entity_selector_diff.passed)) - await self.entities_state_applier.upsert( - changed_entities, user_agent_type - ) - else: - logger.info("Entities in batch didn't changed since last sync, skipping", total_entities=len(objects_diff[0].entity_selector_diff.passed)) - - modified_objects = [ocean.port_client._reduce_entity(entity) for entity in objects_diff[0].entity_selector_diff.passed] - except Exception as e: - logger.warning(f"Failed to resolve batch entities with Port, falling back to upserting all entities: {str(e)}") - modified_objects = await self.entities_state_applier.upsert( - objects_diff[0].entity_selector_diff.passed, user_agent_type - ) - else: + else: + logger.info("Entities in batch didn't changed since last sync, skipping", total_entities=len(objects_diff[0].entity_selector_diff.passed)) + modified_objects = [ocean.port_client._reduce_entity(entity) for entity in objects_diff[0].entity_selector_diff.passed] + except Exception as e: + logger.warning(f"Failed to resolve batch entities with Port, falling back to upserting all entities: {str(e)}") modified_objects = await self.entities_state_applier.upsert( objects_diff[0].entity_selector_diff.passed, user_agent_type - ) + ) + return CalculationResult( objects_diff[0].entity_selector_diff._replace(passed=modified_objects), errors=objects_diff[0].errors, diff --git a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py index e25c8aef04..b0143a7fb4 100644 --- a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py +++ b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py @@ -433,93 +433,76 @@ def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: @pytest.mark.asyncio async def test_register_raw( mock_sync_raw_mixin_with_jq_processor: SyncRawMixin, - mock_ocean: Ocean, - mock_context: PortOceanContext, - monkeypatch: pytest.MonkeyPatch, + mock_resource_config: ResourceConfig, ) -> None: - kind = "service" - user_agent_type = UserAgentType.exporter - raw_entity = [ - {"id": "entity_1", "name": "entity_1", "web_url": "https://example.com"}, - ] - expected_result = [ - { - "identifier": "entity_1", - "blueprint": "service", - "name": "entity_1", - "properties": {"url": "https://example.com"}, - }, - ] - - # Set is_saas to False - monkeypatch.setattr(mock_context.app, "is_saas", lambda: False) - - async with event_context(EventType.HTTP_REQUEST, trigger_type="machine") as event: - # Use patch to mock the method instead of direct assignment - with patch.object( - mock_sync_raw_mixin_with_jq_processor.port_app_config_handler, - "get_port_app_config", - return_value=PortAppConfig( - enable_merge_entity=True, - delete_dependent_entities=True, - create_missing_related_entities=False, - resources=[ - ResourceConfig( - kind=kind, - selector=Selector(query="true"), - port=PortResourceConfig( - entity=MappingsConfig( - mappings=EntityMapping( - identifier=".id | tostring", - title=".name", - blueprint='"service"', - properties={"url": ".web_url"}, - relations={}, - ) - ) - ), - ) - ], - ), - ): - # Ensure the event.port_app_config is set correctly - event.port_app_config = await mock_sync_raw_mixin_with_jq_processor.port_app_config_handler.get_port_app_config( - use_cache=False - ) - - def upsert_side_effect( - entities: list[Entity], user_agent_type: UserAgentType - ) -> list[Entity]: - # Simulate returning the passed entities - return entities + # Mock the integration settings with skip_check_diff + with patch.object(ocean.config.integration, "skip_check_diff", False): + kind = "service" + user_agent_type = UserAgentType.exporter + raw_entity = [ + {"id": "entity_1", "name": "entity_1", "web_url": "https://example.com"}, + ] + expected_result = [ + { + "identifier": "entity_1", + "blueprint": "service", + "name": "entity_1", + "properties": {}, + }, + ] - # Patch the upsert method with the side effect + async with event_context( + EventType.HTTP_REQUEST, trigger_type="machine" + ) as event: + # Use patch to mock the method instead of direct assignment with patch.object( - mock_sync_raw_mixin_with_jq_processor.entities_state_applier, - "upsert", - side_effect=upsert_side_effect, + mock_sync_raw_mixin_with_jq_processor.port_app_config_handler, + "get_port_app_config", + return_value=PortAppConfig( + enable_merge_entity=True, + delete_dependent_entities=True, + create_missing_related_entities=False, + resources=[mock_resource_config], + ), ): - # Call the register_raw method - registered_entities = ( - await mock_sync_raw_mixin_with_jq_processor.register_raw( - kind, raw_entity, user_agent_type - ) + # Ensure the event.port_app_config is set correctly + event.port_app_config = await mock_sync_raw_mixin_with_jq_processor.port_app_config_handler.get_port_app_config( + use_cache=False ) - # Assert that the registered entities match the expected results - assert len(registered_entities) == len(expected_result) - for entity, result in zip(registered_entities, expected_result): - assert entity.identifier == result["identifier"] - assert entity.blueprint == result["blueprint"] - assert entity.properties == result["properties"] + def upsert_side_effect( + entities: list[Entity], user_agent_type: UserAgentType + ) -> list[Entity]: + # Simulate returning the passed entities + return entities + + # Patch the upsert method with the side effect + with patch.object( + mock_sync_raw_mixin_with_jq_processor.entities_state_applier, + "upsert", + side_effect=upsert_side_effect, + ): + # Call the register_raw method + registered_entities = ( + await mock_sync_raw_mixin_with_jq_processor.register_raw( + kind, raw_entity, user_agent_type + ) + ) + + # Assert that the registered entities match the expected results + assert len(registered_entities) == len(expected_result) + for entity, result in zip(registered_entities, expected_result): + assert entity.identifier == result["identifier"] + assert entity.blueprint == result["blueprint"] + assert entity.properties == result["properties"] @pytest.mark.asyncio async def test_unregister_raw( mock_sync_raw_mixin_with_jq_processor: SyncRawMixin, - mock_ocean: Ocean, mock_context: PortOceanContext, monkeypatch: pytest.MonkeyPatch, + mock_resource_config: ResourceConfig, ) -> None: kind = "service" user_agent_type = UserAgentType.exporter @@ -547,23 +530,7 @@ async def test_unregister_raw( enable_merge_entity=True, delete_dependent_entities=True, create_missing_related_entities=False, - resources=[ - ResourceConfig( - kind=kind, - selector=Selector(query="true"), - port=PortResourceConfig( - entity=MappingsConfig( - mappings=EntityMapping( - identifier=".id | tostring", - title=".name", - blueprint='"service"', - properties={"url": ".web_url"}, - relations={}, - ) - ) - ), - ) - ], + resources=[mock_resource_config], ), ): # Ensure the event.port_app_config is set correctly @@ -718,142 +685,92 @@ class CalculationResult: @pytest.mark.asyncio -async def test_register_resource_raw_saas_no_changes_upsert_not_called_entitiy_is_returned( - mock_sync_raw_mixin: SyncRawMixin, - mock_port_app_config: PortAppConfig, - mock_context: PortOceanContext, - monkeypatch: pytest.MonkeyPatch, -) -> None: - # Mock ocean.app.is_saas() - monkeypatch.setattr(mock_context.app, "is_saas", lambda: True) - - # Mock dependencies - entity = Entity(identifier="1", blueprint="service") - mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore - mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore - mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock() # type: ignore - - async with event_context(EventType.RESYNC, trigger_type="machine") as event: - event.port_app_config = mock_port_app_config - - # Test execution - result = await mock_sync_raw_mixin._register_resource_raw( - mock_port_app_config.resources[0], # Use the first resource from the config - [{"some": "data"}], - UserAgentType.exporter, - ) - - # Assertions - assert len(result.entity_selector_diff.passed) == 1 - mock_sync_raw_mixin._calculate_raw.assert_called_once() - mock_sync_raw_mixin.entities_state_applier.upsert.assert_not_called() - mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once() - - -@pytest.mark.asyncio -async def test_register_resource_raw_saas_with_changes_upsert_called_and_entities_are_mapped( +async def test_register_resource_raw_no_changes_upsert_not_called_entitiy_is_returned( mock_sync_raw_mixin: SyncRawMixin, mock_port_app_config: PortAppConfig, - mock_context: PortOceanContext, - monkeypatch: pytest.MonkeyPatch, ) -> None: - # Mock ocean.app.is_saas() - monkeypatch.setattr(mock_context.app, "is_saas", lambda: True) - - # Mock dependencies - entity = Entity(identifier="1", blueprint="service") - mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore - mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([entity])) # type: ignore - mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock(return_value=[entity]) # type: ignore - - async with event_context(EventType.RESYNC, trigger_type="machine") as event: - event.port_app_config = mock_port_app_config - - # Test execution - result = await mock_sync_raw_mixin._register_resource_raw( - mock_port_app_config.resources[0], - [{"some": "data"}], - UserAgentType.exporter, - ) + # Mock the integration settings with skip_check_diff + with patch.object(ocean.config.integration, "skip_check_diff", False): + entity = Entity(identifier="1", blueprint="service") + mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore + mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore + mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock() # type: ignore + + async with event_context(EventType.RESYNC, trigger_type="machine") as event: + event.port_app_config = mock_port_app_config + + # Test execution + result = await mock_sync_raw_mixin._register_resource_raw( + mock_port_app_config.resources[ + 0 + ], # Use the first resource from the config + [{"some": "data"}], + UserAgentType.exporter, + ) - # Assertions - assert len(result.entity_selector_diff.passed) == 1 - mock_sync_raw_mixin._calculate_raw.assert_called_once() - mock_sync_raw_mixin.entities_state_applier.upsert.assert_called_once() - mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once() + # Assertions + assert len(result.entity_selector_diff.passed) == 1 + mock_sync_raw_mixin._calculate_raw.assert_called_once() + mock_sync_raw_mixin.entities_state_applier.upsert.assert_not_called() + mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once() @pytest.mark.asyncio -async def test_register_resource_raw_non_saas_upsert_called_and_no_entitites_diff_calculation( +async def test_register_resource_raw_with_changes_upsert_called_and_entities_are_mapped( mock_sync_raw_mixin: SyncRawMixin, mock_port_app_config: PortAppConfig, - mock_context: PortOceanContext, - monkeypatch: pytest.MonkeyPatch, ) -> None: - # Mock ocean.app.is_saas() - monkeypatch.setattr(mock_context.app, "is_saas", lambda: False) - - # Mock dependencies - entity = Entity(identifier="1", blueprint="service") - calculation_result = CalculationResult( - entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), - errors=[], - misconfigurations=[], - misonfigured_entity_keys=[], - ) - mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[calculation_result]) # type: ignore - mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock() # type: ignore - mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock(return_value=[entity]) # type: ignore - - async with event_context(EventType.RESYNC, trigger_type="machine") as event: - event.port_app_config = mock_port_app_config - - # Test execution - result = await mock_sync_raw_mixin._register_resource_raw( - mock_port_app_config.resources[0], - [{"some": "data"}], - UserAgentType.exporter, - ) + # Mock the integration settings with skip_check_diff + with patch.object(ocean.config.integration, "skip_check_diff", False): + entity = Entity(identifier="1", blueprint="service") + mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[entity], failed=[]), errors=[], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore + mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([entity])) # type: ignore + mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock(return_value=[entity]) # type: ignore + + async with event_context(EventType.RESYNC, trigger_type="machine") as event: + event.port_app_config = mock_port_app_config + + # Test execution + result = await mock_sync_raw_mixin._register_resource_raw( + mock_port_app_config.resources[0], + [{"some": "data"}], + UserAgentType.exporter, + ) - # Assertions - assert len(result.entity_selector_diff.passed) == 1 - mock_sync_raw_mixin._calculate_raw.assert_called_once() - mock_sync_raw_mixin._map_entities_compared_with_port.assert_not_called() - mock_sync_raw_mixin.entities_state_applier.upsert.assert_called_once() + # Assertions + assert len(result.entity_selector_diff.passed) == 1 + mock_sync_raw_mixin._calculate_raw.assert_called_once() + mock_sync_raw_mixin.entities_state_applier.upsert.assert_called_once() + mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once() @pytest.mark.asyncio -async def test_register_resource_raw_saas_with_errors( - mock_sync_raw_mixin: SyncRawMixin, - mock_port_app_config: PortAppConfig, - mock_context: PortOceanContext, - monkeypatch: pytest.MonkeyPatch, +async def test_register_resource_raw_with_errors( + mock_sync_raw_mixin: SyncRawMixin, mock_port_app_config: PortAppConfig ) -> None: - # Mock ocean.app.is_saas() - monkeypatch.setattr(mock_context.app, "is_saas", lambda: True) - - # Mock dependencies - failed_entity = Entity(identifier="1", blueprint="service") - error = Exception("Test error") - mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[], failed=[failed_entity]), errors=[error], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore - mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore - mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock() # type: ignore - - async with event_context(EventType.RESYNC, trigger_type="machine") as event: - event.port_app_config = mock_port_app_config - - # Test execution - result = await mock_sync_raw_mixin._register_resource_raw( - mock_port_app_config.resources[0], - [{"some": "data"}], - UserAgentType.exporter, - ) + # Mock the integration settings with skip_check_diff + with patch.object(ocean.config.integration, "skip_check_diff", False): + failed_entity = Entity(identifier="1", blueprint="service") + error = Exception("Test error") + mock_sync_raw_mixin._calculate_raw = AsyncMock(return_value=[CalculationResult(entity_selector_diff=EntitySelectorDiff(passed=[], failed=[failed_entity]), errors=[error], misconfigurations=[], misonfigured_entity_keys=[])]) # type: ignore + mock_sync_raw_mixin._map_entities_compared_with_port = AsyncMock(return_value=([])) # type: ignore + mock_sync_raw_mixin.entities_state_applier.upsert = AsyncMock() # type: ignore + + async with event_context(EventType.RESYNC, trigger_type="machine") as event: + event.port_app_config = mock_port_app_config + + # Test execution + result = await mock_sync_raw_mixin._register_resource_raw( + mock_port_app_config.resources[0], + [{"some": "data"}], + UserAgentType.exporter, + ) - # Assertions - assert len(result.entity_selector_diff.passed) == 0 - assert len(result.entity_selector_diff.failed) == 1 - assert len(result.errors) == 1 - assert result.errors[0] == error - mock_sync_raw_mixin._calculate_raw.assert_called_once() - mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once() - mock_sync_raw_mixin.entities_state_applier.upsert.assert_not_called() + # Assertions + assert len(result.entity_selector_diff.passed) == 0 + assert len(result.entity_selector_diff.failed) == 1 + assert len(result.errors) == 1 + assert result.errors[0] == error + mock_sync_raw_mixin._calculate_raw.assert_called_once() + mock_sync_raw_mixin._map_entities_compared_with_port.assert_called_once() + mock_sync_raw_mixin.entities_state_applier.upsert.assert_not_called() diff --git a/pyproject.toml b/pyproject.toml index 1cadd96c34..ea38b0fed0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.18.3" +version = "0.18.4" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"