Skip to content

Commit 381e4c1

Browse files
committed
feat: cdx vex
1 parent ca00903 commit 381e4c1

File tree

7 files changed

+705
-18
lines changed

7 files changed

+705
-18
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Generated by Django 5.2.4 on 2025-07-30 12:57
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
("vex", "0007_alter_csaf_tracking_current_release_date_and_more"),
10+
]
11+
12+
operations = [
13+
migrations.AlterField(
14+
model_name="vex_document",
15+
name="type",
16+
field=models.CharField(
17+
choices=[("CSAF", "CSAF"), ("OpenVEX", "OpenVEX"), ("CycloneDX", "CycloneDX")], max_length=16
18+
),
19+
),
20+
]
Lines changed: 356 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,356 @@
1+
from dataclasses import dataclass
2+
from typing import Optional
3+
4+
from rest_framework.exceptions import ValidationError
5+
6+
from application.core.api.serializers_helpers import validate_purl
7+
from application.vex.models import VEX_Document, VEX_Statement
8+
from application.vex.services.vex_engine import apply_vex_statements_after_import
9+
from application.vex.types import (
10+
CycloneDX_Analysis_State,
11+
VEX_Document_Type,
12+
VEX_Justification,
13+
VEX_Status,
14+
)
15+
16+
17+
@dataclass
18+
class CycloneDX_Analysis:
19+
state: str = ""
20+
justification: str = ""
21+
response: Optional[list[str]] = None
22+
detail: str = ""
23+
first_issued: str = ""
24+
last_updated: str = ""
25+
26+
def __post_init__(self) -> None:
27+
if self.response is None:
28+
self.response = []
29+
30+
31+
@dataclass
32+
class VexStatementData:
33+
vulnerability_id: str
34+
description: str
35+
status: str
36+
justification: str
37+
impact: str
38+
remediation: str
39+
product_purl: str
40+
component_purl: str = ""
41+
42+
43+
def parse_cyclonedx_data(data: dict) -> None:
44+
cyclonedx_document = _create_cyclonedx_document(data)
45+
46+
product_purls, vex_statements = _process_vex_statements(data, cyclonedx_document)
47+
48+
apply_vex_statements_after_import(product_purls, vex_statements)
49+
50+
51+
def _create_cyclonedx_document(data: dict) -> VEX_Document:
52+
document_id = data.get("serialNumber")
53+
if not document_id:
54+
raise ValidationError("serialNumber is missing")
55+
56+
version_value = data.get("version")
57+
if version_value is None:
58+
raise ValidationError("version is missing")
59+
version = str(version_value)
60+
61+
metadata = data.get("metadata", {})
62+
63+
timestamp = metadata.get("timestamp")
64+
if not timestamp:
65+
raise ValidationError("metadata/timestamp is missing")
66+
67+
author = None
68+
# Prefer authors list if available
69+
authors = metadata.get("authors")
70+
if authors and isinstance(authors, list) and len(authors) > 0:
71+
# Find the first author with a name set
72+
author = next(
73+
(item.get("name") for item in authors if isinstance(item, dict) and item.get("name")),
74+
None,
75+
)
76+
77+
# Fall back to manufacturer or supplier if no authors
78+
if not author:
79+
author = metadata.get("manufacturer", {}).get("name") or metadata.get("supplier", {}).get("name")
80+
81+
# Fall back to tools name if available
82+
if not author:
83+
tools = metadata.get("tools")
84+
if isinstance(tools, list) and len(tools) > 0:
85+
first_tool = tools[0]
86+
if isinstance(first_tool, dict):
87+
author = first_tool.get("name") or author
88+
elif isinstance(tools, dict):
89+
author = tools.get("name") or author
90+
91+
if not author:
92+
raise ValidationError("author is missing")
93+
94+
try:
95+
cyclonedx_document = VEX_Document.objects.get(document_id=document_id, author=author)
96+
cyclonedx_document.delete()
97+
except VEX_Document.DoesNotExist:
98+
pass
99+
100+
cyclonedx_document = VEX_Document.objects.create(
101+
type=VEX_Document_Type.VEX_DOCUMENT_TYPE_CYCLONEDX,
102+
document_id=document_id,
103+
version=version,
104+
initial_release_date=timestamp,
105+
current_release_date=timestamp,
106+
author=author,
107+
role="",
108+
)
109+
110+
return cyclonedx_document
111+
112+
113+
def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tuple[set[str], set[VEX_Statement]]:
114+
vulnerabilities = data.get("vulnerabilities", [])
115+
if not vulnerabilities:
116+
raise ValidationError("CycloneDX document doesn't contain any vulnerabilities")
117+
if not isinstance(vulnerabilities, list):
118+
raise ValidationError("vulnerabilities is not a list")
119+
120+
components_map = _build_components_map(data)
121+
122+
product_purl = data.get("metadata", {}).get("component", {}).get("purl", "")
123+
if not product_purl:
124+
raise ValidationError("metadata/component/purl is missing")
125+
validate_purl(product_purl)
126+
127+
product_purls: set[str] = set()
128+
vex_statements: set[VEX_Statement] = set()
129+
130+
vulnerability_counter = 0
131+
for vulnerability in vulnerabilities:
132+
if not isinstance(vulnerability, dict):
133+
raise ValidationError(f"vulnerability[{vulnerability_counter}] is not a dictionary")
134+
135+
vulnerability_id = vulnerability.get("id")
136+
if not vulnerability_id:
137+
raise ValidationError(f"vulnerability[{vulnerability_counter}]/id is missing")
138+
139+
analysis = vulnerability.get("analysis", {})
140+
if not analysis:
141+
# Skip vulnerabilities without analysis
142+
vulnerability_counter += 1
143+
continue
144+
145+
cyclonedx_analysis = _parse_analysis(analysis, vulnerability_counter)
146+
147+
vex_status = _map_cyclonedx_state_to_vex_status(cyclonedx_analysis.state)
148+
if not vex_status:
149+
raise ValidationError(
150+
f"vulnerability[{vulnerability_counter}]/analysis/state is not valid: {cyclonedx_analysis.state}"
151+
)
152+
153+
description = vulnerability.get("description", "")
154+
detail = vulnerability.get("detail", "")
155+
if detail:
156+
description += f"\n\n{detail}"
157+
158+
remediation = _build_remediation_text(cyclonedx_analysis.response, vulnerability.get("recommendation", ""))
159+
160+
affects = vulnerability.get("affects", [])
161+
if not affects:
162+
# General statement for the product
163+
_create_vex_statement(
164+
cyclonedx_document,
165+
product_purls,
166+
vex_statements,
167+
data=VexStatementData(
168+
vulnerability_id=vulnerability_id,
169+
description=description,
170+
status=vex_status,
171+
justification=cyclonedx_analysis.justification,
172+
impact=cyclonedx_analysis.detail,
173+
remediation=remediation,
174+
product_purl=product_purl,
175+
component_purl="",
176+
),
177+
)
178+
elif not isinstance(affects, list):
179+
raise ValidationError(f"affects[{vulnerability_counter}] is not a list")
180+
else:
181+
_process_affected_components(
182+
document=cyclonedx_document,
183+
product_purls=product_purls,
184+
vex_statements=vex_statements,
185+
vulnerability_counter=vulnerability_counter,
186+
base_data=VexStatementData(
187+
vulnerability_id=vulnerability_id,
188+
description=description,
189+
status=vex_status,
190+
justification=cyclonedx_analysis.justification,
191+
impact=cyclonedx_analysis.detail,
192+
remediation=remediation,
193+
product_purl=product_purl,
194+
),
195+
affects=affects,
196+
components_map=components_map,
197+
)
198+
199+
vulnerability_counter += 1
200+
201+
return product_purls, vex_statements
202+
203+
204+
def _build_components_map(data: dict) -> dict[str, dict]:
205+
components_map = {}
206+
207+
# Add root component from metadata
208+
metadata_component = data.get("metadata", {}).get("component")
209+
if metadata_component and metadata_component.get("bom-ref"):
210+
components_map[metadata_component["bom-ref"]] = metadata_component
211+
212+
# Add all components
213+
for component in data.get("components", []):
214+
if component.get("bom-ref"):
215+
components_map[component["bom-ref"]] = component
216+
217+
return components_map
218+
219+
220+
def _parse_analysis(analysis: dict, vulnerability_counter: int) -> CycloneDX_Analysis:
221+
state = analysis.get("state", "")
222+
if not state:
223+
raise ValidationError(f"vulnerability[{vulnerability_counter}]/analysis/state is missing")
224+
225+
justification = analysis.get("justification", "")
226+
if justification:
227+
justification = _map_cyclonedx_justification_to_vex_justification(justification) or ""
228+
response = analysis.get("response", [])
229+
if not isinstance(response, list):
230+
response = []
231+
232+
detail = analysis.get("detail", "")
233+
first_issued = analysis.get("firstIssued", "")
234+
last_updated = analysis.get("lastUpdated", "")
235+
236+
return CycloneDX_Analysis(
237+
state=state,
238+
justification=justification,
239+
response=response,
240+
detail=detail,
241+
first_issued=first_issued,
242+
last_updated=last_updated,
243+
)
244+
245+
246+
def _map_cyclonedx_state_to_vex_status(state: str) -> Optional[str]:
247+
mapping = {
248+
CycloneDX_Analysis_State.CYCLONEDX_STATE_RESOLVED: VEX_Status.VEX_STATUS_FIXED,
249+
CycloneDX_Analysis_State.CYCLONEDX_STATE_RESOLVED_WITH_PEDIGREE: VEX_Status.VEX_STATUS_FIXED,
250+
CycloneDX_Analysis_State.CYCLONEDX_STATE_EXPLOITABLE: VEX_Status.VEX_STATUS_AFFECTED,
251+
CycloneDX_Analysis_State.CYCLONEDX_STATE_IN_TRIAGE: VEX_Status.VEX_STATUS_UNDER_INVESTIGATION,
252+
CycloneDX_Analysis_State.CYCLONEDX_STATE_FALSE_POSITIVE: VEX_Status.VEX_STATUS_NOT_AFFECTED,
253+
CycloneDX_Analysis_State.CYCLONEDX_STATE_NOT_AFFECTED: VEX_Status.VEX_STATUS_NOT_AFFECTED,
254+
}
255+
return mapping.get(state)
256+
257+
258+
def _build_remediation_text(response: Optional[list[str]], recommendation: str) -> str:
259+
remediation_parts = []
260+
261+
if response:
262+
response_text = ", ".join(response)
263+
remediation_parts.append(f"Response: {response_text}")
264+
265+
if recommendation:
266+
remediation_parts.append(recommendation)
267+
268+
return "; ".join(remediation_parts)
269+
270+
271+
def _map_cyclonedx_justification_to_vex_justification(justification: str) -> Optional[str]:
272+
mapping = {
273+
"code_not_present": VEX_Justification.STATUS_VULNERABLE_CODE_NOT_PRESENT,
274+
"code_not_reachable": VEX_Justification.STATUS_VULNERABLE_CODE_NOT_IN_EXECUTE_PATH,
275+
"requires_configuration": VEX_Justification.STATUS_VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY,
276+
"requires_dependency": VEX_Justification.STATUS_VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY,
277+
"requires_environment": VEX_Justification.STATUS_VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY,
278+
"protected_by_compiler": VEX_Justification.STATUS_INLINE_MITIGATIONS_ALREADY_EXIST,
279+
"protected_at_runtime": VEX_Justification.STATUS_INLINE_MITIGATIONS_ALREADY_EXIST,
280+
"protected_at_perimeter": VEX_Justification.STATUS_INLINE_MITIGATIONS_ALREADY_EXIST,
281+
"protected_by_mitigating_control": VEX_Justification.STATUS_INLINE_MITIGATIONS_ALREADY_EXIST,
282+
}
283+
return mapping.get(justification)
284+
285+
286+
def _process_affected_components(
287+
*,
288+
document: VEX_Document,
289+
product_purls: set[str],
290+
vex_statements: set[VEX_Statement],
291+
vulnerability_counter: int,
292+
base_data: VexStatementData,
293+
affects: list,
294+
components_map: dict,
295+
) -> None:
296+
affected_counter = 0
297+
for affected in affects:
298+
if not isinstance(affected, dict):
299+
raise ValidationError(f"affects[{vulnerability_counter}][{affected_counter}] is not a dictionary")
300+
301+
ref = affected.get("ref")
302+
if not ref:
303+
raise ValidationError(f"affects[{vulnerability_counter}][{affected_counter}]/ref is missing")
304+
305+
component = components_map.get(ref)
306+
if not component:
307+
raise ValidationError(
308+
f"affects[{vulnerability_counter}][{affected_counter}]/ref '{ref}' not found in components"
309+
)
310+
311+
component_purl = component.get("purl", "")
312+
if not component_purl:
313+
raise ValidationError(
314+
f"affects[{vulnerability_counter}][{affected_counter}]/ref '{ref}' component is missing purl"
315+
)
316+
validate_purl(component_purl)
317+
318+
_create_vex_statement(
319+
document,
320+
product_purls,
321+
vex_statements,
322+
data=VexStatementData(
323+
vulnerability_id=base_data.vulnerability_id,
324+
description=base_data.description,
325+
status=base_data.status,
326+
justification=base_data.justification,
327+
impact=base_data.impact,
328+
remediation=base_data.remediation,
329+
product_purl=base_data.product_purl,
330+
component_purl=component_purl,
331+
),
332+
)
333+
334+
affected_counter += 1
335+
336+
337+
def _create_vex_statement(
338+
document: VEX_Document,
339+
product_purls: set[str],
340+
vex_statements: set[VEX_Statement],
341+
data: VexStatementData,
342+
) -> None:
343+
vex_statement = VEX_Statement(
344+
document=document,
345+
vulnerability_id=data.vulnerability_id,
346+
description=data.description,
347+
status=data.status,
348+
justification=data.justification,
349+
impact=data.impact,
350+
remediation=data.remediation,
351+
product_purl=data.product_purl,
352+
component_purl=data.component_purl,
353+
)
354+
vex_statement.save()
355+
vex_statements.add(vex_statement)
356+
product_purls.add(data.product_purl)

0 commit comments

Comments
 (0)