Skip to content

Commit

Permalink
Merge pull request #75 from OWASP/dev
Browse files Browse the repository at this point in the history
Dev Release: v0.17.0
  • Loading branch information
dmdhrumilmistry authored Apr 6, 2024
2 parents 231000c + 3692ce2 commit ce7086c
Show file tree
Hide file tree
Showing 14 changed files with 677 additions and 310 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,11 @@ specs.yaml
specs.json
swagger.yaml
swagger.json
oas.yml
*.json

## unknown data
.DS_Store
oas.yml

## local testing scripts
test.py
10 changes: 9 additions & 1 deletion src/offat/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ def start():
action='store_true',
help='Captures failed requests due to any exceptions into output file',
)
parser.add_argument(
'--server',
dest='server_url',
type=str,
default=None,
required=False,
help='server/host base url to overwrite from OAS/Swagger file',
)
args = parser.parse_args()

# convert req headers str to dict
Expand All @@ -142,7 +150,7 @@ def start():
test_data_config = validate_config_file_data(test_data_config)

# parse args and run tests
api_parser = create_parser(args.fpath)
api_parser = create_parser(args.fpath, server_url=args.server_url)
generate_and_run_tests(
api_parser=api_parser,
regex_pattern=args.path_regex_pattern,
Expand Down
1 change: 1 addition & 0 deletions src/offat/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def validate_proxy(self, proxy_url: str | None):
True if the proxy URL seems valid and a basic connection can be established, False otherwise.
"""
# Check for valid URL format
# TODO: implement url parse security: https://docs.python.org/3/library/urllib.parse.html#url-parsing-security
parsed_url = urlparse(proxy_url)
if all([parsed_url.scheme, parsed_url.netloc]):
return True
Expand Down
15 changes: 9 additions & 6 deletions src/offat/parsers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from sys import exit
from requests import get as http_get
from json import loads as json_load, JSONDecodeError
from requests import get as http_get
from .openapi import OpenAPIv3Parser
from .swagger import SwaggerParser
from .parser import BaseParser
Expand All @@ -9,7 +8,9 @@


def create_parser(
fpath_or_url: str, spec: dict | None = None
fpath_or_url: str,
spec: dict | None = None,
server_url: str | None = None,
) -> SwaggerParser | OpenAPIv3Parser | None:
'''returns parser based on doc file'''
if fpath_or_url and is_valid_url(fpath_or_url):
Expand All @@ -29,12 +30,14 @@ def create_parser(
exit(-1)

try:
parser = BaseParser(file_or_url=fpath_or_url, spec=spec)
parser = BaseParser(file_or_url=fpath_or_url, spec=spec, server_url=server_url)
except OSError:
logger.error('File Not Found')
exit(-1)

if parser.is_v3:
return OpenAPIv3Parser(file_or_url=fpath_or_url, spec=spec)
return OpenAPIv3Parser(
file_or_url=fpath_or_url, spec=spec, server_url=server_url
)

return SwaggerParser(fpath_or_url=fpath_or_url, spec=spec)
return SwaggerParser(fpath_or_url=fpath_or_url, spec=spec, server_url=server_url)
137 changes: 88 additions & 49 deletions src/offat/parsers/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
module to parse OAS v3 documentation JSON/YAML files.
'''
from .parser import BaseParser
from ..utils import parse_server_url
from ..logger import logger


Expand All @@ -11,22 +12,34 @@ class InvalidOpenAPIv3File(Exception):

class OpenAPIv3Parser(BaseParser):
'''OpenAPI v3 Spec File Parser'''
# while adding new method to this class, make sure same method is present in SwaggerParser class

# while adding new method to this class, make sure same method is present in SwaggerParser class

def __init__(self, file_or_url: str, spec: dict | None = None) -> None:
super().__init__(file_or_url=file_or_url, spec=spec) # noqa
def __init__(
self, file_or_url: str, spec: dict | None = None, *args, **kwargs
) -> None:
super().__init__(file_or_url=file_or_url, spec=spec, *args, **kwargs) # noqa
if not self.is_v3:
raise InvalidOpenAPIv3File("Invalid OAS v3 file")
raise InvalidOpenAPIv3File('Invalid OAS v3 file')

self._populate_hosts()
self.http_scheme = self._get_scheme()
self.api_base_path = self.specification.get('basePath', '')

# save hosts in self.hosts
self._populate_hosts()

# raise error if host data not found
if not (self.hosts and self.hosts[0]):
raise ValueError('Host is invalid or not found')

# parse and set host data
host_dict = self.hosts[0]
self.http_scheme = host_dict['scheme']
self.host = f'{host_dict["host"]}:{host_dict["port"]}'
self.api_base_path = host_dict['basepath']
self.base_url = f"{self.http_scheme}://{self.host}"

self.request_response_params = self._get_request_response_params()


def _populate_hosts(self):
servers = self.specification.get('servers', [])
hosts = []
Expand All @@ -35,14 +48,25 @@ def _populate_hosts(self):
raise InvalidOpenAPIv3File('Server URLs Not Found in spec file')

for server in servers:
host = server.get('url', '').removeprefix(
'https://').removeprefix('http://').removesuffix('/')
host = None if host == '' else host
hosts.append(host)
# host = (
# server.get('url', '')
# .removeprefix('https://')
# .removeprefix('http://')
# .removesuffix('/')
# )
# host = None if host == '' else host
scheme, host, port, basepath = parse_server_url(url=server.get('url'))

hosts.append(
{
'scheme': scheme,
'host': host,
'port': port,
'basepath': basepath,
}
)

self.hosts = hosts
self.host = self.hosts[0]


def _get_scheme(self):
servers = self.specification.get('servers', [])
Expand All @@ -53,20 +77,20 @@ def _get_scheme(self):
scheme = 'https' if 'https' in schemes else 'http'
return scheme


def _fetch_schema_from_spec(self, param_schema_ref:str) -> dict:
def _fetch_schema_from_spec(self, param_schema_ref: str) -> dict:
schema_spec_path = param_schema_ref.split('/')[1:]

if len(schema_spec_path) > 3:
logger.error('Schema spec $ref path should not be greater than 3 (excluding #)')
logger.error(
'Schema spec $ref path should not be greater than 3 (excluding #)'
)
return {}
schema_data:dict = self.specification

schema_data: dict = self.specification
for child_ele in schema_spec_path:
schema_data:dict = schema_data.get(child_ele, {})
schema_data: dict = schema_data.get(child_ele, {})

return schema_data


def _get_param_definition_schema(self, param: dict):
'''Returns Model defined schema for the passed param'''
Expand Down Expand Up @@ -96,13 +120,20 @@ def _get_response_definition_schema(self, responses: dict):
if content:
status_code_content_type_responses = content.keys()
for status_code_content_type in status_code_content_type_responses:
status_code_content = responses[status_code]['content'][status_code_content_type].keys()
status_code_content = responses[status_code]['content'][
status_code_content_type
].keys()
if 'parameters' in status_code_content:
responses[status_code]['schema'] = responses[status_code]['content'][status_code_content_type]['parameters']
responses[status_code]['schema'] = responses[status_code][
'content'
][status_code_content_type]['parameters']
elif 'schema' in status_code_content:
responses[status_code]['schema'] = self._get_param_definition_schema(
responses[status_code]['content'][status_code_content_type])

responses[status_code][
'schema'
] = self._get_param_definition_schema(
responses[status_code]['content'][status_code_content_type]
)

else:
# Fetch $ref schema directly
ref = responses[status_code].get('$ref', None)
Expand All @@ -111,7 +142,6 @@ def _get_response_definition_schema(self, responses: dict):

return responses


def _get_request_response_params(self):
'''Returns Schema of requests and response params
Expand All @@ -133,43 +163,52 @@ def _get_request_response_params(self):
if http_method not in ['get', 'put', 'post', 'delete', 'options']:
continue

request_parameters = paths[path][http_method].get(
'parameters', [])
request_parameters = paths[path][http_method].get('parameters', [])

# create list of parameters: Fetch object schema from OAS file
body_params = []

body_parameter_keys = paths[path][http_method].get(
'requestBody', {}).get('content', {})
body_parameter_keys = (
paths[path][http_method].get('requestBody', {}).get('content', {})
)

for body_parameter_key in body_parameter_keys:
body_parameters_dict = paths[path][http_method]['requestBody']['content'][body_parameter_key]
body_parameters_dict = paths[path][http_method]['requestBody'][
'content'
][body_parameter_key]

required = paths[path][http_method]['requestBody'].get('required')
description = paths[path][http_method]['requestBody'].get('description')
description = paths[path][http_method]['requestBody'].get(
'description'
)
body_param = self._get_param_definition_schema(body_parameters_dict)

body_params.append({
'in': 'body',
'name': body_parameter_key,
'description': description,
'required': required,
'schema': body_param,
})
body_params.append(
{
'in': 'body',
'name': body_parameter_key,
'description': description,
'required': required,
'schema': body_param,
}
)

response_params = []
response_params = self._get_response_definition_schema(
paths[path][http_method].get('responses', {}))
paths[path][http_method].get('responses', {})
)

# add body param to request param
request_parameters += body_params
requests.append({
'http_method': http_method,
'path': path,
'request_params': request_parameters,
'response_params': response_params,
'path_params': path_params,
'body_params': body_params,
})
requests.append(
{
'http_method': http_method,
'path': path,
'request_params': request_parameters,
'response_params': response_params,
'path_params': path_params,
'body_params': body_params,
}
)

return requests
34 changes: 25 additions & 9 deletions src/offat/parsers/parser.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,55 @@
from openapi_spec_validator import validate
from openapi_spec_validator.readers import read_from_filename
from ..logger import logger
from ..utils import parse_server_url


class InvalidSpecVersion(Exception):
'''Exception to be raised '''
'''Exception to be raised'''

pass


class BaseParser:
def __init__(self, file_or_url: str, spec: dict = None) -> None:
def __init__(
self, file_or_url: str, spec: dict | None = None, server_url: str | None = None
) -> None:
if spec:
self.specification:dict = spec
base_uri = ""
self.specification: dict = spec
base_uri = ''
else:
self.specification, base_uri = read_from_filename(file_or_url)

self.is_v3 = self._get_oas_version() == 3

# overwrite server if present according to OAS version
if self.is_v3 and server_url:
self.specification['servers'] = [{'url': server_url}]
elif server_url:
scheme, host, port, basepath = parse_server_url(url=server_url)
basepath = '/' if basepath == '' else basepath
self.specification['host'] = f'{host}:{port}'
self.specification['schemes'] = [scheme]
self.specification['basePath'] = basepath

try:
validate(spec=self.specification, base_uri=base_uri)
self.valid = True
except Exception as e:
logger.warning("OAS/Swagger file is invalid!")
logger.error('Failed to validate spec %s due to err: %s', file_or_url, repr(e))
logger.warning('OAS/Swagger file is invalid!')
logger.error(
'Failed to validate spec %s due to err: %s', file_or_url, repr(e)
)
self.valid = False

self.is_v3 = self._get_oas_version() == 3

self.hosts = []

def _get_oas_version(self):
if self.specification.get('openapi'):
return 3
elif self.specification.get('swagger'):
return 2
raise InvalidSpecVersion("only openapi and swagger specs are supported for now")
raise InvalidSpecVersion('only openapi and swagger specs are supported for now')

def _get_endpoints(self):
'''Returns list of endpoint paths along with HTTP methods allowed'''
Expand Down
Loading

0 comments on commit ce7086c

Please sign in to comment.