Skip to content

Commit

Permalink
change path constants from str to Path type
Browse files Browse the repository at this point in the history
  • Loading branch information
gruebel committed Jul 20, 2024
1 parent 208d420 commit c02b8ef
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 43 deletions.
24 changes: 11 additions & 13 deletions policy_sentry/command/initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import os
import shutil
from pathlib import Path

import click

Expand Down Expand Up @@ -96,7 +97,7 @@ def initialize(
if not access_level_overrides_file:
overrides_file = LOCAL_ACCESS_OVERRIDES_FILE
else:
overrides_file = access_level_overrides_file
overrides_file = Path(access_level_overrides_file)
# Create the config directory
database_path = create_policy_sentry_config_directory()

Expand All @@ -107,15 +108,15 @@ def initialize(
# provided by AWS documentation
file_list = [
f
for f in os.listdir(BUNDLED_DATA_DIRECTORY)
if os.path.isfile(os.path.join(BUNDLED_DATA_DIRECTORY, f))
for f in BUNDLED_DATA_DIRECTORY.iterdir()
if (BUNDLED_DATA_DIRECTORY / f).is_file()
]

for file in file_list:
if file.endswith(".yml"):
shutil.copy(os.path.join(BUNDLED_DATA_DIRECTORY, file), CONFIG_DIRECTORY)
if file.suffix == ".yml":
shutil.copy(BUNDLED_DATA_DIRECTORY / file, CONFIG_DIRECTORY)
logger.debug("copying overrides file %s to %s", file, CONFIG_DIRECTORY)
print("Database will be stored here: " + database_path)
print(f"Database will be stored here: {database_path}")

if not build and not fetch:
# copy from the bundled database location to the destination path
Expand All @@ -140,24 +141,21 @@ def initialize(
logger.debug(", ".join(all_aws_service_prefixes))


def create_policy_sentry_config_directory() -> str:
def create_policy_sentry_config_directory() -> Path:
"""
Creates a config directory at $HOME/.policy_sentry/
:return: the path of the database file
"""
print("Creating the database...")
logger.debug(f"We will store the new database here: {DATASTORE_FILE_PATH}")
# If the database file already exists, remove it
if os.path.exists(LOCAL_DATASTORE_FILE_PATH):
if LOCAL_DATASTORE_FILE_PATH.exists():
logger.debug(
f"The database at {DATASTORE_FILE_PATH} already exists. Removing and replacing it."
)
os.remove(LOCAL_DATASTORE_FILE_PATH)
elif os.path.exists(CONFIG_DIRECTORY):
pass
# If the config directory does not exist
LOCAL_DATASTORE_FILE_PATH.unlink()
else:
os.mkdir(CONFIG_DIRECTORY)
CONFIG_DIRECTORY.mkdir(exist_ok=True)
return LOCAL_DATASTORE_FILE_PATH


Expand Down
9 changes: 4 additions & 5 deletions policy_sentry/command/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import json
import logging
import os
from typing import Any

import click
Expand Down Expand Up @@ -144,7 +143,7 @@ def query_action_table(
) -> list[str] | dict[str, list[dict[str, Any]]]:
"""Query the Action Table from the Policy Sentry database.
Use this one when leveraging Policy Sentry as a library."""
if os.path.exists(LOCAL_DATASTORE_FILE_PATH):
if LOCAL_DATASTORE_FILE_PATH.exists():
logger.info(
f"Using the Local IAM definition: {LOCAL_DATASTORE_FILE_PATH}. To leverage the bundled definition instead, remove the folder $HOME/.policy_sentry/"
)
Expand Down Expand Up @@ -267,7 +266,7 @@ def query_arn_table(
name: str, service: str, list_arn_types: bool, fmt: str
) -> list[str] | dict[str, str]:
"""Query the ARN Table from the Policy Sentry database. Use this one when leveraging Policy Sentry as a library."""
if os.path.exists(LOCAL_DATASTORE_FILE_PATH):
if LOCAL_DATASTORE_FILE_PATH.exists():
logger.info(
f"Using the Local IAM definition: {LOCAL_DATASTORE_FILE_PATH}. To leverage the bundled definition instead, remove the folder $HOME/.policy_sentry/"
)
Expand Down Expand Up @@ -329,7 +328,7 @@ def query_condition_table(
) -> list[str] | dict[str, str]:
"""Query the condition table from the Policy Sentry database.
Use this one when leveraging Policy Sentry as a library."""
if os.path.exists(LOCAL_DATASTORE_FILE_PATH):
if LOCAL_DATASTORE_FILE_PATH.exists():
logger.info(
f"Using the Local IAM definition: {LOCAL_DATASTORE_FILE_PATH}. To leverage the bundled definition instead, remove the folder $HOME/.policy_sentry/"
)
Expand Down Expand Up @@ -373,7 +372,7 @@ def service_table(fmt: str, verbose: str | None) -> None:
def query_service_table(fmt: str = "json") -> list[dict[str, str]]:
"""Query the service table from the Policy Sentry database.
Use this one when leveraging Policy Sentry as a library."""
if os.path.exists(LOCAL_DATASTORE_FILE_PATH):
if LOCAL_DATASTORE_FILE_PATH.exists():
logger.info(
f"Using the Local IAM definition: {LOCAL_DATASTORE_FILE_PATH}. To leverage the bundled definition instead, remove the folder $HOME/.policy_sentry/"
)
Expand Down
8 changes: 3 additions & 5 deletions policy_sentry/shared/awsdocs.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def get_action_access_level_overrides_from_yml(
return None


def update_html_docs_directory(html_docs_destination: str) -> None:
def update_html_docs_directory(html_docs_destination: Path) -> None:
"""
Updates the HTML docs from remote location to either:
(1) local directory (i.e., this repository, or
Expand Down Expand Up @@ -131,9 +131,7 @@ def update_html_docs_directory(html_docs_destination: str) -> None:
logger.warning(a_e)
logger.warning(script)

with open(
os.path.join(html_docs_destination, page), "w", encoding="utf-8"
) as file:
with open(html_docs_destination / page, "w", encoding="utf-8") as file:
# file.write(str(soup.html))
file.write(str(soup.prettify()))
file.close()
Expand All @@ -158,7 +156,7 @@ def sanitize_service_name(action: str) -> str:


def create_database(
destination_directory: str, access_level_overrides_file: str
destination_directory: str | Path, access_level_overrides_file: Path
) -> None:
"""
Create the JSON Data source that holds the IAM data.
Expand Down
30 changes: 12 additions & 18 deletions policy_sentry/shared/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,24 @@
logger = logging.getLogger()

# General Folders
HOME = str(Path.home())
CONFIG_DIRECTORY = os.path.join(HOME, ".policy_sentry")
HOME = Path.home()
CONFIG_DIRECTORY = HOME / ".policy_sentry"

# HTML Docs
BUNDLED_HTML_DIRECTORY_PATH = os.path.join(
str(Path(os.path.dirname(__file__))), "data", "docs"
)
BUNDLED_DATA_DIRECTORY = os.path.join(str(Path(os.path.dirname(__file__))), "data")
BUNDLED_HTML_DIRECTORY_PATH = Path(__file__).parent / "data/docs"
BUNDLED_DATA_DIRECTORY = Path(__file__).parent / "data"

LOCAL_HTML_DIRECTORY_PATH = os.path.join(CONFIG_DIRECTORY, "data", "docs")
LOCAL_HTML_DIRECTORY_PATH = CONFIG_DIRECTORY / "data/docs"

BASE_DOCUMENTATION_URL = "https://docs.aws.amazon.com/service-authorization/latest/reference/reference_policies_actions-resources-contextkeys.html"
# BASE_DOCUMENTATION_URL = "https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_actions-resources-contextkeys.html"

# Data json file
# On initialization, load the IAM data
BUNDLED_DATASTORE_FILE_PATH = str(Path(__file__).parent / "data/iam-definition.json")
LOCAL_DATASTORE_FILE_PATH = str(Path(CONFIG_DIRECTORY) / "iam-definition.json")
BUNDLED_DATASTORE_FILE_PATH = Path(__file__).parent / "data/iam-definition.json"
LOCAL_DATASTORE_FILE_PATH = CONFIG_DIRECTORY / "iam-definition.json"
# Check for the existence of the local datastore first.
if os.path.exists(LOCAL_DATASTORE_FILE_PATH):
if LOCAL_DATASTORE_FILE_PATH.exists():
# If it exists, leverage that datastore instead of the one bundled with the python package
logger.info(
f"Leveraging the local IAM definition at the path: {LOCAL_DATASTORE_FILE_PATH} "
Expand All @@ -43,18 +41,14 @@
# Overrides
if "CUSTOM_ACCESS_OVERRIDES_FILE" in os.environ:
CUSTOM_ACCESS_OVERRIDES_FILE = os.environ["CUSTOM_ACCESS_OVERRIDES_FILE"]
BUNDLED_ACCESS_OVERRIDES_FILE = os.path.join(
os.path.abspath(os.path.dirname(__file__)), CUSTOM_ACCESS_OVERRIDES_FILE
)
BUNDLED_ACCESS_OVERRIDES_FILE = Path(__file__).parent / CUSTOM_ACCESS_OVERRIDES_FILE

else:
BUNDLED_ACCESS_OVERRIDES_FILE = os.path.join(
os.path.abspath(os.path.dirname(__file__)), "data", "access-level-overrides.yml"
BUNDLED_ACCESS_OVERRIDES_FILE = (
Path(__file__).parent / "data/access-level-overrides.yml"
)

LOCAL_ACCESS_OVERRIDES_FILE = os.path.join(
CONFIG_DIRECTORY, "access-level-overrides.yml"
)
LOCAL_ACCESS_OVERRIDES_FILE = CONFIG_DIRECTORY / "access-level-overrides.yml"

# Policy constants
# https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_version.html
Expand Down
3 changes: 1 addition & 2 deletions policy_sentry/shared/iam_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import functools
import gc
import logging
from pathlib import Path
from typing import Any, cast

import orjson
Expand All @@ -27,7 +26,7 @@ def load_iam_definition() -> dict[str, Any]:
# https://github.com/msgpack/msgpack-python?tab=readme-ov-file#performance-tips
gc.disable()

data: dict[str, Any] = orjson.loads(Path(iam_definition_path).read_bytes())
data: dict[str, Any] = orjson.loads(iam_definition_path.read_bytes())

if gc_enabled:
gc.enable()
Expand Down

0 comments on commit c02b8ef

Please sign in to comment.