From c5ec0c62a3642b004a82f42e7728f92e8582ea3d Mon Sep 17 00:00:00 2001 From: yair Date: Thu, 10 Oct 2024 15:41:16 +0300 Subject: [PATCH] with jq --- .../core/integrations/mixins/sync_raw.py | 94 +++++++++---------- pyproject.toml | 2 +- 2 files changed, 47 insertions(+), 49 deletions(-) diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index ee4c204563..4402d5b3b0 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -3,7 +3,6 @@ import typing from typing import Callable, Awaitable, Any -import httpx from loguru import logger from port_ocean.clients.port.types import UserAgentType @@ -140,11 +139,11 @@ async def _register_resource_raw( objects_diff = await self._calculate_raw( [(resource, results)], parse_all, send_raw_data_examples_amount ) - modified_objects = await self.entities_state_applier.upsert( - objects_diff[0].entity_selector_diff.passed, user_agent_type - ) + # modified_objects = await self.entities_state_applier.upsert( + # objects_diff[0].entity_selector_diff.passed, user_agent_type + # ) return CalculationResult( - objects_diff[0].entity_selector_diff._replace(passed=modified_objects), + objects_diff[0].entity_selector_diff._replace(passed=objects_diff[0].entity_selector_diff.passed), errors=objects_diff[0].errors, ) @@ -173,49 +172,48 @@ async def _register_in_batches( self, resource_config: ResourceConfig, user_agent_type: UserAgentType ) -> tuple[list[Entity], list[Exception]]: results, errors = await self._get_resource_raw_results(resource_config) - passed_entities = [] - # async_generators: list[ASYNC_GENERATOR_RESYNC_TYPE] = [] - # raw_results: RAW_RESULT = [] - # for result in results: - # if isinstance(result, dict): - # raw_results.append(result) - # else: - # async_generators.append(result) - # - # send_raw_data_examples_amount = ( - # SEND_RAW_DATA_EXAMPLES_AMOUNT if ocean.config.send_raw_data_examples else 0 - # ) - # all_entities, register_errors = await self._register_resource_raw( - # resource_config, - # raw_results, - # user_agent_type, - # send_raw_data_examples_amount=send_raw_data_examples_amount, - # ) - # errors.extend(register_errors) - # passed_entities = list(all_entities.passed) - # - # for generator in async_generators: - # try: - # async for items in generator: - # if send_raw_data_examples_amount > 0: - # send_raw_data_examples_amount = max( - # 0, send_raw_data_examples_amount - len(passed_entities) - # ) - # - # entities, register_errors = await self._register_resource_raw( - # resource_config, - # items, - # user_agent_type, - # send_raw_data_examples_amount=send_raw_data_examples_amount, - # ) - # errors.extend(register_errors) - # passed_entities.extend(entities.passed) - # except* OceanAbortException as error: - # errors.append(error) - # - # logger.info( - # f"Finished registering change for {len(results)} raw results for kind: {resource_config.kind}. {len(passed_entities)} entities were affected" - # ) + async_generators: list[ASYNC_GENERATOR_RESYNC_TYPE] = [] + raw_results: RAW_RESULT = [] + for result in results: + if isinstance(result, dict): + raw_results.append(result) + else: + async_generators.append(result) + + send_raw_data_examples_amount = ( + SEND_RAW_DATA_EXAMPLES_AMOUNT if ocean.config.send_raw_data_examples else 0 + ) + all_entities, register_errors = await self._register_resource_raw( + resource_config, + raw_results, + user_agent_type, + send_raw_data_examples_amount=send_raw_data_examples_amount, + ) + errors.extend(register_errors) + passed_entities = list(all_entities.passed) + + for generator in async_generators: + try: + async for items in generator: + if send_raw_data_examples_amount > 0: + send_raw_data_examples_amount = max( + 0, send_raw_data_examples_amount - len(passed_entities) + ) + + entities, register_errors = await self._register_resource_raw( + resource_config, + items, + user_agent_type, + send_raw_data_examples_amount=send_raw_data_examples_amount, + ) + errors.extend(register_errors) + passed_entities.extend(entities.passed) + except* OceanAbortException as error: + errors.append(error) + + logger.info( + f"Finished registering change for {len(results)} raw results for kind: {resource_config.kind}. {len(passed_entities)} entities were affected" + ) return passed_entities, errors async def register_raw( diff --git a/pyproject.toml b/pyproject.toml index 38811bc866..881dffa749 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.12.2-dev03" +version = "0.12.2-dev04" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"