diff --git a/CHANGELOG/current/307.md b/CHANGELOG/current/307.md new file mode 100644 index 00000000..e73a0060 --- /dev/null +++ b/CHANGELOG/current/307.md @@ -0,0 +1 @@ +[ADD] Extract response and request info in qualyswebapp's plugins. #307 diff --git a/faraday_plugins/plugins/repo/qualyswebapp/plugin.py b/faraday_plugins/plugins/repo/qualyswebapp/plugin.py index 54feb883..1eaa918a 100644 --- a/faraday_plugins/plugins/repo/qualyswebapp/plugin.py +++ b/faraday_plugins/plugins/repo/qualyswebapp/plugin.py @@ -6,6 +6,7 @@ """ import xml.etree.ElementTree as ET from urllib.parse import urlparse +import base64 from dateutil.parser import parse @@ -94,10 +95,34 @@ class Results: def __init__(self, glossary_tags): self.lista_vul = self.get_qid_list(glossary_tags) + @staticmethod + def build_request(request): + request_data = [] + for url in request.find("URL"): + request_data.append(f'URL: {url.text}') + for header in request.findall('HEADERS/HEADER'): + request_data.append(f'{header.find("key").text}: {header.find("value").text}') + return '\n'.join(request_data) + + @staticmethod + def build_response(response): + response_data = [] + for contents in response.findall('CONTENTS'): + response_data.append(base64.b64decode(contents.text).decode('utf-8')) + for evidence in response.findall('EVIDENCE'): + response_data.append(base64.b64decode(evidence.text).decode('utf-8')) + return '\n'.join(response_data) + def get_qid_list(self, vul_list_tags): self.dict_result_vul = {} for vul in vul_list_tags: - self.dict_result_vul[vul.tag] = vul.text + if vul.tag == "PAYLOADS" and vul.find("PAYLOAD"): + #TODO chequear que no se pueda hacer un html injection decodeando RESPONSE + self.dict_result_vul["REQUEST"] = self.build_request(vul.find("PAYLOAD/REQUEST")) + self.dict_result_vul["METHOD"] = vul.find("PAYLOAD/REQUEST/METHOD").text + self.dict_result_vul["RESPONSE"] = self.build_response(vul.find("PAYLOAD/RESPONSE")) + else: + self.dict_result_vul[vul.tag] = vul.text return self.dict_result_vul @@ -145,12 +170,12 @@ def parseOutputString(self, output): for v in parser.info_results: url = urlparse(v.dict_result_vul.get('URL')) - host_id = self.createAndAddHost(name=url.netloc, os=operating_system, hostnames=hostnames) - vuln_scan_id = v.dict_result_vul.get('QID') + vuln_data = next((item for item in glossary if item["QID"] == vuln_scan_id), None) # Data in the xml is in different parts, we look into the glossary - vuln_data = next((item for item in glossary if item["QID"] == vuln_scan_id), None) + + vuln_name = vuln_data.get('TITLE') vuln_desc = vuln_data.get('DESCRIPTION') vuln_CWE = [vuln_data.get('CWE', '')] @@ -164,17 +189,24 @@ def parseOutputString(self, output): vuln_resolution = vuln_data.get('SOLUTION') - cvss3 = {} - vuln_data_add = f"ID: {v.dict_result_vul.get('ID')}, DETECTION_ID: {v.dict_result_vul.get('DETECTION_ID')}" \ f", CATEGORY: {vuln_data.get('CATEGORY')}, GROUP: {vuln_data.get('GROUP')}" \ f", URL: {v.dict_result_vul.get('URL')}, IMPACT: {vuln_data.get('IMPACT')}" - self.createAndAddVulnToHost(host_id=host_id, name=vuln_name, desc=vuln_desc, + host_id = self.createAndAddHost(name=url.netloc, os=operating_system, hostnames=hostnames) + if v.dict_result_vul.get('REQUEST'): + vuln_request = v.dict_result_vul.get('REQUEST') + vuln_response = v.dict_result_vul.get('RESPONSE') + vuln_method = v.dict_result_vul.get('METHOD') + service_id = self.createAndAddServiceToHost(host_id=host_id, name=url.path, protocol='tcp', ports=0) + self.createAndAddVulnWebToService(host_id=host_id, service_id=service_id, name=vuln_name, desc=vuln_desc, + severity=vuln_severity, resolution=vuln_resolution, run_date=run_date, + external_id="QUALYS-" + vuln_scan_id, data=vuln_data_add, cwe=vuln_CWE, + method=vuln_method, response=vuln_response, request=vuln_request, path=url.path) + else: + self.createAndAddVulnToHost(host_id=host_id, name=vuln_name, desc=vuln_desc, severity=vuln_severity, resolution=vuln_resolution, run_date=run_date, - external_id="QUALYS-"+vuln_scan_id, data=vuln_data_add, cwe=vuln_CWE, - cvss3=cvss3) - + external_id="QUALYS-"+vuln_scan_id, data=vuln_data_add, cwe=vuln_CWE) def createPlugin(*args, **kwargs): return QualysWebappPlugin(*args, **kwargs)