Skip to content

Commit

Permalink
feat: 451 audit utilities directory (#476)
Browse files Browse the repository at this point in the history
secureli-451

<!-- Include general description here -->


## Changes
<!-- A detailed list of changes -->
* Updated utility files by moving to appropriate modules and combining
to single utilities.py file.

## Testing
<!--
Mention updated tests and any manual testing performed.
Are aspects not yet tested or not easily testable?
Feel free to include screenshots if appropriate.
 -->
* All existing unit tests are passing

## Clean Code Checklist
<!-- This is here to support you. Some/most checkboxes may not apply to
your change -->
- [x] Meets acceptance criteria for issue
- [ ] New logic is covered with automated tests
- [ ] Appropriate exception handling added
- [ ] Thoughtful logging included
- [ ] Documentation is updated
- [ ] Follow-up work is documented in TODOs
- [ ] TODOs have a ticket associated with them
- [x] No commented-out code included


<!--
Github-flavored markdown reference:
https://docs.github.com/en/get-started/writing-on-github
-->
  • Loading branch information
isaac-heist-slalom authored Mar 13, 2024
1 parent 8b1d92b commit 83bedc1
Show file tree
Hide file tree
Showing 30 changed files with 243 additions and 250 deletions.
2 changes: 1 addition & 1 deletion secureli/actions/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from secureli.modules.core.core_services.scanner import ScannerService
from secureli.modules.core.core_services.updater import UpdaterService

from secureli.modules.shared.utilities.formatter import format_sentence_list
from secureli.modules.shared.utilities import format_sentence_list


class ActionDependencies:
Expand Down
6 changes: 3 additions & 3 deletions secureli/actions/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from secureli.modules.core.core_services.scanner import ScannerService
from secureli.modules.shared.models.scan import ScanMode
from secureli.settings import Settings
from secureli.modules.shared.utilities import usage_stats
from secureli.modules.shared import utilities

ONE_WEEK_IN_SECONDS: int = 7 * 24 * 60 * 60

Expand Down Expand Up @@ -85,7 +85,7 @@ def publish_results(
publish_results_condition == PublishResultsOption.ON_FAIL
and not action_successful
):
result = usage_stats.post_log(log_str, Settings())
result = utilities.post_log(log_str, Settings())
self.echo.debug(result.result_message)

if result.result == Result.SUCCESS:
Expand Down Expand Up @@ -137,7 +137,7 @@ def scan_repo(
[ob.__dict__ for ob in scan_result.failures]
)

individual_failure_count = usage_stats.convert_failures_to_failure_count(
individual_failure_count = utilities.convert_failures_to_failure_count(
scan_result.failures
)

Expand Down
2 changes: 1 addition & 1 deletion secureli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from secureli.modules.shared.resources import read_resource
from secureli.settings import Settings
import secureli.repositories.secureli_config as SecureliConfig
from secureli.modules.shared.utilities.secureli_meta import secureli_version
from secureli.modules.shared.utilities import secureli_version

# Create SetupAction outside of DI, as it's not yet available.
setup_action = SetupAction(epilog_template_data=read_resource("epilog.md"))
Expand Down
3 changes: 1 addition & 2 deletions secureli/modules/language_analyzer/language_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

from secureli.modules.shared.models import language
from secureli.modules.shared.resources.slugify import slugify
from secureli.modules.shared.utilities.hash import hash_config
from secureli.modules.shared.utilities.patterns import combine_patterns
from secureli.modules.shared.utilities import combine_patterns, hash_config


class LanguageConfigService:
Expand Down
2 changes: 1 addition & 1 deletion secureli/modules/language_analyzer/language_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import secureli.repositories.secureli_config as SecureliConfig
from secureli.modules.shared.abstractions.pre_commit import PreCommitAbstraction
from secureli.modules.language_analyzer import git_ignore, language_config
from secureli.modules.shared.utilities.hash import hash_config
from secureli.modules.shared.utilities import hash_config


class BuildConfigResult(pydantic.BaseModel):
Expand Down
11 changes: 5 additions & 6 deletions secureli/modules/observability/observability_services/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@
import secureli.repositories.secureli_config as SecureliConfig
from secureli.modules.language_analyzer import language_support
from secureli.repositories.secureli_config import SecureliConfigRepository
from secureli.modules.shared.utilities import git_meta
from secureli.modules.shared.utilities.secureli_meta import secureli_version
from secureli.modules.shared import utilities


def generate_unique_id() -> str:
"""
A unique identifier representing the log entry, including various
bits specific to the user and environment
"""
origin_email_branch = f"{git_meta.origin_url()}|{git_meta.git_user_email()}|{git_meta.current_branch_name()}"
origin_email_branch = f"{utilities.origin_url()}|{utilities.git_user_email()}|{utilities.current_branch_name()}"
return f"{uuid4()}|{origin_email_branch}"


Expand All @@ -43,9 +42,9 @@ class LogEntry(pydantic.BaseModel):

id: str = generate_unique_id()
timestamp: datetime = datetime.utcnow()
username: str = git_meta.git_user_email()
username: str = utilities.git_user_email()
machineid: str = platform.uname().node
secureli_version: str = secureli_version()
secureli_version: str = utilities.secureli_version()
languages: Optional[list[str]]
status: LogStatus
action: LogAction
Expand Down Expand Up @@ -125,7 +124,7 @@ def failure(
def _log(self, log_entry: LogEntry):
"""Commit a log entry to the branch log file"""
log_folder_path = Path(SecureliConfig.FOLDER_PATH / ".secureli/logs")
path_to_log = log_folder_path / f"{git_meta.current_branch_name()}"
path_to_log = log_folder_path / f"{utilities.current_branch_name()}"

# Do not simply mkdir the log folder path, in case the branch name contains
# additional folder structure, like `bugfix/` or `feature/`
Expand Down
21 changes: 9 additions & 12 deletions secureli/modules/shared/abstractions/echo.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import IO, Optional

import sys
import typer
from secureli.modules.shared.models.echo import Color

from secureli.modules.shared.utilities.logging import EchoLevel
from secureli.modules.shared.models.echo import Color, Level


class EchoAbstraction(ABC):
Expand All @@ -18,15 +15,15 @@ class EchoAbstraction(ABC):
"""

def __init__(self, level: str):
self.print_enabled = level != EchoLevel.off
self.debug_enabled = level == EchoLevel.debug
self.info_enabled = level in [EchoLevel.debug, EchoLevel.info]
self.warn_enabled = level in [EchoLevel.debug, EchoLevel.info, EchoLevel.warn]
self.print_enabled = level != Level.off
self.debug_enabled = level == Level.debug
self.info_enabled = level in [Level.debug, Level.info]
self.warn_enabled = level in [Level.debug, Level.info, Level.warn]
self.error_enabled = level in [
EchoLevel.debug,
EchoLevel.info,
EchoLevel.warn,
EchoLevel.error,
Level.debug,
Level.info,
Level.warn,
Level.error,
]

@abstractmethod
Expand Down
14 changes: 14 additions & 0 deletions secureli/modules/shared/models/echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,17 @@ class Color(str, Enum):
MAGENTA = "magenta"
CYAN = "cyan"
WHITE = "white"


class Level(str, Enum):
debug = "DEBUG"
info = "INFO"
warn = "WARN"
error = "ERROR"
off = "OFF"

def __str__(self) -> str:
return self.value

def __repr__(self) -> str:
return self.__str__()
155 changes: 155 additions & 0 deletions secureli/modules/shared/utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import configparser
import hashlib
import os
import requests
import subprocess

from collections import Counter
from importlib.metadata import version
from typing import Optional

from secureli.modules.observability.consts import logging
from secureli.modules.shared.models.publish_results import PublishLogResult
from secureli.modules.shared.models.result import Result
from secureli.modules.shared.models.scan import ScanFailure
from secureli.settings import Settings


def combine_patterns(patterns: list[str]) -> Optional[str]:
"""
Combines a set of patterns from pathspec into a single combined regex
suitable for use in circumstances that require a broad matching, such as
re.find_all or a pre-commit exclusion pattern.
Calling this with an empty set of patterns will return None
:param patterns: The pattern strings provided by pathspec to combine
:return: a combined pattern containing the one or more patterns supplied, or None
if no patterns were supplied
"""
if not patterns:
return None

if len(patterns) == 1:
return patterns[0]

# Don't mutate the input
ignored_file_patterns = patterns.copy()

# Quick sanitization process. Ultimately we combine all the regexes into a single
# entry, and all patterns are generated with a capture group with the same ID, which
# is invalid. We need to make sure every capture group is unique to combine them
# into a single expression. This is not my favorite.
for i in range(0, len(ignored_file_patterns)):
ignored_file_patterns[i] = str.replace(
ignored_file_patterns[i], "ps_d", f"ps_d{i}"
)
combined_patterns = str.join("|", ignored_file_patterns)
combined_ignore_pattern = f"^({combined_patterns})$"
return combined_ignore_pattern


def convert_failures_to_failure_count(failure_list: list[ScanFailure]):
"""
Convert a list of Failure ids to a list of individual failure count with underscore naming convention
:param failure_list: a list of Failure Object
"""
list_of_failure_ids = []

for failure in failure_list:
failure_id_underscore = failure.id.replace("-", "_")
list_of_failure_ids.append(failure_id_underscore)

failure_count_list = Counter(list_of_failure_ids)
return failure_count_list


def current_branch_name() -> str:
"""Leverage the git HEAD file to determine the current branch name"""
try:
with open(".git/HEAD", "r") as f:
content = f.readlines()
for line in content:
if line[0:4] == "ref:":
return line.partition("refs/heads/")[2].strip()
except IOError:
return "UNKNOWN"


def format_sentence_list(items: list[str]) -> str:
"""
Formats a list of string values to a comma separated
string for use in a sentence structure.
:param items: list of strings to join
:return string of joined values as a sentence comma list
i.e. "x, y, and z" or "x and y"
"""
if not items:
return ""
elif len(items) == 1:
return items[0]
and_separator = ", and " if len(items) > 2 else " and "
return ", ".join(items[:-1]) + and_separator + items[-1]


def git_user_email() -> str:
"""Leverage the command prompt to derive the user's email address"""
args = ["git", "config", "user.email"]
completed_process = subprocess.run(args, stdout=subprocess.PIPE)
output = completed_process.stdout.decode("utf8").strip()
return output


def hash_config(config: str) -> str:
"""
Creates an MD5 hash from a config string
:return: A hash string
"""
config_hash = hashlib.md5(config.encode("utf8"), usedforsecurity=False).hexdigest()

return config_hash


def origin_url() -> str:
"""Leverage the git config file to determine the remote origin URL"""
git_config_parser = configparser.ConfigParser()
git_config_parser.read(".git/config")
return (
git_config_parser['remote "origin"'].get("url", "UNKNOWN", raw=True)
if git_config_parser.has_section('remote "origin"')
else "UNKNOWN"
)


def post_log(log_data: str, settings: Settings) -> PublishLogResult:
"""
Send a log through http post
:param log_data: a string to be sent to backend instrumentation
"""

api_endpoint = (
os.getenv(logging.TELEMETRY_ENDPOINT_ENV_VAR_NAME) or settings.telemetry.api_url
)
api_key = os.getenv(logging.TELEMETRY_KEY_ENV_VAR_NAME)

if not api_endpoint or not api_key:
return PublishLogResult(
result=Result.FAILURE,
result_message=f"{logging.TELEMETRY_ENDPOINT_ENV_VAR_NAME} or {logging.TELEMETRY_KEY_ENV_VAR_NAME} not found in environment variables",
)

try:
result = requests.post(
url=api_endpoint, headers={"Api-Key": api_key}, data=log_data
)
except Exception as e:
return PublishLogResult(
result=Result.FAILURE,
result_message=f'Error posting log to {api_endpoint}: "{e}"',
)

return PublishLogResult(result=Result.SUCCESS, result_message=result.text)


def secureli_version() -> str:
"""Leverage package resources to determine the current version of secureli"""
return version("secureli")
Empty file.
14 changes: 0 additions & 14 deletions secureli/modules/shared/utilities/formatter.py

This file was deleted.

33 changes: 0 additions & 33 deletions secureli/modules/shared/utilities/git_meta.py

This file was deleted.

11 changes: 0 additions & 11 deletions secureli/modules/shared/utilities/hash.py

This file was deleted.

15 changes: 0 additions & 15 deletions secureli/modules/shared/utilities/logging.py

This file was deleted.

Loading

0 comments on commit 83bedc1

Please sign in to comment.