This repository was archived by the owner on Jan 2, 2025. It is now read-only.

Commit: Retry when HTTP 429 (Ads Customer Match)
diogoaihara committed Oct 14, 2022
1 parent be2c2c7 commit 30e2b57
Showing 3 changed files with 71 additions and 11 deletions.
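
In short: the Ads Customer Match uploader now retries add_offline_user_data_job_operations when the Google Ads API responds with ResourceExhausted (HTTP 429), backing off exponentially; every processing step in processing_steps.py gains a trailing "Finish" stage that logs when an execution completes; and the error-logging wrapper in uploaders/utils.py now also logs the exception type. With _DEFAULT_TIMEOUT = 60 and _MAX_RETRIES = 6, the waits between attempts are 60, 120, 240, 480, 960 and 1920 seconds, so a single batch can spend up to 60 · (2^6 − 1) = 3780 s (63 minutes) waiting before the ResourceExhausted is finally re-raised.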
52 changes: 45 additions & 7 deletions megalista_dataflow/steps/processing_steps.py
@@ -14,9 +14,11 @@

import apache_beam as beam

from typing import List

from .megalista_step import MegalistaStep
from sources.batches_from_executions import BatchesFromExecutions, TransactionalType
from models.execution import DestinationType
from models.execution import DestinationType, Execution
from error.error_handling import ErrorHandler

from uploaders.support.transactional_events_results_writer import TransactionalEventsResultsWriter
@@ -49,6 +51,10 @@
ADS_CM_HASHER = AdsUserListPIIHashingMapper()
DV_CM_HASHER = DVUserListPIIHashingMapper()

def inform_finish_of_execution_filter(el: Execution):
logging.get_logger('megalista.ProcessingSteps').info('Execution finished', execution=el)
return el
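
The inform_finish_of_execution_filter helper defined above is attached to every step below as a trailing "Finish" >> beam.Filter(...) stage: it logs that the execution finished and returns the element unchanged, so the (truthy) Execution passes straight through and the stage acts as a logging pass-through rather than filtering anything out.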

class GoogleAdsSSDStep(MegalistaStep):
def expand(self, executions):
return (
@@ -69,6 +75,8 @@ def expand(self, executions):
ErrorHandler(DestinationType.ADS_SSD_UPLOAD, self.params.error_notifier)
)
)
| "Finish"
>> beam.Filter(inform_finish_of_execution_filter)
)


@@ -92,6 +100,8 @@ def expand(self, executions):
ErrorHandler(DestinationType.ADS_CUSTOMER_MATCH_MOBILE_DEVICE_ID_UPLOAD, self.params.error_notifier)
)
)
| "Finish"
>> beam.Filter(inform_finish_of_execution_filter)
)


@@ -115,6 +125,8 @@ def expand(self, executions):
ErrorHandler(DestinationType.ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD, self.params.error_notifier)
)
)
| "Finish"
>> beam.Filter(inform_finish_of_execution_filter)
)


@@ -138,6 +150,8 @@ def expand(self, executions):
ErrorHandler(DestinationType.ADS_CUSTOMER_MATCH_USER_ID_UPLOAD, self.params.error_notifier)
)
)
| "Finish"
>> beam.Filter(inform_finish_of_execution_filter)
)


@@ -164,7 +178,10 @@ def expand(self, executions):
>> TransactionalEventsResultsWriter(
self.params._dataflow_options,
TransactionalType.GCLID_TIME,
ErrorHandler(DestinationType.ADS_OFFLINE_CONVERSION, self.params.error_notifier))
ErrorHandler(DestinationType.ADS_OFFLINE_CONVERSION, self.params.error_notifier)
)
| "Finish"
>> beam.Filter(inform_finish_of_execution_filter)
)


@@ -191,7 +208,10 @@ def expand(self, executions):
>> TransactionalEventsResultsWriter(
self.params._dataflow_options,
TransactionalType.GCLID_TIME,
ErrorHandler(DestinationType.ADS_OFFLINE_CONVERSION_CALLS, self.params.error_notifier))
ErrorHandler(DestinationType.ADS_OFFLINE_CONVERSION_CALLS, self.params.error_notifier)
)
| "Finish"
>> beam.Filter(inform_finish_of_execution_filter)
)


@@ -209,7 +229,10 @@ def expand(self, executions):
| "Upload - GA user list"
>> beam.ParDo(GoogleAnalyticsUserListUploaderDoFn(self.params._oauth_credentials,
ErrorHandler(DestinationType.GA_USER_LIST_UPLOAD,
self.params.error_notifier)))
self.params.error_notifier))
)
| "Finish"
>> beam.Filter(inform_finish_of_execution_filter)
)


@@ -234,6 +257,8 @@ def expand(self, executions):
ErrorHandler(DestinationType.GA_DATA_IMPORT,
self.params.error_notifier))
)
| "Finish"
>> beam.Filter(inform_finish_of_execution_filter)
)


@@ -255,7 +280,10 @@ def expand(self, executions):
>> TransactionalEventsResultsWriter(
self.params._dataflow_options,
TransactionalType.UUID,
ErrorHandler(DestinationType.GA_MEASUREMENT_PROTOCOL, self.params.error_notifier))
ErrorHandler(DestinationType.GA_MEASUREMENT_PROTOCOL, self.params.error_notifier)
)
| "Finish"
>> beam.Filter(inform_finish_of_execution_filter)
)


@@ -277,7 +305,10 @@ def expand(self, executions):
>> TransactionalEventsResultsWriter(
self.params._dataflow_options,
TransactionalType.UUID,
ErrorHandler(DestinationType.GA_4_MEASUREMENT_PROTOCOL, self.params.error_notifier))
ErrorHandler(DestinationType.GA_4_MEASUREMENT_PROTOCOL, self.params.error_notifier)
)
| "Finish"
>> beam.Filter(inform_finish_of_execution_filter)
)


@@ -302,7 +333,10 @@ def expand(self, executions):
>> TransactionalEventsResultsWriter(
self.params._dataflow_options,
TransactionalType.UUID,
ErrorHandler(DestinationType.CM_OFFLINE_CONVERSION, self.params.error_notifier))
ErrorHandler(DestinationType.CM_OFFLINE_CONVERSION, self.params.error_notifier)
)
| "Finish"
>> beam.Filter(inform_finish_of_execution_filter)
)

class DisplayVideoCustomerMatchDeviceIdStep(MegalistaStep):
@@ -325,6 +359,8 @@ def expand(self, executions):
ErrorHandler(DestinationType.DV_CUSTOMER_MATCH_DEVICE_ID_UPLOAD, self.params.error_notifier)
)
)
| "Finish"
>> beam.Filter(inform_finish_of_execution_filter)
)


@@ -348,6 +384,8 @@ def expand(self, executions):
ErrorHandler(DestinationType.DV_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD, self.params.error_notifier)
)
)
| "Finish"
>> beam.Filter(inform_finish_of_execution_filter)
)

PROCESSING_STEPS = [
28 changes: 24 additions & 4 deletions megalista_dataflow/uploaders/google_ads/customer_match/abstract_uploader.py
@@ -25,9 +25,12 @@
from uploaders import utils
from uploaders.google_ads import ADS_API_VERSION
from uploaders.uploaders import MegalistaUploader
from google.api_core.exceptions import ResourceExhausted
from time import sleep

_DEFAULT_LOGGER: str = 'megalista.GoogleAdsCustomerMatchAbstractUploader'

_DEFAULT_TIMEOUT: int = 60
_MAX_RETRIES: int = 6

class GoogleAdsCustomerMatchAbstractUploaderDoFn(MegalistaUploader):

@@ -236,9 +239,26 @@ def process(self, batch: Batch, **kwargs):
'operations': operations
}

data_insertion_response = offline_user_data_job_service.add_offline_user_data_job_operations(
request=data_insertion_payload)

timeout_multiplier = 1
data_insertion_response = None
retry = 0
while data_insertion_response is None:
try:
data_insertion_response = offline_user_data_job_service.add_offline_user_data_job_operations(
request=data_insertion_payload)
except ResourceExhausted as e:
if retry < _MAX_RETRIES:
retry = retry + 1
logging.get_logger(_DEFAULT_LOGGER).warning(
f'Resource Exhausted exception: {e}. Retrying... ({retry} of {_MAX_RETRIES})', execution=execution)
sleep(timeout_multiplier * _DEFAULT_TIMEOUT)
timeout_multiplier = timeout_multiplier * 2

else:
logging.get_logger(_DEFAULT_LOGGER).warning(
f'Resource Exhausted exception: {e}. No retries left.', execution=execution)
raise e

error_message = utils.print_partial_error_messages(_DEFAULT_LOGGER, 'uploading customer match',
data_insertion_response)
if error_message:
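
For readers who want the retry pattern above in isolation: the sketch below is illustrative only and not part of the commit (the helper name call_with_backoff is made up), but it reproduces the same behaviour, mirroring _DEFAULT_TIMEOUT = 60, _MAX_RETRIES = 6 and the doubling of the wait after every ResourceExhausted.

from time import sleep
from google.api_core.exceptions import ResourceExhausted

def call_with_backoff(fn, *args, max_retries=6, base_timeout=60, **kwargs):
    """Calls fn(*args, **kwargs), retrying on ResourceExhausted (HTTP 429) with exponential backoff."""
    timeout = base_timeout
    for attempt in range(max_retries + 1):
        try:
            return fn(*args, **kwargs)
        except ResourceExhausted:
            if attempt == max_retries:
                raise  # no retries left: surface the quota error to the caller
            sleep(timeout)   # wait 60s, then 120s, 240s, ... before the next attempt
            timeout *= 2

With such a helper, the call above would read data_insertion_response = call_with_backoff(offline_user_data_job_service.add_offline_user_data_job_operations, request=data_insertion_payload), minus the warning logs the commit emits on each retry.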
2 changes: 2 additions & 0 deletions megalista_dataflow/uploaders/utils.py
@@ -21,6 +21,7 @@
from models.execution import Batch
from uploaders.uploaders import MegalistaUploader
from unittest.mock import ANY, MagicMock
from google.ads.googleads.errors import GoogleAdsException

MAX_RETRIES = 3

@@ -78,6 +79,7 @@ def inner(*args, **kwargs):
except Exception as e:
self_._add_error(batch.execution, f'Error uploading data: {e}')
logger.warning(f'Error uploading data for :{batch.elements}', execution=batch.execution)
logger.warning(f'Exception type: {type(e).__name__}', execution=batch.execution)
logger.warning(e, exc_info=True, execution=batch.execution)
# logger.exception('Error uploading data.', execution=batch.execution)

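Besides the GoogleAdsException import, the change in utils.py logs type(e).__name__ alongside the failing batch, which makes quota errors (for example ResourceExhausted or GoogleAdsException, both referenced elsewhere in this commit) easier to tell apart from other upload failures when scanning the worker logs.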
