Skip to content

Commit

Permalink
Reverted use of asyncio.Lock, just catch exceptions instead (#944)
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Chong <[email protected]>
  • Loading branch information
aaronchongth authored May 8, 2024
1 parent 1f14af3 commit 35a9b0d
Showing 1 changed file with 38 additions and 37 deletions.
75 changes: 38 additions & 37 deletions packages/api-server/api_server/repositories/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import sys
from datetime import datetime
from typing import Dict, List, Optional, Sequence, Tuple, cast
Expand Down Expand Up @@ -37,7 +36,6 @@ def __init__(
):
self.user = user
self.logger = logger
self.save_task_state_mutex = asyncio.Lock()

def parse_pickup(self, task_request: TaskRequest) -> Optional[str]:
# patrol
Expand Down Expand Up @@ -165,46 +163,49 @@ async def query_task_requests(self, task_ids: List[str]) -> List[DbTaskRequest]:
raise HTTPException(422, str(e)) from e

async def save_task_state(self, task_state: TaskState) -> None:
    """Persist ``task_state`` by upserting the row keyed by its booking id.

    Flattens the pydantic ``TaskState`` into the column dict expected by the
    Tortoise ORM model and calls ``update_or_create``. Millisecond epoch
    fields are converted to ``datetime`` only when present (the ``x and f(x)``
    pattern yields ``None``/``0`` untouched, a datetime otherwise).

    :param task_state: the task state snapshot to record.
    :raises: nothing — persistence failures are logged, not propagated.
    """
    task_state_dict = {
        # Full JSON payload is kept alongside the denormalized columns below.
        "data": task_state.json(),
        "category": task_state.category.__root__ if task_state.category else None,
        "assigned_to": task_state.assigned_to.name
        if task_state.assigned_to
        else None,
        # `x and datetime.fromtimestamp(...)` leaves falsy values (None) as-is.
        "unix_millis_start_time": task_state.unix_millis_start_time
        and datetime.fromtimestamp(task_state.unix_millis_start_time / 1000),
        "unix_millis_finish_time": task_state.unix_millis_finish_time
        and datetime.fromtimestamp(task_state.unix_millis_finish_time / 1000),
        "status": task_state.status if task_state.status else None,
        "unix_millis_request_time": task_state.booking.unix_millis_request_time
        and datetime.fromtimestamp(
            task_state.booking.unix_millis_request_time / 1000
        ),
        "requester": task_state.booking.requester
        if task_state.booking.requester
        else None,
    }

    if task_state.unix_millis_warn_time is not None:
        task_state_dict["unix_millis_warn_time"] = datetime.fromtimestamp(
            task_state.unix_millis_warn_time / 1000
        )

    try:
        await ttm.TaskState.update_or_create(
            task_state_dict, id_=task_state.booking.id
        )
    except Exception as e:  # pylint: disable=W0703
        # Deliberately broad: this catches the combination of exceptions
        # Tortoise ORM can raise when two concurrent update_or_create calls
        # try to create entries with the same task ID (DoesNotExist,
        # IntegrityError, TransactionManagementError). The data race occurs
        # when the server records the RMF service call response and the task
        # dispatcher's websocket push at almost the same time.
        # FIXME: this has only been observed in the production environment
        # (Postgres, not sqlite), and may be fixed upstream in updated
        # DB/ORM libraries.
        self.logger.error(
            f"Failed to save task state of id [{task_state.booking.id}] [{e}]"
        )

async def query_task_states(
self, query: QuerySet[DbTaskState], pagination: Optional[Pagination] = None
Expand Down

0 comments on commit 35a9b0d

Please sign in to comment.