Skip to content

Commit

Permalink
Add backoff and retry to RAWR tile enqueuing.
Browse files Browse the repository at this point in the history
Also adds more information about the cause of the failure, as returned in the SQS failed messages.
  • Loading branch information
Matt Amos committed Dec 8, 2017
1 parent 0678474 commit f77743b
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 9 deletions.
2 changes: 1 addition & 1 deletion tilequeue/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -1851,7 +1851,7 @@ def tilequeue_rawr_seed_all(cfg, peripherals):
coords = []
for x in xrange(0, max_coord):
for y in xrange(0, max_coord):
coords.extend(Coordinate(zoom=group_by_zoom, column=x, row=y))
coords.append(Coordinate(zoom=group_by_zoom, column=x, row=y))

_tilequeue_rawr_seed(cfg, peripherals, coords)

Expand Down
58 changes: 50 additions & 8 deletions tilequeue/rawr.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ def __init__(self, sqs_client, queue_url, recv_wait_time_seconds):
self.queue_url = queue_url
self.recv_wait_time_seconds = recv_wait_time_seconds

def send(self, payloads):
def send_without_retry(self, payloads):
"""
enqueue a sequence of payloads to the sqs queue
Each payload is already expected to be pre-formatted for the queue. At
this time, it should be a comma separated list of coordinates strings
that are grouped by their parent zoom.
This version does not retry, and returns any failed messages.
"""
msgs = []
for i, payload in enumerate(payloads):
Expand All @@ -54,12 +56,52 @@ def send(self, payloads):
raise Exception('Invalid status code from sqs: %s' %
resp['ResponseMetadata']['HTTPStatusCode'])
failed_messages = resp.get('Failed')
if failed_messages:
# TODO maybe retry failed messages if not sender's fault? up to a
# certain maximum number of attempts?
# http://boto3.readthedocs.io/en/latest/reference/services/sqs.html#SQS.Client.send_message_batch # noqa
raise Exception('Messages failed to send to sqs: %s' %
len(failed_messages))
return failed_messages

def send(self, payloads, logger, num_tries=5):
"""
Enqueue payloads to the SQS queue, retrying failed messages with
exponential backoff.
"""
from time import sleep

backoff_interval = 1
backoff_factor = 2

for try_counter in xrange(0, num_tries):
failed_messages = self.send_without_retry(payloads)

# success!
if not failed_messages:
payloads = []
break

# output some information about the failures for debugging
# purposes. we expect failures to be quite rare, so we can be
# pretty verbose.
if logger:
for msg in failed_messages:
logger.warning("Failed to send message on try %d: Id=%r, "
"SenderFault=%r, Code=%r, Message=%r" %
(try_counter, msg['Id'],
msg.get('SenderFault'), msg.get('Code'),
msg.get('Message')))

# wait a little while, in case the problem is that we're talking
# too fast.
sleep(backoff_interval)
backoff_interval *= backoff_factor

# filter out the failed payloads for retry
retry_payloads = []
for msg in failed_messages:
i = int(msg['Id'])
retry_payloads.append(payloads[i])
payloads = retry_payloads

if payloads:
raise Exception('Messages failed to send to sqs after %d '
'retries: %s' % (num_tries, len(payloads)))

def read(self):
"""read a single message from the queue"""
Expand Down Expand Up @@ -144,7 +186,7 @@ def __call__(self, coords):
rawr_queue_batch_size = 10
n_msgs_sent = 0
for payloads_chunk in grouper(payloads, rawr_queue_batch_size):
self.rawr_queue.send(payloads_chunk)
self.rawr_queue.send(payloads_chunk, self.logger)
n_msgs_sent += 1

if self.logger:
Expand Down

0 comments on commit f77743b

Please sign in to comment.