From 35a9b0dc36b8a4081b23c1625c11877e2b9f97c2 Mon Sep 17 00:00:00 2001 From: Aaron Chong Date: Wed, 8 May 2024 13:37:04 +0800 Subject: [PATCH] Reverted use of asyncio.Lock, just catch exceptions instead (#944) Signed-off-by: Aaron Chong --- .../api_server/repositories/tasks.py | 75 ++++++++++--------- 1 file changed, 38 insertions(+), 37 deletions(-) diff --git a/packages/api-server/api_server/repositories/tasks.py b/packages/api-server/api_server/repositories/tasks.py index c994ea0a4..5b8794e28 100644 --- a/packages/api-server/api_server/repositories/tasks.py +++ b/packages/api-server/api_server/repositories/tasks.py @@ -1,4 +1,3 @@ -import asyncio import sys from datetime import datetime from typing import Dict, List, Optional, Sequence, Tuple, cast @@ -37,7 +36,6 @@ def __init__( ): self.user = user self.logger = logger - self.save_task_state_mutex = asyncio.Lock() def parse_pickup(self, task_request: TaskRequest) -> Optional[str]: # patrol @@ -165,46 +163,49 @@ async def query_task_requests(self, task_ids: List[str]) -> List[DbTaskRequest]: raise HTTPException(422, str(e)) from e async def save_task_state(self, task_state: TaskState) -> None: - # FIXME: If the task dispatcher is also provided websocket access to - # the API server, when a new task is dispatched via the API server, - # there may be a race condition where both the ROS 2 task response and - # task dispatcher websocket update may attempt to create a new task - # state model with the same task ID. This have unfortunately not been - # reproducible locally, only in the production environment, which uses - # Postgres instead of sqlite. This may be fixed upstream in DB or ORM, - # this mutex can be removed once these libraries have been updated and - # tested to be fixed. 
- async with self.save_task_state_mutex: - task_state_dict = { - "data": task_state.json(), - "category": task_state.category.__root__ - if task_state.category - else None, - "assigned_to": task_state.assigned_to.name - if task_state.assigned_to - else None, - "unix_millis_start_time": task_state.unix_millis_start_time - and datetime.fromtimestamp(task_state.unix_millis_start_time / 1000), - "unix_millis_finish_time": task_state.unix_millis_finish_time - and datetime.fromtimestamp(task_state.unix_millis_finish_time / 1000), - "status": task_state.status if task_state.status else None, - "unix_millis_request_time": task_state.booking.unix_millis_request_time - and datetime.fromtimestamp( - task_state.booking.unix_millis_request_time / 1000 - ), - "requester": task_state.booking.requester - if task_state.booking.requester - else None, - } + task_state_dict = { + "data": task_state.json(), + "category": task_state.category.__root__ if task_state.category else None, + "assigned_to": task_state.assigned_to.name + if task_state.assigned_to + else None, + "unix_millis_start_time": task_state.unix_millis_start_time + and datetime.fromtimestamp(task_state.unix_millis_start_time / 1000), + "unix_millis_finish_time": task_state.unix_millis_finish_time + and datetime.fromtimestamp(task_state.unix_millis_finish_time / 1000), + "status": task_state.status if task_state.status else None, + "unix_millis_request_time": task_state.booking.unix_millis_request_time + and datetime.fromtimestamp( + task_state.booking.unix_millis_request_time / 1000 + ), + "requester": task_state.booking.requester + if task_state.booking.requester + else None, + } - if task_state.unix_millis_warn_time is not None: - task_state_dict["unix_millis_warn_time"] = datetime.fromtimestamp( - task_state.unix_millis_warn_time / 1000 - ) + if task_state.unix_millis_warn_time is not None: + task_state_dict["unix_millis_warn_time"] = datetime.fromtimestamp( + task_state.unix_millis_warn_time / 1000 + ) + try: await 
ttm.TaskState.update_or_create( task_state_dict, id_=task_state.booking.id ) + except Exception as e: # pylint: disable=W0703 + # This is to catch a combination of exceptions from Tortoise ORM, + # especially in the case where a data race occurs when two instances + # of update_or_create attempt to create entries with the same task + # ID at the same time. The exceptions expected are DoesNotExist, + # IntegrityError and TransactionManagementError. + # This data race happens when the server attempts to record the RMF + # service call response and Task dispatcher's websocket push at + # almost the same time. + # FIXME: this has not been observed outside of the production + # environment, and may be fixed upstream in updated libraries. + self.logger.error( + f"Failed to save task state of id [{task_state.booking.id}] [{e}]" + ) async def query_task_states( self, query: QuerySet[DbTaskState], pagination: Optional[Pagination] = None