Skip to content

Commit

Permalink
add query command for service data
Browse files Browse the repository at this point in the history
  • Loading branch information
gruebel committed Jul 6, 2024
1 parent 36dbd37 commit c0e90b5
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 5 deletions.
60 changes: 55 additions & 5 deletions policy_sentry/command/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import click
import yaml

from policy_sentry.querying.services import get_services_data
from policy_sentry.util.access_levels import transform_access_level_text
from policy_sentry.querying.all import get_all_service_prefixes
from policy_sentry.querying.arns import (
Expand Down Expand Up @@ -47,8 +49,17 @@ def print_list(output: Any, fmt: str = "json") -> None:


def print_dict(output: Any, fmt: str = "json") -> None:
"""Common method on how to print a dict, depending on whether the user requests JSON or YAML output"""
print(yaml.dump(output)) if fmt == "yaml" else [print(json.dumps(output, indent=4))]
"""Common method on how to print a dict, depending on whether the user requests JSON, YAML or CSV output"""
if fmt == "csv":
if not output:
return None
print(",".join(output[0].keys()))
for entry in output:
print(",".join(entry.values()))
elif fmt == "json":
print(json.dumps(output, indent=4))
elif fmt == "yaml":
print(yaml.dump(output))


@click.group()
Expand Down Expand Up @@ -97,7 +108,7 @@ def query() -> None:
type=click.Choice(["yaml", "json"]),
default="json",
required=False,
help='Format output as YAML or JSON. Defaults to "yaml"',
help='Format output as YAML or JSON. Defaults to "json"',
)
@click.option(
"--verbose",
Expand Down Expand Up @@ -229,7 +240,7 @@ def query_action_table(
type=click.Choice(["yaml", "json"]),
default="json",
required=False,
help='Format output as YAML or JSON. Defaults to "yaml"',
help='Format output as YAML or JSON. Defaults to "json"',
)
@click.option(
"--verbose",
Expand Down Expand Up @@ -296,7 +307,7 @@ def query_arn_table(
type=click.Choice(["yaml", "json"]),
default="json",
required=False,
help='Format output as YAML or JSON. Defaults to "yaml"',
help='Format output as YAML or JSON. Defaults to "json"',
)
@click.option(
"--verbose",
Expand Down Expand Up @@ -334,3 +345,42 @@ def query_condition_table(
output = get_condition_key_details(service, name)
print_dict(output=output, fmt=fmt)
return output


@query.command(short_help="Query the service table.")
@click.option(
"--fmt",
type=click.Choice(["yaml", "json", "csv"]),
default="json",
required=False,
help='Format output as YAML, JSON or CSV. Defaults to "json"',
)
@click.option(
"--verbose",
"-v",
type=click.Choice(
["critical", "error", "warning", "info", "debug"], case_sensitive=False
),
)
def service_table(fmt: str, verbose: str | None) -> None:
"""Query the service table from the Policy Sentry database"""
if verbose:
log_level = getattr(logging, verbose.upper())
set_stream_logger(level=log_level)
query_service_table(fmt)


def query_service_table(fmt: str = "json") -> list[dict[str, str]]:
"""Query the service table from the Policy Sentry database.
Use this one when leveraging Policy Sentry as a library."""
if os.path.exists(LOCAL_DATASTORE_FILE_PATH):
logger.info(
f"Using the Local IAM definition: {LOCAL_DATASTORE_FILE_PATH}. To leverage the bundled definition instead, remove the folder $HOME/.policy_sentry/"
)
else:
# Otherwise, leverage the datastore inside the python package
logger.debug("Leveraging the bundled IAM Definition.")

output = get_services_data()
print_dict(output=output, fmt=fmt)
return output
19 changes: 19 additions & 0 deletions policy_sentry/querying/services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""
Methods that execute specific queries against the SQLite database for the SERIVCES table.
This supports the policy_sentry query functionality
"""

from __future__ import annotations

from policy_sentry.shared.iam_data import iam_definition


def get_services_data() -> list[dict[str, str]]:
return [
{
"prefix": service_prefix,
"service_name": data["service_name"],
}
for service_prefix, data in iam_definition.items()
if isinstance(data, dict)
]
16 changes: 16 additions & 0 deletions test/querying/test_query_services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import unittest

from policy_sentry.querying.services import get_services_data


class QueryServicesTestCase(unittest.TestCase):
def test_get_services_data(self):
# when
results = get_services_data()

# then
self.assertGreater(len(results), 400) # in 07/24 it was 405

# both should be a non-empty string
self.assertTrue(results[0]["prefix"])
self.assertTrue(results[0]["service_name"])

0 comments on commit c0e90b5

Please sign in to comment.