-
Notifications
You must be signed in to change notification settings - Fork 3
/
create_webhooks_simple.py
84 lines (75 loc) · 3.14 KB
/
create_webhooks_simple.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python3
from datetime import date
import requests
import sys
import csv
import validators
# grab all the domains on the account
def get_domains(customer_api_key):
    """Return every domain record on the Mailgun account.

    Pages through ``GET /v3/domains`` using the ``skip`` and ``limit``
    query parameters (1000 records per request) and concatenates the
    ``items`` list of each response.

    Args:
        customer_api_key: Mailgun API key, sent as the password of the
            HTTP basic-auth pair ``("api", key)``.

    Returns:
        A list holding the ``items`` entries of all pages, in the order
        they were received.

    Raises:
        SystemExit: with status 1 when a request cannot be completed or
            the API answers with a status code other than 200.
    """
    page_size = 1000
    skip = 0
    domains = []
    while True:
        try:
            data = requests.get("https://api.mailgun.net/v3/domains",
                                auth=("api", customer_api_key),
                                params={"skip": skip, "limit": page_size},
                                # Without a timeout a stalled connection
                                # would block the script forever.
                                timeout=30)
        except requests.RequestException as err:
            print("Error communicating with API endpoint. Error: {}"
                  .format(err))
            sys.exit(1)
        if data.status_code != 200:
            print("Error communicating with API endpoint. Error Code: {} - {}"
                  .format(data.status_code, data.text))
            # Exit non-zero so a calling shell or wrapper sees the failure.
            sys.exit(1)
        items = data.json()['items']
        domains.extend(items)
        # Only a completely full page suggests that more records follow.
        if len(items) != page_size:
            break
        print("Fetching next domain page")
        skip += page_size
    return domains
def assign_webhook(customer_api_key, domain, webhook, customer_url):
    """Create one webhook on one domain and return a message for the log.

    Sends ``POST /v3/domains/<domain>/webhooks`` with the webhook id and
    target URL as the ``id`` and ``url`` request parameters.

    Args:
        customer_api_key: Mailgun API key, sent as the password of the
            HTTP basic-auth pair ``("api", key)``.
        domain: Domain name inserted into the endpoint path.
        webhook: Webhook id to create (for example ``"clicked"``).
        customer_url: URL the webhook should be pointed at.

    Returns:
        The ``message`` field of the JSON response when the API answers
        200; otherwise a string describing the failure. Request errors
        are reported through the return value rather than raised, so a
        bulk run can record the failure and carry on.
    """
    endpoint = "https://api.mailgun.net/v3/domains/" + domain + "/webhooks"
    try:
        data = requests.post(endpoint,
                             auth=("api", customer_api_key),
                             params={"id": webhook, "url": customer_url},
                             # Without a timeout one stalled request would
                             # hang the whole bulk run.
                             timeout=30)
    except requests.RequestException as err:
        return "Request failed: " + str(err)
    if data.status_code != 200:
        return ("Error Code: " + str(data.status_code)
                + " - Message: " + data.text)
    try:
        return data.json()['message']
    except (ValueError, KeyError):
        # 200 with a body that is not JSON or has no "message" field:
        # log the raw body instead of aborting the run.
        return data.text
def main():
    """Interactively assign one endpoint URL to every webhook of every domain.

    Prompts for a Mailgun API key and a target URL (typing ``exit`` at
    either prompt quits), fetches all domain names on the account, then
    calls ``assign_webhook`` once per domain and webhook type. Every
    attempt is written as a row of ``create-webhooks-log-<date>.csv`` in
    the current working directory, with the columns Domain, Webhook, URL
    and Message.
    """
    webhooks = ["clicked", "complained", "delivered", "opened",
                "permanent_fail", "temporary_fail", "unsubscribed"]
    file_name = "create-webhooks-log-" + str(date.today()) + ".csv"

    # Request the API key; an empty answer shows the banner and asks again.
    customer_api_key = ""
    while not customer_api_key:
        print("This script was written to assign a single endpoint to every webhook for \n"
              "all domains on a Mailgun account. If this is not what you wish to do, \n"
              "please type 'exit' at the prompt below. Otherwise, please provide your \n"
              "Mailgun API key to continue. \n\n")
        # Pasted keys often carry stray whitespace that would break auth.
        customer_api_key = input("What is your Mailgun API key? ").strip()
        if customer_api_key == "exit":
            sys.exit()

    # Request the URL that will receive the webhooks; repeat until it validates.
    customer_url = ""
    while not customer_url:
        customer_url = input("\n\nWhat the URL of the endpoint you wish to assign? \n\n").strip()
        if customer_url == "exit":
            sys.exit()
        if not validators.url(customer_url):
            customer_url = ""
            print("\nURL provided does not pass validation. Please try again.")

    # Pull all domains on the account before creating the log file, so a
    # failed lookup does not leave an empty log behind.
    domains_list = [d["name"] for d in get_domains(customer_api_key)]

    # The with-block guarantees the log is flushed and closed even if the
    # run is interrupted part-way through.
    with open(file_name, 'w', newline='', encoding='utf-8') as log_file:
        headeroutput = ["Domain", "Webhook", "URL", "Message"]
        writer = csv.DictWriter(log_file, fieldnames=headeroutput)
        writer.writeheader()
        # Create one of each webhook on every domain and log each attempt.
        for domain in domains_list:
            for webhook in webhooks:
                call = assign_webhook(customer_api_key, domain, webhook,
                                      customer_url)
                writer.writerow({'Domain': domain, 'Webhook': webhook,
                                 'URL': customer_url, 'Message': call})
# Start the interactive workflow only when this file is executed as a script,
# so importing the module does not prompt for input.
if __name__ == "__main__":
    main()