From f77743b06809ec57e4c376fef1ba2a7e62767c52 Mon Sep 17 00:00:00 2001 From: Matt Amos Date: Fri, 8 Dec 2017 19:02:15 +0000 Subject: [PATCH] Add backoff and retry to RAWR tile enqueuing. Also adds more information about the cause of the failure, as returned in the SQS failed messages. --- tilequeue/command.py | 2 +- tilequeue/rawr.py | 58 ++++++++++++++++++++++++++++++++++++++------ 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/tilequeue/command.py b/tilequeue/command.py index 87d0b461..81821c6e 100755 --- a/tilequeue/command.py +++ b/tilequeue/command.py @@ -1851,7 +1851,7 @@ def tilequeue_rawr_seed_all(cfg, peripherals): coords = [] for x in xrange(0, max_coord): for y in xrange(0, max_coord): - coords.extend(Coordinate(zoom=group_by_zoom, column=x, row=y)) + coords.append(Coordinate(zoom=group_by_zoom, column=x, row=y)) _tilequeue_rawr_seed(cfg, peripherals, coords) diff --git a/tilequeue/rawr.py b/tilequeue/rawr.py index 86aaf33c..aaa51fdd 100644 --- a/tilequeue/rawr.py +++ b/tilequeue/rawr.py @@ -30,13 +30,15 @@ def __init__(self, sqs_client, queue_url, recv_wait_time_seconds): self.queue_url = queue_url self.recv_wait_time_seconds = recv_wait_time_seconds - def send(self, payloads): + def send_without_retry(self, payloads): """ enqueue a sequence of payloads to the sqs queue Each payload is already expected to be pre-formatted for the queue. At this time, it should be a comma separated list of coordinates strings that are grouped by their parent zoom. + + This version does not retry, and returns any failed messages. """ msgs = [] for i, payload in enumerate(payloads): @@ -54,12 +56,52 @@ def send(self, payloads): raise Exception('Invalid status code from sqs: %s' % resp['ResponseMetadata']['HTTPStatusCode']) failed_messages = resp.get('Failed') - if failed_messages: - # TODO maybe retry failed messages if not sender's fault? up to a - # certain maximum number of attempts? - # http://boto3.readthedocs.io/en/latest/reference/services/sqs.html#SQS.Client.send_message_batch # noqa - raise Exception('Messages failed to send to sqs: %s' % - len(failed_messages)) + return failed_messages + + def send(self, payloads, logger, num_tries=5): + """ + Enqueue payloads to the SQS queue, retrying failed messages with + exponential backoff. + """ + from time import sleep + + backoff_interval = 1 + backoff_factor = 2 + + for try_counter in xrange(0, num_tries): + failed_messages = self.send_without_retry(payloads) + + # success! + if not failed_messages: + payloads = [] + break + + # output some information about the failures for debugging + # purposes. we expect failures to be quite rare, so we can be + # pretty verbose. + if logger: + for msg in failed_messages: + logger.warning("Failed to send message on try %d: Id=%r, " + "SenderFault=%r, Code=%r, Message=%r" % + (try_counter, msg['Id'], + msg.get('SenderFault'), msg.get('Code'), + msg.get('Message'))) + + # wait a little while, in case the problem is that we're talking + # too fast. + sleep(backoff_interval) + backoff_interval *= backoff_factor + + # filter out the failed payloads for retry + retry_payloads = [] + for msg in failed_messages: + i = int(msg['Id']) + retry_payloads.append(payloads[i]) + payloads = retry_payloads + + if payloads: + raise Exception('Messages failed to send to sqs after %d ' + 'retries: %s' % (num_tries, len(payloads))) def read(self): """read a single message from the queue""" @@ -144,7 +186,7 @@ def __call__(self, coords): rawr_queue_batch_size = 10 n_msgs_sent = 0 for payloads_chunk in grouper(payloads, rawr_queue_batch_size): - self.rawr_queue.send(payloads_chunk) + self.rawr_queue.send(payloads_chunk, self.logger) n_msgs_sent += 1 if self.logger: