From 82e05efa01ee4c91d07aec2b60e0a716552d24c2 Mon Sep 17 00:00:00 2001 From: tgriek Date: Thu, 1 Mar 2018 19:53:35 +0100 Subject: [PATCH 1/2] Key file login --- hooks/google_analytics_hook.py | 91 +++++++++++++++++++++++++++------- 1 file changed, 73 insertions(+), 18 deletions(-) diff --git a/hooks/google_analytics_hook.py b/hooks/google_analytics_hook.py index 8455239..e28d35d 100644 --- a/hooks/google_analytics_hook.py +++ b/hooks/google_analytics_hook.py @@ -20,37 +20,81 @@ "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_x509_cert_url": "{CERT_URL}" +In Airflow 1.9.0 this requires to use the web interface or cli to set connection extra's. If you prefer to not use the +web interface to manage connections you can also supply the key as a json file. + +@TODO: add support for p12 keys + More details can be found here: https://developers.google.com/api-client-library/python/guide/aaa_client_secrets """ -from airflow.hooks.base_hook import BaseHook +import time +import os +from airflow.hooks.base_hook import BaseHook +from airflow import configuration as conf from apiclient.discovery import build +from apiclient.http import MediaInMemoryUpload from oauth2client.service_account import ServiceAccountCredentials from oauth2client.client import AccessTokenCredentials -import time +from collections import namedtuple class GoogleAnalyticsHook(BaseHook): - def __init__(self, google_analytics_conn_id='google_analytics_default'): + GAService = namedtuple('GAService', ['name', 'version', 'scopes']) + # We need to rely on 2 services depending on the task at hand: reading from or writing to GA. + _services = { + 'reporting': GAService(name='analyticsreporting', + version='v4', + scopes=['https://www.googleapis.com/auth/analytics.readonly']), + 'management': GAService(name='analytics', + version='v3', + scopes=['https://www.googleapis.com/auth/analytics']) + } + _key_folder = os.path.join(conf.get('core', 'airflow_home'), 'keys') + + def __init__(self, google_analytics_conn_id='google_analytics_default', key_file=None): self.google_analytics_conn_id = google_analytics_conn_id self.connection = self.get_connection(google_analytics_conn_id) + if 'client_secrets' in self.connection.extra_dejson: + self.client_secrets = self.connection.extra_dejson['client_secrets'] + if key_file: + self.file_location = os.path.join(GoogleAnalyticsHook._key_folder, key_file) - self.client_secrets = self.connection.extra_dejson['client_secrets'] + def get_service_object(self, name): + service = GoogleAnalyticsHook._services[name] - def get_service_object(self, - api_name, - api_version, - scopes): if self.connection.password: credentials = AccessTokenCredentials(self.connection.password, - 'Astronomer/1.0') - elif self.client_secrets: + 'Airflow/1.0') + elif hasattr(self, 'client_secrets'): credentials = ServiceAccountCredentials.from_json_keyfile_dict(self.client_secrets, - scopes) + service.scopes) + + elif hasattr(self, 'file_location'): + credentials = ServiceAccountCredentials.from_json_keyfile_name(self.file_location, + service.scopes) + else: + raise ValueError('No valid credentials could be found') - return build(api_name, api_version, credentials=credentials) + return build(service.name, service.version, credentials=credentials) + + def get_management_report(self, + view_id, + since, + until, + metrics, + dimensions): + + analytics = self.get_service_object(name='management') + + return analytics.data().ga().get( + ids=view_id, + start_date=since, + end_date=until, + metrics=metrics, + dimensions=dimensions).execute() def get_analytics_report(self, view_id, @@ -61,9 +105,8 @@ def get_analytics_report(self, metrics, page_size, include_empty_rows): - analytics = self.get_service_object('analyticsreporting', - 'v4', - ['https://www.googleapis.com/auth/analytics.readonly']) + + analytics = self.get_service_object(name='reporting') reportRequest = { 'viewId': view_id, @@ -88,9 +131,9 @@ def get_analytics_report(self, time.sleep(1) reportRequest.update({'pageToken': report['nextPageToken']}) response = (analytics - .reports() - .batchGet(body={'reportRequests': [reportRequest]}) - .execute()) + .reports() + .batchGet(body={'reportRequests': [reportRequest]}) + .execute()) report = response['reports'][0] rows.extend(report.get('data', {}).get('rows', [])) @@ -100,3 +143,15 @@ def get_analytics_report(self, return report else: return {} + + def upload_string(self, account_id, profile_id, string, data_source_id): + """ + Upload to custom data sources - example function + """ + analytics = self.get_service_object(name='management') + media = MediaInMemoryUpload(string, mimetype='application/octet-stream', resumable=False) + analytics.management().uploads().uploadData( + accountId=account_id, + webPropertyId=profile_id, + customDataSourceId=data_source_id, + media_body=media).execute() From 0af29f8db59c2eec238f71e09650c49bfa0a3660 Mon Sep 17 00:00:00 2001 From: tgriek Date: Thu, 1 Mar 2018 23:39:32 +0100 Subject: [PATCH 2/2] Docu link Signed-off-by: tgriek --- hooks/google_analytics_hook.py | 1 + 1 file changed, 1 insertion(+) diff --git a/hooks/google_analytics_hook.py b/hooks/google_analytics_hook.py index e28d35d..99f4154 100644 --- a/hooks/google_analytics_hook.py +++ b/hooks/google_analytics_hook.py @@ -147,6 +147,7 @@ def get_analytics_report(self, def upload_string(self, account_id, profile_id, string, data_source_id): """ Upload to custom data sources - example function + https://developers.google.com/analytics/devguides/config/mgmt/v3/mgmtReference/management/uploads/uploadData """ analytics = self.get_service_object(name='management') media = MediaInMemoryUpload(string, mimetype='application/octet-stream', resumable=False)