-
Notifications
You must be signed in to change notification settings - Fork 0
/
gsuite-activity.py
81 lines (73 loc) · 3.59 KB
/
gsuite-activity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import boto3
import json
import os
import logging
from botocore.exceptions import ClientError
from datetime import datetime,timedelta
from utils.helpers import chunks
from utils.dates import utcnow
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
# Root logger; its level is raised to INFO so this module's info records are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# --- Configuration, read once from the environment at import time ---

# Name of the Firehose delivery stream that records are written to.
FIREHOSE_DELIVERY_STREAM = os.environ.get('FIREHOSE_DELIVERY_STREAM', 'test')

# Number of records per batch. Environment values are always strings, so the
# value is coerced to int; without the coercion the setting is an int (100)
# when unset but a str when set. A non-numeric value raises ValueError here,
# at import time, rather than later in the middle of a run.
FIREHOSE_BATCH_SIZE = int(os.environ.get('FIREHOSE_BATCH_SIZE', 100))

# Secrets Manager secret id holding the service-account key (JSON).
GSUITE_CREDENTIALS_SECRET_NAME = os.environ.get('GSUITE_CREDENTIALS_SECRET_NAME', 'unknown')

# OAuth scope requested for the service account: read-only audit reports.
SCOPES = ['https://www.googleapis.com/auth/admin.reports.audit.readonly']

# Account the service account is delegated to.
# NOTE(review): the default below looks like an email-obfuscation placeholder
# rather than a real address - confirm the intended default.
GSUITE_DELEGATED_ACCOUNT = os.environ.get('GSUITE_DELEGATED_ACCOUNT', '[email protected]')
# AWS service clients, created once at import time.
ssmclient = boto3.client('ssm')                    # SSM parameter reads/writes
secrets_manager = boto3.client('secretsmanager')   # secret retrieval
f_hose = boto3.client('firehose')                  # record delivery
def get_parameter(parameter_name, default):
    """Return the value of an SSM parameter, or ``default`` if it does not exist.

    Args:
        parameter_name: Name passed to ``ssmclient.get_parameter``.
        default: Value returned when SSM reports ``ParameterNotFound``.

    Returns:
        The parameter's ``Value``, or ``default`` when the parameter is missing.

    Raises:
        ClientError: For any SSM error other than ``ParameterNotFound``.
    """
    try:
        response = ssmclient.get_parameter(Name=parameter_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ParameterNotFound':
            return default
        # Any other failure (permissions, throttling, ...) must not be
        # swallowed: falling through would return None, which callers would
        # then treat as a real parameter value.
        raise
    return response["Parameter"]['Value']
def put_parameter(parameter_name, value):
    """Write ``value`` to the SSM parameter ``parameter_name``.

    The parameter is stored with type ``String`` and any existing value is
    overwritten. Nothing is returned.
    """
    request = {
        'Name': parameter_name,
        'Type': 'String',
        'Value': value,
        'Overwrite': True,
    }
    ssmclient.put_parameter(**request)
def send_to_firehose(records):
    """Send records to the Firehose delivery stream as newline-terminated JSON.

    Args:
        records: A list of JSON-serializable dicts. Anything that is not a
            list is ignored (a warning is logged) and nothing is sent.

    Each batch produced by ``chunks`` is written with one
    ``put_record_batch`` call. Records the service reports as failed are
    logged at error level; they are not retried and no exception is raised.
    """
    if not isinstance(records, list):
        logger.warning(
            'send_to_firehose expected a list but got %s; nothing sent',
            type(records).__name__,
        )
        return

    # The configured size may be a string when it comes from the environment.
    batch_size = int(FIREHOSE_BATCH_SIZE)

    # batch up the list below the limits of firehose
    # (assumes chunks() yields successive groups of at most batch_size
    # records - TODO confirm against utils.helpers)
    for batch in chunks(records, batch_size):
        response = f_hose.put_record_batch(
            DeliveryStreamName=FIREHOSE_DELIVERY_STREAM,
            Records=[
                {'Data': (json.dumps(record) + '\n').encode('UTF-8')}
                for record in batch
            ],
        )
        logger.debug('firehose response is: {}'.format(response))

        # put_record_batch can succeed as a call while rejecting individual
        # records; surface that instead of dropping them silently.
        failed_count = response.get('FailedPutCount', 0)
        if failed_count:
            logger.error(
                'firehose rejected %s of %s records in batch',
                failed_count,
                len(batch),
            )
def handler(event, context):
    """Entry point: forward G Suite 'login' activity records to Firehose.

    Steps:
      1. Load the service-account key from Secrets Manager and delegate it
         to ``GSUITE_DELEGATED_ACCOUNT``.
      2. Read the checkpoint from the SSM parameter
         '/gsuite-events/lastquerytime' (defaulting to 60 minutes ago).
      3. Page through the Reports API activities since the checkpoint and
         pass each non-empty page to ``send_to_firehose``.
      4. Advance the checkpoint only if at least one record was received.

    ``event`` and ``context`` are accepted but not used. Returns None.
    """
    checkpoint_name = '/gsuite-events/lastquerytime'

    # get the gsuite credentials
    secret = secrets_manager.get_secret_value(SecretId=GSUITE_CREDENTIALS_SECRET_NAME)
    key_dict = json.loads(secret["SecretString"])
    service_account = ServiceAccountCredentials.from_json_keyfile_dict(key_dict, SCOPES)
    # delegate to an account (should be super user with permissions to run activity reports)
    delegated = service_account.create_delegated(GSUITE_DELEGATED_ACCOUNT)

    # get events since last time we ran (default an hour ago)
    fallback_since = (utcnow() - timedelta(minutes=60)).isoformat()
    since = get_parameter(checkpoint_name, fallback_since)
    logger.info(f"Looking for records since: {since}")

    service = build('admin', 'reports_v1', credentials=delegated, cache_discovery=False)

    # Captured before the first query so the next run starts from here.
    last_run_time = utcnow().isoformat()
    page_token = None
    records_retrieved = False

    while True:
        request = service.activities().list(
            userKey='all',
            applicationName='login',
            maxResults=1000,
            startTime=since,
            pageToken=page_token,
        )
        results = request.execute()
        records = results.get('items', [])
        logger.info(f"sending: {len(records)} gsuite records to firehose")
        if records:
            send_to_firehose(records)
            records_retrieved = True

        # No token in the response means this was the last page.
        if "nextPageToken" not in results:
            break
        page_token = results["nextPageToken"]

    # sometimes the gsuite activity log lags behind realtime
    # so regardless of the time we request, there won't be records available until later
    # only move the time forward if we received records.
    if records_retrieved:
        put_parameter(checkpoint_name, last_run_time)