From 4c0c8bdd1b5003ed437558896b4f9d3d63011ddd Mon Sep 17 00:00:00 2001 From: shantanu73 Date: Wed, 1 Nov 2023 11:33:41 +0000 Subject: [PATCH] Changes: 1) Added new schema for video_ads stream. 2) Modified enpoint and query params for video_ads stream. 3) Removed transform logic to flatten video_ads schema for change_audit_stamps fields. --- tap_linkedin_ads/schemas/video_ads.json | 90 +++++++------------------ tap_linkedin_ads/streams.py | 25 +++---- tap_linkedin_ads/transform.py | 14 ---- 3 files changed, 36 insertions(+), 93 deletions(-) diff --git a/tap_linkedin_ads/schemas/video_ads.json b/tap_linkedin_ads/schemas/video_ads.json index adb192c..de8ca8b 100644 --- a/tap_linkedin_ads/schemas/video_ads.json +++ b/tap_linkedin_ads/schemas/video_ads.json @@ -5,110 +5,66 @@ ], "additionalProperties": false, "properties": { - "account": { - "type": [ - "null", - "string" - ] - }, "account_id": { "type": [ "null", "integer" ] }, - "change_audit_stamps": { + "ad_context": { "type": [ "null", "object" ], "additionalProperties": false, "properties": { - "created": { + "dsc_status": { + "type": [ + "null", + "string" + ] + }, + "dsc_name": { + "type": [ + "null", + "string" + ] + }, + "dsc_ad_type": { "type": [ "null", - "object" - ], - "additionalProperties": false, - "properties": { - "time": { - "type": [ - "null", - "string" - ], - "format": "date-time" - } - } + "string" + ] }, - "last_modified": { + "dsc_ad_account": { "type": [ "null", - "object" - ], - "additionalProperties": false, - "properties": { - "time": { - "type": [ - "null", - "string" - ], - "format": "date-time" - } - } + "string" + ] } } }, - "created_time": { + "created_at": { "type": [ "null", "string" ], "format": "date-time" }, - "last_modified_time": { + "last_modified_at": { "type": [ "null", "string" ], "format": "date-time" }, - "content_reference": { - "type": [ - "null", - "string" - ] - }, - "content_reference_ucg_post_id": { - "type": [ - "null", - "integer" - ] - }, - "content_reference_share_id": { - "type": [ - "null", - "integer" - ] - }, - "name": { - "type": [ - "null", - "string" - ] - }, - "owner": { + "id": { "type": [ "null", "string" ] }, - "owner_organization_id": { - "type": [ - "null", - "integer" - ] - }, - "type": { + "author": { "type": [ "null", "string" diff --git a/tap_linkedin_ads/streams.py b/tap_linkedin_ads/streams.py index 58244c4..ddb819c 100644 --- a/tap_linkedin_ads/streams.py +++ b/tap_linkedin_ads/streams.py @@ -7,6 +7,7 @@ from singer import Transformer, should_sync_field, UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING from singer.utils import strptime_to_utc, strftime from tap_linkedin_ads.transform import transform_json, snake_case_to_camel_case +from tap_linkedin_ads.client import BASE_URL LOGGER = singer.get_logger() @@ -312,16 +313,18 @@ def sync_endpoint(self, } querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in endpoint_params.items()]) - # next_url = 'https://api.linkedin.com/rest/{}?{}'.format(self.path, querystring) url_list = [] if self.tap_stream_id in NEW_PATH_STREAMS: querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in endpoint_params.items()]) account_list = config['accounts'].replace(" ", "").split(",") for account in account_list: - url = 'https://api.linkedin.com/rest/adAccounts/{}/{}?{}'.format(account, self.path, querystring) + url = '{}/adAccounts/{}/{}?{}'.format(BASE_URL, account, self.path, querystring) url_list.append(url) else: - url = 'https://api.linkedin.com/rest/{}?{}'.format(self.path, querystring) + if self.path == 'posts': + url = '{}/{}?{}&dscAdAccount=urn%3Ali%3AsponsoredAccount%3A{}'.format(BASE_URL, self.path, querystring, parent_id) + else: + url = '{}/{}?{}'.format(BASE_URL, self.path, querystring) url_list.append(url) @@ -379,11 +382,7 @@ def sync_endpoint(self, if self.tap_stream_id == 'accounts': account = 'urn:li:sponsoredAccount:{}'.format(parent_id) owner_id = record.get('reference_organization_id', None) - owner = 'urn:li:organization:{}'.format(owner_id) - if child_stream_name == 'video_ads' and owner_id is not None: - child_stream_params['account'] = account - child_stream_params['owner'] = owner - else: + if child_stream_name != 'video_ads' or owner_id is None: LOGGER.warning("Skipping video_ads call for %s account as reference_organization_id is not found.", account) continue elif self.tap_stream_id == 'campaigns': @@ -588,16 +587,18 @@ class VideoAds(LinkedInAds): https://docs.microsoft.com/en-us/linkedin/marketing/integrations/ads/advertising-targeting/create-and-manage-video#finders """ tap_stream_id = "video_ads" - replication_keys = ["last_modified_time"] + replication_keys = ["last_modified_at"] replication_method = "INCREMENTAL" - key_properties = ["content_reference"] + key_properties = ["id"] foreign_key = "id" - path = "adDirectSponsoredContents" + path = "posts" data_key = "elements" parent = "accounts" params = { - "q": "account" + "q": "dscAdAccount", + "dscAdTypes": "List(VIDEO)" } + headers = {'X-Restli-Protocol-Version': "2.0.0"} class AccountUsers(LinkedInAds): """ diff --git a/tap_linkedin_ads/transform.py b/tap_linkedin_ads/transform.py index 02c4e20..b8db09e 100644 --- a/tap_linkedin_ads/transform.py +++ b/tap_linkedin_ads/transform.py @@ -255,19 +255,6 @@ def transform_creatives(data_dict): return new_dict -# Copy audit fields to root level -def transform_audit_fields(data_dict): - if 'change_audit_stamps' in data_dict: - if 'last_modified' in data_dict['change_audit_stamps']: - if 'time' in data_dict['change_audit_stamps']['last_modified']: - data_dict['last_modified_time'] = data_dict['change_audit_stamps']\ - ['last_modified']['time'] - if 'created' in data_dict['change_audit_stamps']: - if 'time' in data_dict['change_audit_stamps']['created']: - data_dict['created_time'] = data_dict['change_audit_stamps']['created']['time'] - return data_dict - - # Create ID field for each URN def transform_urn(data_dict): data_dict_copy = data_dict.copy() @@ -309,7 +296,6 @@ def transform_data(data_dict, stream_name): elif stream_name == 'creatives': this_dict = transform_creatives(this_dict) this_dict = transform_urn(this_dict) - this_dict = transform_audit_fields(this_dict) new_dict['elements'][i] = this_dict i = i + 1