From c68ac3f212542f1b9228c067adba138e71ed442d Mon Sep 17 00:00:00 2001
From: Vahagn
Date: Thu, 29 Aug 2024 12:23:52 +0400
Subject: [PATCH] Add parallelism

Scan files concurrently with a ThreadPoolExecutor (new -t/--threads
option, default 48) and decode every file with the encoding guessed by
chardet, falling back to latin-1 when the guess cannot be used.

---
Review notes (not part of the commit message):
* _scan_file now reads each file once and decodes the bytes in memory
  instead of reopening the file in text mode.
* An encoding name without a Python codec (LookupError) now takes the
  latin-1 fallback instead of being reported as a scan error.
* The completion message in main.py stays a plain string; the f-string
  had no placeholders.
* Both files keep their trailing newline.
* Hunks were adjusted by hand during review: the post-image blob ids on
  the "index" lines still name the originally submitted revision.

 main.py    |  6 +++---
 scanner.py | 54 +++++++++++++++++++++++++++++++++++++-----------------
 2 files changed, 40 insertions(+), 20 deletions(-)

diff --git a/main.py b/main.py
index 47d4d7c..f4c7ac7 100644
--- a/main.py
+++ b/main.py
@@ -11,12 +11,13 @@ def main():
     parser = argparse.ArgumentParser(description="ABAP Code Scanner")
     parser.add_argument("path", help="Path to the ABAP code directory or file")
     parser.add_argument("-c", "--config", help="Path to configuration file", default="config.yml")
+    parser.add_argument("-t", "--threads", type=int, help="Number of threads to use for scanning", default=48)
     args = parser.parse_args()
 
     config = Config(args.config)
     scanner = Scanner(config)
 
-    results = scanner.scan(args.path)
+    results = scanner.scan(args.path, num_threads=args.threads)
 
     # Convert scanner results to ScanResult objects, now including severity
     report_results = [
@@ -25,15 +26,14 @@ def main():
             line_number=result.line_number,
             title=result.title,
             message=result.message,
-            severity=result.severity  # Make sure your scanner provides this information
+            severity=result.severity
         ) for result in results
     ]
 
     # Generate the XLSX report
     generate_xlsx_report(report_results, "abap_security_scan_report.xlsx")
 
     print("Scan complete. XLSX report generated: abap_security_scan_report.xlsx")
-
 
 
 if __name__ == "__main__":
diff --git a/scanner.py b/scanner.py
index 33cb201..9e3a2eb 100644
--- a/scanner.py
+++ b/scanner.py
@@ -4,6 +4,8 @@ import os
 from typing import List, NamedTuple
 
 from tqdm import tqdm
+import concurrent.futures
+import chardet
 
 
 class ScanResult(NamedTuple):
@@ -27,8 +29,7 @@ def _load_checks(self):
                 checks.append(check_class())
         return checks
 
-    def scan(self, path: str, limit: int = 40000) -> List[ScanResult]:
-        results = []
+    def scan(self, path: str, limit: int = 1000000000, num_threads: int = 48) -> List[ScanResult]:
         files_to_scan = []
 
         # Collect all files to scan
@@ -47,24 +48,43 @@ def scan(self, path: str, limit: int = 40000) -> List[ScanResult]:
         # Limit the number of files to scan
         files_to_scan = files_to_scan[:limit]
 
-        # Scan files with progress bar
-        for file_path in tqdm(files_to_scan, desc="Scanning files", unit="file"):
-            results.extend(self._scan_file(file_path))
+        # Scan files in parallel with progress bar
+        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
+            futures = [executor.submit(self._scan_file, file_path) for file_path in files_to_scan]
+            results = []
+            for future in tqdm(concurrent.futures.as_completed(futures), total=len(files_to_scan), desc="Scanning files", unit=" file"):
+                results.extend(future.result())
 
         return results
 
     def _scan_file(self, file_path: str) -> List[ScanResult]:
         results = []
-        with open(file_path, 'r') as f:
-            content = f.read()
-        for check in self.checks:
-            check_results = check.run(content)
-            for result in check_results:
-                results.append(ScanResult(
-                    file_path=file_path,
-                    line_number=result.line_number,
-                    title=check.title,
-                    message=result.line_content,
-                    severity=check.severity
-                ))
+        try:
+            # Read the bytes once; they feed both encoding detection and decoding
+            with open(file_path, 'rb') as f:
+                raw_data = f.read()
+
+            # chardet may report no encoding at all (None); try utf-8 in that case
+            detected_encoding = chardet.detect(raw_data)['encoding'] or 'utf-8'
+            try:
+                content = raw_data.decode(detected_encoding)
+            except (UnicodeDecodeError, LookupError):
+                # Wrong guess or a codec name Python lacks: latin-1 maps every byte value
+                content = raw_data.decode('latin-1')
+            # Text-mode open() used to translate line endings; keep that for the checks
+            content = content.replace('\r\n', '\n').replace('\r', '\n')
+
+            for check in self.checks:
+                check_results = check.run(content)
+                for result in check_results:
+                    results.append(ScanResult(
+                        file_path=file_path,
+                        line_number=result.line_number,
+                        title=check.title,
+                        message=result.line_content,
+                        severity=check.severity
+                    ))
+        except Exception as e:
+            # Boundary handler: one failing file must not abort the whole scan
+            print(f"Error scanning file {file_path}: {str(e)}")
         return results