diff --git a/canarytokens/channel.py b/canarytokens/channel.py index bf5baa4d7..655e76e60 100644 --- a/canarytokens/channel.py +++ b/canarytokens/channel.py @@ -5,7 +5,7 @@ from __future__ import annotations import datetime -from typing import Any, Coroutine, List, Optional, Union +from typing import Any, Coroutine, Optional, Union import twisted.internet.reactor from twisted.internet import threads @@ -24,8 +24,6 @@ GoogleChatHeader, GoogleChatSection, Memo, - SlackAttachment, - SlackField, DiscordDetails, DiscordEmbeds, DiscordAuthorField, @@ -34,7 +32,6 @@ MsTeamsPotentialAction, TokenAlertDetails, TokenAlertDetailsGoogleChat, - TokenAlertDetailsSlack, TokenAlertDetailsDiscord, TokenAlertDetailsMsTeams, ) @@ -42,27 +39,6 @@ log = Logger() -def format_as_slack_canaryalert(details: TokenAlertDetails) -> TokenAlertDetailsSlack: - """ - Transforms `TokenAlertDetails` to `TokenAlertDetailsSlack`. - """ - fields: List[SlackField] = [ - SlackField(title="Channel", value=details.channel), - SlackField(title="Memo", value=details.memo), - SlackField( - title="time", - value=details.time.strftime("%Y-%m-%d %H:%M:%S (UTC)"), - ), - SlackField(title="Manage", value=details.manage_url), - ] - - attchments = [SlackAttachment(title_link=details.manage_url, fields=fields)] - return TokenAlertDetailsSlack( - # channel="#general", - attachments=attchments, - ) - - def format_as_googlechat_canaryalert( details: TokenAlertDetails, ) -> TokenAlertDetailsGoogleChat: diff --git a/canarytokens/models.py b/canarytokens/models.py index 993267852..05b12b8b4 100644 --- a/canarytokens/models.py +++ b/canarytokens/models.py @@ -49,7 +49,12 @@ CANARY_IMAGE_URL, MEMO_MAX_CHARACTERS, ) -from canarytokens.utils import prettify_snake_case, dict_to_csv, get_src_ip_continent +from canarytokens.utils import ( + json_safe_dict, + prettify_snake_case, + dict_to_csv, + get_src_ip_continent, +) CANARYTOKEN_RE = re.compile( ".*([" + "".join(CANARYTOKEN_ALPHABET) + "]{" + str(CANARYTOKEN_LENGTH) + "}).*", @@ -401,10 +406,6 @@ def __str__(self) -> str: ] -def json_safe_dict(m: BaseModel, exclude: Tuple = ()) -> Dict[str, str]: - return json.loads(m.json(exclude_none=True, exclude=set(exclude))) - - class TokenRequest(BaseModel): """ TokenRequest holds fields needed to create a Canarytoken. @@ -2138,25 +2139,6 @@ class Config: } -class SlackField(BaseModel): - title: str - value: str - short: bool = True - - -class SlackAttachment(BaseModel): - title: str = "Canarytoken Triggered" - title_link: HttpUrl - mrkdwn_in: List[str] = ["title"] - fallback: str = "" - fields: List[SlackField] - - def __init__(__pydantic_self__, **data: Any) -> None: - # HACK: We can do better here. - data["fallback"] = f"Canarytoken Triggered: {data['title_link']}" - super().__init__(**data) - - class GoogleChatDecoratedText(BaseModel): topLabel: str = "" text: str = "" @@ -2301,15 +2283,6 @@ def json_safe_dict(self) -> Dict[str, str]: return json_safe_dict(self) -class TokenAlertDetailsSlack(BaseModel): - """Details that are sent to slack webhooks.""" - - attachments: List[SlackAttachment] - - def json_safe_dict(self) -> Dict[str, str]: - return json_safe_dict(self) - - class MsTeamsDetailsSection(BaseModel): canarytoken: Canarytoken token_reminder: Memo diff --git a/canarytokens/queries.py b/canarytokens/queries.py index 5befac0b4..a365961e7 100644 --- a/canarytokens/queries.py +++ b/canarytokens/queries.py @@ -7,7 +7,7 @@ import re import secrets from ipaddress import IPv4Address -from typing import Dict, List, Literal, Optional, Tuple, Union +from typing import Dict, List, Literal, Optional, Tuple import advocate import requests @@ -859,29 +859,8 @@ def validate_webhook(url, token_type: models.TokenTypes): if len(url) > constants.MAX_WEBHOOK_URL_LENGTH: raise WebhookTooLongError() - payload: Union[ - models.TokenAlertDetails, - models.TokenAlertDetailsSlack, - models.TokenAlertDetailsGoogleChat, - models.TokenAlertDetailsDiscord, - models.TokenAlertDetailsMsTeams, - ] webhook_type = get_webhook_type(url) - if webhook_type == WebhookType.SLACK: - payload = models.TokenAlertDetailsSlack( - attachments=[ - models.SlackAttachment( - title_link=HttpUrl("https://test.com/check", scheme="https"), - fields=[ - models.SlackField( - title="test", - value="Working", - ) - ], - ) - ] - ) - elif webhook_type == WebhookType.GOOGLE_CHAT: + if webhook_type == WebhookType.GOOGLE_CHAT: # construct google chat alert card card = models.GoogleChatCard( header=models.GoogleChatHeader( diff --git a/canarytokens/utils.py b/canarytokens/utils.py index a7e77963e..f702929bf 100644 --- a/canarytokens/utils.py +++ b/canarytokens/utils.py @@ -1,8 +1,14 @@ +import json import subprocess from pathlib import Path -from typing import Any, Literal, Union +from typing import Any, Dict, Literal, Tuple, Union import pycountry_convert +from pydantic import BaseModel + + +def json_safe_dict(m: BaseModel, exclude: Tuple = ()) -> Dict[str, str]: + return json.loads(m.json(exclude_none=True, exclude=set(exclude))) def dict_to_csv(d: dict) -> str: diff --git a/canarytokens/webhook_formatting.py b/canarytokens/webhook_formatting.py index d2dfd5974..d5d4f7bb6 100644 --- a/canarytokens/webhook_formatting.py +++ b/canarytokens/webhook_formatting.py @@ -1,28 +1,50 @@ from __future__ import annotations -from typing import Union +import json +from typing import Union, Literal from enum import Enum import re from functools import partial from datetime import datetime -from pydantic import HttpUrl, parse_obj_as +from pydantic import BaseModel, HttpUrl, parse_obj_as from canarytokens import constants from canarytokens.channel import ( format_as_discord_canaryalert, format_as_googlechat_canaryalert, format_as_ms_teams_canaryalert, - format_as_slack_canaryalert, ) from canarytokens.models import ( + readable_token_type_names, Memo, TokenTypes, TokenAlertDetails, TokenExposedDetails, ) +from canarytokens.utils import json_safe_dict, prettify_snake_case +CANARY_LOGO_ROUND_PUBLIC_URL = parse_obj_as( + HttpUrl, + constants.CANARY_IMAGE_URL, +) WEBHOOK_TEST_URL = parse_obj_as(HttpUrl, "http://example.com/test/url/for/webhook") +TOKEN_EXPOSED_DESCRIPTION = "One of your {readable_type} Canarytokens has been found on the internet. A publicly exposed token will provide very low quality alerts. We recommend that you disable and replace this token on private infrastructure." +MAX_INLINE_LENGTH = 40 # Max length of content to share a line with other content + + +class HexColor(Enum): + WARNING = "#ed6c02" + ERROR = "#d32f2f" + CANARY_GREEN = "#3ad47f" + + @property + def decimal_value(self): + return int(self.value_without_hash, 16) + + @property + def value_without_hash(self): + return self.value[1:] class WebhookType(Enum): @@ -78,7 +100,7 @@ def _format_alert_details_for_webhook( webhook_type: WebhookType, details: TokenAlertDetails ): if webhook_type == WebhookType.SLACK: - return format_as_slack_canaryalert(details) + return _format_as_slack_canaryalert(details) elif webhook_type == WebhookType.GOOGLE_CHAT: return format_as_googlechat_canaryalert(details) elif webhook_type == WebhookType.DISCORD: @@ -97,9 +119,7 @@ def _format_exposed_details_for_webhook( webhook_type: WebhookType, details: TokenExposedDetails ): if webhook_type == WebhookType.SLACK: - raise NotImplementedError( - f"_format_exposed_details_for_webhook not implemented for webhook type: {webhook_type}" - ) + return _format_as_slack_token_exposed(details) elif webhook_type == WebhookType.GOOGLE_CHAT: raise NotImplementedError( f"_format_exposed_details_for_webhook not implemented for webhook type: {webhook_type}" @@ -122,8 +142,15 @@ def _format_exposed_details_for_webhook( def generate_webhook_test_payload(webhook_type: WebhookType, token_type: TokenTypes): if webhook_type == WebhookType.SLACK: - raise NotImplementedError( - "generate_webhook_test_payload not implemented for SLACK" + return TokenAlertDetailsSlack( + blocks=[ + SlackHeader( + text=SlackTextObject( + text="Validating new Canarytokens webhook", type="plain_text" + ) + ), + SlackFooter(), + ] ) elif webhook_type == WebhookType.GOOGLE_CHAT: raise NotImplementedError( @@ -159,9 +186,220 @@ def generate_webhook_test_payload(webhook_type: WebhookType, token_type: TokenTy ) +def _format_as_slack_canaryalert(details: TokenAlertDetails) -> TokenAlertDetailsSlack: + """ + Transforms `TokenAlertDetails` to `TokenAlertDetailsSlack`. + """ + blocks: list[SlackBlock] = [] + blocks.append( + SlackHeader( + text=SlackTextObject( + type="plain_text", + text=":red_circle: Canarytoken Triggered :red_circle:", + ) + ) + ) + + blocks.append( + SlackSection( + fields=[ + SlackTextWithLabel("Channel", details.channel), + SlackTextWithLabel("Token Reminder", details.memo), + ] + ) + ) + + blocks.append( + SlackSection( + fields=[ + SlackTextWithLabel( + "Time", details.time.strftime("%Y-%m-%d %H:%M:%S (UTC)") + ) + ] + ) + ) + + if details.src_data: + blocks.extend(_data_to_slack_blocks(details.src_data)) + if details.additional_data: + blocks.extend(_data_to_slack_blocks(details.additional_data)) + + blocks.append( + SlackSectionText( + text=SlackTextObject(text=f":gear: *<{details.manage_url}|Manage token>*") + ) + ) + blocks.append(SlackFooter()) + + return TokenAlertDetailsSlack(blocks=blocks) + + +def _format_as_slack_token_exposed( + details: TokenExposedDetails, +) -> TokenAlertDetailsSlack: + blocks: list[SlackBlock] = [] + + blocks.append( + SlackHeader( + text=SlackTextObject( + type="plain_text", + text=":large_orange_circle: Canarytoken Exposed :large_orange_circle:", + ) + ) + ) + + blocks.append( + SlackRichText(text=_get_exposed_token_description(details.token_type)) + ) + blocks.append(SlackDivider()) + + blocks.append( + SlackSection( + fields=[ + SlackTextWithLabel("Token Reminder", details.memo), + SlackTextWithLabel("Key ID", details.key_id), + ] + ) + ) + + blocks.append( + SlackSection( + fields=[ + SlackTextWithLabel( + "Key exposed at", + details.exposed_time.strftime("%Y-%m-%d %H:%M:%S (UTC)"), + ) + ] + ) + ) + + fields = [SlackTextObject(text=f":gear: *<{details.manage_url}|Manage token>*")] + if details.public_location: + fields.insert( + 0, + SlackTextObject( + text=f":link: *<{details.public_location}|View exposed key>*" + ), + ) + + blocks.append(SlackSection(fields=fields)) + blocks.append(SlackFooter()) + + return TokenAlertDetailsSlack(blocks=blocks) + + +def _data_to_slack_blocks(data: dict[str, Union[str, dict]]) -> list[SlackBlock]: + blocks: list[SlackBlock] = [] + for label, value in data.items(): + if ( + not label or not value or label in ["time_hm", "time_ymd"] + ): # We already display the time + continue + + blocks.append(SlackRichText(text=prettify_snake_case(label)).set_bold()) + if isinstance(value, dict): + blocks.append(SlackRichText(text=json.dumps(value)).set_code_block()) + else: + blocks.append(SlackRichText(text=value)) + + return blocks + + +class SlackTextObject(BaseModel): + type: Union[Literal["plain_text"], Literal["mrkdwn"]] = "mrkdwn" + text: str + + +class SlackTextWithLabel(SlackTextObject): + def __init__(self, label: str, text: str): + super().__init__(text=f"*{label}*\n{text}") + + +class SlackBlock(BaseModel): + ... + + +class SlackHeader(SlackBlock): + type = "header" + text: SlackTextObject + + +class SlackRichText(SlackBlock): + text: str + bold = False + rich_text_type: Union[ + Literal["rich_text_section"], Literal["rich_text_preformatted"] + ] = "rich_text_section" + + def set_code_block(self): + self.rich_text_type = "rich_text_preformatted" + return self + + def set_bold(self): + self.bold = True + return self + + def dict(self, *_args, **__kwargs): + text = {"type": "text", "text": self.text} + if self.bold is True: + text["style"] = {"bold": True} + + return { + "type": "rich_text", + "elements": [{"type": self.rich_text_type, "elements": [text]}], + } + + +class SlackFooter(SlackBlock): + def dict(self, *_args, **_kwargs): + return { + "type": "context", + "elements": [ + { + "type": "image", + "image_url": CANARY_LOGO_ROUND_PUBLIC_URL, + "alt_text": "images", + }, + { + "type": "mrkdwn", + "text": "", + }, + ], + } + + +class SlackDivider(SlackBlock): + type = "divider" + + +class SlackSection(SlackBlock): + type = "section" + fields: list[SlackTextObject] + + +class SlackSectionText(SlackBlock): + type = "section" + text: SlackTextObject + + +class TokenAlertDetailsSlack(BaseModel): + """Details that are sent to slack webhooks.""" + + blocks: list[SlackBlock] + + def json_safe_dict(self) -> dict[str, str]: + return json_safe_dict(self) + + class TokenAlertDetailGeneric(TokenAlertDetails): ... class TokenExposedDetailGeneric(TokenExposedDetails): ... + + +def _get_exposed_token_description(token_type: TokenTypes) -> str: + return TOKEN_EXPOSED_DESCRIPTION.format( + readable_type=readable_token_type_names[token_type] + ) diff --git a/tests/units/test_webhook_formatting.py b/tests/units/test_webhook_formatting.py index 852079f1f..ebd607ff7 100644 --- a/tests/units/test_webhook_formatting.py +++ b/tests/units/test_webhook_formatting.py @@ -3,6 +3,7 @@ import pytest from canarytokens.models import Memo, TokenAlertDetails, TokenExposedDetails, TokenTypes from canarytokens.webhook_formatting import ( + TokenAlertDetailsSlack, WebhookType, format_details_for_webhook, get_webhook_type, @@ -42,6 +43,8 @@ def test_get_webhook_type(url: str, expected_type: WebhookType): [ ("alert", WebhookType.GENERIC, TokenAlertDetailGeneric), ("exposed", WebhookType.GENERIC, TokenExposedDetailGeneric), + ("alert", WebhookType.SLACK, TokenAlertDetailsSlack), + ("exposed", WebhookType.SLACK, TokenAlertDetailsSlack), ], ) def test_format_details_for_webhook_alert_type(