-
Notifications
You must be signed in to change notification settings - Fork 0
/
lambda_function.py
100 lines (84 loc) · 3.19 KB
/
lambda_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import json
import boto3
import datetime
import urllib3
import os
def lambda_handler(event, context):
    """AWS Lambda entry point: collect over-age IAM access keys and report them.

    The IAM users reachable with the function's own credentials are scanned
    first and recorded under the ``root/<UserName>`` prefix. Then the accounts
    returned by AWS Organizations are scanned through ``checkAccount`` and
    the combined result is handed to ``reportToSlack``.

    Args:
        event: Lambda invocation event (not used).
        context: Lambda context object (not used).

    Returns:
        None.
    """
    report = {}

    iamClient = boto3.client('iam')
    # list_users is a paginated API; a single call only returns the first
    # page, so walk every page to avoid silently missing users.
    for page in iamClient.get_paginator('list_users').paginate():
        for user in page['Users']:
            report["root/" + user["UserName"]] = checkUser(iamClient, user)

    orgClient = boto3.client('organizations')
    # list_accounts is paginated as well; gather all pages before iterating.
    accounts = [
        acct
        for page in orgClient.get_paginator('list_accounts').paginate()
        for acct in page['Accounts']
    ]

    # NOTE(review): the first listed account is skipped, exactly as the
    # original index-1 start did -- presumably it is the account already
    # scanned above as "root"; confirm the listing order can be relied on.
    for acct in accounts[1:]:
        # Accounts whose name parses as an integer are not scanned.
        if isInt(acct["Name"]):
            continue
        report.update(checkAccount(acct))

    reportToSlack(report)
    return
def isInt(s):
    """Return True when ``int(s)`` succeeds, False otherwise.

    Args:
        s: Any value; typically a string such as an account name.

    Returns:
        bool: True if ``s`` can be converted with ``int()``.
    """
    try:
        int(s)
    # Only the errors int() raises for unconvertible input are treated as
    # "not an integer"; a bare except would also swallow KeyboardInterrupt
    # and SystemExit.
    except (TypeError, ValueError, OverflowError):
        return False
    return True
def checkAccount(acct):
    """Scan every IAM user of one organization account for over-age keys.

    A role named after the account (``arn:aws:iam::<Id>:role/<Name>``) is
    assumed through STS, and the temporary credentials are used to list the
    account's IAM users and check each of them with ``checkUser``.

    Args:
        acct: Account mapping providing at least the ``"Id"`` and ``"Name"``
            keys.

    Returns:
        dict: ``"<account name>/<user name>"`` mapped to the result of
        ``checkUser`` for that user.
    """
    account_id = acct["Id"]
    account_name = acct["Name"]

    sts_client = boto3.client('sts')
    assumed_role_object = sts_client.assume_role(
        RoleArn="arn:aws:iam::" + account_id + ":role/" + account_name,
        RoleSessionName=account_name + "@" + account_id,
    )
    credentials = assumed_role_object['Credentials']

    iamClient = boto3.client(
        'iam',
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken'],
    )

    findings = {}
    # list_users is paginated; iterate all pages so users beyond the first
    # page are not silently left unchecked.
    for page in iamClient.get_paginator('list_users').paginate():
        for user in page['Users']:
            findings[account_name + "/" + user["UserName"]] = checkUser(iamClient, user)
    return findings
