Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Slack webhook exposed key handling #605

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 1 addition & 25 deletions canarytokens/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from __future__ import annotations

import datetime
from typing import Any, Coroutine, List, Optional, Union
from typing import Any, Coroutine, Optional, Union

import twisted.internet.reactor
from twisted.internet import threads
Expand All @@ -24,8 +24,6 @@
GoogleChatHeader,
GoogleChatSection,
Memo,
SlackAttachment,
SlackField,
DiscordDetails,
DiscordEmbeds,
DiscordAuthorField,
Expand All @@ -34,35 +32,13 @@
MsTeamsPotentialAction,
TokenAlertDetails,
TokenAlertDetailsGoogleChat,
TokenAlertDetailsSlack,
TokenAlertDetailsDiscord,
TokenAlertDetailsMsTeams,
)

log = Logger()


def format_as_slack_canaryalert(details: TokenAlertDetails) -> TokenAlertDetailsSlack:
"""
Transforms `TokenAlertDetails` to `TokenAlertDetailsSlack`.
"""
fields: List[SlackField] = [
SlackField(title="Channel", value=details.channel),
SlackField(title="Memo", value=details.memo),
SlackField(
title="time",
value=details.time.strftime("%Y-%m-%d %H:%M:%S (UTC)"),
),
SlackField(title="Manage", value=details.manage_url),
]

attchments = [SlackAttachment(title_link=details.manage_url, fields=fields)]
return TokenAlertDetailsSlack(
# channel="#general",
attachments=attchments,
)


def format_as_googlechat_canaryalert(
details: TokenAlertDetails,
) -> TokenAlertDetailsGoogleChat:
Expand Down
39 changes: 6 additions & 33 deletions canarytokens/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@
CANARY_IMAGE_URL,
MEMO_MAX_CHARACTERS,
)
from canarytokens.utils import prettify_snake_case, dict_to_csv, get_src_ip_continent
from canarytokens.utils import (
json_safe_dict,
prettify_snake_case,
dict_to_csv,
get_src_ip_continent,
)

CANARYTOKEN_RE = re.compile(
".*([" + "".join(CANARYTOKEN_ALPHABET) + "]{" + str(CANARYTOKEN_LENGTH) + "}).*",
Expand Down Expand Up @@ -401,10 +406,6 @@ def __str__(self) -> str:
]


def json_safe_dict(m: BaseModel, exclude: Tuple = ()) -> Dict[str, str]:
return json.loads(m.json(exclude_none=True, exclude=set(exclude)))


class TokenRequest(BaseModel):
"""
TokenRequest holds fields needed to create a Canarytoken.
Expand Down Expand Up @@ -2138,25 +2139,6 @@ class Config:
}


class SlackField(BaseModel):
title: str
value: str
short: bool = True


class SlackAttachment(BaseModel):
title: str = "Canarytoken Triggered"
title_link: HttpUrl
mrkdwn_in: List[str] = ["title"]
fallback: str = ""
fields: List[SlackField]

def __init__(__pydantic_self__, **data: Any) -> None:
# HACK: We can do better here.
data["fallback"] = f"Canarytoken Triggered: {data['title_link']}"
super().__init__(**data)


class GoogleChatDecoratedText(BaseModel):
topLabel: str = ""
text: str = ""
Expand Down Expand Up @@ -2301,15 +2283,6 @@ def json_safe_dict(self) -> Dict[str, str]:
return json_safe_dict(self)


class TokenAlertDetailsSlack(BaseModel):
"""Details that are sent to slack webhooks."""

attachments: List[SlackAttachment]

def json_safe_dict(self) -> Dict[str, str]:
return json_safe_dict(self)


class MsTeamsDetailsSection(BaseModel):
canarytoken: Canarytoken
token_reminder: Memo
Expand Down
25 changes: 2 additions & 23 deletions canarytokens/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import re
import secrets
from ipaddress import IPv4Address
from typing import Dict, List, Literal, Optional, Tuple, Union
from typing import Dict, List, Literal, Optional, Tuple

import advocate
import requests
Expand Down Expand Up @@ -859,29 +859,8 @@ def validate_webhook(url, token_type: models.TokenTypes):
if len(url) > constants.MAX_WEBHOOK_URL_LENGTH:
raise WebhookTooLongError()

payload: Union[
models.TokenAlertDetails,
models.TokenAlertDetailsSlack,
models.TokenAlertDetailsGoogleChat,
models.TokenAlertDetailsDiscord,
models.TokenAlertDetailsMsTeams,
]
webhook_type = get_webhook_type(url)
if webhook_type == WebhookType.SLACK:
payload = models.TokenAlertDetailsSlack(
attachments=[
models.SlackAttachment(
title_link=HttpUrl("https://test.com/check", scheme="https"),
fields=[
models.SlackField(
title="test",
value="Working",
)
],
)
]
)
elif webhook_type == WebhookType.GOOGLE_CHAT:
if webhook_type == WebhookType.GOOGLE_CHAT:
# construct google chat alert card
card = models.GoogleChatCard(
header=models.GoogleChatHeader(
Expand Down
8 changes: 7 additions & 1 deletion canarytokens/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import json
import subprocess
from pathlib import Path
from typing import Any, Literal, Union
from typing import Any, Dict, Literal, Tuple, Union

import pycountry_convert
from pydantic import BaseModel


def json_safe_dict(m: BaseModel, exclude: Tuple = ()) -> Dict[str, str]:
return json.loads(m.json(exclude_none=True, exclude=set(exclude)))


def dict_to_csv(d: dict) -> str:
Expand Down
Loading
Loading