From 1c4802d2f08c217518b00fe5988fb43b978f5d1a Mon Sep 17 00:00:00 2001 From: Roman Babenko Date: Tue, 24 Dec 2024 15:16:02 +0200 Subject: [PATCH] softreset --- .mypy.ini | 3 + credsweeper/__main__.py | 114 ++++++++++++++++++++++++++++++++--- docs/source/guide.rst | 9 ++- requirements.txt | 1 + tests/data/depth_3.json | 2 +- tests/data/doc.json | 2 +- tests/data/ml_threshold.json | 2 +- tests/data/output.json | 2 +- tests/test_app.py | 33 ++++++++++ tests/test_main.py | 42 +++++++++++++ 10 files changed, 195 insertions(+), 15 deletions(-) diff --git a/.mypy.ini b/.mypy.ini index 1946e5b90..1b3c8477b 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -49,5 +49,8 @@ ignore_missing_imports = True [mypy-docx.*] ignore_missing_imports = True +[mypy-pydriller.*] +ignore_missing_imports = True + [mypy-base62.*] ignore_missing_imports = True diff --git a/credsweeper/__main__.py b/credsweeper/__main__.py index 058bc15ac..526dfd07a 100644 --- a/credsweeper/__main__.py +++ b/credsweeper/__main__.py @@ -1,10 +1,16 @@ +import base64 import binascii +import hashlib +import io import logging import os import sys import time +import warnings from argparse import ArgumentParser, ArgumentTypeError, Namespace -from typing import Any, Union, Dict +from typing import Any, Union, Optional, Dict, List, Tuple + +from pydriller import Repository from credsweeper import __version__ from credsweeper.app import APP_PATH, CredSweeper @@ -116,16 +122,27 @@ def get_arguments() -> Namespace: const="log.yaml", dest="export_log_config", metavar="PATH") + group.add_argument("--git", nargs="+", help="git repo to scan", dest="git", metavar="PATH") + parser.add_argument("--commits", + help="scan git repo for N commits only", + type=positive_int, + dest="commits", + default=0, + metavar="POSITIVE_INT") + parser.add_argument("--branch", + help="scan git repo for single branch, otherwise - all branches were scanned (slow)", + dest="branch", + type=str) parser.add_argument("--rules", help="path of rule config file (default: credsweeper/rules/config.yaml). " - f"severity:{[i.value for i in Severity]} " - f"type:{[i.value for i in RuleType]}", + f"severity:{[i.value for i in Severity]} " + f"type:{[i.value for i in RuleType]}", default=None, dest="rule_path", metavar="PATH") parser.add_argument("--severity", help=f"set minimum level for rules to apply {[i.value for i in Severity]}" - f"(default: '{Severity.INFO}', case insensitive)", + f"(default: '{Severity.INFO}', case insensitive)", default=Severity.INFO, dest="severity", type=severity_levels) @@ -159,9 +176,9 @@ def get_arguments() -> Namespace: parser.add_argument("--doc", help="document-specific scanning", dest="doc", action="store_true") parser.add_argument("--ml_threshold", help="setup threshold for the ml model. " - "The lower the threshold - the more credentials will be reported. " - f"Allowed values: float between 0 and 1, or any of {[e.value for e in ThresholdPreset]} " - "(default: medium)", + "The lower the threshold - the more credentials will be reported. " + f"Allowed values: float between 0 and 1, or any of {[e.value for e in ThresholdPreset]} " + "(default: medium)", type=threshold_or_float, default=ThresholdPreset.medium, dest="ml_threshold", @@ -232,7 +249,7 @@ def get_arguments() -> Namespace: parser.add_argument("--log", "-l", help=f"provide logging level of {list(Logger.LEVELS.keys())}" - f"(default: 'warning', case insensitive)", + f"(default: 'warning', case insensitive)", default="warning", dest="log", metavar="LOG_LEVEL", @@ -252,7 +269,6 @@ def get_arguments() -> Namespace: version=f"CredSweeper {__version__}") return parser.parse_args() - def scan(args: Namespace, content_provider: AbstractProvider) -> int: """Scan content_provider data, print results or save them to json_filename is not None @@ -299,9 +315,80 @@ def scan(args: Namespace, content_provider: AbstractProvider) -> int: return credsweeper.run(content_provider=content_provider) except Exception as exc: logger.critical(exc, exc_info=True) + logger.exception(exc) return -1 +def drill(args: Namespace) -> Tuple[int, int, int]: + """Scan repository for branches and commits + Returns: + total credentials found + total scanned branches + total scanned commits + """ + total_credentials = 0 + total_branches = 0 + total_commits = 0 + try: + sha1git = hashlib.sha1(str(args.git).encode()).digest() + repo_hash = base64.b32encode(sha1git).decode("ascii") + journal_filename = f"{repo_hash}.json" + logger.info(f"{args.git} sha1 in base32 {repo_hash}") + repo_journal = Util.json_load(journal_filename) + if not isinstance(repo_journal, dict): + with open(journal_filename, "w") as f: + f.write("{}") + repo_journal = {"repo": args.git} + credsweeper = CredSweeper(rule_path=args.rule_path, + config_path=args.config_path, + sort_output=args.sort_output, + use_filters=args.no_filters, + pool_count=args.jobs, + ml_batch_size=args.ml_batch_size, + ml_threshold=args.ml_threshold, + ml_providers=args.ml_providers, + find_by_ext=args.find_by_ext, + depth=args.depth, + doc=args.doc, + severity=args.severity, + size_limit=args.size_limit, + log_level=args.log) + repository = Repository(args.git, only_in_branch=args.branch) + for commit in repository.traverse_commits(): + if commit.hash in repo_journal: + logger.debug(f"Skip already scanned commit: {commit.hash}") + continue + logger.info(f"Scan commit: {commit.hash}") + paths: List[Tuple[str, io.BytesIO]] = [] + for file in commit.modified_files: + logger.info(f"FILE: {file.old_path} -> {file.new_path}") + try: + if file.new_path is not None: + _io = io.BytesIO(file.content) + paths.append((file.new_path or file.old_path, _io)) + except ValueError as exc: + logger.error("Possible missed submodule:%s", str(exc)) + provider = FilesProvider(paths) + if args.json_filename: + ext = Util.get_extension(args.json_filename, False) + credsweeper.json_filename = f"{args.json_filename[:-len(ext)]}.{commit.hash}{ext}" + if args.xlsx_filename: + ext = Util.get_extension(args.xlsx_filename, False) + credsweeper.xlsx_filename = f"{args.xlsx_filename[:-len(ext)]}.{commit.hash}{ext}" + + commit_cred_number = credsweeper.run(provider) + credsweeper.credential_manager.candidates.clear() + total_credentials += commit_cred_number + total_commits += 1 + repo_journal[commit.hash] = commit_cred_number + Util.json_dump(repo_journal, journal_filename) + total_branches += 1 + except Exception as exc: + logger.critical(exc, exc_info=True) + return -1, total_branches, total_commits + return total_credentials, total_branches, total_commits + + def main() -> int: """Main function""" result = EXIT_FAILURE @@ -310,7 +397,7 @@ def main() -> int: if args.banner: print(f"CredSweeper {__version__} crc32:{check_integrity():08x}") Logger.init_logging(args.log, args.log_config_path) - logger.info(f"Init CredSweeper object with arguments: {args}") + logger.info(f"Init CredSweeper object with arguments: {args} CWD: {os.getcwd()}") summary: Dict[str, int] = {} if args.path: logger.info(f"Run analyzer on path: {args.path}") @@ -332,6 +419,13 @@ def main() -> int: summary["Deleted File Credentials"] = del_credentials_number if 0 <= add_credentials_number and 0 <= del_credentials_number: result = EXIT_SUCCESS + elif args.git: + logger.info(f"Run analyzer on GIT: {args.git}") + credentials_number, branches_number, commits_number = drill(args) + summary[ + f"Detected Credentials in {branches_number} branches and {commits_number} commits "] = credentials_number + if 0 <= credentials_number: + result = EXIT_SUCCESS elif args.export_config: logging.info(f"Exporting default config to file: {args.export_config}") config_dict = Util.json_load(APP_PATH / "secret" / "config.json") diff --git a/docs/source/guide.rst b/docs/source/guide.rst index 76d40d52a..92689fa78 100644 --- a/docs/source/guide.rst +++ b/docs/source/guide.rst @@ -13,7 +13,9 @@ Get all argument list: .. code-block:: text - usage: python -m credsweeper [-h] (--path PATH [PATH ...] | --diff_path PATH [PATH ...] | --export_config [PATH] | --export_log_config [PATH]) + usage: python -m credsweeper [-h] + (--path PATH [PATH ...] | --diff_path PATH [PATH ...] | --export_config [PATH] | --export_log_config [PATH] | --git PATH [PATH ...]) + [--commits POSITIVE_INT] [--branch BRANCH] [--rules PATH] [--severity SEVERITY] [--config PATH] [--log_config PATH] [--denylist PATH] [--find-by-ext] [--depth POSITIVE_INT] [--no-filters] [--doc] [--ml_threshold FLOAT_OR_STR] [--ml_batch_size POSITIVE_INT] [--ml_config PATH] [--ml_model PATH] [--ml_providers STR] @@ -31,6 +33,11 @@ Get all argument list: exporting default config to file (default: config.json) --export_log_config [PATH] exporting default logger config to file (default: log.yaml) + --git PATH [PATH ...] + git repo to scan + --commits POSITIVE_INT + scan git repo for N commits only + --branch BRANCH scan git repo for single branch, otherwise - all branches were scanned (slow) --rules PATH path of rule config file (default: credsweeper/rules/config.yaml). severity:['critical', 'high', 'medium', 'low', 'info'] type:['keyword', 'pattern', 'pem_key', 'multi'] --severity SEVERITY set minimum level for rules to apply ['critical', 'high', 'medium', 'low', 'info'](default: 'Severity.INFO', case insensitive) --config PATH use custom config (default: built-in) diff --git a/requirements.txt b/requirements.txt index 0c1822218..ade38c68b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -35,6 +35,7 @@ pandas==2.2.3; python_version >= '3.9' password-strength==0.0.3.post2 pdfminer.six==20240706 pybase62==1.0.0 +PyDriller==2.7 pyjks==20.0.0 python-dateutil==2.9.0.post0 python-docx==1.1.2 diff --git a/tests/data/depth_3.json b/tests/data/depth_3.json index f0c4c2c15..e7ac5671a 100644 --- a/tests/data/depth_3.json +++ b/tests/data/depth_3.json @@ -13859,4 +13859,4 @@ } ] } -] +] \ No newline at end of file diff --git a/tests/data/doc.json b/tests/data/doc.json index 3684276cd..d63148d99 100644 --- a/tests/data/doc.json +++ b/tests/data/doc.json @@ -19486,4 +19486,4 @@ } ] } -] +] \ No newline at end of file diff --git a/tests/data/ml_threshold.json b/tests/data/ml_threshold.json index b8d5aa894..65565c086 100644 --- a/tests/data/ml_threshold.json +++ b/tests/data/ml_threshold.json @@ -11424,4 +11424,4 @@ } ] } -] +] \ No newline at end of file diff --git a/tests/data/output.json b/tests/data/output.json index c2c06a5cd..4f6e4c81e 100644 --- a/tests/data/output.json +++ b/tests/data/output.json @@ -10592,4 +10592,4 @@ } ] } -] +] \ No newline at end of file diff --git a/tests/test_app.py b/tests/test_app.py index 5152cd451..284fbcb8d 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -5,6 +5,7 @@ import subprocess import sys import tempfile +from pathlib import Path import time from typing import AnyStr, Tuple from unittest import TestCase @@ -12,6 +13,7 @@ import deepdiff import numpy as np import pandas as pd +from git import Repo from credsweeper.app import APP_PATH from credsweeper.utils import Util @@ -203,7 +205,10 @@ def test_it_works_n(self) -> None: " | --diff_path PATH [PATH ...]" \ " | --export_config [PATH]" \ " | --export_log_config [PATH]" \ + " | --git PATH [PATH ...]" \ ")" \ + " [--commits POSITIVE_INT]" \ + " [--branch BRANCH]" \ " [--rules PATH]" \ " [--severity SEVERITY]" \ " [--config PATH]" \ @@ -235,6 +240,7 @@ def test_it_works_n(self) -> None: " --diff_path" \ " --export_config" \ " --export_log_config" \ + " --git" \ " is required " expected = " ".join(expected.split()) self.assertEqual(expected, output) @@ -707,6 +713,33 @@ def test_doc_n(self) -> None: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + def test_pydriller_p(self) -> None: + with tempfile.TemporaryDirectory() as tmp_dir: + with Repo.init(tmp_dir) as repo: + cred_file = Path(tmp_dir) / "with_cred" + value = "GbdD@23#d0" + with open(cred_file, "w") as f: + f.write(f"git_password: {value}") + repo.index.add([cred_file]) + repo.index.commit("added file") + with open(cred_file, "w") as f: + f.write("DELETED") + repo.index.add([cred_file]) + repo.index.commit("cleared file") + # check that value is not in the file + with open(cred_file, "r") as f: + self.assertNotIn(value, f.read()) + # run git scan + _stdout, _stderr = self._m_credsweeper(["--log", "DEBUG", "--git", str(tmp_dir)]) + self.assertIn("Detected Credentials in 1 branches and 2 commits : 1", _stdout, _stdout) + self.assertNotIn("CRITICAL", _stdout, _stdout) + self.assertNotIn("CRITICAL", _stderr, _stderr) + # check detected value in stdout + self.assertIn(value, _stdout, _stdout) + # del repo + + # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + def test_external_ml_n(self) -> None: # not existed ml_config _stdout, _stderr = self._m_credsweeper( diff --git a/tests/test_main.py b/tests/test_main.py index 0436658ff..ff0f5ece4 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -325,6 +325,48 @@ def test_find_by_ext_and_not_ignore_p(self) -> None: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + def test_multiple_invocation_p(self) -> None: + # test whether ml_validator is created once + self.maxDiff = None + cred_sweeper = CredSweeper() + self.assertFalse(cred_sweeper.is_ml_validator_inited) + # found candidate is not ML validated + provider = StringContentProvider(["qpF8Q~PCM5MhMoyTFc5TYEomnzRUKim9UJhe8a6E"]) + candidates = cred_sweeper.file_scan(provider) + self.assertEqual(1, len(candidates)) + self.assertEqual("Azure Secret Value", candidates[0].rule_name) + self.assertFalse(cred_sweeper.is_ml_validator_inited) + cred_sweeper.credential_manager.set_credentials(candidates) + cred_sweeper.post_processing() + self.assertFalse(cred_sweeper.is_ml_validator_inited) + + # found candidate is ML validated + provider = StringContentProvider(['"nonce": "qPRjfoZWaBPH0KbXMCicm5v1VdG5Hj0DUFMHdSxPOiS"']) + candidates = cred_sweeper.file_scan(provider) + self.assertEqual(1, len(candidates)) + self.assertEqual("Nonce", candidates[0].rule_name) + self.assertFalse(cred_sweeper.is_ml_validator_inited) + cred_sweeper.credential_manager.set_credentials(candidates) + cred_sweeper.post_processing() + self.assertTrue(cred_sweeper.is_ml_validator_inited) + # remember id of the validator + validator_id = id(cred_sweeper.ml_validator) + + # found candidate is ML validated also + provider = StringContentProvider(["password = Xdj@jcN834b"]) + candidates = cred_sweeper.file_scan(provider) + self.assertEqual(1, len(candidates)) + self.assertEqual("Password", candidates[0].rule_name) + # the ml_validator still initialized + self.assertTrue(cred_sweeper.is_ml_validator_inited) + cred_sweeper.credential_manager.set_credentials(candidates) + cred_sweeper.post_processing() + self.assertTrue(cred_sweeper.is_ml_validator_inited) + # the same id of the validator + self.assertEqual(validator_id, id(cred_sweeper.ml_validator)) + + # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + def test_multi_jobs_p(self) -> None: # real result might be shown in code coverage content_provider: AbstractProvider = FilesProvider([SAMPLES_PATH])