Skip to content

Commit

Permalink
parallel jq
Browse files Browse the repository at this point in the history
  • Loading branch information
yairsimantov20 committed Mar 3, 2024
1 parent 411cd5b commit 373b530
Showing 1 changed file with 41 additions and 21 deletions.
62 changes: 41 additions & 21 deletions port_ocean/core/handlers/entity_processor/jq_entity_processor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import asyncio
import functools
from asyncio import TaskGroup
from functools import lru_cache
from typing import Any

Expand All @@ -19,17 +22,25 @@ class JQEntityProcessor(BaseEntityProcessor):
"""

@staticmethod
@lru_cache
def _compile_cached(pattern: str) -> Any:
    """Compile ``pattern`` with ``jq.compile`` and memoize the result per pattern.

    Kept synchronous on purpose: ``lru_cache`` stores whatever the wrapped
    callable returns, so decorating the coroutine function itself would
    cache single-use coroutine objects rather than compiled programs, and
    the second lookup of any pattern would fail when awaited again.
    Being a static method, the cache is keyed on the pattern only and does
    not hold references to processor instances.
    """
    return jq.compile(pattern)

async def _compile(self, pattern: str) -> Any:
    """Return the compiled jq program for ``pattern`` without blocking the loop.

    The (cached) compilation runs in the event loop's default executor.

    :param pattern: jq expression to compile.
    :return: whatever ``jq.compile`` returns for ``pattern``.
    :raises Exception: any error raised by ``jq.compile`` is propagated.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self._compile_cached, pattern)

async def _search(self, data: dict[str, Any], pattern: str) -> Any:
    """Evaluate the jq ``pattern`` against ``data`` and return its first result.

    The evaluation runs in the event loop's default executor so the loop is
    not blocked while it executes.

    :param data: the raw object the pattern is applied to.
    :param pattern: jq expression to evaluate.
    :return: the first value produced by the compiled pattern, or ``None``
        when compiling or evaluating raises.
    """
    try:
        compiled_pattern = await self._compile(pattern)
        loop = asyncio.get_running_loop()
        first_value_callable = functools.partial(compiled_pattern.first, data)
        return await loop.run_in_executor(None, first_value_callable)
    except Exception:
        # Deliberate best-effort: a pattern that fails is reported as "no value".
        return None

def _search_as_bool(self, data: dict[str, Any], pattern: str) -> bool:
value = self._compile(pattern).first(data)
async def _search_as_bool(self, data: dict[str, Any], pattern: str) -> bool:
loop = asyncio.get_event_loop()
compiled_pattern = await self._compile(pattern)
first_value_callable = functools.partial(compiled_pattern.first, data)
value = await loop.run_in_executor(None, first_value_callable)

if isinstance(value, bool):
return value
Expand All @@ -38,34 +49,43 @@ def _search_as_bool(self, data: dict[str, Any], pattern: str) -> bool:
f"Expected boolean value, got {type(value)} instead"
)

def _search_as_object(
async def _search_as_object(
self, data: dict[str, Any], obj: dict[str, Any]
) -> dict[str, Any | None]:
result: dict[str, Any | None] = {}
for key, value in obj.items():
try:
search_tasks = {}
with TaskGroup() as tg:
for key, value in obj.items():
if isinstance(value, dict):
result[key] = self._search_as_object(data, value)
search_tasks[key] = tg.create_task(
self._search_as_object(data, value)
)
else:
result[key] = self._search(data, value)
search_tasks[key] = tg.create_task(self._search(data, value))

result: dict[str, Any | None] = {}
for key, task in search_tasks.items():
try:
result[key] = await task
except Exception:
result[key] = None

return result

def _calculate_entities(
async def _calculate_entities(
self, mapping: ResourceConfig, raw_data: list[dict[str, Any]]
) -> list[Entity]:
entities = []
for data in raw_data:
should_run = self._search_as_bool(data, mapping.selector.query)

async def calculate_raw(data: dict[str, Any]) -> Entity:
should_run = await self._search_as_bool(data, mapping.selector.query)
if should_run and mapping.port.entity:
entities.append(
self._search_as_object(
return Entity.parse_obj(
await self._search_as_object(
data, mapping.port.entity.mappings.dict(exclude_unset=True)
)
)

entities_tasks = [asyncio.create_task(calculate_raw(data)) for data in raw_data]
entities = asyncio.gather(*entities_tasks)

return [
Entity.parse_obj(entity_data)
for entity_data in filter(
Expand All @@ -77,10 +97,10 @@ def _calculate_entities(
async def _parse_items(
self, mapping: ResourceConfig, raw_results: RawEntityDiff
) -> EntityDiff:
entities_before: list[Entity] = self._calculate_entities(
entities_before: list[Entity] = await self._calculate_entities(
mapping, raw_results["before"]
)
entities_after: list[Entity] = self._calculate_entities(
entities_after: list[Entity] = await self._calculate_entities(
mapping, raw_results["after"]
)

Expand Down

0 comments on commit 373b530

Please sign in to comment.