Skip to content

Commit

Permalink
initial useful extractions of filenames from advisory msg, moved advi…
Browse files Browse the repository at this point in the history
…sory creation method from prospector_client.py to advisory.py
  • Loading branch information
sacca97 authored and copernico committed Oct 7, 2022
1 parent fd5e3bc commit 7779249
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 130 deletions.
48 changes: 4 additions & 44 deletions prospector/client/cli/prospector_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import log
from client.cli.console import ConsoleWriter, MessageStatus
from datamodel.advisory import AdvisoryRecord
from datamodel.advisory import AdvisoryRecord, build_advisory_record
from datamodel.commit import Commit, apply_ranking, make_from_raw_commit
from filtering.filter import filter_commits
from git.git import GIT_CACHE, Git
Expand All @@ -26,6 +26,7 @@

_logger = init_local_logger()


SECS_PER_DAY = 86400
TIME_LIMIT_BEFORE = 3 * 365 * SECS_PER_DAY
TIME_LIMIT_AFTER = 180 * SECS_PER_DAY
Expand Down Expand Up @@ -73,6 +74,7 @@ def prospector( # noqa: C901
publication_date,
advisory_keywords,
modified_files,
filter_extensions,
)

with ConsoleWriter("Obtaining initial set of candidates") as writer:
Expand Down Expand Up @@ -248,49 +250,7 @@ def save_preprocessed_commits(backend_address, payload):
)


def build_advisory_record(
    vulnerability_id,
    repository_url,
    vuln_descr,
    nvd_rest_endpoint,
    fetch_references,
    use_nvd,
    publication_date,
    advisory_keywords,
    modified_files,
) -> AdvisoryRecord:
    """Factory: assemble an AdvisoryRecord from CLI inputs and analyze it.

    Builds the record, runs its analysis (optionally against NVD), then
    applies the user-supplied overrides: publication date, extra keywords,
    and known modified files.
    """
    record = AdvisoryRecord(
        vulnerability_id=vulnerability_id,
        repository_url=repository_url,
        description=vuln_descr,
        from_nvd=use_nvd,
        nvd_rest_endpoint=nvd_rest_endpoint,
    )
    _logger.pretty_log(record)

    # Enrich the record (NVD lookup / reference fetching happen here).
    record.analyze(use_nvd=use_nvd, fetch_references=fetch_references)
    _logger.debug(f"{record.keywords=}")

    if publication_date != "":
        # An explicit ISO-8601 date from the CLI overrides the record's timestamp.
        record.published_timestamp = int(
            datetime.fromisoformat(publication_date).timestamp()
        )

    if len(advisory_keywords) > 0:
        record.keywords += tuple(advisory_keywords)
        # Drop duplicates introduced by the merge.
        record.keywords = list(set(record.keywords))

    if len(modified_files) > 0:
        record.paths += modified_files

    _logger.debug(f"{record.keywords=}")
    _logger.debug(f"{record.paths=}")

    return record


