diff --git a/port_ocean/core/handlers/entity_processor/jq_entity_processor.py b/port_ocean/core/handlers/entity_processor/jq_entity_processor.py index 7a7338cdbe..0dbe4642b5 100644 --- a/port_ocean/core/handlers/entity_processor/jq_entity_processor.py +++ b/port_ocean/core/handlers/entity_processor/jq_entity_processor.py @@ -1,3 +1,6 @@ +import asyncio +import functools +from asyncio import TaskGroup from functools import lru_cache from typing import Any @@ -19,17 +22,25 @@ class JQEntityProcessor(BaseEntityProcessor): """ @lru_cache - def _compile(self, pattern: str) -> Any: - return jq.compile(pattern) + async def _compile(self, pattern: str) -> Any: + loop = asyncio.get_event_loop() + compiler = functools.partial(jq.compile, pattern) + return await loop.run_in_executor(None, compiler) - def _search(self, data: dict[str, Any], pattern: str) -> Any: + async def _search(self, data: dict[str, Any], pattern: str) -> Any: try: - return self._compile(pattern).first(data) + loop = asyncio.get_event_loop() + compiled_pattern = await self._compile(pattern) + first_value_callable = functools.partial(compiled_pattern.first, data) + return await loop.run_in_executor(None, first_value_callable) except Exception: return None - def _search_as_bool(self, data: dict[str, Any], pattern: str) -> bool: - value = self._compile(pattern).first(data) + async def _search_as_bool(self, data: dict[str, Any], pattern: str) -> bool: + loop = asyncio.get_event_loop() + compiled_pattern = await self._compile(pattern) + first_value_callable = functools.partial(compiled_pattern.first, data) + value = await loop.run_in_executor(None, first_value_callable) if isinstance(value, bool): return value @@ -38,34 +49,43 @@ def _search_as_bool(self, data: dict[str, Any], pattern: str) -> bool: f"Expected boolean value, got {type(value)} instead" ) - def _search_as_object( + async def _search_as_object( self, data: dict[str, Any], obj: dict[str, Any] ) -> dict[str, Any | None]: - result: dict[str, Any | None] = {} - for key, value in obj.items(): - try: + search_tasks = {} + with TaskGroup() as tg: + for key, value in obj.items(): if isinstance(value, dict): - result[key] = self._search_as_object(data, value) + search_tasks[key] = tg.create_task( + self._search_as_object(data, value) + ) else: - result[key] = self._search(data, value) + search_tasks[key] = tg.create_task(self._search(data, value)) + + result: dict[str, Any | None] = {} + for key, task in search_tasks.items(): + try: + result[key] = await task except Exception: result[key] = None + return result - def _calculate_entities( + async def _calculate_entities( self, mapping: ResourceConfig, raw_data: list[dict[str, Any]] ) -> list[Entity]: - entities = [] - for data in raw_data: - should_run = self._search_as_bool(data, mapping.selector.query) - + async def calculate_raw(data: dict[str, Any]) -> Entity: + should_run = await self._search_as_bool(data, mapping.selector.query) if should_run and mapping.port.entity: - entities.append( - self._search_as_object( + return Entity.parse_obj( + await self._search_as_object( data, mapping.port.entity.mappings.dict(exclude_unset=True) ) ) + entities_tasks = [asyncio.create_task(calculate_raw(data)) for data in raw_data] + entities = asyncio.gather(*entities_tasks) + return [ Entity.parse_obj(entity_data) for entity_data in filter( @@ -77,10 +97,10 @@ def _calculate_entities( async def _parse_items( self, mapping: ResourceConfig, raw_results: RawEntityDiff ) -> EntityDiff: - entities_before: list[Entity] = self._calculate_entities( + entities_before: list[Entity] = await self._calculate_entities( mapping, raw_results["before"] ) - entities_after: list[Entity] = self._calculate_entities( + entities_after: list[Entity] = await self._calculate_entities( mapping, raw_results["after"] )