From ef56cf31dee301f07d1b980f1b4fa708789e0210 Mon Sep 17 00:00:00 2001
From: Diego Ramirez
Date: Wed, 18 Dec 2024 14:21:30 -0300
Subject: [PATCH] enable automation on artifacts
---
 zerofox.json | 6 ++--
 zerofox_connector.py | 65 ++++++++++++++++++++++++++++++++------------
 2 files changed, 52 insertions(+), 19 deletions(-)
diff --git a/zerofox.json b/zerofox.json
index 09dbf70..b4e36ff 100644
--- a/zerofox.json
+++ b/zerofox.json
@@ -15,7 +15,7 @@
 "name": "Diego Ramirez"
 },
 {
- "name": "Felipe Garrido"
+ "name": "Leonardo de Requesens"
 }
 ],
 "license": "Copyright (c) ZeroFox, 2024",
@@ -26,7 +26,9 @@
 "min_phantom_version": "6.1.1",
 "fips_compliant": false,
 "app_wizard_version": "1.0.0",
- "pip_dependencies": {},
+ "pip_dependencies": {
+ "wheel": []
+ },
 "configuration": {
 "zerofox_api_token": {
 "description": "ZeroFox API Token",
diff --git a/zerofox_connector.py b/zerofox_connector.py
index a85ead3..9051de1 100644
--- a/zerofox_connector.py
+++ b/zerofox_connector.py
@@ -102,13 +102,15 @@ def build_artifact(self, container_id, alert):
 artifact["label"] = "alert"
 artifact["name"] = alert["rule_name"]
 artifact["description"] = alert["offending_content_url"]
- artifact["severity"] = self._phantom_severity_transform(alert["severity"])
+ artifact["severity"] = self._phantom_severity_transform(
+ alert["severity"]
+ )
 artifact["label"] = "event"
 artifact["type"] = alert["network"]
 artifact["tags"] = [alert["network"]]
 artifact["start_time"] = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
 artifact["source_data_identifier"] = alert["id"]
- artifact["run_automation"] = False
+ artifact["run_automation"] = True
 # get screenshot from metadata
 try:
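Review bot: The manifest edits in this hunk are not covered by the commit message, which only describes enabling automation on artifacts; `pip_dependencies` now declares an empty `wheel` list and the preceding hunk replaces an author. An empty wheel list states that the app bundles no wheels, so the `requests` module used in the connector's login stub (last hunk of zerofox_connector.py) is taken from the platform's own environment — presumably intended, but worth saying in the commit message since the app now pins nothing. Splitting the manifest edits from the behaviour change would also make the automation switch easier to find in history.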
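Review bot: `artifact["run_automation"] = True` turns every ingested alert from a passive record into a trigger for the platform's active playbooks, and nothing in the hunk explains or gates it; the artifact's fields come straight from the alert (`rule_name`, `offending_content_url`, `network`), so whatever playbooks are active will now run on third-party-supplied content for every polled alert. Make that explicit with a comment such as `# run_automation=True: each ingested artifact starts the platform's active playbooks`, and consider reading the value from an asset-configuration flag (placeholder: `<automation_config_key>`) so operators can keep ingestion passive. Separately, `artifact["label"]` is assigned twice in the new code ("alert", then "event"); the first assignment is dead and the effective label is "event", so one of them should go. The rest of build_artifact, including the screenshot handling after `try:`, lies outside the hunk.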
@@ -196,12 +198,18 @@ def prepare_alert_container(self, alert):
 "alert_url"
 ] = f"https://cloud.zerofox.com/alerts/{alert['id']}"
- container["severity"] = self._phantom_severity_transform(alert["severity"])
+ container["severity"] = self._phantom_severity_transform(
+ alert["severity"])
 container["source_data_identifier"] = alert["id"]
 container["asset_name"] = alert["entity"]["name"]
 container["tags"] = alert["tags"]
- date_time_obj = datetime.strptime(alert["timestamp"], "%Y-%m-%dT%H:%M:%S+00:00")
- container["start_time"] = date_time_obj.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+ date_time_obj = datetime.strptime(
+ alert["timestamp"],
+ "%Y-%m-%dT%H:%M:%S+00:00"
+ )
+ container["start_time"] = date_time_obj.strftime(
+ "%Y-%m-%dT%H:%M:%S.%fZ"
+ )
 container["ingest_app_id"] = self.app_id
 return container
@@ -426,7 +434,9 @@ def _save_alert(self, alert):
 def _on_poll(self, param):
 # Implement the handler here
 # use self.save_progress(...) to send progress messages back to the platform
- self.save_progress(f"In action handler for: {self.get_action_identifier()}")
+ self.save_progress(
+ f"In action handler for: {self.get_action_identifier()}"
+ )
 self.debug_print(f"Param: {param}")
@@ -446,7 +456,9 @@ def _on_poll(self, param):
 else:
 self.save_progress("Start to create alerts")
- self.save_progress(f"incident interval_days: {self._history_days_interval}")
+ self.save_progress(
+ f"incident interval_days: {self._history_days_interval}"
+ )
 history_date = datetime.utcnow() - timedelta(
 int(self._history_days_interval)
@@ -455,7 +467,9 @@ def _on_poll(self, param):
 # reformat date to use with last_modified_min_date
 interval_startdate = history_date.strftime("%Y-%m-%d %H:%M:%S")
- self.save_progress(f"incident interval_startdate: {interval_startdate}")
+ self.save_progress(
+ f"incident interval_startdate: {interval_startdate}"
+ )
 alert_types = []
 alert_types.append({"type": "ALL", "subTypes": "ALL"})
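Review bot: `datetime.strptime(alert["timestamp"], "%Y-%m-%dT%H:%M:%S+00:00")` hard-codes the UTC offset as literal text, so a timestamp written with `Z`, with another offset, or with fractional seconds raises ValueError and the alert never becomes a container; `"%Y-%m-%dT%H:%M:%S%z"` accepts `+00:00` and other offsets alike, and `datetime.fromisoformat` is an alternative if the platform's Python supports it. The parsed `date_time_obj` is naive, so the `%fZ` in the output format only appends zero microseconds and a literal `Z` — acceptable if the input really is UTC, which the format asserts rather than checks. Note also that the container's `start_time` comes from the alert timestamp while the artifact's `start_time` in build_artifact uses ingestion time (`now`), so the two can differ by up to the polling interval; a comment saying this is intentional would help. prepare_alert_container starts outside the hunk.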
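Review bot: `int(self._history_days_interval)` in the context lines of the `@@ -446` hunk converts the asset setting on every poll, so a blank or non-integer value such as `7.0` raises ValueError here instead of failing clearly at configuration time, and a negative value moves `history_date` into the future, which presumably matches no alerts; validate it once where the asset is initialised and reject values below 1. `datetime.utcnow()` is deprecated from Python 3.12 in favour of `datetime.now(timezone.utc)`; whether `timezone` is imported is not visible from this hunk. _on_poll continues outside the hunk.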
@@ -580,7 +594,8 @@ def _on_poll(self, param):
 self.debug_print(f"alert_id: {alert_id}")
 # create container
- status, message, container_id = self._save_alert(alert)
+ status, message, container_id = self._save_alert(
+ alert)
 if status == phantom.APP_SUCCESS:
 num_processed += 1
@@ -588,8 +603,10 @@ def _on_poll(self, param):
 f"ZeroFOX Alert {alert_id} ingested ({num_processed} of {alert_total})"
 )
 else:
- self.error_print(f"Did not ingest alert {alert_id}")
- action_result.set_status(phantom.APP_ERROR, message)
+ self.error_print(
+ f"Did not ingest alert {alert_id}")
+ action_result.set_status(
+ phantom.APP_ERROR, message)
 self.add_action_result(action_result)
 return action_result.get_status()
@@ -627,7 +644,8 @@ def _get_alert_by_id(self, param):
 action_result = ActionResult(dict(param))
 self.add_action_result(action_result)
 self.debug_print(
- "Initial action_result dictionary: {}".format(action_result.get_dict())
+ "Initial action_result dictionary: {}".format(
+ action_result.get_dict())
 )
 alert_id = param.get("alert_id", 0.0)
@@ -688,7 +706,9 @@ def _get_alert_by_id(self, param):
 def _modify_alert_tag(self, param):
 # Implement the handler here
 # use self.save_progress(...) to send progress messages back to the platform
- self.save_progress(f"In action handler for: {self.get_action_identifier()}")
+ self.save_progress(
+ f"In action handler for: {self.get_action_identifier()}"
+ )
 self.debug_print(f"Param: {param}")
@@ -761,7 +781,9 @@ def _modify_alert_tag(self, param):
 return action_result.set_status(phantom.APP_SUCCESS)
 def _threat_submit(self, param):
- self.save_progress(f"In action handler for: {self.get_action_identifier()}")
+ self.save_progress(
+ f"In action handler for: {self.get_action_identifier()}"
+ )
 self.debug_print(f"Param: {param}")
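Review bot: In the `@@ -588` hunk the failure branch logs `Did not ingest alert {alert_id}` but drops the `message` returned by `_save_alert`, which is only stored in the action status and overwritten by the next failing alert, so the log gives no reason for the failure; change it to `self.error_print(f"Did not ingest alert {alert_id}: {message}")`. Because the loop continues after a failure, only the last failure's reason survives in the action result; a failed-alert count in the summary would make partial ingestion visible to the operator. _on_poll begins before the hunk and the loop's remainder is not visible here.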
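Review bot: In the `@@ -627` hunk, `alert_id = param.get("alert_id", 0.0)` defaults a missing parameter to the float `0.0`, so a call without `alert_id` proceeds with a nonsensical identifier instead of failing fast; use `param.get("alert_id")` and `return action_result.set_status(phantom.APP_ERROR, "alert_id is required")` when it is missing, unless the manifest already marks the parameter required. How the value is used afterwards lies outside the hunk, so this is hedged on the remainder of _get_alert_by_id.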
@@ -879,7 +901,9 @@ def _modify_notes(self, param):
 elif action == "append":
 new_notes = notes if not previous_notes else f"{previous_notes}\n{notes}"
 else:
- self.debug_print(f"Modify notes failed because it found action: {action}")
+ self.debug_print(
+ f"Modify notes failed because it found action: {action}"
+ )
 summary = action_result.update_summary({})
 summary["status"] = "failed"
 return action_result.set_status(phantom.APP_ERROR)
@@ -920,7 +944,9 @@ def _modify_notes(self, param):
 def _take_alert_action(self, param):
 # Implement the handler here
 # use self.save_progress(...) to send progress messages back to the platform
- self.save_progress(f"In action handler for: {self.get_action_identifier()}")
+ self.save_progress(
+ f"In action handler for: {self.get_action_identifier()}"
+ )
 self.debug_print(f"Param: {param}")
@@ -1104,7 +1130,12 @@ def finalize(self):
 headers["Referer"] = login_url
 print("Logging into Platform to get the session id")
- r2 = requests.post(login_url, verify=verify, data=data, headers=headers)
+ r2 = requests.post(
+ login_url,
+ verify=verify,
+ data=data,
+ headers=headers
+ )
 session_id = r2.cookies["sessionid"]
 except Exception as e:
 print(f"Unable to get session id from the platform. Error: {e}")
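Review bot: The `else` branch returns `action_result.set_status(phantom.APP_ERROR)` with no message, so the caller sees only a failed status while the reason — an unsupported `action` value — goes to the debug log; pass it through as `return action_result.set_status(phantom.APP_ERROR, f"Unsupported action: {action}")`. The accepted values of `action` are checked by an if/elif chain that starts outside the hunk; listing them in _modify_notes's docstring would make this message self-explanatory.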
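Review bot: In the `@@ -1104` hunk, `session_id = r2.cookies["sessionid"]` assumes the login succeeded; an HTTP error, or a 200 that returns the login form again, leaves no such cookie, so the failure surfaces only as a KeyError swallowed by the broad `except Exception` and printed without the response status. Add `r2.raise_for_status()` before reading the cookie and include `r2.status_code` in the failure message so a rejected credential is distinguishable from a network problem. `verify` is defined outside the hunk; if it can be False, the credential in `data` is posted over an unverified TLS connection, which deserves a comment at its definition. The `@@` header names finalize, but this looks like the module's CLI login stub; its surrounding try block is outside the hunk.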