Skip to content

Commit

Permalink
TDl-19503 (#225)
Browse files Browse the repository at this point in the history
* TDl-19503

* new base file
  • Loading branch information
JYOTHINARAYANSETTY committed Aug 31, 2023
1 parent 9761160 commit a30c5b3
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 115 deletions.
168 changes: 168 additions & 0 deletions tests/base_new_frmwrk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@

import os
from datetime import timedelta
from tap_tester import connections, menagerie, runner, LOGGER
from tap_tester.base_suite_tests.base_case import BaseCase


class FacebookBaseTest(BaseCase):
"""
Setup expectations for test sub classes.
Metadata describing streams.
A bunch of shared methods that are used in tap-tester tests.
Shared tap-specific methods (as needed).
Insights Test Data by Date Ranges
"ads_insights":
"2019-08-02T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
"2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
"ads_insights_age_and_gender":
"2019-08-02T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
"2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
"ads_insights_country":
"2019-08-02T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
"2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
"ads_insights_platform_and_device":
"2019-08-02T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
"2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
"ads_insights_region":
"2019-08-03T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
"2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
"ads_insights_dma":
"2019-08-03T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
"2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
"ads_insights_hourly_advertiser":
"2019-08-03T00:00:00.000000Z" -> "2019-10-30T00:00:00.000000Z"
"2021-04-07T00:00:00.000000Z" -> "2021-04-08T00:00:00.000000Z"
"""
FULL_TABLE = "FULL_TABLE"
BOOKMARK_COMPARISON_FORMAT = "%Y-%m-%dT00:00:00+00:00"

start_date = ""
end_date = ""

@staticmethod
def tap_name():
"""The name of the tap"""
return "tap-facebook"

@staticmethod
def get_type():
"""the expected url route ending"""
return "platform.facebook"

def get_properties(self):
"""Configuration properties required for the tap."""
return {
'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
'start_date' : '2021-04-07T00:00:00Z',
'end_date': '2021-04-09T00:00:00Z',
'insights_buffer_days': '1',
}

@staticmethod
def get_credentials():
"""Authentication information for the test account"""
return {'access_token': os.getenv('TAP_FACEBOOK_ACCESS_TOKEN')}
@staticmethod
def expected_metadata():
"""The expected streams and metadata about the streams"""
return {
"ads": {
BaseCase.PRIMARY_KEYS: {"id", "updated_time"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updated_time"}
},
"adcreative": {
BaseCase.PRIMARY_KEYS: {"id"},
BaseCase.REPLICATION_METHOD: BaseCase.FULL_TABLE,
},
"adsets": {
BaseCase.PRIMARY_KEYS: {"id", "updated_time"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updated_time"}
},
"campaigns": {
BaseCase.PRIMARY_KEYS: {"id", },
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updated_time"}
},
"ads_insights": {
BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"date_start"}
},
"ads_insights_age_and_gender": {
BaseCase.PRIMARY_KEYS: {
"campaign_id", "adset_id", "ad_id", "date_start", "age", "gender"
},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"date_start"}
},
"ads_insights_country": {
BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start", "country"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"date_start"}
},
"ads_insights_platform_and_device": {
BaseCase.PRIMARY_KEYS: {
"campaign_id", "adset_id", "ad_id", "date_start",
"publisher_platform", "platform_position", "impression_device"
},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"date_start"}
},
"ads_insights_region": {
BaseCase.PRIMARY_KEYS: {"region", "campaign_id", "adset_id", "ad_id", "date_start"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"date_start"}
},
"ads_insights_dma": {
BaseCase.PRIMARY_KEYS: {"dma", "campaign_id", "adset_id", "ad_id", "date_start"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"date_start"}
},
"ads_insights_hourly_advertiser": {
BaseCase.PRIMARY_KEYS: {"hourly_stats_aggregated_by_advertiser_time_zone", "campaign_id", "adset_id", "ad_id", "date_start"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"date_start"}
},
# "leads": {
# BaseCase.PRIMARY_KEYS: {"id"},
# BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
# BaseCase.REPLICATION_KEYS: {"created_time"}
# },
}

