From b1374e9d2daa77da6bcb59748275df5d7737b312 Mon Sep 17 00:00:00 2001 From: Christiandus Date: Wed, 3 Sep 2025 11:33:25 +0200 Subject: [PATCH 1/9] Fix typos and small clarifications --- fortigate_api/fortigate_base.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/fortigate_api/fortigate_base.py b/fortigate_api/fortigate_base.py index 9c9e813..6e8743b 100644 --- a/fortigate_api/fortigate_base.py +++ b/fortigate_api/fortigate_base.py @@ -20,6 +20,7 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) HTTPS = "https" +HTTP = "http" PORT_443 = 443 PORT_80 = 80 TIMEOUT = 15 @@ -251,7 +252,7 @@ def _init_port(self, **kwargs) -> int: """Init port, 443 for scheme=`https`, 80 for scheme=`http`.""" if port := int(kwargs.get("port") or 0): return port - if self.scheme == "http": + if self.scheme == HTTP: return PORT_80 return PORT_443 @@ -316,9 +317,9 @@ def _valid_url(self, url: str) -> str: def _init_scheme(**kwargs) -> str: """Init scheme `https` or `http`.""" scheme = str(kwargs.get("scheme") or HTTPS) - expected = ["https", "http"] + expected = [HTTPS, HTTP] if scheme not in expected: - raise ValueError(f"{scheme=}, {expected=}.") + raise ValueError(f"{scheme=!r}, {expected=!r}.") return scheme @@ -328,7 +329,7 @@ def _init_token(**kwargs) -> str: if not token: return "" if kwargs.get("username"): - raise ValueError("Mutually excluded: username, token.") + raise ValueError("A username and a token are mutually exclusive.") if kwargs.get("password"): - raise ValueError("Mutually excluded: password, token.") + raise ValueError("A password and a token are mutually exclusive.") return token From bd6df4f390edb3a4a31f9ef7bdfd1296b1567972 Mon Sep 17 00:00:00 2001 From: Christiandus Date: Wed, 3 Sep 2025 11:36:53 +0200 Subject: [PATCH 2/9] logic simplifications and more robustness --- fortigate_api/fortigate_base.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/fortigate_api/fortigate_base.py b/fortigate_api/fortigate_base.py index 6e8743b..2923d00 100644 --- a/fortigate_api/fortigate_base.py +++ b/fortigate_api/fortigate_base.py @@ -6,7 +6,7 @@ import logging import re from typing import Callable, Iterable, Optional -from urllib.parse import urlencode, urljoin +from urllib.parse import urlencode, urljoin, urlunparse import requests from requests import Session, Response @@ -82,7 +82,7 @@ def __repr__(self): host = self.host username = self.username scheme = self.scheme - port = self.port if not (scheme == HTTPS and self.port == PORT_443) else "" + port = self.port timeout = self.timeout verify = self.verify vdom = self.vdom @@ -124,11 +124,7 @@ def is_connected(self) -> bool: @property def url(self) -> str: """Return URL to the Fortigate.""" - if self.scheme == HTTPS and self.port == 443: - return f"{self.scheme}://{self.host}" - if self.scheme == "http" and self.port == 80: - return f"{self.scheme}://{self.host}" - return f"{self.scheme}://{self.host}:{self.port}" + return urlunparse((self.scheme, f"{self.host}:{self.port}", "", "", "", "")) # ============================ login ============================= @@ -189,7 +185,6 @@ def logout(self) -> None: ) except SSLError: pass - del self._session self._session = None # =========================== helpers ============================ @@ -303,12 +298,10 @@ def _response(self, method: Method, url: str, data: ODAny = None) -> Response: def _valid_url(self, url: str) -> str: """Return a valid URL string. - Add `https://` to `url` if it is absent and remove trailing `/` character. + Ensures no trailing `/` character. """ - if re.match("http(s)?://", url): - return url.rstrip("/") - path = url.strip("/") - return urljoin(self.url, path) + full_url = urljoin(self.url, url) + return full_url.rstrip("/") # =========================== helpers ============================ From 4219171310439629433340dd5ee0fca466a03c06 Mon Sep 17 00:00:00 2001 From: Christiandus Date: Wed, 3 Sep 2025 11:37:54 +0200 Subject: [PATCH 3/9] robust host(name) checking --- fortigate_api/fortigate_base.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/fortigate_api/fortigate_base.py b/fortigate_api/fortigate_base.py index 2923d00..32e7199 100644 --- a/fortigate_api/fortigate_base.py +++ b/fortigate_api/fortigate_base.py @@ -2,6 +2,7 @@ from __future__ import annotations +import ipaddress import json import logging import re @@ -60,7 +61,7 @@ def __init__(self, **kwargs): :param bool logging_error: Logging only the REST API response with error. `True` - Enable errors logging, `False` - otherwise. Default is `False`. """ - self.host = str(kwargs.get("host")) + self.host = _init_host(**kwargs) self.username = str(kwargs.get("username")) self.password = str(kwargs.get("password")) self.token = _init_token(**kwargs) @@ -315,7 +316,24 @@ def _init_scheme(**kwargs) -> str: raise ValueError(f"{scheme=!r}, {expected=!r}.") return scheme - +def _init_host(**kwargs) -> str: + """Init host valid hostname or valid IP""" + host = str(kwargs.get("host")) + if re.fullmatch(r"\d+\.\d+\.\d+\.\d+", host) or ":" in host: + try: + ipaddress.ip_address(host) + return host + except ValueError: + raise ValueError(f"{host=!r}, not a valid ip address") + if len(host) > 255: + raise ValueError(f"{host=!r}, hostname is too long.") + if host[-1] == ".": + host = host[:-1] + allowed = re.compile(r"^(?!-)[A-Z\d-]{1,63}(? str: """Init token.""" token = str(kwargs.get("token") or "") From 3519777cd9e599e9355b648d70a1a75ae5939588 Mon Sep 17 00:00:00 2001 From: Christiandus Date: Wed, 3 Sep 2025 16:06:30 +0200 Subject: [PATCH 4/9] increase code robustness --- fortigate_api/fortigate_base.py | 19 +++++-------------- tests/test__fortigate_base.py | 21 --------------------- 2 files changed, 5 insertions(+), 35 deletions(-) diff --git a/fortigate_api/fortigate_base.py b/fortigate_api/fortigate_base.py index 32e7199..ed451f8 100644 --- a/fortigate_api/fortigate_base.py +++ b/fortigate_api/fortigate_base.py @@ -125,7 +125,7 @@ def is_connected(self) -> bool: @property def url(self) -> str: """Return URL to the Fortigate.""" - return urlunparse((self.scheme, f"{self.host}:{self.port}", "", "", "", "")) + return urlunparse((self.scheme, f"{self.host}:{self.port}", "/", "", "", "")) # ============================ login ============================= @@ -143,7 +143,7 @@ def login(self) -> None: if self.token: try: response: Response = session.get( - url=f"{self.url}/api/v2/monitor/system/status", + url=urljoin(self.url, "/api/v2/monitor/system/status"), headers=self._bearer_token(), verify=self.verify, ) @@ -156,7 +156,7 @@ def login(self) -> None: # password try: response = session.post( - url=f"{self.url}/logincheck", + url=urljoin(self.url, "/logincheck"), data=urlencode([("username", self.username), ("secretkey", self.password)]), timeout=self.timeout, verify=self.verify, @@ -180,7 +180,7 @@ def logout(self) -> None: if not self.token: try: self._session.get( - url=f"{self.url}/logout", + url=urljoin(self.url, "/logout"), timeout=self.timeout, verify=self.verify, ) @@ -278,7 +278,7 @@ def _response(self, method: Method, url: str, data: ODAny = None) -> Response: :rtype: Response """ params: DAny = { - "url": self._valid_url(url), + "url": urljoin(self.url, url), "params": urlencode([("vdom", self.vdom)]), "timeout": self.timeout, "verify": self.verify, @@ -296,15 +296,6 @@ def _response(self, method: Method, url: str, data: ODAny = None) -> Response: raise self._hide_secret_ex(ex) return response - def _valid_url(self, url: str) -> str: - """Return a valid URL string. - - Ensures no trailing `/` character. - """ - full_url = urljoin(self.url, url) - return full_url.rstrip("/") - - # =========================== helpers ============================ diff --git a/tests/test__fortigate_base.py b/tests/test__fortigate_base.py index 636e945..1f36279 100644 --- a/tests/test__fortigate_base.py +++ b/tests/test__fortigate_base.py @@ -199,27 +199,6 @@ def test__init_port(api: FortiGate, kwargs, scheme, expected): with pytest.raises(expected): api._init_port(**kwargs) - -@pytest.mark.parametrize("kwargs, url, expected", [ - ({}, QUERY, f"https://host/{QUERY}"), - ({}, f"/{QUERY}/", f"https://host/{QUERY}"), - ({}, f"https://host/{QUERY}", f"https://host/{QUERY}"), - ({}, f"https://host/{QUERY}/", f"https://host/{QUERY}"), - ({"port": 80}, QUERY, f"https://host:80/{QUERY}"), - ({"port": 80}, f"https://host:80/{QUERY}", f"https://host:80/{QUERY}"), - ({"scheme": "http", "port": 80}, QUERY, f"http://host/{QUERY}"), - ({"scheme": "http", "port": 80}, f"http://host/{QUERY}", f"http://host/{QUERY}"), -]) -def test__valid_url(kwargs, url, expected): - """FortiGateBase._valid_url()""" - default = dict(host="host", username="username", password="", port=443) - kwargs_ = {**default, **kwargs} - fgt = FortiGate(**kwargs_) - - actual = fgt._valid_url(url=url) - assert actual == expected - - # =========================== helpers ============================ @pytest.mark.parametrize("kwargs, expected", [ From c94762ba4e0b9faaf849c7f478c60eb62fadeef3 Mon Sep 17 00:00:00 2001 From: Christiandus Date: Wed, 3 Sep 2025 16:38:52 +0200 Subject: [PATCH 5/9] fix tests for simplified port logic --- tests/test__fortigate_base.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/tests/test__fortigate_base.py b/tests/test__fortigate_base.py index 1f36279..b9e1fc1 100644 --- a/tests/test__fortigate_base.py +++ b/tests/test__fortigate_base.py @@ -23,9 +23,9 @@ def api() -> FortiGate: @pytest.mark.parametrize("kwargs, expected", [ # username password (dict(host="a", username="b", password="c"), - "FortiGate(host='a', username='b')"), + "FortiGate(host='a', username='b', port=443)"), (dict(host="a", username="b", password="c", scheme="https", port=443, timeout=15, vdom="root"), - "FortiGate(host='a', username='b')"), + "FortiGate(host='a', username='b', port=443)"), (dict(host="a", username="b", password="c", port=80), "FortiGate(host='a', username='b', port=80)"), (dict(host="a", username="b", password="c", scheme="https", port=80), @@ -33,17 +33,17 @@ def api() -> FortiGate: (dict(host="a", username="b", password="c", scheme="http", port=80), "FortiGate(host='a', username='b', scheme='http', port=80)"), (dict(host="a", username="b", password="c", timeout=1), - "FortiGate(host='a', username='b', timeout=1)"), + "FortiGate(host='a', username='b', port=443, timeout=1)"), (dict(host="a", username="b", password="c", vdom="d"), - "FortiGate(host='a', username='b', vdom='d')"), + "FortiGate(host='a', username='b', port=443, vdom='d')"), (dict(host="a", username="b", password="c", vdom="d", timeout=1, port=80), "FortiGate(host='a', username='b', port=80, timeout=1, vdom='d')"), (dict(host="a", username="b", password="c", verify=True), - "FortiGate(host='a', username='b', verify=True)"), + "FortiGate(host='a', username='b', port=443, verify=True)"), (dict(host="a", username="b", password="c", verify=False), - "FortiGate(host='a', username='b')"), + "FortiGate(host='a', username='b', port=443)"), # token - (dict(host="a", token="b"), "FortiGate(host='a', username='')"), + (dict(host="a", token="b"), "FortiGate(host='a', username='', port=443)"), ]) def test__repr__(kwargs, expected): """FortiGateBase.__repr__()""" @@ -82,14 +82,14 @@ def test__is_connected(api: FortiGate, session, expected): @pytest.mark.parametrize("scheme, host, port, expected", [ - ("https", "host", 80, "https://host:80"), - ("https", "127.0.0.255", 80, "https://127.0.0.255:80"), - ("https", "host", 443, "https://host"), - ("https", "127.0.0.255", 443, "https://127.0.0.255"), - ("http", "host", 80, "http://host"), - ("http", "127.0.0.255", 80, "http://127.0.0.255"), - ("http", "host", 443, "http://host:443"), - ("http", "127.0.0.255", 443, "http://127.0.0.255:443"), + ("https", "host", 80, "https://host:80/"), + ("https", "127.0.0.255", 80, "https://127.0.0.255:80/"), + ("https", "host", 443, "https://host:443/"), + ("https", "127.0.0.255", 443, "https://127.0.0.255:443/"), + ("http", "host", 80, "http://host:80/"), + ("http", "127.0.0.255", 80, "http://127.0.0.255:80/"), + ("http", "host", 443, "http://host:443/"), + ("http", "127.0.0.255", 443, "http://127.0.0.255:443/"), ]) def test__url(scheme, host, port, expected): """FortiGateBase.url()""" @@ -110,9 +110,9 @@ def test__login(token, expected, headers): api = FortiGate(host="HOST", token=token) with requests_mock.Mocker() as mock: if token: - mock.get("https://host/api/v2/monitor/system/status") + mock.get("https://host:443/api/v2/monitor/system/status") else: - mock.post("https://host/logincheck") + mock.post("https://host:443/logincheck") with patch("fortigate_api.FortiGate._get_token_from_cookies", return_value=expected): api.login() From 0a5e88b8e5959fac8bde9b1a2441f9a28da19a91 Mon Sep 17 00:00:00 2001 From: Christiandus Date: Fri, 5 Sep 2025 15:14:29 +0200 Subject: [PATCH 6/9] correctly handle ipv6 addresses --- fortigate_api/fortigate_base.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/fortigate_api/fortigate_base.py b/fortigate_api/fortigate_base.py index ed451f8..79266a3 100644 --- a/fortigate_api/fortigate_base.py +++ b/fortigate_api/fortigate_base.py @@ -310,14 +310,20 @@ def _init_scheme(**kwargs) -> str: def _init_host(**kwargs) -> str: """Init host valid hostname or valid IP""" host = str(kwargs.get("host")) + if not host: + raise ValueError(f"{host=!r}, hostname is not specified.") + if len(host) > 255: + raise ValueError(f"{host=!r}, hostname is too long.") + if host.startswith('[') and host.endswith(']'): + host = host[1:-1] if re.fullmatch(r"\d+\.\d+\.\d+\.\d+", host) or ":" in host: try: - ipaddress.ip_address(host) + ip = ipaddress.ip_address(host) + if isinstance(ip, ipaddress.IPv6Address): + return f"[{ip}]" return host except ValueError: raise ValueError(f"{host=!r}, not a valid ip address") - if len(host) > 255: - raise ValueError(f"{host=!r}, hostname is too long.") if host[-1] == ".": host = host[:-1] allowed = re.compile(r"^(?!-)[A-Z\d-]{1,63}(? Date: Fri, 5 Sep 2025 15:18:29 +0200 Subject: [PATCH 7/9] add tests for host checking function --- tests/test__fortigate_base.py | 44 +++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/tests/test__fortigate_base.py b/tests/test__fortigate_base.py index b9e1fc1..e6da4f1 100644 --- a/tests/test__fortigate_base.py +++ b/tests/test__fortigate_base.py @@ -219,6 +219,50 @@ def test__init_scheme(kwargs, expected): fortigate_base._init_scheme(**kwargs) +@pytest.mark.parametrize("kwargs, expected", [ + (dict(host=""), ValueError), + (dict(host=" "), ValueError), + (dict(host=":"), ValueError), + (dict(host=":80"), ValueError), + (dict(host="//"), ValueError), + (dict(host="/path"), ValueError), + (dict(host="///host"), ValueError), + (dict(host="https://example.com"), ValueError), + (dict(host="host..com"), ValueError), + (dict(host="hostname-.com"), ValueError), + (dict(host="ho$stname.com"), ValueError), + (dict(host="host name.com"), ValueError), + (dict(host="hostname." + "a" * 256), ValueError), + (dict(host="256.256.256.256"), ValueError), + (dict(host="192.168.1.999"), ValueError), + (dict(host="[2001:db8:::1]"), ValueError), + (dict(host="[2001:db8::1"), ValueError), + (dict(host="2001:db8::1]"), ValueError), + (dict(host="host<>name.com"), ValueError), + (dict(host="host|name.com"), ValueError), + (dict(host="host^name.com"), ValueError), + (dict(host="host/name.com"), ValueError), + (dict(host="host\\name.com"), ValueError), + (dict(host="localhost:"), ValueError), + (dict(host="localhost:abc"), ValueError), + (dict(host="localhost:99999"), ValueError), + (dict(host="[::1]:999999"), ValueError), + (dict(host="[2001:db8::1]"), "[2001:db8::1]"), + (dict(host="2001:db8::1"), "[2001:db8::1]"), + (dict(host="192.168.1.1"), "192.168.1.1"), + (dict(host="example.com"), "example.com"), + (dict(host="sub.example.com"), "sub.example.com"), +]) +def test__init_host(kwargs, expected): + """FortiGateBase._init_port()""" + if isinstance(expected, str): + actual = fortigate_base._init_host(**kwargs) + assert actual == expected + else: + with pytest.raises(expected): + fortigate_base._init_host(**kwargs) + + @pytest.mark.parametrize("kwargs, expected", [ ({}, ""), (dict(token="token"), "token"), From ef86ebb78ed3d9ce7d335427911f5cb6db96f59e Mon Sep 17 00:00:00 2001 From: Christiandus Date: Fri, 5 Sep 2025 15:28:23 +0200 Subject: [PATCH 8/9] consistency ;) --- fortigate_api/fortigate_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fortigate_api/fortigate_base.py b/fortigate_api/fortigate_base.py index 79266a3..3a3b26f 100644 --- a/fortigate_api/fortigate_base.py +++ b/fortigate_api/fortigate_base.py @@ -320,7 +320,7 @@ def _init_host(**kwargs) -> str: try: ip = ipaddress.ip_address(host) if isinstance(ip, ipaddress.IPv6Address): - return f"[{ip}]" + return f"[{host}]" return host except ValueError: raise ValueError(f"{host=!r}, not a valid ip address") From d172baf72145d44fb9fd0ac3747312ac472750d3 Mon Sep 17 00:00:00 2001 From: Christiandus Date: Fri, 5 Sep 2025 15:29:29 +0200 Subject: [PATCH 9/9] fix typo :) --- tests/test__fortigate_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test__fortigate_base.py b/tests/test__fortigate_base.py index e6da4f1..282ee6d 100644 --- a/tests/test__fortigate_base.py +++ b/tests/test__fortigate_base.py @@ -254,7 +254,7 @@ def test__init_scheme(kwargs, expected): (dict(host="sub.example.com"), "sub.example.com"), ]) def test__init_host(kwargs, expected): - """FortiGateBase._init_port()""" + """FortiGateBase._init_host()""" if isinstance(expected, str): actual = fortigate_base._init_host(**kwargs) assert actual == expected