Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZeroFox Alerts: Feature - Enable automations on artifacts #14

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions release_notes/3.6.1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Enabled automations for created artifacts
6 changes: 4 additions & 2 deletions zerofox.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"name": "Diego Ramirez"
},
{
"name": "Felipe Garrido"
"name": "Leonardo de Requesens"
}
],
"license": "Copyright (c) ZeroFox, 2024",
Expand All @@ -26,7 +26,9 @@
"min_phantom_version": "6.1.1",
"fips_compliant": false,
"app_wizard_version": "1.0.0",
"pip_dependencies": {},
"pip_dependencies": {
"wheel": []
},
"configuration": {
"zerofox_api_token": {
"description": "ZeroFox API Token",
Expand Down
146 changes: 38 additions & 108 deletions zerofox_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def build_artifact(self, container_id, alert):
artifact["tags"] = [alert["network"]]
artifact["start_time"] = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
artifact["source_data_identifier"] = alert["id"]
artifact["run_automation"] = False
artifact["run_automation"] = True

# get screenshot from metadata
try:
Expand All @@ -125,9 +125,7 @@ def build_artifact(self, container_id, alert):

artifact["cef"] = dict()
artifact["cef"]["alert_id"] = alert["id"]
artifact["cef"][
"zerofox_url"
] = f"https://cloud.zerofox.com/alerts/{alert['id']}"
artifact["cef"]["zerofox_url"] = f"https://cloud.zerofox.com/alerts/{alert['id']}"
artifact["cef"]["alert_type"] = alert["alert_type"]
artifact["cef"]["offending_content_url"] = alert["offending_content_url"]
artifact["cef"]["screenshot_url"] = screenshot_url
Expand Down Expand Up @@ -186,15 +184,11 @@ def prepare_alert_container(self, alert):

container["label"] = self._container_label
container["name"] = "ZeroFOX Alert: {}".format(alert["rule_name"])
container["description"] = "{}, {}".format(
alert["network"].title().replace("_", " "), alert["alert_type"]
)
container["description"] = "{}, {}".format(alert["network"].title().replace("_", " "), alert["alert_type"])
container["sensitivity"] = "white"
container["custom_fields"] = dict()
container["custom_fields"]["alert_type"] = str(alert["alert_type"])
container["custom_fields"][
"alert_url"
] = f"https://cloud.zerofox.com/alerts/{alert['id']}"
container["custom_fields"]["alert_url"] = f"https://cloud.zerofox.com/alerts/{alert['id']}"

container["severity"] = self._phantom_severity_transform(alert["severity"])
container["source_data_identifier"] = alert["id"]
Expand Down Expand Up @@ -247,9 +241,7 @@ def _process_empty_response(self, response, action_result):
return RetVal(phantom.APP_SUCCESS, {})

return RetVal(
action_result.set_status(
phantom.APP_ERROR, "Empty response and no information in the header"
),
action_result.set_status(phantom.APP_ERROR, "Empty response and no information in the header"),
None,
)

Expand Down Expand Up @@ -290,9 +282,7 @@ def _process_json_response(self, r, action_result):
return RetVal(phantom.APP_SUCCESS, resp_json)

# You should process the error returned in the json
message = "Error from server. Status Code: {0} Data from server: {1}".format(
r.status_code, r.text.replace("{", "{{").replace("}", "}}")
)
message = "Error from server. Status Code: {0} Data from server: {1}".format(r.status_code, r.text.replace("{", "{{").replace("}", "}}"))

return RetVal(action_result.set_status(phantom.APP_ERROR, message), None)

Expand Down Expand Up @@ -334,9 +324,7 @@ def _make_rest_call(self, endpoint, action_result, method="get", **kwargs):
request_func = getattr(requests, method)
except AttributeError:
return RetVal(
action_result.set_status(
phantom.APP_ERROR, f"Invalid method: {method}"
),
action_result.set_status(phantom.APP_ERROR, f"Invalid method: {method}"),
resp_json,
)

Expand All @@ -347,9 +335,7 @@ def _make_rest_call(self, endpoint, action_result, method="get", **kwargs):
url = self._base_url + endpoint

try:
r = request_func(
url, verify=config.get("verify_server_cert", False), **kwargs
)
r = request_func(url, verify=config.get("verify_server_cert", False), **kwargs)
except Exception as e:
return RetVal(
action_result.set_status(
Expand All @@ -372,9 +358,7 @@ def _test_connectivity(self, param):
endpoint = "/1.0/users/me/"
url = ZEROFOX_API_URL + endpoint

ret_val, _ = self._make_rest_call(
url, action_result, params=None, headers=headers
)
ret_val, _ = self._make_rest_call(url, action_result, params=None, headers=headers)

if phantom.is_fail(ret_val):
# the call to the 3rd party device or service failed, action result should contain all the error details
Expand All @@ -401,9 +385,7 @@ def _phantom_daterange(self, param):
self.error_print("start time or end time not specified")
return None, None

return datetime.fromtimestamp(
start_time_param / 1000.0
), datetime.fromtimestamp(end_time_param / 1000.0)
return datetime.fromtimestamp(start_time_param / 1000.0), datetime.fromtimestamp(end_time_param / 1000.0)

def _save_alert(self, alert):
self.debug_print("----------------------------------------")
Expand Down Expand Up @@ -440,17 +422,13 @@ def _on_poll(self, param):
start_time, end_time = self._phantom_daterange(param)

if start_time is None or end_time is None:
action_result.set_status(
phantom.APP_ERROR, status_message="start time or end time not specified"
)
action_result.set_status(phantom.APP_ERROR, status_message="start time or end time not specified")

else:
self.save_progress("Start to create alerts")
self.save_progress(f"incident interval_days: {self._history_days_interval}")

history_date = datetime.utcnow() - timedelta(
int(self._history_days_interval)
)
history_date = datetime.utcnow() - timedelta(int(self._history_days_interval))

# reformat date to use with last_modified_min_date
interval_startdate = history_date.strftime("%Y-%m-%d %H:%M:%S")
Expand All @@ -470,15 +448,11 @@ def _on_poll(self, param):

try:
last_checked_alert_time = self._state["last_polled"]
last_checked_alert_time = last_checked_alert_time.strftime(
"%Y-%m-%d %H:%M:%S"
)
last_checked_alert_time = last_checked_alert_time.strftime("%Y-%m-%d %H:%M:%S")
except:
last_checked_alert_time = interval_startdate

self.debug_print(
"last_checked_alert_time: {}".format(last_checked_alert_time)
)
self.debug_print("last_checked_alert_time: {}".format(last_checked_alert_time))

if self.is_poll_now():
self.debug_print("POLL NOW")
Expand Down Expand Up @@ -510,9 +484,7 @@ def _on_poll(self, param):
self.debug_print(f"params={alert_params}")

# make rest call
ret_val, response = self._make_rest_call(
endpoint, action_result, params=alert_params, headers=headers
)
ret_val, response = self._make_rest_call(endpoint, action_result, params=alert_params, headers=headers)

if phantom.is_fail(ret_val):
# the call to the 3rd party device or service failed, action result should contain all the error details
Expand Down Expand Up @@ -546,9 +518,7 @@ def _on_poll(self, param):

if status == phantom.APP_SUCCESS:
num_processed += 1
self.save_progress(
f"ZeroFOX Alert {alert_id} ingested ({num_processed} of {alert_total})"
)
self.save_progress(f"ZeroFOX Alert {alert_id} ingested ({num_processed} of {alert_total})")
else:
self.error_print(f"Did not ingest alert {alert_id}")
action_result.set_status(phantom.APP_ERROR, message)
Expand All @@ -563,9 +533,7 @@ def _on_poll(self, param):
alert_params = None

# make rest call
ret_val, response = self._make_rest_call(
next_url, action_result, params=None, headers=headers
)
ret_val, response = self._make_rest_call(next_url, action_result, params=None, headers=headers)

if phantom.is_fail(ret_val):
# the call to the 3rd party device or service failed, action result should contain all the error details
Expand All @@ -584,9 +552,7 @@ def _on_poll(self, param):

if status == phantom.APP_SUCCESS:
num_processed += 1
self.save_progress(
f"ZeroFOX Alert {alert_id} ingested ({num_processed} of {alert_total})"
)
self.save_progress(f"ZeroFOX Alert {alert_id} ingested ({num_processed} of {alert_total})")
else:
self.error_print(f"Did not ingest alert {alert_id}")
action_result.set_status(phantom.APP_ERROR, message)
Expand Down Expand Up @@ -626,9 +592,7 @@ def _get_alert_by_id(self, param):

action_result = ActionResult(dict(param))
self.add_action_result(action_result)
self.debug_print(
"Initial action_result dictionary: {}".format(action_result.get_dict())
)
self.debug_print("Initial action_result dictionary: {}".format(action_result.get_dict()))

alert_id = param.get("alert_id", 0.0)

Expand Down Expand Up @@ -658,9 +622,7 @@ def _get_alert_by_id(self, param):
self.debug_print(f"token={self._api_key}")

# make rest call
ret_val, response = self._make_rest_call(
endpoint, action_result, params=None, headers=headers
)
ret_val, response = self._make_rest_call(endpoint, action_result, params=None, headers=headers)

if phantom.is_fail(ret_val):
# the call to the 3rd party device or service failed, action result should contain all the error details
Expand Down Expand Up @@ -718,9 +680,7 @@ def _modify_alert_tag(self, param):
self.debug_print(f"params={params}")

# make rest call
ret_val, response = self._make_rest_call(
endpoint, action_result, method="post", json=params, headers=headers
)
ret_val, response = self._make_rest_call(endpoint, action_result, method="post", json=params, headers=headers)

if phantom.is_fail(ret_val):
# the call to the 3rd party device or service failed, action result should contain all the error details
Expand All @@ -732,9 +692,7 @@ def _modify_alert_tag(self, param):
phantom.APP_ERROR,
f"Error adding tag {alert_tag} on alert for: {alert_id}",
)
self.debug_print(
f"Interim action_result dictionary after adding FAILURE status: {action_result.get_dict()}"
)
self.debug_print(f"Interim action_result dictionary after adding FAILURE status: {action_result.get_dict()}")
summary = action_result.update_summary({})
summary["status"] = "failed"
return action_result.set_status(phantom.APP_ERROR)
Expand All @@ -750,13 +708,9 @@ def _modify_alert_tag(self, param):
# Return success, no need to set the message, only the status
self.save_progress("Alert Tag Passed")

self.debug_print(
"-------------------------------------------------------------"
)
self.debug_print("%s response: %s" % (self._banner, response))
self.debug_print(
"-------------------------------------------------------------"
)
self.debug_print("--------------------")
self.debug_print(f"{self._banner} response: {response}")
self.debug_print("--------------------")

return action_result.set_status(phantom.APP_SUCCESS)

Expand Down Expand Up @@ -789,9 +743,7 @@ def _threat_submit(self, param):
self.debug_print(f"params={params}")

# make rest call
ret_val, response = self._make_rest_call(
endpoint, action_result, method="post", json=params, headers=headers
)
ret_val, response = self._make_rest_call(endpoint, action_result, method="post", json=params, headers=headers)

if phantom.is_fail(ret_val):
# the call to the 3rd party device or service failed, action result should contain all the error details
Expand All @@ -803,9 +755,7 @@ def _threat_submit(self, param):
phantom.APP_ERROR,
f"Error adding threat {source} on entity for: {asset_id}",
)
self.debug_print(
f"Interim action_result dictionary after adding FAILURE status: {action_result.get_dict()}"
)
self.debug_print(f"Interim action_result dictionary after adding FAILURE status: {action_result.get_dict()}")
summary = action_result.update_summary({})
summary["status"] = "failed"
return action_result.set_status(phantom.APP_ERROR)
Expand All @@ -814,7 +764,6 @@ def _threat_submit(self, param):
action_result.add_data(response)

self.debug_print(f"threat_response={response}")
# self.debug_print('threat_alert={}'.format(response['alert_id']))

# Add a dictionary that is made up of the most important values from data into the summary
summary = action_result.update_summary({})
Expand All @@ -825,13 +774,10 @@ def _threat_submit(self, param):
# Return success, no need to set the message, only the status
self.save_progress("Threat Submit Passed")

self.debug_print(
"-------------------------------------------------------------"
)
self.debug_print("--------------------")

self.debug_print("%s response: %s" % (self._banner, response))
self.debug_print(
"-------------------------------------------------------------"
)
self.debug_print("--------------------")

return action_result.set_status(phantom.APP_SUCCESS)

Expand All @@ -846,18 +792,14 @@ def _modify_notes(self, param):
endpoint = f"/1.0/alerts/{alert_id}/"
headers = self._get_app_headers()

ret_val, response = self._make_rest_call(
endpoint, action_result, method="get", headers=headers
)
ret_val, response = self._make_rest_call(endpoint, action_result, method="get", headers=headers)

if phantom.is_fail(ret_val):
action_result.set_status(
phantom.APP_ERROR,
f"Error fetching alert with id: {alert_id}",
)
self.debug_print(
f"Interim action_result dictionary after adding FAILURE status: {action_result.get_dict()}"
)
self.debug_print(f"Interim action_result dictionary after adding FAILURE status: {action_result.get_dict()}")
summary = action_result.update_summary({})
summary["status"] = "failed"
return action_result.set_status(phantom.APP_ERROR)
Expand Down Expand Up @@ -897,9 +839,7 @@ def _modify_notes(self, param):
phantom.APP_ERROR,
f"Error changing notes on alert for {alert_id}, with notes {notes}",
)
self.debug_print(
f"Interim action_result dictionary after adding FAILURE status: {action_result.get_dict()}"
)
self.debug_print(f"Interim action_result dictionary after adding FAILURE status: {action_result.get_dict()}")
summary = action_result.update_summary({})
summary["status"] = "failed"
return action_result.set_status(phantom.APP_ERROR)
Expand Down Expand Up @@ -941,9 +881,7 @@ def _take_alert_action(self, param):
self.debug_print(f"params={params}")

# make rest call
ret_val, response = self._make_rest_call(
endpoint, action_result, method="post", json=params, headers=headers
)
ret_val, response = self._make_rest_call(endpoint, action_result, method="post", json=params, headers=headers)

if phantom.is_fail(ret_val):
# the call to the 3rd party device or service failed, action result should contain all the error details
Expand All @@ -955,9 +893,7 @@ def _take_alert_action(self, param):
phantom.APP_ERROR,
f"Error taking {alert_action} action on alert data for: {alert_id}",
)
self.debug_print(
f"Interim action_result dictionary after adding FAILURE status: {action_result.get_dict()}"
)
self.debug_print(f"Interim action_result dictionary after adding FAILURE status: {action_result.get_dict()}")
summary = action_result.update_summary({})
summary["status"] = "failed"
return action_result.set_status(phantom.APP_ERROR)
Expand All @@ -973,13 +909,9 @@ def _take_alert_action(self, param):
# Return success, no need to set the message, only the status
self.save_progress("Alert Action Passed")

self.debug_print(
"-------------------------------------------------------------"
)
self.debug_print("--------------------")
self.debug_print("%s response: %s" % (self._banner, response))
self.debug_print(
"-------------------------------------------------------------"
)
self.debug_print("--------------------")

return action_result.set_status(phantom.APP_SUCCESS)

Expand Down Expand Up @@ -1039,9 +971,7 @@ def initialize(self):
self._container_label = config["ingest"]["container_label"]
self._actor = config.get("username")
self._banner = "ZeroFOX Alerts Connector"
self.zf_client = ZeroFoxClient(
token=config.get("zerofox_api_token"), username=config.get("username")
)
self.zf_client = ZeroFoxClient(token=config.get("zerofox_api_token"), username=config.get("username"))
self.mapper = AlertMapper(self._container_label, self.get_app_id())

return phantom.APP_SUCCESS
Expand Down
Loading