diff --git a/molecule/system_access_users/converge.yml b/molecule/system_access_users/converge.yml new file mode 100644 index 00000000..2093e320 --- /dev/null +++ b/molecule/system_access_users/converge.yml @@ -0,0 +1,209 @@ +--- +- name: converge + hosts: all + become: true + tasks: + - name: "test" + ansible.builtin.debug: + msg: "test" + + # Test User minimum requirements + - name: "Test User 1: Test minimum requirements User Creation" + puzzle.opnsense.system_access_users: + username: test_user_1 + password: test_password_1 + + # Test User minimum requirements disabled + - name: "Test User 2: Test disabled User Creation" + puzzle.opnsense.system_access_users: + username: test_user_2 + password: test_password_2 + full_name: "Test User 2: Test disabled User Creation" + disabled: True + + # Test User with Full Name + - name: "Test User 3: Test User Creation with Full Name" + puzzle.opnsense.system_access_users: + username: test_user_3 + password: test_password_3 + full_name: "Test User 3: Test User Creation with Full Name" + + # Test User with E-Mail + - name: "Test User 4: Test User Creation with E-Mail" + puzzle.opnsense.system_access_users: + username: test_user_4 + password: test_password_4 + email: test_user_4@test.ch + full_name: "Test User 4: Test User Creation with E-Mail" + + # Test User with Comment + - name: "Test User 5: Test User Creation with Comment" + puzzle.opnsense.system_access_users: + username: test_user_5 + password: test_password_5 + comment: Test User 5 Comment + full_name: "Test User 5: Test User Creation with Comment" + + # Test User with Preferred landing page + - name: "Test User 6: Test User Creation with Preferred landing page" + puzzle.opnsense.system_access_users: + username: test_user_6 + password: test_password_6 + landing_page: /ui/ipsec/sessions + full_name: "Test User 6: Test User Creation with Preferred landing page" + + # Test User with nologin shell + - name: "Test User 7: Test User Creation with nologin shell" + puzzle.opnsense.system_access_users: + username: test_user_7 + password: test_password_7 + shell: /sbin/nologin + full_name: "Test User 7: Test User Creation with nologin shell" + + # Test User with csh shell + - name: "Test User 8: Test User Creation with csh shell" + puzzle.opnsense.system_access_users: + username: test_user_8 + password: test_password_8 + shell: /bin/csh + full_name: "Test User 8: Test User Creation with csh shell" + + # Test User with sh shell + - name: "Test User 9: Test User Creation with sh shell" + puzzle.opnsense.system_access_users: + username: test_user_9 + password: test_password_9 + shell: /bin/sh + full_name: "Test User 9: Test User Creation with sh shell" + + # Test User with tcsh shell + - name: "Test User 10: Test User Creation with tcsh shell" + puzzle.opnsense.system_access_users: + username: test_user_10 + password: test_password_10 + shell: /bin/tcsh + full_name: "Test User 10: Test User Creation with tcsh shell" + + # Test User with Expiration date + - name: "Test User 11: Test User Creation with Expiration date" + puzzle.opnsense.system_access_users: + username: test_user_11 + password: test_password_11 + expires: 02/27/2024 + full_name: "Test User 11: Test User Creation with Expiration date" + + # Test User with group as string + - name: "Test User 12: Test User Creation with group as string" + puzzle.opnsense.system_access_users: + username: test_user_12 + password: test_password_12 + full_name: "Test User 12: Test User Creation with group as string" + groups: admins + + # Test User with group as list + - name: "Test User 13: Test User Creation with group as list" + puzzle.opnsense.system_access_users: + username: test_user_13 + password: test_password_13 + full_name: "Test User 13: Test User Creation with group as list" + groups: + - admins + + # Test User with not existing group as list + - name: "Test User 14: Test User Creation with not existing group as list" + puzzle.opnsense.system_access_users: + username: test_user_14 + password: test_password_14 + full_name: "Test User 14: Test User Creation with not existing group as list" + groups: + - test + register: test_user_14_result + ignore_errors: yes + + - name: "Verify that the user creation failed due to non-existing group" + ansible.builtin.assert: + that: + - test_user_14_result is failed + fail_msg: "User creation should fail due to non-existing group" + success_msg: "User creation failed as expected due to non-existing group" + + # Test User with empty otp_seed + - name: "Test User 15: Test User Creation with empty otp_seed" + puzzle.opnsense.system_access_users: + username: test_user_15 + password: test_password_15 + otp_seed: "" + full_name: "Test User 15: Test User Creation with empty otp_seed" + + # Test User with otp_seed + - name: "Test User 16: Test User Creation with otp_seed" + puzzle.opnsense.system_access_users: + username: test_user_16 + password: test_password_16 + otp_seed: test_seed + full_name: "Test User 16: Test User Creation with otp_seed" + + # Test User with empty authorizedkeys + - name: "Test User 17: Test User Creation with empty authorizedkeys" + puzzle.opnsense.system_access_users: + username: test_user_17 + password: test_password_17 + authorizedkeys: "" + full_name: "Test User 17: Test User Creation with empty authorizedkeys" + + # Test User with authorizedkeys + - name: "Test User 18: Test User Creation with authorizedkeys" + puzzle.opnsense.system_access_users: + username: test_user_18 + password: test_password_18 + authorizedkeys: test_authorized_key + full_name: "Test User 18: Test User Creation with authorizedkeys" + + # Test User with empty api_keys + - name: "Test User 19: Test User Creation with empty api_keys" + puzzle.opnsense.system_access_users: + username: test_user_19 + password: test_password_19 + apikeys: "" + full_name: "Test User 19: Test User Creation with empty api_keys" + register: api_keys_result + + - name: Return the created apikeys and secret of Test User 19 + ansible.builtin.debug: + msg: "The following api_keys were created {{ api_keys_result.generated_apikeys }}" + when: + - "'generated_apikeys' in api_keys_result" + - api_keys_result.generated_apikeys | length > 0 + + # Test User with too short api_keys + - name: "Test User 20: Test User Creation with too short api_keys" + puzzle.opnsense.system_access_users: + username: test_user_20 + password: test_password_20 + apikeys: "TEST_API_KEY" + full_name: "Test User 20: Test User Creation with too short api_keys" + register: test_user_20_result + ignore_errors: yes + + - name: "Verify that the user creation failed due to too short api key" + ansible.builtin.assert: + that: + - test_user_20_result is failed + fail_msg: "The API key: TEST_API_KEY is not a valid string. Must be >= 80 characters." + success_msg: "The API key: TEST_API_KEY is not a valid string. Must be >= 80 characters." + + # Test User with valid api_keys + - name: "Test User 21: Test User Creation with valid api_keys" + puzzle.opnsense.system_access_users: + username: test_user_21 + password: test_password_21 + apikeys: "TEST_API_KEY_WITH_RANDOM_CHARS_UNTIL_80_zo5Y3bUpOQFfbQnAOB6GqbHsPAP9Jqbjofnqu9xc" + full_name: "Test User 21: Test User Creation with valid api_keys" + register: api_keys_result + + - name: Return the created apikeys and secret of Test User 21 + ansible.builtin.debug: + msg: "The following api_keys were created {{ api_keys_result.generated_apikeys }}" + when: + - "'generated_apikeys' in api_keys_result" + - api_keys_result.generated_apikeys | length > 0 \ No newline at end of file diff --git a/molecule/system_access_users/molecule.yml b/molecule/system_access_users/molecule.yml new file mode 100644 index 00000000..6af085f8 --- /dev/null +++ b/molecule/system_access_users/molecule.yml @@ -0,0 +1,69 @@ +--- +scenario: + name: system_access_users + test_sequence: + # - dependency not relevant unless we have requirements + - destroy + - syntax + - create +# - prepare + - converge + - idempotence + #- verify + - cleanup + - destroy + +driver: + name: vagrant + parallel: true + +platforms: + - name: "22.7" + hostname: false + box: puzzle/opnsense + box_version: "22.7" + memory: 1024 + cpus: 2 + instance_raw_config_args: + - 'vm.guest = :freebsd' + - 'ssh.sudo_command = "%c"' + - 'ssh.shell = "/bin/sh"' + - name: "23.1" + box: puzzle/opnsense + hostname: false + box_version: "23.1" + memory: 1024 + cpus: 2 + instance_raw_config_args: + - 'vm.guest = :freebsd' + - 'ssh.sudo_command = "%c"' + - 'ssh.shell = "/bin/sh"' + - name: "23.7" + box: puzzle/opnsense + hostname: false + box_version: "23.7" + memory: 1024 + cpus: 2 + instance_raw_config_args: + - 'vm.guest = :freebsd' + - 'ssh.sudo_command = "%c"' + - 'ssh.shell = "/bin/sh"' + - name: "24.1" + box: puzzle/opnsense + hostname: false + box_version: "24.1" + memory: 1024 + cpus: 2 + instance_raw_config_args: + - 'vm.guest = :freebsd' + - 'ssh.sudo_command = "%c"' + - 'ssh.shell = "/bin/sh"' + +provisioner: + name: ansible +# env: +# ANSIBLE_VERBOSITY: 3 +verifier: + name: ansible + options: + become: true diff --git a/molecule/system_access_users/verify.yml b/molecule/system_access_users/verify.yml new file mode 100644 index 00000000..b447dcf4 --- /dev/null +++ b/molecule/system_access_users/verify.yml @@ -0,0 +1,6 @@ +--- +- name: Verify connectivity to server + hosts: all + tasks: + - name: Ping the server + ansible.builtin.ping: diff --git a/plugins/module_utils/module_index.py b/plugins/module_utils/module_index.py index f599a373..b4a8a7c9 100644 --- a/plugins/module_utils/module_index.py +++ b/plugins/module_utils/module_index.py @@ -92,6 +92,29 @@ }, }, }, + "system_access_users": { + "users": "system/user", + "uid": "system/nextuid", + "gid": "system/nextgid", + "system": "system", + "php_requirements": [ + "/usr/local/etc/inc/system.inc", + ], + "configure_functions": {}, + }, + "password": { + "php_requirements": [ + "/usr/local/etc/inc/auth.inc", + ], + "configure_functions": { + "name": "echo password_hash", + "configure_params": [ + "'password'", + "PASSWORD_BCRYPT", + "[ 'cost' => 11 ]", + ], + }, + }, }, "23.1": { "system_settings_general": { @@ -156,6 +179,29 @@ }, }, }, + "system_access_users": { + "users": "system/user", + "uid": "system/nextuid", + "gid": "system/nextgid", + "system": "system", + "php_requirements": [ + "/usr/local/etc/inc/system.inc", + ], + "configure_functions": {}, + }, + "password": { + "php_requirements": [ + "/usr/local/etc/inc/auth.inc", + ], + "configure_functions": { + "name": "echo password_hash", + "configure_params": [ + "'password'", + "PASSWORD_BCRYPT", + "[ 'cost' => 11 ]", + ], + }, + }, }, "23.7": { "system_settings_general": { @@ -220,6 +266,29 @@ }, }, }, + "system_access_users": { + "users": "system/user", + "uid": "system/nextuid", + "gid": "system/nextgid", + "system": "system", + "php_requirements": [ + "/usr/local/etc/inc/system.inc", + ], + "configure_functions": {}, + }, + "password": { + "php_requirements": [ + "/usr/local/etc/inc/auth.inc", + ], + "configure_functions": { + "name": "echo password_hash", + "configure_params": [ + "'password'", + "PASSWORD_BCRYPT", + "[ 'cost' => 11 ]", + ], + }, + }, }, "24.1": { "system_settings_general": { @@ -285,5 +354,28 @@ }, }, }, + "system_access_users": { + "users": "system/user", + "uid": "system/nextuid", + "gid": "system/nextgid", + "system": "system", + "php_requirements": [ + "/usr/local/etc/inc/system.inc", + ], + "configure_functions": {}, + }, + "password": { + "php_requirements": [ + "/usr/local/etc/inc/auth.inc", + ], + "configure_functions": { + "name": "echo password_hash", + "configure_params": [ + "'password'", + "PASSWORD_BCRYPT", + "[ 'cost' => 11 ]", + ], + }, + }, }, } diff --git a/plugins/module_utils/system_access_users_utils.py b/plugins/module_utils/system_access_users_utils.py new file mode 100644 index 00000000..b22bd2f5 --- /dev/null +++ b/plugins/module_utils/system_access_users_utils.py @@ -0,0 +1,953 @@ +# Copyright: (c) 2024, Puzzle ITC, Kilian Soltermann +# GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt) + +""" +This module manages user and group configurations within an OPNsense system. It provides +functionalities for handling user attributes and group memberships, utilizing data classes and +XML manipulation. Key features include creation, update, and deletion of user records, +secure password management, API key generation, and comprehensive error handling. + +Classes: +- `User`: Manages individual user accounts with functionalities such as XML serialization and + initialization from Ansible module parameters. +- `Group`: Manages group attributes and membership operations with XML interaction capabilities. +- `UserSet`: Handles bulk operations on users and groups, ensuring consistent state across the + system configuration. + +Exceptions are defined for handling specific group and API key validation errors, enhancing +the module's robustness in configuration management tasks. + +Designed for Ansible integration, specifically targeting the OPNsense firewall system, this +module provides a structured approach to system access management. + +Copyright: (c) 2024, Puzzle ITC, Kilian Soltermann +Licensed under the GNU General Public License v3.0+ +(see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt). +""" + + +from dataclasses import dataclass, asdict, fields +from enum import Enum +from typing import List, Optional +import base64 +import os +import binascii + +from xml.etree.ElementTree import Element, ElementTree + +from ansible_collections.puzzle.opnsense.plugins.module_utils import ( + xml_utils, + opnsense_utils, +) +from ansible_collections.puzzle.opnsense.plugins.module_utils.config_utils import ( + OPNsenseModuleConfig, +) + + +class OPNSenseGroupNotFoundError(Exception): + """ + Exception raised when an OPNsense group is not found. + """ + + +class OPNSenseNotValidBase64APIKeyError(Exception): + """ + Exception raised when a not valid base32 api code is provided + """ + + +class OPNSenseCryptReturnError(Exception): + """ + Exception raised when the return value of the instance is not what is expected + """ + + +class ListEnum(Enum): + """Enum class with some handy utility functions.""" + + @classmethod + def as_list(cls) -> List[str]: + """ + Return a list + Returns + ------- + + """ + return [entry.value for entry in cls] + + @classmethod + def from_string(cls, value: str) -> "ListEnum": + """ + Returns Enum value, from a given String. + If no enum value can be mapped to the input string, + ValueError is raised. + Parameters + ---------- + value: `str` + String to be mapped to enum value + + Returns + ------- + Enum value + """ + for _key, _value in cls.__members__.items(): + if value in (_key, _value.value): + return _value + raise ValueError(f"'{cls.__name__}' enum not found for '{value}'") + + +class UserLoginShell(ListEnum): + """Represents the user login shell.""" + + NOLOGIN = "/sbin/nologin" + CSH = "/bin/csh" + SH = "/bin/sh" + TCSH = "/bin/tcsh" + + +@dataclass +class Group: + """ + Represents a Group entity with various attributes. + + Args: + name (str): The name of the group. + description (str): A description of the group. + scope (Optional[str]): The scope of the group, if specified. + priv (Optional[str]): Privileges associated with the group, if applicable. + gid (Optional[str]): The group's unique identifier, if provided. + member (Optional[List[str]]): List of member usernames in the group, if any. + + Methods: + from_xml(element: Element): Creates a Group instance from an XML Element. + to_etree(self): Converts the Group instance to an XML Element. + remove_user(self): Removes a user from the group. + check_if_user_in_group(self, user: "User"): Checks if a user is already in the group. + add_user(self, user: "User"): Adds a user to the group. + + The Group class is designed to represent group entities with various attributes commonly used + in system configurations. It provides methods for creating from XML, converting to XML, + checking if a user is in the group, and adding/removing a user to/from the group. + """ + + name: str + description: str + scope: Optional[str] = None + priv: Optional[str] = None + gid: Optional[str] = None + member: Optional[List[str]] = None + + @staticmethod + def from_xml(element: Element) -> "Group": + """Creates a Group instance from an XML Element.""" + + group_dict: dict = xml_utils.etree_to_dict(element)["group"] + + if "member" in group_dict and isinstance(group_dict["member"], str): + group_dict["member"] = [group_dict["member"]] + + return Group(**group_dict) + + def to_etree(self) -> Element: + """Converts the Group instance to an XML Element.""" + + group_dict: dict = asdict(self) + + element: Element = xml_utils.dict_to_etree("group", group_dict)[0] + + return element + + def check_if_user_in_group(self, user: "User") -> bool: + """ + Checks if a user is already in the group. + + Args: + user (User): The User object to check if they are in the group. + + Returns: + bool: True if the user is in the group, False otherwise. + """ + + if self.member and user.uid in self.member: + return True + + return False + + def add_user(self, user: "User") -> None: + """ + Adds a user to the group. + + Args: + user (User): The User object to add to the group. + + This function adds a user to the group by appending their UID to the group's member list. + """ + + if not isinstance(self.member, list): + self.member = [self.member] if self.member else [] + + self.member.append(user.uid) + + def remove_user(self, user: "User") -> None: + """ + Removes a user from the group. + + Args: + user (User): The User object to remove from the group. + + This function removes a user from the group by removing their UID + from the group's member list. + """ + + if not isinstance(self.member, list): + # Convert self.member to a list if it's not already a list. + # If self.member is None or empty, this will set it to an empty list. + self.member = [self.member] if self.member else [] + + # Check if the user's UID is in the member list, then remove it. + if user.uid in self.member: + self.member.remove(user.uid) + + +# pylint: disable=too-many-instance-attributes +@dataclass +class User: + """ + Represents a User entity with various attributes. + + Args: + name (str): The username of the user. + password (str): The user's password. + scope (Optional[str]): The scope of the user, default is "User". + descr (Optional[str]): A description of the user, if available. + ipsecpsk (Optional[str]): IPsec pre-shared key, if applicable. + otp_seed (Optional[str]): OTP seed for two-factor authentication, if used. + shell (Optional[UserLoginShell]): The user's login shell, if specified. + uid (Optional[str]): The user's unique identifier. + disabled (bool): Whether the user is disabled (default is False). + full_name (Optional[str]): The user's full name, if available. + email (Optional[str]): The user's email address, if provided. + comment (Optional[str]): Additional comments or information about the user. + landing_page (Optional[str]): The landing page for the user, if specified. + expires (Optional[str]): The expiration date for the user, if set. + authorizedkeys (Optional[str]): Authorized SSH keys for the user, if applicable. + cert (Optional[str]): Certificate information for the user, if relevant. + apikeys (Optional[list[str]]): API key associated with the user, if any. Will be generated + if "" is provided + groupname (Optional[list[str]]): List of group names the user belongs to, if any. + + Methods: + __eq__(self, other): Compare two User objects, excluding sensitive fields. + to_etree(self): Convert User instance to an XML Element. + from_ansible_module_params(cls, params): Create a User from Ansible module parameters. + from_xml(element): Create a User from an XML Element. + __post_init__(self): Process post-initialization tasks. + set_otp_seed(self, otp_seed=None): Generate or encode OTP seed. + set_apikeys(self, apikeys=None): Generate or set API keys. + set_authorizedkeys(self, authorizedkeys=None): Encode authorized SSH keys. + + The User class is designed to represent user entities with various attributes commonly used in + system configurations. It provides methods for comparing, converting to XML, creating from + Ansible module parameters, and creating from XML representations. + """ + + name: str + password: str + scope: Optional[str] = "User" + descr: Optional[str] = None + ipsecpsk: Optional[str] = None + otp_seed: Optional[str] = None + shell: Optional[UserLoginShell] = UserLoginShell.NOLOGIN + uid: Optional[str] = None + disabled: bool = False + full_name: Optional[str] = None + email: Optional[str] = None + comment: Optional[str] = None + landing_page: Optional[str] = None + expires: Optional[str] = None + authorizedkeys: Optional[str] = None + cert: Optional[str] = None # will be handled in seperate module + apikeys: Optional[List[str]] = None + groupname: Optional[List[str]] = None + + def __eq__(self, other) -> bool: + if not isinstance(other, User): + return False + + for field in fields(self): + if field.name not in ["password", "uid", "otp_seed", "apikeys"]: + if getattr(self, field.name) != getattr(other, field.name): + return False + + return True + + def __post_init__(self) -> None: + # Manually define the fields and their expected types + enum_fields = { + "shell": UserLoginShell, + } + + for field_name, field_type in enum_fields.items(): + value = getattr(self, field_name) + + # Check if the value is a string and the field_type is a subclass of ListEnum + if isinstance(value, str) and issubclass(field_type, ListEnum): + # Convert string to ListEnum + setattr(self, field_name, field_type.from_string(value)) + + def set_otp_seed(self, otp_seed: str = None) -> str: + """ + Generates and returns a base32-encoded OTP seed. + + Args: + otp_seed (str, optional): Existing OTP seed to encode (default: None). + + Returns: + str: Base32-encoded OTP seed. + + If no OTP seed is provided, a random seed is generated and encoded as base32. + """ + + if otp_seed is None: + otp_seed = os.urandom(20) + + return base64.b32encode(otp_seed.encode("utf-8")).decode("utf-8") + + def _generate_hashed_secret(self, secret: str) -> str: + """ + function to generate hashed secrets using crypt + """ + + # load requirements + php_requirements = [] + configure_function = "echo crypt" + configure_params = [f"'{secret}'", "'$6$'"] + + # set user password + hashed_secret_value = opnsense_utils.run_function( + php_requirements=php_requirements, + configure_function=configure_function, + configure_params=configure_params, + ) + + # check if stderr returns value + if hashed_secret_value.get("stderr"): + raise OPNSenseCryptReturnError("error encounterd while creating secret") + + # validate secret + if ( + hashed_secret_value.get("stdout").startswith("$6$") + and len(hashed_secret_value.get("stdout")) == 90 + ): + return hashed_secret_value.get("stdout") + + # if validation fails, + raise OPNSenseCryptReturnError( + f""" + validation of the secret failed! + Secret must start with $6$ and have a min length of 90 + Value: {hashed_secret_value} + """ + ) + + def set_apikeys(self, apikeys: list = None) -> list: + """ + Generates a list of dictionaries, each containing a 'key' and a 'secret'. + If apikeys is provided, each element in apikeys is used as the 'key', + and a new 'secret' is generated. + If apikeys is not provided or is an empty list, a single 'key'-'secret' pair is generated. + + Args: + apikeys (list, optional): A list of strings to be used as the 'key' part of each pair. + + Returns: + list: A list of dictionaries, where each dictionary has a 'key' and a 'secret'. + """ + + api_keys = [] + + # Check if apikeys is None or contains an empty string + if apikeys is None or "" in apikeys: + key = base64.b64encode(os.urandom(60)).decode("utf-8") + secret = base64.b64encode(os.urandom(60)).decode("utf-8") + api_keys.append({"key": key, "secret": secret}) + else: + for api_key in apikeys: + secret = base64.b64encode(os.urandom(60)).decode("utf-8") + try: + base64.b64decode(api_key) + api_keys.append({"key": api_key, "secret": secret}) + except binascii.Error as binascii_error_message: + raise OPNSenseNotValidBase64APIKeyError( + f"The API key: {api_key} is not a valid base64 string. " + f"Error: {str(binascii_error_message)}" + ) from binascii_error_message + + return api_keys + + def set_authorizedkeys(self, authorizedkeys: str = None) -> Optional[str]: + """ + Encodes the authorized SSH keys as base32. + + Args: + authorizedkeys (str, optional): SSH keys to encode (default: None). + + Returns: + str: Base32-encoded authorized SSH keys. + + Encodes the provided SSH keys as base32. If no keys are provided, + an empty string is returned. + """ + + if authorizedkeys: + return base64.b64encode(authorizedkeys.encode("utf-8")).decode("utf-8") + + return None + + def to_etree(self) -> Element: + """ + Converts the User instance to an XML Element. + + This method serializes the User object into an XML Element, filtering out + None or False values except for specific fields. It handles special cases + for fields that are instances of ListEnum by converting their values to + their corresponding enum values. Boolean values are converted to "1" for + True, and fields with None values are removed unless they are part of a + predefined list of exceptions. + + Returns: + Element: An XML Element representing the serialized User object, ready + for inclusion in an XML document. + + This approach ensures that the XML representation is compact and adheres to + the expected schema, with consideration for optional fields and data types. + """ + + user_dict: dict = asdict(self) + + for user_key, user_val in user_dict.copy().items(): + + if user_val is None and user_key in [ + "expires", + "ipsecpsk", + "otp_seed", + "authorizedkeys", + ]: + continue + + if isinstance(user_val, list) and user_key == "apikeys": + # Modify the apikeys directly into the list of items + user_dict[user_key] = [ + { + "item": { + key_name: ( + self._generate_hashed_secret(secret_value) + if key_name == "secret" + and not secret_value.startswith("$6$") + else secret_value + ) + for key_name, secret_value in api_key_dict.items() + } + } + for api_key_dict in user_val + ] + + if issubclass(type(user_val), ListEnum): + user_dict[user_key] = user_val.value + + elif user_val is None or user_val is False: + del user_dict[user_key] + continue + + elif isinstance(user_val, bool): + user_dict[user_key] = "1" + + element: Element = xml_utils.dict_to_etree("user", user_dict)[0] + + return element + + @classmethod + def from_ansible_module_params(cls, params: dict) -> "User": + """ + Creates a User instance from Ansible module parameters. + + Args: + params (dict): Parameters from an Ansible module, expected to contain + user attributes such as 'username', 'password', etc. + + Returns: + User: An instance of the User class initialized with the provided parameters. + Fields not provided are omitted from initialization. + + This method processes parameters typically received from an Ansible module, + handling optional attributes and setting the password securely if provided. + """ + + user_dict = { + "disabled": params.get("disabled"), + "name": params.get("username"), + "password": params.get("password"), + "descr": params.get("full_name"), + "scope": params.get("scope"), + "ipsecpsk": params.get("ipsecpsk"), + "otp_seed": ( + cls.set_otp_seed(cls, otp_seed=params.get("otp_seed")) + if params.get("otp_seed") is not None + else None + ), + "shell": params.get("shell"), + "uid": params.get("uid"), + "full_name": params.get("full_name"), + "email": params.get("email"), + "comment": params.get("comment"), + "landing_page": params.get("landing_page"), + "expires": params.get("expires"), + "groupname": params.get("groups"), + "authorizedkeys": ( + cls.set_authorizedkeys(cls, authorizedkeys=params.get("authorizedkeys")) + if params.get("authorizedkeys") + else None + ), + "cert": params.get("cert"), + "apikeys": ( + cls.set_apikeys(cls, apikeys=params.get("apikeys")) + if params.get("apikeys") + else None + ), + } + + user_dict = { + key: value for key, value in user_dict.items() if value is not None + } + + return cls(**user_dict) + + @staticmethod + def from_xml(element: Element) -> "User": + """ + Converts an XML element into a User object. + + Parameters: + element (Element): An XML element representing a user, with child elements + for each user attribute. + + Returns: + User: A User object initialized with the data extracted from the XML element. + + This method extracts data from an XML element, handling different data types appropriately, + such as converting single group names into a list and interpreting the + 'disabled' field as a boolean. + """ + + user_dict: dict = xml_utils.etree_to_dict(element)["user"] + + if "groupname" in user_dict and isinstance(user_dict["groupname"], str): + user_dict["groupname"] = [user_dict["groupname"]] + + # Handle 'disabled' element + user_dict["disabled"] = user_dict.get("disabled", "0") == "1" + + if "apikeys" in user_dict and isinstance(user_dict["apikeys"], str): + apikeys_elements = user_dict["apikeys"].get("item", []) + if isinstance(apikeys_elements, dict): + apikeys_elements = [apikeys_elements] + user_dict["apikeys"] = [item["key"] for item in apikeys_elements] + + return User(**user_dict) + + +class UserSet(OPNsenseModuleConfig): + """ + Represents a collection of user and group configurations within the OPNsense system, + facilitating the management of users and groups through direct manipulation of the system's + configuration file. + + The UserSet class provides a high-level interface to add, update, delete, and find users and + groups in the system's configuration, abstracting the complexities of direct XML manipulation. + It ensures that changes to users and groups are consistent and coherent, maintaining the + integrity of the system's access control and configuration. + + Upon initialization, the class loads existing user and group configurations from the specified + configuration file path, allowing for subsequent operations to reflect the current state of the + system accurately. The class offers methods to perform CRUD (Create, Read, Update, Delete) + operations on user and group entities, alongside utility methods to check for changes and save + updates back to the configuration file. + + Attributes: + _users (List[User]): A list of User objects representing the users currently managed by + the system. + _groups (List[Group]): A list of Group objects representing the groups currently managed + by the system. + + Methods: + __init__(self, path: str): Initializes a new UserSet instance, loading users and groups + from the specified configuration file. + _load_users(self): Loads users from the system configuration into the _users list. + _load_groups(self): Loads groups from the system configuration into the _groups list. + add_or_update(self, user: User): Adds a new user or updates an existing one in the system. + delete(self, user: User): Removes a specified user from the system's configuration. + find(self, **kwargs): Searches for and returns a user matching specified criteria. + save(self): Saves changes made to users or groups back to the system's configuration file. + + Usage: + The UserSet class is intended for use within the OPNsense system's configuration management + tools, providing a structured and safe approach to modifying user and group settings. + + Note: + Modifications made through UserSet instances are not persisted automatically. The `save` + method must be called to write changes back to the configuration file. + """ + + _users: List[User] + + def __init__(self, path: str = "/conf/config.xml"): + super().__init__( + module_name="system_access_users", + config_context_names=["system_access_users", "password"], + path=path, + ) + self._users = self._load_users() + self._groups = self._load_groups() + self._config_xml_tree = self._load_config() + + def _load_users(self) -> List[User]: + """ + Loads user data from the system's configuration and converts it into a list of User objects. + + This method accesses the 'system' element of the configuration, searching for all 'user' + elements. Each found 'user' element represents a user configuration within the system. + The method collects these elements, and for each one, it creates a User object by parsing + the XML data into the structured format defined by the User data class. + + The conversion process relies on the `from_xml` class method of the User, which interprets + the XML data and initializes a User object with the corresponding attributes extracted from + the XML element. + + Returns: + List[User]: A list of User objects representing all users found in the system's + configuration. If no users are found, an empty list is returned. + + Note: + This method is intended to be used internally within the class to refresh or initialize + the in-memory representation of users based on the current state of the system's + configuration. + """ + + element_tree_users: Element = self.get("system") + + element_tree_users.findall("user") + + user_list = [] + for user in element_tree_users: + if user.tag == "user": + user_list.append(user) + + return [User.from_xml(user_data) for user_data in user_list] + + def _load_groups(self) -> List: + """ + Loads and returns a list of Group objects from the system's configuration XML. + + This method parses the system's configuration file to extract information about groups, + creating a list of Group objects. Each group found within the 'system' configuration + section is instantiated as a Group object based on its XML representation. + + Returns: + List[Group]: A list of Group objects representing all groups found in the system's + configuration file. The groups are extracted by searching for 'group' + tags within the 'system' section of the configuration XML. + + The process involves searching the XML for all 'group' elements, collecting these elements + into a list, and then transforming each XML element into a Group object using the static + method `Group.from_xml`. This method is critical for initializing the internal state of + the system with the current group configurations as defined in the configuration file. + + Note: + The method assumes that the 'system' element of the configuration XML is already loaded + and accessible via the `self.get("system")` call, which should return the relevant + XML section for parsing. + """ + + element_tree_groups: Element = self.get("system") + + element_tree_groups.findall("group") + + group_list = [] + for group in element_tree_groups: + if group.tag == "group": + group_list.append(group) + + return [Group.from_xml(group_data) for group_data in group_list] + + @property + def changed(self) -> bool: + """ + Evaluates whether there have been changes to user or group configurations that are not yet + reflected in the saved system configuration. This property serves as a check to determine + if updates have been made in memory to the user or group lists that differ from what is + currently persisted in the system's configuration files. + + Returns: + bool: True if there are changes to the user or group configurations that have not been + persisted yet; False otherwise. + + The method works by comparing the current in-memory representations of users and groups + against the versions loaded from the system's configuration files. A difference in these + lists indicates that changes have been made in the session that have not been saved, thus + prompting the need for a save operation to update the system configuration accordingly. + + Note: + This property should be consulted before performing a save operation to avoid + unnecessary writes to the system configuration when no changes have been made. + """ + + return self._load_users() != self._users or self._load_groups() != self._groups + + def _update_user_groups(self, user: User, existing_user: Optional[User] = None): + """ + Manages the association of a user with specified groups, either by updating the groups of an + existing user or adding a new user to the appropriate groups. This method ensures that the + user is a member of all specified groups, adding the user to any groups they are not already + a part of, and maintains the integrity of group memberships across updates. + If user.groupname is None, the user is removed from all groups. + + Parameters: + user (User): The user whose group memberships are to be updated. This includes both new + users and users whose group memberships might change. + existing_user (Optional[User]): If the user already exists, this parameter should be the + user's current information. It is used to determine if + the existing group memberships need to be updated. + + Raises: + OPNSenseGroupNotFoundError: If a specified group does not exist on the instance, this + exception is raised, indicating the need for corrective + action or error handling. + """ + target_user = existing_user if existing_user else user + + if user.groupname is None or not hasattr(user, "groupname"): + for existing_group in self._groups: + if existing_group.check_if_user_in_group(target_user): + existing_group.remove_user(target_user) + return # Exit the method after removing the user from all groups. + + # Convert groupname to a list if it's not already. + group_names = ( + user.groupname if isinstance(user.groupname, list) else [user.groupname] + ) + + for group_name in group_names: + group_found = False + for index, existing_group in enumerate(self._groups): + if existing_group.name == group_name: + group_found = True + if not existing_group.check_if_user_in_group(target_user): + existing_group.add_user(target_user) + self._groups[index] = existing_group + break # Stop searching once the group is found + + if not group_found: + # Group was not found, raise an exception + raise OPNSenseGroupNotFoundError( + f"Group '{group_name}' not found on Instance" + ) + + def set_user_password(self, user: User) -> None: + """ + Sets the user's password using specified PHP and configuration functions. + """ + + # load requirements + php_requirements = self._config_maps["password"]["php_requirements"] + configure_function = self._config_maps["password"]["configure_functions"][ + "name" + ] + configure_params = self._config_maps["password"]["configure_functions"][ + "configure_params" + ] + + # format parameters + formatted_params = [ + ( + param.replace("'password'", f"'{user.password}'") + if "password" in param + else param + ) + for param in configure_params + ] + + # set user password + user.password = opnsense_utils.run_function( + php_requirements=php_requirements, + configure_function=configure_function, + configure_params=formatted_params, + ).get("stdout") + + # since "password" is no longer needed, it can be popped + self._config_maps.pop("password") + + def add_or_update(self, user: User) -> None: + """ + Adds a new user to the system or updates an existing user's information, ensuring that group + associations are correctly managed. This method determines whether the provided user already + exists within the system. If the user exists, it updates the user's details and group + associations; if the user does not exist, it assigns a unique user ID and adds the user to + the system. + + The method handles the assignment of user IDs and updates the internal tracking of the next + available ID. It also manages group memberships by updating group associations for both new + and existing users as necessary. + + Parameters: + user (User): The user object to add or update. This object should contain all relevant + information about the user, including username, password, and any group + memberships. + + Note: + This operation directly affects the internal list of users managed by this instance, + reflecting changes immediately in the system's state. However, persistent storage or + external system updates must be handled separately to ensure that changes remain + effective across sessions or reboots. + + Returns: + None: This method does not return a value but updates the internal state to include or + modify the specified user's information. + """ + + existing_user: Optional[User] = next( + (u for u in self._users if u.name == user.name), None + ) + next_uid: Element = self.get("uid") + + # since the current password of an user cannot not be compared with the new one, + # we're setting the password anyways + self.set_user_password(user) + + if existing_user: + # Update groups if needed + self._update_user_groups(user, existing_user) + + # Update existing user's attributes + existing_user.__dict__.update(user.__dict__) + else: + # Assign UID if not set + if not user.uid: + user.uid = next_uid.text + # Increase the next_uid + self.set(value=str(int(next_uid.text) + 1), setting="uid") + + if user.groupname: + # Update groups for the new user + self._update_user_groups(user) + # Add the new user + self._users.append(user) + + def delete(self, user: User) -> None: + """ + Removes a specified user from the internal list of managed users. + + This method filters out the specified user from the current list of users managed by this + instance. It iterates over the list of users and retains only those that do not match the + user to be deleted. This approach ensures that the specified user is effectively removed + from the list, reflecting the deletion operation. + + It's important to note that this operation directly modifies the internal state of the + instance by updating the list of users to exclude the specified user. However, this method + does not handle the persistence of these changes to any external storage or configuration + files. Any required persistence mechanism should be handled separately, ensuring that the + deletion has the intended effect across sessions or system states. + + Parameters: + user (User): The user object to be removed from the list of managed users. + + Returns: + None: This method does not return a value but updates the internal list of users. + """ + + self._users = [r for r in self._users if r != user] + + def find(self, **kwargs) -> Optional[User]: + """ + Searches for a user matching specified criteria within the stored user list. + + This method iterates over the collection of users managed by the instance, evaluating each + user against the provided keyword arguments. The comparison is performed by ensuring all + specified attributes of a user match the corresponding values given in `kwargs`. + + The method employs a flexible approach, allowing for the search of users based on any number + of attributes, such as name, group, or any other user-specific detail that is available as + an attribute of the User objects. + + If a user meeting all the specified criteria is found, that User object is returned. If no + matching user is found after checking all users in the collection, the method returns None, + indicating the absence of a user with the specified attributes. + + Parameters: + **kwargs: Variable keyword arguments representing the attributes and their expected + values for the user to match. + + Returns: + Optional[User]: The User object that matches the criteria, or None if no match is found. + """ + + for user in self._users: + match = all( + getattr(user, key, None) == value for key, value in kwargs.items() + ) + if match: + return user + return None + + def save(self) -> bool: + """ + Saves updated configuration to the XML file if changes are detected. + + Initially checks for modifications via the `changed` attribute. If unchanged, it returns + False, indicating no save operation was necessary. For changes, the XML configuration tree + is updated accordingly. + + Retrieves the 'system' element using `_config_map`, removing old 'user' and 'group' elements + to clear outdated configurations. It then repopulates 'system' with updated configurations + for users and groups, converting each to an XML element via `to_etree()` method. + + After updating, it writes the changes to the file system with UTF-8 encoding and XML + declaration. Subsequently, the configuration file is reloaded to update the internal state + with the new changes. + + Concludes by returning True to indicate successful change persistence. + + Returns: + bool: True if changes were successfully saved, False if no changes occurred. + """ + + if not self.changed: + return False + + # Assuming self._config_maps["system_access_users"]["system"] + # gives you the path to the 'system' element + filter_element: Element = self._config_xml_tree.find( + self._config_maps["system_access_users"]["system"] + ) + + # Remove specific child elements (e.g., 'user', 'group') from filter_element + for user_element in list( + filter_element.findall("user") + ): # Use list() to avoid modification during iteration + filter_element.remove(user_element) + + for group_element in list(filter_element.findall("group")): + filter_element.remove(group_element) + + # Now, add the updated elements back directly to filter_element + filter_element.extend([group.to_etree() for group in self._groups]) + filter_element.extend([user.to_etree() for user in self._users]) + + # Write the updated XML tree to the file + tree: ElementTree = ElementTree(self._config_xml_tree) + + tree.write(self._config_path, encoding="utf-8", xml_declaration=True) + + # Reload the configuration to reflect the updated changes + self._config_xml_tree = self._load_config() + + return True diff --git a/plugins/modules/system_access_users.py b/plugins/modules/system_access_users.py new file mode 100644 index 00000000..3bd91e6e --- /dev/null +++ b/plugins/modules/system_access_users.py @@ -0,0 +1,268 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2024, Puzzle ITC, Kilian Soltermann +# GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt) + + +"""system_access_users module: Read, write, edit operations for OPNsense Users """ + +# pylint: disable=duplicate-code +__metaclass__ = type + +# https://docs.ansible.com/ansible/latest/dev_guide/developing_modules_documenting.html +# fmt: off + +DOCUMENTATION = r''' +--- +module: system_access_users +short_description: Manage OPNsense users +description: + - This module allows you to manage users on an OPNsense firewall. +author: + - Kilian Soltermann (@killuuuhh) +version_added: "1.0.0" +options: + username: + description: + - The username of the OPNsense user. + required: true + type: str + password: + description: + - The password of the OPNsense user. + required: true + type: str + disabled: + description: + - Indicates whether the user account should be disabled. + required: false + default: false + type: bool + full_name: + description: + - The full name of the OPNsense user. + required: false + type: str + email: + description: + - The email address of the OPNsense user. + required: false + type: str + comment: + description: + - Additional comments or notes for the OPNsense user. + required: false + type: str + landing_page: + description: + - The landing page for the OPNsense user. + required: false + type: str + shell: + description: + - The shell for the OPNsense user. + required: false + type: str + expires: + description: + - The expiration date for the OPNsense user account. + required: false + type: str + groups: + description: + - A list of groups the OPNsense user belongs to. + required: false + type: list + elements: str + apikeys: + description: + - A list of apikeys for an OPNsense User. Generates new apikey if "" is provided. + required: false + type: list + elements: str + otp_seed: + description: + - The otp_seed of a OPNsense user. + required: false + type: str + authorizedkeys: + description: + - The authorizedkeys of a OPNsense user. + required: false + type: str + scope: + description: + - The scope of the OPNsense user. + required: false + type: str + uid: + description: + - The UID of the OPNsense user. + required: false + type: str + state: + description: + - The desired state of the OPNsense user. + required: false + choices: + - present + - absent + default: present + type: str +''' + +EXAMPLES = r''' +- name: Add OPNsense user + puzzle.opnsense.system_access_users: + username: johndoe + password: secret + full_name: John Doe + email: johndoe@example.com + groups: + - admins + state: present + register: result + +- name: Remove OPNsense user + puzzle.opnsense.system_access_users: + username: johndoe + state: absent + register: result +''' + +RETURN = ''' +opnsense_configure_output: + description: A List of the executed OPNsense configure function along with their respective stdout, stderr and rc + returned: always + type: list + sample: + - function: "opnsense_configure_output" + params: [] + rc: 0 + stderr: "" + stderr_lines: [] + stdout: "" + stdout_lines: [] +''' +# fmt: on +from typing import Optional + +from ansible.module_utils.basic import AnsibleModule + +from ansible_collections.puzzle.opnsense.plugins.module_utils.system_access_users_utils import ( + User, + UserSet, + OPNSenseGroupNotFoundError, + OPNSenseNotValidBase64APIKeyError, +) + + +ANSIBLE_MANAGED: str = "[ ANSIBLE ]" + + +def main(): + """ + Main function of the system_access_users module + """ + module_args = { + "username": { + "type": "str", + "required": True, + }, + "password": {"type": "str", "required": True, "no_log": True}, + "disabled": {"type": "bool", "default": False}, + "full_name": {"type": "str", "required": False}, + "email": {"type": "str", "required": False}, + "comment": {"type": "str", "required": False}, + "landing_page": {"type": "str", "required": False}, + "shell": {"type": "str", "required": False}, + "expires": {"type": "str", "required": False}, + "otp_seed": {"type": "str", "required": False}, + "authorizedkeys": {"type": "str", "required": False, "no_log": True}, + "groups": {"type": "list", "required": False, "elements": "str"}, + "apikeys": { + "type": "list", + "required": False, + "elements": "str", + "no_log": False, + }, + "scope": {"type": "str", "required": False}, + "uid": {"type": "str", "required": False}, + "state": { + "type": "str", + "default": "present", + "choices": ["present", "absent"], + }, + } + + module: AnsibleModule = AnsibleModule( + argument_spec=module_args, + supports_check_mode=True, + ) + + # https://docs.ansible.com/ansible/latest/reference_appendices/common_return_values.html + # https://docs.ansible.com/ansible/latest/dev_guide/developing_modules_documenting.html#return-block + result = { + "changed": False, + "invocation": module.params, + "diff": None, + } + # make description ansible-managed + description: Optional[str] = module.params["full_name"] + + if description and ANSIBLE_MANAGED not in description: + description = f"{ANSIBLE_MANAGED} - {description}" + else: + description = ANSIBLE_MANAGED + + # since description matches the full_name in GUI + module.params["full_name"] = description + + try: + ansible_user: User = User.from_ansible_module_params(module.params) + + ansible_user_state: str = module.params.get("state") + + with UserSet() as user_set: + if ansible_user_state == "present": + user_set.add_or_update(ansible_user) + elif ansible_user_state == "absent": + user_set.delete(ansible_user) + + if user_set.changed: + result["diff"] = user_set.diff + result["changed"] = True + + if user_set.changed and not module.check_mode: + user_set.save() + result["opnsense_configure_output"] = user_set.apply_settings() + + if ansible_user.apikeys: + result["generated_apikeys"] = [] + for new_generated_api_key in ansible_user.apikeys: + result["generated_apikeys"].append( + f"key={new_generated_api_key['key']}" + ) + result["generated_apikeys"].append( + f"secret={new_generated_api_key['secret']}" + ) + + for cmd_result in result["opnsense_configure_output"]: + if cmd_result["rc"] != 0: + module.fail_json( + msg="Apply of the OPNsense settings failed", + details=cmd_result, + ) + module.exit_json(**result) + + except OPNSenseGroupNotFoundError as opnsense_group_not_found_error_error_message: + module.fail_json(msg=str(opnsense_group_not_found_error_error_message)) + except ( + OPNSenseNotValidBase64APIKeyError + ) as opnsense_not_valid_base64_apikey_error_message: + module.fail_json(msg=str(opnsense_not_valid_base64_apikey_error_message)) + + +if __name__ == "__main__": + main() diff --git a/tests/unit/plugins/module_utils/test_system_access_users_utils.py b/tests/unit/plugins/module_utils/test_system_access_users_utils.py new file mode 100644 index 00000000..68cb2e91 --- /dev/null +++ b/tests/unit/plugins/module_utils/test_system_access_users_utils.py @@ -0,0 +1,651 @@ +# Copyright: (c) 2024, Puzzle ITC, Kilian Soltermann +# GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt) +# pylint: skip-file +import os +from tempfile import NamedTemporaryFile +from unittest.mock import patch, MagicMock +from xml.etree import ElementTree +from xml.etree.ElementTree import Element + + +import pytest + +from ansible_collections.puzzle.opnsense.plugins.module_utils import xml_utils +from ansible_collections.puzzle.opnsense.plugins.module_utils.system_access_users_utils import ( + Group, + User, + UserSet, + UserLoginShell, + OPNSenseGroupNotFoundError, + OPNSenseCryptReturnError, +) +from ansible_collections.puzzle.opnsense.plugins.module_utils.module_index import ( + VERSION_MAP, +) + +# Test version map for OPNsense versions and modules +TEST_VERSION_MAP = { + "OPNsense Test": { + "system_access_users": { + "users": "system/user", + "uid": "system/nextuid", + "gid": "system/nextgid", + "system": "system", + "php_requirements": [ + "/usr/local/etc/inc/system.inc", + ], + "configure_functions": {}, + }, + "password": { + "php_requirements": [ + "/usr/local/etc/inc/auth.inc", + ], + "configure_functions": { + "name": "echo password_hash", + "configure_params": [ + "'password'", + "PASSWORD_BCRYPT", + "[ 'cost' => 11 ]", + ], + }, + }, + } +} + +TEST_XML: str = """ + + + test_name + test_name_2 + + $2y$10$1BvUdvwM.a.dJACwfeNfAOgNT6Cqc4cKZ2F6byyvY8hIK9I8fn36O + user + vagrant + vagrant box management + + + + + /bin/sh + 1000 + + + $2y$10$1BvUdvwM.a.dJACwfeNfAOgNT6Cqc4cKZ2F6byyvY8hIK9I8fn36O + user + test_user_1 + test_user_1 + + + + + AMC39xLYvfD7PyaemZrIVuaWBIdRQVS9NgEHFWzW7+xj0ExFY+07/Vz6HcmUVkJkjb8N0Cg7yEdESvNy + $6$$f8zJvXeCng1iaUCaq8KLvg4tJbGQ.qWKmfgcpytflpGF4AXc4U.N8/TiczM6fu741KBB2PwWUC0k7fzet8asq0 + + + + + /bin/sh + 1001 + + + admins + System Administrators + system + 1999 + 0 + 1000 + 2004 + 2005 + 2006 + 2009 + 2010 + 2014 + page-all + + + test_group + test_group + system + 1000 + 2004 + 2000 + page-all + + + + """ + + +@pytest.fixture(scope="function") +def sample_config_path(request): + """ + Fixture that creates a temporary file with a test XML configuration. + The file is used in the tests. + + Returns: + - str: The path to the temporary file. + """ + with patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.version_utils.get_opnsense_version", # pylint: disable=line-too-long + return_value="OPNsense Test", + ), patch.dict(VERSION_MAP, TEST_VERSION_MAP, clear=True): + # Create a temporary file with a name based on the test function + with NamedTemporaryFile(delete=False) as temp_file: + temp_file.write(TEST_XML.encode()) + temp_file.flush() + yield temp_file.name + + # Cleanup after the fixture is used + os.unlink(temp_file.name) + + +def test_user_from_xml(): + test_etree_opnsense: Element = ElementTree.fromstring(TEST_XML) + + test_etree_user: Element = list(list(test_etree_opnsense)[0])[2] + test_user: User = User.from_xml(test_etree_user) + + assert test_user.name == "vagrant" + assert ( + test_user.password + == "$2y$10$1BvUdvwM.a.dJACwfeNfAOgNT6Cqc4cKZ2F6byyvY8hIK9I8fn36O" + ) + assert test_user.scope == "user" + assert test_user.descr == "vagrant box management" + assert test_user.expires is None + assert test_user.authorizedkeys is None + assert test_user.ipsecpsk is None + assert test_user.otp_seed is None + assert test_user.shell == UserLoginShell.SH + assert test_user.uid == "1000" + + +def test_user_to_etree(): + test_user: User = User( + password="$2y$10$1BvUdvwM.a.dJACwfeNfAOgNT6Cqc4cKZ2F6byyvY8hIK9I8fn36O", + scope="user", + name="vagrant", + descr="vagrant box management", + shell="/bin/sh", + uid="1000", + ) + + test_element = test_user.to_etree() + + orig_etree: Element = ElementTree.fromstring(TEST_XML) + orig_user: Element = list(list(orig_etree)[0])[2] + + assert xml_utils.elements_equal(test_element, orig_user) + + +def test_user_with_api_key_from_xml(): + test_etree_opnsense: Element = ElementTree.fromstring(TEST_XML) + + test_etree_user: Element = list(list(test_etree_opnsense)[0])[3] + test_user: User = User.from_xml(test_etree_user) + + assert test_user.name == "test_user_1" + assert ( + test_user.password + == "$2y$10$1BvUdvwM.a.dJACwfeNfAOgNT6Cqc4cKZ2F6byyvY8hIK9I8fn36O" + ) + assert test_user.scope == "user" + assert test_user.descr == "test_user_1" + assert ( + test_user.apikeys["item"]["secret"] + == "$6$$f8zJvXeCng1iaUCaq8KLvg4tJbGQ.qWKmfgcpytflpGF4AXc4U.N8/TiczM6fu741KBB2PwWUC0k7fzet8asq0" + ) + assert ( + test_user.apikeys["item"]["key"] + == "AMC39xLYvfD7PyaemZrIVuaWBIdRQVS9NgEHFWzW7+xj0ExFY+07/Vz6HcmUVkJkjb8N0Cg7yEdESvNy" + ) + assert test_user.expires is None + assert test_user.authorizedkeys is None + assert test_user.ipsecpsk is None + assert test_user.otp_seed is None + assert test_user.shell == UserLoginShell.SH + assert test_user.uid == "1001" + + +def test_user_to_etree(): + test_user: User = User( + password="$2y$10$1BvUdvwM.a.dJACwfeNfAOgNT6Cqc4cKZ2F6byyvY8hIK9I8fn36O", + scope="user", + name="vagrant", + descr="vagrant box management", + shell="/bin/sh", + uid="1000", + ) + + test_element = test_user.to_etree() + + orig_etree: Element = ElementTree.fromstring(TEST_XML) + orig_user: Element = list(list(orig_etree)[0])[2] + + assert xml_utils.elements_equal(test_element, orig_user) + + +def test_user_from_ansible_module_params_simple(sample_config_path): + test_params: dict = { + "username": "vagrant", + "password": "vagrant", + "scope": "user", + "full_name": "vagrant box management", + "shell": "/bin/sh", + "uid": "1000", + } + + new_test_user: User = User.from_ansible_module_params(test_params) + + assert new_test_user.name == "vagrant" + assert new_test_user.password == "vagrant" + assert new_test_user.scope == "user" + assert new_test_user.descr == "vagrant box management" + assert new_test_user.expires is None + assert new_test_user.authorizedkeys is None + assert new_test_user.ipsecpsk is None + assert new_test_user.shell == UserLoginShell.SH + assert new_test_user.uid == "1000" + + +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.version_utils.get_opnsense_version", + return_value="OPNsense Test", +) +@patch.dict(in_dict=VERSION_MAP, values=TEST_VERSION_MAP, clear=True) +def test_user_set_load_simple_user(mocked_version_utils: MagicMock, sample_config_path): + with UserSet(sample_config_path) as user_set: + assert len(user_set._users) == 2 + user_set.save() + + +def test_group_from_xml(): + test_etree_opnsense: Element = ElementTree.fromstring(TEST_XML) + + test_etree_group: Element = list(list(test_etree_opnsense)[0])[4] + test_group: Group = Group.from_xml(test_etree_group) + + assert test_group.name == "admins" + assert test_group.description == "System Administrators" + assert test_group.scope == "system" + assert test_group.member == [ + "0", + "1000", + "2004", + "2005", + "2006", + "2009", + "2010", + "2014", + ] + assert test_group.gid == "1999" + + +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.version_utils.get_opnsense_version", + return_value="OPNsense Test", +) +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.system_access_users_utils.UserSet.set_user_password", + return_value="$2y$10$1BvUdvwM.a.dJACwfeNfAOgNT6Cqc4cKZ2F6byyvY8hIK9I8fn36O", +) +@patch.dict(in_dict=VERSION_MAP, values=TEST_VERSION_MAP, clear=True) +def test_user_set_add_group( + mocked_version_utils: MagicMock, mock_set_password: MagicMock, sample_config_path +): + with UserSet(sample_config_path) as user_set: + test_user: User = user_set.find(name="vagrant") + test_user.groupname = ["admins"] + + user_set.add_or_update(test_user) + + assert user_set.changed + + user_set.save() + + with UserSet(sample_config_path) as new_user_set: + new_test_user: User = new_user_set.find(name="vagrant") + # group: Group = new_user_set + + assert new_test_user.groupname == ["admins"] + assert "1000" in new_user_set._groups[0].member + + new_user_set.save() + + +def test_user_from_ansible_module_params_with_group(sample_config_path): + test_params: dict = { + "username": "vagrant", + "password": "vagrant", + "scope": "user", + "full_name": "vagrant box management", + "shell": "/bin/sh", + "uid": "1000", + "groups": ["admins"], + } + + new_test_user: User = User.from_ansible_module_params(test_params) + + assert new_test_user.name == "vagrant" + assert new_test_user.password == "vagrant" + assert new_test_user.scope == "user" + assert new_test_user.descr == "vagrant box management" + assert new_test_user.expires is None + assert new_test_user.authorizedkeys is None + assert new_test_user.ipsecpsk is None + assert new_test_user.shell == UserLoginShell.SH + assert new_test_user.uid == "1000" + assert new_test_user.groupname == ["admins"] + + +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.version_utils.get_opnsense_version", + return_value="OPNsense Test", +) +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.system_access_users_utils.UserSet.set_user_password", + return_value="$2y$10$1BvUdvwM.a.dJACwfeNfAOgNT6Cqc4cKZ2F6byyvY8hIK9I8fn36O", +) +@patch.dict(in_dict=VERSION_MAP, values=TEST_VERSION_MAP, clear=True) +def test_user_from_ansible_module_params_with_group_as_string( + mock_set_password, mock_get_version, sample_config_path +): + test_params = { + "username": "test", + "password": "vagrant", + "scope": "user", + "full_name": "vagrant box management", + "shell": "/bin/sh", + "uid": "1000", + "groups": ["test_group"], + } + + with UserSet(sample_config_path) as user_set: + test_user = User.from_ansible_module_params(test_params) + + user_set.add_or_update(test_user) + + assert user_set.changed + user_set.save() + + with UserSet(sample_config_path) as new_user_set: + new_test_user = new_user_set.find(name="test") + + # Adjust the assertions based on the actual implementation of your User and UserSet classes + + assert "test_group" in new_test_user.groupname + + new_user_set.save() + + +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.version_utils.get_opnsense_version", + return_value="OPNsense Test", +) +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.system_access_users_utils.UserSet.set_user_password", + return_value="$2y$10$1BvUdvwM.a.dJACwfeNfAOgNT6Cqc4cKZ2F6byyvY8hIK9I8fn36O", +) +@patch.dict(in_dict=VERSION_MAP, values=TEST_VERSION_MAP, clear=True) +def test_user_from_ansible_module_params_with_multiple_groups_as_list( + mock_set_password, mock_get_version, sample_config_path +): + test_params = { + "username": "test", + "password": "vagrant", + "scope": "user", + "full_name": "vagrant box management", + "shell": "/bin/sh", + "uid": "1000", + "groups": ["admins", "test_group"], + } + + with UserSet(sample_config_path) as user_set: + test_user = User.from_ansible_module_params(test_params) + + user_set.add_or_update(test_user) + + assert user_set.changed + user_set.save() + + with UserSet(sample_config_path) as new_user_set: + new_test_user = new_user_set.find(name="test") + + # Adjust the assertions based on the actual implementation of your User and UserSet classes + + assert ( + "admins" in new_test_user.groupname + and "test_group" in new_test_user.groupname + ) + + new_user_set.save() + + +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.version_utils.get_opnsense_version", + return_value="OPNsense Test", +) +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.system_access_users_utils.UserSet.set_user_password", + return_value="$2y$10$1BvUdvwM.a.dJACwfeNfAOgNT6Cqc4cKZ2F6byyvY8hIK9I8fn36O", +) +@patch.dict(in_dict=VERSION_MAP, values=TEST_VERSION_MAP, clear=True) +def test_user_from_ansible_module_params_with_no_groups( + mock_set_password, mock_get_version, sample_config_path +): + test_params = { + "username": "test", + "password": "vagrant", + "scope": "user", + "full_name": "vagrant box management", + "shell": "/bin/sh", + "uid": "1000", + } + + with UserSet(sample_config_path) as user_set: + test_user = User.from_ansible_module_params(test_params) + + user_set.add_or_update(test_user) + + assert user_set.changed + user_set.save() + + with UserSet(sample_config_path) as new_user_set: + new_test_user = new_user_set.find(name="test") + + assert new_test_user.name == "test" + + new_user_set.save() + + +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.version_utils.get_opnsense_version", + return_value="OPNsense Test", +) +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.system_access_users_utils.UserSet.set_user_password", + return_value="$2y$10$1BvUdvwM.a.dJACwfeNfAOgNT6Cqc4cKZ2F6byyvY8hIK9I8fn36O", +) +@patch.dict(in_dict=VERSION_MAP, values=TEST_VERSION_MAP, clear=True) +def test_user_from_ansible_module_params_with_not_existing_group( + mock_set_password, mock_get_version, sample_config_path +): + test_params = { + "username": "test", + "password": "vagrant", + "scope": "user", + "full_name": "vagrant box management", + "shell": "/bin/sh", + "uid": "1000", + "groups": ["not_existing_group"], + } + + with UserSet(sample_config_path) as user_set: + with pytest.raises(OPNSenseGroupNotFoundError) as excinfo: + test_user = User.from_ansible_module_params(test_params) + + user_set.add_or_update(test_user) + + user_set.save() + + assert "Group 'not_existing_group' not found on Instance" in str(excinfo.value) + + +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.system_access_users_utils.UserSet.set_user_password", + return_value="$2y$10$1BvUdvwM.a.dJACwfeNfAOgNT6Cqc4cKZ2F6byyvY8hIK9I8fn36O", +) +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.system_access_users_utils.User.set_authorizedkeys", + return_value="3J35EY37QTNXFFEECJGZ32WVYQC5W4GZ", +) +def test_user_from_ansible_module_params_with_authorizedkeys( + mock_set_set_authorizedkeys, mock_set_password, sample_config_path +): + test_params: dict = { + "username": "vagrant", + "password": "vagrant", + "scope": "user", + "full_name": "vagrant box management", + "shell": "/bin/sh", + "uid": "1000", + "authorizedkeys": "test_authorizedkey", + } + + new_test_user: User = User.from_ansible_module_params(test_params) + + assert new_test_user.name == "vagrant" + assert new_test_user.password == "vagrant" + assert new_test_user.scope == "user" + assert new_test_user.descr == "vagrant box management" + assert new_test_user.expires is None + assert new_test_user.authorizedkeys == "3J35EY37QTNXFFEECJGZ32WVYQC5W4GZ" + assert new_test_user.ipsecpsk is None + assert new_test_user.shell == UserLoginShell.SH + assert new_test_user.uid == "1000" + + +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.version_utils.get_opnsense_version", + return_value="OPNsense Test", +) +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.system_access_users_utils.UserSet.set_user_password", + return_value="$2y$10$1BvUdvwM.a.dJACwfeNfAOgNT6Cqc4cKZ2F6byyvY8hIK9I8fn36O", +) +@patch.dict(in_dict=VERSION_MAP, values=TEST_VERSION_MAP, clear=True) +def test_user_from_ansible_module_params_single_group_removal( + mock_set_password, mock_get_version, sample_config_path +): + test_params = { + "username": "vagrant", + "password": "vagrant", + "scope": "user", + "full_name": "vagrant box management", + "shell": "/bin/sh", + "uid": "1000", + } + + with UserSet(sample_config_path) as user_set: + test_user = User.from_ansible_module_params(test_params) + + user_set.add_or_update(test_user) + + assert user_set.changed + user_set.save() + + with UserSet(sample_config_path) as new_user_set: + all_groups = new_user_set._load_groups() + + admin_group = all_groups[0] + + assert "1000" not in admin_group.member + + new_user_set.save() + + +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.version_utils.get_opnsense_version", + return_value="OPNsense Test", +) +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.system_access_users_utils.UserSet.set_user_password", + return_value="$2y$10$1BvUdvwM.a.dJACwfeNfAOgNT6Cqc4cKZ2F6byyvY8hIK9I8fn36O", +) +@patch.dict(in_dict=VERSION_MAP, values=TEST_VERSION_MAP, clear=True) +def test_user_from_ansible_module_params_multiple_group_removal( + mock_set_password, mock_get_version, sample_config_path +): + test_params = { + "username": "vagrant", + "password": "vagrant", + "scope": "user", + "full_name": "vagrant box management", + "shell": "/bin/sh", + "uid": "1000", + } + + with UserSet(sample_config_path) as user_set: + test_user = User.from_ansible_module_params(test_params) + + user_set.add_or_update(test_user) + + assert user_set.changed + user_set.save() + + with UserSet(sample_config_path) as new_user_set: + all_groups = new_user_set._load_groups() + + admin_group = all_groups[0] + test_group = all_groups[1] + + assert "1000" not in admin_group.member + assert "1000" not in test_group.member + + new_user_set.save() + + +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.opnsense_utils.run_function" +) +def test_generate_hashed_secret_success(mock_run_function): + mock_run_function.return_value = { + "stdout": "$6$somerandomsalt$hashedsecretvalue1234567890123456789012345678901234567890123456789054583", + "stderr": None, + } + + user = User(name="test", password="test") + result = user._generate_hashed_secret("password123") + assert ( + result + == "$6$somerandomsalt$hashedsecretvalue1234567890123456789012345678901234567890123456789054583" + ) + + +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.opnsense_utils.run_function" +) +def test_generate_hashed_secret_failure_invalid_hash(mock_run_function): + mock_run_function.return_value = { + "stdout": "$5$somerandomsalt$shortvalue", + "stderr": None, + } + + user = User(name="test", password="test") + with pytest.raises(OPNSenseCryptReturnError) as excinfo: + user._generate_hashed_secret("password123") + + assert "validation of the secret failed!" in str(excinfo.value) + + +@patch( + "ansible_collections.puzzle.opnsense.plugins.module_utils.opnsense_utils.run_function" +) +def test_generate_hashed_secret_error_in_crypt(mock_run_function): + mock_run_function.return_value = {"stdout": "", "stderr": "error in crypt function"} + + user = User(name="test", password="test") + with pytest.raises(OPNSenseCryptReturnError) as excinfo: + user._generate_hashed_secret("password123") + + assert "error encounterd while creating secret" in str(excinfo.value)