Skip to content

Commit

Permalink
Refactor(plugins): Move filter snmp_hash to PyAVD
Browse files Browse the repository at this point in the history
  • Loading branch information
ClausHolbechArista committed Jun 19, 2024
1 parent 35ef6cd commit 3d27847
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 310 deletions.
125 changes: 15 additions & 110 deletions ansible_collections/arista/avd/plugins/filter/snmp_hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,22 @@

__metaclass__ = type

import hashlib

from ansible.errors import AnsibleFilterError

from ansible_collections.arista.avd.plugins.plugin_utils.pyavd_wrappers import RaiseOnUse, wrap_filter

PLUGIN_NAME = "arista.avd.snmp_hash"

try:
from pyavd.j2filters.snmp_hash import snmp_hash
except ImportError as e:
snmp_hash = RaiseOnUse(
AnsibleFilterError(
f"The '{PLUGIN_NAME}' plugin requires the 'pyavd' Python library. Got import error",
orig_exc=e,
)
)

DOCUMENTATION = r"""
---
name: snmp_hash
Expand Down Expand Up @@ -59,114 +71,7 @@
type: string
"""

PRIV_KEY_LENGTH = {"des": 128, "aes": 128, "aes192": 192, "aes256": 256}


def get_hash_object(auth_type: str) -> hashlib._hashlib.HASH:
"""
:param auth_type: a string in [md5|sha|sha224|sha256|sha384|sha512]
:return: an instance of hashlib._hashlib.HASH corresponding to the
auth_type
:raises: AnsibleFilterError, when the auth_type is not valid
"""
auth = "sha1" if auth_type == "sha" else auth_type
try:
return hashlib.new(auth)
except ValueError:
raise AnsibleFilterError(f"{auth_type} is not a valid Auth algorithm for SNMPv3") from ValueError


def key_from_passphrase(passphrase: str, auth_type: str) -> str:
"""
RFC 2574 section A.2 algorithm
https://www.rfc-editor.org/rfc/rfc2574.html#appendix-A2
:param passphrase: the passphrase to use to generate the key
:param auth_type: a string in [md5|sha|sha224|sha256|sha384|sha512]
:return: the key generated from the passphrase using auth_type as per
the algorithm described in the RFC.
:raises: AnsibleFilterError, when the auth_type is not valid
"""
b_passphrase = passphrase
if isinstance(passphrase, str):
b_passphrase = passphrase.encode("UTF-8", errors="strict")
hash_object = get_hash_object(auth_type)
count = 0
password_index = 0
password_length = len(b_passphrase)
while count < 1048576:
cp = bytearray()
for index in range(0, 64):
cp.append(b_passphrase[password_index % password_length])
password_index += 1
hash_object.update(cp)
count += 64
return hash_object.hexdigest()


def localize_passphrase(passphrase: str, auth_type: str, engine_id: str, priv_type: str = None) -> str:
"""
Key localization as described in RFC 2574, section 2.6
https://www.rfc-editor.org/rfc/rfc2574.html#section-2.6
:param passphrase: the passphrase to localize, if priv_type is None
it is the auth passphrase else it is the priv
passphrase
:param auth_type: a string in [md5|sha|sha224|sha256|sha384|sha512]
:param engine_id: an hexadecimal string containing the engine_id to be
used to localize the key
:param auth_type: an optional argument indicating the priv algorithm
in [des|aes|aes192|aes256]
:return: the localized key generated from the passphrase using auth_type
and if required truncated to match the appropriate keylength for
the priv_type.
:raises: AnsibleFilterError, when the auth_type or priv_type is not valid
or if the engined_id is not a proper hexadecimal string
"""

key = bytes.fromhex(key_from_passphrase(passphrase, auth_type))
hash_object = get_hash_object(auth_type)
try:
hash_object.update(key + bytes.fromhex(engine_id) + key)
except ValueError as error:
raise AnsibleFilterError(f"engine ID {engine_id} is not an hexadecimal string") from error
localized_key = hash_object.hexdigest()
if priv_type is not None:
try:
while len(localized_key) * 4 < PRIV_KEY_LENGTH[priv_type]:
hash_object = get_hash_object(auth_type)
hash_object.update(bytes.fromhex(localized_key))
localized_key = localized_key + hash_object.hexdigest()
# Truncate ithe key if required
localized_key = localized_key[: PRIV_KEY_LENGTH[priv_type] // 4]
except KeyError as error:
raise AnsibleFilterError(f"{priv_type} is not a valid Priv algorithm for SNMPv3") from error
return localized_key


def hash_passphrase(input_dict):
try:
passphrase = input_dict["passphrase"]
auth_type = input_dict["auth"]
engine_id = input_dict["engine_id"]
except KeyError as exc:
raise AnsibleFilterError() from exc
try:
priv_type = input_dict["priv"]
except KeyError:
priv_type = None

return localize_passphrase(passphrase, auth_type, engine_id, priv_type)


class FilterModule(object):
def filters(self):
return {
"snmp_hash": hash_passphrase,
}
return {"snmp_hash": wrap_filter(PLUGIN_NAME)(snmp_hash)}
196 changes: 0 additions & 196 deletions ansible_collections/arista/avd/tests/unit/filters/test_snmp_hash.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from typing import TYPE_CHECKING

from ....j2filters.natural_sort import natural_sort
from ....j2filters.snmp_hash import snmp_hash
from ....vendor.errors import AristaAvdError, AristaAvdMissingVariableError
from ....vendor.j2.filter.snmp_hash import hash_passphrase
from ....vendor.strip_empties import strip_null_from_data
from ....vendor.utils import get, replace_or_append_item
from .utils import UtilsMixin
Expand All @@ -33,7 +33,7 @@ def snmp_server(self: AvdStructuredConfigBase) -> dict | None:
unique local_engine_id value based on hostname and mgmt_ip facts.
If user.version is set to 'v3', compute_local_engineid and compute_v3_user_localized_key are set to 'True'
we will use hash_passphrase filter to create an instance of hashlib._hashlib.HASH corresponding to the auth_type
we will use snmp_hash filter to create an instance of hashlib HASH corresponding to the auth_type
value based on various snmp_settings.users information.
"""
source_interfaces_inputs = self._source_interfaces.get("snmp")
Expand Down Expand Up @@ -138,7 +138,7 @@ def _snmp_users(self: AvdStructuredConfigBase, snmp_settings: dict, engine_ids:
user_dict["auth"] = auth
if compute_v3_user_localized_key:
hash_filter = {"passphrase": auth_passphrase, "auth": auth, "engine_id": engine_ids["local"]}
user_dict["auth_passphrase"] = hash_passphrase(hash_filter)
user_dict["auth_passphrase"] = snmp_hash(hash_filter)
else:
user_dict["auth_passphrase"] = auth_passphrase

Expand All @@ -148,7 +148,7 @@ def _snmp_users(self: AvdStructuredConfigBase, snmp_settings: dict, engine_ids:
user_dict["priv"] = priv
if compute_v3_user_localized_key:
hash_filter.update({"passphrase": priv_passphrase, "priv": priv})
user_dict["priv_passphrase"] = hash_passphrase(hash_filter)
user_dict["priv_passphrase"] = snmp_hash(hash_filter)
else:
user_dict["priv_passphrase"] = priv_passphrase

Expand Down
Loading

0 comments on commit 3d27847

Please sign in to comment.