Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LITE-28676 Fix error handling for calls to CBC #121

Merged
merged 2 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions connect_ext_ppr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,3 +572,5 @@
"required": ["hierarchical_files_data"]
}}
"""

DELAY_SECONDS_BETWEEN_TASKS = 10
2 changes: 1 addition & 1 deletion connect_ext_ppr/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ def add_new_deployment_request(db, dr_data, deployment, account_id, logger):
))
tasks.append(Task(
deployment_request_id=deployment_request.id,
title='Apply PP and delegate to marketplaces',
title='Apply PPR and delegate to marketplaces',
type=Task.TYPES.apply_and_delegate,
created_by=account_id,
))
Expand Down
7 changes: 4 additions & 3 deletions connect_ext_ppr/services/cbc_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def update_product(self, product_id: str):
)

def parse_ppr(self, file: BufferedReader):
file.seek(0)
base64_content = base64.b64encode(file.read()).decode('ascii')

return self.plm_service.action(
Expand Down Expand Up @@ -175,9 +176,9 @@ def search_task_logs_by_name(self, partial_name: str):

def parse_price_file(
self,
account_id: int,
vendor_id: str,
file: BufferedReader,
account_id: int,
vendor_id: str,
file: BufferedReader,
) -> Dict:
"""

Expand Down
150 changes: 83 additions & 67 deletions connect_ext_ppr/tasks_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
from sqlalchemy.orm import joinedload, selectinload

from connect_ext_ppr.client.exception import CBCClientError
from connect_ext_ppr.constants import PPR_FILE_NAME_DELEGATION_L2, PPR_FILE_NAME_UPDATE_MARKETPLACES
from connect_ext_ppr.constants import (
DELAY_SECONDS_BETWEEN_TASKS,
PPR_FILE_NAME_DELEGATION_L2,
PPR_FILE_NAME_UPDATE_MARKETPLACES,
)
from connect_ext_ppr.db import get_cbc_extension_db, get_cbc_extension_db_engine, get_db_ctx_manager
from connect_ext_ppr.models.configuration import Configuration
from connect_ext_ppr.models.enums import (
Expand Down Expand Up @@ -65,7 +69,10 @@ def _execute_with_retries(function, func_kwargs, num_retries=5):
num_retries=num_retries,
)
except CBCClientError as ex:
raise TaskException(str(ex))
message = ex.message
if ex.json:
message += f' - {ex.json.get("message")}'
raise TaskException(message)


def _send_ppr(cbc_service, file: BufferedReader):
Expand All @@ -89,11 +96,11 @@ def _check_cbc_task_status(cbc_service, tracking_id):
task_log = _execute_with_retries(
cbc_service.search_task_logs_by_name, func_kwargs={'partial_name': tracking_id},
)
# Setting this first default value in case takes time to create it in extenal system.
# Setting this first default value in case takes time to create it in external system.
task_log = task_log[0] if task_log else {'status': CBCTaskLogStatus.not_started}

while task_log['status'] in (CBCTaskLogStatus.not_started, CBCTaskLogStatus.running):
time.sleep(10)
time.sleep(DELAY_SECONDS_BETWEEN_TASKS)
task_log = _execute_with_retries(
cbc_service.search_task_logs_by_name, {'partial_name': tracking_id})[0]

Expand All @@ -103,6 +110,29 @@ def _check_cbc_task_status(cbc_service, tracking_id):
raise TaskException(f'Something went wrong with task: {tracking_id}')


def check_and_update_product(deployment_request, cbc_service, **kwargs):
if not deployment_request.manually:

product_id = deployment_request.deployment.product_id

response = _execute_with_retries(
cbc_service.get_product_details, func_kwargs={'product_id': product_id},
)

if 'error' in response.keys():
raise Exception(response['error'])

if response.get('isUpdateAvailable'):
response = _execute_with_retries(
cbc_service.update_product, func_kwargs={'product_id': product_id},
)

if 'error' in response.keys():
raise Exception(response['error'])

return True


