Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev RELEASE: v0.17.3 #87

Merged
merged 11 commits into from
Apr 27, 2024
36 changes: 36 additions & 0 deletions src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Automatically Tests for vulnerabilities after generating tests from openapi spec
- [x] Broken Access Control
- [x] Basic Command Injection
- [x] Basic XSS/HTML Injection test
- [x] Basic SSTI test
- [ ] Broken Authentication

## Features
Expand All @@ -28,13 +29,48 @@ Automatically Tests for vulnerabilities after generating tests from openapi spec
- Proxy Support
- Secure Dockerized Project for Easy Usage
- Open Source Tool with MIT License
- Github Action

## Demo

[![asciicast](https://asciinema.org/a/9MSwl7UafIVT3iJn13OcvWXeF.svg)](https://asciinema.org/a/9MSwl7UafIVT3iJn13OcvWXeF)

> Note: The columns for 'data_leak' and 'result' in the table represent independent aspects. It's possible for there to be a data leak in the endpoint, yet the result for that endpoint may still be marked as 'Success'. This is because the 'result' column doesn't necessarily reflect the overall test result; it may indicate success even in the presence of a data leak.

## Github Action

- Create github action secret `url` for your repo
- Setup github action workflow in your repo `.github/workflows/offat.yml`

```yml
name: OWASP OFFAT Sample Workflow

on:
push:
branches:
- dev
- main

jobs:
test:
runs-on: ubuntu-latest

steps:
- name: "download swagger/OAS file"
run: curl ${url} -o /tmp/swagger.json
env:
url: ${{ secrets.url }}

- name: "OWASP OFFAT CICD Scanner"
uses: OWASP/OFFAT@main # OWASP/[email protected]
with:
file: /tmp/swagger.json # or ${{ secrets.url }}
rate_limit: 120
artifact_retention_days: 1
```

> Prefer locking action to specific version `OWASP/[email protected]` instead of using `OWASP/OFFAT@main` and bump OFFAT action version after testing.

## PyPi Downloads

| Period | Count |
Expand Down
61 changes: 25 additions & 36 deletions src/offat/config_data_handler.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,23 @@
"""
Module contains the functions to validate the test
configuration data and populate user data for tests.
"""
from copy import deepcopy
from .logger import logger
from .utils import update_values


def overwrite_user_params(list1: list[dict], list2: list[dict]) -> list[dict]:
def validate_config_file_data(test_config_data: dict):
"""
Update values in list1 based on the corresponding "name" values in list2.
Validates the provided test configuration data.

Args:
list1 (list of dict): The list of dictionaries to be updated.
list2 (list of dict): The list of dictionaries containing values to update from.
test_config_data (dict): The test configuration data to be validated.

Returns:
list of dict: The updated list1 with values from list2.

Example:
```python
list1 = [{'name': 'id', 'value': 67}, {'name': 'email', 'value': '[email protected]'}]
list2 = [{'name': 'id', 'value': 10}, {'name': 'email', 'value': '[email protected]'}]
updated_list = update_values(list1, list2)
print(updated_list)
# Output: [{'name': 'id', 'value': 10}, {'name': 'email', 'value': '[email protected]'}]
```
"""
# Create a dictionary for faster lookup
lookup_dict = {item['name']: item['value'] for item in list2}

# Update values in list1 using index lookup
for item in list1:
if item['name'] in lookup_dict:
item['value'] = lookup_dict[item['name']]

return list1
bool or dict: Returns False if the data is invalid, otherwise returns the validated test configuration data.


def validate_config_file_data(test_config_data: dict):
"""
if not isinstance(test_config_data, dict):
logger.warning('Invalid data format')
return False
Expand All @@ -42,9 +26,7 @@ def validate_config_file_data(test_config_data: dict):
logger.warning('Error Occurred While reading file: %s', test_config_data)
return False

if not test_config_data.get(
'actors',
):
if not test_config_data.get('actors'):
logger.warning('actors are required')
return False

Expand All @@ -57,6 +39,17 @@ def validate_config_file_data(test_config_data: dict):


def populate_user_data(actor_data: dict, actor_name: str, tests: list[dict]):
"""
Populates user data for tests.

Args:
actor_data (dict): The data of the actor.
actor_name (str): The name of the actor.
tests (list[dict]): The list of tests.

Returns:
list[dict]: The updated list of tests.
"""
tests = deepcopy(tests)
headers = actor_data.get('request_headers', [])
body_params = actor_data.get('body', [])
Expand All @@ -69,15 +62,11 @@ def populate_user_data(actor_data: dict, actor_name: str, tests: list[dict]):
request_headers[header.get('name')] = header.get('value')

for test in tests:
test['body_params'] = overwrite_user_params(
deepcopy(test['body_params']), body_params
)
test['query_params'] = overwrite_user_params(
test['body_params'] = update_values(deepcopy(test['body_params']), body_params)
test['query_params'] = update_values(
deepcopy(test['query_params']), query_params
)
test['path_params'] += overwrite_user_params(
deepcopy(test['path_params']), path_params
)
test['path_params'] += update_values(deepcopy(test['path_params']), path_params)
# for post test processing tests such as broken authentication
test['test_actor_name'] = actor_name
if test.get('kwargs', {}).get('headers', {}).items():
Expand Down
35 changes: 22 additions & 13 deletions src/offat/report/templates/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@


class TestResultTable:
def __init__(self, table_width_percentage: float = 98, ) -> None:
def __init__(
self,
table_width_percentage: float = 98,
) -> None:
self.console = console
self.table_width_percentage = table_width_percentage

Expand All @@ -19,7 +22,10 @@ def extract_result_table_cols(self, results: list[dict]) -> list[str]:
return sorted({key for dictionary in results for key in dictionary.keys()})

def generate_result_cols(self, results_list: list[dict]) -> list[Column]:
return [Column(header=col_header, overflow='fold') for col_header in self.extract_result_table_cols(results_list)]
return [
Column(header=col_header, overflow='fold')
for col_header in self.extract_result_table_cols(results_list)
]

def generate_result_table(self, results: list, filter_passed_results: bool = True):
results = self._sanitize_results(results, filter_passed_results)
Expand All @@ -29,23 +35,28 @@ def generate_result_table(self, results: list, filter_passed_results: bool = Tru
for result in results:
table_row = []
for col in cols:
table_row.append(
str(result.get(col.header, '[red]:bug: - [/red]')))
table_row.append(str(result.get(col.header, '[red]:bug: - [/red]')))
table.add_row(*table_row)

return table

def _sanitize_results(self, results: list, filter_passed_results: bool = True, is_leaking_data: bool = False):
def _sanitize_results(
self,
results: list,
filter_passed_results: bool = True,
is_leaking_data: bool = False,
):
if filter_passed_results:
results = list(filter(lambda x: not x.get(
'result') or x.get('data_leak'), results))
results = list(
filter(lambda x: not x.get('result') or x.get('data_leak'), results)
)

# remove keys based on conditions or update their values
for result in results:
if result['result']:
result['result'] = u"[bold green]Passed \u2713[/bold green]"
result['result'] = '[bold green]Passed \u2713[/bold green]'
else:
result['result'] = u"[bold red]Failed \u00d7[/bold red]"
result['result'] = '[bold red]Failed \u00d7[/bold red]'

if not is_leaking_data:
del result['response_headers']
Expand All @@ -65,16 +76,14 @@ def _sanitize_results(self, results: list, filter_passed_results: bool = True, i
del result['response_match_regex']

if result.get('data_leak'):
result['data_leak'] = u"[bold red]Leak Found \u00d7[/bold red]"
result['data_leak'] = '[bold red]Leak Found \u00d7[/bold red]'
else:
result['data_leak'] = u"[bold green]No Leak \u2713[/bold green]"
result['data_leak'] = '[bold green]No Leak \u2713[/bold green]'

if not isinstance(result.get('malicious_payload'), str):
del result['malicious_payload']

del result['url']
del result['args']
del result['kwargs']
del result['test_name']
del result['response_filter']
del result['body_params']
Expand Down
Loading
Loading