From c02b8ef760409bf528ffa8a8a815470b71443728 Mon Sep 17 00:00:00 2001 From: gruebel Date: Sat, 20 Jul 2024 21:35:20 +0200 Subject: [PATCH] change path constants from str to Path type --- policy_sentry/command/initialize.py | 24 +++++++++++------------ policy_sentry/command/query.py | 9 ++++----- policy_sentry/shared/awsdocs.py | 8 +++----- policy_sentry/shared/constants.py | 30 ++++++++++++----------------- policy_sentry/shared/iam_data.py | 3 +-- 5 files changed, 31 insertions(+), 43 deletions(-) diff --git a/policy_sentry/command/initialize.py b/policy_sentry/command/initialize.py index 828962d5..032022a4 100755 --- a/policy_sentry/command/initialize.py +++ b/policy_sentry/command/initialize.py @@ -8,6 +8,7 @@ import logging import os import shutil +from pathlib import Path import click @@ -96,7 +97,7 @@ def initialize( if not access_level_overrides_file: overrides_file = LOCAL_ACCESS_OVERRIDES_FILE else: - overrides_file = access_level_overrides_file + overrides_file = Path(access_level_overrides_file) # Create the config directory database_path = create_policy_sentry_config_directory() @@ -107,15 +108,15 @@ def initialize( # provided by AWS documentation file_list = [ f - for f in os.listdir(BUNDLED_DATA_DIRECTORY) - if os.path.isfile(os.path.join(BUNDLED_DATA_DIRECTORY, f)) + for f in BUNDLED_DATA_DIRECTORY.iterdir() + if (BUNDLED_DATA_DIRECTORY / f).is_file() ] for file in file_list: - if file.endswith(".yml"): - shutil.copy(os.path.join(BUNDLED_DATA_DIRECTORY, file), CONFIG_DIRECTORY) + if file.suffix == ".yml": + shutil.copy(BUNDLED_DATA_DIRECTORY / file, CONFIG_DIRECTORY) logger.debug("copying overrides file %s to %s", file, CONFIG_DIRECTORY) - print("Database will be stored here: " + database_path) + print(f"Database will be stored here: {database_path}") if not build and not fetch: # copy from the bundled database location to the destination path @@ -140,7 +141,7 @@ def initialize( logger.debug(", ".join(all_aws_service_prefixes)) -def create_policy_sentry_config_directory() -> str: +def create_policy_sentry_config_directory() -> Path: """ Creates a config directory at $HOME/.policy_sentry/ :return: the path of the database file @@ -148,16 +149,13 @@ def create_policy_sentry_config_directory() -> str: print("Creating the database...") logger.debug(f"We will store the new database here: {DATASTORE_FILE_PATH}") # If the database file already exists, remove it - if os.path.exists(LOCAL_DATASTORE_FILE_PATH): + if LOCAL_DATASTORE_FILE_PATH.exists(): logger.debug( f"The database at {DATASTORE_FILE_PATH} already exists. Removing and replacing it." ) - os.remove(LOCAL_DATASTORE_FILE_PATH) - elif os.path.exists(CONFIG_DIRECTORY): - pass - # If the config directory does not exist + LOCAL_DATASTORE_FILE_PATH.unlink() else: - os.mkdir(CONFIG_DIRECTORY) + CONFIG_DIRECTORY.mkdir(exist_ok=True) return LOCAL_DATASTORE_FILE_PATH diff --git a/policy_sentry/command/query.py b/policy_sentry/command/query.py index 521dc8a4..c5f65f55 100644 --- a/policy_sentry/command/query.py +++ b/policy_sentry/command/query.py @@ -6,7 +6,6 @@ import json import logging -import os from typing import Any import click @@ -144,7 +143,7 @@ def query_action_table( ) -> list[str] | dict[str, list[dict[str, Any]]]: """Query the Action Table from the Policy Sentry database. Use this one when leveraging Policy Sentry as a library.""" - if os.path.exists(LOCAL_DATASTORE_FILE_PATH): + if LOCAL_DATASTORE_FILE_PATH.exists(): logger.info( f"Using the Local IAM definition: {LOCAL_DATASTORE_FILE_PATH}. To leverage the bundled definition instead, remove the folder $HOME/.policy_sentry/" ) @@ -267,7 +266,7 @@ def query_arn_table( name: str, service: str, list_arn_types: bool, fmt: str ) -> list[str] | dict[str, str]: """Query the ARN Table from the Policy Sentry database. Use this one when leveraging Policy Sentry as a library.""" - if os.path.exists(LOCAL_DATASTORE_FILE_PATH): + if LOCAL_DATASTORE_FILE_PATH.exists(): logger.info( f"Using the Local IAM definition: {LOCAL_DATASTORE_FILE_PATH}. To leverage the bundled definition instead, remove the folder $HOME/.policy_sentry/" ) @@ -329,7 +328,7 @@ def query_condition_table( ) -> list[str] | dict[str, str]: """Query the condition table from the Policy Sentry database. Use this one when leveraging Policy Sentry as a library.""" - if os.path.exists(LOCAL_DATASTORE_FILE_PATH): + if LOCAL_DATASTORE_FILE_PATH.exists(): logger.info( f"Using the Local IAM definition: {LOCAL_DATASTORE_FILE_PATH}. To leverage the bundled definition instead, remove the folder $HOME/.policy_sentry/" ) @@ -373,7 +372,7 @@ def service_table(fmt: str, verbose: str | None) -> None: def query_service_table(fmt: str = "json") -> list[dict[str, str]]: """Query the service table from the Policy Sentry database. Use this one when leveraging Policy Sentry as a library.""" - if os.path.exists(LOCAL_DATASTORE_FILE_PATH): + if LOCAL_DATASTORE_FILE_PATH.exists(): logger.info( f"Using the Local IAM definition: {LOCAL_DATASTORE_FILE_PATH}. To leverage the bundled definition instead, remove the folder $HOME/.policy_sentry/" ) diff --git a/policy_sentry/shared/awsdocs.py b/policy_sentry/shared/awsdocs.py index d0e7f882..9de3e608 100644 --- a/policy_sentry/shared/awsdocs.py +++ b/policy_sentry/shared/awsdocs.py @@ -77,7 +77,7 @@ def get_action_access_level_overrides_from_yml( return None -def update_html_docs_directory(html_docs_destination: str) -> None: +def update_html_docs_directory(html_docs_destination: Path) -> None: """ Updates the HTML docs from remote location to either: (1) local directory (i.e., this repository, or @@ -131,9 +131,7 @@ def update_html_docs_directory(html_docs_destination: str) -> None: logger.warning(a_e) logger.warning(script) - with open( - os.path.join(html_docs_destination, page), "w", encoding="utf-8" - ) as file: + with open(html_docs_destination / page, "w", encoding="utf-8") as file: # file.write(str(soup.html)) file.write(str(soup.prettify())) file.close() @@ -158,7 +156,7 @@ def sanitize_service_name(action: str) -> str: def create_database( - destination_directory: str, access_level_overrides_file: str + destination_directory: str | Path, access_level_overrides_file: Path ) -> None: """ Create the JSON Data source that holds the IAM data. diff --git a/policy_sentry/shared/constants.py b/policy_sentry/shared/constants.py index d29bcf3f..57158a9d 100644 --- a/policy_sentry/shared/constants.py +++ b/policy_sentry/shared/constants.py @@ -9,26 +9,24 @@ logger = logging.getLogger() # General Folders -HOME = str(Path.home()) -CONFIG_DIRECTORY = os.path.join(HOME, ".policy_sentry") +HOME = Path.home() +CONFIG_DIRECTORY = HOME / ".policy_sentry" # HTML Docs -BUNDLED_HTML_DIRECTORY_PATH = os.path.join( - str(Path(os.path.dirname(__file__))), "data", "docs" -) -BUNDLED_DATA_DIRECTORY = os.path.join(str(Path(os.path.dirname(__file__))), "data") +BUNDLED_HTML_DIRECTORY_PATH = Path(__file__).parent / "data/docs" +BUNDLED_DATA_DIRECTORY = Path(__file__).parent / "data" -LOCAL_HTML_DIRECTORY_PATH = os.path.join(CONFIG_DIRECTORY, "data", "docs") +LOCAL_HTML_DIRECTORY_PATH = CONFIG_DIRECTORY / "data/docs" BASE_DOCUMENTATION_URL = "https://docs.aws.amazon.com/service-authorization/latest/reference/reference_policies_actions-resources-contextkeys.html" # BASE_DOCUMENTATION_URL = "https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_actions-resources-contextkeys.html" # Data json file # On initialization, load the IAM data -BUNDLED_DATASTORE_FILE_PATH = str(Path(__file__).parent / "data/iam-definition.json") -LOCAL_DATASTORE_FILE_PATH = str(Path(CONFIG_DIRECTORY) / "iam-definition.json") +BUNDLED_DATASTORE_FILE_PATH = Path(__file__).parent / "data/iam-definition.json" +LOCAL_DATASTORE_FILE_PATH = CONFIG_DIRECTORY / "iam-definition.json" # Check for the existence of the local datastore first. -if os.path.exists(LOCAL_DATASTORE_FILE_PATH): +if LOCAL_DATASTORE_FILE_PATH.exists(): # If it exists, leverage that datastore instead of the one bundled with the python package logger.info( f"Leveraging the local IAM definition at the path: {LOCAL_DATASTORE_FILE_PATH} " @@ -43,18 +41,14 @@ # Overrides if "CUSTOM_ACCESS_OVERRIDES_FILE" in os.environ: CUSTOM_ACCESS_OVERRIDES_FILE = os.environ["CUSTOM_ACCESS_OVERRIDES_FILE"] - BUNDLED_ACCESS_OVERRIDES_FILE = os.path.join( - os.path.abspath(os.path.dirname(__file__)), CUSTOM_ACCESS_OVERRIDES_FILE - ) + BUNDLED_ACCESS_OVERRIDES_FILE = Path(__file__).parent / CUSTOM_ACCESS_OVERRIDES_FILE else: - BUNDLED_ACCESS_OVERRIDES_FILE = os.path.join( - os.path.abspath(os.path.dirname(__file__)), "data", "access-level-overrides.yml" + BUNDLED_ACCESS_OVERRIDES_FILE = ( + Path(__file__).parent / "data/access-level-overrides.yml" ) -LOCAL_ACCESS_OVERRIDES_FILE = os.path.join( - CONFIG_DIRECTORY, "access-level-overrides.yml" -) +LOCAL_ACCESS_OVERRIDES_FILE = CONFIG_DIRECTORY / "access-level-overrides.yml" # Policy constants # https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_version.html diff --git a/policy_sentry/shared/iam_data.py b/policy_sentry/shared/iam_data.py index 2dbaed70..fc9e4a30 100644 --- a/policy_sentry/shared/iam_data.py +++ b/policy_sentry/shared/iam_data.py @@ -5,7 +5,6 @@ import functools import gc import logging -from pathlib import Path from typing import Any, cast import orjson @@ -27,7 +26,7 @@ def load_iam_definition() -> dict[str, Any]: # https://github.com/msgpack/msgpack-python?tab=readme-ov-file#performance-tips gc.disable() - data: dict[str, Any] = orjson.loads(Path(iam_definition_path).read_bytes()) + data: dict[str, Any] = orjson.loads(iam_definition_path.read_bytes()) if gc_enabled: gc.enable()