Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

summarizing multiple similar findings into problems #11432

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions docs/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions dojo/db_migrations/0219_problem_finding_problem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Generated by Django 5.0.8 on 2024-11-26 23:24

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('dojo', '0218_system_settings_enforce_verified_status_and_more'),
]

operations = [
migrations.CreateModel(
name='Problem',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(help_text='A short name or title for the problem.', max_length=255, verbose_name='Name')),
('description', models.TextField(help_text='Detailed description of the problem.', verbose_name='Description')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='Timestamp when this problem was created.', verbose_name='Created At')),
('updated_at', models.DateTimeField(auto_now=True, help_text='Timestamp when this problem was last updated.', verbose_name='Updated At')),
('severity', models.CharField(choices=[('Critical', 'Critical'), ('High', 'High'), ('Medium', 'Medium'), ('Low', 'Low'), ('Info', 'Info')], help_text='The severity level of this problem.', max_length=50, verbose_name='Severity')),
],
),
migrations.AddField(
model_name='finding',
name='problem',
field=models.ForeignKey(blank=True, help_text='The problem this finding is related to.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='findings', to='dojo.problem', verbose_name='Problem'),
),
]
33 changes: 33 additions & 0 deletions dojo/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2257,6 +2257,32 @@ class Meta:
def __str__(self):
return f"{self.finding.id}: {self.action}"

class Problem(models.Model):
name = models.CharField(max_length=255,
verbose_name=_("Name"),
help_text=_("A short name or title for the problem."))
description = models.TextField(
verbose_name=_("Description"),
help_text=_("Detailed description of the problem."))
created_at = models.DateTimeField(auto_now_add=True,
verbose_name=_("Created At"),
help_text=_("Timestamp when this problem was created."))
updated_at = models.DateTimeField(auto_now=True,
verbose_name=_("Updated At"),
help_text=_("Timestamp when this problem was last updated."))
severity = models.CharField(max_length=50,
choices=[
('Critical', _("Critical")),
('High', _("High")),
('Medium', _("Medium")),
('Low', _("Low")),
('Info', _("Info")),
],
verbose_name=_("Severity"),
help_text=_("The severity level of this problem."))
def __str__(self):
return self.name


class Finding(models.Model):
title = models.CharField(max_length=511,
Expand All @@ -2283,6 +2309,13 @@ class Finding(models.Model):
blank=False,
verbose_name=_("Vulnerability Id"),
help_text=_("An id of a vulnerability in a security advisory associated with this finding. Can be a Common Vulnerabilities and Exposures (CVE) or from other sources."))
problem = models.ForeignKey(Problem,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name='findings',
verbose_name=_("Problem"),
help_text=_("The problem this finding is related to."))
epss_score = models.FloatField(default=None, null=True, blank=True,
verbose_name=_("EPSS Score"),
help_text=_("EPSS score for the CVE. Describes how likely it is the vulnerability will be exploited in the next 30 days."),
Expand Down
Empty file added dojo/problem/__init__.py
Empty file.
3 changes: 3 additions & 0 deletions dojo/problem/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
LeoOMaia marked this conversation as resolved.
Show resolved Hide resolved
"json_url": "https://homepages.dcc.ufmg.br/~leonardooliveira/disambiguator.json"
}
137 changes: 137 additions & 0 deletions dojo/problem/helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import json
import os
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning

from dojo.models import Problem, Finding

import logging
logger = logging.getLogger(__name__)

CONFIG_FILE = os.path.join(os.path.dirname(__file__), 'config.json')
CACHED_JSON_FILE = os.path.join('/app/media', 'cached_disambiguator.json')
LeoOMaia marked this conversation as resolved.
Show resolved Hide resolved

SEVERITY_ORDER = {
'Critical': 5,
'High': 4,
'Medium': 3,
'Low': 2,
'Info': 1
}

def load_config():
with open(CONFIG_FILE, 'r') as f:
return json.load(f)

def validate_json(data):
if not isinstance(data, dict):
LeoOMaia marked this conversation as resolved.
Show resolved Hide resolved
return False
for key, value in data.items():
if not isinstance(key, str) or not isinstance(value, list):
return False
if not all(isinstance(item, str) for item in value):
return False
return True

def download_json(json_url):
response = requests.get(json_url, timeout=5, verify=False)
response.raise_for_status()
LeoOMaia marked this conversation as resolved.
Show resolved Hide resolved
return response.json()

