diff --git a/.gitignore b/.gitignore
index 1f280df..d24a112 100644
--- a/.gitignore
+++ b/.gitignore
@@ -199,8 +199,11 @@ specs.yaml
specs.json
swagger.yaml
swagger.json
+oas.yml
*.json
## unknown data
.DS_Store
-oas.yml
+
+## local testing scripts
+test.py
\ No newline at end of file
diff --git a/src/offat/__main__.py b/src/offat/__main__.py
index e404941..555ce59 100644
--- a/src/offat/__main__.py
+++ b/src/offat/__main__.py
@@ -126,6 +126,14 @@ def start():
action='store_true',
help='Captures failed requests due to any exceptions into output file',
)
+ parser.add_argument(
+ '--server',
+ dest='server_url',
+ type=str,
+ default=None,
+ required=False,
+ help='server/host base url to overwrite from OAS/Swagger file',
+ )
args = parser.parse_args()
# convert req headers str to dict
@@ -142,7 +150,7 @@ def start():
test_data_config = validate_config_file_data(test_data_config)
# parse args and run tests
- api_parser = create_parser(args.fpath)
+ api_parser = create_parser(args.fpath, server_url=args.server_url)
generate_and_run_tests(
api_parser=api_parser,
regex_pattern=args.path_regex_pattern,
diff --git a/src/offat/http.py b/src/offat/http.py
index 39a6277..9e6620c 100644
--- a/src/offat/http.py
+++ b/src/offat/http.py
@@ -36,6 +36,7 @@ def validate_proxy(self, proxy_url: str | None):
True if the proxy URL seems valid and a basic connection can be established, False otherwise.
"""
# Check for valid URL format
+ # TODO: implement url parse security: https://docs.python.org/3/library/urllib.parse.html#url-parsing-security
parsed_url = urlparse(proxy_url)
if all([parsed_url.scheme, parsed_url.netloc]):
return True
diff --git a/src/offat/parsers/__init__.py b/src/offat/parsers/__init__.py
index c9c3974..39dc176 100644
--- a/src/offat/parsers/__init__.py
+++ b/src/offat/parsers/__init__.py
@@ -1,6 +1,5 @@
-from sys import exit
-from requests import get as http_get
from json import loads as json_load, JSONDecodeError
+from requests import get as http_get
from .openapi import OpenAPIv3Parser
from .swagger import SwaggerParser
from .parser import BaseParser
@@ -9,7 +8,9 @@
def create_parser(
- fpath_or_url: str, spec: dict | None = None
+ fpath_or_url: str,
+ spec: dict | None = None,
+ server_url: str | None = None,
) -> SwaggerParser | OpenAPIv3Parser | None:
'''returns parser based on doc file'''
if fpath_or_url and is_valid_url(fpath_or_url):
@@ -29,12 +30,14 @@ def create_parser(
exit(-1)
try:
- parser = BaseParser(file_or_url=fpath_or_url, spec=spec)
+ parser = BaseParser(file_or_url=fpath_or_url, spec=spec, server_url=server_url)
except OSError:
logger.error('File Not Found')
exit(-1)
if parser.is_v3:
- return OpenAPIv3Parser(file_or_url=fpath_or_url, spec=spec)
+ return OpenAPIv3Parser(
+ file_or_url=fpath_or_url, spec=spec, server_url=server_url
+ )
- return SwaggerParser(fpath_or_url=fpath_or_url, spec=spec)
+ return SwaggerParser(fpath_or_url=fpath_or_url, spec=spec, server_url=server_url)
diff --git a/src/offat/parsers/openapi.py b/src/offat/parsers/openapi.py
index ea49b8f..16c3de4 100644
--- a/src/offat/parsers/openapi.py
+++ b/src/offat/parsers/openapi.py
@@ -2,6 +2,7 @@
module to parse OAS v3 documentation JSON/YAML files.
'''
from .parser import BaseParser
+from ..utils import parse_server_url
from ..logger import logger
@@ -11,22 +12,34 @@ class InvalidOpenAPIv3File(Exception):
class OpenAPIv3Parser(BaseParser):
'''OpenAPI v3 Spec File Parser'''
- # while adding new method to this class, make sure same method is present in SwaggerParser class
+ # while adding new method to this class, make sure same method is present in SwaggerParser class
- def __init__(self, file_or_url: str, spec: dict | None = None) -> None:
- super().__init__(file_or_url=file_or_url, spec=spec) # noqa
+ def __init__(
+ self, file_or_url: str, spec: dict | None = None, *args, **kwargs
+ ) -> None:
+ super().__init__(file_or_url=file_or_url, spec=spec, *args, **kwargs) # noqa
if not self.is_v3:
- raise InvalidOpenAPIv3File("Invalid OAS v3 file")
+ raise InvalidOpenAPIv3File('Invalid OAS v3 file')
- self._populate_hosts()
self.http_scheme = self._get_scheme()
- self.api_base_path = self.specification.get('basePath', '')
+
+ # save hosts in self.hosts
+ self._populate_hosts()
+
+ # raise error if host data not found
+ if not (self.hosts and self.hosts[0]):
+ raise ValueError('Host is invalid or not found')
+
+ # parse and set host data
+ host_dict = self.hosts[0]
+ self.http_scheme = host_dict['scheme']
+ self.host = f'{host_dict["host"]}:{host_dict["port"]}'
+ self.api_base_path = host_dict['basepath']
self.base_url = f"{self.http_scheme}://{self.host}"
self.request_response_params = self._get_request_response_params()
-
def _populate_hosts(self):
servers = self.specification.get('servers', [])
hosts = []
@@ -35,14 +48,25 @@ def _populate_hosts(self):
raise InvalidOpenAPIv3File('Server URLs Not Found in spec file')
for server in servers:
- host = server.get('url', '').removeprefix(
- 'https://').removeprefix('http://').removesuffix('/')
- host = None if host == '' else host
- hosts.append(host)
+ # host = (
+ # server.get('url', '')
+ # .removeprefix('https://')
+ # .removeprefix('http://')
+ # .removesuffix('/')
+ # )
+ # host = None if host == '' else host
+ scheme, host, port, basepath = parse_server_url(url=server.get('url'))
+
+ hosts.append(
+ {
+ 'scheme': scheme,
+ 'host': host,
+ 'port': port,
+ 'basepath': basepath,
+ }
+ )
self.hosts = hosts
- self.host = self.hosts[0]
-
def _get_scheme(self):
servers = self.specification.get('servers', [])
@@ -53,20 +77,20 @@ def _get_scheme(self):
scheme = 'https' if 'https' in schemes else 'http'
return scheme
-
- def _fetch_schema_from_spec(self, param_schema_ref:str) -> dict:
+ def _fetch_schema_from_spec(self, param_schema_ref: str) -> dict:
schema_spec_path = param_schema_ref.split('/')[1:]
-
+
if len(schema_spec_path) > 3:
- logger.error('Schema spec $ref path should not be greater than 3 (excluding #)')
+ logger.error(
+ 'Schema spec $ref path should not be greater than 3 (excluding #)'
+ )
return {}
-
- schema_data:dict = self.specification
+
+ schema_data: dict = self.specification
for child_ele in schema_spec_path:
- schema_data:dict = schema_data.get(child_ele, {})
+ schema_data: dict = schema_data.get(child_ele, {})
return schema_data
-
def _get_param_definition_schema(self, param: dict):
'''Returns Model defined schema for the passed param'''
@@ -96,13 +120,20 @@ def _get_response_definition_schema(self, responses: dict):
if content:
status_code_content_type_responses = content.keys()
for status_code_content_type in status_code_content_type_responses:
- status_code_content = responses[status_code]['content'][status_code_content_type].keys()
+ status_code_content = responses[status_code]['content'][
+ status_code_content_type
+ ].keys()
if 'parameters' in status_code_content:
- responses[status_code]['schema'] = responses[status_code]['content'][status_code_content_type]['parameters']
+ responses[status_code]['schema'] = responses[status_code][
+ 'content'
+ ][status_code_content_type]['parameters']
elif 'schema' in status_code_content:
- responses[status_code]['schema'] = self._get_param_definition_schema(
- responses[status_code]['content'][status_code_content_type])
-
+ responses[status_code][
+ 'schema'
+ ] = self._get_param_definition_schema(
+ responses[status_code]['content'][status_code_content_type]
+ )
+
else:
# Fetch $ref schema directly
ref = responses[status_code].get('$ref', None)
@@ -111,7 +142,6 @@ def _get_response_definition_schema(self, responses: dict):
return responses
-
def _get_request_response_params(self):
'''Returns Schema of requests and response params
@@ -133,43 +163,52 @@ def _get_request_response_params(self):
if http_method not in ['get', 'put', 'post', 'delete', 'options']:
continue
- request_parameters = paths[path][http_method].get(
- 'parameters', [])
+ request_parameters = paths[path][http_method].get('parameters', [])
# create list of parameters: Fetch object schema from OAS file
body_params = []
- body_parameter_keys = paths[path][http_method].get(
- 'requestBody', {}).get('content', {})
+ body_parameter_keys = (
+ paths[path][http_method].get('requestBody', {}).get('content', {})
+ )
for body_parameter_key in body_parameter_keys:
- body_parameters_dict = paths[path][http_method]['requestBody']['content'][body_parameter_key]
+ body_parameters_dict = paths[path][http_method]['requestBody'][
+ 'content'
+ ][body_parameter_key]
required = paths[path][http_method]['requestBody'].get('required')
- description = paths[path][http_method]['requestBody'].get('description')
+ description = paths[path][http_method]['requestBody'].get(
+ 'description'
+ )
body_param = self._get_param_definition_schema(body_parameters_dict)
- body_params.append({
- 'in': 'body',
- 'name': body_parameter_key,
- 'description': description,
- 'required': required,
- 'schema': body_param,
- })
+ body_params.append(
+ {
+ 'in': 'body',
+ 'name': body_parameter_key,
+ 'description': description,
+ 'required': required,
+ 'schema': body_param,
+ }
+ )
response_params = []
response_params = self._get_response_definition_schema(
- paths[path][http_method].get('responses', {}))
+ paths[path][http_method].get('responses', {})
+ )
# add body param to request param
request_parameters += body_params
- requests.append({
- 'http_method': http_method,
- 'path': path,
- 'request_params': request_parameters,
- 'response_params': response_params,
- 'path_params': path_params,
- 'body_params': body_params,
- })
+ requests.append(
+ {
+ 'http_method': http_method,
+ 'path': path,
+ 'request_params': request_parameters,
+ 'response_params': response_params,
+ 'path_params': path_params,
+ 'body_params': body_params,
+ }
+ )
return requests
diff --git a/src/offat/parsers/parser.py b/src/offat/parsers/parser.py
index 035b415..a438448 100644
--- a/src/offat/parsers/parser.py
+++ b/src/offat/parsers/parser.py
@@ -1,31 +1,47 @@
from openapi_spec_validator import validate
from openapi_spec_validator.readers import read_from_filename
from ..logger import logger
+from ..utils import parse_server_url
class InvalidSpecVersion(Exception):
- '''Exception to be raised '''
+ '''Exception to be raised'''
+
pass
class BaseParser:
- def __init__(self, file_or_url: str, spec: dict = None) -> None:
+ def __init__(
+ self, file_or_url: str, spec: dict | None = None, server_url: str | None = None
+ ) -> None:
if spec:
- self.specification:dict = spec
- base_uri = ""
+ self.specification: dict = spec
+ base_uri = ''
else:
self.specification, base_uri = read_from_filename(file_or_url)
+ self.is_v3 = self._get_oas_version() == 3
+
+ # overwrite server if present according to OAS version
+ if self.is_v3 and server_url:
+ self.specification['servers'] = [{'url': server_url}]
+ elif server_url:
+ scheme, host, port, basepath = parse_server_url(url=server_url)
+ basepath = '/' if basepath == '' else basepath
+ self.specification['host'] = f'{host}:{port}'
+ self.specification['schemes'] = [scheme]
+ self.specification['basePath'] = basepath
+
try:
validate(spec=self.specification, base_uri=base_uri)
self.valid = True
except Exception as e:
- logger.warning("OAS/Swagger file is invalid!")
- logger.error('Failed to validate spec %s due to err: %s', file_or_url, repr(e))
+ logger.warning('OAS/Swagger file is invalid!')
+ logger.error(
+ 'Failed to validate spec %s due to err: %s', file_or_url, repr(e)
+ )
self.valid = False
- self.is_v3 = self._get_oas_version() == 3
-
self.hosts = []
def _get_oas_version(self):
@@ -33,7 +49,7 @@ def _get_oas_version(self):
return 3
elif self.specification.get('swagger'):
return 2
- raise InvalidSpecVersion("only openapi and swagger specs are supported for now")
+ raise InvalidSpecVersion('only openapi and swagger specs are supported for now')
def _get_endpoints(self):
'''Returns list of endpoint paths along with HTTP methods allowed'''
diff --git a/src/offat/parsers/swagger.py b/src/offat/parsers/swagger.py
index 764387d..98ec5ad 100644
--- a/src/offat/parsers/swagger.py
+++ b/src/offat/parsers/swagger.py
@@ -11,12 +11,15 @@ class InvalidSwaggerFile(Exception):
class SwaggerParser(BaseParser):
'''Swagger Spec file Parser'''
+
# while adding new method to this class, make sure same method is present in OpenAPIv3Parser class
- def __init__(self, fpath_or_url: str, spec: dict | None = None) -> None:
- super().__init__(file_or_url=fpath_or_url, spec=spec) # noqa
+ def __init__(
+ self, fpath_or_url: str, spec: dict | None = None, *args, **kwargs
+ ) -> None:
+ super().__init__(file_or_url=fpath_or_url, spec=spec, *args, **kwargs) # noqa
if self.is_v3:
- raise InvalidSwaggerFile("Invalid OAS v3 file")
+ raise InvalidSwaggerFile('Invalid OAS v3 file')
self._populate_hosts()
self.http_scheme = self._get_scheme()
@@ -47,8 +50,7 @@ def _get_param_definition_schema(self, param: dict):
if param_schema_ref:
model_slug = param_schema_ref.split('/')[-1]
- param_schema = self.specification.get(
- 'definitions', {}).get(model_slug)
+ param_schema = self.specification.get('definitions', {}).get(model_slug)
return param_schema
@@ -67,7 +69,8 @@ def _get_response_definition_schema(self, responses: dict):
responses[status_code]['schema'] = responses[status_code]['parameters']
elif 'schema' in status_code_response:
responses[status_code]['schema'] = self._get_param_definition_schema(
- responses[status_code])
+ responses[status_code]
+ )
else:
continue
@@ -95,21 +98,23 @@ def _get_request_response_params(self):
continue
# below var contains overall params
- request_parameters = paths[path][http_method].get(
- 'parameters', [])
+ request_parameters = paths[path][http_method].get('parameters', [])
response_params = self._get_response_definition_schema(
- paths[path][http_method].get('responses', {}))
+ paths[path][http_method].get('responses', {})
+ )
# create list of parameters: Fetch object schema from OAS file
for param in request_parameters:
param['schema'] = self._get_param_definition_schema(param)
- requests.append({
- 'http_method': http_method,
- 'path': path,
- 'request_params': request_parameters,
- 'response_params': response_params,
- 'path_params': path_params,
- })
+ requests.append(
+ {
+ 'http_method': http_method,
+ 'path': path,
+ 'request_params': request_parameters,
+ 'response_params': response_params,
+ 'path_params': path_params,
+ }
+ )
return requests
diff --git a/src/offat/tester/test_generator.py b/src/offat/tester/test_generator.py
index ccbed1f..5069d5d 100644
--- a/src/offat/tester/test_generator.py
+++ b/src/offat/tester/test_generator.py
@@ -2,8 +2,9 @@
from .fuzzer import fill_params
from .post_test_processor import PostTestFiltersEnum
from .fuzzer import generate_random_int
-from ..parsers import SwaggerParser, OpenAPIv3Parser
from ..config_data_handler import populate_user_data
+from ..parsers import SwaggerParser, OpenAPIv3Parser
+from ..utils import join_uri_path
class TestGenerator:
@@ -20,7 +21,7 @@ class TestGenerator:
sqli_fuzz_params: Performs SQL injection (SQLi) parameter fuzzing based on the provided OpenAPIParser instance.
"""
- def __init__(self, headers: dict = None) -> None:
+ def __init__(self, headers: dict = None) -> None:
"""
Initializes an instance of the TestGenerator class.
@@ -41,7 +42,7 @@ def check_unsupported_http_methods(
openapi_parser: SwaggerParser | OpenAPIv3Parser,
success_codes: list[int] = [200, 201, 301, 302],
*args,
- **kwargs
+ **kwargs,
):
'''Checks whether endpoint supports undocumented/unsupported HTTP methods
@@ -72,7 +73,7 @@ def check_unsupported_http_methods(
'methods': [],
'body_params': [],
'query_params': [],
- 'path_params': []
+ 'path_params': [],
}
endpoints_index[endpoint]['endpoints'].append(fuzzed_endpoint_data)
@@ -80,47 +81,51 @@ def check_unsupported_http_methods(
endpoints_index[endpoint]['methods'].append(method.lower())
endpoints_index[endpoint]['body_params'].extend(
- fuzzed_endpoint_data['body_params'])
+ fuzzed_endpoint_data['body_params']
+ )
endpoints_index[endpoint]['query_params'].extend(
- fuzzed_endpoint_data['query_params'])
+ fuzzed_endpoint_data['query_params']
+ )
endpoints_index[endpoint]['path_params'].extend(
- fuzzed_endpoint_data['path_params'])
+ fuzzed_endpoint_data['path_params']
+ )
for endpoint, endpoint_dict in endpoints_index.items():
methods_allowed = endpoint_dict.get('methods', [])
body_params = endpoint_dict.get('body_params', [])
path_params = endpoint_dict.get('path_params', [])
query_params = endpoint_dict.get('query_params', [])
- url = f'{openapi_parser.base_url}{endpoint}'
+ url = join_uri_path(openapi_parser.base_url, endpoint)
http_methods: set = {'get', 'post', 'put', 'delete', 'options'}
restricted_methods = http_methods - set(methods_allowed)
for restricted_method in restricted_methods:
- tasks.append({
- 'test_name': 'UnSupported HTTP Method Check',
- 'url': url,
- 'endpoint': endpoint,
- 'method': restricted_method.upper(),
- 'malicious_payload': [],
- 'args': args,
- 'kwargs': kwargs,
- 'result_details': {
- True: 'Endpoint does not perform any HTTP method which is not documented', # passed
- False: 'Endpoint performs HTTP method which is not documented', # failed
- },
- 'body_params': body_params,
- 'query_params': query_params,
- 'path_params': path_params,
- 'success_codes': success_codes,
- 'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name
- })
+ tasks.append(
+ {
+ 'test_name': 'UnSupported HTTP Method Check',
+ 'url': url,
+ 'endpoint': endpoint,
+ 'method': restricted_method.upper(),
+ 'malicious_payload': [],
+ 'args': args,
+ 'kwargs': kwargs,
+ 'result_details': {
+ True: 'Endpoint does not perform any HTTP method which is not documented', # passed
+ False: 'Endpoint performs HTTP method which is not documented', # failed
+ },
+ 'body_params': body_params,
+ 'query_params': query_params,
+ 'path_params': path_params,
+ 'success_codes': success_codes,
+ 'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name,
+ }
+ )
return tasks
def __get_request_params_list(self, request_params: list[dict]):
- '''Get list of request parameters
- '''
+ '''Get list of request parameters'''
payload_data = []
for request_param in request_params:
param_pos = request_param.get('in')
@@ -132,16 +137,20 @@ def __get_request_params_list(self, request_params: list[dict]):
for prop in props.keys():
prop_type = props[prop].get('type')
- payload_data.append({
- 'in': param_pos,
- 'name': prop,
- 'type': prop_type,
- 'required': prop in required_params,
- })
+ payload_data.append(
+ {
+ 'in': param_pos,
+ 'name': prop,
+ 'type': prop_type,
+ 'required': prop in required_params,
+ }
+ )
return payload_data
- def __fuzz_request_params(self, openapi_parser: SwaggerParser | OpenAPIv3Parser) -> list[dict]:
+ def __fuzz_request_params(
+ self, openapi_parser: SwaggerParser | OpenAPIv3Parser
+ ) -> list[dict]:
"""
Fuzzes Request params available in different positions and returns a list
of tasks
@@ -164,11 +173,14 @@ def __fuzz_request_params(self, openapi_parser: SwaggerParser | OpenAPIv3Parser)
# get params based on their position in request
request_body_params = list(
- filter(lambda x: x.get('in') == 'body', request_params))
+ filter(lambda x: x.get('in') == 'body', request_params)
+ )
request_query_params = list(
- filter(lambda x: x.get('in') == 'query', request_params))
+ filter(lambda x: x.get('in') == 'query', request_params)
+ )
path_params_in_body = list(
- filter(lambda x: x.get('in') == 'path', request_params))
+ filter(lambda x: x.get('in') == 'path', request_params)
+ )
# handle path params from path_params
# and replace path params by value in
@@ -183,17 +195,24 @@ def __fuzz_request_params(self, openapi_parser: SwaggerParser | OpenAPIv3Parser)
path_param_value = path_param.get('value')
endpoint_path = endpoint_path.replace(
- '{' + str(path_param_name) + '}', str(path_param_value))
-
- tasks.append({
- 'url': f'{base_url}{openapi_parser.api_base_path}{endpoint_path}',
- 'endpoint': f'{openapi_parser.api_base_path}{endpoint_path}',
- 'method': path_obj.get('http_method', '').upper(),
- 'body_params': request_body_params,
- 'query_params': request_query_params,
- 'path_params': path_params,
- # 'malicious_payload':path_params,
- })
+ '{' + str(path_param_name) + '}', str(path_param_value)
+ )
+
+ tasks.append(
+ {
+ 'url': join_uri_path(
+ base_url, openapi_parser.api_base_path, endpoint_path
+ ),
+ 'endpoint': join_uri_path(
+ openapi_parser.api_base_path, endpoint_path
+ ),
+ 'method': path_obj.get('http_method', '').upper(),
+ 'body_params': request_body_params,
+ 'query_params': request_query_params,
+ 'path_params': path_params,
+ # 'malicious_payload':path_params,
+ }
+ )
return tasks
@@ -221,11 +240,11 @@ def __inject_payload_in_params(self, request_params: list[dict], payload: str):
return request_params
def sqli_fuzz_params_test(
- self,
- openapi_parser: SwaggerParser | OpenAPIv3Parser,
- success_codes: list[int] = [500],
- *args,
- **kwargs
+ self,
+ openapi_parser: SwaggerParser | OpenAPIv3Parser,
+ success_codes: list[int] = [500],
+ *args,
+ **kwargs,
):
'''Performs SQL injection (SQLi) parameter fuzzing based on the provided OpenAPIParser instance.
@@ -262,12 +281,14 @@ def sqli_fuzz_params_test(
# handle body request params
body_request_params = request_obj.get('body_params', [])
malicious_body_request_params = self.__inject_payload_in_params(
- body_request_params, sqli_payload)
+ body_request_params, sqli_payload
+ )
# handle query request params
query_request_params = request_obj.get('query_params', [])
malicious_query_request_params = self.__inject_payload_in_params(
- query_request_params, sqli_payload)
+ query_request_params, sqli_payload
+ )
# BUG: for few SQLi test, path params injected value is not matching with final URI path params in output
request_obj['test_name'] = 'SQLi Test'
@@ -284,17 +305,19 @@ def sqli_fuzz_params_test(
False: 'One or more parameter is vulnerable to SQL Injection Attack', # failed
}
request_obj['success_codes'] = success_codes
- request_obj['response_filter'] = PostTestFiltersEnum.STATUS_CODE_FILTER.name
+ request_obj[
+ 'response_filter'
+ ] = PostTestFiltersEnum.STATUS_CODE_FILTER.name
tasks.append(deepcopy(request_obj))
return tasks
def sqli_in_uri_path_fuzz_test(
- self,
- openapi_parser: SwaggerParser | OpenAPIv3Parser,
- success_codes: list[int] = [500],
- *args,
- **kwargs
+ self,
+ openapi_parser: SwaggerParser | OpenAPIv3Parser,
+ success_codes: list[int] = [500],
+ *args,
+ **kwargs,
):
'''Generate Tests for SQLi in endpoint path
@@ -315,7 +338,10 @@ def sqli_in_uri_path_fuzz_test(
# filter path containing params in path
endpoints_with_param_in_path = list(
- filter(lambda path_obj: '/{' in path_obj.get('path'), request_response_params))
+ filter(
+ lambda path_obj: '/{' in path_obj.get('path'), request_response_params
+ )
+ )
basic_sqli_payloads = [
"' OR 1=1 ;--",
@@ -334,7 +360,8 @@ def sqli_in_uri_path_fuzz_test(
# get request body params
request_body_params = list(
- filter(lambda x: x.get('in') == 'body', request_params))
+ filter(lambda x: x.get('in') == 'body', request_params)
+ )
# handle path params from path_params
# and replace path params by value in
@@ -343,7 +370,8 @@ def sqli_in_uri_path_fuzz_test(
path_params = path_obj.get('path_params', [])
path_params_in_body = list(
- filter(lambda x: x.get('in') == 'path', request_params))
+ filter(lambda x: x.get('in') == 'path', request_params)
+ )
path_params += path_params_in_body
path_params = fill_params(path_params, openapi_parser.is_v3)
@@ -351,38 +379,46 @@ def sqli_in_uri_path_fuzz_test(
path_param_name = path_param.get('name')
# path_param_value = path_param.get('value')
endpoint_path = endpoint_path.replace(
- '{' + str(path_param_name) + '}', str(sqli_payload))
+ '{' + str(path_param_name) + '}', str(sqli_payload)
+ )
request_query_params = list(
- filter(lambda x: x.get('in') == 'query', request_params))
-
- tasks.append({
- 'test_name': 'SQLi Test in URI Path with Fuzzed Params',
- 'url': f'{base_url}{openapi_parser.api_base_path}{endpoint_path}',
- 'endpoint': f'{openapi_parser.api_base_path}{endpoint_path}',
- 'method': path_obj.get('http_method').upper(),
- 'body_params': request_body_params,
- 'query_params': request_query_params,
- 'path_params': path_params,
- 'malicious_payload': sqli_payload,
- 'args': args,
- 'kwargs': kwargs,
- 'result_details': {
- True: 'Endpoint is not vulnerable to SQLi', # passed
- False: 'Endpoint might be vulnerable to SQli', # failed
- },
- 'success_codes': success_codes,
- 'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name
- })
+ filter(lambda x: x.get('in') == 'query', request_params)
+ )
+
+ tasks.append(
+ {
+ 'test_name': 'SQLi Test in URI Path with Fuzzed Params',
+ 'url': join_uri_path(
+ base_url, openapi_parser.api_base_path, endpoint_path
+ ),
+ 'endpoint': join_uri_path(
+ openapi_parser.api_base_path, endpoint_path
+ ),
+ 'method': path_obj.get('http_method').upper(),
+ 'body_params': request_body_params,
+ 'query_params': request_query_params,
+ 'path_params': path_params,
+ 'malicious_payload': sqli_payload,
+ 'args': args,
+ 'kwargs': kwargs,
+ 'result_details': {
+ True: 'Endpoint is not vulnerable to SQLi', # passed
+ False: 'Endpoint might be vulnerable to SQli', # failed
+ },
+ 'success_codes': success_codes,
+ 'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name,
+ }
+ )
return tasks
def bola_fuzz_path_test(
- self,
- openapi_parser: SwaggerParser | OpenAPIv3Parser,
- success_codes: list[int] = [200, 201, 301],
- *args,
- **kwargs
+ self,
+ openapi_parser: SwaggerParser | OpenAPIv3Parser,
+ success_codes: list[int] = [200, 201, 301],
+ *args,
+ **kwargs,
):
'''Generate Tests for BOLA in endpoint path
@@ -403,7 +439,10 @@ def bola_fuzz_path_test(
# filter path containing params in path
endpoints_with_param_in_path = list(
- filter(lambda path_obj: '/{' in path_obj.get('path'), request_response_params))
+ filter(
+ lambda path_obj: '/{' in path_obj.get('path'), request_response_params
+ )
+ )
tasks = []
for path_obj in endpoints_with_param_in_path:
@@ -413,7 +452,8 @@ def bola_fuzz_path_test(
# get request body params
request_body_params = list(
- filter(lambda x: x.get('in') == 'body', request_params))
+ filter(lambda x: x.get('in') == 'body', request_params)
+ )
# handle path params from path_params
# and replace path params by value in
@@ -422,7 +462,8 @@ def bola_fuzz_path_test(
path_params = path_obj.get('path_params', [])
path_params_in_body = list(
- filter(lambda x: x.get('in') == 'path', request_params))
+ filter(lambda x: x.get('in') == 'path', request_params)
+ )
path_params += path_params_in_body
path_params = fill_params(path_params, openapi_parser.is_v3)
@@ -430,39 +471,47 @@ def bola_fuzz_path_test(
path_param_name = path_param.get('name')
path_param_value = path_param.get('value')
endpoint_path = endpoint_path.replace(
- '{' + str(path_param_name) + '}', str(path_param_value))
+ '{' + str(path_param_name) + '}', str(path_param_value)
+ )
request_query_params = list(
- filter(lambda x: x.get('in') == 'query', request_params))
-
- tasks.append({
- 'test_name': 'BOLA Path Test with Fuzzed Params',
- # f'{base_url}{endpoint_path}',
- 'url': f'{base_url}{openapi_parser.api_base_path}{endpoint_path}',
- 'endpoint': f'{openapi_parser.api_base_path}{endpoint_path}',
- 'method': path_obj.get('http_method').upper(),
- 'body_params': request_body_params,
- 'query_params': request_query_params,
- 'path_params': path_params,
- 'malicious_payload': path_params,
- 'args': args,
- 'kwargs': kwargs,
- 'result_details': {
- True: 'Endpoint is not vulnerable to BOLA', # passed
- False: 'Endpoint might be vulnerable to BOLA', # failed
- },
- 'success_codes': success_codes,
- 'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name
- })
+ filter(lambda x: x.get('in') == 'query', request_params)
+ )
+
+ tasks.append(
+ {
+ 'test_name': 'BOLA Path Test with Fuzzed Params',
+ # f'{base_url}{endpoint_path}',
+ 'url': join_uri_path(
+ base_url, openapi_parser.api_base_path, endpoint_path
+ ),
+ 'endpoint': join_uri_path(
+ openapi_parser.api_base_path, endpoint_path
+ ),
+ 'method': path_obj.get('http_method').upper(),
+ 'body_params': request_body_params,
+ 'query_params': request_query_params,
+ 'path_params': path_params,
+ 'malicious_payload': path_params,
+ 'args': args,
+ 'kwargs': kwargs,
+ 'result_details': {
+ True: 'Endpoint is not vulnerable to BOLA', # passed
+ False: 'Endpoint might be vulnerable to BOLA', # failed
+ },
+ 'success_codes': success_codes,
+ 'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name,
+ }
+ )
return tasks
def bola_fuzz_trailing_slash_path_test(
- self,
- openapi_parser: SwaggerParser | OpenAPIv3Parser,
- success_codes: list[int] = [200, 201, 301],
- *args,
- **kwargs
+ self,
+ openapi_parser: SwaggerParser | OpenAPIv3Parser,
+ success_codes: list[int] = [200, 201, 301],
+ *args,
+ **kwargs,
):
'''Generate Tests for BOLA in endpoint path
@@ -489,11 +538,14 @@ def bola_fuzz_trailing_slash_path_test(
# get params based on their position in request
request_body_params = list(
- filter(lambda x: x.get('in') == 'body', request_params))
+ filter(lambda x: x.get('in') == 'body', request_params)
+ )
request_query_params = list(
- filter(lambda x: x.get('in') == 'query', request_params))
+ filter(lambda x: x.get('in') == 'query', request_params)
+ )
path_params_in_body = list(
- filter(lambda x: x.get('in') == 'path', request_params))
+ filter(lambda x: x.get('in') == 'path', request_params)
+ )
# handle path params from path_params
# and replace path params by value in
@@ -507,33 +559,38 @@ def bola_fuzz_trailing_slash_path_test(
path_param_name = path_param.get('name')
path_param_value = path_param.get('value')
endpoint_path = endpoint_path.replace(
- '{' + str(path_param_name) + '}', str(path_param_value))
+ '{' + str(path_param_name) + '}', str(path_param_value)
+ )
# generate URL for BOLA attack
- url = f'{base_url}{openapi_parser.api_base_path}{endpoint_path}'
+ url = join_uri_path(base_url, openapi_parser.api_base_path, endpoint_path)
if url.endswith('/'):
url = f'{url}{generate_random_int()}'
else:
url = f'{url}/{generate_random_int()}'
- tasks.append({
- 'test_name': 'BOLA Path Trailing Slash Test',
- 'url': url,
- 'endpoint': f'{openapi_parser.api_base_path}{endpoint_path}',
- 'method': path_obj.get('http_method').upper(),
- 'body_params': request_body_params,
- 'query_params': request_query_params,
- 'path_params': path_params,
- 'malicious_payload': [],
- 'args': args,
- 'kwargs': kwargs,
- 'result_details': {
- True: 'Endpoint might not vulnerable to BOLA', # passed
- False: 'Endpoint might be vulnerable to BOLA', # failed
- },
- 'success_codes': success_codes,
- 'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name
- })
+ tasks.append(
+ {
+ 'test_name': 'BOLA Path Trailing Slash Test',
+ 'url': url,
+ 'endpoint': join_uri_path(
+ openapi_parser.api_base_path, endpoint_path
+ ),
+ 'method': path_obj.get('http_method').upper(),
+ 'body_params': request_body_params,
+ 'query_params': request_query_params,
+ 'path_params': path_params,
+ 'malicious_payload': [],
+ 'args': args,
+ 'kwargs': kwargs,
+ 'result_details': {
+ True: 'Endpoint might not vulnerable to BOLA', # passed
+ False: 'Endpoint might be vulnerable to BOLA', # failed
+ },
+ 'success_codes': success_codes,
+ 'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name,
+ }
+ )
return tasks
@@ -543,7 +600,7 @@ def _inject_response_params(self, response_params: dict, is_v3: bool = False):
Args:
body_params ([dict]) : dict of response from openapi documentation
- {'200':{'properties':{'schema':{'test_param':{'type':'str'}}}}}
+ {'200':{'properties':{'schema':{'test_param':{'type':'str'}}}}}
Returns:
list[dict]: list of dict containing test case for endpoint
@@ -567,11 +624,11 @@ def _inject_response_params(self, response_params: dict, is_v3: bool = False):
return params
def bopla_fuzz_test(
- self,
- openapi_parser: SwaggerParser | OpenAPIv3Parser,
- success_codes: list[int] = [200, 201, 301],
- *args,
- **kwargs
+ self,
+ openapi_parser: SwaggerParser | OpenAPIv3Parser,
+ success_codes: list[int] = [200, 201, 301],
+ *args,
+ **kwargs,
):
'''Generate Tests for BOPLA/Mass Assignment Vulnerability
@@ -598,11 +655,14 @@ def bopla_fuzz_test(
# get params based on their position in request
request_body_params = list(
- filter(lambda x: x.get('in') == 'body', request_params))
+ filter(lambda x: x.get('in') == 'body', request_params)
+ )
request_query_params = list(
- filter(lambda x: x.get('in') == 'query', request_params))
+ filter(lambda x: x.get('in') == 'query', request_params)
+ )
path_params_in_body = list(
- filter(lambda x: x.get('in') == 'path', request_params))
+ filter(lambda x: x.get('in') == 'path', request_params)
+ )
# handle path params from path_params
# and replace path params by value in
@@ -616,7 +676,8 @@ def bopla_fuzz_test(
path_param_name = path_param.get('name')
path_param_value = path_param.get('value')
endpoint_path = endpoint_path.replace(
- '{' + str(path_param_name) + '}', str(path_param_value))
+ '{' + str(path_param_name) + '}', str(path_param_value)
+ )
# assign values to response params below and add them to JSON request body
response_body_params = self._inject_response_params(
@@ -625,25 +686,31 @@ def bopla_fuzz_test(
)
request_body_params += response_body_params
- tasks.append({
- 'test_name': 'BOPLA Test',
- # f'{base_url}{endpoint_path}',
- 'url': f'{base_url}{openapi_parser.api_base_path}{endpoint_path}',
- 'endpoint': f'{openapi_parser.api_base_path}{endpoint_path}',
- 'method': path_obj.get('http_method', '').upper(),
- 'body_params': request_body_params,
- 'query_params': request_query_params,
- 'path_params': path_params,
- 'malicious_payload': response_body_params,
- 'args': args,
- 'kwargs': kwargs,
- 'result_details': {
- True: 'Endpoint might not vulnerable to BOPLA', # passed
- False: 'Endpoint might be vulnerable to BOPLA', # failed
- },
- 'success_codes': success_codes,
- 'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name
- })
+ tasks.append(
+ {
+ 'test_name': 'BOPLA Test',
+ # f'{base_url}{endpoint_path}',
+ 'url': join_uri_path(
+ base_url, openapi_parser.api_base_path, endpoint_path
+ ),
+ 'endpoint': join_uri_path(
+ openapi_parser.api_base_path, endpoint_path
+ ),
+ 'method': path_obj.get('http_method', '').upper(),
+ 'body_params': request_body_params,
+ 'query_params': request_query_params,
+ 'path_params': path_params,
+ 'malicious_payload': response_body_params,
+ 'args': args,
+ 'kwargs': kwargs,
+ 'result_details': {
+ True: 'Endpoint might not vulnerable to BOPLA', # passed
+ False: 'Endpoint might be vulnerable to BOPLA', # failed
+ },
+ 'success_codes': success_codes,
+ 'response_filter': PostTestFiltersEnum.STATUS_CODE_FILTER.name,
+ }
+ )
return tasks
@@ -654,13 +721,13 @@ def test_with_user_data(
test_for_actor1: bool = True,
test_for_actor2: bool = False,
*args,
- **kwargs
+ **kwargs,
):
'''Generate Tests with user sepecified data using provided test generator method
Args:
user_data (dict): User specified YAML data as dict.
- test_generator_method (class method): test generator class method to be used for generating API pentest tests.
+ test_generator_method (class method): test generator class method to be used for generating API pentest tests.
test_for_actor1 (bool): Generate tests for actor1 user data
test_for_actor2 (bool): Generate tests for actor2 user data
*args: Variable-length positional arguments.
@@ -688,13 +755,13 @@ def test_with_user_data(
return new_tests
def __generate_injection_fuzz_params_test(
- self,
- openapi_parser: SwaggerParser | OpenAPIv3Parser,
- test_name: str,
- result_details: dict,
- payloads_data: list[dict],
- *args,
- **kwargs
+ self,
+ openapi_parser: SwaggerParser | OpenAPIv3Parser,
+ test_name: str,
+ result_details: dict,
+ payloads_data: list[dict],
+ *args,
+ **kwargs,
):
'''Performs injection parameter fuzzing based on the provided OpenAPIParser instance and matches injected payload using regex in response.
@@ -722,12 +789,14 @@ def __generate_injection_fuzz_params_test(
# handle body request params
body_request_params = request_obj.get('body_params', [])
malicious_body_request_params = self.__inject_payload_in_params(
- body_request_params, payload)
+ body_request_params, payload
+ )
# handle query request params
query_request_params = request_obj.get('query_params', [])
malicious_query_request_params = self.__inject_payload_in_params(
- query_request_params, payload)
+ query_request_params, payload
+ )
request_obj['test_name'] = test_name
@@ -739,15 +808,20 @@ def __generate_injection_fuzz_params_test(
request_obj['malicious_payload'] = payload
request_obj['result_details'] = result_details
- request_obj['response_filter'] = PostTestFiltersEnum.BODY_REGEX_FILTER.name
+ request_obj[
+ 'response_filter'
+ ] = PostTestFiltersEnum.BODY_REGEX_FILTER.name
request_obj['response_match_regex'] = payload_dict.get(
- 'response_match_regex')
+ 'response_match_regex'
+ )
tasks.append(deepcopy(request_obj))
return tasks
- def os_command_injection_fuzz_params_test(self, openapi_parser: SwaggerParser | OpenAPIv3Parser):
+ def os_command_injection_fuzz_params_test(
+ self, openapi_parser: SwaggerParser | OpenAPIv3Parser
+ ):
'''Performs OS Command injection parameter fuzzing based on the provided OpenAPIParser instance.
Args:
@@ -764,18 +838,9 @@ def os_command_injection_fuzz_params_test(self, openapi_parser: SwaggerParser |
test_name = 'OS Command Injection Test'
payloads_data = [
- {
- "request_payload": "cat /etc/passwd",
- "response_match_regex": r"root:.*"
- },
- {
- "request_payload": "cat /etc/shadow",
- "response_match_regex": r"root:.*"
- },
- {
- "request_payload": "ls -la",
- "response_match_regex": r"total\s\d+"
- },
+ {'request_payload': 'cat /etc/passwd', 'response_match_regex': r'root:.*'},
+ {'request_payload': 'cat /etc/shadow', 'response_match_regex': r'root:.*'},
+ {'request_payload': 'ls -la', 'response_match_regex': r'total\s\d+'},
]
result_details = {
@@ -790,7 +855,9 @@ def os_command_injection_fuzz_params_test(self, openapi_parser: SwaggerParser |
payloads_data=payloads_data,
)
- def xss_html_injection_fuzz_params_test(self, openapi_parser: SwaggerParser | OpenAPIv3Parser):
+ def xss_html_injection_fuzz_params_test(
+ self, openapi_parser: SwaggerParser | OpenAPIv3Parser
+ ):
'''Performs OS Command injection parameter fuzzing based on the provided OpenAPIParser instance.
Args:
@@ -808,16 +875,16 @@ def xss_html_injection_fuzz_params_test(self, openapi_parser: SwaggerParser | Op
payloads_data = [
{
- "request_payload": "",
- "response_match_regex": r"',
+ 'response_match_regex': r'",
- "response_match_regex": r"',
+ 'response_match_regex': r'