Skip to content

Commit

Permalink
Migrate list alerts to falconpy
Browse files Browse the repository at this point in the history
  • Loading branch information
topher-lo authored and daryllimyt committed Jun 22, 2024
1 parent a53e69a commit 707b99b
Showing 1 changed file with 24 additions and 61 deletions.
85 changes: 24 additions & 61 deletions tracecat/actions/integrations/edr/crowdstrike.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Crowdstrike integration.
Authentication method: OAuth 2.0
Authentication method: Direct Authentication (`client_id` and `client_secret`)
References:
- https://falconpy.io/Service-Collections/Alerts.html
- https://www.crowdstrike.com/blog/tech-center/get-access-falcon-apis/
- https://falconpy.io/Service-Collections
- https://www.falconpy.io/Usage/Authenticating-to-the-API.html
Supported APIs:
Expand All @@ -29,8 +29,7 @@
import datetime
from typing import Annotated, Any

import httpx
from authlib.integrations.httpx_client import AsyncOAuth2Client
from falconpy import Alerts, Detects

from tracecat.registry import Field, registry

Expand All @@ -40,13 +39,13 @@


@registry.register(
default_title="List Crowdstrike alerts",
description="Fetch all Crowdstrike alerts from Falcon SIEM.",
namespace="crowdstrike",
display_group="EDR",
namespace="integrations.crowdstrike.alerts",
secrets=["crowdstrike"],
)
async def list_crowdstrike_alerts(
base_url: Annotated[
str, Field(..., description="The base URL for the CrowdStrike API")
],
client_id: Annotated[
str, Field(..., description="The client ID for CrowdStrike API")
],
Expand All @@ -63,40 +62,22 @@ async def list_crowdstrike_alerts(
int, Field(default=9999, description="The maximum number of alerts to return")
] = 9999,
) -> list[dict[str, Any]]:
async with AsyncOAuth2Client(
client_id=client_id, client_secret=client_secret
) as client:
token_response = await client.fetch_token(
url=f"{base_url}/{TOKEN_ENDPOINT}", grant_type="client_credentials"
)
access_token = token_response["access_token"]
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
"Content-Type": "application/json",
"User-Agent": "Tracecat",
}
async with httpx.AsyncClient() as http_client:
response = await http_client.get(
f"{base_url}/{ALERTS_ENDPOINT}",
headers=headers,
params={
"limit": limit,
"filter": f"last_updated_timestamp:>='{start_time.isoformat()}' last_updated_timestamp:<='{end_time.isoformat()}'",
},
)
response.raise_for_status()
return response.json()
falcon = Alerts(client_id=client_id, client_secret=client_secret)
response = falcon.query_alerts_v2(
limit=limit,
filter=f"last_updated_timestamp:>='{start_time.isoformat()}' last_updated_timestamp:<='{end_time.isoformat()}'",
)
return response


@registry.register(
default_title="List Crowdstrike detections",
description="Fetch all Crowdstrike detections from Falcon SIEM.",
namespace="crowdstrike",
display_group="EDR",
namespace="integrations.crowdstrike.detections",
secrets=["crowdstrike"],
)
async def list_crowdstrike_detections(
base_url: Annotated[
str, Field(..., description="The base URL for the CrowdStrike API")
],
client_id: Annotated[
str, Field(..., description="The client ID for CrowdStrike API")
],
Expand All @@ -114,27 +95,9 @@ async def list_crowdstrike_detections(
Field(default=9999, description="The maximum number of detections to return"),
] = 9999,
) -> list[dict[str, Any]]:
async with AsyncOAuth2Client(
client_id=client_id, client_secret=client_secret
) as client:
token_response = await client.fetch_token(
url=f"{base_url}/{TOKEN_ENDPOINT}", grant_type="client_credentials"
)
access_token = token_response["access_token"]
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
"Content-Type": "application/json",
"User-Agent": "Tracecat",
}
async with httpx.AsyncClient() as http_client:
response = await http_client.get(
f"{base_url}/{DETECTS_ENDPOINT}",
headers=headers,
params={
"limit": limit,
"filter": f"date_updated:>='{start_time.isoformat()}' date_updated:<='{end_time.isoformat()}'",
},
)
response.raise_for_status()
return response.json()
falcon = Detects(client_id=client_id, client_secret=client_secret)
response = falcon.query_detects(
limit=limit,
filter=f"date_updated:>='{start_time.isoformat()}' date_updated:<='{end_time.isoformat()}'",
)
return response

0 comments on commit 707b99b

Please sign in to comment.