def prepare_ppr_file_for_task(
connect_client,
file_data,
Expand Down Expand Up @@ -153,8 +183,6 @@ def prepare_ppr_file_for_task(
file_obj.read(),
file_size,
)

file_obj.seek(0)
return file_obj

except (FileNotFoundError, ValueError, KeyError) as e:
Expand All @@ -163,34 +191,12 @@ def prepare_ppr_file_for_task(
raise TaskException(f'Error while connecting to Connect: {e.message}')


def check_and_update_product(deployment_request, cbc_service, **kwargs):
if not deployment_request.manually:

product_id = deployment_request.deployment.product_id

response = _execute_with_retries(
cbc_service.get_product_details, func_kwargs={'product_id': product_id},
)

if 'error' in response.keys():
raise Exception(response['error'])

if response.get('isUpdateAvailable'):
response = _execute_with_retries(
cbc_service.update_product, func_kwargs={'product_id': product_id},
)

if 'error' in response.keys():
raise Exception(response['error'])

return True


def apply_ppr_and_delegate_to_marketplaces(
deployment_request,
cbc_service,
connect_client,
db,
logger,
**kwargs,
):
"""Task sends PPR to the Commerce and updates marketplaces in DB
Expand All @@ -199,6 +205,7 @@ def apply_ppr_and_delegate_to_marketplaces(
:param cbc_service: CBC service
:param connect_client: Connect client
:param db: dbsession
:param logger: logger
:rtype bool
:raises TaskException
"""
Expand Down Expand Up @@ -255,6 +262,10 @@ def apply_ppr_and_delegate_to_marketplaces(
)

tracking_id = _send_ppr(cbc_service, file)
logger.debug(
f'apply_ppr_and_delegate_to_marketplaces dr_id: {deployment_request.id},'
f' tracking_id: {tracking_id}',
)
file.close()
_check_cbc_task_status(cbc_service, tracking_id)

Expand All @@ -267,6 +278,47 @@ def apply_ppr_and_delegate_to_marketplaces(
return True


def delegate_to_l2(deployment_request, cbc_service, connect_client, logger, **kwargs):
"""Task delegates PPR to L2 in Commerce

:param deployment_request: DeploymentRequest model
:param cbc_service: CBC service
:param connect_client: Connect client
:param logger: logger
:rtype bool
:raises TaskException
"""
if deployment_request.manually:
return True

ppr_file_id = deployment_request.ppr.file
deployment = deployment_request.deployment
try:
file_data = get_ppr_from_media(
connect_client,
deployment.account_id,
deployment.id,
ppr_file_id,
)
except ClientError as e:
raise TaskException(f'Error while connecting to Connect: {e.message}')

file = prepare_ppr_file_for_task(
connect_client=connect_client,
file_data=file_data,
file_name_template=PPR_FILE_NAME_DELEGATION_L2,
deployment_request=deployment_request,
deployment=deployment,
process_func=process_ppr_file_for_delelegate_l2,
)

tracking_id = _send_ppr(cbc_service, file)
logger.debug(f'delegate_to_l2 dr_id: {deployment_request.id}, tracking_id: {tracking_id}')
file.close()
_check_cbc_task_status(cbc_service, tracking_id)
return True


def apply_pricelist_task(
deployment_request,
cbc_service,
Expand Down Expand Up @@ -343,44 +395,6 @@ def validate_pricelists_task(
return True


def delegate_to_l2(deployment_request, cbc_service, connect_client, **kwargs):
"""Task delegates PPR to L2 in Commerce

:param deployment_request: DeploymentRequest model
:param cbc_service: CBC service
:param connect_client: Connect client
:rtype bool
:raises TaskException
"""
if deployment_request.manually:
return True

ppr_file_id = deployment_request.ppr.file
deployment = deployment_request.deployment
try:
file_data = get_ppr_from_media(
connect_client,
deployment.account_id,
deployment.id,
ppr_file_id,
)
except ClientError as e:
raise TaskException(f'Error while connecting to Connect: {e.message}')

file = prepare_ppr_file_for_task(
connect_client=connect_client,
file_data=file_data,
file_name_template=PPR_FILE_NAME_DELEGATION_L2,
deployment_request=deployment_request,
deployment=deployment,
process_func=process_ppr_file_for_delelegate_l2,
)

tracking_id = _send_ppr(cbc_service, file)
file.close()
return _check_cbc_task_status(cbc_service, tracking_id)


TASK_PER_TYPE = {
TaskTypesChoices.product_setup: check_and_update_product,
TaskTypesChoices.apply_and_delegate: apply_ppr_and_delegate_to_marketplaces,
Expand Down Expand Up @@ -414,10 +428,12 @@ def execute_tasks(db, config, tasks, connect_client, logger):
connect_client=connect_client,
marketplace=task.marketplace,
db=db,
logger=logger,
)
task.status = TasksStatusChoices.done

except TaskException as ex:
logger.error(f'Task ID: {task.id} - {ex}')
was_succesfull = False
task.error_message = str(ex)[:4000]
except Exception as ex:
Expand Down
3 changes: 3 additions & 0 deletions connect_ext_ppr/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from collections import defaultdict
from functools import partial
from io import BytesIO
Expand All @@ -18,6 +19,7 @@
from connect_ext_ppr.constants import (
BASE_SCHEMA,
CONFIGURATION_SCHEMA_TEMPLATE,
DELAY_SECONDS_BETWEEN_TASKS,
PPR_SCHEMA,
SUMMARY_TEMPLATE,
)
Expand Down Expand Up @@ -682,6 +684,7 @@ def execute_with_retry(function, exception_class, args=None, kwargs=None, num_re
except exception_class:
if num_retries == 0:
raise
time.sleep(DELAY_SECONDS_BETWEEN_TASKS)


def get_mps_to_update_for_apply_ppr_and_delegate_to_marketplaces(
Expand Down
Loading