Skip to content

Commit d8f11a8

Browse files
committed
feat: cdx vex
1 parent 4766383 commit d8f11a8

File tree

7 files changed

+696
-18
lines changed

7 files changed

+696
-18
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Generated by Django 5.2.4 on 2025-07-30 12:57
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
("vex", "0007_alter_csaf_tracking_current_release_date_and_more"),
10+
]
11+
12+
operations = [
13+
migrations.AlterField(
14+
model_name="vex_document",
15+
name="type",
16+
field=models.CharField(
17+
choices=[("CSAF", "CSAF"), ("OpenVEX", "OpenVEX"), ("CycloneDX", "CycloneDX")], max_length=16
18+
),
19+
),
20+
]
Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
from dataclasses import dataclass
2+
from typing import Optional
3+
4+
from rest_framework.exceptions import ValidationError
5+
6+
from application.core.api.serializers_helpers import validate_purl
7+
from application.vex.models import VEX_Document, VEX_Statement
8+
from application.vex.services.vex_engine import apply_vex_statements_after_import
9+
from application.vex.types import (
10+
CycloneDX_Analysis_Justification,
11+
CycloneDX_Analysis_State,
12+
VEX_Document_Type,
13+
VEX_Justification,
14+
VEX_Status,
15+
)
16+
17+
18+
@dataclass
19+
class CycloneDX_Analysis:
20+
state: str = ""
21+
justification: str = ""
22+
response: Optional[list[str]] = None
23+
detail: str = ""
24+
first_issued: str = ""
25+
last_updated: str = ""
26+
27+
28+
@dataclass
29+
class VexStatementData:
30+
vulnerability_id: str
31+
description: str
32+
status: str
33+
justification: str
34+
impact: str
35+
remediation: str
36+
product_purl: str
37+
component_purl: str
38+
39+
40+
def parse_cyclonedx_data(data: dict) -> None:
41+
cyclonedx_document = _create_cyclonedx_document(data)
42+
43+
product_purls, vex_statements = _process_vex_statements(data, cyclonedx_document)
44+
45+
apply_vex_statements_after_import(product_purls, vex_statements)
46+
47+
48+
def _create_cyclonedx_document(data: dict) -> VEX_Document:
49+
document_id = data.get("serialNumber")
50+
if not document_id:
51+
raise ValidationError("serialNumber is missing")
52+
53+
version_value = data.get("version")
54+
if version_value is None:
55+
raise ValidationError("version is missing")
56+
version = str(version_value)
57+
58+
metadata = data.get("metadata", {})
59+
60+
timestamp = metadata.get("timestamp")
61+
if not timestamp:
62+
raise ValidationError("metadata/timestamp is missing")
63+
64+
author = None
65+
# Prefer authors list if available
66+
authors = metadata.get("authors")
67+
if authors and isinstance(authors, list) and len(authors) > 0:
68+
# Find the first author with a name set
69+
author = next(
70+
(item.get("name") for item in authors if isinstance(item, dict) and item.get("name")),
71+
None,
72+
)
73+
74+
# Fall back to manufacturer or supplier if no authors
75+
if not author:
76+
author = metadata.get("manufacturer", {}).get("name") or metadata.get("supplier", {}).get("name")
77+
78+
if not author:
79+
raise ValidationError("author is missing")
80+
81+
try:
82+
cyclonedx_document = VEX_Document.objects.get(document_id=document_id, author=author)
83+
cyclonedx_document.delete()
84+
except VEX_Document.DoesNotExist:
85+
pass
86+
87+
cyclonedx_document = VEX_Document.objects.create(
88+
type=VEX_Document_Type.VEX_DOCUMENT_TYPE_CYCLONEDX,
89+
document_id=document_id,
90+
version=version,
91+
initial_release_date=timestamp,
92+
current_release_date=timestamp,
93+
author=author,
94+
role="",
95+
)
96+
97+
return cyclonedx_document
98+
99+
100+
def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tuple[set[str], set[VEX_Statement]]:
101+
vulnerabilities = data.get("vulnerabilities", [])
102+
if not vulnerabilities:
103+
raise ValidationError("CycloneDX document doesn't contain any vulnerabilities")
104+
if not isinstance(vulnerabilities, list):
105+
raise ValidationError("vulnerabilities is not a list")
106+
107+
components_map = _build_components_map(data)
108+
109+
product_purl = data.get("metadata", {}).get("component", {}).get("purl", "")
110+
if not product_purl:
111+
raise ValidationError("metadata/component/purl is missing")
112+
validate_purl(product_purl)
113+
114+
product_purls: set[str] = set()
115+
vex_statements: set[VEX_Statement] = set()
116+
117+
vulnerability_counter = 0
118+
for vulnerability in vulnerabilities:
119+
if not isinstance(vulnerability, dict):
120+
raise ValidationError(f"vulnerability[{vulnerability_counter}] is not a dictionary")
121+
122+
vulnerability_id = vulnerability.get("id")
123+
if not vulnerability_id:
124+
raise ValidationError(f"vulnerability[{vulnerability_counter}]/id is missing")
125+
126+
analysis = vulnerability.get("analysis", {})
127+
if not analysis:
128+
# Skip vulnerabilities without analysis
129+
vulnerability_counter += 1
130+
continue
131+
132+
cyclonedx_analysis = _parse_analysis(analysis, vulnerability_counter)
133+
134+
vex_status = _map_cyclonedx_state_to_vex_status(cyclonedx_analysis.state)
135+
if not vex_status:
136+
raise ValidationError(
137+
f"vulnerability[{vulnerability_counter}]/analysis/state is not valid: {cyclonedx_analysis.state}"
138+
)
139+
140+
description = vulnerability.get("description", "")
141+
detail = vulnerability.get("detail", "")
142+
if detail:
143+
description += f"\n\n{detail}"
144+
145+
remediation = _build_remediation_text(cyclonedx_analysis.response, vulnerability.get("recommendation", ""))
146+
147+
affects = vulnerability.get("affects", [])
148+
if not affects:
149+
# General statement for the product
150+
_create_vex_statement(
151+
cyclonedx_document,
152+
product_purls,
153+
vex_statements,
154+
data=VexStatementData(
155+
vulnerability_id=vulnerability_id,
156+
description=description,
157+
status=vex_status,
158+
justification=cyclonedx_analysis.justification,
159+
impact=cyclonedx_analysis.detail,
160+
remediation=remediation,
161+
product_purl=product_purl,
162+
component_purl="",
163+
),
164+
)
165+
elif not isinstance(affects, list):
166+
raise ValidationError(f"affects[{vulnerability_counter}] is not a list")
167+
else:
168+
_process_affected_components(
169+
document=cyclonedx_document,
170+
product_purls=product_purls,
171+
vex_statements=vex_statements,
172+
vulnerability_counter=vulnerability_counter,
173+
vex_data=VexStatementData(
174+
vulnerability_id=vulnerability_id,
175+
description=description,
176+
status=vex_status,
177+
justification=cyclonedx_analysis.justification,
178+
impact=cyclonedx_analysis.detail,
179+
remediation=remediation,
180+
product_purl=product_purl,
181+
),
182+
affects=affects,
183+
components_map=components_map,
184+
)
185+
186+
vulnerability_counter += 1
187+
188+
return product_purls, vex_statements
189+
190+
191+
def _build_components_map(data: dict) -> dict[str, dict]:
192+
components_map = {}
193+
194+
# Add root component from metadata
195+
metadata_component = data.get("metadata", {}).get("component")
196+
if metadata_component and metadata_component.get("bom-ref"):
197+
components_map[metadata_component["bom-ref"]] = metadata_component
198+
199+
# Add all components
200+
for component in data.get("components", []):
201+
if component.get("bom-ref"):
202+
components_map[component["bom-ref"]] = component
203+
204+
return components_map
205+
206+
207+
def _parse_analysis(analysis: dict, vulnerability_counter: int) -> CycloneDX_Analysis:
208+
state = analysis.get("state", "")
209+
if not state:
210+
raise ValidationError(f"vulnerability[{vulnerability_counter}]/analysis/state is missing")
211+
212+
justification = analysis.get("justification", "")
213+
if justification:
214+
justification = _map_cyclonedx_justification_to_vex_justification(justification) or ""
215+
response = analysis.get("response", [])
216+
if not isinstance(response, list):
217+
response = []
218+
219+
detail = analysis.get("detail", "")
220+
first_issued = analysis.get("firstIssued", "")
221+
last_updated = analysis.get("lastUpdated", "")
222+
223+
return CycloneDX_Analysis(
224+
state=state,
225+
justification=justification,
226+
response=response,
227+
detail=detail,
228+
first_issued=first_issued,
229+
last_updated=last_updated,
230+
)
231+
232+
233+
def _map_cyclonedx_state_to_vex_status(state: str) -> Optional[str]:
234+
mapping = {
235+
CycloneDX_Analysis_State.CYCLONEDX_STATE_RESOLVED: VEX_Status.VEX_STATUS_FIXED,
236+
CycloneDX_Analysis_State.CYCLONEDX_STATE_RESOLVED_WITH_PEDIGREE: VEX_Status.VEX_STATUS_FIXED,
237+
CycloneDX_Analysis_State.CYCLONEDX_STATE_EXPLOITABLE: VEX_Status.VEX_STATUS_AFFECTED,
238+
CycloneDX_Analysis_State.CYCLONEDX_STATE_IN_TRIAGE: VEX_Status.VEX_STATUS_UNDER_INVESTIGATION,
239+
CycloneDX_Analysis_State.CYCLONEDX_STATE_FALSE_POSITIVE: VEX_Status.VEX_STATUS_NOT_AFFECTED,
240+
CycloneDX_Analysis_State.CYCLONEDX_STATE_NOT_AFFECTED: VEX_Status.VEX_STATUS_NOT_AFFECTED,
241+
}
242+
return mapping.get(state)
243+
244+
245+
def _build_remediation_text(response: Optional[list[str]], recommendation: str) -> str:
246+
remediation_parts = []
247+
248+
if response:
249+
response_text = ", ".join(response)
250+
remediation_parts.append(f"Response: {response_text}")
251+
252+
if recommendation:
253+
remediation_parts.append(recommendation)
254+
255+
return "; ".join(remediation_parts)
256+
257+
258+
def _map_cyclonedx_justification_to_vex_justification(justification: str) -> Optional[str]:
259+
mapping = {
260+
CycloneDX_Analysis_Justification.CYCLONEDX_JUSTIFICATION_CODE_NOT_PRESENT: VEX_Justification.STATUS_VULNERABLE_CODE_NOT_PRESENT, # noqa: E501 pylint: disable=line-too-long
261+
CycloneDX_Analysis_Justification.CYCLONEDX_JUSTIFICATION_CODE_NOT_REACHABLE: VEX_Justification.STATUS_VULNERABLE_CODE_NOT_IN_EXECUTE_PATH, # noqa: E501 pylint: disable=line-too-long
262+
CycloneDX_Analysis_Justification.CYCLONEDX_JUSTIFICATION_REQUIRES_CONFIGURATION: VEX_Justification.STATUS_VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY, # noqa: E501 pylint: disable=line-too-long
263+
CycloneDX_Analysis_Justification.CYCLONEDX_JUSTIFICATION_REQUIRES_DEPENDENCY: VEX_Justification.STATUS_VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY, # noqa: E501 pylint: disable=line-too-long
264+
CycloneDX_Analysis_Justification.CYCLONEDX_JUSTIFICATION_REQUIRES_ENVIRONMENT: VEX_Justification.STATUS_VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY, # noqa: E501 pylint: disable=line-too-long
265+
CycloneDX_Analysis_Justification.CYCLONEDX_JUSTIFICATION_PROTECTED_BY_COMPILER: VEX_Justification.STATUS_INLINE_MITIGATIONS_ALREADY_EXIST, # noqa: E501 pylint: disable=line-too-long
266+
CycloneDX_Analysis_Justification.CYCLONEDX_JUSTIFICATION_PROTECTED_AT_RUNTIME: VEX_Justification.STATUS_INLINE_MITIGATIONS_ALREADY_EXIST, # noqa: E501 pylint: disable=line-too-long
267+
CycloneDX_Analysis_Justification.CYCLONEDX_JUSTIFICATION_PROTECTED_AT_PERIMETER: VEX_Justification.STATUS_INLINE_MITIGATIONS_ALREADY_EXIST, # noqa: E501 pylint: disable=line-too-long
268+
CycloneDX_Analysis_Justification.CYCLONEDX_JUSTIFICATION_PROTECTED_BY_MITIGATING_CONTROL: VEX_Justification.STATUS_INLINE_MITIGATIONS_ALREADY_EXIST, # noqa: E501 pylint: disable=line-too-long
269+
}
270+
return mapping.get(justification)
271+
272+
273+
def _process_affected_components(
274+
*,
275+
document: VEX_Document,
276+
product_purls: set[str],
277+
vex_statements: set[VEX_Statement],
278+
vulnerability_counter: int,
279+
vex_data: VexStatementData,
280+
affects: list,
281+
components_map: dict,
282+
) -> None:
283+
affected_counter = 0
284+
for affected in affects:
285+
if not isinstance(affected, dict):
286+
raise ValidationError(f"affects[{vulnerability_counter}][{affected_counter}] is not a dictionary")
287+
288+
ref = affected.get("ref")
289+
if not ref:
290+
raise ValidationError(f"affects[{vulnerability_counter}][{affected_counter}]/ref is missing")
291+
292+
component = components_map.get(ref)
293+
if not component:
294+
raise ValidationError(
295+
f"affects[{vulnerability_counter}][{affected_counter}]/ref '{ref}' not found in components"
296+
)
297+
298+
component_purl = component.get("purl", "")
299+
if not component_purl:
300+
raise ValidationError(
301+
f"affects[{vulnerability_counter}][{affected_counter}]/ref '{ref}' component is missing purl"
302+
)
303+
validate_purl(component_purl)
304+
305+
_create_vex_statement(
306+
document,
307+
product_purls,
308+
vex_statements,
309+
data=VexStatementData(
310+
vulnerability_id=vex_data.vulnerability_id,
311+
description=vex_data.description,
312+
status=vex_data.status,
313+
justification=vex_data.justification,
314+
impact=vex_data.impact,
315+
remediation=vex_data.remediation,
316+
product_purl=vex_data.product_purl,
317+
component_purl=component_purl,
318+
),
319+
)
320+
321+
affected_counter += 1
322+
323+
324+
def _create_vex_statement(
325+
document: VEX_Document,
326+
product_purls: set[str],
327+
vex_statements: set[VEX_Statement],
328+
data: VexStatementData,
329+
) -> None:
330+
vex_statement = VEX_Statement(
331+
document=document,
332+
vulnerability_id=data.vulnerability_id,
333+
description=data.description,
334+
status=data.status,
335+
justification=data.justification,
336+
impact=data.impact,
337+
remediation=data.remediation,
338+
product_purl=data.product_purl,
339+
component_purl=data.component_purl,
340+
)
341+
vex_statement.save()
342+
vex_statements.add(vex_statement)
343+
product_purls.add(data.product_purl)

