Skip to content

Commit 2eecdd2

Browse files
authored
fix(deletions): Retry fetching rows after timeouts when doing bulk deletes (#102369)
We're seeing periodic timeouts when deleting commits, but they also succeed most of the time. Adding in some retries to work around this. Trying this again since we saw flakes on master
1 parent 73bc978 commit 2eecdd2

File tree

3 files changed

+110
-1
lines changed

3 files changed

+110
-1
lines changed

src/sentry/db/deletion.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ def iterator(
122122
order_by=order_field,
123123
override_unique_safety_check=True,
124124
result_value_getter=lambda item: item[1],
125+
query_timeout_retries=10,
125126
)
126127

127128
for batch in itertools.batched(wrapper, chunk_size):

src/sentry/utils/query.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
from django.db.models.query_utils import Q
1313
from django.db.models.sql.constants import ROW_COUNT
1414
from django.db.models.sql.subqueries import DeleteQuery
15+
from django.db.utils import OperationalError
1516

1617
from sentry.db.models.base import Model
1718
from sentry.services import eventstore
19+
from sentry.utils.retries import ConditionalRetryPolicy
1820

1921
if TYPE_CHECKING:
2022
from sentry.services.eventstore.models import Event
@@ -109,6 +111,8 @@ def __init__[M: Model](
109111
callbacks: Sequence[Callable[[list[V]], None]] = (),
110112
result_value_getter: Callable[[V], int] | None = None,
111113
override_unique_safety_check: bool = False,
114+
query_timeout_retries: int | None = None,
115+
retry_delay_seconds: float = 0.5,
112116
):
113117
# Support for slicing
114118
if queryset.query.low_mark == 0 and not (
@@ -132,6 +136,8 @@ def __init__[M: Model](
132136
self.order_by = order_by
133137
self.callbacks = callbacks
134138
self.result_value_getter = result_value_getter
139+
self.query_timeout_retries = query_timeout_retries
140+
self.retry_delay_seconds = retry_delay_seconds
135141

136142
order_by_col = queryset.model._meta.get_field(order_by if order_by != "pk" else "id")
137143
if not override_unique_safety_check and (
@@ -176,7 +182,16 @@ def __iter__(self) -> Iterator[V]:
176182
else:
177183
results_qs = queryset.filter(**{"%s__gte" % self.order_by: cur_value})
178184

179-
results = list(results_qs[0 : self.step])
185+
if self.query_timeout_retries is not None:
186+
retries = self.query_timeout_retries
187+
retry_policy = ConditionalRetryPolicy(
188+
test_function=lambda attempt, exc: attempt <= retries
189+
and isinstance(exc, OperationalError),
190+
delay_function=lambda i: self.retry_delay_seconds,
191+
)
192+
results = retry_policy(lambda: list(results_qs[0 : self.step]))
193+
else:
194+
results = list(results_qs[0 : self.step])
180195

181196
for cb in self.callbacks:
182197
cb(results)

tests/sentry/utils/test_query.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
from unittest.mock import patch
2+
13
import pytest
24
from django.db import connections
5+
from django.db.utils import OperationalError
36

47
from sentry.db.models.query import in_iexact
58
from sentry.models.commit import Commit
@@ -78,6 +81,96 @@ def test_wrapper_over_values_list(self) -> None:
7881
qs = User.objects.all().values_list("id")
7982
assert list(qs) == list(self.range_wrapper(qs, result_value_getter=lambda r: r[0]))
8083

84+
def test_retry_on_operational_error_success_after_failures(self) -> None:
85+
"""Test that with query_timeout_retries=3, after 2 errors and 1 success it works."""
86+
total = 5
87+
for _ in range(total):
88+
self.create_user()
89+
90+
qs = User.objects.all()
91+
batch_attempts: list[int] = []
92+
current_batch_count = 0
93+
original_getitem = type(qs).__getitem__
94+
95+
def mock_getitem(self, slice_obj):
96+
nonlocal current_batch_count
97+
current_batch_count += 1
98+
if len(batch_attempts) == 0 and current_batch_count <= 2:
99+
raise OperationalError("canceling statement due to user request")
100+
if len(batch_attempts) == 0 and current_batch_count == 3:
101+
batch_attempts.append(current_batch_count)
102+
return original_getitem(self, slice_obj)
103+
104+
with patch.object(type(qs), "__getitem__", mock_getitem):
105+
results = list(
106+
self.range_wrapper(qs, step=10, query_timeout_retries=3, retry_delay_seconds=0.01)
107+
)
108+
109+
assert len(results) == total
110+
assert batch_attempts[0] == 3
111+
112+
def test_retry_exhausted_raises_exception(self) -> None:
113+
"""Test that after exhausting retries, the OperationalError is raised."""
114+
total = 5
115+
for _ in range(total):
116+
self.create_user()
117+
118+
qs = User.objects.all()
119+
120+
def always_fail(self, slice_obj):
121+
raise OperationalError("canceling statement due to user request")
122+
123+
with patch.object(type(qs), "__getitem__", always_fail):
124+
with pytest.raises(OperationalError, match="canceling statement due to user request"):
125+
list(
126+
self.range_wrapper(
127+
qs, step=10, query_timeout_retries=3, retry_delay_seconds=0.01
128+
)
129+
)
130+
131+
def test_retry_does_not_catch_other_exceptions(self) -> None:
132+
"""Test that non-OperationalError exceptions are not retried."""
133+
total = 5
134+
for _ in range(total):
135+
self.create_user()
136+
137+
qs = User.objects.all()
138+
139+
attempt_count = {"count": 0}
140+
141+
def raise_value_error(self, slice_obj):
142+
attempt_count["count"] += 1
143+
raise ValueError("Some other error")
144+
145+
with patch.object(type(qs), "__getitem__", raise_value_error):
146+
with pytest.raises(ValueError, match="Some other error"):
147+
list(
148+
self.range_wrapper(
149+
qs, step=10, query_timeout_retries=3, retry_delay_seconds=0.01
150+
)
151+
)
152+
assert attempt_count["count"] == 1
153+
154+
def test_no_retry_when_query_timeout_retries_is_none(self) -> None:
155+
"""Test that when query_timeout_retries is None, no retry logic is applied."""
156+
total = 5
157+
for _ in range(total):
158+
self.create_user()
159+
160+
qs = User.objects.all()
161+
162+
attempt_count = {"count": 0}
163+
164+
def fail_once(self, slice_obj):
165+
attempt_count["count"] += 1
166+
raise OperationalError("canceling statement due to user request")
167+
168+
with patch.object(type(qs), "__getitem__", fail_once):
169+
with pytest.raises(OperationalError, match="canceling statement due to user request"):
170+
list(self.range_wrapper(qs, step=10, query_timeout_retries=None))
171+
172+
assert attempt_count["count"] == 1
173+
81174

82175
@no_silo_test
83176
class RangeQuerySetWrapperWithProgressBarTest(RangeQuerySetWrapperTest):

0 commit comments

Comments
 (0)