diff --git a/w3af/conftest.py b/w3af/conftest.py new file mode 100644 index 0000000000..d00b6fe14e --- /dev/null +++ b/w3af/conftest.py @@ -0,0 +1,30 @@ +import pytest + +from w3af.core.data.dc.headers import Headers +from w3af.core.data.parsers.doc.url import URL +from w3af.core.data.url.HTTPRequest import HTTPRequest +from w3af.core.data.url.HTTPResponse import HTTPResponse + + +@pytest.fixture +def http_response(): + url = URL('http://example.com/') + headers = Headers([('content-type', 'text/html')]) + return HTTPResponse( + 200, + '', + headers, + url, + url, + ) + + +@pytest.fixture +def http_request(): + url = URL('http://example.com/') + headers = Headers([('content-type', 'text/html')]) + return HTTPRequest( + url, + headers, + method='GET', + ) diff --git a/w3af/core/controllers/chrome/crawler/tests/frameworks/test_angularjs_basics.py b/w3af/core/controllers/chrome/crawler/tests/frameworks/test_angularjs_basics.py index e87ae5734d..12bd36ac85 100644 --- a/w3af/core/controllers/chrome/crawler/tests/frameworks/test_angularjs_basics.py +++ b/w3af/core/controllers/chrome/crawler/tests/frameworks/test_angularjs_basics.py @@ -24,6 +24,7 @@ from w3af.core.controllers.chrome.tests.helpers import ExtendedHttpRequestHandler +@pytest.mark.skip('uses internet') class AngularBasicTest(BaseChromeCrawlerTest): def test_angular_click(self): self._unittest_setup(AngularButtonClickRequestHandler) diff --git a/w3af/core/controllers/chrome/crawler/tests/frameworks/test_react_basics.py b/w3af/core/controllers/chrome/crawler/tests/frameworks/test_react_basics.py index 430534c6b4..fd0ad7e444 100644 --- a/w3af/core/controllers/chrome/crawler/tests/frameworks/test_react_basics.py +++ b/w3af/core/controllers/chrome/crawler/tests/frameworks/test_react_basics.py @@ -23,6 +23,7 @@ from w3af.core.controllers.chrome.crawler.tests.base import BaseChromeCrawlerTest +@pytest.mark.skip('uses internet') class ReactBasicTest(BaseChromeCrawlerTest): def test_react_hello_world_app(self): url = 
'http://react-hello-world-app.surge.sh/' diff --git a/w3af/core/controllers/chrome/crawler/tests/frameworks/test_vue_basics.py b/w3af/core/controllers/chrome/crawler/tests/frameworks/test_vue_basics.py index a7611ac1cf..6eeebdc577 100644 --- a/w3af/core/controllers/chrome/crawler/tests/frameworks/test_vue_basics.py +++ b/w3af/core/controllers/chrome/crawler/tests/frameworks/test_vue_basics.py @@ -18,10 +18,13 @@ along with w3af; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ +import pytest + from w3af.core.controllers.chrome.crawler.tests.base import BaseChromeCrawlerTest class ReactBasicTest(BaseChromeCrawlerTest): + @pytest.mark.skip('uses internet') def test_vue_todo_list(self): url = 'http://vue-todo-test.surge.sh' found_uris = self._crawl(url) diff --git a/w3af/core/controllers/chrome/devtools/exceptions.py b/w3af/core/controllers/chrome/devtools/exceptions.py index 38ac0e3561..45214b34c6 100644 --- a/w3af/core/controllers/chrome/devtools/exceptions.py +++ b/w3af/core/controllers/chrome/devtools/exceptions.py @@ -27,3 +27,11 @@ class ChromeInterfaceException(Exception): class ChromeInterfaceTimeout(Exception): pass + + +class ChromeScriptRuntimeException(Exception): + def __init__(self, message, function_called=None, *args): + if function_called: + message = "function: {}, exception: {}".format(function_called, message) + super(ChromeScriptRuntimeException, self).__init__(message, *args) + pass diff --git a/w3af/core/controllers/chrome/instrumented/frame_manager.py b/w3af/core/controllers/chrome/instrumented/frame_manager.py index 14f660559a..f96f46e8c6 100644 --- a/w3af/core/controllers/chrome/instrumented/frame_manager.py +++ b/w3af/core/controllers/chrome/instrumented/frame_manager.py @@ -166,7 +166,7 @@ def _on_frame_navigated(self, message): # URL all the child frames are removed from Chrome, we should remove # them from our code too to mirror state if frame: - for child_frame_id, 
child_frame in frame.child_frames: + for child_frame_id, child_frame in frame.child_frames.items(): child_frame.detach(self) frame.set_navigated() diff --git a/w3af/core/controllers/chrome/instrumented/main.py b/w3af/core/controllers/chrome/instrumented/main.py index 41262e49ba..9b3672aa95 100644 --- a/w3af/core/controllers/chrome/instrumented/main.py +++ b/w3af/core/controllers/chrome/instrumented/main.py @@ -23,6 +23,7 @@ import json import w3af.core.controllers.output_manager as om +from w3af.core.controllers.chrome.devtools.exceptions import ChromeScriptRuntimeException from w3af.core.data.parsers.doc.url import URL from w3af.core.controllers.chrome.instrumented.instrumented_base import InstrumentedChromeBase @@ -297,11 +298,20 @@ def dispatch_js_event(self, selector, event_type): return True - def get_login_forms(self): + def get_login_forms(self, exact_css_selectors): """ + :param dict exact_css_selectors: Optional parameter containing css selectors + for part of form like username input or login button. :return: Yield LoginForm instances """ - result = self.js_runtime_evaluate('window._DOMAnalyzer.getLoginForms()') + func = ( + 'window._DOMAnalyzer.getLoginForms("{}", "{}")' + ) + func = func.format( + exact_css_selectors.get('username_input', '').replace('"', '\\"'), + exact_css_selectors.get('login_button', '').replace('"', '\\"'), + ) + result = self.js_runtime_evaluate(func) if result is None: raise EventTimeout('The event execution timed out') @@ -316,11 +326,20 @@ def get_login_forms(self): yield login_form - def get_login_forms_without_form_tags(self): + def get_login_forms_without_form_tags(self, exact_css_selectors): """ + :param dict exact_css_selectors: Optional parameter containing css selectors + for part of form like username input or login button. 
:return: Yield LoginForm instances """ - result = self.js_runtime_evaluate('window._DOMAnalyzer.getLoginFormsWithoutFormTags()') + func = ( + 'window._DOMAnalyzer.getLoginFormsWithoutFormTags("{}", "{}")' + ) + func = func.format( + exact_css_selectors.get('username_input', '').replace('"', '\\"'), + exact_css_selectors.get('login_button', '').replace('"', '\\"'), + ) + result = self.js_runtime_evaluate(func) if result is None: raise EventTimeout('The event execution timed out') @@ -406,9 +425,9 @@ def focus(self, selector): if result is None: return None - node_ids = result.get('result', {}).get('nodeIds', None) + node_ids = result.get('result', {}).get('nodeIds') - if node_ids is None: + if not node_ids: msg = ('The call to chrome.focus() failed.' ' CSS selector "%s" returned no nodes (did: %s)') args = (selector, self.debugging_id) @@ -589,19 +608,13 @@ def js_runtime_evaluate(self, expression, timeout=5): timeout=timeout) # This is a rare case where the DOM is not present - if result is None: - return None - - if 'result' not in result: - return None - - if 'result' not in result['result']: - return None - - if 'value' not in result['result']['result']: - return None - - return result['result']['result']['value'] + runtime_exception = result.get('result', {}).get('exceptionDetails') + if runtime_exception: + raise ChromeScriptRuntimeException( + runtime_exception, + function_called=expression + ) + return result.get('result', {}).get('result', {}).get('value', None) def get_js_variable_value(self, variable_name): """ diff --git a/w3af/core/controllers/chrome/js/dom_analyzer.js b/w3af/core/controllers/chrome/js/dom_analyzer.js index b8077b60a3..9b113de676 100644 --- a/w3af/core/controllers/chrome/js/dom_analyzer.js +++ b/w3af/core/controllers/chrome/js/dom_analyzer.js @@ -330,7 +330,7 @@ var _DOMAnalyzer = _DOMAnalyzer || { if( !_DOMAnalyzer.eventIsValidForTagName( tag_name, type ) ) return false; let selector = OptimalSelect.getSingleSelector(element); - + // 
node_type is https://developer.mozilla.org/en-US/docs/Web/API/Node/nodeType#Node_type_constants _DOMAnalyzer.event_listeners.push({"tag_name": tag_name, "node_type": element.nodeType, @@ -865,6 +865,48 @@ var _DOMAnalyzer = _DOMAnalyzer || { return false; }, + /** + * This is naive function which takes parentElement (the login form) and + * tries to find username input field within it. + * @param {Node} parentElement - parent element to scope to document.querySelectorAll() + * @param {String} exactSelector - optional CSS selector. If provided prevents + * using standard selectors + * @returns {NodeList} - result of querySelectorAll() + */ + _getUsernameInput(parentElement, exactSelector = '') { + if (exactSelector) { + return document.querySelectorAll(exactSelector, parentElement); + } + result = document.querySelectorAll("input[type='email']", parentElement); + if (!result.length) { + result = document.querySelectorAll("input[type='text']", parentElement); + } + return result; + }, + + /** + * This is naive function which takes parentElement (the login form) and tries + * to find submit button within it. + * @param {Node} parentElement - parent element to scope to document.querySelectorAll() + * @param {String} exactSelector - optional CSS selector. If provided prevents + * using standard selectors + * @returns {NodeList} - result of querySelectorAll() + */ + _getSubmitButton(parentElement, exactSelector = '') { + if (exactSelector) { + return document.querySelectorAll(exactSelector, parentElement); + } + result = document.querySelectorAll("input[type='submit']", parentElement); + if (!result.length) { + result = document.querySelectorAll("button[type='submit']", parentElement); + } + // Maybe it's just normal button without type="submit"... + if (!result.length) { + result = document.querySelectorAll('button', parentElement); + } + return result; + }, + /** * Return the CSS selector for the login forms which exist in the DOM. 
* @@ -874,8 +916,12 @@ var _DOMAnalyzer = _DOMAnalyzer || { * - , and * - * + * @param {String} usernameCssSelector - CSS selector for username input. If + * provided we won't try to find username input automatically. + * @param {String} submitButtonCssSelector - CSS selector for submit button. If + * provided we won't try to find submit button autmatically. */ - getLoginForms: function () { + getLoginForms: function (usernameCssSelector = '', submitButtonCssSelector = '') { let login_forms = []; // First we identify the forms with a password field using a descendant Selector @@ -898,7 +944,7 @@ var _DOMAnalyzer = _DOMAnalyzer || { let form = forms[0]; // Finally we confirm that the form has a type=text input - let text_fields = document.querySelectorAll("input[type='text']", form) + let text_fields = this._getUsernameInput(form, usernameCssSelector); // Zero text fields is most likely a password-only login form // Two text fields or more is most likely a registration from @@ -906,7 +952,7 @@ var _DOMAnalyzer = _DOMAnalyzer || { if (text_fields.length !== 1) continue; // And if there is a submit button I want that selector too - let submit_fields = document.querySelectorAll("input[type='submit']", form) + let submit_fields = this._getSubmitButton(form, submitButtonCssSelector); let submit_selector = null; if (submit_fields.length !== 0) { @@ -936,8 +982,12 @@ var _DOMAnalyzer = _DOMAnalyzer || { * - , and * - * + * @param {String} usernameCssSelector - CSS selector for username input. If + * provided we won't try to find username input automatically. + * @param {String} submitButtonCssSelector - CSS selector for submit button. If + * provided we won't try to find submit button autmatically. 
*/ - getLoginFormsWithoutFormTags: function () { + getLoginFormsWithoutFormTags: function (usernameCssSelector = '', submitButtonCssSelector = '') { let login_forms = []; // First we identify the password fields @@ -962,7 +1012,7 @@ var _DOMAnalyzer = _DOMAnalyzer || { // go up one more level, and so one. // // Find if this parent has a type=text input - let text_fields = document.querySelectorAll("input[type='text']", parent) + let text_fields = this._getUsernameInput(parent, usernameCssSelector); // Zero text fields is most likely a password-only login form // Two text fields or more is most likely a registration from @@ -974,7 +1024,7 @@ var _DOMAnalyzer = _DOMAnalyzer || { } // And if there is a submit button I want that selector too - let submit_fields = document.querySelectorAll("input[type='submit']", parent) + let submit_fields = this._getSubmitButton(parent, submitButtonCssSelector) let submit_selector = null; if (submit_fields.length !== 0) { @@ -999,6 +1049,12 @@ var _DOMAnalyzer = _DOMAnalyzer || { return JSON.stringify(login_forms); }, + clickOnSelector(exactSelector) { + let element = document.querySelector(exactSelector); + element.click(); + return 'success' + }, + sliceAndSerialize: function (filtered_event_listeners, start, count) { return JSON.stringify(filtered_event_listeners.slice(start, start + count)); }, @@ -1142,4 +1198,4 @@ var _DOMAnalyzer = _DOMAnalyzer || { }; -_DOMAnalyzer.initialize(); \ No newline at end of file +_DOMAnalyzer.initialize(); diff --git a/w3af/core/controllers/chrome/login/find_form/main.py b/w3af/core/controllers/chrome/login/find_form/main.py index 2ee45ad7f5..2e42e13c57 100644 --- a/w3af/core/controllers/chrome/login/find_form/main.py +++ b/w3af/core/controllers/chrome/login/find_form/main.py @@ -36,16 +36,24 @@ def __init__(self, chrome, debugging_id): self.chrome = chrome self.debugging_id = debugging_id - def find_forms(self): + def find_forms(self, css_selectors=None): """ + :param dict css_selectors: optional 
dict of css selectors used to find + elements of form (like username input or login button) :return: Yield forms as they are found by each strategy """ + if css_selectors: + msg = 'Form finder uses the CSS selectors: "%s" (did: %s)' + args = (css_selectors, self.debugging_id) + om.out.debug(msg % args) + identified_forms = [] for strategy_klass in self.STRATEGIES: - strategy = strategy_klass(self.chrome, self.debugging_id) + strategy = strategy_klass(self.chrome, self.debugging_id, css_selectors) try: + strategy.prepare() for form in strategy.find_forms(): if form in identified_forms: continue @@ -55,6 +63,6 @@ def find_forms(self): except Exception as e: msg = 'Form finder strategy %s raised exception: "%s" (did: %s)' args = (strategy.get_name(), - e, + repr(e), self.debugging_id) om.out.debug(msg % args) diff --git a/w3af/core/controllers/chrome/login/find_form/strategies/base_find_form_strategy.py b/w3af/core/controllers/chrome/login/find_form/strategies/base_find_form_strategy.py new file mode 100644 index 0000000000..6c635adc44 --- /dev/null +++ b/w3af/core/controllers/chrome/login/find_form/strategies/base_find_form_strategy.py @@ -0,0 +1,35 @@ +from w3af.core.controllers.chrome.instrumented.exceptions import EventTimeout + + +class BaseFindFormStrategy: + def __init__(self, chrome, debugging_id, exact_css_selectors=None): + """ + :param InstrumentedChrome chrome: + :param String debugging_id: + :param dict exact_css_selectors: Optional parameter containing css selectors + for part of form like username input or login button. 
+ """ + self.chrome = chrome + self.debugging_id = debugging_id + self.exact_css_selectors = exact_css_selectors or {} + + def prepare(self): + """ + :raises EventTimeout: + Hook called before find_forms() + """ + form_activator_selector = self.exact_css_selectors.get('form_activator') + if form_activator_selector: + func = 'window._DOMAnalyzer.clickOnSelector("{}")'.format( + form_activator_selector.replace('"', '\\"') + ) + result = self.chrome.js_runtime_evaluate(func) + if result is None: + raise EventTimeout('The event execution timed out') + + def find_forms(self): + raise NotImplementedError + + @staticmethod + def get_name(): + return 'BaseFindFormStrategy' diff --git a/w3af/core/controllers/chrome/login/find_form/strategies/form_tag.py b/w3af/core/controllers/chrome/login/find_form/strategies/form_tag.py index bf47ba4a17..ec6da6aab0 100644 --- a/w3af/core/controllers/chrome/login/find_form/strategies/form_tag.py +++ b/w3af/core/controllers/chrome/login/find_form/strategies/form_tag.py @@ -19,12 +19,11 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ +from w3af.core.controllers.chrome.login.find_form.strategies.base_find_form_strategy import \ + BaseFindFormStrategy -class FormTagStrategy(object): - def __init__(self, chrome, debugging_id): - self.chrome = chrome - self.debugging_id = debugging_id +class FormTagStrategy(BaseFindFormStrategy): def find_forms(self): """ @@ -37,7 +36,7 @@ def _simple_form_with_username_password_submit(self): """ :return: Yield forms that have username, password and submit inputs """ - for login_form in self.chrome.get_login_forms(): + for login_form in self.chrome.get_login_forms(self.exact_css_selectors): yield login_form @staticmethod diff --git a/w3af/core/controllers/chrome/login/find_form/strategies/password_and_parent.py b/w3af/core/controllers/chrome/login/find_form/strategies/password_and_parent.py index 1f64780502..4dbf7c654a 100644 --- 
a/w3af/core/controllers/chrome/login/find_form/strategies/password_and_parent.py +++ b/w3af/core/controllers/chrome/login/find_form/strategies/password_and_parent.py @@ -19,12 +19,11 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ +from w3af.core.controllers.chrome.login.find_form.strategies.base_find_form_strategy import \ + BaseFindFormStrategy -class PasswordAndParentStrategy(object): - def __init__(self, chrome, debugging_id): - self.chrome = chrome - self.debugging_id = debugging_id +class PasswordAndParentStrategy(BaseFindFormStrategy): def find_forms(self): """ @@ -32,8 +31,9 @@ def find_forms(self): :return: Yield forms which are identified by the strategy algorithm """ - for login_form in self.chrome.get_login_forms_without_form_tags(): + for login_form in self.chrome.get_login_forms_without_form_tags(self.exact_css_selectors): yield login_form - def get_name(self): + @staticmethod + def get_name(): return 'PasswordAndParent' diff --git a/w3af/core/controllers/chrome/login/submit_form/main.py b/w3af/core/controllers/chrome/login/submit_form/main.py index b3954a5b92..f2370de798 100644 --- a/w3af/core/controllers/chrome/login/submit_form/main.py +++ b/w3af/core/controllers/chrome/login/submit_form/main.py @@ -19,11 +19,12 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ +import traceback + from w3af.core.controllers import output_manager as om from w3af.core.controllers.chrome.login.submit_form.strategies.press_enter import PressEnterStrategy from w3af.core.controllers.chrome.login.submit_form.strategies.press_tab_enter import PressTabEnterStrategy -from w3af.core.controllers.chrome.login.submit_form.strategies.form_input_submit import FormInputSubmitStrategy class FormSubmitter(object): @@ -31,7 +32,7 @@ class FormSubmitter(object): STRATEGIES = [ PressEnterStrategy, PressTabEnterStrategy, - #FormInputSubmitStrategy + # FormInputSubmitStrategy ] def __init__(self, chrome, form, login_form_url, 
username, password, debugging_id): @@ -91,3 +92,4 @@ def _handle_exception(self, strategy, e): e, self.debugging_id) om.out.debug(msg % args) + om.out.debug(traceback.format_exc()) diff --git a/w3af/core/controllers/chrome/proxy/tests/test_proxy.py b/w3af/core/controllers/chrome/proxy/tests/test_proxy.py index 1dcf836e46..873f7f21dc 100644 --- a/w3af/core/controllers/chrome/proxy/tests/test_proxy.py +++ b/w3af/core/controllers/chrome/proxy/tests/test_proxy.py @@ -36,7 +36,7 @@ from w3af.core.data.url.extended_urllib import ExtendedUrllib -pytestmarks = pytest.mark.deprecated +pytestmark = pytest.mark.deprecated class TestProxy(unittest.TestCase): diff --git a/w3af/core/controllers/daemons/proxy/tests/test_proxy.py b/w3af/core/controllers/daemons/proxy/tests/test_proxy.py index 1d5b1dc837..e1ded8af38 100644 --- a/w3af/core/controllers/daemons/proxy/tests/test_proxy.py +++ b/w3af/core/controllers/daemons/proxy/tests/test_proxy.py @@ -54,7 +54,6 @@ def setUp(self): self.proxy_opener = urllib2.build_opener(proxy_handler, urllib2.HTTPHandler) - @pytest.mark.deprecated def tearDown(self): # Shutdown the proxy server self._proxy.stop() diff --git a/w3af/core/controllers/dependency_check/requirements.py b/w3af/core/controllers/dependency_check/requirements.py index c981441010..9275c3ffcf 100644 --- a/w3af/core/controllers/dependency_check/requirements.py +++ b/w3af/core/controllers/dependency_check/requirements.py @@ -114,6 +114,7 @@ # Calculate distances between two strings PIPDependency('Levenshtein', 'python-Levenshtein', '0.12.0'), + PIPDependency('Zeep', 'zeep', '3.4.0'), ] GUI_PIP_EXTRAS = [PIPDependency('xdot', 'xdot', '0.6')] diff --git a/w3af/core/controllers/misc/tests/test_is_private_site.py b/w3af/core/controllers/misc/tests/test_is_private_site.py index 2fad8e75cd..d2a5d3593f 100644 --- a/w3af/core/controllers/misc/tests/test_is_private_site.py +++ b/w3af/core/controllers/misc/tests/test_is_private_site.py @@ -40,6 +40,6 @@ def 
test_is_private_site_true_case03(self): def test_is_private_site_false_case01(self): self.assertFalse(is_private_site('192.1.0.1')) + @pytest.mark.skip('uses internet') def test_is_private_site_false_case02(self): self.assertFalse(is_private_site('www.w3af.org')) - diff --git a/w3af/core/data/options/option_list.py b/w3af/core/data/options/option_list.py index 74f3c4820d..8d883bc949 100644 --- a/w3af/core/data/options/option_list.py +++ b/w3af/core/data/options/option_list.py @@ -35,6 +35,18 @@ def add(self, option): self._internal_opt_list.append(option) append = add + def pop(self, option): + """ + DANGEROUS!! + You will probably want to deepcopy the OptionList instance before + modifying it with this method. If you'll modify the original OptionList + then user will have to set this option again. + """ + if not isinstance(option, int): + option_names = [item.get_name() for item in self._internal_opt_list] + option = option_names.index(option) + return self._internal_opt_list.pop(option) + def __len__(self): return len(self._internal_opt_list) diff --git a/w3af/core/data/parsers/doc/baseparser.py b/w3af/core/data/parsers/doc/baseparser.py index 6cb332e28b..4f7e486155 100644 --- a/w3af/core/data/parsers/doc/baseparser.py +++ b/w3af/core/data/parsers/doc/baseparser.py @@ -145,6 +145,9 @@ def get_references(self): """ raise NotImplementedError(NOT_IMPLEMENTED_FMT % 'get_references') + def get_fuzzable_requests(self): + return [] + def get_emails(self, domain=None): """ :return: A set with email addresses diff --git a/w3af/core/data/parsers/doc/open_api/requests.py b/w3af/core/data/parsers/doc/open_api/requests.py index 19366b0d7f..1a28daef88 100644 --- a/w3af/core/data/parsers/doc/open_api/requests.py +++ b/w3af/core/data/parsers/doc/open_api/requests.py @@ -184,6 +184,8 @@ def get_uri(self): continue if param_def.param_spec['type'] == 'array': + if not parameters[param_name] and not param_def.required: + continue parameters[param_name] = parameters[param_name][0] if 
parameters: diff --git a/w3af/core/data/parsers/doc/open_api/specification.py b/w3af/core/data/parsers/doc/open_api/specification.py index 4aaa93344e..dc338fcf72 100644 --- a/w3af/core/data/parsers/doc/open_api/specification.py +++ b/w3af/core/data/parsers/doc/open_api/specification.py @@ -21,7 +21,6 @@ """ import json -import yaml import logging from yaml import load @@ -232,7 +231,8 @@ def _load_spec_dict(self): :return: The dict with the open api data """ try: - spec_dict = json.loads(self.http_response.body) + decoded_response = self.http_response.body.decode('ascii', 'ignore') + spec_dict = json.loads(decoded_response) except ValueError: # Seems like the OpenAPI was specified using Yaml instead of # JSON. Let's parse the Yaml data! diff --git a/w3af/core/data/parsers/doc/open_api/tests/data/array_not_required_model_items.json b/w3af/core/data/parsers/doc/open_api/tests/data/array_not_required_model_items.json new file mode 100644 index 0000000000..7a7877738f --- /dev/null +++ b/w3af/core/data/parsers/doc/open_api/tests/data/array_not_required_model_items.json @@ -0,0 +1,71 @@ +{ + "swagger": "2.0", + "info": { + "version": "1.0.0", + "title": "Swagger Petstore", + "description": "A sample API that uses a petstore as an example to demonstrate features in the swagger-2.0 specification", + "termsOfService": "http://swagger.io/terms/", + "contact": { + "name": "Swagger API Team" + }, + "license": { + "name": "MIT" + } + }, + "host": "petstore.swagger.io", + "basePath": "/api", + "schemes": [ + "http" + ], + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "paths": { + "/pets": { + "post": { + "description": "Add multiple pets", + "operationId": "addMultiplePets", + "produces": [ + "application/json" + ], + "parameters": [ + { + "name": "pets", + "in": "query", + "description": "array with pets to add", + "required": false, + "type": "array", + "items": {} + } + ], + "responses": { + "200": { + "description": "pet response", + 
"schema": { + "$ref": "#/definitions/Pet" + } + } + } + } + } + }, + "definitions": { + "Pet": { + "type": "object", + "required": [ + "name" + ], + "properties": { + "name": { + "type": "string" + }, + "tag": { + "type": "string" + } + } + } + } +} diff --git a/w3af/core/data/parsers/doc/open_api/tests/data/swagger-special-chars.json b/w3af/core/data/parsers/doc/open_api/tests/data/swagger-special-chars.json new file mode 100644 index 0000000000..a3eeff15d9 --- /dev/null +++ b/w3af/core/data/parsers/doc/open_api/tests/data/swagger-special-chars.json @@ -0,0 +1,73 @@ +{ + "swagger": "2.0", + "info": { + "version": "1.0.0", + "title": "Swagger Petstore, special chars: ąęćźżó^żć√≤Ķńå", + "description": "A sample API that uses a petstore as an example to demonstrate features in the swagger-2.0 specification", + "termsOfService": "http://swagger.io/terms/", + "contact": { + "name": "Swagger API Team" + }, + "license": { + "name": "MIT" + } + }, + "host": "petstore.swagger.io", + "basePath": "/api", + "schemes": [ + "http" + ], + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "paths": { + "/pets": { + "get": { + "description": "Returns all pets from the system that the user has access to", + "produces": [ + "application/json" + ], + "responses": { + "200": { + "description": "A list of pets.", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/Pet" + } + } + } + } + } + }, + "/pets/{ąęćźżó^żć√≤Ķńå}": { + "get": { + "description": "Let's see if I'll return an error" + } + } + }, + "definitions": { + "Pet": { + "type": "object", + "required": [ + "id", + "name" + ], + "properties": { + "id": { + "type": "integer", + "format": "int64" + }, + "name": { + "type": "string" + }, + "tag": { + "type": "string" + } + } + } + } +} diff --git a/w3af/core/data/parsers/doc/open_api/tests/test_requests.py b/w3af/core/data/parsers/doc/open_api/tests/test_requests.py index 45027e6090..2a454e128a 100644 --- 
a/w3af/core/data/parsers/doc/open_api/tests/test_requests.py +++ b/w3af/core/data/parsers/doc/open_api/tests/test_requests.py @@ -20,6 +20,7 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ +import json import unittest from w3af.core.data.parsers.doc.url import URL @@ -312,6 +313,24 @@ def test_array_with_model_items_param_in_json(self): self.assertEqual(fuzzable_request.get_headers(), e_headers) self.assertEqual(fuzzable_request.get_data(), e_data) + def test_array_param_not_required_in_json(self): + """ + Regression test when param type is array and param is not required. + Param must be in query, not in body. + """ + test_spec_filename = ( + 'w3af/core/data/parsers/doc/open_api/tests/data/array_not_required_model_items.json' + ) + with open(test_spec_filename, 'r') as file_: + specification_as_string = file_.read() + + http_response = self.generate_response(specification_as_string) + handler = SpecificationHandler(http_response) + data = [item for item in handler.get_api_information()] + for spec_obj in data: + factory = RequestFactory(*spec_obj) + req = factory.get_fuzzable_request() + def test_model_param_nested_allOf_in_json(self): specification_as_string = NestedModel().get_specification() http_response = self.generate_response(specification_as_string) diff --git a/w3af/core/data/parsers/doc/open_api/tests/test_specification.py b/w3af/core/data/parsers/doc/open_api/tests/test_specification.py index e6e96efb6a..3eb9636167 100644 --- a/w3af/core/data/parsers/doc/open_api/tests/test_specification.py +++ b/w3af/core/data/parsers/doc/open_api/tests/test_specification.py @@ -595,6 +595,18 @@ def test_parameter_handler_multiple_paths_and_headers(self): handler = SpecificationHandler(http_response) self.check_parameter_setting(handler) + def test_specification_handler_can_handle_spec_with_non_ascii_chars(self): + with open( + 'w3af/core/data/parsers/doc/open_api/tests/data/swagger-special-chars.json', + ) as file_: + spec_as_string = 
file_.read() + http_response = self.generate_response(spec_as_string) + spec_handler = SpecificationHandler(http_response) + result = spec_handler.get_api_information() + for _ in result: + pass + self.assertFalse(spec_handler._parsing_errors) + def check_parameter_setting(self, spec_handler): data = [d for d in spec_handler.get_api_information()] self.assertIsNotNone(data) diff --git a/w3af/core/data/parsers/doc/tests/test_wsdl.py b/w3af/core/data/parsers/doc/tests/test_wsdl.py new file mode 100644 index 0000000000..e272cc532f --- /dev/null +++ b/w3af/core/data/parsers/doc/tests/test_wsdl.py @@ -0,0 +1,153 @@ +import pytest +from mock import MagicMock, patch + +from w3af.core.data.dc.headers import Headers +from w3af.core.data.parsers.doc.url import URL +from w3af.core.data.parsers.doc.wsdl import ZeepTransport, WSDLParser +from w3af.core.data.url.HTTPResponse import HTTPResponse +from w3af.core.data.url.extended_urllib import ExtendedUrllib +from w3af.plugins.tests.plugin_testing_tools import NetworkPatcher + + +@pytest.fixture +def mocked_http_client(): + return MagicMock() + + +@pytest.fixture +def zeep_transport(mocked_http_client): + transport = ZeepTransport() + transport.uri_opener = mocked_http_client + return transport + + +@pytest.fixture +def zeep_transport_from_class(zeep_transport): + return lambda *args, **kwargs: zeep_transport + + +@pytest.fixture +def http_response(): + return HTTPResponse( + 200, + '
', + Headers(), + URL('https://example.com/'), + URL('https://example.com/'), + ) + + +class TestZeepTransport: + def setup_method(self): + self.url = 'http://example.com/' + + def test_it_implements_all_needed_methods(self): + zeep_transport = ZeepTransport() + required_methods = [ + 'get', + 'load', + 'post', + 'post_xml', + ] + for method in required_methods: + assert hasattr(zeep_transport, method) + + def test_it_calls_http_client_on_get_method(self, zeep_transport, mocked_http_client): + zeep_transport.get(self.url, '', {}) + assert mocked_http_client.GET.called + + def test_it_calls_http_client_on_post_method(self, zeep_transport, mocked_http_client): + zeep_transport.post(self.url, 'some data', {}) + assert mocked_http_client.POST.called + + def test_it_calls_http_client_on_post_xml_method(self, zeep_transport, mocked_http_client): + from lxml import etree # feeding Zeep dependencies + zeep_transport.post_xml(self.url, etree.Element('test'), {}) + assert mocked_http_client.POST.called + + def test_it_loads_the_response_content(self, zeep_transport, mocked_http_client): + mocked_response = MagicMock(name='mocked_response') + mocked_response.body = 'test' + mocked_http_client.GET = MagicMock(return_value=mocked_response) + + result = zeep_transport.load(self.url) + assert result == 'test' + + def test_it_reports_requests_performed(self, zeep_transport): + assert not zeep_transport.requests_performed + zeep_transport.get(self.url, '', {}) + logged_request = { + 'url': self.url, + 'method': 'GET', + 'headers': {}, + 'data': None, + } + assert logged_request in zeep_transport.requests_performed + + def test_it_reports_proper_url_if_url_params_are_passed(self, zeep_transport): + params = {'test': True, 'some_val': 5} + zeep_transport.get(self.url, params, {}) + logged_request = { + 'url': '{}?test=True&some_val=5'.format(self.url), + 'method': 'GET', + 'headers': {}, + 'data': None, + } + assert logged_request in zeep_transport.requests_performed + + def 
test_it_reports_headers_properly(self, zeep_transport): + zeep_transport.get(self.url, '', {'test': True}) + logged_request = { + 'url': self.url, + 'method': 'GET', + 'headers': {'test': True}, + 'data': None, + } + assert logged_request in zeep_transport.requests_performed + + +class TestZeepTransportIntegration: + def test_it_can_perform_get_request(self): + url = 'http://example.com/' + with NetworkPatcher() as network_patcher: + zeep_transport = ZeepTransport() + zeep_transport.get(url, {}, {}) + assert url in network_patcher.mocked_server.urls_requested + + def test_it_can_perform_post_request(self): + url = 'http://example.com/' + with NetworkPatcher() as network_patcher: + zeep_transport = ZeepTransport() + zeep_transport.post(url, 'some data', {}) + assert url in network_patcher.mocked_server.urls_requested + + def test_it_can_load_url(self): + url = 'http://example.com/' + with NetworkPatcher() as network_patcher: + zeep_transport = ZeepTransport() + zeep_transport.load('http://example.com/') + assert url in network_patcher.mocked_server.urls_requested + + +class TestWSDLParserIntegration: + def test_wsdl_zeep_transport_uses_extended_urllib(self): + zeep_transport = ZeepTransport() + assert isinstance(zeep_transport.uri_opener, ExtendedUrllib) + + def test_it_uses_extended_urllib_for_performing_requests( + self, + mocked_http_client, + zeep_transport_from_class, + http_response, + ): + mocked_http_client.GET = MagicMock(return_value=http_response) + with patch('w3af.core.data.parsers.doc.wsdl.ZeepTransport', zeep_transport_from_class): + WSDLParser(http_response=http_response) + assert mocked_http_client.GET.called + + def test_it_produces_fuzzable_requests(self, http_response): + with NetworkPatcher(): + wsdl_parser = WSDLParser(http_response=http_response) + fuzzable_requests = wsdl_parser.get_fuzzable_requests() + assert len(fuzzable_requests) == 1 + assert fuzzable_requests[0].get_url() == http_response.get_url() diff --git 
a/w3af/core/data/parsers/doc/wsdl.py b/w3af/core/data/parsers/doc/wsdl.py index 13e71599d5..aa531015ec 100644 --- a/w3af/core/data/parsers/doc/wsdl.py +++ b/w3af/core/data/parsers/doc/wsdl.py @@ -19,194 +19,179 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ -import xml.parsers.expat as expat - -import SOAPpy - -import w3af.core.controllers.output_manager as om -from w3af.core.controllers.exceptions import BaseFrameworkException +import contextlib +import sys +import urllib +from cStringIO import StringIO + +import zeep +from requests import HTTPError +from zeep.exceptions import XMLSyntaxError + +import w3af.core.data.kb.knowledge_base as kb +from w3af.core.data.kb.info import Info +from w3af.core.data.parsers.doc.baseparser import BaseParser from w3af.core.data.parsers.doc.url import URL +from w3af.core.controllers import output_manager +from w3af.core.data.request.fuzzable_request import FuzzableRequest -class WSDLParser(object): +class ZeepTransport(zeep.Transport): + """ + Custom Zeep Transport class which overrides it's methods to use w3af's HTTP client. + We don't call super() on any overwritten method as we want to force Zeep to use + our client, not their. + + Tradeoff: + As WSDLParser has to be tight coupled to Zeep by design we have to also + make tight coupling between WSDLParser and ExtendedUrllib. And that's because + parser by design is not intended to perform any requests by itself. Although + Zeep is constructed in this specific way that it performs request when it's + instantiated. + As parsers are not intended to make requests there's also no obvious way to + pass uri_opener into parser. 
+ """ + def __init__(self): + super(ZeepTransport, self).__init__() + from w3af.core.data.url.extended_urllib import ExtendedUrllib + self.uri_opener = ExtendedUrllib() + self.uri_opener.setup(disable_cache=True) + self.requests_performed = [] + + def get(self, address, params, headers): + self._save_request(address, method='GET', params=params, headers=headers) + return self.uri_opener.GET(address, params, headers=headers) + + def post(self, address, message, headers): + self._save_request(address, method='POST', data=message, headers=headers) + return self.uri_opener.POST(address, data=message, headers=headers) + + def post_xml(self, address, envelope, headers): + from zeep.wsdl.utils import etree_to_string + message = etree_to_string(envelope) + self._save_request(address, method='POST', data=message, headers=headers) + return self.uri_opener.POST(address, data=message, headers=headers) + + def load(self, url): + self._save_request(address=url, method='GET') + response = self.uri_opener.GET(url) + return response.body + + def _save_request(self, address, method, params=None, headers=None, data=None): + uri = address + if params: + uri += '?{}'.format(urllib.urlencode(params)) + self.requests_performed.append({ + 'url': uri, + 'method': method, + 'headers': headers, + 'data': data, + }) + + +class ZeepClientAdapter(zeep.Client): + def __init__(self, url, transport=None, *args, **kwargs): + transport = transport or ZeepTransport() + super(ZeepClientAdapter, self).__init__(url, transport=transport, *args, **kwargs) + + +class WSDLParser(BaseParser): """ This class parses WSDL documents. :author: Andres Riancho (andres.riancho@gmail.com) """ - def __init__(self): + def __init__(self, http_response): self._proxy = None - - def is_WSDL(self, data): - """ - This is not a 100% accurate test, the real WSDL parsing is performed - in "SOAPpy.WSDL.Proxy( xmlData )". This test was mostly added to - enhance framework's performance. 
- - :param data: A string that might represent a WSDL - :return: True if the data parameter is a WSDL document. - """ - return False - if '', 'text/html') + + +class MockedSerializer: + """ + If you wonder why on earth do we use MockedSerializer instead of something + as simple as MagicMock then it's because mp_document_parser goes crazy with + ProcessPool and Pickling and MagicMock brings a lot of troubles if you want + to pickle it. + """ + def __init__(self): + self.saved_data = {} + + def save_http_response(self, http_response): + self.saved_data[id(http_response)] = http_response.to_dict() + return id(http_response) + + def load_http_response(self, id_): + return HTTPResponse.from_dict(self.saved_data[id_]) + + def save_tags(self, tag_list): + data = [t.to_dict() for t in tag_list] + self.saved_data[id(data)] = data + return id(data) + + def load_tags(self, id_): + result = [Tag.from_dict(t) for t in self.saved_data[id_]] + return result + def remove_if_exists(self, id_): + if id_ in self.saved_data: + self.saved_data.pop(id_) -class TestMPDocumentParser(unittest.TestCase): - def setUp(self): - self.url = URL('http://w3af.com') +class TestMPDocumentParser: + """ + If you wonder why on earth do we use FileSerializer(temp_dir) instead of + something as simple as MagicMock then it's because mp_document_parser + goes crazy with ProcessPool and Pickling MagicMock brings a lot of trouble. 
+ """ + def setup_method(self): self.headers = Headers([(u'content-type', u'text/html')]) self.mpdoc = MultiProcessingDocumentParser() + self.temp_directory = tempfile.gettempdir() + '/w3af-test' + os.mkdir(self.temp_directory) + serializer = FileSerializer(file_directory=self.temp_directory) + self.mpdoc._serializer = serializer - def tearDown(self): - self.mpdoc.stop_workers() + def teardown_method(self): + shutil.rmtree(self.temp_directory) - @pytest.mark.deprecated def test_basic(self): + url = URL('http://localhost') resp = HTTPResponse(200, 'hello', - self.headers, self.url, self.url) + self.headers, url, url) - parser = self.mpdoc.get_document_parser_for(resp) + with NetworkPatcher(): + parser = self.mpdoc.get_document_parser_for(resp) parsed_refs, _ = parser.get_references() - self.assertEqual([URL('http://w3af.com/abc')], parsed_refs) + assert [URL('http://localhost/abc')] == parsed_refs - @pytest.mark.deprecated def test_no_parser_for_images(self): body = '' url = URL('http://w3af.com/foo.jpg') headers = Headers([(u'content-type', u'image/jpeg')]) resp = HTTPResponse(200, body, headers, url, url) - try: + with pytest.raises(Exception) as e: self.mpdoc.get_document_parser_for(resp) - except Exception, e: - self.assertEqual(str(e), 'There is no parser for images.') - else: - self.assertTrue(False, 'Expected exception!') + assert str(e) == 'There is no parser for images.' 
- @pytest.mark.deprecated def test_parser_timeout(self): """ Test to verify fix for https://github.com/andresriancho/w3af/issues/6723 @@ -99,12 +143,9 @@ def test_parser_timeout(self): max_workers_mock.return_value = 1 parsers_mock.return_value = [DelayedParser, HTMLParser] - try: + with pytest.raises(TimeoutError) as toe: self.mpdoc.get_document_parser_for(http_resp) - except TimeoutError, toe: self._is_timeout_exception_message(toe, om_mock, http_resp) - else: - self.assertTrue(False) # # We now want to make sure that after we kill the process the Pool @@ -116,7 +157,7 @@ def test_parser_timeout(self): http_resp = _build_http_response(html, u'text/html') doc_parser = self.mpdoc.get_document_parser_for(http_resp) - self.assertIsInstance(doc_parser._parser, HTMLParser) + assert isinstance(doc_parser._parser, HTMLParser) @pytest.mark.slow def test_many_parsers_timing_out(self): @@ -153,12 +194,9 @@ def test_many_parsers_timing_out(self): for i in xrange(ITERATIONS): http_resp = _build_http_response(html_trigger_delay % i, u'text/html') - try: + with pytest.raises(TimeoutError) as toe: self.mpdoc.get_document_parser_for(http_resp) - except TimeoutError, toe: self._is_timeout_exception_message(toe, om_mock, http_resp) - else: - self.assertTrue(False) # # Lets timeout randomly @@ -172,7 +210,7 @@ def test_many_parsers_timing_out(self): except TimeoutError, toe: self._is_timeout_exception_message(toe, om_mock, http_resp) else: - self.assertIsInstance(parser._parser, HTMLParser) + assert isinstance(parser._parser, HTMLParser) # # Lets parse things we know should work @@ -180,7 +218,7 @@ def test_many_parsers_timing_out(self): for i in xrange(ITERATIONS): http_resp = _build_http_response(html_ok % i, u'text/html') parser = self.mpdoc.get_document_parser_for(http_resp) - self.assertIsInstance(parser._parser, HTMLParser) + assert isinstance(parser._parser, HTMLParser) def test_parser_with_large_attr_killed_when_sending_to_queue(self): """ @@ -227,12 +265,9 @@ def 
test_parser_with_large_attr_killed_when_sending_to_queue(self): for i in xrange(ITERATIONS): http_resp = _build_http_response(html_trigger_delay % i, u'text/html') - try: + with pytest.raises(TimeoutError) as toe: self.mpdoc.get_document_parser_for(http_resp) - except TimeoutError, toe: self._is_timeout_exception_message(toe, om_mock, http_resp) - else: - self.assertTrue(False) # # Lets timeout randomly @@ -246,7 +281,7 @@ def test_parser_with_large_attr_killed_when_sending_to_queue(self): except TimeoutError, toe: self._is_timeout_exception_message(toe, om_mock, http_resp) else: - self.assertIsInstance(parser._parser, HTMLParser) + assert isinstance(parser._parser, HTMLParser) # # Lets parse things we know should work @@ -254,9 +289,8 @@ def test_parser_with_large_attr_killed_when_sending_to_queue(self): for i in xrange(ITERATIONS): http_resp = _build_http_response(html_ok % i, u'text/html') parser = self.mpdoc.get_document_parser_for(http_resp) - self.assertIsInstance(parser._parser, HTMLParser) + assert isinstance(parser._parser, HTMLParser) - @pytest.mark.deprecated def test_parser_memory_usage_exceeded(self): """ This makes sure that we stop parsing a document that exceeds our memory @@ -281,12 +315,9 @@ def test_parser_memory_usage_exceeded(self): max_workers_mock.return_value = 1 parsers_mock.return_value = [UseMemoryParser, HTMLParser] - try: + with pytest.raises(MemoryError) as me: self.mpdoc.get_document_parser_for(http_resp) - except MemoryError, me: - self.assertIn('OOM issues', str(me)) - else: - self.assertTrue(False) + assert 'OOM issues' in str(me) # # We now want to make sure that after we stop because of a memory issue @@ -296,7 +327,7 @@ def test_parser_memory_usage_exceeded(self): http_resp = _build_http_response(html, u'text/html') doc_parser = self.mpdoc.get_document_parser_for(http_resp) - self.assertIsInstance(doc_parser._parser, HTMLParser) + assert isinstance(doc_parser._parser, HTMLParser) def _is_timeout_exception_message(self, toe, 
om_mock, http_resp): msg = ('[timeout] The parser took more than %s seconds to ' @@ -305,7 +336,7 @@ def _is_timeout_exception_message(self, toe, om_mock, http_resp): error = msg % (MultiProcessingDocumentParser.PARSER_TIMEOUT, http_resp.get_url()) - self.assertEquals(str(toe), error) + assert str(toe) == error def test_daemon_child(self): """ @@ -325,9 +356,7 @@ def test_daemon_child(self): p.join() got_assertion_error = queue.get(timeout=10) - if got_assertion_error: - self.assertTrue(False, 'daemonic processes are not allowed' - ' to have children') + assert not got_assertion_error def test_non_daemon_child_ok(self): """ @@ -342,11 +371,9 @@ def test_non_daemon_child_ok(self): p.join() got_assertion_error = queue.get(timeout=10) - if got_assertion_error: - self.assertTrue(False, 'daemonic processes are not allowed' - ' to have children') + assert not got_assertion_error - @pytest.mark.deprecated + @pytest.mark.deprecated # this test uses internet!! def test_dictproxy_pickle_8748(self): """ MaybeEncodingError - PicklingError: Can't pickle dictproxy #8748 @@ -359,9 +386,9 @@ def test_dictproxy_pickle_8748(self): resp = HTTPResponse(200, html_body, self.headers, url, url) parser = self.mpdoc.get_document_parser_for(resp) - self.assertIsInstance(parser._parser, HTMLParser) + assert isinstance(parser._parser, HTMLParser) - @pytest.mark.deprecated + @patch_network def test_get_tags_by_filter(self): body = 'foobar' url = URL('http://www.w3af.com/') @@ -371,10 +398,9 @@ def test_get_tags_by_filter(self): tags = self.mpdoc.get_tags_by_filter(resp, ('a', 'b'), yield_text=True) - self.assertEqual([Tag('a', {'href': '/abc'}, 'foo'), - Tag('b', {}, 'bar')], tags) + assert [Tag('a', {'href': '/abc'}, 'foo'), Tag('b', {}, 'bar')] == tags - @pytest.mark.deprecated + @patch_network def test_get_tags_by_filter_empty_tag(self): body = '' url = URL('http://www.w3af.com/') @@ -385,18 +411,23 @@ def test_get_tags_by_filter_empty_tag(self): tags = self.mpdoc.get_tags_by_filter(resp, 
('script',), yield_text=True) # Note that lxml returns None for this tag text: - self.assertEqual([Tag('script', {'src': 'foo.js'}, None)], tags) + assert [Tag('script', {'src': 'foo.js'}, None)] == tags + + def test_it_doesnt_silence_type_error_from_document_parser(self, html_response): + self.mpdoc._document_parser_class = MockedDamagedDocumentParser + with pytest.raises(TypeError), NetworkPatcher(): + self.mpdoc.get_document_parser_for(html_response) def daemon_child(queue): dpc = MultiProcessingDocumentParser() + dpc.start_workers() + queue.put(False) + - try: - dpc.start_workers() - except AssertionError: - queue.put(True) - else: - queue.put(False) +class MockedDamagedDocumentParser: + def __init__(self): + raise TypeError('unit-test') class DelayedParser(object): diff --git a/w3af/core/data/url/extended_urllib.py b/w3af/core/data/url/extended_urllib.py index 20c77797f3..bc0ebcf9df 100644 --- a/w3af/core/data/url/extended_urllib.py +++ b/w3af/core/data/url/extended_urllib.py @@ -528,10 +528,10 @@ def end(self): def restart(self): self.end() - def setup(self): + def setup(self, disable_cache=False): if self.settings.need_update or self._opener is None: self.settings.need_update = False - self.settings.build_openers() + self.settings.build_openers(disable_cache=disable_cache) self._opener = self.settings.get_custom_opener() self.clear_timeout() @@ -674,12 +674,25 @@ def send_mutant(self, mutant, callback=None, grep=True, cache=True, return res - def GET(self, uri, data=None, headers=None, cache=False, - grep=True, cookies=True, session=None, - respect_size_limit=True, new_connection=False, - error_handling=True, timeout=None, follow_redirects=False, - use_basic_auth=True, use_proxy=True, debugging_id=None, - binary_response=False): + def GET( + self, + uri, + data=None, + headers=None, + cache=False, + grep=True, + cookies=True, + session=None, + respect_size_limit=True, + new_connection=False, + error_handling=True, + timeout=None, + follow_redirects=False, + 
use_basic_auth=True, + use_proxy=True, + debugging_id=None, + binary_response=False, + ): """ HTTP GET a URI using a proxy, user agent, and other settings that where previously set in opener_settings.py . @@ -702,15 +715,7 @@ def GET(self, uri, data=None, headers=None, cache=False, :return: An HTTPResponse object. """ - headers = headers or Headers() - - if not isinstance(uri, URL): - raise TypeError('The uri parameter of ExtendedUrllib.GET() must be' - ' of url.URL type.') - - if not isinstance(headers, Headers): - raise TypeError('The header parameter of ExtendedUrllib.GET() must' - ' be of Headers type.') + uri, headers = self._parse_uri_and_headers(uri, headers, method_name='GET') # Validate what I'm sending, init the library (if needed) self.setup() @@ -738,12 +743,25 @@ def GET(self, uri, data=None, headers=None, cache=False, with raise_size_limit(respect_size_limit): return self.send(req, grep=grep) - def POST(self, uri, data='', headers=None, grep=True, cache=False, - cookies=True, session=None, error_handling=True, timeout=None, - follow_redirects=None, use_basic_auth=True, use_proxy=True, - debugging_id=None, new_connection=False, - respect_size_limit=None, - binary_response=False): + def POST( + self, + uri, + data='', + headers=None, + grep=True, + cache=False, + cookies=True, + session=None, + error_handling=True, + timeout=None, + follow_redirects=None, + use_basic_auth=True, + use_proxy=True, + debugging_id=None, + new_connection=False, + respect_size_limit=None, + binary_response=False, + ): """ POST's data to a uri using a proxy, user agents, and other settings that where set previously. @@ -755,15 +773,7 @@ def POST(self, uri, data='', headers=None, grep=True, cache=False, :see: The GET() for documentation on the other parameters :return: An HTTPResponse object. """ - headers = headers or Headers() - - if not isinstance(uri, URL): - raise TypeError('The uri parameter of ExtendedUrllib.POST() must' - ' be of url.URL type. Got %s instead.' 
% type(uri)) - - if not isinstance(headers, Headers): - raise TypeError('The header parameter of ExtendedUrllib.POST() must' - ' be of Headers type.') + uri, headers = self._parse_uri_and_headers(uri, headers, method_name='POST') # Validate what I'm sending, init the library (if needed) self.setup() @@ -792,6 +802,38 @@ def POST(self, uri, data='', headers=None, grep=True, cache=False, return self.send(req, grep=grep) + def _parse_uri_and_headers(self, uri, headers, method_name): + """ + If uri or headers comes in primitive format then make sure they're + instantiated to proper ones. + """ + if isinstance(headers, dict): + new_headers = [] + for key, value in headers.items(): + new_headers.append((key, value)) + headers = Headers(new_headers) + headers = headers or Headers() + + if not isinstance(headers, Headers): + error_message = ( + 'The header parameter of ExtendedUrllib.{}() must be of dict Headers type.' + ) + raise TypeError( + error_message.format(method_name) + ) + + if isinstance(uri, str): + uri = URL(uri) + if not isinstance(uri, URL): + error_message = ( + 'The uri parameter of ExtendedUrllib.{}() must be of str or url.URL type.' + ) + raise TypeError( + error_message.format(method_name) + ) + + return uri, headers + def get_remote_file_size(self, req, cache=True): """ This method was previously used in the framework to perform a HEAD diff --git a/w3af/core/data/url/handlers/cache.py b/w3af/core/data/url/handlers/cache.py index c348419adc..26cb42305f 100644 --- a/w3af/core/data/url/handlers/cache.py +++ b/w3af/core/data/url/handlers/cache.py @@ -28,6 +28,8 @@ # TODO: Why not POST? Why don't we perform real caching and respect # the cache headers/meta tags? 
# @see: https://bitbucket.org/jaraco/jaraco.net/src/65af6e442d21/jaraco/net/http/caching.py +from w3af.core.data.url.handlers.cache_backend.no_chache import NoCachedResponse + CACHE_METHODS = ('GET', 'HEAD') @@ -42,14 +44,17 @@ class CacheHandler(urllib2.BaseHandler): :author: Version 0.2 by Andres Riancho :author: Version 0.3 by Javier Andalia """ - def __init__(self): - CacheClass.init() + def __init__(self, disable_cache=False): + self._cache_class = DefaultCacheClass + if disable_cache: + self._cache_class = NoCachedResponse + self._cache_class.init() def clear(self): """ Clear the cache (remove all files and directories associated with it). """ - return CacheClass.clear() + return self._cache_class.clear() def default_open(self, request): """ @@ -64,11 +69,11 @@ def default_open(self, request): if not request.get_from_cache: return None - if not CacheClass.exists_in_cache(request): + if not self._cache_class.exists_in_cache(request): return None try: - cache_response_obj = CacheClass(request) + cache_response_obj = self._cache_class(request) except Exception: # Sometimes the cache gets corrupted, or the initial HTTP # request that's saved to disk doesn't completely respect the @@ -105,11 +110,11 @@ def http_response(self, request, response): # above) to decide if the response should be returned from the # cache # - CacheClass.store_in_cache(request, response) + self._cache_class.store_in_cache(request, response) return response https_response = http_response # This is the default implementation -CacheClass = SQLCachedResponse +DefaultCacheClass = SQLCachedResponse diff --git a/w3af/core/data/url/handlers/cache_backend/no_chache.py b/w3af/core/data/url/handlers/cache_backend/no_chache.py new file mode 100644 index 0000000000..910327524a --- /dev/null +++ b/w3af/core/data/url/handlers/cache_backend/no_chache.py @@ -0,0 +1,19 @@ +from w3af.core.data.url.handlers.cache_backend.cached_response import CachedResponse + + +class NoCachedResponse(CachedResponse): + 
@staticmethod + def init(): + pass + + @staticmethod + def exists_in_cache(request): + return False + + @staticmethod + def clear(): + pass + + @staticmethod + def store_in_cache(request, response): + pass diff --git a/w3af/core/data/url/handlers/tests/test_cache.py b/w3af/core/data/url/handlers/tests/test_cache.py index 818f60695f..0d90f1e436 100644 --- a/w3af/core/data/url/handlers/tests/test_cache.py +++ b/w3af/core/data/url/handlers/tests/test_cache.py @@ -23,7 +23,8 @@ import urllib2 import unittest -from mock import patch, Mock, _Call +import pytest +from mock import patch, Mock, _Call, MagicMock from w3af.core.data.url.HTTPRequest import HTTPRequest from w3af.core.data.url.handlers.cache import CacheHandler @@ -32,67 +33,79 @@ from w3af.core.data.dc.headers import Headers -class TestCacheHandler(unittest.TestCase): - - def tearDown(self): +class TestCacheHandler: + def setup_method(self): + self.url = URL('http://www.w3af.org') + self.request = HTTPRequest(self.url, cache=True) + self.response = FakeHttplibHTTPResponse( + 200, 'OK', 'spameggs', Headers(), self.url.url_string + ) + + def teardown_method(self): CacheHandler().clear() - + def test_basic(self): - url = URL('http://www.w3af.org') - request = HTTPRequest(url, cache=True) - + cache = CacheHandler() - self.assertEqual(cache.default_open(request), None) - - response = FakeHttplibHTTPResponse(200, 'OK', 'spameggs', Headers(), - url.url_string) + assert cache.default_open(self.request) is None - with patch('w3af.core.data.url.handlers.cache.CacheClass') as cc_mock: - store_in_cache = Mock() - cc_mock.attach_mock(store_in_cache, 'store_in_cache') + cc_mock = MagicMock() + cache._cache_class = cc_mock + store_in_cache = Mock() + cc_mock.attach_mock(store_in_cache, 'store_in_cache') - # This stores the response - cache.http_response(request, response) + # This stores the response + cache.http_response(self.request, self.response) - # Make sure the right call was made - _call = _Call(('store_in_cache', 
(request, response))) - self.assertEqual(cc_mock.mock_calls, [_call]) - cc_mock.reset_mock() + # Make sure the right call was made + _call = _Call(('store_in_cache', (self.request, self.response))) + assert cc_mock.mock_calls == [_call] + cc_mock.reset_mock() - exists_in_cache = Mock() - cc_mock.return_value = response - cc_mock.attach_mock(exists_in_cache, 'exists_in_cache') + exists_in_cache = Mock() + cc_mock.return_value = self.response + cc_mock.attach_mock(exists_in_cache, 'exists_in_cache') - # This retrieves the response from the "cache" - cached_response = cache.default_open(request) + # This retrieves the response from the "cache" + cached_response = cache.default_open(self.request) - # Make sure the right call was made - _exists_call = _Call(('exists_in_cache', (request,))) - _retrieve_call = _Call(((request,), {})) - self.assertEqual(cc_mock.mock_calls, [_exists_call, _retrieve_call]) + # Make sure the right call was made + _exists_call = _Call(('exists_in_cache', (self.request,))) + _retrieve_call = _Call(((self.request,), {})) + assert cc_mock.mock_calls == [_exists_call, _retrieve_call] - self.assertIsNotNone(cached_response) + assert cached_response is not None - self.assertEqual(cached_response.code, response.code) - self.assertEqual(cached_response.msg, response.msg) - self.assertEqual(cached_response.read(), response.read()) - self.assertEqual(Headers(cached_response.info().items()), response.info()) - self.assertEqual(cached_response.geturl(), response.geturl()) + assert cached_response.code == self.response.code + assert cached_response.msg == self.response.msg + assert cached_response.read() == self.response.read() + assert Headers(cached_response.info().items()) == self.response.info() + assert cached_response.geturl() == self.response.geturl() def test_no_cache(self): url = URL('http://www.w3af.org') request = HTTPRequest(url, cache=False) - + cache = CacheHandler() - self.assertEqual(cache.default_open(request), None) - + assert 
cache.default_open(request) is None + response = FakeHttplibHTTPResponse(200, 'OK', 'spameggs', Headers(), url.url_string) cache.http_response(request, response) - self.assertEqual(cache.default_open(request), None) + assert cache.default_open(request) is None + +class TestCacheIntegration: + def setup_method(self): + self.http_response = FakeHttplibHTTPResponse( + 200, + 'OK', + '', + Headers(), + 'http://example.com/' + ) -class CacheIntegrationTest(unittest.TestCase): + @pytest.mark.skip('uses internet') def test_cache_http_errors(self): settings = opener_settings.OpenerSettings() settings.build_openers() @@ -101,7 +114,7 @@ def test_cache_http_errors(self): url = URL('http://w3af.org/foo-bar-not-exists.htm') request = HTTPRequest(url, cache=False) - with patch('w3af.core.data.url.handlers.cache.CacheClass') as cc_mock: + with patch('w3af.core.data.url.handlers.cache.DefaultCacheClass') as cc_mock: store_in_cache = Mock() cc_mock.attach_mock(store_in_cache, 'store_in_cache') @@ -113,11 +126,23 @@ def test_cache_http_errors(self): # Make sure the right call was made _call = _Call(('store_in_cache', (request, response))) - self.assertEqual(cc_mock.mock_calls, [_call]) + assert cc_mock.mock_calls == [_call] cc_mock.reset_mock() # And make sure the response was a 404 - self.assertEqual(response.status, 404) + assert response.status == 404 + + def test_cache_handler_with_enabled_cache(self, http_request): + http_request.get_from_cache = True + cache_handler = CacheHandler(disable_cache=False) + cache_handler.http_response(http_request, self.http_response) + assert cache_handler.default_open(http_request) + + def test_cache_handler_with_disabled_cache(self, http_request): + http_request.get_from_cache = True + cache_handler = CacheHandler(disable_cache=True) + cache_handler.http_response(http_request, self.http_response) + assert not cache_handler.default_open(http_request) class FakeHttplibHTTPResponse(object): @@ -135,4 +160,4 @@ def read(self): return self.body def 
info(self): - return self.headers \ No newline at end of file + return self.headers diff --git a/w3af/core/data/url/handlers/tests/test_no_cache.py b/w3af/core/data/url/handlers/tests/test_no_cache.py new file mode 100644 index 0000000000..5c8a99294e --- /dev/null +++ b/w3af/core/data/url/handlers/tests/test_no_cache.py @@ -0,0 +1,15 @@ +from mock import MagicMock + +from w3af.core.data.url.handlers.cache_backend.no_chache import NoCachedResponse + + +def test_it_implements_all_static_methods_required(): + NoCachedResponse.init() + NoCachedResponse.clear() + NoCachedResponse.exists_in_cache(MagicMock()) + NoCachedResponse.store_in_cache(MagicMock(), MagicMock()) + + +def test_response_wont_exist_in_cache(http_request, http_response): + NoCachedResponse.store_in_cache(http_request, http_response) + assert not NoCachedResponse.exists_in_cache(http_request) diff --git a/w3af/core/data/url/opener_settings.py b/w3af/core/data/url/opener_settings.py index 16ef552237..d4fc7ee2ef 100644 --- a/w3af/core/data/url/opener_settings.py +++ b/w3af/core/data/url/opener_settings.py @@ -370,11 +370,11 @@ def get_keep_alive_handlers(self): self._ka_https } - def build_openers(self): + def build_openers(self, disable_cache=False): # Instantiate the handlers passing the proxy as parameter self._ka_http = HTTPHandler() self._ka_https = HTTPSHandler(self.get_proxy()) - self._cache_handler = CacheHandler() + self._cache_handler = CacheHandler(disable_cache=disable_cache) # Prepare the list of handlers handlers = [] diff --git a/w3af/core/data/url/tests/test_xurllib.py b/w3af/core/data/url/tests/test_xurllib.py index 54dd0c9015..0dd6bb6dc7 100644 --- a/w3af/core/data/url/tests/test_xurllib.py +++ b/w3af/core/data/url/tests/test_xurllib.py @@ -31,7 +31,7 @@ import httpretty from nose.plugins.attrib import attr -from mock import patch +from mock import patch, MagicMock from w3af import ROOT_PATH from w3af.core.data.url.extended_urllib import ExtendedUrllib @@ -52,7 +52,10 @@ @attr('moth') 
@attr('smoke') -class TestXUrllib(unittest.TestCase): +class TestXUrllibUnittest(unittest.TestCase): + """ + Pytest style is preferred for newer tests + """ MOTH_MESSAGE = 'moth: vulnerable web application' MOCK_URL = 'http://www.w3af.org/' @@ -64,7 +67,7 @@ def tearDown(self): self.uri_opener.end() httpretty.reset() - @pytest.mark.deprecated + @pytest.mark.skip('uses internet') def test_basic(self): url = URL(get_moth_http()) http_response = self.uri_opener.GET(url, cache=False) @@ -86,7 +89,7 @@ def test_redir_content_length_zero(self): http_response = self.uri_opener.GET(url, cache=False) self.assertEqual(http_response.get_code(), 301) - @pytest.mark.deprecated + @pytest.mark.skip('uses internet') def test_basic_ssl(self): url = URL(get_moth_https()) http_response = self.uri_opener.GET(url, cache=False) @@ -96,6 +99,7 @@ def test_basic_ssl(self): self.assertGreaterEqual(http_response.id, 1) self.assertNotEqual(http_response.id, None) + @pytest.mark.skip('uses internet') def test_github_ssl(self): url = URL('https://raw.githubusercontent.com/RetireJS/retire.js/master/repository/jsrepository.json') @@ -106,7 +110,7 @@ def test_github_ssl(self): self.assertGreaterEqual(http_response.id, 1) self.assertNotEqual(http_response.id, None) - @pytest.mark.deprecated + @pytest.mark.skip('uses internet') def test_cache(self): url = URL(get_moth_http()) http_response = self.uri_opener.GET(url) @@ -116,7 +120,7 @@ def test_cache(self): http_response = self.uri_opener.GET(url) self.assertIn(self.MOTH_MESSAGE, http_response.body) - @pytest.mark.deprecated + @pytest.mark.skip('uses internet') def test_qs_params(self): url = URL(get_moth_http('/audit/xss/simple_xss.py?text=123456abc')) http_response = self.uri_opener.GET(url, cache=False) @@ -173,7 +177,7 @@ def test_GET_with_post_data_and_qs(self): self.assertEqual(httpretty.last_request().body, data) self.assertEqual(httpretty.last_request().path, '/' + qs) - @pytest.mark.deprecated + @pytest.mark.skip('uses internet') def 
test_post(self): url = URL(get_moth_http('/audit/xss/simple_xss_form.py')) @@ -183,7 +187,7 @@ def test_post(self): http_response = self.uri_opener.POST(url, data, cache=False) self.assertIn('123456abc', http_response.body) - @pytest.mark.deprecated + @pytest.mark.skip('uses internet') def test_post_special_chars(self): url = URL(get_moth_http('/audit/xss/simple_xss_form.py')) test_data = u'abc"-á-' @@ -194,7 +198,6 @@ def test_post_special_chars(self): http_response = self.uri_opener.POST(url, data, cache=False) self.assertIn(test_data, http_response.body) - @pytest.mark.deprecated def test_unknown_domain(self): url = URL('http://longsitethatdoesnotexistfoo.com/') self.assertRaises(HTTPRequestException, self.uri_opener.GET, url) @@ -203,13 +206,12 @@ def test_file_proto(self): url = URL('file://foo/bar.txt') self.assertRaises(HTTPRequestException, self.uri_opener.GET, url) - @pytest.mark.deprecated def test_url_port_closed(self): # TODO: Change 2312 by an always closed/non-http port url = URL('http://127.0.0.1:2312/') self.assertRaises(HTTPRequestException, self.uri_opener.GET, url) - @pytest.mark.deprecated + @pytest.mark.skip('uses internet') def test_url_port_not_http(self): upper_daemon = UpperDaemon(EmptyTCPHandler) upper_daemon.start() @@ -226,7 +228,6 @@ def test_url_port_not_http(self): else: self.assertTrue(False, 'Expected HTTPRequestException.') - @pytest.mark.deprecated def test_url_port_not_http_many(self): upper_daemon = UpperDaemon(EmptyTCPHandler) upper_daemon.start() @@ -255,7 +256,7 @@ def test_url_port_not_http_many(self): self.assertEqual(scan_must_stop_e, 1) self.assertEqual(http_request_e, 9) - @pytest.mark.deprecated + @pytest.mark.skip('uses internet') def test_get_wait_time(self): """ Asserts that all the responses coming out of the extended urllib have a @@ -323,7 +324,6 @@ def test_ssl_sni(self): resp = self.uri_opener.GET(url) self.assertIn('Great!', resp.get_body()) - @pytest.mark.deprecated def 
test_ssl_fail_when_requesting_http(self): http_daemon = UpperDaemon(Ok200Handler) http_daemon.start() @@ -337,7 +337,6 @@ def test_ssl_fail_when_requesting_http(self): self.assertRaises(HTTPRequestException, self.uri_opener.GET, url) - @pytest.mark.deprecated def test_ssl_fail_when_requesting_moth_http(self): """ https://github.com/andresriancho/w3af/issues/7989 @@ -415,8 +414,8 @@ def send(uri_opener, output): self.assertEqual(http_response.get_code(), 200) self.assertIn(self.MOTH_MESSAGE, http_response.body) - - @pytest.mark.deprecated + + @pytest.mark.skip('uses internet') def test_removes_cache(self): url = URL(get_moth_http()) self.uri_opener.GET(url, cache=False) @@ -434,8 +433,8 @@ def test_removes_cache(self): test_trace_path = os.path.join(temp_dir, trace_fmt % i) self.assertFalse(os.path.exists(test_db_path), test_db_path) self.assertFalse(os.path.exists(test_trace_path), test_trace_path) - - @pytest.mark.deprecated + + @pytest.mark.skip('uses internet') def test_special_char_header(self): url = URL(get_moth_http('/core/headers/echo-headers.py')) header_content = u'name=ábc' @@ -443,7 +442,6 @@ def test_special_char_header(self): http_response = self.uri_opener.GET(url, cache=False, headers=headers) self.assertIn(header_content, http_response.body) - @pytest.mark.deprecated def test_bad_file_descriptor_8125_local(self): """ 8125 is basically an issue with the way HTTP SSL connections handle the @@ -505,6 +503,48 @@ def rate_limit_generic(self, max_requests_per_second, _min, _max): self.assertLessEqual(elapsed_time, _max) +@pytest.fixture +def blind_extended_urllib(): + """ + It's blind. It doesn't send real request and it returns MagicMock instead of + HTTPResponse instance, so mock's implementation stays easy. 
+ """ + extended_urllib = ExtendedUrllib() + extended_urllib.setup() + extended_urllib._opener = MagicMock() + with patch('w3af.core.data.url.extended_urllib.HTTPResponse', MagicMock()): + yield extended_urllib + + +class TestXUrllib: + def test_get_method_can_be_called_with_url_as_string(self, blind_extended_urllib): + blind_extended_urllib.GET('http://example.com/') # no error + + def test_get_method_can_be_called_with_headers_as_dict(self, blind_extended_urllib): + headers = { + 'origin': 'example.com', + 'authorization': 'some token', + } + # no error + blind_extended_urllib.GET('http://example.com/', headers=headers) + + def test_post_method_can_be_called_with_url_as_string(self, blind_extended_urllib): + # no error + blind_extended_urllib.POST('http://example.com/', data='some data') + + def test_post_method_can_be_called_with_headers_as_dict(self, blind_extended_urllib): + headers = { + 'origin': 'example.com', + 'authorization': 'some token', + } + # no error + blind_extended_urllib.POST( + 'http://example.com/', + data='some data', + headers=headers, + ) + + class EmptyTCPHandler(SocketServer.BaseRequestHandler): def handle(self): self.data = self.request.recv(1024).strip() diff --git a/w3af/plugins/auth/autocomplete_js.py b/w3af/plugins/auth/autocomplete_js.py index 6b1d39e2cd..f86caf5b3d 100644 --- a/w3af/plugins/auth/autocomplete_js.py +++ b/w3af/plugins/auth/autocomplete_js.py @@ -20,7 +20,10 @@ """ import Queue +from copy import deepcopy +from w3af.core.data.options.opt_factory import opt_factory +from w3af.core.data.options.option_types import STRING from w3af.core.data.request.fuzzable_request import FuzzableRequest from w3af.core.controllers.chrome.instrumented.main import InstrumentedChrome from w3af.core.controllers.chrome.login.find_form.main import FormFinder @@ -36,6 +39,12 @@ class autocomplete_js(autocomplete): def __init__(self): autocomplete.__init__(self) + # default values for autocomplete_js options + self.username_field_css_selector = 
'' + self.login_button_css_selector = '' + self.login_form_activator_css_selector = '' + self._did_css_selectors_work = False + self._login_form = None self._http_traffic_queue = None @@ -81,6 +90,7 @@ def login(self, debugging_id=None): return True def _handle_authentication_success(self): + self._login_result_log.append(True) # # Logging # @@ -107,6 +117,17 @@ def _handle_authentication_success(self): self._configure_audit_blacklist(*login_urls) + def end(self): + super(autocomplete_js, self).end() + if not self._did_css_selectors_work: + message = ( + "The `{}` authentication plugin was never able to find " + "one or more CSS selectors specified in options." + ) + message = message.format(self.get_name()) + self._log_info_to_kb(title='CSS selectors failed', message=message) + self._log_error(message) + def _do_login(self, chrome): """ Login to the application in two different scenarios: @@ -129,7 +150,12 @@ def _login_using_existing_form(self, chrome): :param chrome: The chrome instance to use during login :return: True if login was successful """ - raise NotImplementedError + form_submit_strategy = self._find_form_submit_strategy(chrome, self._login_form) + if form_submit_strategy is None: + return False + self._login_form.set_submit_strategy(form_submit_strategy) + self._log_debug('Identified valid login form: %s' % self._login_form) + return True def _login_and_save_form(self, chrome): """ @@ -207,13 +233,20 @@ def _find_all_login_forms(self, chrome): * Use the FormFinder class to yield all existing forms """ form_finder = FormFinder(chrome, self._debugging_id) + css_selectors = { + 'username_input': self.username_field_css_selector, + 'login_button': self.login_button_css_selector, + 'form_activator': self.login_form_activator_css_selector, + } - for form in form_finder.find_forms(): + for form in form_finder.find_forms(css_selectors): msg = 'Found potential login form: %s' args = (form,) self._log_debug(msg % args) + self._did_css_selectors_work = True + 
yield form def _find_form_submit_strategy(self, chrome, form): @@ -239,7 +272,10 @@ def _find_form_submit_strategy(self, chrome, form): for form_submit_strategy in form_submitter.submit_form(): - if not self.has_active_session(debugging_id=self._debugging_id): + if not self.has_active_session(debugging_id=self._debugging_id, chrome=chrome): + msg = '%s is invalid form submit strategy for %s' + args = (form_submit_strategy.get_name(), form) + self._log_debug(msg % args) # No need to set the state of the chrome browser back to the # login page, that is performed inside the FormSubmitter continue @@ -256,22 +292,89 @@ def _find_form_submit_strategy(self, chrome, form): return None - def has_active_session(self, debugging_id=None): + def has_active_session(self, debugging_id=None, chrome=None): """ Check user session with chrome + :param str debugging_id: string representing debugging id. + :param InstrumentedChrome chrome: chrome instance passed from outer scope + to reuse. EDGE CASE EXAMPLE: + Sometimes we don't want to create new chrome instance. For example + when we login for the first time to webapp and in _find_form_submit_strategy() + we just pressed enter in login form. Browser may take some actions under + the hood like sending XHR to backend API and after receiving response + setting API token at localStorage. Before token will be saved to localStorage + it may exist only in webapp's code, so using the same chrome will prevent + us from performing check without credentials. 
""" has_active_session = False + is_new_chrome_instance_created = False self._set_debugging_id(debugging_id) - chrome = self._get_chrome_instance(load_url=False) + if not chrome or not chrome.chrome_conn: + chrome = self._get_chrome_instance(load_url=False) + is_new_chrome_instance_created = True try: chrome.load_url(self.check_url) chrome.wait_for_load() has_active_session = self.check_string in chrome.get_dom() finally: - chrome.terminate() + if is_new_chrome_instance_created: + chrome.terminate() return has_active_session + def get_options(self): + """ + :returns OptionList: list of option objects for plugin + """ + option_list = super(autocomplete_js, self).get_options() + autocomplete_js_options = [ + ( + 'username_field_css_selector', + self.username_field_css_selector, + STRING, + "(Optional) Exact CSS selector which will be used to retrieve " + "the username input field. When provided the scanner is not going" + " to try to detect the input field in an automated way" + ), + ( + 'login_button_css_selector', + self.login_button_css_selector, + STRING, + "(Optional) Exact CSS selector which will be used to retrieve " + "the login button field. When provided the scanner is not going " + "to try to detect the login button in an automated way" + ), + ( + 'login_form_activator_css_selector', + self.login_form_activator_css_selector, + STRING, + "(Optional) Exact CSS selector for the element which needs to be " + "clicked to show login form." 
+ ) + ] + for option in autocomplete_js_options: + option_list.add(opt_factory( + option[0], + option[1], + option[3], + option[2], + help=option[3], + )) + return option_list + + def set_options(self, options_list): + options_list_copy = deepcopy(options_list) # we don't want to touch real option_list + self.username_field_css_selector = options_list_copy.pop( + 'username_field_css_selector' + ).get_value() + self.login_button_css_selector = options_list_copy.pop( + 'login_button_css_selector' + ).get_value() + self.login_form_activator_css_selector = options_list_copy.pop( + 'login_form_activator_css_selector' + ).get_value() + super(autocomplete_js, self).set_options(options_list_copy) + def get_long_desc(self): """ :return: A DETAILED description of the plugin functions and features. @@ -283,7 +386,15 @@ def get_long_desc(self): The plugin loads the `login_form_url` to obtain the login form, automatically identifies the inputs where the `username` and `password` should be entered, - and then submits the form by clicking on the login button. + and then submits the form by clicking on the login button. You can specify + the exact CSS selectors (like ".login > input #password") in + `username_field_css_selector` and `login_button_css_selector` to force + the plugin to use those selectors when it can't find the username field + or login button automatically. 
+ + If the page requires a click on something to show the login form you + can set `login_form_activator_css_selector` and the scanner will use it to find and click on the element. The following configurable parameters exist: - username @@ -291,4 +402,7 @@ def get_long_desc(self): - login_form_url - check_url - check_string + - username_field_css_selector + - login_button_css_selector + - login_form_activator_css_selector """ diff --git a/w3af/plugins/crawl/web_spider.py b/w3af/plugins/crawl/web_spider.py index b9869da83c..a5b452de66 100644 --- a/w3af/plugins/crawl/web_spider.py +++ b/w3af/plugins/crawl/web_spider.py @@ -25,6 +25,7 @@ import itertools import w3af.core.controllers.output_manager as om +import w3af.core.data.kb.knowledge_base as kb import w3af.core.data.kb.config as cf import w3af.core.data.parsers.parser_cache as parser_cache import w3af.core.data.constants.response_codes as http_constants @@ -113,6 +114,9 @@ def crawl(self, fuzzable_request, debugging_id): self._extract_html_forms(doc_parser, fuzzable_request, debugging_id) self._extract_links_and_verify(doc_parser, fuzzable_request, http_response, debugging_id) + for fuzzable_request_ in doc_parser.get_fuzzable_requests(): + if fuzzable_request_ not in kb.kb.get_all_known_fuzzable_requests(): + self.output_queue.put(fuzzable_request_) # raise exceptions in the main thread for better handling # diff --git a/w3af/plugins/tests/auth/test_autocomplete_js.py b/w3af/plugins/tests/auth/test_autocomplete_js.py index a22fb3f298..56a303e9d3 100644 --- a/w3af/plugins/tests/auth/test_autocomplete_js.py +++ b/w3af/plugins/tests/auth/test_autocomplete_js.py @@ -26,6 +26,7 @@ import pytest from w3af import ROOT_PATH +from w3af.plugins.auth.autocomplete_js import autocomplete_js from w3af.plugins.tests.helper import PluginTest, PluginConfig from w3af.core.controllers.daemons.webserver import start_webserver_any_free_port from w3af.core.controllers.chrome.tests.helpers import ExtendedHttpRequestHandler @@ -44,9 +45,6 @@ 
PASS = 'passw0rd' -pytestmark = pytest.mark.deprecated - - class BasicLoginRequestHandler(ExtendedHttpRequestHandler): LOGIN_FORM = VANILLA_JS_LOGIN_1 ADMIN_HOME = u'Hello admin!' @@ -122,6 +120,7 @@ def do_login(self): return self.send_response_to_client(302, 'Success', headers) +@pytest.mark.deprecated class TestVanillaJavaScript1(PluginTest): SERVER_HOST = '127.0.0.1' @@ -233,3 +232,54 @@ def test_js_auth(self): self.assertIn('/login_post.py', FakeFormLoginRequestHandler.EVENTS) self.assertIn('/admin', FakeFormLoginRequestHandler.EVENTS) self.assertIn('ADMIN_REQUEST_SUCCESS', FakeFormLoginRequestHandler.EVENTS) + + +def test_autocomplete_js_reports_if_it_fails_to_use_css_selectors( + plugin_runner, + knowledge_base, +): + autocomplete_js_config = { + 'username': 'test@example.com', + 'password': 'pass', + 'check_url': 'http://example.com/me/', + 'login_form_url': 'http://example.com/login/', + 'check_string': 'logged as', + 'username_field_css_selector': '#username', + 'login_button_css_selector': '#login', + 'login_form_activator_css_selector': '#activator', + } + autocomplete_js_plugin = autocomplete_js() + plugin_runner.run_plugin(autocomplete_js_plugin, autocomplete_js_config) + kb_result = knowledge_base.dump() + errors = kb_result.get('authentication').get('error') + css_error_count = 0 + for error in errors: + if 'CSS selectors failed' in error.get_name(): + css_error_count += 1 + assert css_error_count + + +def test_autocomplete_js_doesnt_report_if_it_can_find_css_selectors( + plugin_runner, + knowledge_base, + js_domain_with_login_form, +): + autocomplete_js_config = { + 'username': 'test@example.com', + 'password': 'pass', + 'check_url': 'http://example.com/me/', + 'login_form_url': 'http://example.com/login/', + 'check_string': 'logged as', + 'username_field_css_selector': '#username', + 'login_button_css_selector': '#login', + } + autocomplete_js_plugin = autocomplete_js() + plugin_runner.run_plugin( + autocomplete_js_plugin, + 
autocomplete_js_config, + mock_domain=js_domain_with_login_form, + do_end_call=False, + ) + plugin_runner.plugin_last_ran.end() + kb_result = knowledge_base.dump() + assert not kb_result.get('authentication', {}).get('error') diff --git a/w3af/plugins/tests/conftest.py b/w3af/plugins/tests/conftest.py new file mode 100644 index 0000000000..b751a16c96 --- /dev/null +++ b/w3af/plugins/tests/conftest.py @@ -0,0 +1,44 @@ +import pytest + +from w3af.plugins.tests.plugin_testing_tools import TestPluginRunner + + +def cleanup_parser_cache(): + from w3af.core.data.parsers import parser_cache + parser_cache.dpc = parser_cache.ParserCache() + + +@pytest.fixture +def plugin_runner(): + """ + This fixture returns PluginRunner instance which can run the plugin inside + sandbox environment. + + It's "core" fixture in testing w3af. 99% of time when developer wants to test + something he runs plugin and sees what happens. + """ + cleanup_parser_cache() + return TestPluginRunner() + + +@pytest.fixture +def js_domain_with_login_form(): + mapping = { + 1: '
example
', + '/login/': ( + '
' + '' + '' + '' + '
' + ), + '/me/': '
logged as
', + } + return mapping + + +@pytest.fixture +def knowledge_base(): + from w3af.core.data.kb import knowledge_base + kb = knowledge_base.kb = knowledge_base.DBKnowledgeBase() # create new kb instance + return kb diff --git a/w3af/plugins/tests/crawl/conftest.py b/w3af/plugins/tests/crawl/conftest.py new file mode 100644 index 0000000000..dcbb36e277 --- /dev/null +++ b/w3af/plugins/tests/crawl/conftest.py @@ -0,0 +1,34 @@ +import pytest + + +@pytest.fixture +def soap_domain(): + with open('w3af/plugins/tests/crawl/soap/wsdl_example.xml', 'r') as file_: + wsdl_content = file_.read() + with open('w3af/plugins/tests/crawl/soap/soap_service_example.html', 'r') as file_: + soap_service_content = file_.read() + + return { + '/': ( + 'wrong"' + 'good' + ), + '/webservice.asmx': '
some strange things
', + '/webservice.asmx?WSDL=': wsdl_content, + '/webservicesserver/NumberConversion.wso': soap_service_content, + } + + +@pytest.fixture +def soap_domain_2(): + with open('w3af/plugins/tests/crawl/soap/wsdl_example.xml', 'r') as file_: + wsdl_content = file_.read() + with open('w3af/plugins/tests/crawl/soap/soap_service_example.html', 'r') as file_: + soap_service_content = file_.read() + + return { + '/': 'example.com', + '/webservice.asmx': '
some strange things
', + '/webservice.asmx?WSDL=': wsdl_content, + '/webservicesserver/NumberConversion.wso': soap_service_content, + } diff --git a/w3af/plugins/tests/crawl/soap/soap_service_example.html b/w3af/plugins/tests/crawl/soap/soap_service_example.html new file mode 100644 index 0000000000..87debc0b8e --- /dev/null +++ b/w3af/plugins/tests/crawl/soap/soap_service_example.html @@ -0,0 +1,24 @@ + +
+

