Skip to content

Commit

Permalink
Correct usage of select_for_update() to ensure rows will be locked
Browse files Browse the repository at this point in the history
  • Loading branch information
dalecannon committed Dec 13, 2024
1 parent e02ba2f commit 3e1610a
Showing 1 changed file with 74 additions and 48 deletions.
122 changes: 74 additions & 48 deletions publishing/models/packaged_workbasket.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from notifications.models import EnvelopeRejectedNotification
from notifications.models import NotificationLog
from publishing import models as publishing_models
from publishing.models.decorators import refresh_after
from publishing.models.decorators import save_after
from publishing.models.decorators import skip_notifications_if_disabled
from publishing.models.state import ProcessingState
Expand Down Expand Up @@ -615,6 +616,7 @@ def cds_notified_notification_log(self) -> NotificationLog:

@atomic
@create_envelope_on_new_top
@refresh_after
def pop_top(self) -> "PackagedWorkBasket":
"""
Pop the top-most instance, shuffling all remaining queued instances
Expand All @@ -623,23 +625,34 @@ def pop_top(self) -> "PackagedWorkBasket":
Management of the popped instance's `processing_state` is not altered by
this function and should be managed separately by the caller.
"""
if self.position != 1:

instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
pk=self.pk,
)

if instance.position != 1:
raise PackagedWorkBasketInvalidQueueOperation(
"Unable to pop instance at position {self.position} in queue "
"Unable to pop instance at position {instance.position} in queue "
"because it is not at position 1.",
)

PackagedWorkBasket.objects.select_for_update(nowait=True).filter(
position__gt=0,
).update(
instance.position = 0
instance.save()

to_update = list(
PackagedWorkBasket.objects.select_for_update(nowait=True)
.filter(position__gt=1)
.values_list("pk", flat=True),
)
PackagedWorkBasket.objects.filter(pk__in=to_update).update(
position=F("position") - 1,
)
self.refresh_from_db()

return self
return instance

@atomic
@create_envelope_on_new_top
@refresh_after
def remove_from_queue(self) -> "PackagedWorkBasket":
"""
Remove instance from the queue, shuffling all successive queued
Expand All @@ -649,98 +662,111 @@ def remove_from_queue(self) -> "PackagedWorkBasket":
this function and should be managed separately by the caller.
"""

PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk)
self.refresh_from_db()
instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
pk=self.pk,
)

if self.position == 0:
if instance.position == 0:
raise PackagedWorkBasketInvalidQueueOperation(
"Unable to remove instance with a position value of 0 from "
"queue because 0 indicates that it is not a queue member.",
)

current_position = self.position
self.position = 0
self.save()
current_position = instance.position
instance.position = 0
instance.save()

PackagedWorkBasket.objects.select_for_update(nowait=True).filter(
position__gt=current_position,
).update(
to_update = list(
PackagedWorkBasket.objects.select_for_update(nowait=True)
.filter(position__gt=current_position)
.values_list("pk", flat=True),
)
PackagedWorkBasket.objects.filter(pk__in=to_update).update(
position=F("position") - 1,
)
self.refresh_from_db()

return self
return instance

@atomic
@create_envelope_on_new_top
@refresh_after
def promote_to_top_position(self) -> "PackagedWorkBasket":
"""Promote the instance to the top position of the package processing
queue so that it occupies position 1."""

PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk)
self.refresh_from_db()
instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
pk=self.pk,
)

if self.position <= 1:
return self
if instance.position <= 1:
return instance

position = self.position
current_position = instance.position

PackagedWorkBasket.objects.select_for_update(nowait=True).filter(
Q(position__gte=1) & Q(position__lt=position),
).update(position=F("position") + 1)
to_update = list(
PackagedWorkBasket.objects.select_for_update(nowait=True)
.filter(Q(position__gte=1) & Q(position__lt=current_position))
.values_list("pk", flat=True),
)
PackagedWorkBasket.objects.filter(pk__in=to_update).update(
position=F("position") + 1,
)

self.position = 1
self.save()
self.refresh_from_db()
instance.position = 1
instance.save()

return self
return instance

@atomic
@create_envelope_on_new_top
@refresh_after
def promote_position(self) -> "PackagedWorkBasket":
"""Promote the instance by one position up the package processing
queue."""

PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk)
self.refresh_from_db()
instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
pk=self.pk,
)

if self.position <= 1:
return self
if instance.position <= 1:
return instance

obj_to_swap = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
position=self.position - 1,
position=instance.position - 1,
)
obj_to_swap.position += 1
self.position -= 1
instance.position -= 1

PackagedWorkBasket.objects.bulk_update(
[self, obj_to_swap],
[instance, obj_to_swap],
["position"],
)
self.refresh_from_db()

return self
return instance

@atomic
@create_envelope_on_new_top
@refresh_after
def demote_position(self) -> "PackagedWorkBasket":
"""Demote the instance by one position down the package processing
queue."""

PackagedWorkBasket.objects.select_for_update(nowait=True).get(pk=self.pk)
self.refresh_from_db()
instance = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
pk=self.pk,
)

if self.position in {0, PackagedWorkBasket.objects.max_position()}:
return self
if instance.position in {0, PackagedWorkBasket.objects.max_position()}:
return instance

obj_to_swap = PackagedWorkBasket.objects.select_for_update(nowait=True).get(
position=self.position + 1,
position=instance.position + 1,
)
obj_to_swap.position -= 1
self.position += 1
instance.position += 1

PackagedWorkBasket.objects.bulk_update(
[self, obj_to_swap],
[instance, obj_to_swap],
["position"],
)
self.refresh_from_db()

return self
return instance

0 comments on commit 3e1610a

Please sign in to comment.