diff --git a/tests/test_auth.py b/tests/test_auth.py index e9cd21f9..659dbc57 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,14 +1,29 @@ from __future__ import annotations from pathlib import Path +from typing import TYPE_CHECKING +from typing import Optional +import pytest +from inline_snapshot import snapshot +from packaging.version import Version +from pydantic import SecretStr +from zabbix_cli import auth from zabbix_cli._v2_compat import AUTH_FILE as AUTH_FILE_LEGACY from zabbix_cli._v2_compat import AUTH_TOKEN_FILE as AUTH_TOKEN_FILE_LEGACY +from zabbix_cli.auth import Authenticator +from zabbix_cli.auth import Credentials +from zabbix_cli.auth import CredentialsSource +from zabbix_cli.auth import CredentialsType from zabbix_cli.auth import get_auth_file_paths from zabbix_cli.auth import get_auth_token_file_paths from zabbix_cli.config.constants import AUTH_FILE from zabbix_cli.config.constants import AUTH_TOKEN_FILE from zabbix_cli.config.model import Config +from zabbix_cli.pyzabbix.client import ZabbixAPI + +if TYPE_CHECKING: + from zabbix_cli.models import TableRenderable def test_get_auth_file_paths_defafult(config: Config) -> None: @@ -51,3 +66,229 @@ def test_get_auth_token_file_paths_override(tmp_path: Path, config: Config) -> N AUTH_TOKEN_FILE, AUTH_TOKEN_FILE_LEGACY, ] + + +@pytest.fixture +def table_renderable_mock(monkeypatch) -> type[TableRenderable]: + """Replace TableRenderable class in zabbix_cli.models with mock class + so that tests can mutate it without affecting other tests.""" + from zabbix_cli.models import TableRenderable + + class MockTableRenderable(TableRenderable): + pass + + # Set a default version that is different from the real class + # so that we can tell if the mock class was used + MockTableRenderable.zabbix_version = Version("0.0.1") + + # Use monkeypatch to temporarily replace the real class with the mock class + monkeypatch.setattr("zabbix_cli.models.TableRenderable", MockTableRenderable) + + return MockTableRenderable + + +@pytest.fixture(name="auth_token_file") +def _auth_token_file(tmp_path: Path) -> Path: + return tmp_path / ".zabbix-cli_auth_token" + + +@pytest.fixture(name="auth_file") +def _auth_file(tmp_path: Path) -> Path: + return tmp_path / ".zabbix-cli_auth" + + +@pytest.mark.parametrize( + "sources, expect_type, expect_source", + [ + pytest.param( + [ + (CredentialsType.AUTH_TOKEN, CredentialsSource.CONFIG), + (CredentialsType.AUTH_TOKEN, CredentialsSource.ENV), + (CredentialsType.AUTH_TOKEN, CredentialsSource.FILE), + (CredentialsType.PASSWORD, CredentialsSource.CONFIG), + (CredentialsType.PASSWORD, CredentialsSource.FILE), + (CredentialsType.PASSWORD, CredentialsSource.ENV), + (CredentialsType.PASSWORD, CredentialsSource.PROMPT), + ], + CredentialsType.AUTH_TOKEN, + CredentialsSource.CONFIG, + id="expect_auth_token_config", + ), + pytest.param( + [ + (CredentialsType.AUTH_TOKEN, CredentialsSource.ENV), + (CredentialsType.AUTH_TOKEN, CredentialsSource.FILE), + (CredentialsType.PASSWORD, CredentialsSource.CONFIG), + (CredentialsType.PASSWORD, CredentialsSource.FILE), + (CredentialsType.PASSWORD, CredentialsSource.ENV), + (CredentialsType.PASSWORD, CredentialsSource.PROMPT), + ], + CredentialsType.AUTH_TOKEN, + CredentialsSource.ENV, + id="expect_auth_token_env", + ), + pytest.param( + [ + (CredentialsType.AUTH_TOKEN, CredentialsSource.FILE), + (CredentialsType.PASSWORD, CredentialsSource.CONFIG), + (CredentialsType.PASSWORD, CredentialsSource.FILE), + (CredentialsType.PASSWORD, CredentialsSource.ENV), + (CredentialsType.PASSWORD, CredentialsSource.PROMPT), + ], + CredentialsType.AUTH_TOKEN, + CredentialsSource.FILE, + id="expect_auth_token_file", + ), + pytest.param( + [ + (CredentialsType.PASSWORD, CredentialsSource.CONFIG), + (CredentialsType.PASSWORD, CredentialsSource.FILE), + (CredentialsType.PASSWORD, CredentialsSource.ENV), + (CredentialsType.PASSWORD, CredentialsSource.PROMPT), + ], + CredentialsType.PASSWORD, + CredentialsSource.CONFIG, + id="expect_password_config", + ), + pytest.param( + [ + (CredentialsType.PASSWORD, CredentialsSource.FILE), + (CredentialsType.PASSWORD, CredentialsSource.ENV), + (CredentialsType.PASSWORD, CredentialsSource.PROMPT), + ], + CredentialsType.PASSWORD, + CredentialsSource.FILE, + id="expect_password_file", + ), + pytest.param( + [ + (CredentialsType.PASSWORD, CredentialsSource.ENV), + (CredentialsType.PASSWORD, CredentialsSource.PROMPT), + ], + CredentialsType.PASSWORD, + CredentialsSource.ENV, + id="expect_password_env", + ), + pytest.param( + [ + (CredentialsType.PASSWORD, CredentialsSource.PROMPT), + ], + CredentialsType.PASSWORD, + CredentialsSource.PROMPT, + id="expect_password_prompt", + ), + ], +) +def test_authenticator_login_with_any( + monkeypatch: pytest.MonkeyPatch, + table_renderable_mock: type[TableRenderable], + auth_token_file: Path, + auth_file: Path, + config: Config, + sources: list[tuple[CredentialsType, CredentialsSource]], + expect_source: CredentialsSource, + expect_type: CredentialsType, +) -> None: + """Test the authenticator login with a variety of credential sources.""" + # TODO: test with other variations of sources and types + authenticator = Authenticator(config) + + MOCK_USER = "Admin" + MOCK_PASSWORD = "zabbix" + MOCK_TOKEN = "abc1234567890" + + # Mock certain methods that are difficult to test + # States reasons for mocking each method + + # REASON: Makes HTTP calls to the Zabbix API + def mock_login(self: ZabbixAPI, *args, **kwargs): + self.auth = MOCK_TOKEN + return self.auth + + monkeypatch.setattr(auth.ZabbixAPI, "login", mock_login) + + # REASON: Makes HTTP calls to the Zabbix API + monkeypatch.setattr(auth.ZabbixAPI, "version", Version("1.2.3")) + + # REASON: Prompts for input (could also be tested with a fake input stream) + def mock_get_username_password_prompt() -> Credentials: + return Credentials( + username=MOCK_USER, + password=MOCK_PASSWORD, + source=CredentialsSource.PROMPT, + ) + + monkeypatch.setattr( + authenticator, + "_get_username_password_prompt", + mock_get_username_password_prompt, + ) + + # REASON: Falls back on default auth token file path (which might exist on test user's system) + def mock_get_auth_token_file_paths(config: Optional[Config] = None) -> list[Path]: + return [auth_token_file] + + monkeypatch.setattr( + auth, "get_auth_token_file_paths", mock_get_auth_token_file_paths + ) + + # REASON: Falls back on default auth file path (which might exist on test user's system) + def mock_get_auth_file_paths(config: Optional[Config] = None) -> list[Path]: + return [auth_file] + + monkeypatch.setattr(auth, "get_auth_file_paths", mock_get_auth_file_paths) + + # Set the credentials in the various sources + for ctype, csource in sources: + if csource == CredentialsSource.CONFIG: + if ctype == CredentialsType.AUTH_TOKEN: + config.api.auth_token = SecretStr(MOCK_TOKEN) + elif ctype == CredentialsType.PASSWORD: + config.api.username = MOCK_USER + config.api.password = SecretStr(MOCK_PASSWORD) + elif csource == CredentialsSource.ENV: + if ctype == CredentialsType.AUTH_TOKEN: + monkeypatch.setenv("ZABBIX_API_TOKEN", MOCK_TOKEN) + elif ctype == CredentialsType.PASSWORD: + monkeypatch.setenv("ZABBIX_USERNAME", MOCK_USER) + monkeypatch.setenv("ZABBIX_PASSWORD", MOCK_PASSWORD) + elif csource == CredentialsSource.FILE: + if ctype == CredentialsType.AUTH_TOKEN: + auth_token_file.write_text(f"{MOCK_USER}::{MOCK_TOKEN}") + config.app.auth_token_file = auth_token_file + config.app.use_auth_token_file = True + config.app.allow_insecure_auth_file = True + elif ctype == CredentialsType.PASSWORD: + auth_file.write_text(f"{MOCK_USER}::{MOCK_PASSWORD}") + config.app.auth_file = auth_file + + client, info = authenticator.login_with_any() + assert info.credentials.source == expect_source + assert info.credentials.type == expect_type + assert info.token == MOCK_TOKEN + + # Ensure the login method modified the base renderable's zabbix version attribute + # with the one we got from the mocked API response + + # Check the mocked version we replace the real class with + assert table_renderable_mock.zabbix_version == Version("1.2.3") + + # Try to import the real class and check that our mock changes were applied + from zabbix_cli.models import TableRenderable + + assert TableRenderable.zabbix_version == Version("1.2.3") + + +def test_table_renderable_mock_reverted() -> None: + """Attempt to ensure that the TableRenderable has been unchanged after + running tests that mutate it. + + If those tests have used the mock class correctly, this test should pass. + + TODO: Ensure this test is run _after_ the tests that mutate the class. + """ + from zabbix_cli.models import TableRenderable + + # Ensure the mock changes were reverted + assert TableRenderable.zabbix_version != Version("1.2.3") + assert TableRenderable.zabbix_version.release == snapshot((7, 0, 0)) diff --git a/zabbix_cli/auth.py b/zabbix_cli/auth.py index ebc69efe..109c3a57 100644 --- a/zabbix_cli/auth.py +++ b/zabbix_cli/auth.py @@ -18,6 +18,7 @@ from pathlib import Path from typing import TYPE_CHECKING from typing import Final +from typing import Generator from typing import List from typing import NamedTuple from typing import Optional @@ -56,7 +57,7 @@ SECURE_PERMISSIONS_STR = format(SECURE_PERMISSIONS, "o") -class LoginCredentialType(StrEnum): +class CredentialsType(StrEnum): """Types of valid login credentials.""" PASSWORD = "username and password" @@ -70,11 +71,7 @@ class CredentialsSource(StrEnum): FILE = "file" PROMPT = "prompt" CONFIG = "config" - - -class LoginInfo(NamedTuple): - credentials: Credentials - token: str + LOGIN_COMMAND = "login command" class Credentials(NamedTuple): @@ -86,11 +83,11 @@ class Credentials(NamedTuple): auth_token: Optional[str] = None @property - def type(self) -> Optional[LoginCredentialType]: + def type(self) -> Optional[CredentialsType]: if self.auth_token: - return LoginCredentialType.AUTH_TOKEN + return CredentialsType.AUTH_TOKEN if self.username and self.password: - return LoginCredentialType.PASSWORD + return CredentialsType.PASSWORD return None def is_valid(self) -> bool: @@ -98,6 +95,11 @@ def is_valid(self) -> bool: return self.type is not None +class LoginInfo(NamedTuple): + credentials: Credentials + token: str + + class Authenticator: """Encapsulates logic for authenticating with the Zabbix API using various methods, as well as storing and loading auth tokens.""" @@ -106,13 +108,16 @@ class Authenticator: def __init__(self, config: Config) -> None: self.config = config + # Ensure we have a Zabbix API URL before instantiating client + self.config.api.url = self.get_zabbix_url() + self.client = ZabbixAPI.from_config(self.config) @cached_property def screen(self) -> ScreenContext: return err_console.screen() - def login(self) -> ZabbixAPI: - """Log in to the Zabbix API using the configured credentials. + def login_with_any(self) -> Tuple[ZabbixAPI, LoginInfo]: + """Log in to the Zabbix API using any valid method. Returns the Zabbix API client object. @@ -126,30 +131,25 @@ def login(self) -> ZabbixAPI: 6. Username and password in environment variables 7. Username and password from prompt """ - # Ensure we have a Zabbix API URL - self.config.api.url = self.get_zabbix_url() - client = ZabbixAPI.from_config(self.config) - info = self._do_login(client) - if info.credentials.username and self.config.app.use_auth_token_file: - write_auth_token_file( - info.credentials.username, info.token, self.config.app.auth_token_file - ) - if info.credentials.username: - add_user(info.credentials.username) + for credentials in self._iter_all_credentials(): + if not credentials.is_valid(): + logger.debug("No valid credentials found with %s", credentials) + continue + info = self.login_with_credentials(credentials) + if info: + return self.client, info + raise AuthError( + f"No authentication method succeeded for {self.config.api.url}. Check the logs for more information." + ) - if info.credentials.type == LoginCredentialType.AUTH_TOKEN: - logger.info("Logged in using auth token from %s", info.credentials.source) - else: - logger.info( - "Logged in as %s using username and password from %s", - info.credentials.username, - info.credentials.source, - ) - self._update_config(info.credentials) - return client + def _iter_all_credentials( + self, prompt_password: bool = True + ) -> Generator[Credentials, None, None]: + """Generator that yields credentials from all possible sources. - def _do_login(self, client: ZabbixAPI) -> LoginInfo: + Finally yields a prompt for username and password if `prompt_password` is True. + """ for func in [ self._get_auth_token_config, self._get_auth_token_env, @@ -157,42 +157,128 @@ def _do_login(self, client: ZabbixAPI) -> LoginInfo: self._get_username_password_config, self._get_username_password_auth_file, self._get_username_password_env, - self._get_username_password_prompt, ]: - try: - credentials = func() - if not credentials.is_valid(): - logger.debug("No valid credentials found with %s", func.__name__) - continue - logger.debug( - "Attempting to log in with %s from %s", - credentials.type, - credentials.source, - ) - token = self.login_with_credentials(client, credentials) - return LoginInfo(credentials, token) - except ZabbixAPIException as e: - logger.warning("Failed to log in with %s: %s", func.__name__, e) - continue - except Exception as e: - logger.error( - "Unexpected error logging in with %s: %s", func.__name__, e - ) - continue - else: - raise AuthError( - f"No authentication method succeeded for {self.config.api.url}. Check the logs for more information." - ) + yield func() + if prompt_password: + yield self._get_username_password_prompt() + + @classmethod + def login_with_prompt(cls, config: Config) -> Tuple[ZabbixAPI, LoginInfo]: + """Log in to the Zabbix API using username and password from a prompt.""" + auth = cls(config) + creds = auth._get_username_password_prompt() + info = auth.login_with_credentials(creds) + creds._replace(source=CredentialsSource.LOGIN_COMMAND) + if not info: + raise AuthError("Failed to log in with username and password.") + return auth.client, info + + @classmethod + def login_with_username_password( + cls, config: Config, username: str, password: str + ) -> Tuple[ZabbixAPI, LoginInfo]: + """Log in to the Zabbix API using username and password from a prompt.""" + auth = cls(config) + creds = Credentials( + username=username, + password=password, + source=CredentialsSource.LOGIN_COMMAND, + ) + info = auth.login_with_credentials(creds) + if not info: + raise AuthError("Failed to log in with username and password.") + return auth.client, info + + @classmethod + def login_with_token( + cls, config: Config, token: str + ) -> Tuple[ZabbixAPI, LoginInfo]: + """Log in to the Zabbix API using username and password from a prompt.""" + auth = cls(config) + creds = Credentials( + auth_token=token, + source=CredentialsSource.LOGIN_COMMAND, + ) + info = auth.login_with_credentials(creds) + if not info: + raise AuthError("Failed to log in with auth token.") + return auth.client, info + + def login_with_credentials(self, credentials: Credentials) -> LoginInfo | None: + """Log in to the Zabbix API using the provided credentials. + + Cannot fail; logs errors and returns None if unsuccessful. - def login_with_credentials( - self, client: ZabbixAPI, credentials: Credentials - ) -> str: - """Log in to the Zabbix API using the provided credentials.""" - return client.login( + Args: + credentials (Credentials): Credentials to use for logging in. + + Returns: + LoginInfo | None: Login information if successful, None if not. + """ + try: + logger.debug( + "Attempting to log in with %s from %s", + credentials.type, + credentials.source, + ) + return self._do_login_with_credentials(credentials) + except ZabbixAPIException as e: + logger.warning("Failed to log in with %s: %s", credentials, e) + return + except Exception as e: + logger.error("Unexpected error logging in with %s: %s", credentials, e) + return + + def _do_login_with_credentials(self, credentials: Credentials) -> LoginInfo: + """Login to Zabbix API, and update application state if successful.""" + token = self.client.login( user=credentials.username, password=credentials.password, auth_token=credentials.auth_token, ) + info = LoginInfo(credentials, token) + + if info.credentials.type == CredentialsType.AUTH_TOKEN: + logger.info("Logged in using auth token from %s", info.credentials.source) + else: + logger.info( + "Logged in as %s using username and password from %s", + info.credentials.username, + info.credentials.source, + ) + + self._update_application_with_login_info(info) + return info + + def _update_application_with_login_info(self, info: LoginInfo) -> None: + """Update the application state with the login information. + + Includes the following: + - Write auth token file if configured + - Add username to logs + - Update config with credentials + - Set Zabbix API version on the TableRenderable base class + """ + from zabbix_cli.models import TableRenderable + + # Write auth token file + if info.credentials.username and self.config.app.use_auth_token_file: + write_auth_token_file( + info.credentials.username, info.token, self.config.app.auth_token_file + ) + + # Log context + if info.credentials.username: + add_user(info.credentials.username) + else: + logger.debug("No username detected, adding to logs") + add_user("") + + # Update config with the new credentials + self._update_config(info.credentials) + + # Set the Zabbix API version on the TableRenderable base class + TableRenderable.zabbix_version = self.client.version def _update_config(self, credentials: Credentials) -> None: """Update the config with credentials from the successful login.""" @@ -353,7 +439,7 @@ def login(config: Config) -> ZabbixAPI: Returns the Zabbix API client object. """ auth = Authenticator(config) - client = auth.login() + client, _ = auth.login_with_any() return client diff --git a/zabbix_cli/state.py b/zabbix_cli/state.py index c924b90a..255ca4a0 100644 --- a/zabbix_cli/state.py +++ b/zabbix_cli/state.py @@ -215,13 +215,12 @@ def login(self) -> None: Uses the authentication info from the config to log into the Zabbix API. - Also sets the Zabbix API version on the ZabbixAPIBaseModel class, - so that each model is aware of which version its data is from.""" + Also sets the JSON rendering mode on the TableRenderable base class + used to render application output.""" from zabbix_cli import auth from zabbix_cli.models import TableRenderable self.client = auth.login(self.config) - TableRenderable.zabbix_version = self.client.version TableRenderable.legacy_json_format = self.config.app.legacy_json_format def logout(self) -> None: