forked from RedHatProductSecurity/osidb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
helpers.py
145 lines (123 loc) · 4.88 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from django.conf import settings
from apps.exploits.models import Exploit
from osidb.core import set_user_acls
from osidb.models import Flaw
def update_objects_with_flaws(exploit_objects):
    """
    Link exploit-like objects (Exploit or EPSS) to the Flaw that matches their CVE.

    Every object gets its 'flaw_id' set to the uuid of the Flaw whose 'cve_id' equals the
    object's 'cve', or to None when no such Flaw was found. The objects are modified in
    place; nothing is saved by this function.

    Why it is written this way:

    * A single query fetches all relevant flaws instead of one query per object
      (~180k for EPSS).
    * Only the uuid (primary key) is kept in the lookup table rather than the whole Flaw,
      which saves about 500 MB of RAM in case of EPSS and is faster.
    * The link is made through 'flaw_id', which takes the Flaw primary key (uuid), not
      through 'flaw', which would need a Flaw object.
    """
    wanted_cves = {item.cve for item in exploit_objects}

    # (cve_id, uuid) pairs turn directly into the CVE -> uuid lookup table.
    uuid_by_cve = dict(
        Flaw.objects.filter(cve_id__in=wanted_cves).values_list("cve_id", "uuid")
    )

    for item in exploit_objects:
        # dict.get() yields None for CVEs without a matching Flaw.
        item.flaw_id = uuid_by_cve.get(item.cve)
def store_or_update_exploits(exploit_objects):
    """
    Store exploits described by a list of Exploit objects.

    An exploit is identified by its (cve, source, reference) triple. If a record with that
    triple already exists, its link to the Flaw is updated, which makes it possible to
    refresh links to flaws simply by running the collection again.

    Date and preliminary maturity should be the same for a specific exploit, but they are
    updated as well in case they change.

    The Flaw link is written through 'flaw_id' (the Flaw primary key) rather than 'flaw'.
    Reading 'exploit.flaw' on an object that only has 'flaw_id' set makes Django load the
    related Flaw from the database, i.e. one extra query per linked exploit.
    """
    for exploit in exploit_objects:
        Exploit.objects.update_or_create(
            # Lookup keys identifying the exploit.
            cve=exploit.cve,
            source=exploit.source,
            reference=exploit.reference,
            # Values written both on create and on update.
            defaults={
                "flaw_id": exploit.flaw_id,
                "date": exploit.date,
                "maturity_preliminary": exploit.maturity_preliminary,
            },
        )
def set_exploit_collector_acls():
    """
    Call set_user_acls with every group listed in settings.ALL_GROUPS.

    Exploit collectors have to be able to access all flaws; otherwise the exploit objects
    could not be linked to them correctly.
    """
    every_group = settings.ALL_GROUPS
    set_user_acls(every_group)
class ExploitResult:
    """
    Analysis of a single exploit, kept as plain attributes for report generation.

    Creating an instance immediately runs analyze(), which looks up the Flaw for the
    exploit's CVE and narrows its affects step by step (supported -> affected -> unfixed
    -> included). Each boolean flag records whether the corresponding selection was
    non-empty, and 'message' describes where the analysis stopped.
    """

    def __init__(self, exploit):
        """
        Copy 'cve' and 'date' from the exploit, reset all results and run the analysis.
        """
        # Data taken over from the exploit.
        self.cve = exploit.cve
        self.date = exploit.date

        # Flaw found for the CVE; stays None when there is none.
        self.flaw = None

        # Affect selections, filled in by analyze(); each one narrows the previous one.
        self.affects = None
        self.supported_affects = None
        self.affected_affects = None
        self.unfixed_affects = None
        self.included_affects = None

        # Stage flags, switched to True by analyze() as each stage is passed.
        self.in_database = self.supported = self.affected = False
        self.unfixed = self.included = False

        # Human readable outcome of the analysis.
        self.message = None

        self.analyze()

    @staticmethod
    def _component_names(affects):
        """
        Return the unique "ps_module:ps_component" pairs of the affects, sorted and
        comma separated.
        """
        unique_names = {f"{affect.ps_module}:{affect.ps_component}" for affect in affects}
        return ", ".join(sorted(unique_names))

    def analyze(self):
        """
        Fill in the flaw, the affect selections, the stage flags and the message.

        The stages are checked in order and the method returns at the first stage whose
        selection is empty, leaving the remaining flags at False.
        """
        self.flaw = flaw = Flaw.objects.filter(cve_id=self.cve).first()
        if not flaw:
            self.message = "CVE not in Red Hat database"
            return
        self.in_database = True

        # All selections are stored up front, even if a later stage bails out early.
        self.affects = all_affects = flaw.affects.all()
        self.supported_affects = supported = all_affects.supported()
        self.affected_affects = affected = supported.affected()
        self.unfixed_affects = unfixed = affected.unfixed()
        self.included_affects = [
            affect for affect in unfixed if not affect.should_be_excluded
        ]

        # Stage 1: at least one supported product.
        if not supported:
            if all_affects:
                modules = ", ".join(
                    sorted({affect.ps_module for affect in all_affects})
                )
                self.message = f"No supported products ({modules})"
            else:
                self.message = "No products in database"
            return
        self.supported = True

        # Stage 2: at least one supported product is affected.
        if not affected:
            self.message = (
                "All components for supported products are not affected "
                f"({self._component_names(supported)})"
            )
            return
        self.affected = True

        # Stage 3: at least one affected component is unfixed.
        if not unfixed:
            self.message = (
                'All affected components for supported products are marked as "fix" '
                f"({self._component_names(affected)})"
            )
            return
        self.unfixed = True

        # Stage 4: at least one unfixed component is not excluded.
        if self.included_affects:
            self.included = True
            self.message = ">> ACTION PENDING <<"
            return
        self.message = (
            "All affected unfixed components for supported products are not on "
            f"inclusion list ({self._component_names(unfixed)})"
        )