diff --git a/socketdev/__init__.py b/socketdev/__init__.py index dbe491b..e52e642 100644 --- a/socketdev/__init__.py +++ b/socketdev/__init__.py @@ -1,5 +1,6 @@ from socketdev.core.api import API from socketdev.dependencies import Dependencies +from socketdev.diffscans import DiffScans from socketdev.export import Export from socketdev.fullscans import FullScans from socketdev.historical import Historical @@ -61,6 +62,7 @@ def __init__(self, token: str, timeout: int = 1200): self.utils = Utils() self.labels = Labels(self.api) self.licensemetadata = LicenseMetadata(self.api) + self.diffscans = DiffScans(self.api) @staticmethod def set_timeout(timeout: int): diff --git a/socketdev/core/api.py b/socketdev/core/api.py index 4f2faa3..566b53b 100644 --- a/socketdev/core/api.py +++ b/socketdev/core/api.py @@ -54,8 +54,9 @@ def do_request( def format_headers(headers_dict): return "\n".join(f"{k}: {v}" for k, v in headers_dict.items()) + start_time = time.time() try: - start_time = time.time() + response = requests.request( method.upper(), url, headers=headers, data=payload, files=files, timeout=self.request_timeout ) @@ -103,14 +104,19 @@ def format_headers(headers_dict): log.error(f"Upstream server error{path_str}{headers_str}") raise APIBadGateway() if response.status_code >= 400: + try: + error_json = response.json() + except Exception: + error_json = None + error_message = error_json.get("error", {}).get("message") if error_json else response.text error = ( - f"Bad Request: HTTP original_status_code:{response.status_code}{path_str}{headers_str}" + f"Bad Request: HTTP original_status_code:{response.status_code}{path_str}{headers_str}\n" + f"Error message: {error_message}" ) log.error(error) - raise APIFailure() + raise APIFailure(error) return response - except Timeout: request_duration = time.time() - start_time log.error(f"Request timed out after {request_duration:.2f} seconds") diff --git a/socketdev/diffscans/__init__.py b/socketdev/diffscans/__init__.py new file mode 100644 index 0000000..86ea575 --- /dev/null +++ b/socketdev/diffscans/__init__.py @@ -0,0 +1,73 @@ +import json +import logging +from typing import Any, Dict, Optional, Union + +log = logging.getLogger("socketdev") + +class DiffScans: + def __init__(self, api): + self.api = api + + def list(self, org_slug: str, params: Optional[Dict[str, Any]] = None) -> dict: + """List all diff scans for an organization.""" + path = f"orgs/{org_slug}/diff-scans" + if params: + import urllib.parse + path += "?" + urllib.parse.urlencode(params) + response = self.api.do_request(path=path, method="GET") + if response.status_code == 200: + return response.json() + log.error(f"Error listing diff scans: {response.status_code}, message: {response.text}") + return {} + + def get(self, org_slug: str, diff_scan_id: str) -> dict: + """Fetch a diff scan by ID.""" + path = f"orgs/{org_slug}/diff-scans/{diff_scan_id}" + response = self.api.do_request(path=path, method="GET") + if response.status_code == 200: + return response.json() + log.error(f"Error fetching diff scan: {response.status_code}, message: {response.text}") + return {} + + def create_from_repo(self, org_slug: str, repo_slug: str, files: list, params: Optional[Dict[str, Any]] = None) -> dict: + """Create a diff scan from repo HEAD, uploading files as multipart form data.""" + import urllib.parse + path = f"orgs/{org_slug}/diff-scans/from-repo/{repo_slug}" + if params: + path += "?" + urllib.parse.urlencode(params) + response = self.api.do_request(path=path, method="POST", files=files) + if response.status_code in (200, 201): + return response.json() + log.error(f"Error creating diff scan from repo: {response.status_code}, message: {response.text}") + return {} + + def create_from_ids(self, org_slug: str, params: Dict[str, Any]) -> dict: + """Create a diff scan from two full scan IDs using query params.""" + import urllib.parse + path = f"orgs/{org_slug}/diff-scans/from-ids" + if params: + path += "?" + urllib.parse.urlencode(params) + response = self.api.do_request(path=path, method="POST") + if response.status_code in (200, 201): + return response.json() + log.error(f"Error creating diff scan from IDs: {response.status_code}, message: {response.text}") + return {} + + def gfm(self, org_slug: str, diff_scan_id: str) -> dict: + """Fetch GFM (GitHub Flavored Markdown) comments for a diff scan.""" + path = f"orgs/{org_slug}/diff-scans/{diff_scan_id}/gfm" + response = self.api.do_request(path=path, method="GET") + if response.status_code == 200: + return response.json() + log.error(f"Error fetching diff scan GFM: {response.status_code}, message: {response.text}") + return {} + + def delete(self, org_slug: str, diff_scan_id: str) -> bool: + """Delete a diff scan by ID.""" + path = f"orgs/{org_slug}/diff-scans/{diff_scan_id}" + response = self.api.do_request(path=path, method="DELETE") + if response.status_code == 200: + if "status" in response.json() and response.json()["status"] == "ok": + return True + log.error(f"Error deleting diff scan: {response.status_code}, message: {response.text}") + return False diff --git a/socketdev/version.py b/socketdev/version.py index df4be5e..0b167e6 100644 --- a/socketdev/version.py +++ b/socketdev/version.py @@ -1 +1 @@ -__version__ = "2.1.4" +__version__ = "2.1.5" diff --git a/tests/integration/test_diffscans_integration.py b/tests/integration/test_diffscans_integration.py new file mode 100644 index 0000000..54ddf6b --- /dev/null +++ b/tests/integration/test_diffscans_integration.py @@ -0,0 +1,319 @@ +import unittest +import os +import tempfile +import shutil +from socketdev import socketdev +from socketdev.fullscans import FullScanParams, IntegrationType + +class TestDiffScansIntegration(unittest.TestCase): + @classmethod + def setUpClass(cls): + api_key = os.getenv("SOCKET_SECURITY_API_KEY", "") + org_slug = os.getenv("SOCKET_ORG_SLUG", "") + repo_slug = os.getenv("SOCKET_REPO_SLUG", "") + missing = [ + name for name, val in [ + ("SOCKET_SECURITY_API_KEY", api_key), + ("SOCKET_ORG_SLUG", org_slug), + ("SOCKET_REPO_SLUG", repo_slug), + ] if not val + ] + if missing: + raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}") + cls.sdk = socketdev(token=api_key) + cls.org_slug = org_slug + cls.repo_slug = repo_slug + # Prepare temp dir for manifest files + cls.temp_dir = tempfile.mkdtemp() + cls.package_json_path = os.path.join(cls.temp_dir, "package.json") + # Copy sample package.json + shutil.copyfile( + os.path.join(os.path.dirname(__file__), "package.json"), + cls.package_json_path + ) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.temp_dir) + + def test_full_diff_scan_flow(self): + params_before = FullScanParams( + org_slug=self.org_slug, + repo=self.repo_slug, + branch="main", + commit_message="before scan commit message", + commit_hash="deadbeefdeadbeefdeadbeefdeadbeefdeadbeef", + pull_request=123, + committers=["integration-tester"], + integration_type="api" + ) + + # 1. Create 'before' full scan with empty package.json + empty_package_path = os.path.join(os.path.dirname(__file__), "package-empty.json") + with open(empty_package_path, "rb") as f_before: + files_before = [("file", ("package.json", f_before))] + before_result = self.sdk.fullscans.post( + files=files_before, + params=params_before + ) + if not before_result or "id" not in before_result: + print("Full scan creation failed. Response:", before_result) + self.fail(f"Full scan creation failed: {before_result}") + before_id = before_result["id"] + self.assertIsNotNone(before_id) + + params_after = FullScanParams( + org_slug=self.org_slug, + repo=self.repo_slug, + branch="main", + commit_message="after scan commit message", + commit_hash="beefdeadbeefdeadbeefdeadbeefdeadbeefdead", + pull_request=124, + committers=["integration-tester"], + integration_type="api" + ) + + # 2. Create 'after' full scan with malware package.json + with open(self.package_json_path, "rb") as f_after: + files_after = [("file", ("package.json", f_after))] + after_result = self.sdk.fullscans.post( + files=files_after, + params=params_after + ) + if not after_result or "id" not in after_result: + print("Full scan creation failed. Response:", after_result) + self.fail(f"Full scan creation failed: {after_result}") + after_id = after_result["id"] + self.assertIsNotNone(after_id) + + # Only print before/after IDs if they are missing + if not before_id or not after_id: + print("before_id:", before_id) + print("after_id:", after_id) + self.assertIsNotNone(before_id, f"before_id is None. Full scan creation failed.") + self.assertIsNotNone(after_id, f"after_id is None. Full scan creation failed.") + + diff_body = {"before": before_id, "after": after_id, "description": "Integration test diff scan"} + diff_result = self.sdk.diffscans.create_from_ids(self.org_slug, diff_body) + diff_scan_id = ( + (diff_result.get("diff_scan", {}) or {}).get("id") + or diff_result.get("id") + or diff_result.get("diff_scan_id") + ) + if not diff_scan_id: + print("diff_result:", diff_result) + self.assertIsNotNone(diff_scan_id, f"Diff scan creation failed: {diff_result}") + assert isinstance(diff_scan_id, str), "diff_scan_id must be a string" + + # 4. Use diff_scan_id for further tests + get_result = self.sdk.diffscans.get(self.org_slug, diff_scan_id) + # The API now returns the diff scan object under 'diff_scan' + if "diff_scan" in get_result and isinstance(get_result["diff_scan"], dict): + returned_id = get_result["diff_scan"].get("id") + else: + returned_id = ( + get_result.get("id") + or get_result.get("diff_scan_id") + or (get_result.get("diff_scan", {}).get("id") if "diff_scan" in get_result else None) + ) + self.assertEqual(returned_id, diff_scan_id) + + gfm_result = self.sdk.diffscans.gfm(self.org_slug, diff_scan_id) + self.assertIsInstance(gfm_result, dict) + + # 5. Cleanup + self.sdk.diffscans.delete(self.org_slug, diff_scan_id) + self.sdk.fullscans.delete(self.org_slug, before_id) + self.sdk.fullscans.delete(self.org_slug, after_id) + + def test_diffscans_create_from_repo(self): + """Integration test for create_from_repo endpoint.""" + # This test assumes a valid repo exists and the environment is set up + with open(self.package_json_path, "rb") as f: + files = [("file", ("package.json", f))] + params = {"description": "Integration test diff scan from repo"} + result = self.sdk.diffscans.create_from_repo(self.org_slug, self.repo_slug, files, params) + self.assertTrue("id" in result or "diff_scan" in result) + # Cleanup if possible + diff_scan_id = ( + (result.get("diff_scan", {}) or {}).get("id") + or result.get("id") + or result.get("diff_scan_id") + ) + if diff_scan_id: + self.sdk.diffscans.delete(self.org_slug, diff_scan_id) + + def test_diffscans_get(self): + """Integration test for get endpoint.""" + # Create a diff scan first + params_before = FullScanParams( + org_slug=self.org_slug, + repo=self.repo_slug, + branch="main", + commit_message="before scan commit message", + commit_hash="deadbeefdeadbeefdeadbeefdeadbeefdeadbeef", + pull_request=123, + committers=["integration-tester"], + integration_type="api" + ) + empty_package_path = os.path.join(os.path.dirname(__file__), "package-empty.json") + with open(empty_package_path, "rb") as f_before: + files_before = [("file", ("package.json", f_before))] + before_result = self.sdk.fullscans.post(files=files_before, params=params_before) + before_id = before_result["id"] + params_after = FullScanParams( + org_slug=self.org_slug, + repo=self.repo_slug, + branch="main", + commit_message="after scan commit message", + commit_hash="beefdeadbeefdeadbeefdeadbeefdeadbeefdead", + pull_request=124, + committers=["integration-tester"], + integration_type="api" + ) + with open(self.package_json_path, "rb") as f_after: + files_after = [("file", ("package.json", f_after))] + after_result = self.sdk.fullscans.post(files=files_after, params=params_after) + after_id = after_result["id"] + diff_body = {"before": before_id, "after": after_id, "description": "Integration test diff scan"} + diff_result = self.sdk.diffscans.create_from_ids(self.org_slug, diff_body) + diff_scan_id = ( + diff_result.get("id") + or diff_result.get("diff_scan_id") + or (diff_result.get("diff_scan", {}).get("id") if "diff_scan" in diff_result else None) + ) + get_result = self.sdk.diffscans.get(self.org_slug, diff_scan_id) + self.assertTrue( + ("diff_scan" in get_result and get_result["diff_scan"]["id"] == diff_scan_id) or + (get_result.get("id") == diff_scan_id) + ) + # Cleanup + self.sdk.diffscans.delete(self.org_slug, diff_scan_id) + self.sdk.fullscans.delete(self.org_slug, before_id) + self.sdk.fullscans.delete(self.org_slug, after_id) + + def test_diffscans_gfm(self): + """Integration test for gfm endpoint.""" + # Create a diff scan first + params_before = FullScanParams( + org_slug=self.org_slug, + repo=self.repo_slug, + branch="main", + commit_message="before scan commit message", + commit_hash="deadbeefdeadbeefdeadbeefdeadbeefdeadbeef", + pull_request=123, + committers=["integration-tester"], + integration_type="api" + ) + empty_package_path = os.path.join(os.path.dirname(__file__), "package-empty.json") + with open(empty_package_path, "rb") as f_before: + files_before = [("file", ("package.json", f_before))] + before_result = self.sdk.fullscans.post(files=files_before, params=params_before) + before_id = before_result["id"] + params_after = FullScanParams( + org_slug=self.org_slug, + repo=self.repo_slug, + branch="main", + commit_message="after scan commit message", + commit_hash="beefdeadbeefdeadbeefdeadbeefdeadbeefdead", + pull_request=124, + committers=["integration-tester"], + integration_type="api" + ) + with open(self.package_json_path, "rb") as f_after: + files_after = [("file", ("package.json", f_after))] + after_result = self.sdk.fullscans.post(files=files_after, params=params_after) + after_id = after_result["id"] + diff_body = {"before": before_id, "after": after_id, "description": "Integration test diff scan"} + diff_result = self.sdk.diffscans.create_from_ids(self.org_slug, diff_body) + diff_scan_id = ( + diff_result.get("id") + or diff_result.get("diff_scan_id") + or (diff_result.get("diff_scan", {}).get("id") if "diff_scan" in diff_result else None) + ) + gfm_result = self.sdk.diffscans.gfm(self.org_slug, diff_scan_id) + self.assertIsInstance(gfm_result, dict) + # Cleanup + self.sdk.diffscans.delete(self.org_slug, diff_scan_id) + self.sdk.fullscans.delete(self.org_slug, before_id) + self.sdk.fullscans.delete(self.org_slug, after_id) + + def test_diffscans_delete(self): + """Integration test for delete endpoint.""" + # Create a diff scan first + params_before = FullScanParams( + org_slug=self.org_slug, + repo=self.repo_slug, + branch="main", + commit_message="before scan commit message", + commit_hash="deadbeefdeadbeefdeadbeefdeadbeefdeadbeef", + pull_request=123, + committers=["integration-tester"], + integration_type="api" + ) + empty_package_path = os.path.join(os.path.dirname(__file__), "package-empty.json") + with open(empty_package_path, "rb") as f_before: + files_before = [("file", ("package.json", f_before))] + before_result = self.sdk.fullscans.post(files=files_before, params=params_before) + before_id = before_result["id"] + params_after = FullScanParams( + org_slug=self.org_slug, + repo=self.repo_slug, + branch="main", + commit_message="after scan commit message", + commit_hash="beefdeadbeefdeadbeefdeadbeefdeadbeefdead", + pull_request=124, + committers=["integration-tester"], + integration_type="api" + ) + with open(self.package_json_path, "rb") as f_after: + files_after = [("file", ("package.json", f_after))] + after_result = self.sdk.fullscans.post(files=files_after, params=params_after) + after_id = after_result["id"] + diff_body = {"before": before_id, "after": after_id, "description": "Integration test diff scan"} + diff_result = self.sdk.diffscans.create_from_ids(self.org_slug, diff_body) + diff_scan_id = ( + diff_result.get("id") + or diff_result.get("diff_scan_id") + or (diff_result.get("diff_scan", {}).get("id") if "diff_scan" in diff_result else None) + ) + delete_result = self.sdk.diffscans.delete(self.org_slug, diff_scan_id) + self.assertTrue(delete_result) + # Cleanup + self.sdk.fullscans.delete(self.org_slug, before_id) + self.sdk.fullscans.delete(self.org_slug, after_id) + + def test_diffscans_list(self): + """Integration test for list endpoint.""" + result = self.sdk.diffscans.list(self.org_slug) + self.assertIn("results", result) + + def run(self, result=None): + try: + super().run(result) + if self._testMethodName == "test_full_diff_scan_flow": + print("Test create_from_ids: success") + elif self._testMethodName == "test_diffscans_create_from_repo": + print("Test create_from_repo: success") + elif self._testMethodName == "test_diffscans_get": + print("Test get: success") + elif self._testMethodName == "test_diffscans_gfm": + print("Test gfm: success") + elif self._testMethodName == "test_diffscans_delete": + print("Test delete: success") + elif self._testMethodName == "test_diffscans_list": + print("Test list: success") + except Exception as e: + if self._testMethodName == "test_full_diff_scan_flow": + print("Test create_from_ids: failure") + elif self._testMethodName == "test_diffscans_create_from_repo": + print("Test create_from_repo: failure") + elif self._testMethodName == "test_diffscans_get": + print("Test get: failure") + elif self._testMethodName == "test_diffscans_gfm": + print("Test gfm: failure") + elif self._testMethodName == "test_diffscans_delete": + print("Test delete: failure") + elif self._testMethodName == "test_diffscans_list": + print("Test list: failure") + raise diff --git a/tests/test_diffscans.py b/tests/test_diffscans.py new file mode 100644 index 0000000..720b060 --- /dev/null +++ b/tests/test_diffscans.py @@ -0,0 +1,60 @@ +import unittest +from unittest.mock import MagicMock +from socketdev.diffscans import DiffScans + +class TestDiffScans(unittest.TestCase): + def setUp(self): + self.api = MagicMock() + self.diffscans = DiffScans(self.api) + self.org_slug = "test-org" + self.diff_scan_id = "test-diff-scan-id" + self.repo_slug = "test-repo" + + def test_list(self): + self.api.do_request.return_value.status_code = 200 + self.api.do_request.return_value.json.return_value = {"results": []} + result = self.diffscans.list(self.org_slug) + self.assertIn("results", result) + self.api.do_request.assert_called_with(path=f"orgs/{self.org_slug}/diff-scans", method="GET") + + def test_get(self): + self.api.do_request.return_value.status_code = 200 + # Simulate new API response structure + self.api.do_request.return_value.json.return_value = {"diff_scan": {"id": self.diff_scan_id}} + result = self.diffscans.get(self.org_slug, self.diff_scan_id) + self.assertIn("diff_scan", result) + self.assertEqual(result["diff_scan"]["id"], self.diff_scan_id) + self.api.do_request.assert_called_with(path=f"orgs/{self.org_slug}/diff-scans/{self.diff_scan_id}", method="GET") + + def test_create_from_repo(self): + self.api.do_request.return_value.status_code = 201 + self.api.do_request.return_value.json.return_value = {"created": True} + body = {"foo": "bar"} + result = self.diffscans.create_from_repo(self.org_slug, self.repo_slug, body) + self.assertTrue(result["created"]) + self.api.do_request.assert_called_with(path=f"orgs/{self.org_slug}/diff-scans/from-repo/{self.repo_slug}", method="POST", json=body) + + def test_create_from_ids(self): + self.api.do_request.return_value.status_code = 201 + self.api.do_request.return_value.json.return_value = {"created": True} + body = {"before": "id1", "after": "id2"} + result = self.diffscans.create_from_ids(self.org_slug, body) + self.assertTrue(result["created"]) + self.api.do_request.assert_called_with(path=f"orgs/{self.org_slug}/diff-scans/from-ids", method="POST", json=body) + + def test_gfm(self): + self.api.do_request.return_value.status_code = 200 + self.api.do_request.return_value.json.return_value = {"gfm": "markdown"} + result = self.diffscans.gfm(self.org_slug, self.diff_scan_id) + self.assertIn("gfm", result) + self.api.do_request.assert_called_with(path=f"orgs/{self.org_slug}/diff-scans/{self.diff_scan_id}/gfm", method="GET") + + def test_delete(self): + self.api.do_request.return_value.status_code = 200 + self.api.do_request.return_value.json.return_value = {"deleted": True} + result = self.diffscans.delete(self.org_slug, self.diff_scan_id) + self.assertTrue(result["deleted"]) + self.api.do_request.assert_called_with(path=f"orgs/{self.org_slug}/diff-scans/{self.diff_scan_id}", method="DELETE") + +if __name__ == "__main__": + unittest.main()