backend/application/vex/services/vex_import.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from rest_framework.exceptions import ValidationError
66

77
from application.vex.services.csaf_parser import parse_csaf_data
8+
from application.vex.services.cyclonedx_parser import parse_cyclonedx_data
89
from application.vex.services.openvex_parser import parse_openvex_data
910
from application.vex.types import VEX_Document_Type
1011

@@ -22,6 +23,8 @@ def import_vex(vex_file: File) -> None:
2223
parse_openvex_data(data)
2324
elif vex_type == VEX_Document_Type.VEX_DOCUMENT_TYPE_CSAF:
2425
parse_csaf_data(data)
26+
elif vex_type == VEX_Document_Type.VEX_DOCUMENT_TYPE_CYCLONEDX:
27+
parse_cyclonedx_data(data)
2528

2629

2730
def _get_json_data(vex_file: File) -> Optional[dict]:
@@ -40,4 +43,7 @@ def _get_vex_type(data: dict) -> Optional[str]:
4043
if data.get("document", {}).get("category") == "csaf_vex" and data.get("document", {}).get("csaf_version") == "2.0":
4144
return VEX_Document_Type.VEX_DOCUMENT_TYPE_CSAF
4245

46+
if data.get("bomFormat") == "CycloneDX":
47+
return VEX_Document_Type.VEX_DOCUMENT_TYPE_CYCLONEDX
48+
4349
return None

0 commit comments

Comments
 (0)