From 3e1610a8b7eb27cc8f2a446c9f5b428eab6a5b5a Mon Sep 17 00:00:00 2001 From: Dale Cannon Date: Fri, 13 Dec 2024 16:08:23 +0000 Subject: [PATCH] Correct usage of select_for_update() to ensure rows will be locked --- publishing/models/packaged_workbasket.py | 122 ++++++++++++++--------- 1 file changed, 74 insertions(+), 48 deletions(-) diff --git a/publishing/models/packaged_workbasket.py b/publishing/models/packaged_workbasket.py index 08db73e3c..4e8874d35 100644 --- a/publishing/models/packaged_workbasket.py +++ b/publishing/models/packaged_workbasket.py @@ -31,6 +31,7 @@ from notifications.models import EnvelopeRejectedNotification from notifications.models import NotificationLog from publishing import models as publishing_models +from publishing.models.decorators import refresh_after from publishing.models.decorators import save_after from publishing.models.decorators import skip_notifications_if_disabled from publishing.models.state import ProcessingState @@ -615,6 +616,7 @@ def cds_notified_notification_log(self) -> NotificationLog: @atomic @create_envelope_on_new_top + @refresh_after def pop_top(self) -> "PackagedWorkBasket": """ Pop the top-most instance, shuffling all remaining queued instances @@ -623,23 +625,34 @@ def pop_top(self) -> "PackagedWorkBasket": Management of the popped instance's `processing_state` is not altered by this function and should be managed separately by the caller. """ - if self.position != 1: + + instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get( + pk=self.pk, + ) + + if instance.position != 1: raise PackagedWorkBasketInvalidQueueOperation( - "Unable to pop instance at position {self.position} in queue " + "Unable to pop instance at position {instance.position} in queue " "because it is not at position 1.", ) - PackagedWorkBasket.objects.select_for_update(nowait=True).filter( - position__gt=0, - ).update( + instance.position = 0 + instance.save() + + to_update = list( + PackagedWorkBasket.objects.select_for_update(nowait=True) + .filter(position__gt=1) + .values_list("pk", flat=True), + ) + PackagedWorkBasket.objects.filter(pk__in=to_update).update( position=F("position") - 1, ) - self.refresh_from_db() - return self + return instance @atomic @create_envelope_on_new_top + @refresh_after def remove_from_queue(self) -> "PackagedWorkBasket": """ Remove instance from the queue, shuffling all successive queued @@ -649,98 +662,111 @@ def remove_from_queue(self) -> "PackagedWorkBasket": this function and should be managed separately by the caller. """ - PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk) - self.refresh_from_db() + instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get( + pk=self.pk, + ) - if self.position == 0: + if instance.position == 0: raise PackagedWorkBasketInvalidQueueOperation( "Unable to remove instance with a position value of 0 from " "queue because 0 indicates that it is not a queue member.", ) - current_position = self.position - self.position = 0 - self.save() + current_position = instance.position + instance.position = 0 + instance.save() - PackagedWorkBasket.objects.select_for_update(nowait=True).filter( - position__gt=current_position, - ).update( + to_update = list( + PackagedWorkBasket.objects.select_for_update(nowait=True) + .filter(position__gt=current_position) + .values_list("pk", flat=True), + ) + PackagedWorkBasket.objects.filter(pk__in=to_update).update( position=F("position") - 1, ) - self.refresh_from_db() - return self + return instance @atomic @create_envelope_on_new_top + @refresh_after def promote_to_top_position(self) -> "PackagedWorkBasket": """Promote the instance to the top position of the package processing queue so that it occupies position 1.""" - PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk) - self.refresh_from_db() + instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get( + pk=self.pk, + ) - if self.position <= 1: - return self + if instance.position <= 1: + return instance - position = self.position + current_position = instance.position - PackagedWorkBasket.objects.select_for_update(nowait=True).filter( - Q(position__gte=1) & Q(position__lt=position), - ).update(position=F("position") + 1) + to_update = list( + PackagedWorkBasket.objects.select_for_update(nowait=True) + .filter(Q(position__gte=1) & Q(position__lt=current_position)) + .values_list("pk", flat=True), + ) + PackagedWorkBasket.objects.filter(pk__in=to_update).update( + position=F("position") + 1, + ) - self.position = 1 - self.save() - self.refresh_from_db() + instance.position = 1 + instance.save() - return self + return instance @atomic @create_envelope_on_new_top + @refresh_after def promote_position(self) -> "PackagedWorkBasket": """Promote the instance by one position up the package processing queue.""" - PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk) - self.refresh_from_db() + instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get( + pk=self.pk, + ) - if self.position <= 1: - return self + if instance.position <= 1: + return instance obj_to_swap = PackagedWorkBasket.objects.select_for_update(nowait=True).get( - position=self.position - 1, + position=instance.position - 1, ) obj_to_swap.position += 1 - self.position -= 1 + instance.position -= 1 + PackagedWorkBasket.objects.bulk_update( - [self, obj_to_swap], + [instance, obj_to_swap], ["position"], ) - self.refresh_from_db() - return self + return instance @atomic @create_envelope_on_new_top + @refresh_after def demote_position(self) -> "PackagedWorkBasket": """Demote the instance by one position down the package processing queue.""" - PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk) - self.refresh_from_db() + instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get( + pk=self.pk, + ) - if self.position in {0, PackagedWorkBasket.objects.max_position()}: - return self + if instance.position in {0, PackagedWorkBasket.objects.max_position()}: + return instance obj_to_swap = PackagedWorkBasket.objects.select_for_update(nowait=True).get( - position=self.position + 1, + position=instance.position + 1, ) obj_to_swap.position -= 1 - self.position += 1 + instance.position += 1 + PackagedWorkBasket.objects.bulk_update( - [self, obj_to_swap], + [instance, obj_to_swap], ["position"], ) - self.refresh_from_db() - return self + return instance