diff --git a/.circleci/config.yml b/.circleci/config.yml
index 828702db..529237a2 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -1,68 +1,166 @@
 version: 2.1
+
 orbs:
   slack: circleci/slack@3.4.2
 
+executors:
+  docker-executor:
+    docker:
+      - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
+
 jobs:
   build:
-    docker:
-      - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4
+    executor: docker-executor
+    steps:
+      - run: echo 'CI done'
+
+  ensure_env:
+    executor: docker-executor
     steps:
       - checkout
       - run:
-          name: 'Setup virtual env'
+          name: 'Setup Virtual Env'
           command: |
-            python3 -mvenv /usr/local/share/virtualenvs/tap-facebook
+            python3 -m venv /usr/local/share/virtualenvs/tap-facebook
             source /usr/local/share/virtualenvs/tap-facebook/bin/activate
-            pip install -U pip setuptools
+            pip install 'pip==23.3.2'
+            pip install 'setuptools==56.0.0'
             pip install .[dev]
+      - slack/notify-on-failure:
+          only_for_branches: master
+      - persist_to_workspace:
+          root: /usr/local/share/virtualenvs
+          paths:
+            - tap-facebook
+
+  # Regression Job Definitions
+  run_pylint:
+    executor: docker-executor
+    steps:
+      - checkout
+      - attach_workspace:
+          at: /usr/local/share/virtualenvs
       - run:
-          name: 'pylint'
+          name: 'Run pylint'
          command: |
            source /usr/local/share/virtualenvs/tap-facebook/bin/activate
            pylint tap_facebook -d C,R,W
+      - slack/notify-on-failure:
+          only_for_branches: master
+
+  run_unit_tests:
+    executor: docker-executor
+    steps:
+      - checkout
+      - attach_workspace:
+          at: /usr/local/share/virtualenvs
       - run:
-          when: always
-          name: 'Unit Tests'
+          name: 'Run Unit Tests'
           command: |
             source /usr/local/share/virtualenvs/tap-facebook/bin/activate
-            nosetests tests/unittests
-      - add_ssh_keys
+            pip install nose2
+            nose2 -v -s tests/unittests
+      - store_test_results:
+          path: test_output/report.xml
+      - store_artifacts:
+          path: htmlcov
+      - slack/notify-on-failure:
+          only_for_branches: master
+
+  run_integration_tests:
+    executor: docker-executor
+    parallelism: 9
+    steps:
+      - checkout
+      - attach_workspace:
+          at: /usr/local/share/virtualenvs
       - run:
-          name: 'Integration Tests'
+          name: 'Run Integration Tests'
           command: |
             aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
             source dev_env.sh
+            mkdir /tmp/${CIRCLE_PROJECT_REPONAME}
+            export STITCH_CONFIG_DIR=/tmp/${CIRCLE_PROJECT_REPONAME}
             source /usr/local/share/virtualenvs/tap-tester/bin/activate
-            run-test --tap=tap-facebook \
-                     --target=target-stitch \
-                     --orchestrator=stitch-orchestrator \
-                     --email=harrison+sandboxtest@stitchdata.com \
-                     --password=$SANDBOX_PASSWORD \
-                     --client-id=50 \
-                     tests
-      - run:
-          name: 'pylint tests'
-          command: |
-            source /usr/local/share/virtualenvs/tap-tester/bin/activate
-            pip install pylint
-            pylint tests/*.py -d 'broad-except,chained-comparison,empty-docstring,fixme,invalid-name,line-too-long,missing-class-docstring,missing-function-docstring,missing-module-docstring,no-else-raise,no-else-return,too-few-public-methods,too-many-arguments,too-many-branches,too-many-lines,too-many-locals,ungrouped-imports,wrong-spelling-in-comment,wrong-spelling-in-docstring,duplicate-code,no-name-in-module,attribute-defined-outside-init,too-many-statements,cell-var-from-loop,too-many-public-methods,missing-docstring,use-a-generator'
+            circleci tests glob "tests/*.py" | circleci tests split > ./tests-to-run
+            if [ -s ./tests-to-run ]; then
+              for test_file in $(cat ./tests-to-run)
+              do
+                run-test --tap=${CIRCLE_PROJECT_REPONAME} $test_file
+              done
+            fi
       - slack/notify-on-failure:
           only_for_branches: master
+      - store_artifacts:
+          path: /tmp/tap-facebook
+
 workflows:
   version: 2
   commit:
     jobs:
+      - ensure_env:
+          context:
+            - circleci-user
+            - tier-1-tap-user
+      - run_pylint:
+          context:
+            - circleci-user
+            - tier-1-tap-user
+          requires:
+            - ensure_env
+      - run_unit_tests:
+          context:
+            - circleci-user
+            - tier-1-tap-user
+          requires:
+            - ensure_env
+      - run_integration_tests:
+          context:
+            - circleci-user
+            - tier-1-tap-user
+          requires:
+            - ensure_env
       - build:
-          context: circleci-user
+          context:
+            - circleci-user
+            - tier-1-tap-user
+          requires:
+            - run_pylint
+            - run_unit_tests
+            - run_integration_tests
+  build_daily:
+    jobs:
+      - ensure_env:
+          context:
+            - circleci-user
+            - tier-1-tap-user
+      - run_pylint:
+          context:
+            - circleci-user
+            - tier-1-tap-user
+          requires:
+            - ensure_env
+      - run_unit_tests:
+          context:
+            - circleci-user
+            - tier-1-tap-user
+          requires:
+            - ensure_env
+      - run_integration_tests:
+          context:
+            - circleci-user
+            - tier-1-tap-user
+          requires:
+            - ensure_env
+      - build:
+          context:
+            - circleci-user
+            - tier-1-tap-user
+          requires:
+            - run_pylint
+            - run_unit_tests
+            - run_integration_tests
     triggers:
       - schedule:
-          cron: "0 6 * * *"
+          cron: "0 1 * * *"
           filters:
             branches:
               only:
                 - master
-    jobs:
-      - build:
-          context: circleci-user
diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md
index 6e46b008..ef49bc0e 100644
--- a/.github/pull_request_template.md
+++ b/.github/pull_request_template.md
@@ -9,3 +9,7 @@
 
 # Rollback steps
  - revert this branch
+
+#### AI generated code
+https://internal.qlik.dev/general/ways-of-working/code-reviews/#guidelines-for-ai-generated-code
+- [ ] this PR has been written with the help of GitHub Copilot or another generative AI tool
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 71467e2a..d3492282 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,20 +1,69 @@
 # Changelog
 
-# 1.14.0
+## 1.21.0
+  * Bump facebook_business SDK to v21.0.5 [#242](https://github.com/singer-io/tap-facebook/pull/242)
+
+## 1.20.2
+  * Bump facebook_business SDK to v19.0.2 [#239](https://github.com/singer-io/tap-facebook/pull/239)
+
+## 1.20.1
+  * Bump facebook_business SDK to v19.0.0 [#238](https://github.com/singer-io/tap-facebook/pull/238)
+
+## 1.20.0
+  * Run on python 3.11.7 [#237](https://github.com/singer-io/tap-facebook/pull/237)
+
+## 1.19.1
+  * Add retry logic for status code 503 [#226](https://github.com/singer-io/tap-facebook/pull/226)
+
+## 1.19.0
+  * Add conversions to insights streams [#204](https://github.com/singer-io/tap-facebook/pull/204)
+
+## 1.18.6
+  * Bump facebook_business SDK to v17.0.2 for token param bug fix [#219](https://github.com/singer-io/tap-facebook/pull/219)
+
+## 1.18.5
+  * Bump facebook_business SDK to v16.0.2 [#213](https://github.com/singer-io/tap-facebook/pull/213)
+
+## 1.18.4
+  * Bump Facebook business API to v14.0 [#201](https://github.com/singer-io/tap-facebook/pull/201)
+
+## 1.18.3
+  * Bump Facebook business API to v13.0 [#191](https://github.com/singer-io/tap-facebook/pull/191)
+
+## 1.18.2
+  * Implemented request timeout [#173](https://github.com/singer-io/tap-facebook/pull/173)
+
+## 1.18.1
+  * Forced replication method implemented for a couple of streams and replication keys [#167](https://github.com/singer-io/tap-facebook/pull/167)
+  * Added tap-tester test cases [#168](https://github.com/singer-io/tap-facebook/pull/168)
+  * Updated schema files of ads_insights_age_and_gender and ads_insights_hourly_advertiser, adding "format": "date-time" [#172](https://github.com/singer-io/tap-facebook/pull/172)
+
+## 1.17.0
+  * Added retry to AdsInsights job polling to resolve race-condition errors [#174](https://github.com/singer-io/tap-facebook/pull/174)
+
+## 1.16.0
+  * Bump tap dependency, `facebook_business`, from `10.0.0` to `12.0.0` [#164](https://github.com/singer-io/tap-facebook/pull/164)
+
+## 1.15.1
+  * Bump tap dependency, `attrs`, from `16.3.0` to `17.3.0` [#161](https://github.com/singer-io/tap-facebook/pull/161)
+
+## 1.15.0
+  * Add `country` to `ad_insights_country`'s composite primary key [#154](https://github.com/singer-io/tap-facebook/pull/154)
+
+## 1.14.0
   * Add an Ads Insight Stream, broken down by `hourly_stats_aggregated_by_advertiser_time_zone` [#151](https://github.com/singer-io/tap-facebook/pull/151)
 
-# 1.13.0
+## 1.13.0
   * Bump API version from `v9` to `v10` [#146](https://github.com/singer-io/tap-facebook/pull/146)
   * Add feature for AdsInsights stream: The tap will shift the start date to 37 months ago in order to fetch data from this API
     * More info [here](https://www.facebook.com/business/help/1695754927158071?id=354406972049255)
 
-# 1.12.1
+## 1.12.1
   * Increased insights job timeout to 300 seconds [#148](https://github.com/singer-io/tap-facebook/pull/148)
 
-# 1.12.0
+## 1.12.0
   * Added leads stream [#143](https://github.com/singer-io/tap-facebook/pull/143)
 
-# 1.11.2
+## 1.11.2
   * Added unique_outbound_clicks to several streams [#138](https://github.com/singer-io/tap-facebook/pull/138)
 
 ## 1.11.1
diff --git a/README.md b/README.md
index 04f73537..ac3f7e01 100644
--- a/README.md
+++ b/README.md
@@ -3,15 +3,18 @@ This is a [Singer](https://singer.io) tap that produces JSON-formatted data foll
 This tap:
 - Pulls raw data from the [Facebook Marketing API](https://developers.facebook.com/docs/marketing-apis)
-- Extracts the following resources from Facebook for a one Ad account:
+- Extracts the following resources from Facebook for one Ad account:
   - Ad Creatives
   - Ads
   - Ad Sets
   - Campaigns
+  - Leads
   - Ads Insights
     - Breakdown by age and gender
     - Breakdown by country
     - Breakdown by placement and device
+    - Breakdown by region
+    - Breakdown by the hour for advertisers
 - Outputs the schema for each resource
 - Incrementally pulls data based on the input state
diff --git a/setup.py b/setup.py
index 853fe737..2c2be115 100755
--- a/setup.py
+++ b/setup.py
@@ -3,19 +3,19 @@ from setuptools import setup
 
 setup(name='tap-facebook',
-      version='1.14.0',
+      version='1.21.0',
       description='Singer.io tap for extracting data from the Facebook Ads API',
       author='Stitch',
       url='https://singer.io',
       classifiers=['Programming Language :: Python :: 3 :: Only'],
       py_modules=['tap_facebook'],
       install_requires=[
-          'attrs==16.3.0',
-          'backoff==1.8.0',
+          'attrs==17.3.0',
+          'backoff==2.2.1',
+          'facebook_business==21.0.5',
           'pendulum==1.2.0',
-          'facebook_business==10.0.0',
           'requests==2.20.0',
-          'singer-python==5.10.0',
+          'singer-python==6.0.0',
       ],
       extras_require={
           'dev': [
diff --git a/tap_facebook/__init__.py b/tap_facebook/__init__.py
index 3f42f805..b24d74de 100755
--- a/tap_facebook/__init__.py
+++ b/tap_facebook/__init__.py
@@ -14,6 +14,9 @@
 import requests
 import backoff
+import sys
+
+import re
 import singer
 import singer.metrics as metrics
 from singer import utils, metadata
@@ -37,6 +40,8 @@
 from facebook_business.exceptions import FacebookError, FacebookRequestError, FacebookBadObjectError
 
+from requests.exceptions import ConnectionError, Timeout
+
 API = None
 
 INSIGHTS_MAX_WAIT_TO_START_SECONDS = 5 * 60
@@ -45,6 +50,8 @@
 RESULT_RETURN_LIMIT = 100
 
+REQUEST_TIMEOUT = 300
+
 STREAMS = [
     'adcreative',
     'ads',
@@ -83,6 +90,55 @@
 CONFIG = {}
 
+def retry_on_summary_param_error(backoff_type, exception, **wait_gen_kwargs):
+    """
+    At times, the Facebook Graph API exhibits erratic behavior,
+    triggering errors related to the Summary parameter with a status code of 400.
+    However, upon retrying, the API functions as expected.
+    """
+    def log_retry_attempt(details):
+        _, exception, _ = sys.exc_info()
+        LOGGER.info('Caught Summary param error after %s tries. Waiting %s more seconds then retrying...',
+                    details["tries"],
+                    details["wait"])
+
+    def should_retry_api_error(exception):
+
+        # Define the regular expression pattern
+        pattern = r'\(#100\) Cannot include [\w, ]+ in summary param because they weren\'t there while creating the report run(?:\. All available values are: )?'
+        if isinstance(exception, FacebookRequestError):
+            return (exception.http_status() == 400 and re.match(pattern, exception._error['message']))
+        return False
+
+    return backoff.on_exception(
+        backoff_type,
+        exception,
+        jitter=None,
+        on_backoff=log_retry_attempt,
+        giveup=lambda exc: not should_retry_api_error(exc),
+        **wait_gen_kwargs
+    )
+
+original_call = FacebookAdsApi.call
+
+@retry_on_summary_param_error(backoff.expo, (FacebookRequestError), max_tries=5, factor=5)
+def call_with_retry(self, method, path, params=None, headers=None, files=None, url_override=None, api_version=None,):
+    """
+    Adding the retry decorator on the original function call
+    """
+    return original_call(
+        self,
+        method,
+        path,
+        params,
+        headers,
+        files,
+        url_override,
+        api_version,)
+
+FacebookAdsApi.call = call_with_retry
+
+
 class TapFacebookException(Exception):
     pass
@@ -126,7 +182,8 @@ def raise_from(singer_error, fb_error):
         error_message = '{}: {} Message: {}'.format(
             http_method,
             fb_error.http_status(),
-            fb_error.body().get('error', {}).get('message')
+            fb_error.body().get('error', {}).get('message')
+            if isinstance(fb_error.body(), dict) else str(fb_error.body())
         )
     else:
         # All other facebook errors are `FacebookError`s and we handle
@@ -145,10 +202,15 @@ def log_retry_attempt(details):
         if isinstance(exception, TypeError) and str(exception) == "string indices must be integers":
             LOGGER.info('TypeError due to bad JSON response')
     def should_retry_api_error(exception):
-        if isinstance(exception, FacebookBadObjectError):
+        if isinstance(exception, FacebookBadObjectError) or isinstance(exception, Timeout) or isinstance(exception, ConnectionError) or isinstance(exception, AttributeError):
             return True
         elif isinstance(exception, FacebookRequestError):
-            return exception.api_transient_error() or exception.api_error_subcode() == 99 or exception.http_status() == 500
+            return (exception.api_transient_error()
+                    or exception.api_error_subcode() == 99
+                    or exception.http_status() in (500, 503)
+                    # This subcode corresponds to a race condition between AdsInsights job creation and polling
+                    or exception.api_error_subcode() == 33
+                    )
         elif isinstance(exception, InsightsJobTimeout):
             return True
         elif isinstance(exception, TypeError) and str(exception) == "string indices must be integers":
@@ -170,6 +232,7 @@ class Stream(object):
     account = attr.ib()
     stream_alias = attr.ib()
     catalog_entry = attr.ib()
+    replication_method = 'FULL_TABLE'
 
     def automatic_fields(self):
         fields = set()
@@ -200,6 +263,7 @@ def fields(self):
 class IncrementalStream(Stream):
     state = attr.ib()
+    replication_method = 'INCREMENTAL'
 
     def __attrs_post_init__(self):
         self.current_bookmark = get_start(self, UPDATED_TIME_KEY)
@@ -241,6 +305,8 @@ class AdCreative(Stream):
     doc: https://developers.facebook.com/docs/marketing-api/reference/adgroup/adcreatives/
     '''
 
+    # Added retry_pattern to handle AttributeError raised from api_batch.execute() below
+    @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
     def sync_batches(self, stream_objects):
         refs = load_shared_schema_refs()
         schema = singer.resolve_schema_references(self.catalog_entry.schema.to_dict(), refs)
@@ -269,7 +335,9 @@ def sync_batches(self, stream_objects):
 
     key_properties = ['id']
 
-    @retry_pattern(backoff.expo, (FacebookRequestError, TypeError), max_tries=5, factor=5)
+    @retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
+    # Added retry_pattern to handle AttributeError raised from account.get_ad_creatives() below
+    @retry_pattern(backoff.expo, (FacebookRequestError, TypeError, AttributeError), max_tries=5, factor=5)
     def get_adcreatives(self):
         return self.account.get_ad_creatives(params={'limit': RESULT_RETURN_LIMIT})
@@ -285,7 +353,9 @@ class Ads(IncrementalStream):
 
     key_properties = ['id', 'updated_time']
 
-    @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
+    @retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
+    # Added retry_pattern to handle AttributeError raised from account.get_ads() below
+    @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
     def _call_get_ads(self, params):
         """
         This is necessary because the functions that call this endpoint return
@@ -310,7 +380,9 @@ def do_request_multiple():
                 filt_ads = self._call_get_ads(params)
                 yield filt_ads
 
-        @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
+        @retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
+        # Added retry_pattern to handle AttributeError raised from ad.api_get() below
+        @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
         def prepare_record(ad):
             return ad.api_get(fields=self.fields()).export_all_data()
@@ -329,7 +401,9 @@ class AdSets(IncrementalStream):
 
     key_properties = ['id', 'updated_time']
 
-    @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
+    @retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
+    # Added retry_pattern to handle AttributeError raised from account.get_ad_sets() below
+    @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
     def _call_get_ad_sets(self, params):
         """
         This is necessary because the functions that call this endpoint return
@@ -354,7 +428,9 @@ def do_request_multiple():
                 filt_adsets = self._call_get_ad_sets(params)
                 yield filt_adsets
 
-        @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
+        @retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
+        # Added retry_pattern to handle AttributeError raised from ad_set.api_get() below
+        @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
         def prepare_record(ad_set):
             return ad_set.api_get(fields=self.fields()).export_all_data()
@@ -370,7 +446,9 @@ class Campaigns(IncrementalStream):
 
     key_properties = ['id']
 
-    @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
+    @retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
+    # Added retry_pattern to handle AttributeError raised from account.get_campaigns() below
+    @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
     def _call_get_campaigns(self, params):
         """
         This is necessary because the functions that call this endpoint return
@@ -400,7 +478,9 @@ def do_request_multiple():
                 filt_campaigns = self._call_get_campaigns(params)
                 yield filt_campaigns
 
-        @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
+        @retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
+        # Added retry_pattern to handle AttributeError raised from request call below
+        @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
         def prepare_record(campaign):
             """If campaign.ads is selected, make the request and insert the data here"""
             campaign_out = campaign.api_get(fields=fields).export_all_data()
@@ -425,6 +505,7 @@ class Leads(Stream):
     replication_key = "created_time"
 
     key_properties = ['id']
+    replication_method = 'INCREMENTAL'
 
     def compare_lead_created_times(self, leadA, leadB):
         if leadA is None:
@@ -436,6 +517,8 @@ def compare_lead_created_times(self, leadA, leadB):
         else:
             return leadA
 
+    # Added retry_pattern to handle AttributeError raised from api_batch.execute() below
+    @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
     def sync_batches(self, stream_objects):
         refs = load_shared_schema_refs()
         schema = singer.resolve_schema_references(self.catalog_entry.schema.to_dict(), refs)
@@ -469,12 +552,16 @@ def sync_batches(self, stream_objects):
         api_batch.execute()
         return str(pendulum.parse(latest_lead[self.replication_key]))
 
-    @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
+    @retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
+    # Added retry_pattern to handle AttributeError raised from account.get_ads() below
+    @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
     def get_ads(self):
         params = {'limit': RESULT_RETURN_LIMIT}
         yield from self.account.get_ads(params=params)
 
-    @retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
+    @retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
+    # Added retry_pattern to handle AttributeError raised from ad.get_leads() below
+    @retry_pattern(backoff.expo, (FacebookRequestError, AttributeError), max_tries=5, factor=5)
     def get_leads(self, ads, start_time, previous_start_time):
         start_time = int(start_time.timestamp()) # Get unix timestamp
         params = {'limit': RESULT_RETURN_LIMIT,
@@ -554,6 +641,7 @@ def advance_bookmark(stream, bookmark_key, date):
 @attr.s
 class AdsInsights(Stream):
     base_properties = ['campaign_id', 'adset_id', 'ad_id', 'date_start']
+    replication_method = 'INCREMENTAL'
 
     state = attr.ib()
     options = attr.ib()
@@ -579,14 +667,17 @@ def __attrs_post_init__(self):
         if self.options.get('primary-keys'):
             self.key_properties.extend(self.options['primary-keys'])
 
+        self.buffer_days = 28
+        if CONFIG.get('insights_buffer_days'):
+            self.buffer_days = int(CONFIG.get('insights_buffer_days'))
+            # attribution window should only be 1, 7 or 28
+            if self.buffer_days not in [1, 7, 28]:
+                raise Exception("The attribution window must be 1, 7 or 28.")
+
     def job_params(self):
         start_date = get_start(self, self.bookmark_key)
 
-        buffer_days = 28
-        if CONFIG.get('insights_buffer_days'):
-            buffer_days = int(CONFIG.get('insights_buffer_days'))
-
-        buffered_start_date = start_date.subtract(days=buffer_days)
+        buffered_start_date = start_date.subtract(days=self.buffer_days)
         min_start_date = pendulum.today().subtract(months=self.FACEBOOK_INSIGHTS_RETENTION_PERIOD)
         if buffered_start_date < min_start_date:
             LOGGER.warning("%s: Start date is earlier than %s months ago, using %s instead. "
@@ -615,7 +706,15 @@ def job_params(self):
             }
             buffered_start_date = buffered_start_date.add(days=1)
 
-    @retry_pattern(backoff.expo, (FacebookRequestError, InsightsJobTimeout, FacebookBadObjectError, TypeError), max_tries=5, factor=5)
+    @staticmethod
+    @retry_pattern(backoff.constant, FacebookRequestError, max_tries=5, interval=1)
+    def __api_get_with_retry(job):
+        job = job.api_get()
+        return job
+
+    @retry_pattern(backoff.expo, (Timeout, ConnectionError), max_tries=5, factor=2)
+    # Added retry_pattern to handle AttributeError raised from requests call below
+    @retry_pattern(backoff.expo, (FacebookRequestError, InsightsJobTimeout, FacebookBadObjectError, TypeError, AttributeError), max_tries=5, factor=5)
     def run_job(self, params):
         LOGGER.info('Starting adsinsights job with params %s', params)
         job = self.account.get_insights( # pylint: disable=no-member
@@ -626,7 +725,7 @@
         sleep_time = 10
         while status != "Job Completed":
             duration = time.time() - time_start
-            job = job.api_get()
+            job = AdsInsights.__api_get_with_retry(job)
             status = job['async_status']
             percent_complete = job['async_percent_completion']
@@ -685,7 +784,8 @@ def __iter__(self):
     'ads_insights': {"breakdowns": []},
     'ads_insights_age_and_gender': {"breakdowns": ['age', 'gender'],
                                     "primary-keys": ['age', 'gender']},
-    'ads_insights_country': {"breakdowns": ['country']},
+    'ads_insights_country': {"breakdowns": ['country'],
+                             "primary-keys": ['country']},
     'ads_insights_platform_and_device': {"breakdowns": ['publisher_platform',
                                                         'platform_position',
                                                         'impression_device'],
                                          "primary-keys": ['publisher_platform',
@@ -793,10 +893,13 @@ def discover_schemas():
         LOGGER.info('Loading schema for %s', stream.name)
         schema = singer.resolve_schema_references(load_schema(stream), refs)
 
+        bookmark_key = BOOKMARK_KEYS.get(stream.name)
+
         mdata = metadata.to_map(metadata.get_standard_metadata(schema,
-                                                               key_properties=stream.key_properties))
+                                                               key_properties=stream.key_properties,
+                                                               replication_method=stream.replication_method,
+                                                               valid_replication_keys=[bookmark_key] if bookmark_key else None))
 
-        bookmark_key = BOOKMARK_KEYS.get(stream.name)
         if bookmark_key == UPDATED_TIME_KEY or bookmark_key == CREATED_TIME_KEY :
             mdata = metadata.write(mdata, ('properties', bookmark_key), 'inclusion', 'automatic')
@@ -835,8 +938,15 @@ def main_impl():
     global RESULT_RETURN_LIMIT
     RESULT_RETURN_LIMIT = CONFIG.get('result_return_limit', RESULT_RETURN_LIMIT)
 
+    # Set request timeout with config param `request_timeout`.
+    config_request_timeout = CONFIG.get('request_timeout')
+    if config_request_timeout and float(config_request_timeout):
+        request_timeout = float(config_request_timeout)
+    else:
+        request_timeout = REQUEST_TIMEOUT # If value is 0, "0", "" or not passed then set default to 300 seconds.
+
     global API
-    API = FacebookAdsApi.init(access_token=access_token)
+    API = FacebookAdsApi.init(access_token=access_token, timeout=request_timeout)
     user = fb_user.User(fbid='me')
     accounts = user.get_ad_accounts()
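The hunk above monkey-patches `FacebookAdsApi.call` with a `backoff`-driven retry that gives up unless the error text matches the summary-param pattern. Below is a minimal sketch of that giveup-predicate pattern, decoupled from the Facebook SDK; `TransientSummaryError` and `flaky_call` are illustrative stand-ins, not part of the tap.

```python
# Sketch of the "retry only a specific error" pattern used by the patch.
# Assumes only the real `backoff` library; everything else is hypothetical.
import backoff

class TransientSummaryError(Exception):
    """Stand-in for FacebookRequestError."""
    def __init__(self, status):
        super().__init__(f"HTTP {status}")
        self.status = status

def should_retry(exc):
    # Retry only the specific 400-level error; anything else gives up at once.
    return isinstance(exc, TransientSummaryError) and exc.status == 400

@backoff.on_exception(backoff.expo, TransientSummaryError,
                      max_tries=5, factor=5, jitter=None,
                      giveup=lambda exc: not should_retry(exc))
def flaky_call(responses):
    # Pops one scripted outcome per attempt: an HTTP status to raise, or None for success.
    outcome = responses.pop(0)
    if outcome is not None:
        raise TransientSummaryError(outcome)
    return "ok"

if __name__ == "__main__":
    print(flaky_call([400, 400, None]))  # fails twice, retried, then succeeds
```

Because `giveup` returns True for anything `should_retry` rejects, unrelated errors surface immediately instead of burning through retries.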
diff --git a/tap_facebook/insights_experiment.py b/tap_facebook/insights_experiment.py
index 7ce911c4..66f291a6 100644
--- a/tap_facebook/insights_experiment.py
+++ b/tap_facebook/insights_experiment.py
@@ -33,6 +33,8 @@
     "canvas_avg_view_time",
     "clicks",
     "conversion_rate_ranking",
+    "conversion_values",
+    "conversions",
     "cost_per_action_type",
     "cost_per_inline_link_click",
     "cost_per_inline_post_engagement",
@@ -94,6 +96,8 @@
     "canvas_avg_view_time",
     "clicks",
     "conversion_rate_ranking",
+    "conversion_values",
+    "conversions",
     "cost_per_inline_link_click",
     "cost_per_inline_post_engagement",
     "cost_per_total_action",
@@ -190,6 +194,8 @@
     "campaign_name",
     "clicks",
     "conversion_rate_ranking",
+    "conversion_values",
+    "conversions",
     "date_start",
     "date_stop",
     "deeplink_clicks",
diff --git a/tap_facebook/schemas/ads_insights.json b/tap_facebook/schemas/ads_insights.json
index b95b40b3..8dc4a2b7 100644
--- a/tap_facebook/schemas/ads_insights.json
+++ b/tap_facebook/schemas/ads_insights.json
@@ -7,6 +7,8 @@
     "unique_actions": {"$ref": "ads_action_stats.json"},
     "actions": {"$ref": "ads_action_stats.json"},
     "action_values": {"$ref": "ads_action_stats.json"},
+    "conversion_values": {"$ref": "ads_action_stats.json"},
+    "conversions": {"$ref": "ads_action_stats.json"},
     "outbound_clicks": {"$ref": "ads_action_stats.json"},
     "unique_outbound_clicks": {"$ref": "ads_action_stats.json"},
     "video_30_sec_watched_actions": {"$ref": "ads_action_stats.json"},
@@ -262,12 +264,6 @@
         "number"
       ]
     },
-    "reach": {
-      "type": [
-        "null",
-        "integer"
-      ]
-    },
     "canvas_avg_view_percent": {
       "type": [
         "null",
diff --git a/tap_facebook/schemas/ads_insights_age_and_gender.json b/tap_facebook/schemas/ads_insights_age_and_gender.json
index f17f3dca..f8f0bda4 100644
--- a/tap_facebook/schemas/ads_insights_age_and_gender.json
+++ b/tap_facebook/schemas/ads_insights_age_and_gender.json
@@ -8,6 +8,8 @@
     "actions": {"$ref": "ads_action_stats.json"},
     "action_values": {"$ref": "ads_action_stats.json"},
     "outbound_clicks": {"$ref": "ads_action_stats.json"},
+    "conversion_values": {"$ref": "ads_action_stats.json"},
+    "conversions": {"$ref": "ads_action_stats.json"},
     "unique_outbound_clicks": {"$ref": "ads_action_stats.json"},
     "video_30_sec_watched_actions": {"$ref": "ads_action_stats.json"},
     "video_p25_watched_actions": {"$ref": "ads_action_stats.json"},
@@ -25,7 +27,8 @@
       "type": [
         "null",
         "string"
-      ]
+      ],
+      "format": "date-time"
     },
     "ad_id": {
       "type": [
@@ -283,7 +286,8 @@
       "type": [
         "null",
         "string"
-      ]
+      ],
+      "format": "date-time"
     },
     "objective": {
       "type": [
diff --git a/tap_facebook/schemas/ads_insights_country.json b/tap_facebook/schemas/ads_insights_country.json
index c79a9086..d4c8bc69 100644
--- a/tap_facebook/schemas/ads_insights_country.json
+++ b/tap_facebook/schemas/ads_insights_country.json
@@ -8,6 +8,8 @@
     "actions": {"$ref": "ads_action_stats.json"},
     "action_values": {"$ref": "ads_action_stats.json"},
     "outbound_clicks": {"$ref": "ads_action_stats.json"},
+    "conversion_values": {"$ref": "ads_action_stats.json"},
+    "conversions": {"$ref": "ads_action_stats.json"},
     "unique_outbound_clicks": {"$ref": "ads_action_stats.json"},
     "video_30_sec_watched_actions": {"$ref": "ads_action_stats.json"},
     "video_p25_watched_actions": {"$ref": "ads_action_stats.json"},
diff --git a/tap_facebook/schemas/ads_insights_dma.json b/tap_facebook/schemas/ads_insights_dma.json
index 60cbd4d3..286c12d2 100644
--- a/tap_facebook/schemas/ads_insights_dma.json
+++ b/tap_facebook/schemas/ads_insights_dma.json
@@ -5,6 +5,8 @@
     "actions": { "$ref": "ads_action_stats.json" },
     "action_values": { "$ref": "ads_action_stats.json" },
     "outbound_clicks": { "$ref": "ads_action_stats.json" },
+    "conversion_values": {"$ref": "ads_action_stats.json"},
+    "conversions": {"$ref": "ads_action_stats.json"},
     "unique_outbound_clicks": {"$ref": "ads_action_stats.json"},
     "video_30_sec_watched_actions": { "$ref": "ads_action_stats.json" },
     "video_p25_watched_actions": { "$ref": "ads_action_stats.json" },
diff --git a/tap_facebook/schemas/ads_insights_hourly_advertiser.json b/tap_facebook/schemas/ads_insights_hourly_advertiser.json
index 70f8c8e1..27a3e9f5 100644
--- a/tap_facebook/schemas/ads_insights_hourly_advertiser.json
+++ b/tap_facebook/schemas/ads_insights_hourly_advertiser.json
@@ -165,13 +165,15 @@
       "type": [
         "null",
         "string"
-      ]
+      ],
+      "format": "date-time"
     },
     "date_stop": {
       "type": [
         "null",
         "string"
-      ]
+      ],
+      "format": "date-time"
     },
     "engagement_rate_ranking": {
       "type": [
diff --git a/tap_facebook/schemas/ads_insights_platform_and_device.json b/tap_facebook/schemas/ads_insights_platform_and_device.json
index 0e13b572..90e14efd 100644
--- a/tap_facebook/schemas/ads_insights_platform_and_device.json
+++ b/tap_facebook/schemas/ads_insights_platform_and_device.json
@@ -8,6 +8,8 @@
     "actions": {"$ref": "ads_action_stats.json"},
     "action_values": {"$ref": "ads_action_stats.json"},
     "outbound_clicks": {"$ref": "ads_action_stats.json"},
+    "conversion_values": {"$ref": "ads_action_stats.json"},
+    "conversions": {"$ref": "ads_action_stats.json"},
     "unique_outbound_clicks": {"$ref": "ads_action_stats.json"},
     "video_30_sec_watched_actions": {"$ref": "ads_action_stats.json"},
     "video_p25_watched_actions": {"$ref": "ads_action_stats.json"},
diff --git a/tap_facebook/schemas/ads_insights_region.json b/tap_facebook/schemas/ads_insights_region.json
index 63ded38e..bbe5187b 100644
--- a/tap_facebook/schemas/ads_insights_region.json
+++ b/tap_facebook/schemas/ads_insights_region.json
@@ -8,6 +8,8 @@
     "actions": {"$ref": "ads_action_stats.json"},
     "action_values": {"$ref": "ads_action_stats.json"},
     "outbound_clicks": {"$ref": "ads_action_stats.json"},
+    "conversion_values": {"$ref": "ads_action_stats.json"},
+    "conversions": {"$ref": "ads_action_stats.json"},
     "unique_outbound_clicks": {"$ref": "ads_action_stats.json"},
     "video_30_sec_watched_actions": {"$ref": "ads_action_stats.json"},
     "video_p25_watched_actions": {"$ref": "ads_action_stats.json"},
diff --git a/tests/base.py b/tests/base.py
index 098b51b2..48bc91a5 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -2,22 +2,49 @@
 Setup expectations for test sub classes
 Run discovery for as a prerequisite for most tests
 """
-import unittest
 import os
 from datetime import timedelta
 from datetime import datetime as dt
 
-import singer
-from tap_tester import connections, menagerie, runner
+from tap_tester import connections, menagerie, runner, LOGGER
+from tap_tester.base_case import BaseCase
+from tap_tester.jira_client import JiraClient as jira_client
+from tap_tester.jira_client import CONFIGURATION_ENVIRONMENT as jira_config
 
+JIRA_CLIENT = jira_client({ **jira_config })
 
-class FacebookBaseTest(unittest.TestCase):
+
+class FacebookBaseTest(BaseCase):
     """
     Setup expectations for test sub classes.
     Metadata describing streams.
 
     A bunch of shared methods that are used in tap-tester tests.
     Shared tap-specific methods (as needed).
+
+    Insights Test Data by Date Ranges
+        "ads_insights":
+            "2019-08-02T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
+            "2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
+        "ads_insights_age_and_gender":
+            "2019-08-02T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
+            "2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
+        "ads_insights_country":
+            "2019-08-02T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
+            "2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
+        "ads_insights_platform_and_device":
+            "2019-08-02T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
+            "2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
+        "ads_insights_region":
+            "2019-08-03T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
+            "2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
+        "ads_insights_dma":
+            "2019-08-03T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
+            "2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
+        "ads_insights_hourly_advertiser":
+            "2019-08-03T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
+            "2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
+
     """
     AUTOMATIC_FIELDS = "automatic"
     REPLICATION_KEYS = "valid-replication-keys"
@@ -29,9 +56,9 @@ class FacebookBaseTest(unittest.TestCase):
     FULL_TABLE = "FULL_TABLE"
     START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z"
     BOOKMARK_COMPARISON_FORMAT = "%Y-%m-%dT00:00:00+00:00"
-    LOGGER = singer.get_logger()
 
     start_date = ""
+    end_date = ""
 
     @staticmethod
     def tap_name():
@@ -47,14 +74,17 @@ def get_properties(self, original: bool = True):
         """Configuration properties required for the tap."""
         return_value = {
             'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
-            'start_date' : '2015-03-15T00:00:00Z',
-            'end_date': '2015-03-16T00:00:00+00:00',
-            'insights_buffer_days': '1'
+            'start_date' : '2021-04-07T00:00:00Z',
+            'end_date': '2021-04-09T00:00:00Z',
+            'insights_buffer_days': '1',
         }
         if original:
             return return_value
 
         return_value["start_date"] = self.start_date
+        if self.end_date:
+            return_value["end_date"] = self.end_date
+
         return return_value
 
     @staticmethod
@@ -96,8 +126,8 @@ def expected_metadata(self):
                 self.REPLICATION_METHOD: self.INCREMENTAL,
                 self.REPLICATION_KEYS: {"date_start"}
             },
-            "ads_insights_country": {  # TODO | add country | https://stitchdata.atlassian.net/browse/SRCE-2555
-                self.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start"},  # , "country"},
+            "ads_insights_country": {
+                self.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start", "country"},
                 self.REPLICATION_METHOD: self.INCREMENTAL,
                 self.REPLICATION_KEYS: {"date_start"}
             },
@@ -216,7 +246,7 @@ def run_and_verify_check_mode(self, conn_id):
         found_catalog_names = set(map(lambda c: c['stream_name'], found_catalogs))
 
         self.assertSetEqual(self.expected_streams(), found_catalog_names, msg="discovered schemas do not match")
-        print("discovered schemas are OK")
+        LOGGER.info("discovered schemas are OK")
 
         return found_catalogs
 
@@ -240,7 +270,7 @@ def run_and_verify_sync(self, conn_id):
             sum(sync_record_count.values()), 0,
             msg="failed to replicate any data: {}".format(sync_record_count)
         )
-        print("total replicated row count: {}".format(sum(sync_record_count.values())))
+        LOGGER.info("total replicated row count: %s", sum(sync_record_count.values()))
 
         return sync_record_count
 
@@ -270,7 +300,7 @@ def perform_and_verify_table_and_field_selection(self,
 
             # Verify all testable streams are selected
             selected = catalog_entry.get('annotated-schema').get('selected')
-            print("Validating selection on {}: {}".format(cat['stream_name'], selected))
+            LOGGER.info("Validating selection on %s: %s", cat['stream_name'], selected)
             if cat['stream_name'] not in expected_selected:
                 self.assertFalse(selected, msg="Stream selected, but not testable.")
                 continue # Skip remaining assertions if we aren't selecting this stream
@@ -280,8 +310,8 @@ def perform_and_verify_table_and_field_selection(self,
                 # Verify all fields within each selected stream are selected
                 for field, field_props in catalog_entry.get('annotated-schema').get('properties').items():
                     field_selected = field_props.get('selected')
-                    print("\tValidating selection on {}.{}: {}".format(
-                        cat['stream_name'], field, field_selected))
+                    LOGGER.info("\tValidating selection on %s.%s: %s",
+                                cat['stream_name'], field, field_selected)
                     self.assertTrue(field_selected, msg="Field not selected.")
             else:
                 # Verify only automatic fields are selected
@@ -339,22 +369,15 @@ def parse_date(date_value):
 
             raise NotImplementedError("Tests do not account for dates of this format: {}".format(date_value))
 
-    def timedelta_formatted(self, dtime, days=0):
+    def timedelta_formatted(self, dtime, days=0, date_format=''):
         try:
-            date_stripped = dt.strptime(dtime, self.START_DATE_FORMAT)
+            date_stripped = dt.strptime(dtime, date_format)
             return_date = date_stripped + timedelta(days=days)
 
-            return dt.strftime(return_date, self.START_DATE_FORMAT)
+            return dt.strftime(return_date, date_format)
 
         except ValueError:
-            try:
-                date_stripped = dt.strptime(dtime, self.BOOKMARK_COMPARISON_FORMAT)
-                return_date = date_stripped + timedelta(days=days)
-
-                return dt.strftime(return_date, self.BOOKMARK_COMPARISON_FORMAT)
-
-            except ValueError:
-                return Exception("Datetime object is not of the format: {}".format(self.START_DATE_FORMAT))
+            raise AssertionError("Datetime object is not of the format: {}".format(date_format))
 
     ##########################################################################
     ### Tap Specific Methods
@@ -363,3 +386,15 @@ def timedelta_formatted(self, dtime, days=0):
     @staticmethod
     def is_insight(stream):
         return stream.startswith('ads_insights')
+
+    def setUp(self):
+        LOGGER.info("-------------------------------------------- STARTING TEST ---------------------------------------------------")
+        LOGGER.info("test: %s", self.name())
+        LOGGER.info("streams covered: %s", self.streams_to_test())
+        LOGGER.info("--------------------------------------------------------------------------------------------------------------")
+
+    def tearDown(self):
+        LOGGER.info("--------------------------------------------- ENDING TEST ----------------------------------------------------")
+        LOGGER.info("test: %s", self.name())
+        LOGGER.info("streams covered: %s", self.streams_to_test())
+        LOGGER.info("--------------------------------------------------------------------------------------------------------------")
diff --git a/tests/base_new_frmwrk.py b/tests/base_new_frmwrk.py
new file mode 100644
index 00000000..cb3a3259
--- /dev/null
+++ b/tests/base_new_frmwrk.py
@@ -0,0 +1,178 @@
+import os
+from datetime import datetime as dt
+from datetime import timezone as tz
+from tap_tester import connections, menagerie, runner, LOGGER
+from tap_tester.base_suite_tests.base_case import BaseCase
+
+
+class FacebookBaseTest(BaseCase):
+    """
+    Setup expectations for test sub classes.
+    Metadata describing streams.
+
+    A bunch of shared methods that are used in tap-tester tests.
+    Shared tap-specific methods (as needed).
+
+    Insights Test Data by Date Ranges
+        "ads_insights":
+            "2019-08-02T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
+            "2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
+        "ads_insights_age_and_gender":
+            "2019-08-02T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
+            "2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
+        "ads_insights_country":
+            "2019-08-02T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
+            "2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
+        "ads_insights_platform_and_device":
+            "2019-08-02T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
+            "2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
+        "ads_insights_region":
+            "2019-08-03T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
+            "2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
+        "ads_insights_dma":
+            "2019-08-03T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
+            "2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
+        "ads_insights_hourly_advertiser":
+            "2019-08-03T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
+            "2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
+
+    """
+    FULL_TABLE = "FULL_TABLE"
+    BOOKMARK_COMPARISON_FORMAT = "%Y-%m-%dT00:00:00+00:00"
+
+    start_date = "2021-04-07T00:00:00Z"
+    end_date = ""
+
+    @staticmethod
+    def tap_name():
+        """The name of the tap"""
+        return "tap-facebook"
+
+    @staticmethod
+    def get_type():
+        """the expected url route ending"""
+        return "platform.facebook"
+
+    def get_properties(self):
+        """Configuration properties required for the tap."""
+        return {
+            'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
+            'start_date' : self.start_date,
+            'end_date': '2021-04-09T00:00:00Z',
+            'insights_buffer_days': '1',
+        }
+
+    @staticmethod
+    def get_credentials():
+        """Authentication information for the test account"""
+        return {'access_token': os.getenv('TAP_FACEBOOK_ACCESS_TOKEN')}
+
+    @staticmethod
+    def expected_metadata():
+        """The expected streams and metadata about the streams"""
+        return {
+            "ads": {
+                BaseCase.PRIMARY_KEYS: {"id", "updated_time"},
+                BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+                BaseCase.REPLICATION_KEYS: {"updated_time"},
+                BaseCase.API_LIMIT: 100
+            },
+            "adcreative": {
+                BaseCase.PRIMARY_KEYS: {"id"},
+                BaseCase.REPLICATION_METHOD: BaseCase.FULL_TABLE,
+                BaseCase.API_LIMIT: 100
+            },
+            "adsets": {
+                BaseCase.PRIMARY_KEYS: {"id", "updated_time"},
+                BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+                BaseCase.REPLICATION_KEYS: {"updated_time"},
+                BaseCase.API_LIMIT: 100
+            },
+            "campaigns": {
+                BaseCase.PRIMARY_KEYS: {"id", },
+                BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+                BaseCase.REPLICATION_KEYS: {"updated_time"},
+                BaseCase.API_LIMIT: 100
+            },
+            "ads_insights": {
+                BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start"},
+                BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+                BaseCase.REPLICATION_KEYS: {"date_start"},
+                BaseCase.API_LIMIT: 100
+            },
+            "ads_insights_age_and_gender": {
+                BaseCase.PRIMARY_KEYS: {
+                    "campaign_id", "adset_id", "ad_id", "date_start", "age", "gender"
+                },
+                BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+                BaseCase.REPLICATION_KEYS: {"date_start"}
+            },
+            "ads_insights_country": {
+                BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start",
+                                        "country"},
+                BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+                BaseCase.REPLICATION_KEYS: {"date_start"}
+            },
+            "ads_insights_platform_and_device": {
+                BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start",
+                                        "publisher_platform", "platform_position",
+                                        "impression_device"},
+                BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+                BaseCase.REPLICATION_KEYS: {"date_start"}
+            },
+            "ads_insights_region": {
+                BaseCase.PRIMARY_KEYS: {"region", "campaign_id", "adset_id", "ad_id", "date_start"},
+                BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+                BaseCase.REPLICATION_KEYS: {"date_start"}
+            },
+            "ads_insights_dma": {
+                BaseCase.PRIMARY_KEYS: {"dma", "campaign_id", "adset_id", "ad_id", "date_start"},
+                BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+                BaseCase.REPLICATION_KEYS: {"date_start"}
+            },
+            "ads_insights_hourly_advertiser": {
+                BaseCase.PRIMARY_KEYS: {"hourly_stats_aggregated_by_advertiser_time_zone",
+                                        "campaign_id", "adset_id", "ad_id", "date_start"},
+                BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+                BaseCase.REPLICATION_KEYS: {"date_start"}
+            },
+            # "leads": {
+            #     BaseCase.PRIMARY_KEYS: {"id"},
+            #     BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+            #     BaseCase.REPLICATION_KEYS: {"created_time"}
+            # },
+        }
+
+    def set_replication_methods(self, conn_id, catalogs, replication_methods):
+
+        replication_keys = self.expected_replication_keys()
+        for catalog in catalogs:
+            replication_method = replication_methods.get(catalog['stream_name'])
+            annt = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])
+            if replication_method == self.INCREMENTAL:
+                replication_key = list(replication_keys.get(catalog['stream_name']))[0]
+                replication_md = [{"breadcrumb": [], "metadata": {"selected": True}}]
+            else:
+                replication_md = [{"breadcrumb": [], "metadata": {"selected": None}}]
+            connections.set_non_discoverable_metadata(conn_id,
+                                                      catalog,
+                                                      menagerie.get_annotated_schema(
+                                                          conn_id,
+                                                          catalog['stream_id']),
+                                                      replication_md)
+
+    @classmethod
+    def setUpClass(cls, logging="Ensuring environment variables are sourced."):
+        super().setUpClass(logging=logging)
+        missing_envs = [x for x in [os.getenv('TAP_FACEBOOK_ACCESS_TOKEN'),
+                                    os.getenv('TAP_FACEBOOK_ACCOUNT_ID')] if x is None]
+        if len(missing_envs) != 0:
+            raise Exception("set environment variables")
+
+
+    ##########################################################################
+    ### Tap Specific Methods
+    ##########################################################################
+
+    @staticmethod
+    def is_insight(stream):
+        return stream.startswith('ads_insights')
diff --git a/tests/test_client.py b/tests/test_client.py
new file mode 100644
index 00000000..5e5f7146
--- /dev/null
+++ b/tests/test_client.py
@@ -0,0 +1,161 @@
+import os
+import random
+import requests
+import string
+
+from tap_tester.logger import LOGGER
+
+
+class TestClient():
+    def __init__(self):
+        self.base_url = 'https://graph.facebook.com'
+        self.api_version = 'v19.0'
+        self.account_id = os.getenv('TAP_FACEBOOK_ACCOUNT_ID')
+        self.access_token = os.getenv('TAP_FACEBOOK_ACCESS_TOKEN')
+        self.account_url = f"{self.base_url}/{self.api_version}/act_{self.account_id}"
+
+        self.stream_endpoint_map = {'ads': '/ads',
+                                    'adsets': '/adsets',
+                                    'adcreative': '/adcreatives',
+                                    'ads_insights': '/insights',  # GET only endpoint
+                                    'campaigns': '/campaigns',
+                                    'users': '/users',}
+
+        self.campaign_special_ad_categories = ['NONE',
+                                               'EMPLOYMENT',
+                                               'HOUSING',
+                                               'CREDIT',
+                                               # 'ISSUES_ELECTIONS_POLITICS',  # acct unauthorized
+                                               'ONLINE_GAMBLING_AND_GAMING']
+
+        self.adset_billing_events = ['APP_INSTALLS',
+                                     'CLICKS',
+                                     'IMPRESSIONS',
+                                     'LINK_CLICKS',
+                                     'NONE',
+                                     'OFFER_CLAIMS',
+                                     'PAGE_LIKES',
+                                     'POST_ENGAGEMENT',
+                                     'THRUPLAY',
+                                     'PURCHASE',
+                                     'LISTING_INTERACTION']
+
+        self.adset_optimization_goals = ['NONE',
+                                         'APP_INSTALLS',
+                                         'AD_RECALL_LIFT',
+                                         'ENGAGED_USERS',
+                                         'EVENT_RESPONSES',
+                                         'IMPRESSIONS',
+                                         'LEAD_GENERATION',
+                                         'QUALITY_LEAD',
+                                         'LINK_CLICKS',
+                                         'OFFSITE_CONVERSIONS',
+                                         'PAGE_LIKES',
+                                         'POST_ENGAGEMENT',
+                                         'QUALITY_CALL',
+                                         'REACH',
+                                         'LANDING_PAGE_VIEWS',
+                                         'VISIT_INSTAGRAM_PROFILE',
+                                         'VALUE',
+                                         'THRUPLAY',
+                                         'DERIVED_EVENTS',
+                                         'APP_INSTALLS_AND_OFFSITE_CONVERSIONS',
+                                         'CONVERSATIONS',
+                                         'IN_APP_VALUE',
+                                         'MESSAGING_PURCHASE_CONVERSION',
+                                         'MESSAGING_APPOINTMENT_CONVERSION',
+                                         'SUBSCRIBERS',
+                                         'REMINDERS_SET']
+
+        # valid and verified objectives listed below, other objectives should be re-mapped to these
+        self.campaign_objectives = ['OUTCOME_APP_PROMOTION',
+                                    'OUTCOME_AWARENESS',
+                                    'OUTCOME_ENGAGEMENT',
+                                    'OUTCOME_LEADS',
+                                    'OUTCOME_SALES',
+                                    'OUTCOME_TRAFFIC']
+
+    def get_account_objects(self, stream, limit, time_range):
+        # time_range defines query start and end dates and should match tap config
+        assert stream in self.stream_endpoint_map.keys(), \
+            f'Endpoint undefined for specified stream: {stream}'
+        endpoint = self.stream_endpoint_map[stream]
+        url = self.account_url + endpoint
+        params = {'access_token': self.access_token,
+                  'limit': limit,
+                  'time_range': str({'since': time_range['since'], 'until': time_range['until']})}
+        LOGGER.info(f"Getting url: {url}")
+        response = requests.get(url, params)
+        response.raise_for_status()
+        LOGGER.info(f"Returning get response: {response}")
+        return response.json()
+
+    def create_account_objects(self, stream):
+        assert stream in self.stream_endpoint_map.keys(), \
+            f'Endpoint undefined for specified stream: {stream}'
+        endpoint = self.stream_endpoint_map[stream]
+        url = self.account_url + endpoint
+        LOGGER.info(f"Posting to url: {url}")
+        params = self.generate_post_params(stream)
+        response = requests.post(url, params)
+        response.raise_for_status()
+        LOGGER.info(f"Returning post response: {response}")
+        return response
+
+    def generate_post_params(self, stream):
+        if stream == 'adcreative':
+            params = {
+                'access_token': self.access_token,
+                'name': ''.join(random.choices(string.ascii_letters + string.digits, k=18)),
+                'object_story_spec': str({'page_id': '453760455405317',
+                                          'link_data': {'link': 'http://fb.me'}})}
+            return params
+
+        elif stream == 'ads':
+            params = {
+                'access_token': self.access_token,
+                'name': ''.join(random.choices(string.ascii_letters + string.digits, k=17)),
+                # adset is bound to parent campaign_objective, can cause errors posting new ads
+                # as certain objectives have different requirements. 50 ads per adset max
+                # adset below can be found under campaign: 120203395323750059
+                'adset_id': 120203403135680059,
+                'creative': str({'creative_id': 23843561378450058}),
+                'status': "PAUSED"}
+            return params
+
+        elif stream == 'adsets':
+            # TODO In order to randomize optimization_goal and billing_event the campaign_id
+            # would need to be examined to determine which goals were supported. Then an option
+            # could be selected from the available billing events supported by that goal.
+            params = {
+                'access_token': self.access_token,
+                'name': ''.join(random.choices(string.ascii_letters + string.digits, k=16)),
+                'optimization_goal': 'REACH',
+                'billing_event': 'IMPRESSIONS',
+                'bid_amount': 2,  # TODO random?
+                'daily_budget': 1000,  # TODO random? tie to parent campaign?
+                'campaign_id': 120203241386960059,
+                'targeting': str({'geo_locations': {'countries': ["US"]},
+                                  'facebook_positions': ["feed"]}),
+                'status': "PAUSED",
+                'promoted_object': str({'page_id': '453760455405317'})}
+            return params
+
+        elif stream == 'campaigns':
+            params = {  # generate a campaign with random name, objective, and ad category
+                'access_token': self.access_token,
+                'name': ''.join(random.choices(string.ascii_letters + string.digits, k=15)),
+                'objective': random.choice(self.campaign_objectives),
+                'special_ad_categories': random.choice(self.campaign_special_ad_categories)}
+            return params
+
+        else:
+            assert False, f"Post params for stream {stream} not implemented / supported"
+
+
+    # Ad Insights TODO
+    # endpoint is "GET" only. We cannot post fake insights data for test. As of Oct 27, 2023
+    # data lists for original 3 AdSet Ids, and Ads account as a whole are empty.
+    #   1 - Can we run enough ads to get enough data to paginate?
+    #   2 - Can we interact with our own ads?
+    # if 1 or 2 == True then use setUp to conditionally test ads_insights if there is enough data
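Assuming `TAP_FACEBOOK_ACCOUNT_ID` and `TAP_FACEBOOK_ACCESS_TOKEN` are exported (the class reads both from the environment), `TestClient` could be exercised as below; the date window mirrors the tap config used by the tests, and the snippet is an illustrative sketch rather than part of the suite.

```python
# Illustrative usage of the TestClient added above; not part of the test suite.
from test_client import TestClient

client = TestClient()

# Seed a paused campaign with a random name, objective, and ad category,
# then read campaigns back over the same date window the tap config uses.
client.create_account_objects('campaigns')
page = client.get_account_objects(
    stream='campaigns',
    limit=100,
    time_range={'since': '2021-04-07', 'until': '2021-04-09'},
)
print(f"campaigns returned: {len(page.get('data', []))}")
```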
'video_p100_watched_actions', + 'conversion_rate_ranking' + }, + "ads_insights_dma": { + 'video_p75_watched_actions', + 'conversions', + 'cost_per_unique_click', + 'inline_link_click_ctr', + 'conversion_values', + 'canvas_avg_view_percent', + 'action_values', + 'unique_ctr', + 'unique_outbound_clicks', + 'unique_inline_link_clicks', + 'cost_per_unique_action_type', + 'outbound_clicks', + 'social_spend', + 'cost_per_unique_inline_link_click', + 'unique_link_clicks_ctr', + 'video_p50_watched_actions', + 'engagement_rate_ranking', + 'unique_inline_link_click_ctr', + 'video_p25_watched_actions', + 'quality_ranking', + 'cpp', + 'video_play_curve_actions', + 'canvas_avg_view_time', + 'video_30_sec_watched_actions', + 'cost_per_action_type', + 'video_p100_watched_actions', + 'conversion_rate_ranking' + }, + "ads_insights_region": { + 'video_p75_watched_actions', + 'conversions', + 'conversion_values', + 'canvas_avg_view_percent', + 'action_values', + 'unique_outbound_clicks', + 'cost_per_unique_action_type', + 'outbound_clicks', + 'social_spend', + 'video_p50_watched_actions', + 'engagement_rate_ranking', + 'video_p25_watched_actions', + 'quality_ranking', + 'video_play_curve_actions', + 'video_30_sec_watched_actions', + 'canvas_avg_view_time', + 'cost_per_action_type', + 'video_p100_watched_actions', + 'conversion_rate_ranking' + }, + "ads_insights_hourly_advertiser": { + 'conversions', + 'cost_per_estimated_ad_recallers', + 'cost_per_unique_click', + 'cost_per_unique_outbound_click', + 'frequency', + 'conversion_values', + 'canvas_avg_view_percent', + 'cost_per_conversion', + 'cost_per_thruplay', + 'action_values', + 'full_view_impressions', + 'place_page_name', + 'instant_experience_outbound_clicks', + 'cost_per_unique_action_type', + 'estimated_ad_recallers', + 'outbound_clicks', + 'social_spend', + 'cost_per_unique_inline_link_click', + 'instant_experience_clicks_to_start', + 'attribution_setting', + 'engagement_rate_ranking', + 'purchase_roas', + 'reach', + 'cost_per_outbound_click', + 'estimated_ad_recall_rate', + 'quality_ranking', + 'cpp', + 'catalog_segment_value', + 'canvas_avg_view_time', + 'cost_per_action_type', + 'outbound_clicks_ctr', + 'qualifying_question_qualify_answer_rate', + 'converted_product_quantity', + 'converted_product_value', + 'instant_experience_clicks_to_open', + 'conversion_rate_ranking' + }, + "ads": { + 'recommendations', + 'adlabels' + }, + "ads_insights_platform_and_device": { + 'video_p75_watched_actions', + 'conversions', + 'conversion_values', + 'canvas_avg_view_percent', + 'action_values', + 'unique_outbound_clicks', + 'cost_per_unique_action_type', + 'outbound_clicks', + 'social_spend', + 'video_p50_watched_actions', + 'engagement_rate_ranking', + 'video_p25_watched_actions', + 'quality_ranking', + 'video_play_curve_actions', + 'conversion_rate_ranking', + 'video_30_sec_watched_actions', + 'canvas_avg_view_time', + 'cost_per_action_type', + 'video_p100_watched_actions', + 'placement' + } + } + + # TODO: https://jira.talendforge.org/browse/TDL-26640 + EXCLUDE_STREAMS = { + 'ads_insights_hourly_advertiser', # TDL-24312, TDL-26640 + 'ads_insights_platform_and_device', # TDL-26640 + 'ads_insights', # TDL-26640 + 'ads_insights_age_and_gender', # TDL-26640 + 'ads_insights_country', # TDL-26640 + 'ads_insights_dma', # TDL-26640 + 'ads_insights_region' # TDL-26640 + } + + @staticmethod + def name(): + return "tt_facebook_all_fields_test" + + def streams_to_test(self): + expected_streams = self.expected_metadata().keys() + self.assert_message = f"JIRA ticket has 
moved to done, \ + re-add the applicable stream to the test: {0}" + assert base.JIRA_CLIENT.get_status_category("TDL-24312") != 'done',\ + self.assert_message.format('ads_insights_hourly_advertiser') + expected_streams = self.expected_metadata().keys() - {'ads_insights_hourly_advertiser'} + LOGGER.warn(f"Skipped streams: {'ads_insights_hourly_advertiser'}") + + assert base.JIRA_CLIENT.get_status_category("TDL-26640") != 'done',\ + self.assert_message.format(self.EXCLUDE_STREAMS) + expected_streams = self.expected_metadata().keys() - self.EXCLUDE_STREAMS + LOGGER.warn(f"Skipped streams: {self.EXCLUDE_STREAMS}") + + return expected_streams diff --git a/tests/test_facebook_archived_data.py b/tests/test_facebook_archived_data.py new file mode 100644 index 00000000..6b55fa72 --- /dev/null +++ b/tests/test_facebook_archived_data.py @@ -0,0 +1,115 @@ +import os + +from tap_tester import connections, runner, menagerie + +from base import FacebookBaseTest + +class FacebookArchivedData(FacebookBaseTest): + + @staticmethod + def name(): + return "tap_tester_facebook_archived_data" + + def streams_to_test(self): + """include_deleted is supported for below streams only""" + return ['ads', 'adsets', 'campaigns'] + + def get_properties(self, original: bool = True): + """Configuration properties required for the tap.""" + return_value = { + 'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'), + 'start_date' : '2021-10-06T00:00:00Z', + 'end_date' : '2021-10-07T00:00:00Z', + 'insights_buffer_days': '1', + 'include_deleted': 'false' + } + if original: + return return_value + + return_value["include_deleted"] = 'true' + return return_value + + def test_run(self): + ''' + Testing the archived data with 'include_deleted' parameter + ''' + expected_streams = self.streams_to_test() + + ########################################################################## + ### First Sync with include_deleted = false + ########################################################################## + + # instantiate connection with the include_deleted = false + conn_id_1 = connections.ensure_connection(self) + + # run check mode + found_catalogs_1 = self.run_and_verify_check_mode(conn_id_1) + + # table and field selection + test_catalogs_1_all_fields = [catalog for catalog in found_catalogs_1 + if catalog.get('tap_stream_id') in expected_streams] + self.perform_and_verify_table_and_field_selection(conn_id_1, test_catalogs_1_all_fields, select_all_fields=True) + + # run initial sync + record_count_by_stream_1 = self.run_and_verify_sync(conn_id_1) + synced_records_1 = runner.get_records_from_target_output() + + ########################################################################## + ### Second Sync with include_deleted = true + ########################################################################## + + # create a new connection with the include_deleted = true + conn_id_2 = connections.ensure_connection(self, original_properties=False) + + # run check mode + found_catalogs_2 = self.run_and_verify_check_mode(conn_id_2) + + # table and field selection + test_catalogs_2_all_fields = [catalog for catalog in found_catalogs_2 + if catalog.get('tap_stream_id') in expected_streams] + self.perform_and_verify_table_and_field_selection(conn_id_2, test_catalogs_2_all_fields, select_all_fields=True) + + # run sync + record_count_by_stream_2 = self.run_and_verify_sync(conn_id_2) + synced_records_2 = runner.get_records_from_target_output() + + for stream in expected_streams: + with self.subTest(stream=stream): + + # expected primary keys + 
expected_primary_keys = self.expected_primary_keys()[stream] + + # collect information about count of records + record_count_sync_1 = record_count_by_stream_1.get(stream, 0) + record_count_sync_2 = record_count_by_stream_2.get(stream, 0) + + # collect list and set of primary keys for all the records + primary_keys_list_1 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys) + for message in synced_records_1.get(stream).get('messages') + if message.get('action') == 'upsert'] + primary_keys_list_2 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys) + for message in synced_records_2.get(stream).get('messages') + if message.get('action') == 'upsert'] + primary_keys_sync_1 = set(primary_keys_list_1) + primary_keys_sync_2 = set(primary_keys_list_2) + + # collect list of effective_status for all the records + records_status_sync1 = [message.get('data').get('effective_status') + for message in synced_records_1.get(stream).get('messages') + if message.get('action') == 'upsert'] + records_status_sync2 = [message.get('data').get('effective_status') + for message in synced_records_2.get(stream).get('messages') + if message.get('action') == 'upsert'] + + # Verifying that no ARCHIVED records are returned for sync 1 + self.assertNotIn('ARCHIVED', records_status_sync1) + + # Verifying that ARCHIVED records are returned for sync 2 + self.assertIn('ARCHIVED', records_status_sync2) + + # Verify the number of records replicated in sync 2 is greater than the number + # of records replicated in sync 1 + self.assertGreater(record_count_sync_2, record_count_sync_1) + + # Verify the records replicated in sync 1 were also replicated in sync 2 + self.assertTrue(primary_keys_sync_1.issubset(primary_keys_sync_2)) diff --git a/tests/test_facebook_attribution_window.py b/tests/test_facebook_attribution_window.py new file mode 100644 index 00000000..73dcb6ff --- /dev/null +++ b/tests/test_facebook_attribution_window.py @@ -0,0 +1,120 @@ +import base +import os + +from tap_tester import runner, connections +from tap_tester.base_case import BaseCase as base_case +from base import FacebookBaseTest, LOGGER + + +class FacebookAttributionWindow(FacebookBaseTest): + + is_done = None + + @staticmethod + def name(): + return "tap_tester_facebook_attribution_window" + + def streams_to_test(self): + """ 'attribution window' is only supported for 'ads_insights' streams """ + + # Fail the test when the JIRA card is done to allow stream to be re-added and tested + if self.is_done is None: + self.is_done = base.JIRA_CLIENT.get_status_category("TDL-24312") == 'done' + self.assert_message = ("JIRA ticket has moved to done, re-add the " + "ads_insights_hourly_advertiser stream to the test.") + assert self.is_done != True, self.assert_message + + # return [stream for stream in self.expected_streams() if self.is_insight(stream)] + return [stream for stream in self.expected_streams() + if self.is_insight(stream) + and stream != 'ads_insights_hourly_advertiser'] + + def get_properties(self, original: bool = True): + """Configuration properties required for the tap.""" + return_value = { + 'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'), + 'start_date': self.start_date, + 'end_date': self.end_date, + 'insights_buffer_days': str(self.ATTRIBUTION_WINDOW) + } + if original: + return return_value + + return_value["start_date"] = self.start_date + return return_value + + # TODO: https://jira.talendforge.org/browse/TDL-26640 + 
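+    # Note: unittest's skipUnless(condition, reason) skips the test while the
+    # condition is false, so the test below stays skipped until TDL-26640 is
+    # marked done and only then starts running. This is the inverse of the
+    # assert-based JIRA gates used elsewhere in this suite.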
@base_case.skipUnless(base.JIRA_CLIENT.get_status_category("TDL-26640") == "done", "TDL-26640")
+    def test_run(self):
+        """
+        For the test ad set up in Facebook Ads Manager we see data
+        on April 7th; the start dates below are based on this data.
+        """
+        # attribution window = 7
+        self.ATTRIBUTION_WINDOW = 7
+        self.start_date = '2021-04-14T00:00:00Z'
+        self.end_date = '2021-04-15T00:00:00Z'
+        self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)
+
+        # attribution window = 28
+        self.ATTRIBUTION_WINDOW = 28
+        self.start_date = '2021-04-30T00:00:00Z'
+        self.end_date = '2021-05-01T00:00:00Z'
+        self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)
+
+        # attribution window = 1
+        self.ATTRIBUTION_WINDOW = 1
+        self.start_date = '2021-04-08T00:00:00Z'
+        self.end_date = '2021-04-09T00:00:00Z'
+        self.run_test(self.ATTRIBUTION_WINDOW, self.start_date, self.end_date)
+
+    def run_test(self, attr_window, start_date, end_date):
+        """
+        Test that the attribution window is respected for the given start date
+        """
+
+        expected_streams = self.streams_to_test()
+
+        conn_id = connections.ensure_connection(self)
+
+        # calculate start date with attribution window
+        start_date_with_attribution_window = self.timedelta_formatted(
+            start_date, days=-attr_window, date_format=self.START_DATE_FORMAT
+        )
+
+        # Run in check mode
+        found_catalogs = self.run_and_verify_check_mode(conn_id)
+
+        # Select only the expected streams tables
+        catalog_entries = [ce for ce in found_catalogs if ce['tap_stream_id'] in expected_streams]
+        self.perform_and_verify_table_and_field_selection(conn_id, catalog_entries, select_all_fields=True)
+
+        # Run a sync job using orchestrator
+        self.run_and_verify_sync(conn_id)
+        sync_records = runner.get_records_from_target_output()
+
+        expected_replication_keys = self.expected_replication_keys()
+
+        for stream in expected_streams:
+            with self.subTest(stream=stream):
+
+                replication_key = next(iter(expected_replication_keys[stream]))
+
+                # get records
+                records = [record.get('data') for record in sync_records.get(stream).get('messages')]
+
+                # track whether any record falls between the attribution-window date and the start date
+                is_between = False
+
+                for record in records:
+                    replication_key_value = record.get(replication_key)
+
+                    # Verify the sync records respect the attribution window
+                    self.assertGreaterEqual(self.parse_date(replication_key_value), self.parse_date(start_date_with_attribution_window),
+                                            msg="The record does not respect the attribution window.")
+
+                    # record whether this record's bookmark value lies between the
+                    # attribution-window date and the start date
+                    if self.parse_date(start_date_with_attribution_window) <= self.parse_date(replication_key_value) <= self.parse_date(start_date):
+                        is_between = True
+
+                self.assertTrue(is_between)
diff --git a/tests/test_facebook_automatic_fields.py b/tests/test_facebook_automatic_fields.py
index a98444fd..5a57dafd 100644
--- a/tests/test_facebook_automatic_fields.py
+++ b/tests/test_facebook_automatic_fields.py
@@ -3,24 +3,53 @@
 """
 import os
 
-from tap_tester import runner, connections
-
+from tap_tester import runner, connections, LOGGER
+import base
 from base import FacebookBaseTest


 class FacebookAutomaticFields(FacebookBaseTest):
     """Test that with no fields selected for a stream automatic fields are still replicated"""

+    is_done = None
+
+    # TODO: https://jira.talendforge.org/browse/TDL-26640
+    EXCLUDE_STREAMS = {
+        'ads_insights_hourly_advertiser',    # TDL-24312, TDL-26640
+        'ads_insights_platform_and_device',  # TDL-26640
+        'ads_insights',                      # TDL-26640
+        'ads_insights_age_and_gender',       # TDL-26640
+        'ads_insights_country',              # TDL-26640
+        'ads_insights_dma',                  # TDL-26640
+        'ads_insights_region'                # TDL-26640
+    }
+
     @staticmethod
     def name():
         return "tap_tester_facebook_automatic_fields"

+    def streams_to_test(self):
+        expected_streams = self.expected_metadata().keys()
+        self.assert_message = "JIRA ticket has moved to done, \
+            re-add the applicable stream to the test: {0}"
+        assert base.JIRA_CLIENT.get_status_category("TDL-24312") != 'done',\
+            self.assert_message.format('ads_insights_hourly_advertiser')
+        expected_streams = self.expected_metadata().keys() - {'ads_insights_hourly_advertiser'}
+        LOGGER.warn(f"Skipped streams: {'ads_insights_hourly_advertiser'}")
+
+        assert base.JIRA_CLIENT.get_status_category("TDL-26640") != 'done',\
+            self.assert_message.format(self.EXCLUDE_STREAMS)
+        expected_streams = self.expected_metadata().keys() - self.EXCLUDE_STREAMS
+        LOGGER.warn(f"Skipped streams: {self.EXCLUDE_STREAMS}")
+
+        return expected_streams
+
     def get_properties(self, original: bool = True):
         """Configuration properties required for the tap."""
         return_value = {
             'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
-            'start_date' : '2019-07-22T00:00:00Z',
-            'end_date' : '2019-07-23T00:00:00Z',
+            'start_date' : '2021-04-08T00:00:00Z',
+            'end_date' : '2021-04-08T00:00:00Z',
             'insights_buffer_days': '1'
         }
         if original:
@@ -41,7 +70,7 @@ def test_run(self):
         that 251 (or more) records have been posted for that stream.
         """

-        expected_streams = self.expected_streams()
+        expected_streams = self.streams_to_test()

         # instantiate connection
         conn_id = connections.ensure_connection(self)
diff --git a/tests/test_facebook_bookmarks.py b/tests/test_facebook_bookmarks.py
index ada0da17..817f39a1 100644
--- a/tests/test_facebook_bookmarks.py
+++ b/tests/test_facebook_bookmarks.py
@@ -1,33 +1,48 @@
-import os
+import base
 import datetime
 import dateutil.parser
+import os
 import pytz

-from tap_tester import runner, menagerie, connections
+from tap_tester import runner, menagerie, connections, LOGGER

 from base import FacebookBaseTest


 class FacebookBookmarks(FacebookBaseTest):
+
+    is_done = None
+
+    # TODO: https://jira.talendforge.org/browse/TDL-26640
+    EXCLUDE_STREAMS = {
+        'ads_insights_hourly_advertiser',    # TDL-24312, TDL-26640
+        'ads_insights_platform_and_device',  # TDL-26640
+        'ads_insights',                      # TDL-26640
+        'ads_insights_age_and_gender',       # TDL-26640
+        'ads_insights_country',              # TDL-26640
+        'ads_insights_dma',                  # TDL-26640
+        'ads_insights_region'                # TDL-26640
+    }
+
     @staticmethod
     def name():
         return "tap_tester_facebook_bookmarks"

-    def get_properties(self, original: bool = True):
-        """Configuration properties required for the tap."""
-        return_value = {
-            'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
-            'start_date' : '2019-07-22T00:00:00Z',
-            'end_date' : '2019-07-26T00:00:00Z',
-            'insights_buffer_days': '1'
-        }
-        if original:
-            return return_value
+    def streams_to_test(self):
+        expected_streams = self.expected_metadata().keys()
+        self.assert_message = "JIRA ticket has moved to done, \
+            re-add the applicable stream to the test: {0}"
+        assert base.JIRA_CLIENT.get_status_category("TDL-24312") != 'done',\
+            self.assert_message.format('ads_insights_hourly_advertiser')
+        expected_streams = self.expected_metadata().keys() - {'ads_insights_hourly_advertiser'}
+        LOGGER.warn(f"Skipped streams: {'ads_insights_hourly_advertiser'}")

-        return_value["start_date"] = self.start_date
-        return_value["end_date"] = self.end_date
+        assert base.JIRA_CLIENT.get_status_category("TDL-26640") != 'done',\
+            self.assert_message.format(self.EXCLUDE_STREAMS)
+        expected_streams = self.expected_metadata().keys() - self.EXCLUDE_STREAMS
+        LOGGER.warn(f"Skipped streams: {self.EXCLUDE_STREAMS}")

-        return return_value
+        return expected_streams

     @staticmethod
     def convert_state_to_utc(date_str):
@@ -63,7 +78,9 @@ def calculated_states_by_stream(self, current_state):
         leads '2021-04-07T20:09:39+0000',
               '2021-04-07T20:08:27+0000',
         """
-        timedelta_by_stream = {stream: [2,0,0]  # {stream_name: [days, hours, minutes], ...}
+        # TODO We want to move this bookmark back by some amount for insights streams but
+        # cannot do that unless we have at least 3 days of data. Currently we have 2.
+        timedelta_by_stream = {stream: [0,0,0]  # {stream_name: [days, hours, minutes], ...}
                                for stream in self.expected_streams()}
         timedelta_by_stream['campaigns'] = [0, 1, 0]
         timedelta_by_stream['adsets'] = [0, 1, 0]
@@ -84,16 +101,32 @@ def calculated_states_by_stream(self, current_state):

         return stream_to_calculated_state

+    # function for verifying the date format
+    def is_expected_date_format(self, date):
+        try:
+            # parse date
+            datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ")
+        except ValueError:
+            # return False if the date is not in the expected format
+            return False
+        # return True in case of no error
+        return True

     def test_run(self):
-        expected_streams = self.expected_streams()
+        expected_streams = self.streams_to_test()
         non_insight_streams = {stream for stream in expected_streams if not self.is_insight(stream)}
         insight_streams = {stream for stream in expected_streams if self.is_insight(stream)}

         # Testing against ads insights objects
         self.start_date = self.get_properties()['start_date']
         self.end_date = self.get_properties()['end_date']
-        self.bookmarks_test(insight_streams)
+
+        # TODO: https://jira.talendforge.org/browse/TDL-26640
+        status_category = base.JIRA_CLIENT.get_status_category("TDL-26640")
+        assert status_category != 'done',\
+            "TDL-26640 is fixed, re-enable the test for insights streams"
+        # Uncomment the following line when ready to run the test for insights streams
+        # self.bookmarks_test(insight_streams)

         # Testing against core objects
         self.end_date = '2021-02-09T00:00:00Z'
@@ -176,7 +209,8 @@ def bookmarks_test(self, expected_streams):
                 second_bookmark_value_utc = self.convert_state_to_utc(second_bookmark_value)
                 simulated_bookmark_value = new_states['bookmarks'][stream][replication_key]
                 simulated_bookmark_minus_lookback = self.timedelta_formatted(
-                    simulated_bookmark_value, days=expected_insights_buffer
+                    simulated_bookmark_value, days=expected_insights_buffer,
+                    date_format=self.BOOKMARK_COMPARISON_FORMAT
                 ) if self.is_insight(stream) else simulated_bookmark_value

@@ -193,14 +227,16 @@ def bookmarks_test(self, expected_streams):

                 for record in second_sync_messages:
+                    # for "ads_insights_age_and_gender" and "ads_insights_hourly_advertiser"
+                    # verify that the "date_start" and "date_stop" are in the expected format
+                    if stream in ["ads_insights_age_and_gender", "ads_insights_hourly_advertiser"]:
+                        date_start = record.get("date_start")
+                        self.assertTrue(self.is_expected_date_format(date_start))
+                        date_stop = record.get("date_stop")
+                        self.assertTrue(self.is_expected_date_format(date_stop))

                     # Verify the second sync records respect the previous (simulated) bookmark value
                     replication_key_value = record.get(replication_key)
-                    if stream in {'ads_insights_age_and_gender', 'ads_insights_hourly_advertiser'}:  # BUG | https://stitchdata.atlassian.net/browse/SRCE-4873
-                        replication_key_value = datetime.datetime.strftime(
-                            dateutil.parser.parse(replication_key_value),
-                            self.BOOKMARK_COMPARISON_FORMAT
-                        )

                     self.assertGreaterEqual(replication_key_value, simulated_bookmark_minus_lookback,
                                             msg="Second sync records do not respect the previous bookmark.")

@@ -211,6 +247,13 @@ def bookmarks_test(self, expected_streams):
                     )

                 for record in first_sync_messages:
+                    # for "ads_insights_age_and_gender" and "ads_insights_hourly_advertiser"
+                    # verify that the "date_start" and "date_stop" are in the expected format
+                    if stream in ["ads_insights_age_and_gender", "ads_insights_hourly_advertiser"]:
+                        date_start = record.get("date_start")
+                        self.assertTrue(self.is_expected_date_format(date_start))
+                        date_stop = record.get("date_stop")
+                        self.assertTrue(self.is_expected_date_format(date_stop))

                     # Verify the first sync bookmark value is the max replication key value for a given stream
                     replication_key_value = record.get(replication_key)
diff --git a/tests/test_facebook_discovery.py b/tests/test_facebook_discovery.py
index 46e04fc0..1a865a06 100644
--- a/tests/test_facebook_discovery.py
+++ b/tests/test_facebook_discovery.py
@@ -1,134 +1,14 @@
-"""Test tap discovery mode and metadata."""
-import re
+import unittest
+from tap_tester.base_suite_tests.discovery_test import DiscoveryTest

-from tap_tester import menagerie, connections
+from base_new_frmwrk import FacebookBaseTest

-from base import FacebookBaseTest
-
-class DiscoveryTest(FacebookBaseTest):
-    """Test tap discovery mode and metadata conforms to standards."""
+class FacebookDiscoveryTest(DiscoveryTest, FacebookBaseTest):
+    """Standard Discovery Test"""

     @staticmethod
     def name():
-        return "tap_tester_facebook_discovery_test"
-
-    def test_run(self):
-        """
-        Testing that discovery creates the appropriate catalog with valid metadata.
-
-        • Verify number of actual streams discovered match expected
-        • Verify the stream names discovered were what we expect
-        • Verify stream names follow naming convention
-          streams should only have lowercase alphas and underscores
-        • verify there is only 1 top level breadcrumb
-        • verify replication key(s)
-        • verify primary key(s)
-        • verify that if there is a replication key we are doing INCREMENTAL otherwise FULL
-        • verify the actual replication matches our expected replication method
-        • verify that primary, replication and foreign keys
-          are given the inclusion of automatic.
-        • verify that all other fields have inclusion of available metadata.
-        """
-        streams_to_test = self.expected_streams()
-
-        conn_id = connections.ensure_connection(self)
-
-        found_catalogs = self.run_and_verify_check_mode(conn_id)
-
-        # Verify stream names follow naming convention
-        # streams should only have lowercase alphas and underscores
-        found_catalog_names = {c['tap_stream_id'] for c in found_catalogs}
-        self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]),
-                        msg="One or more streams don't follow standard naming")
-
-        for stream in streams_to_test:
-            with self.subTest(stream=stream):
-
-                # Verify ensure the caatalog is found for a given stream
-                catalog = next(iter([catalog for catalog in found_catalogs
-                                     if catalog["stream_name"] == stream]))
-                self.assertIsNotNone(catalog)
-
-                # collecting expected values
-                expected_primary_keys = self.expected_primary_keys()[stream]
-                expected_replication_keys = self.expected_replication_keys()[stream]
-                expected_automatic_fields = expected_primary_keys | expected_replication_keys
-                expected_replication_method = self.expected_replication_method()[stream]
-
-                # collecting actual values...
- schema_and_metadata = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) - metadata = schema_and_metadata["metadata"] - stream_properties = [item for item in metadata if item.get("breadcrumb") == []] - actual_primary_keys = set( - stream_properties[0].get( - "metadata", {self.PRIMARY_KEYS: []}).get(self.PRIMARY_KEYS, []) - ) - actual_replication_keys = set( - stream_properties[0].get( - "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []) - ) - actual_replication_method = stream_properties[0].get( - "metadata", {self.REPLICATION_METHOD: None}).get(self.REPLICATION_METHOD) - actual_automatic_fields = set( - item.get("breadcrumb", ["properties", None])[1] for item in metadata - if item.get("metadata").get("inclusion") == "automatic" - ) - - ########################################################################## - ### metadata assertions - ########################################################################## - - # verify there is only 1 top level breadcrumb in metadata - self.assertTrue(len(stream_properties) == 1, - msg="There is NOT only one top level breadcrumb for {}".format(stream) + \ - "\nstream_properties | {}".format(stream_properties)) - - # BUG_1 | https://stitchdata.atlassian.net/browse/SRCE-4855 - failing_with_no_replication_keys = { - 'ads_insights_country', 'adsets', 'adcreative', 'ads', 'ads_insights_region', - 'campaigns', 'ads_insights_age_and_gender', 'ads_insights_platform_and_device', - 'ads_insights_dma', 'ads_insights', 'leads', 'ads_insights_hourly_advertiser' - } - if stream not in failing_with_no_replication_keys: # BUG_1 - # verify replication key(s) match expectations - self.assertSetEqual( - expected_replication_keys, actual_replication_keys - ) - - # verify primary key(s) match expectations - self.assertSetEqual( - expected_primary_keys, actual_primary_keys, - ) - - # BUG_2 | https://stitchdata.atlassian.net/browse/SRCE-4856 - failing_with_no_replication_method = { - 'ads_insights_country', 'adsets', 'adcreative', 'ads', 'ads_insights_region', - 'campaigns', 'ads_insights_age_and_gender', 'ads_insights_platform_and_device', - 'ads_insights_dma', 'ads_insights', 'leads', 'ads_insights_hourly_advertiser' - } - if stream not in failing_with_no_replication_method: # BUG_2 - # verify the replication method matches our expectations - self.assertEqual( - expected_replication_method, actual_replication_method - ) - - # verify that if there is a replication key we are doing INCREMENTAL otherwise FULL - if actual_replication_keys: - self.assertEqual(self.INCREMENTAL, actual_replication_method) - else: - self.assertEqual(self.FULL_TABLE, actual_replication_method) - - # verify that primary keys and replication keys - # are given the inclusion of automatic in metadata. 
- self.assertSetEqual(expected_automatic_fields, actual_automatic_fields) - - # verify that all other fields have inclusion of available - # This assumes there are no unsupported fields for SaaS sources - self.assertTrue( - all({item.get("metadata").get("inclusion") == "available" - for item in metadata - if item.get("breadcrumb", []) != [] - and item.get("breadcrumb", ["properties", None])[1] - not in actual_automatic_fields}), - msg="Not all non key properties are set to available in metadata") + return "tt_facebook_discovery" + def streams_to_test(self): + return self.expected_stream_names() diff --git a/tests/test_facebook_field_selection.py b/tests/test_facebook_field_selection.py index 45761722..0203239a 100644 --- a/tests/test_facebook_field_selection.py +++ b/tests/test_facebook_field_selection.py @@ -1,7 +1,7 @@ import os from functools import reduce -from tap_tester import connections, menagerie, runner +from tap_tester import connections, menagerie, runner, LOGGER from base import FacebookBaseTest @@ -13,7 +13,7 @@ def name(): return "tap_tester_facebook_field_selection" @staticmethod - def expected_check_streams(): + def streams_to_test(): return { 'ads', 'adcreative', @@ -29,23 +29,6 @@ def expected_check_streams(): #'leads', } - @staticmethod - def expected_sync_streams(): - return { - "ads", - "adcreative", - "adsets", - "campaigns", - "ads_insights", - "ads_insights_age_and_gender", - "ads_insights_country", - "ads_insights_platform_and_device", - "ads_insights_region", - "ads_insights_dma", - "ads_insights_hourly_advertiser", - #"leads", - } - @staticmethod def expected_pks(): return { @@ -55,7 +38,7 @@ def expected_pks(): "campaigns" : {"id"}, "ads_insights" : {"campaign_id", "adset_id", "ad_id", "date_start"}, "ads_insights_age_and_gender" : {"campaign_id", "adset_id", "ad_id", "date_start", "age", "gender"}, - "ads_insights_country" : {"campaign_id", "adset_id", "ad_id", "date_start"}, + "ads_insights_country" : {"campaign_id", "adset_id", "ad_id", "date_start", "country"}, "ads_insights_platform_and_device": {"campaign_id", "adset_id", "ad_id", "date_start", "publisher_platform", "platform_position", "impression_device"}, "ads_insights_region" : {"campaign_id", "adset_id", "ad_id", "date_start"}, "ads_insights_dma" : {"campaign_id", "adset_id", "ad_id", "date_start"}, @@ -76,6 +59,9 @@ def get_properties(self): # pylint: disable=arguments-differ } def test_run(self): + + expected_streams = self.streams_to_test() + conn_id = connections.ensure_connection(self) # run in check mode @@ -90,9 +76,9 @@ def test_run(self): found_catalog_names = set(map(lambda c: c['tap_stream_id'], found_catalogs)) - diff = self.expected_check_streams().symmetric_difference( found_catalog_names ) + diff = expected_streams.symmetric_difference( found_catalog_names ) self.assertEqual(len(diff), 0, msg="discovered schemas do not match: {}".format(diff)) - print("discovered schemas are kosher") + LOGGER.info("discovered schemas are kosher") all_excluded_fields = {} # select all catalogs @@ -118,7 +104,7 @@ def test_run(self): menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) # This should be validating the the PKs are written in each record - record_count_by_stream = runner.examine_target_output_file(self, conn_id, self.expected_sync_streams(), self.expected_pks()) + record_count_by_stream = runner.examine_target_output_file(self, conn_id, expected_streams, self.expected_pks()) replicated_row_count = reduce(lambda accum,c : accum + c, record_count_by_stream.values()) 
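+        # (equivalent and arguably clearer: replicated_row_count = sum(record_count_by_stream.values()))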
         self.assertGreater(replicated_row_count, 0, msg="failed to replicate any data: {}".format(record_count_by_stream))
         print("total replicated row count: {}".format(replicated_row_count))
diff --git a/tests/test_facebook_invalid_attribution_window.py b/tests/test_facebook_invalid_attribution_window.py
new file mode 100644
index 00000000..bcd1eef5
--- /dev/null
+++ b/tests/test_facebook_invalid_attribution_window.py
@@ -0,0 +1,65 @@
+import os
+import unittest
+
+from tap_tester import runner, connections, menagerie
+
+from base import FacebookBaseTest
+
+class FacebookInvalidAttributionWindowInt(FacebookBaseTest):
+
+    @staticmethod
+    def name():
+        return "tt_facebook_invalid_window_int"
+
+    @staticmethod
+    def streams_to_test():
+        return []
+
+    def get_properties(self, original: bool = True):
+        """Configuration properties required for the tap."""
+        return_value = {
+            'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
+            'start_date' : '2019-07-24T00:00:00Z',
+            'end_date' : '2019-07-26T00:00:00Z',
+            'insights_buffer_days': self.ATTRIBUTION_WINDOW,
+        }
+        if original:
+            return return_value
+
+        return_value["start_date"] = self.start_date
+        return return_value
+
+    def test_run(self):
+        self.ATTRIBUTION_WINDOW = '10'  # set attribution window other than 1, 7 or 28
+        self.run_test()
+
+    def run_test(self):
+        """
+        Test to verify that an error is raised when passing an attribution window other than 1, 7 or 28
+        """
+        # create connection
+        conn_id = connections.ensure_connection(self)
+        # run check mode
+        check_job_name = runner.run_check_mode(self, conn_id)
+        # get exit status
+        exit_status = menagerie.get_exit_status(conn_id, check_job_name)
+        # get discovery error message
+        discovery_error_message = exit_status.get('discovery_error_message')
+        # validate the error message
+        self.assertEquals(discovery_error_message, "The attribution window must be 1, 7 or 28.")
+        self.assertIsNone(exit_status.get('target_exit_status'))
+        self.assertIsNone(exit_status.get('tap_exit_status'))
+
+
+class FacebookInvalidAttributionWindowStr(FacebookInvalidAttributionWindowInt):
+
+    @staticmethod
+    def name():
+        return "tt_facebook_invalid_window_str"
+
+    @unittest.skip("BUG: TDL-18569")
+    def test_run(self):
+        self.ATTRIBUTION_WINDOW = 'something'
+        self.run_test()
+
+    # BUG : TDL-18569 created to standardize the error message for string values for attribution window
diff --git a/tests/test_facebook_pagination.py b/tests/test_facebook_pagination.py
new file mode 100644
index 00000000..364fca96
--- /dev/null
+++ b/tests/test_facebook_pagination.py
@@ -0,0 +1,68 @@
+import test_client as tc
+import time
+import unittest
+
+from datetime import datetime as dt
+
+from tap_tester.base_suite_tests.pagination_test import PaginationTest
+from tap_tester import connections, runner, menagerie, LOGGER
+
+from base_new_frmwrk import FacebookBaseTest
+
+fb_client = tc.TestClient()
+
+
+class FacebookPaginationTest(PaginationTest, FacebookBaseTest):
+    """Standard Pagination Test"""
+
+    @staticmethod
+    def name():
+        return "tt_facebook_pagination"
+
+    def streams_to_test(self):
+        # TODO ads_insights empty for account, no post via API, spike on generating data
+        return {'adcreative', 'ads', 'adsets', 'campaigns'}
+
+    def setUp(self):  # pylint: disable=invalid-name
+        """
+        Setup for tests in this module.
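+
+        The first sync is run once and cached on the PaginationTest class, so
+        repeated setUp calls are no-ops. Before that sync, the test client
+        checks every stream under test and, when a stream's record count does
+        not exceed its expected page size, posts enough new records that the
+        stream has more than one full page to paginate.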
+        """
+        if PaginationTest.synced_records and PaginationTest.record_count_by_stream:
+            return
+
+        # instantiate connection
+        conn_id = connections.ensure_connection(self)
+
+        # run check mode
+        found_catalogs = self.run_and_verify_check_mode(conn_id)
+
+        # table and field selection
+        test_catalogs = [catalog for catalog in found_catalogs
+                         if catalog.get('stream_name') in self.streams_to_test()]
+
+        # non_selected_fields are none
+        self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs)
+
+        # ensure there is enough data to paginate
+        start_date_dt = self.parse_date(self.start_date)
+        date_range = {'since': dt.strftime(start_date_dt, "%Y-%m-%d"),
+                      'until': dt.strftime(dt.now(), "%Y-%m-%d")}
+
+        for stream in self.streams_to_test():
+            limit = self.expected_page_size(stream)
+            response = fb_client.get_account_objects(stream, limit, date_range)
+
+            number_of_records = len(response['data'])
+            # TODO move "if" logic below to client method get_account_objects()
+            if number_of_records >= limit and response.get('paging', {}).get('next'):
+                continue  # stream is ready for test, no need for further action
+
+            LOGGER.info(f"Stream: {stream} - Record count is less than max page size: {limit}, "
+                        "posting more records to setUp the PaginationTest")
+
+            for i in range(limit - number_of_records + 1):
+                post_response = fb_client.create_account_objects(stream)
+                LOGGER.info(f"Posted {i + 1} new {stream}, new total: {number_of_records + i + 1}")
+
+        # run initial sync
+        PaginationTest.record_count_by_stream = self.run_and_verify_sync_mode(conn_id)
+        PaginationTest.synced_records = runner.get_records_from_target_output()
diff --git a/tests/test_facebook_start_date.py b/tests/test_facebook_start_date.py
index ccf7de34..2667b6d9 100644
--- a/tests/test_facebook_start_date.py
+++ b/tests/test_facebook_start_date.py
@@ -1,42 +1,59 @@
+import base
 import os

-from tap_tester import connections, runner
+from tap_tester import connections, runner, LOGGER

 from base import FacebookBaseTest


 class FacebookStartDateTest(FacebookBaseTest):
+    is_done = None
     start_date_1 = ""
     start_date_2 = ""

+    # TODO: https://jira.talendforge.org/browse/TDL-26640
+    EXCLUDE_STREAMS = {
+        'ads_insights_hourly_advertiser',    # TDL-24312, TDL-26640
+        'ads_insights_platform_and_device',  # TDL-26640
+        'ads_insights',                      # TDL-26640
+        'ads_insights_age_and_gender',       # TDL-26640
+        'ads_insights_country',              # TDL-26640
+        'ads_insights_dma',                  # TDL-26640
+        'ads_insights_region'                # TDL-26640
+    }
+
     @staticmethod
     def name():
         return "tap_tester_facebook_start_date_test"

-    def get_properties(self, original: bool = True):
-        """Configuration properties required for the tap."""
-        return_value = {
-            'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
-            'start_date' : '2019-07-22T00:00:00Z',
-            'end_date' : '2019-07-26T00:00:00Z',
-            'insights_buffer_days': '1'
-        }
-        if original:
-            return return_value
+    def streams_to_test(self):
+        expected_streams = self.expected_metadata().keys()
+        self.assert_message = "JIRA ticket has moved to done, \
+            re-add the applicable stream to the test: {0}"
+        assert base.JIRA_CLIENT.get_status_category("TDL-24312") != 'done',\
+            self.assert_message.format('ads_insights_hourly_advertiser')
+        expected_streams = self.expected_metadata().keys() - {'ads_insights_hourly_advertiser'}
+        LOGGER.warn(f"Skipped streams: {'ads_insights_hourly_advertiser'}")
+
+        assert base.JIRA_CLIENT.get_status_category("TDL-26640") != 'done',\
+            self.assert_message.format(self.EXCLUDE_STREAMS)
+        expected_streams = self.expected_metadata().keys() - self.EXCLUDE_STREAMS
+        LOGGER.warn(f"Skipped streams: {self.EXCLUDE_STREAMS}")

-        return_value["start_date"] = self.start_date
-        return return_value
+        return expected_streams

     def test_run(self):
         """Instantiate start date according to the desired data set and run the test"""
-        self.start_date_1 = self.get_properties().get('start_date')
-        self.start_date_2 = self.timedelta_formatted(self.start_date_1, days=3)
+        self.start_date_1 = '2021-04-07T00:00:00Z'
+        self.start_date_2 = self.timedelta_formatted(
+            self.start_date_1, days=2, date_format=self.START_DATE_FORMAT
+        )

         self.start_date = self.start_date_1

-        expected_streams = self.expected_streams()
+        expected_streams = self.streams_to_test()

         ##########################################################################
         ### First Sync
@@ -61,7 +78,7 @@ def test_run(self):
         ### Update START DATE Between Syncs
         ##########################################################################

-        print("REPLICATION START DATE CHANGE: {} ===>>> {} ".format(self.start_date, self.start_date_2))
+        LOGGER.info("REPLICATION START DATE CHANGE: %s ===>>> %s ", self.start_date, self.start_date_2)
         self.start_date = self.start_date_2

         ##########################################################################
@@ -89,8 +106,12 @@ def test_run(self):
                 # expected values
                 expected_primary_keys = self.expected_primary_keys()[stream]
                 expected_insights_buffer = -1 * int(self.get_properties()['insights_buffer_days'])
-                expected_start_date_1 = self.timedelta_formatted(self.start_date_1, days=expected_insights_buffer)
-                expected_start_date_2 = self.timedelta_formatted(self.start_date_2, days=expected_insights_buffer)
+                expected_start_date_1 = self.timedelta_formatted(
+                    self.start_date_1, days=expected_insights_buffer, date_format=self.START_DATE_FORMAT
+                )
+                expected_start_date_2 = self.timedelta_formatted(
+                    self.start_date_2, days=expected_insights_buffer, date_format=self.START_DATE_FORMAT
+                )

                 # collect information for assertions from syncs 1 & 2 based on expected values
                 record_count_sync_1 = record_count_by_stream_1.get(stream, 0)
diff --git a/tests/test_facebook_table_reset.py b/tests/test_facebook_table_reset.py
new file mode 100644
index 00000000..0c8a1e46
--- /dev/null
+++ b/tests/test_facebook_table_reset.py
@@ -0,0 +1,90 @@
+import os
+import dateutil.parser
+import datetime
+import base
+from base_new_frmwrk import FacebookBaseTest
+from tap_tester.base_suite_tests.table_reset_test import TableResetTest
+from tap_tester import LOGGER
+
+
+class FacebookTableResetTest(TableResetTest, FacebookBaseTest):
+    """tap-facebook table reset test implementation.
+    Currently tests only streams with the INCREMENTAL replication method"""
+
+    is_done = None
+
+    @staticmethod
+    def name():
+        return "tt_facebook_table_reset"
+
+    # TODO: https://jira.talendforge.org/browse/TDL-26640
+    EXCLUDE_STREAMS = {
+        'ads_insights_hourly_advertiser',    # TDL-24312, TDL-26640
+        'ads_insights_platform_and_device',  # TDL-26640
+        'ads_insights',                      # TDL-26640
+        'ads_insights_age_and_gender',       # TDL-26640
+        'ads_insights_country',              # TDL-26640
+        'ads_insights_dma',                  # TDL-26640
+        'ads_insights_region'                # TDL-26640
+    }
+
+    def streams_to_test(self):
+        expected_streams = self.expected_metadata().keys()
+        self.assert_message = "JIRA ticket has moved to done, \
+            re-add the applicable stream to the test: {0}"
+        assert base.JIRA_CLIENT.get_status_category("TDL-24312") != 'done',\
+            self.assert_message.format('ads_insights_hourly_advertiser')
+        expected_streams = self.expected_metadata().keys() - {'ads_insights_hourly_advertiser'}
+        LOGGER.warn(f"Skipped streams: {'ads_insights_hourly_advertiser'}")
+
+        assert base.JIRA_CLIENT.get_status_category("TDL-26640") != 'done',\
+            self.assert_message.format(self.EXCLUDE_STREAMS)
+        expected_streams = self.expected_metadata().keys() - self.EXCLUDE_STREAMS
+        LOGGER.warn(f"Skipped streams: {self.EXCLUDE_STREAMS}")
+
+        return expected_streams
+
+    @property
+    def reset_stream(self):
+        return 'ads'
+
+
+    def calculated_states_by_stream(self, current_state):
+
+        """ The following streams barely make the cut:
+
+        campaigns "2021-02-09T18:17:30.000000Z"
+                  "2021-02-09T16:24:58.000000Z"
+
+        adsets    "2021-02-09T18:17:41.000000Z"
+                  "2021-02-09T17:10:09.000000Z"
+
+        leads     '2021-04-07T20:09:39+0000',
+                  '2021-04-07T20:08:27+0000',
+        """
+        timedelta_by_stream = {stream: [0,0,0]  # {stream_name: [days, hours, minutes], ...}
+                               for stream in self.expected_stream_names()}
+        timedelta_by_stream['campaigns'] = [0, 1, 0]
+        timedelta_by_stream['adsets'] = [0, 1, 0]
+        timedelta_by_stream['leads'] = [0, 0, 1]
+
+        stream_to_calculated_state = {stream: "" for stream in current_state['bookmarks'].keys()}
+        for stream, state in current_state['bookmarks'].items():
+            state_key, state_value = next(iter(state.keys())), next(iter(state.values()))
+            state_as_datetime = dateutil.parser.parse(state_value)
+            days, hours, minutes = timedelta_by_stream[stream]
+            calculated_state_as_datetime = state_as_datetime - datetime.timedelta(days=days, hours=hours, minutes=minutes)
+
+            state_format = '%Y-%m-%dT00:00:00+00:00' if self.is_insight(stream) else '%Y-%m-%dT%H:%M:%S-00:00'
+            calculated_state_formatted = datetime.datetime.strftime(calculated_state_as_datetime, state_format)
+
+            stream_to_calculated_state[stream] = {state_key: calculated_state_formatted}
+
+        return stream_to_calculated_state
+
+    def manipulate_state(self, current_state):
+        new_states = {'bookmarks': dict()}
+        simulated_states = self.calculated_states_by_stream(current_state)
+        for stream, new_state in simulated_states.items():
+            new_states['bookmarks'][stream] = new_state
+        return new_states
diff --git a/tests/unittests/test_attribute_error_retry.py b/tests/unittests/test_attribute_error_retry.py
new file mode 100644
index 00000000..63949a2b
--- /dev/null
+++ b/tests/unittests/test_attribute_error_retry.py
@@ -0,0 +1,193 @@
+import unittest
+from unittest.mock import Mock
+from unittest import mock
+from tap_facebook import AdCreative, Ads, AdSets, Campaigns, AdsInsights, Leads
+
+@mock.patch("time.sleep")
+class TestAttributeErrorBackoff(unittest.TestCase):
+    """A set of unit tests to ensure that requests are retried properly on AttributeError"""
+
+    def test_get_adcreatives(self, mocked_sleep):
+        """
+        AdCreative.get_adcreatives calls a `facebook_business` method, `get_ad_creatives()`, to get a batch of ad creatives.
+        We mock this method to raise an `AttributeError` and expect the tap to retry that function up to 5 times,
+        which is the current hard coded `max_tries` value.
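+
+        A minimal sketch of the retry wiring this test assumes (the decorator
+        name and arguments are illustrative, patterned on the `backoff`
+        library; the real wrapper lives in tap_facebook and may differ):
+
+            @retry_pattern(backoff.expo, AttributeError, max_tries=5, factor=5)
+            def get_adcreatives(self):
+                ...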
+ """ + + # Mock get_ad_creatives function to throw AttributeError exception + mocked_account = Mock() + mocked_account.get_ad_creatives = Mock() + mocked_account.get_ad_creatives.side_effect = AttributeError + + # Call get_adcreatives() function of AdCreatives and verify AttributeError is raised + ad_creative_object = AdCreative('', mocked_account, '', '') + with self.assertRaises(AttributeError): + ad_creative_object.get_adcreatives() + + # verify get_ad_creatives() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_ad_creatives.call_count, 5) + + def test_call_get_ads(self, mocked_sleep): + """ + Ads._call_get_ads calls a `facebook_business` method,`get_ads()`, to get a batch of ads. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_ads function to throw AttributeError exception + mocked_account = Mock() + mocked_account.get_ads = Mock() + mocked_account.get_ads.side_effect = AttributeError + + # Call _call_get_ads() function of Ads and verify AttributeError is raised + ad_object = Ads('', mocked_account, '', '', '') + with self.assertRaises(AttributeError): + ad_object._call_get_ads('test') + + # verify get_ads() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_ads.call_count, 5) + + @mock.patch("pendulum.parse") + def test_ad_prepare_record(self, mocked_parse, mocked_sleep): + """ + __iter__ of Ads calls a function _iterate which calls a nested prepare_record function. + Prepare_record calls a `facebook_business` method,`ad.api_get()`, to get a ad fields. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + # Mock ad object + mocked_ad = Mock() + mocked_ad.api_get = Mock() + mocked_ad.__getitem__ = Mock() + mocked_ad.api_get.side_effect = AttributeError + + # # Mock get_ads function return mocked ad object + mocked_account = Mock() + mocked_account.get_ads = Mock() + mocked_account.get_ads.side_effect = [[mocked_ad]] + + # Iterate ads object which calls prepare_record() inside and verify AttributeError is raised + ad_object = Ads('', mocked_account, '', '', '') + with self.assertRaises(AttributeError): + for message in ad_object: + pass + + # verify prepare_record() function by checking call count of mocked ad.api_get() + self.assertEquals(mocked_ad.api_get.call_count, 5) + + def test__call_get_ad_sets(self, mocked_sleep): + """ + AdSets._call_get_ad_sets calls a `facebook_business` method,`get_ad_sets()`, to get a batch of adsets. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. 
+ """ + + # Mock get_ad_sets function to throw AttributeError exception + mocked_account = Mock() + mocked_account.get_ad_sets = Mock() + mocked_account.get_ad_sets.side_effect = AttributeError + + # Call _call_get_ad_sets() function of AdSets and verify AttributeError is raised + ad_set_object = AdSets('', mocked_account, '', '', '') + with self.assertRaises(AttributeError): + ad_set_object._call_get_ad_sets('test') + + # verify get_ad_sets() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_ad_sets.call_count, 5) + + @mock.patch("pendulum.parse") + def test_adset_prepare_record(self, mocked_parse, mocked_sleep): + """ + __iter__ of AdSets calls a function _iterate which calls a nested prepare_record function. + Prepare_record calls a `facebook_business` method,`ad.api_get()`, to get a ad fields. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock adset object + mocked_adset = Mock() + mocked_adset.api_get = Mock() + mocked_adset.__getitem__ = Mock() + mocked_adset.api_get.side_effect = AttributeError + + # Mock get_ad_sets function return mocked ad object + mocked_account = Mock() + mocked_account.get_ad_sets = Mock() + mocked_account.get_ad_sets.side_effect = [[mocked_adset]] + + # Iterate adset object which calls prepare_record() inside and verify AttributeError is raised + ad_set_object = AdSets('', mocked_account, '', '', '') + with self.assertRaises(AttributeError): + for message in ad_set_object: + pass + + # verify prepare_record() function by checking call count of mocked ad.api_get() + self.assertEquals(mocked_adset.api_get.call_count, 5) + + def test__call_get_campaigns(self, mocked_sleep): + """ + Campaigns._call_get_campaigns calls a `facebook_business` method,`get_campaigns()`, to get a batch of campaigns. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_campaigns function to throw AttributeError exception + mocked_account = Mock() + mocked_account.get_campaigns = Mock() + mocked_account.get_campaigns.side_effect = AttributeError + + # Call _call_get_campaigns() function of Campaigns and verify AttributeError is raised + campaigns_object = Campaigns('', mocked_account, '', '', '') + with self.assertRaises(AttributeError): + campaigns_object._call_get_campaigns('test') + + # verify get_campaigns() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_campaigns.call_count, 5) + + @mock.patch("pendulum.parse") + def test_campaign_prepare_record(self, mocked_parse, mocked_sleep): + """ + __iter__ of Campaigns calls a function _iterate which calls a nested prepare_record function. + Prepare_record calls a `facebook_business` method,`ad.api_get()`, to get a ad fields. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. 
+ """ + + # # Mock campaign object + mocked_campaign = Mock() + mocked_campaign.api_get = Mock() + mocked_campaign.__getitem__ = Mock() + mocked_campaign.api_get.side_effect = AttributeError + + # # Mock get_campaigns function return mocked ad object + mocked_account = Mock() + mocked_account.get_campaigns = Mock() + mocked_account.get_campaigns.side_effect = [[mocked_campaign]] + + # Iterate campaigns object which calls prepare_record() inside and verify AttributeError is raised + campaign_object = Campaigns('', mocked_account, '', '', '') + with self.assertRaises(AttributeError): + for message in campaign_object: + pass + + # verify prepare_record() function by checking call count of mocked ad.api_get() + self.assertEquals(mocked_campaign.api_get.call_count, 5) + + def test_run_job(self, mocked_sleep): + """ + AdsInsights.run_job calls a `facebook_business` method,`get_insights()`, to get a batch of insights. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_insights function to throw AttributeError exception + mocked_account = Mock() + mocked_account.get_insights = Mock() + mocked_account.get_insights.side_effect = AttributeError + + # Call run_job() function of Campaigns and verify AttributeError is raised + ads_insights_object = AdsInsights('', mocked_account, '', '', '', {}) + with self.assertRaises(AttributeError): + ads_insights_object.run_job('test') + + # verify get_insights() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_insights.call_count, 5) diff --git a/tests/unittests/test_attribution_window.py b/tests/unittests/test_attribution_window.py new file mode 100644 index 00000000..a3e6cc5c --- /dev/null +++ b/tests/unittests/test_attribution_window.py @@ -0,0 +1,29 @@ +import unittest +import tap_facebook.__init__ as tap_facebook + +class TestAttributionWindow(unittest.TestCase): + """ + Test case to verify that proper error message is raise + when user enters attribution window other than 1, 7 and 28 + """ + + def test_invalid_attribution_window(self): + error_message = None + + # set config + tap_facebook.CONFIG = { + "start_date": "2019-01-01T00:00:00Z", + "account_id": "test_account_id", + "access_token": "test_access_token", + "insights_buffer_days": 30 + } + + try: + # initialize 'AdsInsights' stream as attribution window is only supported in those streams + tap_facebook.AdsInsights("test", "test", "test", None, {}, {}) + except Exception as e: + # save error message for assertion + error_message = str(e) + + # verify the error message was as expected + self.assertEquals(error_message, "The attribution window must be 1, 7 or 28.") diff --git a/tests/unittests/test_request_timeout.py b/tests/unittests/test_request_timeout.py new file mode 100644 index 00000000..19737470 --- /dev/null +++ b/tests/unittests/test_request_timeout.py @@ -0,0 +1,504 @@ +import unittest +import tap_facebook +from unittest.mock import Mock +from unittest import mock +from requests.exceptions import ConnectionError, Timeout +from tap_facebook import AdCreative, Ads, AdSets, Campaigns, AdsInsights, Leads + +@mock.patch("time.sleep") +class TestRequestTimeoutBackoff(unittest.TestCase): + """A set of unit tests to ensure that requests are retrying properly for Timeout Error""" + def test_get_adcreatives(self, mocked_sleep): + """ + AdCreative.get_adcreatives calls a `facebook_business` method,`get_ad_creatives()`, to get 
a batch of ad creatives. + We mock this method to raise a `Timeout` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_ad_creatives function to throw Timeout exception + mocked_account = Mock() + mocked_account.get_ad_creatives = Mock() + mocked_account.get_ad_creatives.side_effect = Timeout + + # Call get_adcreatives() function of AdCreatives and verify Timeout is raised + ad_creative_object = AdCreative('', mocked_account, '', '') + with self.assertRaises(Timeout): + ad_creative_object.get_adcreatives() + + # verify get_ad_creatives() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_ad_creatives.call_count, 5) + + def test__call_get_ads(self, mocked_sleep): + """ + Ads._call_get_ads calls a `facebook_business` method,`get_ads()`, to get a batch of ads. + We mock this method to raise a `Timeout` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_ads function to throw Timeout exception + mocked_account = Mock() + mocked_account.get_ads = Mock() + mocked_account.get_ads.side_effect = Timeout + + # Call _call_get_ads() function of Ads and verify Timeout is raised + ad_object = Ads('', mocked_account, '', '', '') + with self.assertRaises(Timeout): + ad_object._call_get_ads('test') + + # verify get_ads() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_ads.call_count, 5) + + @mock.patch("pendulum.parse") + def test_ad_prepare_record(self, mocked_parse, mocked_sleep): + """ + __iter__ of Ads calls a function _iterate which calls a nested prepare_record function. + Prepare_record calls a `facebook_business` method,`ad.api_get()`, to get a ad fields. + We mock this method to raise a `Timeout` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + # Mock ad object + mocked_ad = Mock() + mocked_ad.api_get = Mock() + mocked_ad.__getitem__ = Mock() + mocked_ad.api_get.side_effect = Timeout + + # # Mock get_ads function return mocked ad object + mocked_account = Mock() + mocked_account.get_ads = Mock() + mocked_account.get_ads.side_effect = [[mocked_ad]] + + # Iterate ads object which calls prepare_record() inside and verify Timeout is raised + ad_object = Ads('', mocked_account, '', '', '') + with self.assertRaises(Timeout): + for message in ad_object: + pass + + # verify prepare_record() function by checking call count of mocked ad.api_get() + self.assertEquals(mocked_ad.api_get.call_count, 5) + + def test__call_get_ad_sets(self, mocked_sleep): + """ + AdSets._call_get_ad_sets calls a `facebook_business` method,`get_ad_sets()`, to get a batch of adsets. + We mock this method to raise a `Timeout` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. 
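+
+        Note: `Timeout` here is `requests.exceptions.Timeout` (imported at the
+        top of this module), i.e. what `requests` raises when the configured
+        `request_timeout` elapses before the Graph API responds.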
+ """ + + # Mock get_ad_sets function to throw Timeout exception + mocked_account = Mock() + mocked_account.get_ad_sets = Mock() + mocked_account.get_ad_sets.side_effect = Timeout + + # Call _call_get_ad_sets() function of AdSets and verify Timeout is raised + ad_set_object = AdSets('', mocked_account, '', '', '') + with self.assertRaises(Timeout): + ad_set_object._call_get_ad_sets('test') + + # verify get_ad_sets() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_ad_sets.call_count, 5) + + @mock.patch("pendulum.parse") + def test_adset_prepare_record(self, mocked_parse, mocked_sleep): + """ + __iter__ of AdSets calls a function _iterate which calls a nested prepare_record function. + Prepare_record calls a `facebook_business` method,`ad.api_get()`, to get a ad fields. + We mock this method to raise a `Timeout` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock adset object + mocked_adset = Mock() + mocked_adset.api_get = Mock() + mocked_adset.__getitem__ = Mock() + mocked_adset.api_get.side_effect = Timeout + + # Mock get_ad_sets function return mocked ad object + mocked_account = Mock() + mocked_account.get_ad_sets = Mock() + mocked_account.get_ad_sets.side_effect = [[mocked_adset]] + + # Iterate adset object which calls prepare_record() inside and verify Timeout is raised + ad_set_object = AdSets('', mocked_account, '', '', '') + with self.assertRaises(Timeout): + for message in ad_set_object: + pass + + # verify prepare_record() function by checking call count of mocked ad.api_get() + self.assertEquals(mocked_adset.api_get.call_count, 5) + + def test__call_get_campaigns(self, mocked_sleep): + """ + Campaigns._call_get_campaigns calls a `facebook_business` method,`get_campaigns()`, to get a batch of campaigns. + We mock this method to raise a `Timeout` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_campaigns function to throw Timeout exception + mocked_account = Mock() + mocked_account.get_campaigns = Mock() + mocked_account.get_campaigns.side_effect = Timeout + + # Call _call_get_campaigns() function of Campaigns and verify Timeout is raised + campaigns_object = Campaigns('', mocked_account, '', '', '') + with self.assertRaises(Timeout): + campaigns_object._call_get_campaigns('test') + + # verify get_campaigns() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_campaigns.call_count, 5) + + @mock.patch("pendulum.parse") + def test_campaign_prepare_record(self, mocked_parse, mocked_sleep): + """ + __iter__ of Campaigns calls a function _iterate which calls a nested prepare_record function. + Prepare_record calls a `facebook_business` method,`ad.api_get()`, to get a ad fields. + We mock this method to raise a `Timeout` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. 
+ """ + + # # Mock campaign object + mocked_campaign = Mock() + mocked_campaign.api_get = Mock() + mocked_campaign.__getitem__ = Mock() + mocked_campaign.api_get.side_effect = Timeout + + # # Mock get_campaigns function return mocked ad object + mocked_account = Mock() + mocked_account.get_campaigns = Mock() + mocked_account.get_campaigns.side_effect = [[mocked_campaign]] + + # Iterate campaigns object which calls prepare_record() inside and verify Timeout is raised + campaign_object = Campaigns('', mocked_account, '', '', '') + with self.assertRaises(Timeout): + for message in campaign_object: + pass + + # verify prepare_record() function by checking call count of mocked ad.api_get() + self.assertEquals(mocked_campaign.api_get.call_count, 5) + + def test_run_job(self, mocked_sleep): + """ + AdsInsights.run_job calls a `facebook_business` method,`get_insights()`, to get a batch of insights. + We mock this method to raise a `Timeout` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_insights function to throw Timeout exception + mocked_account = Mock() + mocked_account.get_insights = Mock() + mocked_account.get_insights.side_effect = Timeout + + # Call run_job() function of Campaigns and verify Timeout is raised + ads_insights_object = AdsInsights('', mocked_account, '', '', '', {}) + with self.assertRaises(Timeout): + ads_insights_object.run_job('test') + + # verify get_insights() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_insights.call_count, 5) + +@mock.patch("time.sleep") +class TestConnectionErrorBackoff(unittest.TestCase): + """A set of unit tests to ensure that requests are retrying properly for ConnectionError Error""" + def test_get_adcreatives(self, mocked_sleep): + """ + AdCreative.get_adcreatives calls a `facebook_business` method,`get_ad_creatives()`, to get a batch of ad creatives. + We mock this method to raise a `ConnectionError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_ad_creatives function to throw ConnectionError exception + mocked_account = Mock() + mocked_account.get_ad_creatives = Mock() + mocked_account.get_ad_creatives.side_effect = ConnectionError + + # Call get_adcreatives() function of AdCreatives and verify ConnectionError is raised + ad_creative_object = AdCreative('', mocked_account, '', '') + with self.assertRaises(ConnectionError): + ad_creative_object.get_adcreatives() + + # verify get_ad_creatives() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_ad_creatives.call_count, 5) + + def test__call_get_ads(self, mocked_sleep): + """ + Ads._call_get_ads calls a `facebook_business` method,`get_ads()`, to get a batch of ads. + We mock this method to raise a `ConnectionError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. 
+ """ + + # Mock get_ads function to throw ConnectionError exception + mocked_account = Mock() + mocked_account.get_ads = Mock() + mocked_account.get_ads.side_effect = ConnectionError + + # Call _call_get_ads() function of Ads and verify ConnectionError is raised + ad_object = Ads('', mocked_account, '', '', '') + with self.assertRaises(ConnectionError): + ad_object._call_get_ads('test') + + # verify get_ads() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_ads.call_count, 5) + + @mock.patch("pendulum.parse") + def test_ad_prepare_record(self, mocked_parse, mocked_sleep): + """ + __iter__ of Ads calls a function _iterate which calls a nested prepare_record function. + Prepare_record calls a `facebook_business` method,`ad.api_get()`, to get a ad fields. + We mock this method to raise a `ConnectionError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + # Mock ad object + mocked_ad = Mock() + mocked_ad.api_get = Mock() + mocked_ad.__getitem__ = Mock() + mocked_ad.api_get.side_effect = ConnectionError + + # # Mock get_ads function return mocked ad object + mocked_account = Mock() + mocked_account.get_ads = Mock() + mocked_account.get_ads.side_effect = [[mocked_ad]] + + # Iterate ads object which calls prepare_record() inside and verify ConnectionError is raised + ad_object = Ads('', mocked_account, '', '', '') + with self.assertRaises(ConnectionError): + for message in ad_object: + pass + + # verify prepare_record() function by checking call count of mocked ad.api_get() + self.assertEquals(mocked_ad.api_get.call_count, 5) + + def test__call_get_ad_sets(self, mocked_sleep): + """ + AdSets._call_get_ad_sets calls a `facebook_business` method,`get_ad_sets()`, to get a batch of adsets. + We mock this method to raise a `ConnectionError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_ad_sets function to throw ConnectionError exception + mocked_account = Mock() + mocked_account.get_ad_sets = Mock() + mocked_account.get_ad_sets.side_effect = ConnectionError + + # Call _call_get_ad_sets() function of AdSets and verify ConnectionError is raised + ad_set_object = AdSets('', mocked_account, '', '', '') + with self.assertRaises(ConnectionError): + ad_set_object._call_get_ad_sets('test') + + # verify get_ad_sets() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_ad_sets.call_count, 5) + + @mock.patch("pendulum.parse") + def test_adset_prepare_record(self, mocked_parse, mocked_sleep): + """ + __iter__ of AdSets calls a function _iterate which calls a nested prepare_record function. + Prepare_record calls a `facebook_business` method,`ad.api_get()`, to get a ad fields. + We mock this method to raise a `ConnectionError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. 
+ """ + + # Mock adset object + mocked_adset = Mock() + mocked_adset.api_get = Mock() + mocked_adset.__getitem__ = Mock() + mocked_adset.api_get.side_effect = ConnectionError + + # Mock get_ad_sets function return mocked ad object + mocked_account = Mock() + mocked_account.get_ad_sets = Mock() + mocked_account.get_ad_sets.side_effect = [[mocked_adset]] + + # Iterate adset object which calls prepare_record() inside and verify ConnectionError is raised + ad_set_object = AdSets('', mocked_account, '', '', '') + with self.assertRaises(ConnectionError): + for message in ad_set_object: + pass + + # verify prepare_record() function by checking call count of mocked ad.api_get() + self.assertEquals(mocked_adset.api_get.call_count, 5) + + def test__call_get_campaigns(self, mocked_sleep): + """ + Campaigns._call_get_campaigns calls a `facebook_business` method,`get_campaigns()`, to get a batch of campaigns. + We mock this method to raise a `ConnectionError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # Mock get_campaigns function to throw ConnectionError exception + mocked_account = Mock() + mocked_account.get_campaigns = Mock() + mocked_account.get_campaigns.side_effect = ConnectionError + + # Call _call_get_campaigns() function of Campaigns and verify ConnectionError is raised + campaigns_object = Campaigns('', mocked_account, '', '', '') + with self.assertRaises(ConnectionError): + campaigns_object._call_get_campaigns('test') + + # verify get_campaigns() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_campaigns.call_count, 5) + + @mock.patch("pendulum.parse") + def test_campaign_prepare_record(self, mocked_parse, mocked_sleep): + """ + __iter__ of Campaigns calls a function _iterate which calls a nested prepare_record function. + Prepare_record calls a `facebook_business` method,`ad.api_get()`, to get a ad fields. + We mock this method to raise a `ConnectionError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + + # # Mock campaign object + mocked_campaign = Mock() + mocked_campaign.api_get = Mock() + mocked_campaign.__getitem__ = Mock() + mocked_campaign.api_get.side_effect = ConnectionError + + # # Mock get_campaigns function return mocked ad object + mocked_account = Mock() + mocked_account.get_campaigns = Mock() + mocked_account.get_campaigns.side_effect = [[mocked_campaign]] + + # Iterate campaigns object which calls prepare_record() inside and verify ConnectionError is raised + campaign_object = Campaigns('', mocked_account, '', '', '') + with self.assertRaises(ConnectionError): + for message in campaign_object: + pass + + # verify prepare_record() function by checking call count of mocked ad.api_get() + self.assertEquals(mocked_campaign.api_get.call_count, 5) + + def test_run_job(self, mocked_sleep): + """ + AdsInsights.run_job calls a `facebook_business` method,`get_insights()`, to get a batch of insights. + We mock this method to raise a `ConnectionError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. 
+ """ + + # Mock get_insights function to throw ConnectionError exception + mocked_account = Mock() + mocked_account.get_insights = Mock() + mocked_account.get_insights.side_effect = ConnectionError + + # Call run_job() function of Campaigns and verify ConnectionError is raised + ads_insights_object = AdsInsights('', mocked_account, '', '', '', {}) + with self.assertRaises(ConnectionError): + ads_insights_object.run_job('test') + + # verify get_insights() is called 5 times as max 5 reties provided for function + self.assertEquals(mocked_account.get_insights.call_count, 5) + + +# Mock args +class Args(): + def __init__(self, config): + self.config = config + self.discover = False + self.properties = False + self.catalog = False + self.state = False + +@mock.patch('tap_facebook.utils.parse_args') +@mock.patch("tap_facebook.FacebookAdsApi.init") +@mock.patch("tap_facebook.fb_user.User") +class TestRequestTimeoutValue(unittest.TestCase): + """A set of unit tests to ensure that request timeout is set based on config or default value""" + + def test_default_value_request_timeout(self, mocked_user, mocked_facebook_api, mocked_args): + """ + unit tests to ensure that request timeout is set based on config or default value + """ + tap_facebook.CONFIG = {} + config = {'account_id': 'test', 'access_token': 'test'} # No request_timeout in config + mocked_args.return_value = Args(config) + + # Mock fb_user and get_add_accounts + mocked_fb_user = Mock() + mocked_fb_user.get_ad_accounts = Mock() + mocked_fb_user.get_ad_accounts.return_value = [{'account_id': 'test'}] + mocked_user.return_value = mocked_fb_user + + # Call main_impl function which initialize FacebookAdsApi with timeout + tap_facebook.main_impl() + + # verify that FacebookAdsApi.init() called with default timeout + mocked_facebook_api.assert_called_with(access_token='test', timeout=300) + + def test_config_provided_int_request_timeout(self, mocked_user, mocked_facebook_api, mocked_args): + """ + unit tests to ensure that request timeout is set based on config(integer value) + """ + tap_facebook.CONFIG = {} + # request_timeout provided in config with 100(integer) + config = {'account_id': 'test', 'access_token': 'test', 'request_timeout': 100} + mocked_args.return_value = Args(config) + + # Mock fb_user and get_add_accounts + mocked_fb_user = Mock() + mocked_fb_user.get_ad_accounts = Mock() + mocked_fb_user.get_ad_accounts.return_value = [{'account_id': 'test'}] + mocked_user.return_value = mocked_fb_user + + # Call main_impl function which initialize FacebookAdsApi with timeout + tap_facebook.main_impl() + + # verify that FacebookAdsApi.init() called with timeout provided in config + mocked_facebook_api.assert_called_with(access_token='test', timeout=100) + + def test_config_provided_float_request_timeout(self, mocked_user, mocked_facebook_api, mocked_args): + """ + unit tests to ensure that request timeout is set based on config(float value) + """ + tap_facebook.CONFIG = {} + # request_timeout provided in config with 100.5(float) + config = {'account_id': 'test', 'access_token': 'test', 'request_timeout': 100.5} + mocked_args.return_value = Args(config) + + # Mock fb_user and get_add_accounts + mocked_fb_user = Mock() + mocked_fb_user.get_ad_accounts = Mock() + mocked_fb_user.get_ad_accounts.return_value = [{'account_id': 'test'}] + mocked_user.return_value = mocked_fb_user + + # Call main_impl function which initialize FacebookAdsApi with timeout + tap_facebook.main_impl() + + # verify that FacebookAdsApi.init() called with timeout 
+
+    def test_config_provided_float_request_timeout(self, mocked_user, mocked_facebook_api, mocked_args):
+        """
+        Unit test to ensure that the request timeout is set from the config (float value)
+        """
+        tap_facebook.CONFIG = {}
+        # request_timeout provided in config as 100.5 (float)
+        config = {'account_id': 'test', 'access_token': 'test', 'request_timeout': 100.5}
+        mocked_args.return_value = Args(config)
+
+        # Mock fb_user and get_ad_accounts
+        mocked_fb_user = Mock()
+        mocked_fb_user.get_ad_accounts = Mock()
+        mocked_fb_user.get_ad_accounts.return_value = [{'account_id': 'test'}]
+        mocked_user.return_value = mocked_fb_user
+
+        # Call main_impl(), which initializes FacebookAdsApi with the timeout
+        tap_facebook.main_impl()
+
+        # Verify that FacebookAdsApi.init() is called with the timeout provided in the config
+        mocked_facebook_api.assert_called_with(access_token='test', timeout=100.5)
+
+    def test_config_provided_string_request_timeout(self, mocked_user, mocked_facebook_api, mocked_args):
+        """
+        Unit test to ensure that the request timeout is set from the config (string value)
+        """
+        tap_facebook.CONFIG = {}
+        # request_timeout provided in config as '100' (string)
+        config = {'account_id': 'test', 'access_token': 'test', 'request_timeout': '100'}
+        mocked_args.return_value = Args(config)
+
+        # Mock fb_user and get_ad_accounts
+        mocked_fb_user = Mock()
+        mocked_fb_user.get_ad_accounts = Mock()
+        mocked_fb_user.get_ad_accounts.return_value = [{'account_id': 'test'}]
+        mocked_user.return_value = mocked_fb_user
+
+        # Call main_impl(), which initializes FacebookAdsApi with the timeout
+        tap_facebook.main_impl()
+
+        # Verify that FacebookAdsApi.init() is called with the timeout provided in the config
+        mocked_facebook_api.assert_called_with(access_token='test', timeout=100)
+
+    def test_config_provided_empty_request_timeout(self, mocked_user, mocked_facebook_api, mocked_args):
+        """
+        Unit test to ensure that the request timeout falls back to the default value when the config provides an empty string
+        """
+        tap_facebook.CONFIG = {}
+        # request_timeout provided in config as an empty string
+        config = {'account_id': 'test', 'access_token': 'test', 'request_timeout': ''}
+        mocked_args.return_value = Args(config)
+
+        # Mock fb_user and get_ad_accounts
+        mocked_fb_user = Mock()
+        mocked_fb_user.get_ad_accounts = Mock()
+        mocked_fb_user.get_ad_accounts.return_value = [{'account_id': 'test'}]
+        mocked_user.return_value = mocked_fb_user
+
+        # Call main_impl(), which initializes FacebookAdsApi with the timeout
+        tap_facebook.main_impl()
+
+        # Verify that FacebookAdsApi.init() is called with the default timeout
+        mocked_facebook_api.assert_called_with(access_token='test', timeout=300)
diff --git a/tests/unittests/test_retry_logic.py b/tests/unittests/test_retry_logic.py
index a58d5dde..56758ece 100644
--- a/tests/unittests/test_retry_logic.py
+++ b/tests/unittests/test_retry_logic.py
@@ -42,6 +42,34 @@ def test_retries_on_500(self):
         # 5 is the max tries specified in the tap
         self.assertEquals(5, mocked_account.get_ad_creatives.call_count )
 
+    def test_retries_on_503(self):
+        """`AdCreative.sync.do_request()` calls a `facebook_business` method,
+        `get_ad_creatives()`, to make a request to the API. We mock this
+        method to raise a `FacebookRequestError` with an `http_status` of
+        `503`.
+
+        We expect the tap to retry this request up to 5 times, which is
+        the current hard-coded `max_tries` value.
+        """
+
+        # Create the mock and force the function to throw an error
+        mocked_account = Mock()
+        mocked_account.get_ad_creatives = Mock()
+        mocked_account.get_ad_creatives.side_effect = FacebookRequestError(
+            message='',
+            request_context={"":Mock()},
+            http_status=503,
+            http_headers=Mock(),
+            body="Service Unavailable"
+        )
+
+        # Initialize the object and call `sync()`
+        ad_creative_object = AdCreative('', mocked_account, '', '')
+        with self.assertRaises(FacebookRequestError):
+            ad_creative_object.sync()
+        # 5 is the max tries specified in the tap
+        self.assertEquals(5, mocked_account.get_ad_creatives.call_count)
+
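+    # Sketch (an assumption, not the tap's verbatim code) of the predicate the
+    # 500/503 tests rely on: a FacebookRequestError is treated as retryable when
+    # its HTTP status points at a transient server-side failure.
+    #
+    #     def should_retry(exc):
+    #         return isinstance(exc, FacebookRequestError) and exc.http_status() >= 500
+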
     def test_catch_a_type_error(self):
         """`AdCreative.sync.do_request()` calls a `facebook_business` method `get_ad_creatives()`.
         We want to mock this to throw a `TypeError("string indices must be integers")` and assert
@@ -186,3 +214,76 @@ def test_retries_and_good_response(self):
 
         # Clean up tests
         patcher.stop()
+
+
+    def test_job_polling_retry(self):
+        """AdsInsights.run_job() polls the status of an insights job we've asked
+        Facebook to generate. This test mocks the poll response to raise a 400
+        status error that should be retried.
+
+        We expect the tap to retry this request up to 5 times for each insights job attempted.
+        """
+
+        mocked_api_get = Mock()
+        mocked_api_get.side_effect = FacebookRequestError(
+            message='Unsupported get request; Object does not exist',
+            request_context={"":Mock()},
+            http_status=400,
+            http_headers=Mock(),
+            body={"error": {"error_subcode": 33}}
+        )
+        # Create the mock and force the function to throw an error
+        mocked_account = Mock()
+        mocked_account.get_insights = Mock()
+        mocked_account.get_insights.return_value.api_get = mocked_api_get
+
+        # Initialize the object and call `run_job()`
+        ad_insights_object = AdsInsights('', mocked_account, '', '', {}, {})
+        with self.assertRaises(FacebookRequestError):
+            ad_insights_object.run_job({})
+        # 5 job attempts x 5 polls each = 25 api_get() calls; 5 is the max tries specified in the tap
+        self.assertEquals(25, mocked_account.get_insights.return_value.api_get.call_count)
+        self.assertEquals(5, mocked_account.get_insights.call_count)
+
+    def test_job_polling_retry_succeeds_eventually(self):
+        """AdsInsights.run_job() polls the status of an insights job we've asked
+        Facebook to generate. This test mocks the poll response to raise a 400
+        status error twice and then return a completed job, so the retries
+        eventually succeed.
+        """
+
+        mocked_bad_response = FacebookRequestError(
+            message='Unsupported get request; Object does not exist',
+            request_context={"":Mock()},
+            http_status=400,
+            http_headers=Mock(),
+            body={"error": {"error_subcode": 33}}
+        )
+
+        mocked_good_response = {
+            "async_status": "Job Completed",
+            "async_percent_completion": 100,
+            "id": "2134"
+        }
+
+        mocked_api_get = Mock()
+        mocked_api_get.side_effect = [
+            mocked_bad_response,
+            mocked_bad_response,
+            mocked_good_response
+        ]
+
+        # Create the mock and force the function to throw an error twice before succeeding
+        mocked_account = Mock()
+        mocked_account.get_insights = Mock()
+        mocked_account.get_insights.return_value.api_get = mocked_api_get
+
+        # Initialize the object and call `run_job()`
+        ad_insights_object = AdsInsights('', mocked_account, '', '', {}, {})
+        ad_insights_object.run_job({})
+        self.assertEquals(3, mocked_account.get_insights.return_value.api_get.call_count)
+        self.assertEquals(1, mocked_account.get_insights.call_count)
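+
+    # Plausible shape (an assumption, not the tap's verbatim code) of the loop
+    # the two polling tests above pin down: each run_job attempt starts one
+    # insights job and then polls its status, and both levels retry up to
+    # 5 times, giving the worst case of 5 x 5 = 25 api_get() calls.
+    #
+    #     job = account.get_insights(params=params, is_async=True)   # retried up to 5 times
+    #     while True:
+    #         status = job.api_get()                                 # retried up to 5 times
+    #         if status["async_status"] == "Job Completed":
+    #             break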
"FacebookRequestError": + raise FacebookRequestError( + message='', + request_context={"":Mock()}, + http_status=500, + http_headers=Mock(), + body={} + ) + +class TestAdCreativeSyncBbatches(unittest.TestCase): + + @mock.patch("tap_facebook.API") + @mock.patch("singer.resolve_schema_references") + def test_retries_on_attribute_error_sync_batches(self, mocked_schema, mocked_api): + """ + AdCreative.sync_batches calls a `facebook_business` method,`api_batch.execute()`, to get a batch of ad creatives. + We mock this method to raise a `AttributeError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + # Mock new_batch() function of API + mocked_api.new_batch = Mock() + mocked_api.new_batch.return_value = MockBatch(exception="AttributeError") # Raise AttributeError exception + + # Initialize AdCreative and mock catalog_entry + mock_catalog_entry = CatalogEntry(schema=Schema()) + ad_creative_object = AdCreative('', '', '', '') + ad_creative_object.catalog_entry = mock_catalog_entry + + # Call sync_batches() function of AdCreatives and verify AttributeError is raised + with self.assertRaises(AttributeError): + ad_creative_object.sync_batches([]) + + # verify calls inside sync_batches are called 5 times as max 5 retries provided for function + self.assertEquals(5, mocked_api.new_batch.call_count) + self.assertEquals(5, mocked_schema.call_count) + + @mock.patch("tap_facebook.API") + @mock.patch("singer.resolve_schema_references") + def test_retries_on_facebook_request_error_sync_batches(self, mocked_schema, mocked_api): + """ + AdCreative.sync_batches calls a `facebook_business` method,`api_batch.execute()`, to get a batch of ad creatives. + We mock this method to raise a `FacebookRequestError` and expect the tap to retry this that function up to 5 times, + which is the current hard coded `max_tries` value. + """ + # Mock new_batch() function of API + mocked_api.new_batch = Mock() + mocked_api.new_batch.return_value = MockBatch(exception="FacebookRequestError") # Raise FacebookRequestError exception + + # Initialize AdCreative and mock catalog_entry + mock_catalog_entry = CatalogEntry(schema=Schema()) + ad_creative_object = AdCreative('', '', '', '') + ad_creative_object.catalog_entry = mock_catalog_entry + + # Call sync_batches() function of AdCreatives and verify FacebookRequestError is raised + with self.assertRaises(FacebookRequestError): + ad_creative_object.sync_batches([]) + + # verify calls inside sync_batches are called 5 times as max 5 reties provided for function + self.assertEquals(5, mocked_api.new_batch.call_count) + self.assertEquals(5, mocked_schema.call_count) + + @mock.patch("tap_facebook.API") + @mock.patch("singer.resolve_schema_references") + def test_no_error_on_sync_batches(self, mocked_schema, mocked_api): + """ + AdCreative.sync_batches calls a `facebook_business` method,`api_batch.execute()`, to get a batch of ad creatives. 
+
+    @mock.patch("tap_facebook.API")
+    @mock.patch("singer.resolve_schema_references")
+    def test_no_error_on_sync_batches(self, mocked_schema, mocked_api):
+        """
+        AdCreative.sync_batches calls a `facebook_business` method, `api_batch.execute()`, to get a batch of ad creatives.
+        We mock this method to succeed and expect the tap to run without raising an exception.
+        """
+        # Mock the new_batch() function of API
+        mocked_api.new_batch = Mock()
+        mocked_api.new_batch.return_value = MockBatch() # No exception
+
+        # Initialize AdCreative and mock catalog_entry
+        mock_catalog_entry = CatalogEntry(schema=Schema())
+        ad_creative_object = AdCreative('', '', '', '')
+        ad_creative_object.catalog_entry = mock_catalog_entry
+
+        # Call sync_batches() of AdCreative
+        ad_creative_object.sync_batches([])
+
+        # Verify the calls inside sync_batches happen only once, as no exception is thrown
+        self.assertEquals(1, mocked_api.new_batch.call_count)
+        self.assertEquals(1, mocked_schema.call_count)
+
+
+class TestLeadsSyncBatches(unittest.TestCase):
+
+    @mock.patch("tap_facebook.API")
+    @mock.patch("singer.resolve_schema_references")
+    def test_retries_on_attribute_error_sync_batches(self, mocked_schema, mocked_api):
+        """
+        Leads.sync_batches calls a `facebook_business` method, `api_batch.execute()`, to get a batch of leads.
+        We mock this method to raise an `AttributeError` and expect the tap to retry this function up to 5 times,
+        which is the current hard-coded `max_tries` value.
+        """
+        # Mock the new_batch() function of API
+        mocked_api.new_batch = Mock()
+        mocked_api.new_batch.return_value = MockBatch(exception="AttributeError") # Raise an AttributeError exception
+
+        # Initialize Leads and mock catalog_entry
+        mock_catalog_entry = CatalogEntry(schema=Schema())
+        leads_object = Leads('', '', '', '', '')
+        leads_object.catalog_entry = mock_catalog_entry
+
+        # Call sync_batches() of Leads and verify AttributeError is raised
+        with self.assertRaises(AttributeError):
+            leads_object.sync_batches([])
+
+        # Verify the calls inside sync_batches happen 5 times, the maximum number of retries for the function
+        self.assertEquals(5, mocked_api.new_batch.call_count)
+        self.assertEquals(5, mocked_schema.call_count)
+
+    @mock.patch("tap_facebook.API")
+    @mock.patch("singer.resolve_schema_references")
+    def test_retries_on_facebook_request_error_sync_batches(self, mocked_schema, mocked_api):
+        """
+        Leads.sync_batches calls a `facebook_business` method, `api_batch.execute()`, to get a batch of leads.
+        We mock this method to raise a `FacebookRequestError` and expect the tap to retry this function up to 5 times,
+        which is the current hard-coded `max_tries` value.
+        """
+        # Mock the new_batch() function of API
+        mocked_api.new_batch = Mock()
+        mocked_api.new_batch.return_value = MockBatch(exception="FacebookRequestError") # Raise a FacebookRequestError exception
+
+        # Initialize Leads and mock catalog_entry
+        mock_catalog_entry = CatalogEntry(schema=Schema())
+        leads_object = Leads('', '', '', '', '')
+        leads_object.catalog_entry = mock_catalog_entry
+
+        # Call sync_batches() of Leads and verify FacebookRequestError is raised
+        with self.assertRaises(FacebookRequestError):
+            leads_object.sync_batches([])
+
+        # Verify the calls inside sync_batches happen 5 times, the maximum number of retries for the function
+        self.assertEquals(5, mocked_api.new_batch.call_count)
+        self.assertEquals(5, mocked_schema.call_count)
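+
+# These tests are fully self-contained (all collaborators are mocked), so the
+# module can be run on its own; the exact invocation is an assumption, e.g.:
+#
+#     python -m unittest tests/unittests/test_sync_batches_retry.py -v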