def set_replication_methods(self, conn_id, catalogs, replication_methods):

replication_keys = self.expected_replication_keys()
for catalog in catalogs:
replication_method = replication_methods.get(catalog['stream_name'])
annt=menagerie.get_annotated_schema(conn_id, catalog['stream_id'])
if replication_method == self.INCREMENTAL:
replication_key = list(replication_keys.get(catalog['stream_name']))[0]
replication_md = [{ "breadcrumb": [], "metadata":{ "selected" : True}}]
else:
replication_md = [{ "breadcrumb": [], "metadata": { "selected": None}}]
connections.set_non_discoverable_metadata(
conn_id, catalog, menagerie.get_annotated_schema(conn_id, catalog['stream_id']), replication_md)

@classmethod
def setUpClass(cls,logging="Ensuring environment variables are sourced."):
super().setUpClass(logging=logging)
missing_envs = [x for x in [os.getenv('TAP_FACEBOOK_ACCESS_TOKEN'),
os.getenv('TAP_FACEBOOK_ACCOUNT_ID')] if x is None]
if len(missing_envs) != 0:
raise Exception("set environment variables")


##########################################################################
### Tap Specific Methods
##########################################################################

@staticmethod
def is_insight(stream):
return stream.startswith('ads_insights')
122 changes: 7 additions & 115 deletions tests/test_facebook_discovery.py
Original file line number Diff line number Diff line change
@@ -1,122 +1,14 @@
"""Test tap discovery mode and metadata."""
import re
import unittest
from tap_tester.base_suite_tests.discovery_test import DiscoveryTest

from tap_tester import menagerie, connections
from base_new_frmwrk import FacebookBaseTest

from base import FacebookBaseTest

class FacebookDiscoveryTest(DiscoveryTest, FacebookBaseTest):
"""Standard Discovery Test"""

class DiscoveryTest(FacebookBaseTest):
"""Test tap discovery mode and metadata conforms to standards."""
@staticmethod
def name():
return "tap_tester_facebook_discovery_test"

return "tt_facebook_discovery"
def streams_to_test(self):
return self.expected_streams()

def test_run(self):
"""
Testing that discovery creates the appropriate catalog with valid metadata.
• Verify number of actual streams discovered match expected
• Verify the stream names discovered were what we expect
• Verify stream names follow naming convention
streams should only have lowercase alphas and underscores
• verify there is only 1 top level breadcrumb
• verify replication key(s)
• verify primary key(s)
• verify that if there is a replication key we are doing INCREMENTAL otherwise FULL
• verify the actual replication matches our expected replication method
• verify that primary, replication and foreign keys
are given the inclusion of automatic.
• verify that all other fields have inclusion of available metadata.
"""
streams_to_test = self.streams_to_test()

conn_id = connections.ensure_connection(self)

found_catalogs = self.run_and_verify_check_mode(conn_id)

# Verify stream names follow naming convention
# streams should only have lowercase alphas and underscores
found_catalog_names = {c['tap_stream_id'] for c in found_catalogs}
self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]),
msg="One or more streams don't follow standard naming")

for stream in streams_to_test:
with self.subTest(stream=stream):

# Verify ensure the caatalog is found for a given stream
catalog = next(iter([catalog for catalog in found_catalogs
if catalog["stream_name"] == stream]))
self.assertIsNotNone(catalog)

# collecting expected values
expected_primary_keys = self.expected_primary_keys()[stream]
expected_replication_keys = self.expected_replication_keys()[stream]
expected_automatic_fields = expected_primary_keys | expected_replication_keys
expected_replication_method = self.expected_replication_method()[stream]