# TODO: Cleanup many parameters should be recovered from the advisory record object
def get_candidates(
advisory_record,
repository,
Expand Down
1 change: 1 addition & 0 deletions prospector/client/cli/prospector_client_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest

from api import DB_CONNECT_STRING
from client.cli.prospector_client import build_advisory_record
from commitdb.postgres import PostgresCommitDB
from stats.execution import execution_statistics

Expand Down
60 changes: 57 additions & 3 deletions prospector/datamodel/advisory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from util.http import fetch_url

from .nlp import (
extract_affected_files_paths,
extract_affected_filenames,
extract_products,
extract_special_terms,
extract_versions,
Expand Down Expand Up @@ -79,7 +79,9 @@ class AdvisoryRecord(BaseModel):
# self.from_nvd = from_nvd
# self.nvd_rest_endpoint = nvd_rest_endpoint

def analyze(self, use_nvd: bool = False, fetch_references=False):
def analyze(
self, use_nvd: bool = False, fetch_references=False, relevant_extensions=[]
):
self.from_nvd = use_nvd

if self.from_nvd:
Expand All @@ -93,7 +95,8 @@ def analyze(self, use_nvd: bool = False, fetch_references=False):

# TODO: if an exact file is found when applying the rules, the relevance must be updated i think
self.paths = union_of(
self.paths, extract_affected_files_paths(self.description)
self.paths,
extract_affected_filenames(self.description, relevant_extensions),
)
self.keywords = union_of(self.keywords, extract_special_terms(self.description))
_logger.debug("References: " + str(self.references))
Expand Down Expand Up @@ -187,6 +190,57 @@ def get_from_nvd(self, vuln_id: str, nvd_rest_endpoint: str = NVD_REST_ENDPOINT)
return False


# Moved here since it is basically a factory method
def build_advisory_record(
    vulnerability_id,
    repository_url,
    vuln_descr,
    nvd_rest_endpoint,
    fetch_references,
    use_nvd,
    publication_date,
    advisory_keywords,
    modified_files,
    filter_extensions,
) -> AdvisoryRecord:
    """Factory: assemble an AdvisoryRecord from CLI inputs and analyze it.

    Builds the record, runs AdvisoryRecord.analyze() (optionally against
    NVD), then applies user-supplied overrides: publication date, extra
    keywords, and known modified files.

    NOTE(review): `filter_extensions` is accepted but not currently used —
    the `relevant_extensions=` argument to analyze() is commented out below,
    so analyze() falls back to its default extension list. Confirm whether
    this is intentional.
    """
    advisory_record = AdvisoryRecord(
        vulnerability_id=vulnerability_id,
        repository_url=repository_url,
        description=vuln_descr,
        from_nvd=use_nvd,
        nvd_rest_endpoint=nvd_rest_endpoint,
    )

    _logger.pretty_log(advisory_record)
    advisory_record.analyze(
        use_nvd=use_nvd,
        fetch_references=fetch_references,
        # relevant_extensions=filter_extensions.split(".")[
        # 1
        # ], # the *. is added early in the main and is needed multiple times in the git so let's leave it there
    )
    _logger.debug(f"{advisory_record.keywords=}")

    # An explicit ISO-8601 date from the CLI overrides the record's timestamp.
    if publication_date != "":
        advisory_record.published_timestamp = int(
            datetime.fromisoformat(publication_date).timestamp()
        )

    if len(advisory_keywords) > 0:
        advisory_record.keywords += tuple(advisory_keywords)
        # drop duplicates
        advisory_record.keywords = list(set(advisory_record.keywords))

    if len(modified_files) > 0:
        advisory_record.paths += modified_files

    _logger.debug(f"{advisory_record.keywords=}")
    _logger.debug(f"{advisory_record.paths=}")

    return advisory_record


# might be used in the future
# @dataclass
# class Reference:
Expand Down
58 changes: 57 additions & 1 deletion prospector/datamodel/advisory_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# from dataclasses import asdict
from datamodel.advisory import AdvisoryRecord
import time
from unittest import result

import pytest
from pytest import skip

from datamodel.advisory import AdvisoryRecord, build_advisory_record

from .nlp import RELEVANT_EXTENSIONS

# import pytest

Expand Down Expand Up @@ -89,6 +94,57 @@ def test_adv_record_keywords():
)


def test_build():
    # Builds a full advisory record for CVE-2014-0050 with use_nvd=True, so
    # this test performs a live NVD lookup and needs network access.
    # Positional arguments are: vulnerability_id, repository_url, vuln_descr,
    # nvd_rest_endpoint, fetch_references, use_nvd, publication_date,
    # advisory_keywords, modified_files, filter_extensions.
    record = build_advisory_record(
        "CVE-2014-0050", "", "", "", "", True, "", "", "", "*.java"
    )
    assert "MultipartStream" in record.paths
    assert record.vulnerability_id == "CVE-2014-0050"


@pytest.mark.skip(reason="Slow connections make it fail")
def test_filenames_extraction():
    """End-to-end check of filename extraction from live NVD advisories.

    Fixes applied (review):
    - ``@skip(...)`` (i.e. calling ``pytest.skip`` at import time) raises
      during collection; the decorator form is ``@pytest.mark.skip``.
    - The original assertions compared ``list.sort()`` results — ``sort()``
      returns ``None``, so every assertion was vacuously true. They now use
      ``sorted()``. The expected values below should be verified against real
      NVD data before un-skipping (e.g. "MultiPartStream" vs "MultipartStream").
    """
    # CVE id -> most relevant filename expected among the extracted paths.
    cve = {
        "CVE-2014-0050": "MultipartStream",
        "CVE-2021-22696": "JwtRequestCodeFilter",  # Should match JwtRequestCodeFilter
        "CVE-2021-27582": "OAuthConfirmationController",
        "CVE-2021-29425": "FileNameUtils",
        "CVE-2021-30468": "JsonMapObjectReaderWriter",
    }

    result1 = build_advisory_record(
        "CVE-2014-0050", "", "", "", "", True, "", "", "", ""
    )
    result2 = build_advisory_record(
        "CVE-2021-22696", "", "", "", "", True, "", "", "", ""
    )
    result3 = build_advisory_record(
        "CVE-2021-27582", "", "", "", "", True, "", "", "", ""
    )
    result4 = build_advisory_record(
        "CVE-2021-29425", "", "", "", "", True, "", "", "", ""
    )
    result5 = build_advisory_record(
        "CVE-2021-30468", "", "", "", "", True, "", "", "", ""
    )
    assert sorted(result1.paths) == sorted(
        ["MultiPartStream", "FileUpload"]
    )  # Content-Type
    assert sorted(result2.paths) == sorted(["JwtRequestCodeFilter", "request_uri"])
    assert sorted(result3.paths) == sorted(
        [
            "OAuthConfirmationController",
            "@ModelAttribute",
            "authorizationRequest",
        ]
    )
    assert sorted(result4.paths) == sorted(["FileNameUtils"])
    assert sorted(result5.paths) == sorted(["JsonMapObjectReaderWriter"])


# def test_adv_record_project_data():
# record = AdvisoryRecord(vulnerability_id="CVE-XXXX-YYYY", description=ADVISORY_TEXT_2)
# record.analyze()
Expand Down
139 changes: 82 additions & 57 deletions prospector/datamodel/nlp.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,63 +45,88 @@ def extract_products(text: str) -> List[str]:
return [p for p in result if len(p) > 2]


def extract_affected_files_paths(text: str, strict_extensions: bool = False):
    """Collect path-like and file-like tokens mentioned in *text*.

    Tokens that look like XML tags, property assignments, or otherwise
    noisy strings are ignored. Returns a list (possibly with duplicates
    when a token is both a path and a file reference).
    """
    results = []
    for raw in text.split():
        # Remove common trailing/leading punctuation marks.
        token = raw.strip("_,.:;-+!?)]}'\"")

        # Skip XML tags, property assignments, and unusual tokens.
        if token.startswith("<") or token.endswith("=") or check_unusual_stuff(token):
            continue

        if check_if_path(token):
            results.append(token)

        if check_if_file(token):
            results.append(token.split(".")[0].split("::")[0])

    return results


def check_unusual_stuff(text: str) -> bool:
    """Return True for noisy tokens containing a double quote or a comma."""
    suspicious = ('"', ",")
    return any(ch in text for ch in suspicious)


def check_if_path(text: str) -> bool:
    """Return True when *text* contains a path separator (slash or backslash)."""
    for separator in ("/", "\\"):
        if separator in text:
            return True
    return False


# TODO: look if there are others
def check_if_file(text: str) -> bool:
    """Heuristically decide whether *text* names a file, class, or method.

    Splits on "." (falling back to "::") and checks, in order: a known file
    extension, snake_case naming, a qualified Class.method / Class::method
    reference, and finally a mixed-case (camelCase) name.
    """
    parts = text.split(".")
    if len(parts) == 1:
        parts = parts[0].split("::")
    head, tail = parts[0], parts[-1]

    # A recognised file extension.
    if tail in RELEVANT_EXTENSIONS:
        return True

    # snake_case file or method naming.
    if "_" in head or "_" in tail:
        return True

    # Qualified reference such as Class.method or Class::method.
    if ("." in text or "::" in text) and head.isalpha():
        return True

    # Mixed-case name (upper and lower case past the first character).
    mixed_case = r"(?=.*[a-z])(?=.*[A-Z])"
    return bool(re.match(mixed_case, head[1:])) or bool(re.match(mixed_case, tail[1:]))
def extract_affected_filenames(
    text: str, extensions: List[str] = RELEVANT_EXTENSIONS
) -> List[str]:
    """Extract candidate file/class/method names mentioned in *text*.

    Each whitespace-separated token is stripped of surrounding punctuation,
    reduced to its final path component, and kept only if it looks like a
    file, class, or method name. The result is deduplicated.
    """
    names = set()
    for token in text.split():
        cleaned = token.strip("_,.:;-+!?()]}'\"")
        candidate = extract_filename_from_path(cleaned)
        name = check_file_class_method_names(candidate, extensions)
        if name:
            names.add(name)

    return list(names)


# TODO: enhance this with extensions
# If looks like a path-to-file try to get the filename.extension or just filename
def extract_filename_from_path(text: str) -> str:
# Pattern //path//to//file or \\path\\to\\file, extract file
# res = re.search(r"^(?:(?:\/{,2}|\\{,2})([\w\-\.]+))+$", text)
# if res:
# return res.group(1), True
# # Else simply return the text
# return text, False
res = text.split("/")

return res[-1] # , len(res) > 1


def check_file_class_method_names(text: str, relevant_extensions: List[str]) -> str:
    """Return the file/class/method name contained in *text*, or None.

    Recognizes, in order: "name.ext" with a relevant extension (extensions
    typically come from a CLI parameter), qualified "Class.method" /
    "Class::method" references, and bare camelCase / snake_case identifiers
    (the last check may produce false positives, but such tokens are usually
    code-related).
    """
    # Case 1: filename with one of the relevant extensions.
    match = re.search(
        r"^([\w\-]+)\.({})?$".format("|".join(relevant_extensions)), text
    )
    if match:
        return match.group(1)

    # Case 2: Class::method or Class.method — keep the class part,
    # rejecting purely numeric "classes" (e.g. version numbers).
    match = re.search(r"^(\w+)(?:\.|:{2})(\w+)$", text)  # ^(\w{2,})(?:\.|:{2})(\w{2,})$
    if match and not bool(re.match(r"^\d+$", match.group(1))):
        return match.group(1)

    # Case 3: camelCase or snake_case identifier.
    if bool(re.search(r"[a-z]{2}[A-Z]+[a-z]{2}", text)) or "_" in text:
        return text

    return None


# def check_unusual_stuff(text: str) -> bool:
# return '"' in text or "," in text


# def check_if_path(text: str) -> bool:
# return "/" in text or "\\" in text


# # TODO: look if there are others
# def check_if_file(text: str) -> str:
# file = text.split(".")
# if len(file) == 1:
# file = file[0].split("::")

# flag = False
# # Is a filename with extension
# # TODO: dynamic extension using the --filter-extensions from CLI to reduce computations
# if file[-1] in RELEVANT_EXTENSIONS:
# return file[0]

# # Common name pattern for files or methods with underscores
# if "_" in file[0] or "_" in file[-1]:
# return True

# # Contains "." or "::" can be a Class.Method (Class::Method), letters only
# if ("." in text or "::" in text) and file[0].isalpha():
# return True
# # Contains UPPERCASE and lowercase letters excluding the first and last
# if bool(re.match(r"(?=.*[a-z])(?=.*[A-Z])", file[0][1:-1])) or bool(
# re.match(r"(?=.*[a-z])(?=.*[A-Z])", file[-1][1:-1])
# ):
# return True

# return flag


def extract_ghissue_references(repository: str, text: str) -> Dict[str, str]:
Expand Down
Loading

0 comments on commit 7779249

Please sign in to comment.