Skip to content

Commit

Permalink
Merge pull request #87 from OWASP/dev
Browse files Browse the repository at this point in the history
Dev RELEASE: v0.17.3
  • Loading branch information
dmdhrumilmistry authored Apr 27, 2024
2 parents b06914b + 529038b commit edea41d
Show file tree
Hide file tree
Showing 9 changed files with 444 additions and 195 deletions.
36 changes: 36 additions & 0 deletions src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Automatically Tests for vulnerabilities after generating tests from openapi spec
- [x] Broken Access Control
- [x] Basic Command Injection
- [x] Basic XSS/HTML Injection test
- [x] Basic SSTI test
- [ ] Broken Authentication

## Features
Expand All @@ -28,13 +29,48 @@ Automatically Tests for vulnerabilities after generating tests from openapi spec
- Proxy Support
- Secure Dockerized Project for Easy Usage
- Open Source Tool with MIT License
- Github Action

## Demo

[![asciicast](https://asciinema.org/a/9MSwl7UafIVT3iJn13OcvWXeF.svg)](https://asciinema.org/a/9MSwl7UafIVT3iJn13OcvWXeF)

> Note: The columns for 'data_leak' and 'result' in the table represent independent aspects. It's possible for there to be a data leak in the endpoint, yet the result for that endpoint may still be marked as 'Success'. This is because the 'result' column doesn't necessarily reflect the overall test result; it may indicate success even in the presence of a data leak.
## Github Action

- Create github action secret `url` for your repo
- Setup github action workflow in your repo `.github/workflows/offat.yml`

```yml
name: OWASP OFFAT Sample Workflow

on:
push:
branches:
- dev
- main

jobs:
test:
runs-on: ubuntu-latest

steps:
- name: "download swagger/OAS file"
run: curl ${url} -o /tmp/swagger.json
env:
url: ${{ secrets.url }}

- name: "OWASP OFFAT CICD Scanner"
uses: OWASP/OFFAT@main # OWASP/[email protected]
with:
file: /tmp/swagger.json # or ${{ secrets.url }}
rate_limit: 120
artifact_retention_days: 1
```
> Prefer locking action to specific version `OWASP/[email protected]` instead of using `OWASP/OFFAT@main` and bump OFFAT action version after testing.

## PyPi Downloads

| Period | Count |
Expand Down
61 changes: 25 additions & 36 deletions src/offat/config_data_handler.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,23 @@
"""
Module contains the functions to validate the test
configuration data and populate user data for tests.
"""
from copy import deepcopy
from .logger import logger
from .utils import update_values


def overwrite_user_params(list1: list[dict], list2: list[dict]) -> list[dict]:
def validate_config_file_data(test_config_data: dict):
"""
Update values in list1 based on the corresponding "name" values in list2.
Validates the provided test configuration data.
Args:
list1 (list of dict): The list of dictionaries to be updated.
list2 (list of dict): The list of dictionaries containing values to update from.
test_config_data (dict): The test configuration data to be validated.
Returns:
list of dict: The updated list1 with values from list2.
Example:
```python
list1 = [{'name': 'id', 'value': 67}, {'name': 'email', 'value': '[email protected]'}]
list2 = [{'name': 'id', 'value': 10}, {'name': 'email', 'value': '[email protected]'}]
updated_list = update_values(list1, list2)
print(updated_list)
# Output: [{'name': 'id', 'value': 10}, {'name': 'email', 'value': '[email protected]'}]
```
"""
# Create a dictionary for faster lookup
lookup_dict = {item['name']: item['value'] for item in list2}

# Update values in list1 using index lookup
for item in list1:
if item['name'] in lookup_dict:
item['value'] = lookup_dict[item['name']]

return list1
bool or dict: Returns False if the data is invalid, otherwise returns the validated test configuration data.

def validate_config_file_data(test_config_data: dict):
"""
if not isinstance(test_config_data, dict):
logger.warning('Invalid data format')
return False
Expand All @@ -42,9 +26,7 @@ def validate_config_file_data(test_config_data: dict):
logger.warning('Error Occurred While reading file: %s', test_config_data)
return False

if not test_config_data.get(
'actors',
):
if not test_config_data.get('actors'):
logger.warning('actors are required')
return False

Expand All @@ -57,6 +39,17 @@ def validate_config_file_data(test_config_data: dict):


def populate_user_data(actor_data: dict, actor_name: str, tests: list[dict]):
"""
Populates user data for tests.
Args:
actor_data (dict): The data of the actor.
actor_name (str): The name of the actor.
tests (list[dict]): The list of tests.
Returns:
list[dict]: The updated list of tests.
"""
tests = deepcopy(tests)
headers = actor_data.get('request_headers', [])
body_params = actor_data.get('body', [])
Expand All @@ -69,15 +62,11 @@ def populate_user_data(actor_data: dict, actor_name: str, tests: list[dict]):
request_headers[header.get('name')] = header.get('value')

for test in tests:
test['body_params'] = overwrite_user_params(
deepcopy(test['body_params']), body_params
)
test['query_params'] = overwrite_user_params(
test['body_params'] = update_values(deepcopy(test['body_params']), body_params)
test['query_params'] = update_values(
deepcopy(test['query_params']), query_params
)
test['path_params'] += overwrite_user_params(
deepcopy(test['path_params']), path_params
)
test['path_params'] += update_values(deepcopy(test['path_params']), path_params)
# for post test processing tests such as broken authentication
test['test_actor_name'] = actor_name
if test.get('kwargs', {}).get('headers', {}).items():
Expand Down
35 changes: 22 additions & 13 deletions src/offat/report/templates/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@


class TestResultTable:
def __init__(self, table_width_percentage: float = 98, ) -> None:
def __init__(
self,
table_width_percentage: float = 98,
) -> None:
self.console = console
self.table_width_percentage = table_width_percentage

Expand All @@ -19,7 +22,10 @@ def extract_result_table_cols(self, results: list[dict]) -> list[str]:
return sorted({key for dictionary in results for key in dictionary.keys()})

def generate_result_cols(self, results_list: list[dict]) -> list[Column]:
return [Column(header=col_header, overflow='fold') for col_header in self.extract_result_table_cols(results_list)]
return [
Column(header=col_header, overflow='fold')
for col_header in self.extract_result_table_cols(results_list)
]

def generate_result_table(self, results: list, filter_passed_results: bool = True):
results = self._sanitize_results(results, filter_passed_results)
Expand All @@ -29,23 +35,28 @@ def generate_result_table(self, results: list, filter_passed_results: bool = Tru
for result in results:
table_row = []
for col in cols:
table_row.append(
str(result.get(col.header, '[red]:bug: - [/red]')))
table_row.append(str(result.get(col.header, '[red]:bug: - [/red]')))
table.add_row(*table_row)

return table

def _sanitize_results(self, results: list, filter_passed_results: bool = True, is_leaking_data: bool = False):
def _sanitize_results(
self,
results: list,
filter_passed_results: bool = True,
is_leaking_data: bool = False,
):
if filter_passed_results:
results = list(filter(lambda x: not x.get(
'result') or x.get('data_leak'), results))
results = list(
filter(lambda x: not x.get('result') or x.get('data_leak'), results)
)

# remove keys based on conditions or update their values
for result in results:
if result['result']:
result['result'] = u"[bold green]Passed \u2713[/bold green]"
result['result'] = '[bold green]Passed \u2713[/bold green]'
else:
result['result'] = u"[bold red]Failed \u00d7[/bold red]"
result['result'] = '[bold red]Failed \u00d7[/bold red]'

if not is_leaking_data:
del result['response_headers']
Expand All @@ -65,16 +76,14 @@ def _sanitize_results(self, results: list, filter_passed_results: bool = True, i
del result['response_match_regex']

if result.get('data_leak'):
result['data_leak'] = u"[bold red]Leak Found \u00d7[/bold red]"
result['data_leak'] = '[bold red]Leak Found \u00d7[/bold red]'
else:
result['data_leak'] = u"[bold green]No Leak \u2713[/bold green]"
result['data_leak'] = '[bold green]No Leak \u2713[/bold green]'

if not isinstance(result.get('malicious_payload'), str):
del result['malicious_payload']

del result['url']
del result['args']
del result['kwargs']
del result['test_name']
del result['response_filter']
del result['body_params']
Expand Down
Loading

0 comments on commit edea41d

Please sign in to comment.