def load_cached_json():
try:
if os.path.exists(CACHED_JSON_FILE):
with open(CACHED_JSON_FILE, 'r') as f:
data = json.load(f)
if validate_json(data):
return data
except (ValueError, json.JSONDecodeError):
pass
LeoOMaia marked this conversation as resolved.
Show resolved Hide resolved
return None

def save_json_to_cache(data):
logger.info('Saving disambiguator JSON to cache')
with open(CACHED_JSON_FILE, 'w') as f:
json.dump(data, f)

def load_json():
try:
# Disable SSL warnings
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
LeoOMaia marked this conversation as resolved.
Show resolved Hide resolved

cached_data = load_cached_json()
if cached_data:
return cached_data

# Cache is missing or invalid, download and validate
config = load_config()
json_url = config.get('json_url')

if json_url:
data = download_json(json_url)
if validate_json(data):
save_json_to_cache(data)
return data

return {}

except (requests.RequestException, ValueError, json.JSONDecodeError) as e:
logger.error('Error loading disambiguator JSON: %s', e)
return {}

def extract_script_id(full_id):
parts = full_id.split('____')
LeoOMaia marked this conversation as resolved.
Show resolved Hide resolved
return parts[0] if len(parts) == 2 else None

def find_or_create_problem(finding):
data = load_json()
LeoOMaia marked this conversation as resolved.
Show resolved Hide resolved
script_id = finding.vuln_id_from_tool

valid_ids_mapping = {
key: [extract_script_id(full_id) for full_id in script_ids if extract_script_id(full_id)]
for key, script_ids in data.items()
}

for key, valid_ids in valid_ids_mapping.items():
LeoOMaia marked this conversation as resolved.
Show resolved Hide resolved
if script_id in valid_ids:
problem = _get_or_update_problem(valid_ids, finding, script_id)
if problem:
return problem

# if the script_id is not in the mapping, create a new one
return _get_or_create_problem_by_script_id(script_id, finding)

def _get_or_update_problem(valid_ids, finding, script_id):
for valid_id in valid_ids:
related_finding = Finding.objects.filter(vuln_id_from_tool=valid_id).first()
LeoOMaia marked this conversation as resolved.
Show resolved Hide resolved
if related_finding and related_finding.problem:
problem = related_finding.problem
if SEVERITY_ORDER[finding.severity] > SEVERITY_ORDER[problem.severity]:
_update_problem(problem, finding.title, finding.description, finding.severity)
return problem

return Problem.objects.create(
name=finding.title,
description=finding.description,
severity=finding.severity
)

def _get_or_create_problem_by_script_id(script_id, finding):
related_finding = Finding.objects.filter(vuln_id_from_tool=script_id).first()
if related_finding and related_finding.problem:
problem = related_finding.problem
if SEVERITY_ORDER[finding.severity] > SEVERITY_ORDER[problem.severity]:
_update_problem(problem, finding.title, finding.description, finding.severity)
return problem

return Problem.objects.create(
name=finding.title,
description=finding.description,
severity=finding.severity
)

def _update_problem(problem, name, description, severity):
problem.name = name
problem.description = description
problem.severity = severity
problem.save()
37 changes: 37 additions & 0 deletions dojo/problem/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import json
import os
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning

from dojo.celery import app
from dojo.decorators import dojo_async_task
from dojo.problem.helper import CONFIG_FILE, validate_json, download_json, save_json_to_cache

import logging
logger = logging.getLogger(__name__)


@dojo_async_task
@app.task
def daily_cache_update(**kwargs):
LeoOMaia marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Starting daily cache update")
try:
# Disable SSL warnings
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as f:
config = json.load(f)
json_url = config.get('json_url')
if json_url:
data = download_json(json_url)
if validate_json(data):
save_json_to_cache(data)
else:
logger.error('Disambiguator JSON is invalid')
else:
logger.error('No JSON URL found in config')
else:
logger.error('Config file not found')
except (requests.RequestException, ValueError, json.JSONDecodeError) as e:
logger.error('Error updating cache: %s', e)
27 changes: 27 additions & 0 deletions dojo/problem/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from django.urls import re_path

from dojo.problem import views

urlpatterns = [
# Listing operations
re_path(
r"^problems/all$",
views.ListProblems.as_view(),
name="all_problems",
),
re_path(
r"^problems/open$",
views.ListOpenProblems.as_view(),
name="open_problems",
),
re_path(
r"^problems/closed$",
views.ListClosedProblems.as_view(),
name="closed_problems",
),
re_path(
r"^problems/(?P<problem_id>\d+)/findings$",
views.ProblemFindings.as_view(),
name="problem_findings",
)
]
Loading
Loading