# collecting actual values...
schema_and_metadata = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])
metadata = schema_and_metadata["metadata"]
stream_properties = [item for item in metadata if item.get("breadcrumb") == []]
actual_primary_keys = set(
stream_properties[0].get(
"metadata", {self.PRIMARY_KEYS: []}).get(self.PRIMARY_KEYS, [])
)
actual_replication_keys = set(
stream_properties[0].get(
"metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, [])
)
actual_replication_method = stream_properties[0].get(
"metadata", {self.REPLICATION_METHOD: None}).get(self.REPLICATION_METHOD)
actual_automatic_fields = set(
item.get("breadcrumb", ["properties", None])[1] for item in metadata
if item.get("metadata").get("inclusion") == "automatic"
)

##########################################################################
### metadata assertions
##########################################################################

# verify there is only 1 top level breadcrumb in metadata
self.assertTrue(len(stream_properties) == 1,
msg="There is NOT only one top level breadcrumb for {}".format(stream) + \
"\nstream_properties | {}".format(stream_properties))

# verify replication key(s) match expectations
self.assertSetEqual(
expected_replication_keys, actual_replication_keys
)

# verify primary key(s) match expectations
self.assertSetEqual(
expected_primary_keys, actual_primary_keys,
)

# verify the replication method matches our expectations
self.assertEqual(
expected_replication_method, actual_replication_method
)

# verify that if there is a replication key we are doing INCREMENTAL otherwise FULL
if actual_replication_keys:
self.assertEqual(self.INCREMENTAL, actual_replication_method)
else:
self.assertEqual(self.FULL_TABLE, actual_replication_method)

# verify that primary keys and replication keys
# are given the inclusion of automatic in metadata.
self.assertSetEqual(expected_automatic_fields, actual_automatic_fields)

# verify that all other fields have inclusion of available
# This assumes there are no unsupported fields for SaaS sources
self.assertTrue(
all({item.get("metadata").get("inclusion") == "available"
for item in metadata
if item.get("breadcrumb", []) != []
and item.get("breadcrumb", ["properties", None])[1]
not in actual_automatic_fields}),
msg="Not all non key properties are set to available in metadata")
return self.expected_stream_names()
62 changes: 62 additions & 0 deletions tests/test_facebook_table_reset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import os
import dateutil.parser
import datetime
from base_new_frmwrk import FacebookBaseTest
from tap_tester.base_suite_tests.table_reset_test import TableResetTest


class FacebookTableResetTest(TableResetTest, FacebookBaseTest):
"""tap-salesforce Table reset test implementation
Currently tests only the stream with Incremental replication method"""

@staticmethod
def name():
return "tt_facebook_table_reset"

def streams_to_test(self):
return self.expected_stream_names()

@property
def reset_stream(self):
return ('ads_insights_dma')


def calculated_states_by_stream(self, current_state):

""" The following streams barely make the cut:
campaigns "2021-02-09T18:17:30.000000Z"
"2021-02-09T16:24:58.000000Z"
adsets "2021-02-09T18:17:41.000000Z"
"2021-02-09T17:10:09.000000Z"
leads '2021-04-07T20:09:39+0000',
'2021-04-07T20:08:27+0000',
"""
timedelta_by_stream = {stream: [0,0,0] # {stream_name: [days, hours, minutes], ...}
for stream in self.expected_stream_names()}
timedelta_by_stream['campaigns'] = [0, 1, 0]
timedelta_by_stream['adsets'] = [0, 1, 0]
timedelta_by_stream['leads'] = [0, 0 , 1]

stream_to_calculated_state = {stream: "" for stream in current_state['bookmarks'].keys()}
for stream, state in current_state['bookmarks'].items():
state_key, state_value = next(iter(state.keys())), next(iter(state.values()))
state_as_datetime = dateutil.parser.parse(state_value)
days, hours, minutes = timedelta_by_stream[stream]
calculated_state_as_datetime = state_as_datetime - datetime.timedelta(days=days, hours=hours, minutes=minutes)

state_format = '%Y-%m-%dT00:00:00+00:00' if self.is_insight(stream) else '%Y-%m-%dT%H:%M:%S-00:00'
calculated_state_formatted = datetime.datetime.strftime(calculated_state_as_datetime, state_format)

stream_to_calculated_state[stream] = {state_key: calculated_state_formatted}

return stream_to_calculated_state

def manipulate_state(self,current_state):
new_states = {'bookmarks': dict()}
simulated_states = self.calculated_states_by_stream(current_state)
for stream, new_state in simulated_states.items():
new_states['bookmarks'][stream] = new_state
return new_states

0 comments on commit a30c5b3

Please sign in to comment.