patrowl_finding_cleaner.py
#!/usr/bin/env python
"""
Patrowl Finding Cleaner
Copyright 2020 Leboncoin
Licensed under the Apache License
Written by Nicolas BEGUIER ([email protected])
"""

# Standard library imports
import logging
import sys

# Third party library imports
from patrowl4py.api import PatrowlManagerApi
import urllib3

# Own libraries
import settings

# Debug
# from pdb import set_trace as st

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

VERSION = '1.1.2'

logging.basicConfig()
LOGGER = logging.getLogger('patrowl-finding-cleaner')

PATROWL_API = PatrowlManagerApi(
    url=settings.PATROWL_PRIVATE_ENDPOINT,
    auth_token=settings.PATROWL_APITOKEN
)
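
# Maximum number of recent findings to keep per finding type: when an asset
# has more findings of one of these types, the oldest ones (lowest ids) are deleted.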
MULTIPLE_THRESHOLD = {
    # Eyewitness
    'eyewitness_screenshot': 3,
    'eyewitness_screenshot_diff': 1,
    # Virustotal
    # 'vt_url_positivematch': 1,
    'domain_categories': 1,
    'domain_detected_referrer_samples': 1,
    'domain_detected_urls': 1,
    'domain_report': 2,
    'domain_resolutions': 2,
    'domain_siblings': 1,
    'domain_undetected_referrer_samples': 1,
    'domain_webutation_info': 1,
    'domain_webutation_verdict': 1,
    'domain_whois': 2,
    'ip_detected_samples': 1,
    'ip_undetected_samples': 1,
    'subdomain_list': 1,
    # Certstream
    'certstream_report': 1,
}
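
# Finding title substrings treated as duplicates: for each entry, only the
# most recent matching finding (highest id) is kept per asset.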
DUPLICATE_LIST = [
    'Current IP',
    'Domain for sale',
    'Threat codename',
]


def delete_finding(finding_id, test_only=False):
    """
    Wrapper around PATROWL_API.delete_finding: in test-only mode the deletion
    is only logged, otherwise errors are caught and reported
    """
    if test_only:
        LOGGER.warning('[TEST-ONLY] Delete finding.')
        return
    try:
        PATROWL_API.delete_finding(finding_id)
    except Exception:
        LOGGER.critical('Error during delete finding #%s', finding_id)


def clean_multiples(findings, test_only=False):
    """
    Keep only the most recent findings of each type, up to the limit
    defined in MULTIPLE_THRESHOLD, and delete the older ones
    """
    multiples = dict()
    for finding_type in MULTIPLE_THRESHOLD:
        multiples[finding_type] = list()
    for finding in findings:
        if 'type' in finding and finding['type'] in multiples:
            multiples[finding['type']].append(finding['id'])
    for finding_type in MULTIPLE_THRESHOLD:
        multiples[finding_type].sort()
        if len(multiples[finding_type]) > MULTIPLE_THRESHOLD[finding_type]:
            for old_finding in multiples[finding_type][:-MULTIPLE_THRESHOLD[finding_type]]:
                LOGGER.warning('Remove old %s #%s', finding_type, old_finding)
                delete_finding(old_finding, test_only=test_only)


def clean_duplicates(findings, test_only=False):
    """
    Remove duplicate findings
    """
    duplicates = dict()
    for finding_title in DUPLICATE_LIST:
        duplicates[finding_title] = list()
    for finding in findings:
        for finding_title in DUPLICATE_LIST:
            if 'title' in finding and finding_title in finding['title']:
                duplicates[finding_title].append(finding['id'])
    for finding_title in DUPLICATE_LIST:
        duplicates[finding_title].sort()
        if len(duplicates[finding_title]) > 1:
            for old_finding in duplicates[finding_title][:-1]:
                LOGGER.warning('Remove duplicate %s #%s', finding_title, old_finding)
                delete_finding(old_finding, test_only=test_only)


def main(test_only=False):
    """
    Main function
    """
    assets = PATROWL_API.get_assets()
    for asset in assets:
        try:
            findings = PATROWL_API.get_asset_findings_by_id(asset['id'])
        except Exception:
            LOGGER.critical('Cannot get findings for asset #%s', asset['id'])
            continue
        clean_multiples(findings, test_only=test_only)
        clean_duplicates(findings, test_only=test_only)
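

# Usage: ./patrowl_finding_cleaner.py [--test-only]
# With --test-only, deletions are only logged and nothing is actually removed.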
if __name__ == '__main__':
    DEBUG = len(sys.argv) > 1 and sys.argv[1] == '--test-only'
    main(test_only=DEBUG)