Skip to content

Commit

Permalink
feat: Implement update status integrations
Browse files Browse the repository at this point in the history
  • Loading branch information
topher-lo authored and daryllimyt committed Jun 22, 2024
1 parent 707b99b commit d5da5b0
Showing 1 changed file with 77 additions and 18 deletions.
95 changes: 77 additions & 18 deletions tracecat/actions/integrations/edr/crowdstrike.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
"""Crowdstrike integration.
Authentication method: Direct Authentication (`client_id` and `client_secret`)
Authentication method: Direct Authentication
Requires a `crowdstrike` secret with:
- `CROWDSTRIKE_CLIENT_ID`
- `CROWDSTRIKE_CLIENT_SECRET`
References:
Expand All @@ -27,7 +31,8 @@
"""

import datetime
from typing import Annotated, Any
import os
from typing import Annotated, Any, Literal

from falconpy import Alerts, Detects

Expand All @@ -38,6 +43,20 @@
DETECTS_ENDPOINT = "/detects/queries/detects/v1"


AlertStatus = Literal[
"ignored", "new", "in_progress", "true_positive", "false_positive"
]
DetectStatus = Literal["ignored", "new", "in_progress", "resolved", "false_positive"]


def get_crowdstrike_credentials():
client_id = os.getenv("CROWDSTRIKE_CLIENT_ID")
client_secret = os.getenv("CROWDSTRIKE_CLIENT_SECRET")
if not client_id or not client_secret:
raise ValueError("Missing CROWDSTRIKE_CLIENT_ID or CROWDSTRIKE_CLIENT")
return {"client_id": client_id, "client_secret": client_secret}


@registry.register(
default_title="List Crowdstrike alerts",
description="Fetch all Crowdstrike alerts from Falcon SIEM.",
Expand All @@ -46,12 +65,6 @@
secrets=["crowdstrike"],
)
async def list_crowdstrike_alerts(
client_id: Annotated[
str, Field(..., description="The client ID for CrowdStrike API")
],
client_secret: Annotated[
str, Field(..., description="The client secret for CrowdStrike API")
],
start_time: Annotated[
datetime.datetime, Field(..., description="The start time for the alerts")
],
Expand All @@ -62,7 +75,7 @@ async def list_crowdstrike_alerts(
int, Field(default=9999, description="The maximum number of alerts to return")
] = 9999,
) -> list[dict[str, Any]]:
falcon = Alerts(client_id=client_id, client_secret=client_secret)
falcon = Alerts(**get_crowdstrike_credentials())
response = falcon.query_alerts_v2(
limit=limit,
filter=f"last_updated_timestamp:>='{start_time.isoformat()}' last_updated_timestamp:<='{end_time.isoformat()}'",
Expand All @@ -71,19 +84,13 @@ async def list_crowdstrike_alerts(


@registry.register(
default_title="List Crowdstrike detections",
default_title="List Crowdstrike detects",
description="Fetch all Crowdstrike detections from Falcon SIEM.",
display_group="EDR",
namespace="integrations.crowdstrike.detections",
secrets=["crowdstrike"],
)
async def list_crowdstrike_detections(
client_id: Annotated[
str, Field(..., description="The client ID for CrowdStrike API")
],
client_secret: Annotated[
str, Field(..., description="The client secret for CrowdStrike API")
],
async def list_crowdstrike_detects(
start_time: Annotated[
datetime.datetime, Field(..., description="The start time for the detections")
],
Expand All @@ -95,9 +102,61 @@ async def list_crowdstrike_detections(
Field(default=9999, description="The maximum number of detections to return"),
] = 9999,
) -> list[dict[str, Any]]:
falcon = Detects(client_id=client_id, client_secret=client_secret)
falcon = Detects(**get_crowdstrike_credentials())
response = falcon.query_detects(
limit=limit,
filter=f"date_updated:>='{start_time.isoformat()}' date_updated:<='{end_time.isoformat()}'",
)
return response


@registry.register(
default_title="Update Crowdstrike alert status",
description="Update the status of Crowdstrike alerts.",
display_group="EDR",
namespace="integrations.crowdstrike.alerts",
secrets=["crowdstrike"],
)
async def update_crowdstrike_alert_status(
alert_ids: Annotated[
list[str], Field(..., description="List of alert IDs to update")
],
status: Annotated[
AlertStatus, Field(..., description="The new status for the alerts")
],
) -> dict[str, Any]:
falcon = Alerts(**get_crowdstrike_credentials())

# Perform the action to update the alert status
response = falcon.update_alerts_v3(
composite_ids=alert_ids,
update_status=status,
)

return response


@registry.register(
default_title="Update Crowdstrike detect status",
description="Update the status of Crowdstrike detects.",
display_group="EDR",
namespace="integrations.crowdstrike.detections",
secrets=["crowdstrike"],
)
async def update_crowdstrike_detect_status(
detection_ids: Annotated[
list[str], Field(..., description="List of detect IDs to update")
],
status: Annotated[
DetectStatus, Field(..., description="The new status for the detects")
],
) -> dict[str, Any]:
falcon = Detects(**get_crowdstrike_credentials())

# Perform the action to update the detection status
response = falcon.update_detects_by_ids(
ids=detection_ids,
status=status,
)

return response

0 comments on commit d5da5b0

Please sign in to comment.