-
Notifications
You must be signed in to change notification settings - Fork 29
/
Copy pathlog4pot-loganalyzer.py
114 lines (100 loc) · 4.4 KB
/
log4pot-loganalyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# Generate summaries from Log4Pot logs
from argparse import ArgumentParser
import argparse
from pathlib import Path
from log4pot.loganalyzer import LogAnalyzer, LogParsingError
from sys import stderr, exit
import csv
# Keyword arguments forwarded to every DataFrame.to_csv() call in this script:
# fields are separated by semicolons and every field is quoted.
default_csv_param = dict(
    sep=";",
    quoting=csv.QUOTE_ALL,
)
# Command line interface definition.
#
# The summary names listed in the --summaries help text must match the names
# the script actually checks for; the previous text advertised
# "deobfuscates_exploits", which is never matched and therefore silently
# produced no output.
argparser = ArgumentParser(description="Generate summaries from Log4Pot logs.")
argparser.add_argument(
    "--output", "-o",
    type=Path,
    default=Path("."),
    help="Output directory for summaries.",
)
argparser.add_argument(
    "--summaries", "-s",
    default="all",
    help="Summaries to generate (all, exploits, deobfuscated_exploits, deobfuscation, payload_urls, callback_urls, callback_full_urls) as comma-separated list",
)
argparser.add_argument(
    "--keep-deobfuscation", "-k",
    action="store_true",
    help="Keep payload deobfuscation from logs instead of deobfuscate again.",
)
argparser.add_argument(
    "--old-deobfuscator", "-O",
    action="store_true",
    help="Deobfuscate payloads with old deobfuscator.",
)
# The allow/deny list defaults are file names given as strings; argparse runs
# string defaults through ``type``, so they are opened like user-supplied
# paths when the option is omitted.
argparser.add_argument(
    "--url-allowlist", "-ua",
    default="default-url-allowlist",
    type=argparse.FileType("r"),
    help="URL pattern allowlist to use for payload_url summary.",
)
argparser.add_argument(
    "--url-denylist", "-ud",
    default="default-url-denylist",
    type=argparse.FileType("r"),
    help="URL pattern denylist to use for payload_url summary.",
)
argparser.add_argument(
    "logfile",
    nargs="+",
    type=Path,
    help="Log4Pot log file or directory containing such files.",
)
# Parse the command line; argparse opens the allow/deny list files for us.
args = argparser.parse_args()

# Summary names this script knows how to generate.
_KNOWN_SUMMARIES = frozenset((
    "all",
    "exploits",
    "deobfuscated_exploits",
    "deobfuscation",
    "payload_urls",
    "callback_urls",
    "callback_full_urls",
))

# Tolerate whitespace around the commas ("exploits, payload_urls") and drop
# empty entries so that a stray comma does not create a bogus "" summary.
summaries = frozenset(
    name.strip()
    for name in args.summaries.split(",")
    if name.strip()
)

# Unknown names used to be ignored silently, which hides typos; keep going
# (best effort) but tell the user about it.
_unknown_summaries = summaries - _KNOWN_SUMMARIES
if _unknown_summaries:
    print(
        f"Warning: ignoring unknown summaries: {', '.join(sorted(_unknown_summaries))}",
        file=stderr,
    )

# Read one pattern per line; the context managers close the files even if
# reading fails half way through.
with args.url_allowlist as allowlist_file:
    url_allowlist = [line.strip() for line in allowlist_file]
with args.url_denylist as denylist_file:
    url_denylist = [line.strip() for line in denylist_file]
# Expand the positional arguments into the list of log files to load.
# A missing path aborts the whole run with exit status 1.
paths = []
for logfile in args.logfile:
    if not logfile.exists():
        print(f"File '{str(logfile)}' not found!", file=stderr)
        exit(1)
    if logfile.is_dir():
        # The recursive "**/*" pattern also yields the sub-directories
        # themselves; those cannot be opened as log files, so keep regular
        # files only.
        paths.extend(
            entry
            for entry in logfile.glob("**/*")
            if entry.is_file()
        )
    else:
        paths.append(logfile)
# Load the full text of every log file. Path.read_text() opens the file in
# text mode with the default encoding (same as open("r")) and closes it
# again, so no file handles are left dangling.
logs = [path.read_text() for path in paths]

# NOTE(review): LogParsingError is imported at the top of this file but never
# caught; presumably LogAnalyzer can raise it on malformed input - confirm
# whether a friendlier error message is wanted here.
loganalyzer = LogAnalyzer(logs, args.keep_deobfuscation, args.old_deobfuscator)
print(f"Loaded {loganalyzer.event_count()} events")
def _write_summary(df, filename, columns):
    """Write *df* as CSV file *filename* inside the output directory.

    The index is turned back into regular columns first, only *columns* are
    written, and the shared ``default_csv_param`` settings are applied.
    The output directory is created if it does not exist yet.
    """
    args.output.mkdir(parents=True, exist_ok=True)
    df.reset_index().to_csv(
        args.output / filename,
        columns=columns,
        index=False,
        **default_csv_param,
    )


# One entry per summary, in the order they are generated:
# (summary name, callable producing the data frame, output file name,
#  CSV columns, message template printed after writing).
# Producers are only called for the summaries that were requested.
_SUMMARY_JOBS = (
    (
        "exploits",
        loganalyzer.exploit_summary,
        "exploit_summary.csv",
        ("first_seen", "last_seen", "payload"),
        "Wrote {count} raw exploits.",
    ),
    (
        "deobfuscated_exploits",
        loganalyzer.deobfuscated_exploit_summary,
        "deobfuscated_exploit_summary.csv",
        ("first_seen", "last_seen", "deobfuscated_payload"),
        "Wrote {count} deobfuscated exploits.",
    ),
    (
        "deobfuscation",
        loganalyzer.deobfuscation_summary,
        "deobfuscation_summary.csv",
        ("first_seen", "last_seen", "payload", "deobfuscated_payload"),
        "Wrote deobfuscation_summary with {count} items.",
    ),
    (
        "payload_urls",
        # Only this summary takes the URL allow/deny pattern lists.
        lambda: loganalyzer.payload_url_summary(url_allowlist, url_denylist),
        "payload_urls.csv",
        ("first_seen", "last_seen", "url"),
        "Wrote {count} payload URLs.",
    ),
    (
        "callback_urls",
        loganalyzer.callback_url_summary,
        "callback_urls.csv",
        ("first_seen", "last_seen", "url"),
        "Wrote {count} callback URLs.",
    ),
    (
        "callback_full_urls",
        loganalyzer.callback_full_url_summary,
        "callback_full_urls.csv",
        ("first_seen", "last_seen", "url"),
        "Wrote {count} full callback URLs.",
    ),
)

# "all" selects every summary; otherwise only the explicitly named ones run.
_generate_all = "all" in summaries
for _name, _produce, _filename, _columns, _message in _SUMMARY_JOBS:
    if not (_generate_all or _name in summaries):
        continue
    df = _produce()
    _write_summary(df, _filename, _columns)
    print(_message.format(count=len(df)))