Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Unsupported Transceivers #111

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6da04e0
Unsupported transceiver test introduction
adambaumeister Aug 22, 2023
5e82c48
Add unsupported_transceivers to config
adambaumeister Aug 22, 2023
54a49c1
chore: introduce README badges (#112)
FoSix Aug 29, 2023
24acbdb
ci: switch to release token (#110)
FoSix Aug 29, 2023
4b303f9
feat!: check for scheduled update jobs (#106)
FoSix Sep 12, 2023
986d098
chore(release): 0.2.0
semantic-release-bot Sep 13, 2023
3f7450a
chore(deps): bump cycjimmy/semantic-release-action from 3 to 4 (#113)
dependabot[bot] Sep 14, 2023
b035eb1
chore(deps): bump actions/checkout from 3 to 4 (#114)
dependabot[bot] Sep 14, 2023
a11efcc
Minor fixes
adambaumeister Sep 15, 2023
792bdb3
Add supported_sfp_regex argument
adambaumeister Sep 19, 2023
34ad992
Change wording in docstring
adambaumeister Sep 19, 2023
e790bda
feat(FirewallProxy)!: add a constructor (#119)
FoSix Sep 20, 2023
bf99542
chore(release): 0.3.0
semantic-release-bot Sep 20, 2023
5fbd7b9
chore(deps): bump docker/login-action from 2 to 3 (#115)
dependabot[bot] Sep 25, 2023
3b0d0d8
chore(deps): bump docker/build-push-action from 4 to 5 (#117)
dependabot[bot] Sep 25, 2023
27db006
chore(deps): bump docker/metadata-action from 4 to 5 (#116)
dependabot[bot] Sep 25, 2023
50dd7ac
Unsupported transceiver test introduction
adambaumeister Aug 22, 2023
37f2e13
Add unsupported_transceivers to config
adambaumeister Aug 22, 2023
dceac19
Minor fixes
adambaumeister Sep 15, 2023
050765e
Add supported_sfp_regex argument
adambaumeister Sep 19, 2023
b0766a4
Change wording in docstring
adambaumeister Sep 19, 2023
4605fe2
Fix annotation syntax for python <3.11
adambaumeister Sep 28, 2023
175907b
Merge remote-tracking branch 'origin/adam-feat-unsupported-transceive…
adambaumeister Sep 28, 2023
a91f333
Update docs and fix lint issues
adambaumeister Oct 5, 2023
b186eba
Black formatting
adambaumeister Oct 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/readiness_checks/run_readiness_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
"candidate_config",
"active_support",
"jobs",
"unsupported_transceivers",
# checks below have optional configuration
{"ha": {"skip_config_sync": True, "ignore_non_functional": True}},
{"content_version": {"version": "8635-7675"}},
Expand Down
95 changes: 75 additions & 20 deletions panos_upgrade_assurance/check_firewall.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from typing import Optional, Union, List, Dict
from math import ceil
from datetime import datetime
Expand Down Expand Up @@ -95,6 +96,7 @@ def __init__(self, node: FirewallProxy, skip_force_locale: Optional[bool] = Fals
CheckType.MP_DP_CLOCK_SYNC: self.check_mp_dp_sync,
CheckType.CERTS: self.check_ssl_cert_requirements,
CheckType.JOBS: self.check_non_finished_jobs,
CheckType.UNSUPPORTED_TRANSCEIVERS: self.check_unsupported_transceivers
}
if not skip_force_locale:
locale.setlocale(
Expand Down Expand Up @@ -152,9 +154,9 @@ def check_panorama_connectivity(self) -> CheckResult:
return CheckResult(status=CheckStatus.ERROR, reason="Device not configured with Panorama.")

def check_ha_status(
self,
skip_config_sync: Optional[bool] = False,
ignore_non_functional: Optional[bool] = False,
self,
skip_config_sync: Optional[bool] = False,
ignore_non_functional: Optional[bool] = False,
) -> CheckResult:
"""Checks HA pair status from the perspective of the current device.

Expand Down Expand Up @@ -203,9 +205,9 @@ def check_ha_status(
result.reason = f"Both devices have the same state: {ha_pair['local-info']['state']}."

elif (
not skip_config_sync
and interpret_yes_no(ha_pair["running-sync-enabled"])
and ha_pair["running-sync"] != "synchronized"
not skip_config_sync
and interpret_yes_no(ha_pair["running-sync-enabled"])
and ha_pair["running-sync"] != "synchronized"
):
result.status = CheckStatus.ERROR
result.reason = "Device configuration is not synchronized between the nodes."
Expand All @@ -219,9 +221,9 @@ def check_ha_status(
return result

def check_is_ha_active(
self,
skip_config_sync: Optional[bool] = False,
ignore_non_functional: Optional[bool] = False,
self,
skip_config_sync: Optional[bool] = False,
ignore_non_functional: Optional[bool] = False,
) -> CheckResult:
"""Checks whether this is an active node of an HA pair.

Expand Down Expand Up @@ -294,7 +296,8 @@ def check_expired_licenses(self, skip_licenses: Optional[list] = []) -> CheckRes

"""
if not isinstance(skip_licenses, list):
raise exceptions.WrongDataTypeException(f"The skip_licenses variable is a {type(skip_licenses)} but should be a list")
raise exceptions.WrongDataTypeException(
f"The skip_licenses variable is a {type(skip_licenses)} but should be a list")

result = CheckResult()
try:
Expand Down Expand Up @@ -364,10 +367,10 @@ def check_active_support_license(self) -> CheckResult:
return result

def check_critical_session(
self,
source: Optional[str] = None,
destination: Optional[str] = None,
dest_port: Optional[Union[str, int]] = None,
self,
source: Optional[str] = None,
destination: Optional[str] = None,
dest_port: Optional[Union[str, int]] = None,
) -> CheckResult:
"""Check if a critical session is present in the sessions table.

Expand Down Expand Up @@ -699,7 +702,7 @@ def check_free_disk_space(self, image_version: Optional[str] = None) -> CheckRes
if free_space_panrepo > minimum_free_space:
result.status = CheckStatus.SUCCESS
else:
result.reason = f"There is not enough free space, only {str(round(free_space_panrepo/1024,1)) + 'G' if free_space_panrepo >= 1024 else str(free_space_panrepo) + 'M'}B is available."
result.reason = f"There is not enough free space, only {str(round(free_space_panrepo / 1024, 1)) + 'G' if free_space_panrepo >= 1024 else str(free_space_panrepo) + 'M'}B is available."
return result

def check_mp_dp_sync(self, diff_threshold: int = 0) -> CheckResult:
Expand Down Expand Up @@ -860,7 +863,7 @@ def check_ssl_cert_requirements(self, rsa: dict = {}, ecdsa: dict = {}) -> Check
return result

if (cert_key_size < (rsa_min_key_size if cert_algorithm == "RSA" else ecdsa_min_key_size)) or (
cert_hash.value < (rsa_min_hash.value if cert_algorithm == "RSA" else ecdsa_min_hash.value)
cert_hash.value < (rsa_min_hash.value if cert_algorithm == "RSA" else ecdsa_min_hash.value)
):
failed_certs.append(f"{cert_name} (size: {cert_key_size}, hash: {cert_hash_method})")

Expand Down Expand Up @@ -903,6 +906,57 @@ def check_non_finished_jobs(self) -> CheckResult:
result.reason = "No jobs found on device. This is unusual, please investigate."
return result

def check_unsupported_transceivers(self, supported_sfp_regex: Optional[List[str]] = None) -> CheckResult:
"""Check for any Optical Transceivers (SFPs or otherwise) that aren't supported by Palo Alto Networks.

# Parameters

supported_sfp_regex (list, optional): List of supported transceivers, as regex strings, to mark SFP's as
supported even if they aren't OEM.

# Returns

CheckResult: Object of [`CheckResult`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkresult) class taking \
value of:

* [`CheckStatus.SUCCESS`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) When all optics are OEM and
PAN supported
* [`CheckStatus.FAIL`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) otherwise, `CheckResult.reason`
field contains information about which Slots and Physical ports currently have unsupported transceivers installed
* [`CheckStatus.SKIPPED`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when there are no transceiver
slots at all.
"""
result = CheckResult()

system_state = self._node.get_system_state()

compiled_regex = []
if supported_sfp_regex:
compiled_regex = [re.compile(regex_string) for regex_string in supported_sfp_regex]

no_sfp_interfaces = True
bad_interfaces = []
for key, value in system_state.items():
if re.match(r"sys\.s[0-9]+\.p[0-9]+\.phy", key):
if "'sfp':" in value and "'vendor-name': OEM" not in value:
if not any(regex.search(value) for regex in compiled_regex):
bad_interfaces.append(key)

if "'sfp'" in value:
no_sfp_interfaces = False

if bad_interfaces:
result.reason = f"The following interfaces have non-Palo Alto Networks supported transceivers installed: {', '.join(bad_interfaces)}"
return result

if no_sfp_interfaces:
result.status = CheckStatus.SKIPPED
result.reason = "No SFP Interfaces were found, or no SFP Transceivers were present in the system."
return result

result.status = CheckStatus.SUCCESS
return result

def get_content_db_version(self) -> Dict[str, str]:
"""Get Content DB version.

Expand Down Expand Up @@ -949,9 +1003,9 @@ def get_ip_sec_tunnels(self) -> Dict[str, dict]:
return self._node.get_tunnels().get("IPSec", {})

def run_readiness_checks(
self,
checks_configuration: Optional[List[Union[str, dict]]] = None,
report_style: bool = False,
self,
checks_configuration: Optional[List[Union[str, dict]]] = None,
report_style: bool = False,
) -> Union[Dict[str, dict], Dict[str, str]]:
"""Run readiness checks.

Expand Down Expand Up @@ -994,7 +1048,8 @@ def run_readiness_checks(
check_result = self._check_method_mapping[check_type](
**check_config
) # (**) would pass dict config values as separate parameters to method.
result[check_type] = str(check_result) if report_style else {"state": bool(check_result), "reason": str(check_result)}
result[check_type] = str(check_result) if report_style else {"state": bool(check_result),
"reason": str(check_result)}

return result

Expand Down
31 changes: 31 additions & 0 deletions panos_upgrade_assurance/firewall_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1167,3 +1167,34 @@ def get_jobs(self) -> dict:
results[jid] = job

return results

def get_system_state(self) -> dict[str, str]:
"""Gets the entire output of the show system state command.

Show system state returns low level information about PAN-OS and the attributes of the system. Note that this function
does not parse the data structures beyond the first level.

The actual API command is `show system state`.

# Returns

dict: Each item from the state, where the key is the item key and the value is the value as a string.

```python showLineNumbers title="Sample output"
{
local.name: "mp"
local.octeon: "{ }"
local.ppid: "0"
local.role: "mp"
local.slot: "1"
}
adambaumeister marked this conversation as resolved.
Show resolved Hide resolved
```
"""
result = {}
show_system_state_str = self.op_parser(cmd="show system state", return_xml=True)
for line in show_system_state_str.text.split("\n"):
clean_line = line.strip()
key, value = clean_line.split(":")[0], ":".join(clean_line.split(":")[1:]).strip()
result[key] = value

return result
1 change: 1 addition & 0 deletions panos_upgrade_assurance/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class CheckType:
MP_DP_CLOCK_SYNC = "planes_clock_sync"
CERTS = "certificates_requirements"
JOBS = "jobs"
UNSUPPORTED_TRANSCEIVERS = "unsupported_transceivers"


class SnapType:
Expand Down
90 changes: 90 additions & 0 deletions tests/test_check_firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,96 @@ def test_check_ntp_synchronization_synched_unknown(self, check_firewall_mock):
reason="NTP synchronization in unknown state: unknown."
)

def test_check_unsupported_transceivers_not_oem(self, check_firewall_mock):
check_firewall_mock._node.get_system_state.return_value = {
'': '',
'local.brdagent': '{ }',
'local.family': 'vm',
'local.info': "{ 'family': vm, 'model': PA-VM, 'name': mp, 'ppid': 0, 'role': mp, 'slot': 1, }",
'local.model': 'PA-VM', 'local.name': 'mp', 'local.octeon': '{ }',
'local.ppid': '0',
'local.role': 'mp',
'local.slot': '1',
'sys.s5.p1.phy': "{ 'duration': 3969, 'last-sample': 1970-01-01 08:00:00, 'link-partner': { }, 'media': QSFP-Plus-Fiber, 'sfp': { 'ch1': { 'rx-power': 0.00 mW, }, 'ch2': { 'rx-power': 0.00 mW, }, 'ch3': { 'rx-power': 0.00 mW, }, 'ch4': { 'rx-power': 0.00 mW, }, 'connector': Reserved, 'diagnostic-monitor': Yes, 'encoding': 64B66B, 'ex-spec-compliance-code': 0x0, 'identifier': QSFPP, 'link-len-km': 0 km, 'link-len-om1': 0 m, 'link-len-om2': 10 m, 'link-len-om3': 10 m, 'link-len-om4': 20 m, 'rx-power-alarm-hi': 0.00 mW, 'rx-power-alarm-lo': 0.00 mW, 'rx-power-warn-hi': 0.00 mW, 'rx-power-warn-lo': 0.00 mW, 'transceiver': S dist,SN,M5, 'vendor-name': FINISAR CORP , 'vendor-part-number': FCBN410QB1C10 , 'vendor-part-rev': B , 'vendor-serial-number': YYYYYYY , }, 'type': Ethernet, }",
'sys.s6.p2.phy': "{ 'duration': 3969, 'last-sample': 1970-01-01 08:00:00, 'link-partner': { }, 'media': QSFP-Plus-Fiber, 'sfp': { 'ch1': { 'rx-power': 0.00 mW, }, 'ch2': { 'rx-power': 0.00 mW, }, 'ch3': { 'rx-power': 0.00 mW, }, 'ch4': { 'rx-power': 0.00 mW, }, 'connector': Reserved, 'diagnostic-monitor': Yes, 'encoding': 64B66B, 'ex-spec-compliance-code': 0x0, 'identifier': QSFPP, 'link-len-km': 0 km, 'link-len-om1': 0 m, 'link-len-om2': 10 m, 'link-len-om3': 10 m, 'link-len-om4': 20 m, 'rx-power-alarm-hi': 0.00 mW, 'rx-power-alarm-lo': 0.00 mW, 'rx-power-warn-hi': 0.00 mW, 'rx-power-warn-lo': 0.00 mW, 'transceiver': S dist,SN,M5, 'vendor-name': FINISAR CORP , 'vendor-part-number': FCBN410QB1C10 , 'vendor-part-rev': B , 'vendor-serial-number': YYYYYYY , }, 'type': Ethernet, }",
}
assert check_firewall_mock.check_unsupported_transceivers() == CheckResult(
reason="The following interfaces have non-Palo Alto Networks supported transceivers installed: sys.s5.p1.phy, sys.s6.p2.phy",
)

def test_check_unsupported_transceivers_custom_supported_regex_match(self, check_firewall_mock):
check_firewall_mock._node.get_system_state.return_value = {
'': '',
'local.brdagent': '{ }',
'local.family': 'vm',
'local.info': "{ 'family': vm, 'model': PA-VM, 'name': mp, 'ppid': 0, 'role': mp, 'slot': 1, }",
'local.model': 'PA-VM', 'local.name': 'mp', 'local.octeon': '{ }',
'local.ppid': '0',
'local.role': 'mp',
'local.slot': '1',
'sys.s5.p1.phy': "{ 'duration': 3969, 'last-sample': 1970-01-01 08:00:00, 'link-partner': { }, 'media': QSFP-Plus-Fiber, 'sfp': { 'ch1': { 'rx-power': 0.00 mW, }, 'ch2': { 'rx-power': 0.00 mW, }, 'ch3': { 'rx-power': 0.00 mW, }, 'ch4': { 'rx-power': 0.00 mW, }, 'connector': Reserved, 'diagnostic-monitor': Yes, 'encoding': 64B66B, 'ex-spec-compliance-code': 0x0, 'identifier': QSFPP, 'link-len-km': 0 km, 'link-len-om1': 0 m, 'link-len-om2': 10 m, 'link-len-om3': 10 m, 'link-len-om4': 20 m, 'rx-power-alarm-hi': 0.00 mW, 'rx-power-alarm-lo': 0.00 mW, 'rx-power-warn-hi': 0.00 mW, 'rx-power-warn-lo': 0.00 mW, 'transceiver': S dist,SN,M5, 'vendor-name': FINISAR CORP , 'vendor-part-number': FCBN410QB1C10 , 'vendor-part-rev': B , 'vendor-serial-number': YYYYYYY , }, 'type': Ethernet, }",
'sys.s6.p2.phy': "{ 'duration': 3969, 'last-sample': 1970-01-01 08:00:00, 'link-partner': { }, 'media': QSFP-Plus-Fiber, 'sfp': { 'ch1': { 'rx-power': 0.00 mW, }, 'ch2': { 'rx-power': 0.00 mW, }, 'ch3': { 'rx-power': 0.00 mW, }, 'ch4': { 'rx-power': 0.00 mW, }, 'connector': Reserved, 'diagnostic-monitor': Yes, 'encoding': 64B66B, 'ex-spec-compliance-code': 0x0, 'identifier': QSFPP, 'link-len-km': 0 km, 'link-len-om1': 0 m, 'link-len-om2': 10 m, 'link-len-om3': 10 m, 'link-len-om4': 20 m, 'rx-power-alarm-hi': 0.00 mW, 'rx-power-alarm-lo': 0.00 mW, 'rx-power-warn-hi': 0.00 mW, 'rx-power-warn-lo': 0.00 mW, 'transceiver': S dist,SN,M5, 'vendor-name': FINISAR CORP , 'vendor-part-number': FCBN410QB1C10 , 'vendor-part-rev': B , 'vendor-serial-number': YYYYYYY , }, 'type': Ethernet, }",
}
supported_regex = [
"FCBN4"
]
assert check_firewall_mock.check_unsupported_transceivers(supported_sfp_regex=supported_regex) == CheckResult(
status=CheckStatus.SUCCESS
)

def test_check_unsupported_transceivers_custom_supported_regex_miss(self, check_firewall_mock):
check_firewall_mock._node.get_system_state.return_value = {
'': '',
'local.brdagent': '{ }',
'local.family': 'vm',
'local.info': "{ 'family': vm, 'model': PA-VM, 'name': mp, 'ppid': 0, 'role': mp, 'slot': 1, }",
'local.model': 'PA-VM', 'local.name': 'mp', 'local.octeon': '{ }',
'local.ppid': '0',
'local.role': 'mp',
'local.slot': '1',
'sys.s5.p1.phy': "{ 'duration': 3969, 'last-sample': 1970-01-01 08:00:00, 'link-partner': { }, 'media': QSFP-Plus-Fiber, 'sfp': { 'ch1': { 'rx-power': 0.00 mW, }, 'ch2': { 'rx-power': 0.00 mW, }, 'ch3': { 'rx-power': 0.00 mW, }, 'ch4': { 'rx-power': 0.00 mW, }, 'connector': Reserved, 'diagnostic-monitor': Yes, 'encoding': 64B66B, 'ex-spec-compliance-code': 0x0, 'identifier': QSFPP, 'link-len-km': 0 km, 'link-len-om1': 0 m, 'link-len-om2': 10 m, 'link-len-om3': 10 m, 'link-len-om4': 20 m, 'rx-power-alarm-hi': 0.00 mW, 'rx-power-alarm-lo': 0.00 mW, 'rx-power-warn-hi': 0.00 mW, 'rx-power-warn-lo': 0.00 mW, 'transceiver': S dist,SN,M5, 'vendor-name': FINISAR CORP , 'vendor-part-number': FCBN410QB1C10 , 'vendor-part-rev': B , 'vendor-serial-number': YYYYYYY , }, 'type': Ethernet, }",
'sys.s6.p2.phy': "{ 'duration': 3969, 'last-sample': 1970-01-01 08:00:00, 'link-partner': { }, 'media': QSFP-Plus-Fiber, 'sfp': { 'ch1': { 'rx-power': 0.00 mW, }, 'ch2': { 'rx-power': 0.00 mW, }, 'ch3': { 'rx-power': 0.00 mW, }, 'ch4': { 'rx-power': 0.00 mW, }, 'connector': Reserved, 'diagnostic-monitor': Yes, 'encoding': 64B66B, 'ex-spec-compliance-code': 0x0, 'identifier': QSFPP, 'link-len-km': 0 km, 'link-len-om1': 0 m, 'link-len-om2': 10 m, 'link-len-om3': 10 m, 'link-len-om4': 20 m, 'rx-power-alarm-hi': 0.00 mW, 'rx-power-alarm-lo': 0.00 mW, 'rx-power-warn-hi': 0.00 mW, 'rx-power-warn-lo': 0.00 mW, 'transceiver': S dist,SN,M5, 'vendor-name': FINISAR CORP , 'vendor-part-number': FCBN410QB1C10 , 'vendor-part-rev': B , 'vendor-serial-number': YYYYYYY , }, 'type': Ethernet, }",
}
supported_regex = [
"BADREGEX"
]
assert check_firewall_mock.check_unsupported_transceivers(supported_sfp_regex=supported_regex) == CheckResult(
reason="The following interfaces have non-Palo Alto Networks supported transceivers installed: sys.s5.p1.phy, sys.s6.p2.phy",
)

def test_check_unsupported_transceivers_all_supported(self, check_firewall_mock):
check_firewall_mock._node.get_system_state.return_value = {
'': '',
'local.brdagent': '{ }',
'local.family': 'vm',
'local.info': "{ 'family': vm, 'model': PA-VM, 'name': mp, 'ppid': 0, 'role': mp, 'slot': 1, }",
'local.model': 'PA-VM', 'local.name': 'mp', 'local.octeon': '{ }',
'local.ppid': '0',
'local.role': 'mp',
'local.slot': '1',
'sys.s5.p1.phy': "{ 'link-partner': { }, 'media': SFP-Plus-Fiber, 'sfp': { 'connector': LC, 'encoding': Reserved, 'identifier': SFP, 'transceiver': 10000B-SR, 'vendor-name': OEM , 'vendor-part-number': PAN-SFP-PLUS-SR , 'vendor-part-rev': B4 , }, 'type': Ethernet, }",
}
assert check_firewall_mock.check_unsupported_transceivers() == CheckResult(
status=CheckStatus.SUCCESS
)

def test_check_unsupported_transceivers_no_sfp(self, check_firewall_mock):
check_firewall_mock._node.get_system_state.return_value = {
'': '',
'local.brdagent': '{ }',
'local.family': 'vm',
'local.info': "{ 'family': vm, 'model': PA-VM, 'name': mp, 'ppid': 0, 'role': mp, 'slot': 1, }",
'local.model': 'PA-VM', 'local.name': 'mp', 'local.octeon': '{ }',
'local.ppid': '0',
'local.role': 'mp',
'local.slot': '1',
'sys.s4.p20.phy': "{ 'link-partner': { }, 'media': CAT5, 'type': Ethernet, }",
}
assert check_firewall_mock.check_unsupported_transceivers() == CheckResult(
status=CheckStatus.SKIPPED,
reason="No SFP Interfaces were found, or no SFP Transceivers were present in the system."
)

def test_check_arp_entry_none(self, check_firewall_mock):
assert check_firewall_mock.check_arp_entry(ip=None) == CheckResult(
CheckStatus.SKIPPED, reason="Missing ARP table entry description."
Expand Down
Loading
Loading