def checkUser(iamClient, user):
    """Return the access keys of *user* that are older than the allowed age.

    The age limit is read from the ``max_allowed_days`` environment variable
    (whole days). It is only consulted when the user actually has keys.

    Args:
        iamClient: IAM client exposing ``list_access_keys(UserName=...)``.
        user: User mapping providing at least the ``"UserName"`` key.

    Returns:
        dict: Access key id mapped to ``"<n> days old"`` for every key whose
        age exceeds the limit; empty when no key is too old.

    Raises:
        ValueError: If ``max_allowed_days`` is unset or not an integer.
    """
    response = iamClient.list_access_keys(UserName=user["UserName"])
    keys = response["AccessKeyMetadata"]
    if not keys:
        return {}

    # Read and parse the threshold once instead of once per key, and fail
    # with a message that names the variable rather than int(None)'s
    # opaque TypeError.
    raw_limit = os.getenv("max_allowed_days")
    if raw_limit is None:
        raise ValueError("environment variable 'max_allowed_days' is not set")
    max_age = datetime.timedelta(days=int(raw_limit))

    # One timestamp so all keys of the user are compared to the same instant.
    now = datetime.datetime.now(datetime.timezone.utc)

    expired = {}
    for key in keys:
        age = now - key["CreateDate"]
        if age > max_age:
            expired[key["AccessKeyId"]] = str(age.days) + " days old"
    return expired
# Slack accepts at most 50 blocks in a single message.
_SLACK_MAX_BLOCKS = 50


def _postSlackMessage(http, hook, payload):
    """POST one JSON payload to the webhook and log a non-2xx answer."""
    import logging

    response = http.request(
        'POST', hook, headers={'Content-Type': 'application/json'},
        body=json.dumps(payload),
    )
    # The original discarded the response, so a rejected message went
    # unnoticed; log it instead of raising to keep reporting best-effort.
    if not 200 <= response.status < 300:
        logging.getLogger(__name__).error(
            "Slack webhook returned HTTP %s: %s", response.status, response.data
        )
    return response


def reportToSlack(msg):
    """Send the scan result to the Slack webhook named by ``slack_hook``.

    A header message with the current UTC time is posted first, followed by
    the findings rendered through ``mapToSlackBlocks``. Findings are sent in
    batches of at most 50 blocks so that a large report is not rejected as a
    whole; with no findings a single message with an empty block list is
    sent, as before.

    Args:
        msg: Findings mapping, passed unchanged to ``mapToSlackBlocks``.

    Returns:
        None.
    """
    http = urllib3.PoolManager()
    hook = os.getenv("slack_hook")

    _postSlackMessage(http, hook, {
        "username": "iamKeyWatcher",
        "text": "###cronjob-iamKeyWatcher###\nexecution @ (UTC) " + str(datetime.datetime.now(datetime.timezone.utc)),
        'icon_emoji': ':hamburger:',
    })

    blocks = mapToSlackBlocks(msg)
    batches = [
        blocks[start:start + _SLACK_MAX_BLOCKS]
        for start in range(0, len(blocks), _SLACK_MAX_BLOCKS)
    ] or [[]]

    # NOTE(review): the fallback text says "done -- all good" even when the
    # blocks list expired keys -- confirm that wording is intended.
    for batch in batches:
        _postSlackMessage(http, hook, {
            "username": "iamKeyWatcher",
            "text": "done -- all good",
            "blocks": batch,
            'icon_emoji': ':hamburger:',
        })
    return
def mapToSlackBlocks(map):
    """Render the findings mapping as a list of Slack ``section`` blocks.

    Each entry of *map* whose value is truthy becomes one block of the form
    ``{"type": "section", "text": {"type": "mrkdwn", "text": ...}}`` whose
    text names the entry's key and lists its keys via
    ``beautifyStrForKeyMap``. Entries with an empty value are left out.

    Args:
        map: Mapping of owner label to a mapping of expired keys.

    Returns:
        list: One block dict per entry with a non-empty value.
    """
    return [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "`" + owner + "` containers expired keys:\n" + beautifyStrForKeyMap(findings),
            },
        }
        for owner, findings in map.items()
        if findings
    ]
def beautifyStrForKeyMap(map):
    """Format a key-to-description mapping as Slack mrkdwn lines.

    Args:
        map: Mapping of access key id (str) to a description.

    Returns:
        str: One ``key *<id>* is <description>`` line per entry, each ending
        with a newline; the empty string for an empty mapping.
    """
    lines = (
        "key *" + key_id + "* is " + str(description) + "\n"
        for key_id, description in map.items()
    )
    return "".join(lines)
# TODO implement
# return {
# 'statusCode': 200,
# 'body': json.dumps('Hello from Lambda!')
# }