Skip to content

Commit

Permalink
Adding single retry to Insights jobs that time out. (#21)
Browse files Browse the repository at this point in the history
* Adding single retry to Insights jobs that time out.

* Changing retry pattern to use backoff.

* fix pylint errors

* Clarifying retry function as per code review
  • Loading branch information
dmosorast committed Sep 25, 2017
1 parent 0d84acf commit 3006d68
Showing 1 changed file with 18 additions and 11 deletions.
29 changes: 18 additions & 11 deletions tap_facebook/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ def advance_bookmark(state, tap_stream_id, bookmark_key, date):
class InsightsJobTimeout(Exception):
    """Raised when an Insights job does not complete within the allowed wait.

    A dedicated exception type lets the retry decorator on ``run_job``
    target job timeouts specifically rather than every ``Exception``.
    """

def log_insights_retry_attempt(details):
    """Log the exception currently being handled, then announce the retry.

    Intended for use as a ``backoff`` ``on_backoff`` handler.

    Args:
        details: Argument passed in by the caller of the handler; it is
            not read here.
    """
    # The exception is taken from the interpreter's "currently handled"
    # state rather than from ``details``, so this only logs something
    # meaningful when invoked while an exception is being handled.
    active_error = sys.exc_info()[1]
    LOGGER.info(active_error)
    LOGGER.info('Retrying Insights job...')

@attr.s
class AdsInsights(Stream):
field_class = adsinsights.AdsInsights.Field
Expand All @@ -219,11 +224,6 @@ def __attrs_post_init__(self):
if self.options.get('primary-keys'):
self.key_properties.extend(self.options['primary-keys'])

@backoff.on_exception(
backoff.expo,
(InsightsJobTimeout),
max_tries=3,
factor=2)
def job_params(self):
start_date = pendulum.parse(get_start(self.state, self.name, self.bookmark_key))

Expand Down Expand Up @@ -252,7 +252,13 @@ def job_params(self):
}
buffered_start_date = buffered_start_date.add(days=1)


@backoff.on_exception(
backoff.constant,
InsightsJobTimeout,
max_tries=2,
interval=0,
on_backoff=log_insights_retry_attempt
)
def run_job(self, params):
LOGGER.info('Starting adsinsights job with params %s', params)
job = self.account.get_insights( # pylint: disable=no-member
Expand All @@ -279,17 +285,18 @@ def run_job(self, params):
job_id, INSIGHTS_MAX_WAIT_TO_START_SECONDS))
elif duration > INSIGHTS_MAX_WAIT_TO_FINISH_SECONDS and status != "Job Completed":
pretty_error_message = ('Insights job {} did not complete after {} minutes. ' +
'This is an intermittent error and may resolve itself on subsequent queries to the Facebook API. ' +
'You should deselect fields from the schema that are not necessary, ' +
'as that may help improve the reliability of the Facebook API.')
raise Exception(pretty_error_message.format(job_id, INSIGHTS_MAX_WAIT_TO_FINISH_SECONDS//60))
'This is an intermittent error and may resolve itself on subsequent queries to the Facebook API. ' +
'You should deselect fields from the schema that are not necessary, ' +
'as that may help improve the reliability of the Facebook API.')
raise InsightsJobTimeout(pretty_error_message.format(job_id,
INSIGHTS_MAX_WAIT_TO_FINISH_SECONDS//60))

LOGGER.info("sleeping for %d seconds until job is done", sleep_time)
time.sleep(sleep_time)
if sleep_time < INSIGHTS_MAX_ASYNC_SLEEP_SECONDS:
sleep_time = 2 * sleep_time
return job


def __iter__(self):
for params in self.job_params():
with metrics.job_timer('insights'):
Expand Down

0 comments on commit 3006d68

Please sign in to comment.