Skip to content

Commit a20cd97

Browse files
committed
feat: cdx vex
1 parent ca00903 commit a20cd97

7 files changed

Lines changed: 704 additions & 18 deletions

File tree

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Generated by Django 5.2.4 on 2025-07-30 12:57
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
("vex", "0007_alter_csaf_tracking_current_release_date_and_more"),
10+
]
11+
12+
operations = [
13+
migrations.AlterField(
14+
model_name="vex_document",
15+
name="type",
16+
field=models.CharField(
17+
choices=[("CSAF", "CSAF"), ("OpenVEX", "OpenVEX"), ("CycloneDX", "CycloneDX")], max_length=16
18+
),
19+
),
20+
]
Lines changed: 355 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,355 @@
1+
from dataclasses import dataclass
2+
from typing import Optional
3+
4+
from rest_framework.exceptions import ValidationError
5+
6+
from application.core.api.serializers_helpers import validate_purl
7+
from application.vex.models import VEX_Document, VEX_Statement
8+
from application.vex.services.vex_engine import apply_vex_statements_after_import
9+
from application.vex.types import (
10+
CycloneDX_Analysis_State,
11+
VEX_Document_Type,
12+
VEX_Justification,
13+
VEX_Status,
14+
)
15+
16+
17+
@dataclass
18+
class CycloneDX_Analysis:
19+
state: str = ""
20+
justification: str = ""
21+
response: Optional[list[str]] = None
22+
detail: str = ""
23+
first_issued: str = ""
24+
last_updated: str = ""
25+
26+
def __post_init__(self) -> None:
27+
if self.response is None:
28+
self.response = []
29+
30+
31+
def parse_cyclonedx_data(data: dict) -> None:
32+
cyclonedx_document = _create_cyclonedx_document(data)
33+
34+
product_purls, vex_statements = _process_vex_statements(data, cyclonedx_document)
35+
36+
apply_vex_statements_after_import(product_purls, vex_statements)
37+
38+
39+
def _create_cyclonedx_document(data: dict) -> VEX_Document:
40+
document_id = data.get("serialNumber")
41+
if not document_id:
42+
raise ValidationError("serialNumber is missing")
43+
44+
version_value = data.get("version")
45+
if version_value is None:
46+
raise ValidationError("version is missing")
47+
version = str(version_value)
48+
49+
metadata = data.get("metadata", {})
50+
51+
timestamp = metadata.get("timestamp")
52+
if not timestamp:
53+
raise ValidationError("metadata/timestamp is missing")
54+
55+
author = None
56+
# Prefer authors list if available
57+
authors = metadata.get("authors")
58+
if authors and isinstance(authors, list) and len(authors) > 0:
59+
# Find the first author with a name set
60+
author = next(
61+
(item.get("name") for item in authors if isinstance(item, dict) and item.get("name")),
62+
None,
63+
)
64+
65+
# Fall back to manufacturer or supplier if no authors
66+
if not author:
67+
author = metadata.get("manufacturer", {}).get("name") or metadata.get("supplier", {}).get("name")
68+
69+
# Fall back to tools name if available
70+
if not author:
71+
tools = metadata.get("tools")
72+
if isinstance(tools, list) and len(tools) > 0:
73+
first_tool = tools[0]
74+
if isinstance(first_tool, dict):
75+
author = first_tool.get("name") or author
76+
elif isinstance(tools, dict):
77+
author = tools.get("name") or author
78+
79+
if not author:
80+
raise ValidationError("author is missing")
81+
82+
try:
83+
cyclonedx_document = VEX_Document.objects.get(document_id=document_id, author=author)
84+
cyclonedx_document.delete()
85+
except VEX_Document.DoesNotExist:
86+
pass
87+
88+
cyclonedx_document = VEX_Document.objects.create(
89+
type=VEX_Document_Type.VEX_DOCUMENT_TYPE_CYCLONEDX,
90+
document_id=document_id,
91+
version=version,
92+
initial_release_date=timestamp,
93+
current_release_date=timestamp,
94+
author=author,
95+
role="",
96+
)
97+
98+
return cyclonedx_document
99+
100+
101+
def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tuple[set[str], set[VEX_Statement]]:
102+
vulnerabilities = _extract_vulnerabilities(data)
103+
components_map = _build_components_map(data)
104+
product_purl = _extract_product_purl(data)
105+
106+
product_purls: set[str] = set()
107+
vex_statements: set[VEX_Statement] = set()
108+
109+
for vulnerability_counter, vulnerability in enumerate(vulnerabilities):
110+
context = _prepare_vulnerability_context(vulnerability, vulnerability_counter)
111+
if context is None:
112+
# skip vulnerabilities without analysis
113+
continue
114+
115+
if not context["affects"]:
116+
statement = _create_general_vex_statement(
117+
cyclonedx_document=cyclonedx_document,
118+
product_purl=product_purl,
119+
context=context,
120+
)
121+
vex_statements.add(statement)
122+
product_purls.add(product_purl)
123+
continue
124+
125+
if not isinstance(context["affects"], list):
126+
raise ValidationError(f"affects[{vulnerability_counter}] is not a list")
127+
128+
statements = _create_component_vex_statements(
129+
cyclonedx_document=cyclonedx_document,
130+
context=context,
131+
components_map=components_map,
132+
product_purl=product_purl,
133+
vulnerability_counter=vulnerability_counter,
134+
)
135+
vex_statements.update(statements)
136+
if statements:
137+
product_purls.add(product_purl)
138+
139+
return product_purls, vex_statements
140+
141+
142+
def _extract_vulnerabilities(data: dict) -> list[dict]:
143+
vulnerabilities = data.get("vulnerabilities", [])
144+
if not vulnerabilities:
145+
raise ValidationError("CycloneDX document doesn't contain any vulnerabilities")
146+
if not isinstance(vulnerabilities, list):
147+
raise ValidationError("vulnerabilities is not a list")
148+
return vulnerabilities
149+
150+
151+
def _extract_product_purl(data: dict) -> str:
152+
product_purl = data.get("metadata", {}).get("component", {}).get("purl", "")
153+
if not product_purl:
154+
raise ValidationError("metadata/component/purl is missing")
155+
validate_purl(product_purl)
156+
return product_purl
157+
158+
159+
def _prepare_vulnerability_context(vulnerability: dict, vulnerability_counter: int) -> Optional[dict]:
160+
if not isinstance(vulnerability, dict):
161+
raise ValidationError(f"vulnerability[{vulnerability_counter}] is not a dictionary")
162+
163+
vulnerability_id = vulnerability.get("id")
164+
if not vulnerability_id:
165+
raise ValidationError(f"vulnerability[{vulnerability_counter}]/id is missing")
166+
167+
analysis = vulnerability.get("analysis", {})
168+
if not analysis:
169+
return None
170+
171+
cyclonedx_analysis = _parse_analysis(analysis, vulnerability_counter)
172+
vex_status = _map_cyclonedx_state_to_vex_status(cyclonedx_analysis.state)
173+
if not vex_status:
174+
raise ValidationError(
175+
f"vulnerability[{vulnerability_counter}]/analysis/state is not valid: {cyclonedx_analysis.state}"
176+
)
177+
178+
description = vulnerability.get("description", "")
179+
detail = vulnerability.get("detail", "")
180+
if detail:
181+
description += f"\n\n{detail}"
182+
183+
remediation = _build_remediation_text(cyclonedx_analysis.response, vulnerability.get("recommendation", ""))
184+
affects = vulnerability.get("affects", [])
185+
186+
return {
187+
"vulnerability_id": vulnerability_id,
188+
"cyclonedx_analysis": cyclonedx_analysis,
189+
"vex_status": vex_status,
190+
"description": description,
191+
"remediation": remediation,
192+
"affects": affects,
193+
}
194+
195+
196+
def _create_general_vex_statement(
197+
*, cyclonedx_document: VEX_Document, product_purl: str, context: dict
198+
) -> VEX_Statement:
199+
statement = VEX_Statement(
200+
document=cyclonedx_document,
201+
vulnerability_id=context["vulnerability_id"],
202+
description=context["description"],
203+
status=context["vex_status"],
204+
justification=context["cyclonedx_analysis"].justification,
205+
detail=context["cyclonedx_analysis"].detail,
206+
remediation=context["remediation"],
207+
product_purl=product_purl,
208+
component_purl="",
209+
)
210+
statement.save()
211+
return statement
212+
213+
214+
def _create_component_vex_statements(
215+
*,
216+
cyclonedx_document: VEX_Document,
217+
context: dict,
218+
components_map: dict[str, dict],
219+
product_purl: str,
220+
vulnerability_counter: int,
221+
) -> set[VEX_Statement]:
222+
statements: set[VEX_Statement] = set()
223+
for affected_counter, affected in enumerate(context["affects"]):
224+
component_purl = _get_component_purl_from_affect(
225+
affected=affected,
226+
components_map=components_map,
227+
vulnerability_counter=vulnerability_counter,
228+
affected_counter=affected_counter,
229+
)
230+
231+
statement = VEX_Statement(
232+
document=cyclonedx_document,
233+
vulnerability_id=context["vulnerability_id"],
234+
description=context["description"],
235+
status=context["vex_status"],
236+
justification=context["cyclonedx_analysis"].justification,
237+
detail=context["cyclonedx_analysis"].detail,
238+
remediation=context["remediation"],
239+
product_purl=product_purl,
240+
component_purl=component_purl,
241+
)
242+
statement.save()
243+
statements.add(statement)
244+
return statements
245+
246+
247+
def _get_component_purl_from_affect(
248+
*,
249+
affected: dict,
250+
components_map: dict[str, dict],
251+
vulnerability_counter: int,
252+
affected_counter: int,
253+
) -> str:
254+
if not isinstance(affected, dict):
255+
raise ValidationError(f"affects[{vulnerability_counter}][{affected_counter}] is not a dictionary")
256+
257+
ref = affected.get("ref")
258+
if not ref:
259+
raise ValidationError(f"affects[{vulnerability_counter}][{affected_counter}]/ref is missing")
260+
261+
component = components_map.get(ref)
262+
if not component:
263+
raise ValidationError(
264+
f"affects[{vulnerability_counter}][{affected_counter}]/ref '{ref}' not found in components"
265+
)
266+
267+
component_purl = component.get("purl", "")
268+
if not component_purl:
269+
raise ValidationError(
270+
f"affects[{vulnerability_counter}][{affected_counter}]/ref '{ref}' component is missing purl"
271+
)
272+
validate_purl(component_purl)
273+
return component_purl
274+
275+
276+
def _build_components_map(data: dict) -> dict[str, dict]:
277+
components_map = {}
278+
279+
# Add root component from metadata
280+
metadata_component = data.get("metadata", {}).get("component")
281+
if metadata_component and metadata_component.get("bom-ref"):
282+
components_map[metadata_component["bom-ref"]] = metadata_component
283+
284+
# Add all components
285+
for component in data.get("components", []):
286+
if component.get("bom-ref"):
287+
components_map[component["bom-ref"]] = component
288+
289+
return components_map
290+
291+
292+
def _parse_analysis(analysis: dict, vulnerability_counter: int) -> CycloneDX_Analysis:
293+
state = analysis.get("state", "")
294+
if not state:
295+
raise ValidationError(f"vulnerability[{vulnerability_counter}]/analysis/state is missing")
296+
297+
justification = analysis.get("justification", "")
298+
if justification:
299+
justification = _map_cyclonedx_justification_to_vex_justification(justification) or ""
300+
response = analysis.get("response", [])
301+
if not isinstance(response, list):
302+
response = []
303+
304+
detail = analysis.get("detail", "")
305+
first_issued = analysis.get("firstIssued", "")
306+
last_updated = analysis.get("lastUpdated", "")
307+
308+
return CycloneDX_Analysis(
309+
state=state,
310+
justification=justification,
311+
response=response,
312+
detail=detail,
313+
first_issued=first_issued,
314+
last_updated=last_updated,
315+
)
316+
317+
318+
def _map_cyclonedx_state_to_vex_status(state: str) -> Optional[str]:
319+
mapping = {
320+
CycloneDX_Analysis_State.CYCLONEDX_STATE_RESOLVED: VEX_Status.VEX_STATUS_FIXED,
321+
CycloneDX_Analysis_State.CYCLONEDX_STATE_RESOLVED_WITH_PEDIGREE: VEX_Status.VEX_STATUS_FIXED,
322+
CycloneDX_Analysis_State.CYCLONEDX_STATE_EXPLOITABLE: VEX_Status.VEX_STATUS_AFFECTED,
323+
CycloneDX_Analysis_State.CYCLONEDX_STATE_IN_TRIAGE: VEX_Status.VEX_STATUS_UNDER_INVESTIGATION,
324+
CycloneDX_Analysis_State.CYCLONEDX_STATE_FALSE_POSITIVE: VEX_Status.VEX_STATUS_NOT_AFFECTED,
325+
CycloneDX_Analysis_State.CYCLONEDX_STATE_NOT_AFFECTED: VEX_Status.VEX_STATUS_NOT_AFFECTED,
326+
}
327+
return mapping.get(state)
328+
329+
330+
def _build_remediation_text(response: Optional[list[str]], recommendation: str) -> str:
331+
remediation_parts = []
332+
333+
if response:
334+
response_text = ", ".join(response)
335+
remediation_parts.append(f"Response: {response_text}")
336+
337+
if recommendation:
338+
remediation_parts.append(recommendation)
339+
340+
return "; ".join(remediation_parts)
341+
342+
343+
def _map_cyclonedx_justification_to_vex_justification(justification: str) -> Optional[str]:
344+
mapping = {
345+
"code_not_present": VEX_Justification.STATUS_VULNERABLE_CODE_NOT_PRESENT,
346+
"code_not_reachable": VEX_Justification.STATUS_VULNERABLE_CODE_NOT_IN_EXECUTE_PATH,
347+
"requires_configuration": VEX_Justification.STATUS_VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY,
348+
"requires_dependency": VEX_Justification.STATUS_VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY,
349+
"requires_environment": VEX_Justification.STATUS_VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY,
350+
"protected_by_compiler": VEX_Justification.STATUS_INLINE_MITIGATIONS_ALREADY_EXIST,
351+
"protected_at_runtime": VEX_Justification.STATUS_INLINE_MITIGATIONS_ALREADY_EXIST,
352+
"protected_at_perimeter": VEX_Justification.STATUS_INLINE_MITIGATIONS_ALREADY_EXIST,
353+
"protected_by_mitigating_control": VEX_Justification.STATUS_INLINE_MITIGATIONS_ALREADY_EXIST,
354+
}
355+
return mapping.get(justification)

0 commit comments

Comments
 (0)