-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Related to #98 Add unit tests for various components and create a PR gate. * **Unit Tests:** - Add `tests/test_storage_service.py` with unit tests for `StorageService` class. - Add `tests/test_zap_scanner.py` with unit tests for `ZapScanner` class. - Add `tests/test_nexpose_scanner.py` with unit tests for `NexposeScanner` class. - Add `tests/test_openvas_scanner.py` with unit tests for `OpenVASScanner` class. - Add `tests/test_main.py` with unit tests for `main.py`. - Add `tests/test_scanner.py` with unit tests for `Scanner` class. - Add `tests/test_init.py` with unit tests for `__init__.py`. * **GitHub Actions Workflow:** - Add `.github/workflows/test.yml` to run unit tests on PRs. - Set up Python environment using `actions/setup-python`. - Check out the repository using `actions/checkout`. - Install dependencies using `pip`. - Run unit tests using `pytest`. --- For more details, open the [Copilot Workspace session](https://copilot-workspace.githubnext.com/vs4vijay/ScanMaster/issues/98?shareId=XXXX-XXXX-XXXX-XXXX).
- Loading branch information
Showing
8 changed files
with
557 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
name: Run Unit Tests | ||
|
||
on: | ||
pull_request: | ||
branches: | ||
- main | ||
|
||
jobs: | ||
test: | ||
runs-on: ubuntu-latest | ||
|
||
steps: | ||
- name: Check out repository | ||
uses: actions/checkout@v2 | ||
|
||
- name: Set up Python | ||
uses: actions/setup-python@v2 | ||
with: | ||
python-version: '3.x' | ||
|
||
- name: Install dependencies | ||
run: | | ||
python -m pip install --upgrade pip | ||
pip install -r requirements.txt | ||
- name: Run tests | ||
run: | | ||
pytest |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
import unittest | ||
|
||
class TestInit(unittest.TestCase): | ||
|
||
def test_import(self): | ||
try: | ||
import core | ||
import scanners | ||
except ImportError: | ||
self.fail("Failed to import core or scanners module") | ||
|
||
def test_initialization(self): | ||
try: | ||
from core import storage_service | ||
from scanners import zap_scanner, nexpose_scanner, openvas_scanner | ||
except ImportError: | ||
self.fail("Failed to initialize core or scanners module") | ||
|
||
if __name__ == '__main__': | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
import unittest | ||
from unittest.mock import patch, MagicMock | ||
import main | ||
|
||
class TestMain(unittest.TestCase): | ||
|
||
@patch('main.ZapScanner') | ||
@patch('main.NexposeScanner') | ||
@patch('main.OpenVASScanner') | ||
def setUp(self, MockZapScanner, MockNexposeScanner, MockOpenVASScanner): | ||
self.mock_zap_scanner = MockZapScanner.return_value | ||
self.mock_nexpose_scanner = MockNexposeScanner.return_value | ||
self.mock_openvas_scanner = MockOpenVASScanner.return_value | ||
|
||
def test_main_with_target(self): | ||
config = { | ||
'scan_name': 'test_scan', | ||
'target': 'http://example.com', | ||
'pause': False, | ||
'resume': False | ||
} | ||
result = main.main(config) | ||
self.assertTrue(result) | ||
self.mock_zap_scanner.start.assert_called_once_with(config['scan_name'], config['target']) | ||
self.mock_nexpose_scanner.start.assert_called_once_with(config['scan_name'], config['target']) | ||
self.mock_openvas_scanner.start.assert_called_once_with(config['scan_name'], config['target']) | ||
|
||
def test_main_with_pause(self): | ||
config = { | ||
'scan_name': 'test_scan', | ||
'target': None, | ||
'pause': True, | ||
'resume': False | ||
} | ||
result = main.main(config) | ||
self.assertTrue(result) | ||
self.mock_zap_scanner.pause.assert_called_once_with(config['scan_name']) | ||
self.mock_nexpose_scanner.pause.assert_called_once_with(config['scan_name']) | ||
self.mock_openvas_scanner.pause.assert_called_once_with(config['scan_name']) | ||
|
||
def test_main_with_resume(self): | ||
config = { | ||
'scan_name': 'test_scan', | ||
'target': None, | ||
'pause': False, | ||
'resume': True | ||
} | ||
result = main.main(config) | ||
self.assertTrue(result) | ||
self.mock_zap_scanner.resume.assert_called_once_with(config['scan_name']) | ||
self.mock_nexpose_scanner.resume.assert_called_once_with(config['scan_name']) | ||
self.mock_openvas_scanner.resume.assert_called_once_with(config['scan_name']) | ||
|
||
def test_main_with_no_target_pause_resume(self): | ||
config = { | ||
'scan_name': 'test_scan', | ||
'target': None, | ||
'pause': False, | ||
'resume': False | ||
} | ||
result = main.main(config) | ||
self.assertTrue(result) | ||
self.mock_zap_scanner.get_scan_status.assert_called_once_with(config['scan_name'], []) | ||
self.mock_zap_scanner.get_scan_results.assert_called_once_with(config['scan_name'], {}) | ||
self.mock_nexpose_scanner.get_scan_status.assert_called_once_with(config['scan_name'], []) | ||
self.mock_nexpose_scanner.get_scan_results.assert_called_once_with(config['scan_name'], {}) | ||
self.mock_openvas_scanner.get_scan_status.assert_called_once_with(config['scan_name'], []) | ||
self.mock_openvas_scanner.get_scan_results.assert_called_once_with(config['scan_name'], {}) | ||
|
||
if __name__ == '__main__': | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
import unittest | ||
from unittest.mock import patch, MagicMock | ||
from scanners.nexpose_scanner import NexposeScanner | ||
|
||
class TestNexposeScanner(unittest.TestCase): | ||
|
||
@patch('scanners.nexpose_scanner.rapid7vmconsole') | ||
@patch('scanners.nexpose_scanner.StorageService') | ||
def setUp(self, MockStorageService, MockRapid7vmconsole): | ||
self.mock_rapid7 = MockRapid7vmconsole.return_value | ||
self.mock_storage_service = MockStorageService.return_value | ||
self.nexpose_scanner = NexposeScanner() | ||
|
||
def test_start(self): | ||
scan_name = 'test_scan' | ||
target = 'http://example.com' | ||
self.nexpose_scanner.scan = MagicMock(return_value=True) | ||
result = self.nexpose_scanner.start(scan_name, target) | ||
self.assertTrue(result) | ||
self.nexpose_scanner.scan.assert_called_once_with(scan_name, target) | ||
|
||
def test_scan(self): | ||
scan_name = 'test_scan' | ||
target = 'http://example.com' | ||
scan_id = 1 | ||
site_id = 2 | ||
self.mock_rapid7vmconsole.SiteApi.return_value.create_site.return_value.id = site_id | ||
self.mock_rapid7vmconsole.ScanApi.return_value.start_scan.return_value.id = scan_id | ||
scan_data = { | ||
'scan_name': scan_name, | ||
'scan_id': '', | ||
'target': target, | ||
'status': '' | ||
} | ||
self.mock_storage_service.get_by_name.return_value = None | ||
result = self.nexpose_scanner.scan(scan_name, target) | ||
self.assertEqual(result['NEXPOSE']['nexpose_id'], scan_id) | ||
self.assertEqual(result['NEXPOSE']['site_id'], site_id) | ||
self.assertEqual(result['NEXPOSE']['scan_status']['status'], 'INPROGRESS') | ||
self.mock_storage_service.add.assert_called_once_with(scan_data) | ||
self.mock_storage_service.update_by_name.assert_called_once_with(scan_name, result) | ||
|
||
def test_get_scan_status(self): | ||
scan_name = 'test_scan' | ||
scan_data = { | ||
'NEXPOSE': { | ||
'nexpose_id': 1, | ||
'scan_status': {} | ||
}, | ||
'target': 'http://example.com' | ||
} | ||
self.mock_storage_service.get_by_name.return_value = scan_data | ||
self.mock_rapid7vmconsole.ScanApi.return_value.get_scan.return_value.status = 'finished' | ||
scan_status_list = [] | ||
result = self.nexpose_scanner.get_scan_status(scan_name, scan_status_list) | ||
self.assertEqual(result[0]['scanner'], 'Nexpose') | ||
self.assertEqual(result[0]['status'], 'COMPLETE') | ||
self.mock_storage_service.update_by_name.assert_called_once_with(scan_name, scan_data) | ||
|
||
def test_get_scan_results(self): | ||
scan_name = 'test_scan' | ||
scan_data = { | ||
'NEXPOSE': { | ||
'nexpose_id': 1, | ||
'report_id': 2, | ||
'report_instance_id': 3, | ||
'scan_status': {'status': 'COMPLETE'} | ||
}, | ||
'target': 'http://example.com' | ||
} | ||
self.mock_storage_service.get_by_name.return_value = scan_data | ||
report = '<vulnerabilities><vulnerability title="vuln1" cvssScore="5.0"><references><reference source="CVE">CVE-1234</reference></references><description><ContainerBlockElement><Paragraph>desc</Paragraph></ContainerBlockElement></description><solution><ContainerBlockElement><Paragraph>sol</Paragraph></ContainerBlockElement></solution></vulnerability></vulnerabilities>' | ||
self.mock_rapid7vmconsole.ReportApi.return_value.download_report.return_value = report | ||
scan_results = {} | ||
result = self.nexpose_scanner.get_scan_results(scan_name, scan_results) | ||
self.assertEqual(result['vuln1']['name'], 'vuln1') | ||
self.assertEqual(result['vuln1']['severity'], 5.0) | ||
self.assertEqual(result['vuln1']['cve_id'], 'CVE-1234') | ||
self.assertEqual(result['vuln1']['description'], 'desc') | ||
self.assertEqual(result['vuln1']['solution'], 'sol') | ||
|
||
def test_pause(self): | ||
scan_name = 'test_scan' | ||
scan_data = {'nexpose_id': 1} | ||
self.mock_storage_service.get_by_name.return_value = scan_data | ||
result = self.nexpose_scanner.pause(scan_name) | ||
self.assertEqual(result, self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.return_value) | ||
self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.assert_called_once_with(scan_data['nexpose_id'], 'pause') | ||
|
||
def test_resume(self): | ||
scan_name = 'test_scan' | ||
scan_data = {'nexpose_id': 1} | ||
self.mock_storage_service.get_by_name.return_value = scan_data | ||
result = self.nexpose_scanner.resume(scan_name) | ||
self.assertEqual(result, self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.return_value) | ||
self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.assert_called_once_with(scan_data['nexpose_id'], 'resume') | ||
|
||
def test_stop(self): | ||
scan_name = 'test_scan' | ||
scan_data = {'nexpose_id': 1} | ||
self.mock_storage_service.get_by_name.return_value = scan_data | ||
result = self.nexpose_scanner.stop(scan_name) | ||
self.assertEqual(result, self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.return_value) | ||
self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.assert_called_once_with(scan_data['nexpose_id'], 'stop') | ||
|
||
def test_remove(self): | ||
scan_name = 'test_scan' | ||
scan_data = {'nexpose_id': 1} | ||
self.mock_storage_service.get_by_name.return_value = scan_data | ||
result = self.nexpose_scanner.remove(scan_name) | ||
self.assertEqual(result, self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.return_value) | ||
self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.assert_called_once_with(scan_data['nexpose_id'], 'remove') | ||
|
||
if __name__ == '__main__': | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
import unittest | ||
from unittest.mock import patch, MagicMock | ||
from scanners.openvas_scanner import OpenVASScanner | ||
|
||
class TestOpenVASScanner(unittest.TestCase): | ||
|
||
@patch('scanners.openvas_scanner.Gmp') | ||
@patch('scanners.openvas_scanner.StorageService') | ||
def setUp(self, MockStorageService, MockGmp): | ||
self.mock_gmp = MockGmp.return_value | ||
self.mock_storage_service = MockStorageService.return_value | ||
self.openvas_scanner = OpenVASScanner() | ||
|
||
def test_start(self): | ||
scan_name = 'test_scan' | ||
target = 'http://example.com' | ||
self.openvas_scanner.scan = MagicMock(return_value=True) | ||
result = self.openvas_scanner.start(scan_name, target) | ||
self.assertTrue(result) | ||
self.openvas_scanner.scan.assert_called_once_with(scan_name, target) | ||
|
||
def test_scan(self): | ||
scan_name = 'test_scan' | ||
target = 'http://example.com' | ||
target_id = '1234' | ||
self.mock_gmp.create_target.return_value.get.return_value = target_id | ||
scan_data = { | ||
'scan_name': scan_name, | ||
'scan_id': '', | ||
'target': target, | ||
'status': '' | ||
} | ||
self.mock_storage_service.get_by_name.return_value = None | ||
result = self.openvas_scanner.scan(scan_name, target) | ||
self.assertEqual(result['OPENVAS']['openvas_id'], target_id) | ||
self.assertEqual(result['OPENVAS']['scan_status']['status'], 'INPROGRESS') | ||
self.mock_storage_service.add.assert_called_once_with(scan_data) | ||
self.mock_storage_service.update_by_name.assert_called_once_with(scan_name, result) | ||
|
||
def test_get_scan_status(self): | ||
scan_name = 'test_scan' | ||
scan_data = { | ||
'OPENVAS': { | ||
'openvas_id': '1234', | ||
'scan_status': {} | ||
}, | ||
'target': 'http://example.com' | ||
} | ||
self.mock_storage_service.get_by_name.return_value = scan_data | ||
self.mock_gmp.get_report.return_value = True | ||
scan_status_list = [] | ||
result = self.openvas_scanner.get_scan_status(scan_name, scan_status_list) | ||
self.assertEqual(result[0]['scanner'], 'OpenVAS') | ||
self.assertEqual(result[0]['status'], 'COMPLETE') | ||
self.mock_storage_service.update_by_name.assert_called_once_with(scan_name, scan_data) | ||
|
||
def test_get_scan_results(self): | ||
scan_name = 'test_scan' | ||
scan_data = { | ||
'OPENVAS': { | ||
'openvas_id': '1234', | ||
'report_id': '5678', | ||
'scan_status': {'status': 'COMPLETE'} | ||
}, | ||
'target': 'http://example.com' | ||
} | ||
self.mock_storage_service.get_by_name.return_value = scan_data | ||
report = '<get_reports_response><report><report><results><result><name>vuln1</name><nvt><cvss_base>5.0</cvss_base><cve>CVE-1234</cve></nvt><threat>High</threat><description>desc</description></result></results></report></report></get_reports_response>' | ||
self.mock_gmp.get_report.return_value = report | ||
scan_results = {} | ||
result = self.openvas_scanner.get_scan_results(scan_name, scan_results) | ||
self.assertEqual(result['vuln1']['name'], 'vuln1') | ||
self.assertEqual(result['vuln1']['severity'], 5.0) | ||
self.assertEqual(result['vuln1']['cve_id'], 'CVE-1234') | ||
self.assertEqual(result['vuln1']['description'], 'desc') | ||
self.assertEqual(result['vuln1']['risk'], 'High') | ||
|
||
def test_pause(self): | ||
scan_name = 'test_scan' | ||
scan_data = {'openvas_id': '1234'} | ||
self.mock_storage_service.get_by_name.return_value = scan_data | ||
result = self.openvas_scanner.pause(scan_name) | ||
self.assertIsNone(result) | ||
|
||
def test_resume(self): | ||
scan_name = 'test_scan' | ||
scan_data = {'openvas_id': '1234'} | ||
self.mock_storage_service.get_by_name.return_value = scan_data | ||
result = self.openvas_scanner.resume(scan_name) | ||
self.assertIsNone(result) | ||
|
||
def test_stop(self): | ||
scan_name = 'test_scan' | ||
scan_data = {'openvas_id': '1234'} | ||
self.mock_storage_service.get_by_name.return_value = scan_data | ||
result = self.openvas_scanner.stop(scan_name) | ||
self.assertIsNone(result) | ||
|
||
def test_remove(self): | ||
scan_name = 'test_scan' | ||
scan_data = {'openvas_id': '1234'} | ||
self.mock_storage_service.get_by_name.return_value = scan_data | ||
result = self.openvas_scanner.remove(scan_name) | ||
self.assertIsNone(result) | ||
|
||
if __name__ == '__main__': | ||
unittest.main() |
Oops, something went wrong.