forked from openedx-unsupported/configuration
-
Notifications
You must be signed in to change notification settings - Fork 0
/
asg_event_notifications_util.py
100 lines (75 loc) · 2.88 KB
/
asg_event_notifications_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from __future__ import absolute_import
from __future__ import print_function
import boto3
import click
# Root click command group; the sub-commands are attached to it with
# cli.add_command() further down in this file.
# NOTE: documented with comments rather than a docstring because click uses
# a callback's docstring as its --help text.
@click.group()
def cli():
    pass
def get_asg_infos():
    """Return the description of every Auto Scaling group in the account.

    The API returns at most 100 groups per call, so a single request
    silently truncates the result on larger accounts. This follows the
    ``NextToken`` marker until the service stops returning one.

    Relies on the module-level ``client`` (a boto3 ``autoscaling`` client)
    being bound before the call.

    Returns:
        list: the ``AutoScalingGroups`` entries of all pages, in the order
        the service returned them.
    """
    auto_scaling_groups = []
    request_kwargs = {'MaxRecords': 100}
    while True:
        response = client.describe_auto_scaling_groups(**request_kwargs)
        auto_scaling_groups.extend(response['AutoScalingGroups'])
        # A missing or empty NextToken means this was the last page.
        next_token = response.get('NextToken')
        if not next_token:
            return auto_scaling_groups
        request_kwargs['NextToken'] = next_token
def get_asg_names():
    """Return the ``AutoScalingGroupName`` of each group from get_asg_infos().

    Returns:
        list: group names, in the same order get_asg_infos() yields them.
    """
    return [group['AutoScalingGroupName'] for group in get_asg_infos()]
def get_asg_event_notifications(asg):
    """Return the notification types configured for one Auto Scaling group.

    Follows the ``NextToken`` marker so that configurations beyond the first
    page of 100 records are not silently dropped.

    Relies on the module-level ``client`` (a boto3 ``autoscaling`` client)
    being bound before the call.

    Args:
        asg: name of the Auto Scaling group to query.

    Returns:
        list: the ``NotificationType`` value of every notification
        configuration returned for the group; empty when there are none.
    """
    event_notifications = []
    request_kwargs = {'AutoScalingGroupNames': [asg], 'MaxRecords': 100}
    while True:
        response = client.describe_notification_configurations(**request_kwargs)
        event_notifications.extend(
            notification['NotificationType']
            for notification in response['NotificationConfigurations']
        )
        # A missing or empty NextToken means this was the last page.
        next_token = response.get('NextToken')
        if not next_token:
            return event_notifications
        request_kwargs['NextToken'] = next_token
# CLI command: print, for every Auto Scaling group, either the notification
# types configured on it or a message saying none were found.
# Any exception raised while listing or querying is printed, not re-raised.
# Documented with comments (not a docstring) so click's --help text is unchanged.
@click.command()
def show_asg_event_notifications():
    try:
        for group_name in get_asg_names():
            configured = get_asg_event_notifications(group_name)
            if not configured:
                print("No Event Notifications found for ASG {}".format(group_name))
                continue
            print("Event notifications: {0} are set for ASG: {1}".format(configured,
                                                                         group_name))
    except Exception as error:
        print(error)
# CLI command: add an SNS event notification to every Auto Scaling group
# that does not already have the requested event type configured.
#
#   topic_arn -- ARN of the SNS topic to notify (required option).
#   event     -- notification type to ensure on each group.
#   confirm   -- flag; without it the command only reports which groups
#                would be changed and makes no put call.
#
# Errors while listing or querying groups propagate to the caller; errors
# while creating notifications are printed and stop the remaining creations.
# Documented with comments (not a docstring) so click's --help text is unchanged.
@click.command()
@click.option('--topic_arn', help='The ARN of Amazon SNS topic',
              required=True)
@click.option('--event',
              help='The type of event that causes the notification to be sent',
              default='autoscaling:EC2_INSTANCE_LAUNCH_ERROR')
@click.option('--confirm', default=False, required=False, is_flag=True,
              help='Set this to create event notification for asg')
def create_asg_event_notifications(
    topic_arn,
    event,
    confirm,
):
    # Groups whose current notification types do not include the event.
    pending_groups = [
        group_name
        for group_name in get_asg_names()
        if event not in get_asg_event_notifications(group_name)
    ]

    # Dry run: report the candidates and stop.
    if confirm is False:
        print("Would have created the event notification for asgs {}".format(pending_groups))
        return

    try:
        for group_name in pending_groups:
            client.put_notification_configuration(
                AutoScalingGroupName=group_name,
                TopicARN=topic_arn,
                NotificationTypes=[event],
            )
            print("Created {0} event notifications for auto scaling group {1}".format(event,
                                                                                      group_name))
    except Exception as error:
        print(error)
# Attach both sub-commands to the root `cli` group.
cli.add_command(show_asg_event_notifications)
cli.add_command(create_asg_event_notifications)
if __name__ == '__main__':
    # The helper functions above read `client` as a module-level global.
    # NOTE(review): it is bound only here, so importing this module and
    # calling those helpers without first setting `client` raises NameError.
    client = boto3.client('autoscaling')
    cli()