Skip to content

Commit

Permalink
Merge pull request #318 from tilezen/zerebubuth/retry-sqs-sending
Browse files Browse the repository at this point in the history
Add backoff and retry to RAWR tile enqueuing.
  • Loading branch information
zerebubuth authored Dec 8, 2017
2 parents 0678474 + f77743b commit d446a31
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 9 deletions.
2 changes: 1 addition & 1 deletion tilequeue/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -1851,7 +1851,7 @@ def tilequeue_rawr_seed_all(cfg, peripherals):
coords = []
for x in xrange(0, max_coord):
for y in xrange(0, max_coord):
coords.extend(Coordinate(zoom=group_by_zoom, column=x, row=y))
coords.append(Coordinate(zoom=group_by_zoom, column=x, row=y))

_tilequeue_rawr_seed(cfg, peripherals, coords)

Expand Down
58 changes: 50 additions & 8 deletions tilequeue/rawr.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ def __init__(self, sqs_client, queue_url, recv_wait_time_seconds):
self.queue_url = queue_url
self.recv_wait_time_seconds = recv_wait_time_seconds

def send(self, payloads):
def send_without_retry(self, payloads):
"""
enqueue a sequence of payloads to the sqs queue
Each payload is already expected to be pre-formatted for the queue. At
this time, it should be a comma separated list of coordinates strings
that are grouped by their parent zoom.
This version does not retry, and returns any failed messages.
"""
msgs = []
for i, payload in enumerate(payloads):
Expand All @@ -54,12 +56,52 @@ def send(self, payloads):
raise Exception('Invalid status code from sqs: %s' %
resp['ResponseMetadata']['HTTPStatusCode'])
failed_messages = resp.get('Failed')
if failed_messages:
# TODO maybe retry failed messages if not sender's fault? up to a
# certain maximum number of attempts?
# http://boto3.readthedocs.io/en/latest/reference/services/sqs.html#SQS.Client.send_message_batch # noqa
raise Exception('Messages failed to send to sqs: %s' %
len(failed_messages))
return failed_messages

def send(self, payloads, logger, num_tries=5, backoff_interval=1,
         backoff_factor=2):
    """
    Enqueue payloads to the SQS queue, retrying failed messages with
    exponential backoff.

    payloads is an indexable sequence of pre-formatted payloads. The `Id`
    of each failed message returned by `send_without_retry` is interpreted
    as the integer index of the payload within the batch that was just
    sent, and only those payloads are sent again on the next attempt.

    logger, if truthy, must provide a `warning` method; one warning is
    emitted per failed message. Pass None to disable the logging.

    num_tries is the maximum number of send attempts. backoff_interval is
    the number of seconds to wait after the first failed attempt, and the
    wait is multiplied by backoff_factor after every subsequent failure.

    Raises an Exception if any payloads are still unsent once all the
    attempts have been used up.
    """
    from time import sleep

    # NOTE: range rather than xrange so that this works on both Python 2
    # and Python 3; num_tries is small, so the list built on 2 is harmless.
    for try_counter in range(num_tries):
        failed_messages = self.send_without_retry(payloads)

        # success!
        if not failed_messages:
            payloads = []
            break

        # output some information about the failures for debugging
        # purposes. we expect failures to be quite rare, so we can be
        # pretty verbose.
        if logger:
            for msg in failed_messages:
                logger.warning("Failed to send message on try %d: Id=%r, "
                               "SenderFault=%r, Code=%r, Message=%r" %
                               (try_counter, msg['Id'],
                                msg.get('SenderFault'), msg.get('Code'),
                                msg.get('Message')))

        # keep only the payloads which failed, so that the next attempt
        # doesn't re-send messages which were already accepted.
        payloads = [payloads[int(msg['Id'])] for msg in failed_messages]

        # wait a little while, in case the problem is that we're talking
        # too fast. there's no point waiting after the final attempt, as
        # nothing will be retried - we'd only delay raising the error.
        if try_counter + 1 < num_tries:
            sleep(backoff_interval)
            backoff_interval *= backoff_factor

    if payloads:
        raise Exception('Messages failed to send to sqs after %d '
                        'retries: %s' % (num_tries, len(payloads)))

def read(self):
"""read a single message from the queue"""
Expand Down Expand Up @@ -144,7 +186,7 @@ def __call__(self, coords):
rawr_queue_batch_size = 10
n_msgs_sent = 0
for payloads_chunk in grouper(payloads, rawr_queue_batch_size):
self.rawr_queue.send(payloads_chunk)
self.rawr_queue.send(payloads_chunk, self.logger)
n_msgs_sent += 1

if self.logger:
Expand Down

0 comments on commit d446a31

Please sign in to comment.