Skip to content

Commit

Permalink
Merge pull request #2 from tgriek/keyfile-login
Browse files Browse the repository at this point in the history
Support for JSON key files & Management API
  • Loading branch information
Ben authored Mar 1, 2018
2 parents 40d6503 + 0af29f8 commit 1167303
Showing 1 changed file with 74 additions and 18 deletions.
92 changes: 74 additions & 18 deletions hooks/google_analytics_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,81 @@
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "{CERT_URL}"
In Airflow 1.9.0 this requires to use the web interface or cli to set connection extra's. If you prefer to not use the
web interface to manage connections you can also supply the key as a json file.
@TODO: add support for p12 keys
More details can be found here:
https://developers.google.com/api-client-library/python/guide/aaa_client_secrets
"""

from airflow.hooks.base_hook import BaseHook
import time
import os

from airflow.hooks.base_hook import BaseHook
from airflow import configuration as conf
from apiclient.discovery import build
from apiclient.http import MediaInMemoryUpload
from oauth2client.service_account import ServiceAccountCredentials
from oauth2client.client import AccessTokenCredentials
import time
from collections import namedtuple


class GoogleAnalyticsHook(BaseHook):
def __init__(self, google_analytics_conn_id='google_analytics_default'):
GAService = namedtuple('GAService', ['name', 'version', 'scopes'])
# We need to rely on 2 services depending on the task at hand: reading from or writing to GA.
_services = {
'reporting': GAService(name='analyticsreporting',
version='v4',
scopes=['https://www.googleapis.com/auth/analytics.readonly']),
'management': GAService(name='analytics',
version='v3',
scopes=['https://www.googleapis.com/auth/analytics'])
}
_key_folder = os.path.join(conf.get('core', 'airflow_home'), 'keys')

def __init__(self, google_analytics_conn_id='google_analytics_default', key_file=None):
self.google_analytics_conn_id = google_analytics_conn_id
self.connection = self.get_connection(google_analytics_conn_id)
if 'client_secrets' in self.connection.extra_dejson:
self.client_secrets = self.connection.extra_dejson['client_secrets']
if key_file:
self.file_location = os.path.join(GoogleAnalyticsHook._key_folder, key_file)

self.client_secrets = self.connection.extra_dejson['client_secrets']
def get_service_object(self, name):
service = GoogleAnalyticsHook._services[name]

def get_service_object(self,
api_name,
api_version,
scopes):
if self.connection.password:
credentials = AccessTokenCredentials(self.connection.password,
'Astronomer/1.0')
elif self.client_secrets:
'Airflow/1.0')
elif hasattr(self, 'client_secrets'):
credentials = ServiceAccountCredentials.from_json_keyfile_dict(self.client_secrets,
scopes)
service.scopes)

elif hasattr(self, 'file_location'):
credentials = ServiceAccountCredentials.from_json_keyfile_name(self.file_location,
service.scopes)
else:
raise ValueError('No valid credentials could be found')

return build(api_name, api_version, credentials=credentials)
return build(service.name, service.version, credentials=credentials)

def get_management_report(self,
view_id,
since,
until,
metrics,
dimensions):

analytics = self.get_service_object(name='management')

return analytics.data().ga().get(
ids=view_id,
start_date=since,
end_date=until,
metrics=metrics,
dimensions=dimensions).execute()

def get_analytics_report(self,
view_id,
Expand All @@ -61,9 +105,8 @@ def get_analytics_report(self,
metrics,
page_size,
include_empty_rows):
analytics = self.get_service_object('analyticsreporting',
'v4',
['https://www.googleapis.com/auth/analytics.readonly'])

analytics = self.get_service_object(name='reporting')

reportRequest = {
'viewId': view_id,
Expand All @@ -88,9 +131,9 @@ def get_analytics_report(self,
time.sleep(1)
reportRequest.update({'pageToken': report['nextPageToken']})
response = (analytics
.reports()
.batchGet(body={'reportRequests': [reportRequest]})
.execute())
.reports()
.batchGet(body={'reportRequests': [reportRequest]})
.execute())
report = response['reports'][0]
rows.extend(report.get('data', {}).get('rows', []))

Expand All @@ -100,3 +143,16 @@ def get_analytics_report(self,
return report
else:
return {}

def upload_string(self, account_id, profile_id, string, data_source_id):
"""
Upload to custom data sources - example function
https://developers.google.com/analytics/devguides/config/mgmt/v3/mgmtReference/management/uploads/uploadData
"""
analytics = self.get_service_object(name='management')
media = MediaInMemoryUpload(string, mimetype='application/octet-stream', resumable=False)
analytics.management().uploads().uploadData(
accountId=account_id,
webPropertyId=profile_id,
customDataSourceId=data_source_id,
media_body=media).execute()

0 comments on commit 1167303

Please sign in to comment.