From 773df9b8c21c99ab54077821102e1011c4abf83e Mon Sep 17 00:00:00 2001
From: cccs-kevin
Date: Tue, 13 Dec 2022 13:05:16 +0000
Subject: [PATCH] Handling 404s gracefully
---
 intezer_static.py | 28 ++++++++++++++++++++++++++++
 tests/test_intezer_static.py | 16 ++++++++++++++++
 2 files changed, 44 insertions(+)
diff --git a/intezer_static.py b/intezer_static.py
index 1fe5987..7d9ef73 100644
--- a/intezer_static.py
+++ b/intezer_static.py
@@ -171,6 +171,13 @@ def get_latest_analysis(self,
 f"Unable to get the latest analysis for SHA256 {file_hash} due to '{e}'."
 )
 return None
+ # This issue can occur with certain private accounts on the public instance of analyze.intezer.com as
+ # per https://github.com/CybercentreCanada/assemblyline-service-intezer-dynamic/issues/31
+ elif str(HTTPStatus.NOT_FOUND.value) in repr(e) or HTTPStatus.NOT_FOUND.name in repr(e):
+ self.log.debug(
+ f"Unable to get the latest analysis for SHA256 {file_hash} due to '{e}'."
+ )
+ return None
 else:
 if not logged:
 self.log.error(
@@ -210,6 +217,13 @@ def get_iocs(self, analysis_id: str) -> Dict[str, List[Dict[str, str]]]:
 f"Unable to retrieve IOCs for analysis ID {analysis_id} due to '{e}'."
 )
 return {"files": [], "network": []}
+ # This issue can occur with certain private accounts on the public instance of analyze.intezer.com as
+ # per https://github.com/CybercentreCanada/assemblyline-service-intezer-dynamic/issues/31
+ elif str(HTTPStatus.NOT_FOUND.value) in repr(e) or HTTPStatus.NOT_FOUND.name in repr(e):
+ self.log.debug(
+ f"Unable to retrieve IOCs for analysis ID {analysis_id} due to '{e}'."
+ )
+ return {"files": [], "network": []}
 else:
 if not logged:
 self.log.error(
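Review bot: The new `elif` in get_latest_analysis classifies the error by substring, `str(HTTPStatus.NOT_FOUND.value) in repr(e)`, so any exception whose repr merely contains the characters `404` takes the quiet debug-and-return path. If the exception text carries the request URL, as HTTP client errors commonly do, a SHA256 or analysis ID that happens to contain `404` could make a 5xx or authentication failure look like a 404, and the sample would silently get no Intezer result. Comparing the status code is tighter where the exception exposes one, e.g. `elif getattr(getattr(e, "response", None), "status_code", None) == HTTPStatus.NOT_FOUND:`; the `except` clause is outside the hunk, so the type of `e` needs confirming, and the new tests raise `HTTPError(404)` with no response attached. The same branch is added in get_iocs, get_dynamic_ttps and download_file_by_sha256.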
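Review bot: In get_iocs the 404 branch returns `{"files": [], "network": []}`, which is presumably also what a sample with no IOCs yields, and it logs only through `self.log.debug`, so a caller cannot tell "report not retrievable for this account" from "nothing found". A comment above the return would make that explicit for whoever consumes the result, e.g. `# 404: IOC report not retrievable for this account; an empty result is not evidence that the sample has no IOCs`. If operators should see how often this happens, `self.log.warning` may fit better than `self.log.debug`, depending on how frequent the linked issue is in practice. The function body starts and ends outside the hunk.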
@@ -249,6 +263,13 @@ def get_dynamic_ttps(self, analysis_id: str) -> List[Dict[str, str]]:
 f"Unable to retrieve TTPs for analysis ID {analysis_id} due to '{e}'."
 )
 return []
+ # This issue can occur with certain private accounts on the public instance of analyze.intezer.com as
+ # per https://github.com/CybercentreCanada/assemblyline-service-intezer-dynamic/issues/31
+ elif str(HTTPStatus.NOT_FOUND.value) in repr(e) or HTTPStatus.NOT_FOUND.name in repr(e):
+ self.log.debug(
+ f"Unable to retrieve TTPs for analysis ID {analysis_id} due to '{e}'."
+ )
+ return []
 else:
 if not logged:
 self.log.error(
@@ -375,6 +396,13 @@ def download_file_by_sha256(self, sha256: str, dir_path: str) -> bool:
 f"Unable to download file for SHA256 {sha256} due to '{e}'."
 )
 return False
+ # This issue can occur with certain private accounts on the public instance of analyze.intezer.com as
+ # per https://github.com/CybercentreCanada/assemblyline-service-intezer-dynamic/issues/31
+ elif str(HTTPStatus.NOT_FOUND.value) in repr(e) or HTTPStatus.NOT_FOUND.name in repr(e):
+ self.log.debug(
+ f"Unable to download file for SHA256 {sha256} due to '{e}'."
+ )
+ return False
 else:
 if not logged:
 self.log.error(
diff --git a/tests/test_intezer_static.py b/tests/test_intezer_static.py
index 67201a5..993b205 100755
--- a/tests/test_intezer_static.py
+++ b/tests/test_intezer_static.py
@@ -821,6 +821,10 @@ def test_get_latest_analysis(dummy_al_intezer_api_instance):
 p1.terminate()
 assert p1.exitcode is None

+ # Case 5: "Good" HTTPError
+ m.get(f"{dummy_al_intezer_api_instance.full_url}/files/{file_hash}", exc=HTTPError(404))
+ assert dummy_al_intezer_api_instance.get_latest_analysis(file_hash, private_only) is None
+
 @staticmethod
 def test_get_iocs(dummy_al_intezer_api_instance):
 analysis_id = "blah"
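Review bot: Case 5 in test_get_latest_analysis covers only the positive classification: `HTTPError(404)` carries no response object, so the assertion passes purely because `404` appears in `repr(e)`. No case shows that a different failure whose text merely contains `404` still reaches the `else` branch instead of being returned as a quiet `None`. A negative case built like the process-based case just above would pin that, for example with a constructed message such as `exc=HTTPError("500 Server Error for url: .../files/ab404cd")`; the same gap applies to the cases added in test_get_iocs, test_get_dynamic_ttps and test_download_file_by_sha256. `file_hash`, `private_only` and the mock `m` are defined outside the hunk.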
@@ -850,6 +854,10 @@ def test_get_iocs(dummy_al_intezer_api_instance):
 p1.terminate()
 assert p1.exitcode is None

+ # Case 5: "Good" HTTPError
+ m.get(f"{dummy_al_intezer_api_instance.full_url}/analyses/{analysis_id}/iocs", exc=HTTPError(404))
+ assert dummy_al_intezer_api_instance.get_iocs(analysis_id) == {"files": [], "network": []}
+
 @staticmethod
 def test_get_dynamic_ttps(dummy_al_intezer_api_instance):
 from intezer_sdk.errors import UnsupportedOnPremiseVersion
@@ -885,6 +893,10 @@ def test_get_dynamic_ttps(dummy_al_intezer_api_instance):
 m.get(f"{dummy_al_intezer_api_instance.full_url}/analyses/{analysis_id}/dynamic-ttps", exc=UnsupportedOnPremiseVersion("blah"))
 assert dummy_al_intezer_api_instance.get_dynamic_ttps(analysis_id) == []

+ # Case 6: "Good" HTTPError
+ m.get(f"{dummy_al_intezer_api_instance.full_url}/analyses/{analysis_id}/dynamic-ttps", exc=HTTPError(404))
+ assert dummy_al_intezer_api_instance.get_dynamic_ttps(analysis_id) == []
+
 @staticmethod
 def test_get_sub_analyses_by_id(dummy_al_intezer_api_instance):
 analysis_id = "blah"
@@ -983,3 +995,7 @@ def test_download_file_by_sha256(dummy_al_intezer_api_instance):
 # Case 5: FileExistsError
 m.get(f"{dummy_al_intezer_api_instance.full_url}/files/{analysis_id}/download", exc=FileExistsError("blah"))
 assert dummy_al_intezer_api_instance.download_file_by_sha256(analysis_id, dir_path) is False
+
+ # Case 6: "Good" HTTPError
+ m.get(f"{dummy_al_intezer_api_instance.full_url}/files/{analysis_id}/download", exc=HTTPError(404))
+ assert dummy_al_intezer_api_instance.download_file_by_sha256(analysis_id, dir_path) is False