Number Conversion Service

+
+

+ The Number Conversion Web Service, implemented with DataFlex, provides functions that convert numbers into words or dollar amounts. +

+ The following operations are available. For a formal definition, please review the + Service Description. +
    +
  • + NumberToWords +
    + Returns the word corresponding to the positive number passed as parameter. Limited to quadrillions. +
  • +

    +
  • + NumberToDollars +
    + Returns the non-zero dollar amount of the passed number. +
  • +

    +
+ diff --git a/w3af/plugins/tests/crawl/soap/wsdl_example.xml b/w3af/plugins/tests/crawl/soap/wsdl_example.xml new file mode 100644 index 0000000000..294f4f431c --- /dev/null +++ b/w3af/plugins/tests/crawl/soap/wsdl_example.xml @@ -0,0 +1,120 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Returns the word corresponding to the positive number passed as + parameter. Limited to quadrillions. + + + + + + Returns the non-zero dollar amount of the passed number. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + The Number Conversion Web Service, implemented with DataFlex, provides + functions that convert numbers into words or dollar amounts. + + + + + + + + + diff --git a/w3af/plugins/tests/crawl/soap/wsdl_example_2.xml b/w3af/plugins/tests/crawl/soap/wsdl_example_2.xml new file mode 100644 index 0000000000..0b45fc57df --- /dev/null +++ b/w3af/plugins/tests/crawl/soap/wsdl_example_2.xml @@ -0,0 +1,168 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/w3af/plugins/tests/crawl/test_web_spider.py b/w3af/plugins/tests/crawl/test_web_spider.py index 1b664b3ad8..2d40e596bb 100644 --- a/w3af/plugins/tests/crawl/test_web_spider.py +++ b/w3af/plugins/tests/crawl/test_web_spider.py @@ -30,6 +30,7 @@ from nose.plugins.attrib import attr from w3af import ROOT_PATH +from w3af.plugins.crawl.web_spider import web_spider from w3af.plugins.tests.helper import PluginTest, PluginConfig, MockResponse from w3af.core.controllers.ci.moth import get_moth_http from w3af.core.controllers.ci.wivet import get_wivet_http @@ -312,7 +313,7 @@ class TestDeadLock(PluginTest): MOCK_RESPONSES = [MockResponse('http://mock/', INDEX_HTML), 
MockResponse('http://mock/', 'Thanks.', method='POST')] - @pytest.mark.deprecated + @pytest.mark.slow def test_no_lock(self): cfg = self._run_configs['cfg'] self._scan(cfg['target'], cfg['plugins']) @@ -369,3 +370,112 @@ def test_form_exclusions(self): # revert any changes to the default so we don't affect other tests cf.cf.save('form_id_list', FormIDMatcherList('[]')) cf.cf.save('form_id_action', EXCLUDE) + + +class TestSoap: + def setup_class(self): + self.extra_options = { + 'target_domain': ['http://example.com/webservice.asmx?WSDL'] + } + self.plugin_options = { + 'enable_js_crawler': False, + } + + def test_soap_plugin_finds_all_endpoints( + self, + plugin_runner, + knowledge_base, + soap_domain, + ): + plugin_runner.run_plugin( + web_spider, + plugin_config=self.plugin_options, + mock_domain=soap_domain, + extra_options=self.extra_options, + ) + urls_discovered = [str(url) for url in knowledge_base.get_all_known_urls()] + assert 'http://example.com/webservicesserver/NumberConversion.wso' in urls_discovered + + def test_wsdl_parser_will_handle_wsdl_if_it_is_crawled_by_web_spider_accidentally( + self, + plugin_runner, + soap_domain, + knowledge_base, + ): + """ + by default plugin runner will order web_spider to crawl http://example.com/ + first. soap_domain keeps under http://example.com/ two URLs, one of them is + WSDL file. 
+ """ + plugin_runner.run_plugin( + web_spider, + plugin_config=self.plugin_options, + mock_domain=soap_domain, + do_end_call=False, + ) + urls_discovered = [str(url) for url in knowledge_base.get_all_known_urls()] + assert 'http://example.com/webservicesserver/NumberConversion.wso' in urls_discovered + + def test_soap_can_parse_wsdl_with_both_kinds_of_syntax( + self, + plugin_runner, + soap_domain, + soap_domain_2, + knowledge_base, + ): + plugin_runner.run_plugin( + web_spider, + plugin_config=self.plugin_options, + mock_domain=soap_domain, + extra_options=self.extra_options, + ) + urls_discovered = [str(url) for url in knowledge_base.get_all_known_urls()] + + knowledge_base.cleanup() + + assert 'http://example.com/webservicesserver/NumberConversion.wso' in urls_discovered + plugin_runner.run_plugin( + web_spider, + plugin_config=self.plugin_options, + mock_domain=soap_domain_2, + extra_options=self.extra_options, + ) + urls_discovered = [str(url) for url in knowledge_base.get_all_known_urls()] + assert 'http://example.com/webservicesserver/NumberConversion.wso' in urls_discovered + + def test_web_spider_reports_wsdl_description( + self, + plugin_runner, + soap_domain, + knowledge_base, + ): + plugin_runner.run_plugin( + web_spider, + plugin_config=self.plugin_options, + mock_domain=soap_domain, + extra_options=self.extra_options, + ) + result = knowledge_base.dump() + assert result['wsdl_parser']['soap_actions'] + assert any([ + 'Service: NumberConversion' in str(info) and + 'NumberToDollars' in str(info) and + 'NumberToWords' in str(info) + for info in result['wsdl_parser']['soap_actions'] + ]) + + def test_scanning_soap_adds_fuzzable_request_to_output_queue( + self, + plugin_runner, + soap_domain, + knowledge_base, + ): + web_spider_instance = web_spider() + plugin_runner.run_plugin( + web_spider_instance, + plugin_config=self.plugin_options, + mock_domain=soap_domain, + extra_options=self.extra_options, + ) + # normally web_spider produces 4 known fuzzable 
request. SOAP parser adds another one. + assert len(knowledge_base.get_all_known_fuzzable_requests()) == 5 diff --git a/w3af/plugins/tests/grep/test_retirejs.py b/w3af/plugins/tests/grep/test_retirejs.py index 0eb8ac3adf..63a4346121 100644 --- a/w3af/plugins/tests/grep/test_retirejs.py +++ b/w3af/plugins/tests/grep/test_retirejs.py @@ -140,6 +140,7 @@ def test_version_check(self): rjs = retirejs() self.assertTrue(rjs._get_is_valid_retire_version()) + @pytest.mark.skip('uses internet') def test_retire_smoke_test(self): rjs = retirejs() self.assertTrue(rjs._retire_smoke_test()) diff --git a/w3af/plugins/tests/plugin_testing_tools.py b/w3af/plugins/tests/plugin_testing_tools.py new file mode 100644 index 0000000000..f3ca20c034 --- /dev/null +++ b/w3af/plugins/tests/plugin_testing_tools.py @@ -0,0 +1,322 @@ +import inspect +from urlparse import urlsplit + +from mock import patch, MagicMock + +from w3af.core.controllers.core_helpers.consumers.constants import POISON_PILL +from w3af.core.controllers.plugins.auth_plugin import AuthPlugin +from w3af.core.controllers.plugins.crawl_plugin import CrawlPlugin +from w3af.core.data.dc.headers import Headers +import w3af.core.data.kb.knowledge_base as kb +from w3af.core.data.parsers.doc.url import URL +from w3af.core.data.request.fuzzable_request import FuzzableRequest +from w3af.core.data.url.HTTPResponse import HTTPResponse +from w3af.core.data.url.extended_urllib import ExtendedUrllib + + +class TestPluginError(Exception): + pass + + +class TestPluginRunner: + """ + This class prepares everything needed to run w3af plugin, offers network + mocking (like mock_domain). The main method is `run_plugin` and it should + be used in tests. Also it exposes `plugin_last_ran` and `mocked_server` + as parameters. + """ + def __init__(self): + # Useful for debugging: + self.plugin_last_ran = None # last plugin instance used at self.run_plugin(). + self.mocked_server = None # mocked_server holds e.g. info which urls were hit. 
+ + def run_plugin( + self, + plugin, + plugin_config=None, + mock_domain=None, + do_end_call=True, + extra_options=None, + ): + """ + This is the main method you'll probably use in your tests. + + :param Plugin plugin: plugin class or instance + :param dict plugin_config: dict which will be used to pass options with plugin.set_options + :param dict mock_domain: pytest fixture to mock requests to + specific domain + :param bool do_end_call: if False plugin.end() won't be called + :param dict extra_options: extra options for plugin runner used in certain + TestPluginRunner's methods. + For example (for web_spider plugin): + { + 'target_domain': [ + 'https://example.com/', + 'https://example.com/somethings', + ], + } + :return: Any result which returns the executed plugin. In most cases + it's just None + """ + + if inspect.isclass(plugin): + plugin_instance = plugin() + else: + plugin_instance = plugin + self.plugin_last_ran = plugin_instance + + self.mocked_server = MockedServer(url_mapping=mock_domain) + with NetworkPatcher( + mock_domain, + mocked_server=self.mocked_server, + plugin_instance=plugin_instance, + ): + if plugin_config: + self.set_options_to_plugin(plugin_instance, plugin_config) + + result = None + did_plugin_run = False + + if isinstance(plugin_instance, AuthPlugin): + result = run_auth_plugin(plugin_instance) + did_plugin_run = True + if isinstance(plugin_instance, CrawlPlugin): + result = run_crawl_plugin(plugin_instance, extra_options) + did_plugin_run = True + + if do_end_call: + plugin_instance.end() + + if not did_plugin_run: + raise TestPluginError( + "Can't find any way to run plugin {}. 
Is it already implemented?".format( + plugin_instance, + ) + ) + return result + + @staticmethod + def set_options_to_plugin(plugin, options): + """ + :param Plugin plugin: the plugin instance + :param dict options: dict of options that will be set to plugin + """ + options_list = plugin.get_options() + for option_name, option_value in options.items(): + option = options_list[option_name] + option.set_value(option_value) + plugin.set_options(options_list) + + +def run_auth_plugin(plugin): + if not plugin.has_active_session(): + return plugin.login() + return False + + +def run_crawl_plugin(plugin_instance, extra_options=None): + extra_options = extra_options or {} + initial_request_url = URL('http://example.com/') + initial_request = FuzzableRequest(initial_request_url) + requests_to_crawl = [initial_request] + if extra_options.get('target_domain'): + requests_to_crawl += [ + FuzzableRequest(URL(url)) + for url in + extra_options['target_domain'] + ] + plugin_instance.crawl(initial_request, debugging_id='test') + while requests_to_crawl: + request = requests_to_crawl.pop() + if request == POISON_PILL: + break + plugin_instance.crawl(request, debugging_id=MagicMock()) + for _ in range(plugin_instance.output_queue.qsize()): + request = plugin_instance.output_queue.get(block=True) + kb.kb.add_fuzzable_request(request) + requests_to_crawl.append(request) + return True + + +class MockedServer: + """ + This is class used to mock whole network for TestPluginRunner. It provides + `mock_GET` and `mock_chrome_load_url` which are methods to monkey-patch + the real w3af methods. + """ + def __init__(self, url_mapping=None): + """ + :param dict or None url_mapping: url_mapping should be a dict with data + formatted in following way: {'url_path': 'response_content'} or + {request_number: 'response_content'}. So for example: + { + 1: '
first response
', + 2: '
second response
', + 7: '
seventh response
', + '/login/': '' + '/me/': 'user@example.com' + } + """ + self.url_mapping = url_mapping or {} + self.default_content = 'example.com' + self.response_count = 0 + self.urls_requested = [] + + def mock_GET(self, url, *args, **kwargs): + """ + Mock for all places where w3af uses extended urllib. + + :param URL or str url: w3af.core.data.parsers.doc.url.URL instance or str + :return: w3af.core.data.url.HTTPResponse.HTTPResponse instance + """ + url = str(url) + return self._mocked_resp(URL(url), self.match_response(url)) + + def mock_POST(self, url, *args, **kwargs): + """ + Mock for all places where w3af uses extended urllib. + + :param URL or str url: w3af.core.data.parsers.doc.url.URL instance or str + :return: w3af.core.data.url.HTTPResponse.HTTPResponse instance + """ + url = str(url) + return self._mocked_resp(URL(url), self.match_response(url)) + + def mock_chrome_load_url(self, *args, **kwargs): + def real_mock(self_, url, *args, **kwargs): + """ + Set response content as chrome's DOM. + + :return: None + """ + self_.chrome_conn.Page.reload() # this enabled dom_analyzer.js + response_content = self.match_response(url.url_string) + result = self_.chrome_conn.Runtime.evaluate( + expression='document.write(`{}`)'.format(response_content) + ) + if result['result'].get('exceptionDetails'): + error_text = ( + "Can't mock the response for url\n" + "URL: {}\n" + "response_content: {}\n" + "JavaScript exception: {}" + ) + raise TestPluginError(error_text.format( + url, + response_content, + result['result']['exceptionDetails'] + )) + return None + return real_mock + + def mock_response(self, url): + """ + Sometimes you may need raw response content, not HTTPResponse instance. + + :return str: Raw response content (DOM) as string. 
+ """ + response = self.match_response(url) + return response + + def match_response(self, url): + """ + :param str url: string representing url like: https://example.com/test/ + :return str: the content of matched response + """ + self.response_count += 1 + self.urls_requested.append(url) + if self.url_mapping.get(self.response_count): + return self.url_mapping[self.response_count] + + split_url = urlsplit(url) + path_to_match = split_url.path + if split_url.query: + path_to_match += '?' + split_url.query + if self.url_mapping.get(path_to_match): + return self.url_mapping[path_to_match] + return self.default_content + + @staticmethod + def _mocked_resp(url, text_resp, *args, **kwargs): + return HTTPResponse( + code=200, + read=text_resp, + headers=Headers(), + geturl=url, + original_url=url, + ) + + +class NetworkPatcher: + """ + Context manager used for mocking the whole network. It uses MockedServer + for patching. + """ + def __init__(self, mock_domain=None, mocked_server=None, plugin_instance=None): + """ + :param dict mock_domain: pytest fixture to mock requests to + specific domain + :param MockedServer mocked_server: + :param Plugin plugin_instance: the plugin instance + """ + self.mock_domain = mock_domain + self.mocked_server = mocked_server or MockedServer(url_mapping=mock_domain) + self.plugin_instance = plugin_instance + self.patchers = [] + + def __enter__(self): + # all non-js plugins + patcher = patch( + 'w3af.core.data.url.extended_urllib.ExtendedUrllib.GET', + self.mocked_server.mock_GET, + ) + patcher.start() + self.patchers.append(patcher) + + # all chrome (js) plugins + chrome_patcher = patch( + 'w3af.core.controllers.chrome.instrumented.main.InstrumentedChrome.load_url', + self.mocked_server.mock_chrome_load_url(), + ) + chrome_patcher.start() + self.patchers.append(chrome_patcher) + + post_patcher = patch( + 'w3af.core.data.url.extended_urllib.ExtendedUrllib.POST', + self.mocked_server.mock_POST, + ) + self.patchers.append(post_patcher) + 
post_patcher.start() + + from w3af.plugins.crawl.web_spider import web_spider + if self.plugin_instance and isinstance(self.plugin_instance, web_spider): + self._handle_web_spider_plugin() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + for patcher in self.patchers: + try: + patcher.stop() + except RuntimeError: + pass + return False + + def _handle_web_spider_plugin(self): + from w3af.core.controllers.core_helpers.fingerprint_404 import fingerprint_404_singleton + self.plugin_instance._target_domain = 'example.com' + self.plugin_instance._first_run = False + mocked_404_singleton = fingerprint_404_singleton(cleanup=True) + mocked_404_singleton.set_url_opener(ExtendedUrllib()) + self.plugin_instance.set_url_opener(ExtendedUrllib()) + from w3af.core.controllers.threads.threadpool import Pool + self.plugin_instance.set_worker_pool(Pool()) + + +def patch_network(func): + """ + NetworkPatcher decorator + """ + def decorating_function(*args, **kwargs): + with NetworkPatcher(): + return func(*args, **kwargs) + return decorating_function diff --git a/w3af/plugins/tests/test_plugin_testing_tools.py b/w3af/plugins/tests/test_plugin_testing_tools.py new file mode 100644 index 0000000000..9136c78972 --- /dev/null +++ b/w3af/plugins/tests/test_plugin_testing_tools.py @@ -0,0 +1,67 @@ +import pytest +from mock import MagicMock, call, patch + +from w3af.core.data.url.extended_urllib import ExtendedUrllib +from w3af.plugins.tests.plugin_testing_tools import NetworkPatcher, patch_network + +""" +Unit tests for plugin_testing_tools.py +""" + + +@pytest.fixture +def network_patcher(): + return NetworkPatcher() + + +class TestNetworkPatcher: + def setup_class(self): + self.url_opener = ExtendedUrllib() + + def test_it_works_and_hits_mocked_server(self): + mocked_server = MagicMock() + network_patcher = NetworkPatcher(mocked_server=mocked_server) + with network_patcher: + self.url_opener.GET(MagicMock()) + assert call.mock_GET in mocked_server.method_calls + + def 
test_it_works_for_post_requests(self): + mocked_server = MagicMock() + network_patcher = NetworkPatcher(mocked_server=mocked_server) + with network_patcher: + self.url_opener.POST('http://example.com/', 'data') + assert mocked_server.mock_POST.called + + def test_it_stops_all_patchers(self, network_patcher): + with network_patcher: + pass + for patcher in network_patcher.patchers: + with pytest.raises(RuntimeError): + patcher.stop() + + def test_it_starts_all_patchers(self, network_patcher): + """ + This test additionally tests if __exit__ can handle already stopped patchers + """ + with network_patcher: + for patcher in network_patcher.patchers: + patcher.stop() # no error here + + def test_with_as_works(self): + with NetworkPatcher() as network_patcher: + assert isinstance(network_patcher, NetworkPatcher) + + def test_it_works_as_a_decorator(self): + mocked_server = MagicMock() + network_patcher_from_class = ( + lambda *args, **kwargs: NetworkPatcher(mocked_server=mocked_server) + ) + with patch( + 'w3af.plugins.tests.plugin_testing_tools.NetworkPatcher', + network_patcher_from_class, + ): + @patch_network + def decorated_function(): + self.url_opener.GET(MagicMock()) + decorated_function() + assert mocked_server.mock_GET.called diff --git a/w3af/tests/requirements.txt b/w3af/tests/requirements.txt index c475737f6f..8092dbc969 100644 --- a/w3af/tests/requirements.txt +++ b/w3af/tests/requirements.txt @@ -22,7 +22,6 @@ requests>=2.7.0 # Other mock==1.0.1 psutil==2.2.1 -SOAPpy==0.12.5 Pillow==6.2.0 SimpleCV==1.3 futures==2.1.5