-
Notifications
You must be signed in to change notification settings - Fork 0
/
notify.py
104 lines (83 loc) · 3.33 KB
/
notify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""Send notifications for blocked DNS lookups."""
import datetime
import http.client
import io
import json
import logging
import smtplib
import mailer
logger = logging.getLogger(__name__)
def notify(config, entries):
    """Log every blocked lookup and email one summary covering all of them.

    When both ``config.CLOUDFLARE_API_KEY`` and
    ``config.CLOUDFLARE_ACCOUNT_ID`` are set, the entries are first annotated
    with their domain categories. Failures in that step and in sending the
    mail are logged and swallowed, so this function does not raise for them.

    Args:
        config: Settings object. Reads ``CLOUDFLARE_API_KEY``,
            ``CLOUDFLARE_ACCOUNT_ID``, ``MAIL_RECIPIENTS`` and
            ``MAIL_SENDER`` here; it is also handed to ``_configure_smtp``
            and ``_add_domain_categories``.
        entries: Blocked-lookup records exposing ``timestamp``, ``domain``,
            ``client`` and ``categories``. The collection is iterated more
            than once, so pass a list rather than a one-shot iterator.
    """
    if config.CLOUDFLARE_API_KEY and config.CLOUDFLARE_ACCOUNT_ID:
        # Categories are optional enrichment: a failed lookup must not stop
        # the notification itself, so handle it like a mail failure below.
        try:
            _add_domain_categories(config, entries)
        except Exception:
            logger.exception("Error looking up domain categories.")

    for entry in entries:
        request_time = datetime.datetime.fromtimestamp(entry.timestamp)
        logger.info(
            "Upstream block at %s: %s (%s), client=%s",
            request_time,
            entry.domain,
            ", ".join(entry.categories) if entry.categories else "unknown",
            entry.client,
        )

    # One message summarising all entries, not one message per entry.
    try:
        with _configure_smtp(config) as client:
            mail = mailer.Mailer(client)
            mail.send_message(
                config.MAIL_RECIPIENTS,
                config.MAIL_SENDER,
                "DNS Block",
                _render_message_body(entries),
            )
    except Exception:
        logger.exception("Error sending mail.")
def _add_domain_categories(config, entries):
    """Annotate every entry with the categories of its domain.

    Each entry's ``categories`` attribute is overwritten in place with the
    result of ``_lookup_domain_categories`` for that entry's ``domain``.
    """
    for item in entries:
        item.categories = _lookup_domain_categories(config, item.domain)
def _lookup_domain_categories(config, domain):
    """Return the content-category names Cloudflare reports for *domain*.

    Issues a GET to the ``intel/domain`` endpoint on ``api.cloudflare.com``
    using ``config.CLOUDFLARE_ACCOUNT_ID`` and ``config.CLOUDFLARE_API_KEY``.

    Args:
        config: Settings object providing the two Cloudflare values above.
        domain: Domain name to look up; it is URL-encoded before being
            placed in the query string.

    Returns:
        A list of the ``name`` values from the response's
        ``result.content_categories``. An empty list is returned (and an
        error logged) when the status is not 200, the body is not valid
        JSON, or the response does not report ``success``.

    Raises:
        OSError, http.client.HTTPException: connection-level failures are
            not caught here.
    """
    import urllib.parse  # local import so the block does not depend on file-level imports

    account = config.CLOUDFLARE_ACCOUNT_ID
    query = urllib.parse.urlencode({"domain": domain})
    url = f"/client/v4/accounts/{account}/intel/domain?{query}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {config.CLOUDFLARE_API_KEY}",
    }

    conn = http.client.HTTPSConnection("api.cloudflare.com")
    try:
        conn.request("GET", url=url, headers=headers)
        res = conn.getresponse()
        status = res.status
        data = res.read().decode("utf-8")
    finally:
        # Always release the socket, including when the request fails.
        conn.close()

    try:
        content = json.loads(data)
    except ValueError:
        # Error responses are not guaranteed to be JSON; log the raw body
        # instead of letting the decode error escape.
        logger.error("Error looking up domain categories: %s", data)
        return []

    if status != 200 or not content.get("success", False):
        logger.error("Error looking up domain categories: %s", content)
        return []

    # ``result`` or ``content_categories`` may be absent or null.
    result = content.get("result") or {}
    categories = result.get("content_categories") or []
    return [category["name"] for category in categories]
def _configure_smtp(config):
    """Open an authenticated SMTP-over-SSL connection.

    Connects to ``config.SMTP_HOST`` / ``config.SMTP_PORT`` and logs in with
    ``config.SMTP_USERNAME`` / ``config.SMTP_PASSWORD``.

    Returns:
        The logged-in ``smtplib.SMTP_SSL`` client. The caller owns it and is
        responsible for closing it (e.g. by using it in a ``with`` block).

    Raises:
        Whatever ``smtplib.SMTP_SSL`` or ``login`` raise; on a login failure
        the connection is closed before the exception is re-raised.
    """
    client = smtplib.SMTP_SSL(config.SMTP_HOST, config.SMTP_PORT)
    try:
        client.login(config.SMTP_USERNAME, config.SMTP_PASSWORD)
    except Exception:
        # The caller never receives the client in this case, so close the
        # socket here or it would leak.
        client.close()
        raise
    return client
def _render_message_body(entries):
    """Build the plain-text mail body listing every blocked lookup.

    The body is a heading line and a blank line, then two lines per entry
    (request time, domain and client; then its categories), followed by a
    trailing blank line.

    Args:
        entries: Records exposing ``timestamp``, ``domain``, ``client`` and
            ``categories``. A falsy ``categories`` renders as an empty
            string.

    Returns:
        The rendered message body as a string.
    """
    buffer = io.StringIO()
    print("DNS lookup(s) blocked by upstream server:", file=buffer)
    print(file=buffer)
    for entry in entries:
        logger.debug("Entry: %s", entry)
        request_time = datetime.datetime.fromtimestamp(entry.timestamp)
        categories = ", ".join(entry.categories) if entry.categories else ""
        print("-", request_time, entry.domain, f"(from {entry.client})", file=buffer)
        print(" Categories:", categories, file=buffer)
    # No flush needed: StringIO holds everything in memory.
    print(file=buffer)
    body = buffer.getvalue()
    logger.debug("Message: %s", body)
    return body
# Manual smoke test: running this module directly sends one notification for
# a sample entry, without starting the main application.
if __name__ == "__main__":
    from config import Config
    from pihole import Entry

    notify(Config(), [Entry(1, 1250000, 7, "google.com", "1.2.3.4")])