diff --git a/dojo/engagement/views.py b/dojo/engagement/views.py index 70ff8a7b160..3b515c7468a 100644 --- a/dojo/engagement/views.py +++ b/dojo/engagement/views.py @@ -28,7 +28,6 @@ from openpyxl.styles import Font import dojo.jira_link.helper as jira_helper -import dojo.notifications.helper as notifications_helper import dojo.risk_acceptance.helper as ra_helper from dojo.authorization.authorization import user_has_permission_or_403 from dojo.authorization.authorization_decorators import user_is_authorized @@ -653,7 +652,15 @@ def add_tests(request, eid): "Test added successfully.", extra_tags="alert-success") - notifications_helper.notify_test_created(new_test) + create_notification( + event="test_added", + title=f"Test created for {new_test.engagement.product}: {new_test.engagement.name}: {new_test}", + test=new_test, + engagement=new_test.engagement, + product=new_test.engagement.product, + url=reverse("view_test", args=(new_test.id,)), + url_api=reverse("test-detail", args=(new_test.id,)), + ) if "_Add Another Test" in request.POST: return HttpResponseRedirect( diff --git a/dojo/importers/base_importer.py b/dojo/importers/base_importer.py index c9a77fbb95b..cab58fd718b 100644 --- a/dojo/importers/base_importer.py +++ b/dojo/importers/base_importer.py @@ -5,6 +5,7 @@ from django.core.exceptions import ValidationError from django.core.files.base import ContentFile from django.core.files.uploadedfile import TemporaryUploadedFile +from django.urls import reverse from django.utils.timezone import make_aware import dojo.finding.helper as finding_helper @@ -28,6 +29,7 @@ Test_Type, Vulnerability_Id, ) +from dojo.notifications.helper import create_notification from dojo.tools.factory import get_parser from dojo.utils import max_safe @@ -719,3 +721,38 @@ def mitigate_finding( finding.save(dedupe_option=False) else: finding.save(dedupe_option=False, push_to_jira=self.push_to_jira) + + def notify_scan_added( + self, + test, + updated_count, + new_findings=[], + findings_mitigated=[], + findings_reactivated=[], + findings_untouched=[], + ): + logger.debug("Scan added notifications") + + new_findings = sorted(new_findings, key=lambda x: x.numerical_severity) + findings_mitigated = sorted(findings_mitigated, key=lambda x: x.numerical_severity) + findings_reactivated = sorted(findings_reactivated, key=lambda x: x.numerical_severity) + findings_untouched = sorted(findings_untouched, key=lambda x: x.numerical_severity) + + title = ( + f"Created/Updated {updated_count} findings for {test.engagement.product}: {test.engagement.name}: {test}" + ) + + create_notification( + event="scan_added_empty" if updated_count == 0 else "scan_added", + title=title, + findings_new=new_findings, + findings_mitigated=findings_mitigated, + findings_reactivated=findings_reactivated, + finding_count=updated_count, + test=test, + engagement=test.engagement, + product=test.engagement.product, + findings_untouched=findings_untouched, + url=reverse("view_test", args=(test.id,)), + url_api=reverse("test-detail", args=(test.id,)), + ) diff --git a/dojo/importers/default_importer.py b/dojo/importers/default_importer.py index 95254ef59b8..3ac31143792 100644 --- a/dojo/importers/default_importer.py +++ b/dojo/importers/default_importer.py @@ -3,10 +3,10 @@ from django.core.files.uploadedfile import TemporaryUploadedFile from django.core.serializers import deserialize, serialize from django.db.models.query_utils import Q +from django.urls import reverse import dojo.finding.helper as finding_helper import dojo.jira_link.helper as jira_helper -import dojo.notifications.helper as notifications_helper from dojo.importers.base_importer import BaseImporter, Parser from dojo.importers.options import ImporterOptions from dojo.models import ( @@ -15,6 +15,7 @@ Test, Test_Import, ) +from dojo.notifications.helper import create_notification logger = logging.getLogger(__name__) deduplicationLogger = logging.getLogger("dojo.specific-loggers.deduplication") @@ -126,9 +127,17 @@ def process_scan( ) # Send out some notifications to the user logger.debug("IMPORT_SCAN: Generating notifications") - notifications_helper.notify_test_created(self.test) + create_notification( + event="test_added", + title=f"Test created for {self.test.engagement.product}: {self.test.engagement.name}: {self.test}", + test=self.test, + engagement=self.test.engagement, + product=self.test.engagement.product, + url=reverse("view_test", args=(self.test.id,)), + url_api=reverse("test-detail", args=(self.test.id,)), + ) updated_count = len(new_findings) + len(closed_findings) - notifications_helper.notify_scan_added( + self.notify_scan_added( self.test, updated_count, new_findings=new_findings, diff --git a/dojo/importers/default_reimporter.py b/dojo/importers/default_reimporter.py index 9debf4aabaa..0c4159ed669 100644 --- a/dojo/importers/default_reimporter.py +++ b/dojo/importers/default_reimporter.py @@ -6,7 +6,6 @@ import dojo.finding.helper as finding_helper import dojo.jira_link.helper as jira_helper -import dojo.notifications.helper as notifications_helper from dojo.importers.base_importer import BaseImporter, Parser from dojo.importers.options import ImporterOptions from dojo.models import ( @@ -128,7 +127,7 @@ def process_scan( updated_count = ( len(closed_findings) + len(reactivated_findings) + len(new_findings) ) - notifications_helper.notify_scan_added( + self.notify_scan_added( self.test, updated_count, new_findings=new_findings, diff --git a/dojo/notifications/helper.py b/dojo/notifications/helper.py index 55281901192..3e0a0295de2 100644 --- a/dojo/notifications/helper.py +++ b/dojo/notifications/helper.py @@ -1,5 +1,7 @@ +import importlib import json import logging +from contextlib import suppress from datetime import timedelta import requests @@ -7,7 +9,7 @@ from django.conf import settings from django.core.exceptions import FieldDoesNotExist from django.core.mail import EmailMessage -from django.db.models import Count, Prefetch, Q +from django.db.models import Count, Prefetch, Q, QuerySet from django.template import TemplateDoesNotExist from django.template.loader import render_to_string from django.urls import reverse @@ -20,213 +22,282 @@ from dojo.models import ( Alerts, Dojo_User, + Engagement, + Finding, Notification_Webhooks, Notifications, + Product, + Product_Type, System_Settings, + Test, UserContactInfo, get_current_datetime, ) -from dojo.user.queries import get_authorized_users_for_product_and_product_type, get_authorized_users_for_product_type +from dojo.user.queries import ( + get_authorized_users_for_product_and_product_type, + get_authorized_users_for_product_type, +) logger = logging.getLogger(__name__) -def create_notification(event=None, **kwargs): - system_settings = System_Settings.objects.get() - kwargs["system_settings"] = system_settings - # System notifications - try: - system_notifications = Notifications.objects.get(user=None, template=False) - except Exception: - system_notifications = Notifications() - - if "recipients" in kwargs: - # mimic existing code so that when recipients is specified, no other system or personal notifications are sent. - logger.debug("creating notifications for recipients: %s", kwargs["recipients"]) - for recipient_notifications in Notifications.objects.filter(user__username__in=kwargs["recipients"], user__is_active=True, product=None): - if event in settings.NOTIFICATIONS_SYSTEM_LEVEL_TRUMP: - # merge the system level notifications with the personal level - # this allows for system to trump the personal - merged_notifications = Notifications.merge_notifications_list([system_notifications, recipient_notifications]) - merged_notifications.user = recipient_notifications.user - logger.debug("Sent notification to %s", merged_notifications.user) - process_notifications(event, merged_notifications, **kwargs) - else: - # Do not trump user preferences and send notifications as usual - logger.debug("Sent notification to %s", recipient_notifications.user) - process_notifications(event, recipient_notifications, **kwargs) - - else: - logger.debug("creating system notifications for event: %s", event) - # send system notifications to all admin users - - # parse kwargs before converting them to dicts - product_type = None - if "product_type" in kwargs: - product_type = kwargs.get("product_type") - logger.debug("Defined product type %s", product_type) +def create_notification( + event: str | None = None, + title: str | None = None, + finding: Finding | None = None, + test: Test | None = None, + engagement: Engagement | None = None, + product: Product | None = None, + requested_by: Dojo_User | None = None, + reviewers: list[Dojo_User] | list[str] | None = None, + recipients: list[Dojo_User] | list[str] | None = None, + no_users: bool = False, # noqa: FBT001 + url: str | None = None, + url_api: str | None = None, + **kwargs: dict, +) -> None: + """Create an instance of a NotificationManager and dispatch the notification.""" + default_manager = NotificationManager + notification_manager_class = default_manager + if isinstance( + ( + notification_manager := getattr( + settings, + "NOTIFICATION_MANAGER", + default_manager, + ) + ), + str, + ): + with suppress(ModuleNotFoundError): + module_name, _separator, class_name = notification_manager.rpartition(".") + module = importlib.import_module(module_name) + notification_manager_class = getattr(module, class_name) + notification_manager_class().create_notification( + event=event, + title=title, + finding=finding, + test=test, + engagement=engagement, + product=product, + requested_by=requested_by, + reviewers=reviewers, + recipients=recipients, + no_users=no_users, + url=url, + url_api=url_api, + **kwargs, + ) - product = None - if "product" in kwargs: - product = kwargs.get("product") - logger.debug("Defined product %s", product) - elif "engagement" in kwargs: - product = kwargs["engagement"].product - logger.debug("Defined product of engagement %s", product) +class NotificationManagerHelpers: - elif "test" in kwargs: - product = kwargs["test"].engagement.product - logger.debug("Defined product of test %s", product) + """Common functions for use in the Mangers.""" - elif "finding" in kwargs: - product = kwargs["finding"].test.engagement.product - logger.debug("Defined product of finding %s", product) + def __init__( + self, + *_args: list, + system_notifications: Notifications | None = None, + system_settings: System_Settings | None = None, + **_kwargs: dict, + ) -> None: + self.system_notifications = system_notifications or self._get_notifications_object() + self.system_settings = system_settings or self._get_system_settings() - elif "obj" in kwargs: - from dojo.utils import get_product - product = get_product(kwargs["obj"]) - logger.debug("Defined product of obj %s", product) - - # System notifications are sent one with user=None, which will trigger email to configured system email, to global slack channel, etc. - process_notifications(event, system_notifications, **kwargs) - - # All admins will also receive system notifications, but as part of the person global notifications section below - # This time user is set, so will trigger email to personal email, to personal slack channel (mention), etc. - # only retrieve users which have at least one notification type enabled for this event type. - logger.debug("creating personal notifications for event: %s", event) - - # There are notification like deleting a product type that shall not be sent to users. - # These notifications will have the parameter no_users=True - if not ("no_users" in kwargs and kwargs["no_users"] is True): - # get users with either global notifications, or a product specific noditiciation - # and all admin/superuser, they will always be notified - users = Dojo_User.objects.filter(is_active=True).prefetch_related(Prefetch( - "notifications_set", - queryset=Notifications.objects.filter(Q(product_id=product) | Q(product__isnull=True)), - to_attr="applicable_notifications", - )).annotate(applicable_notifications_count=Count("notifications__id", filter=Q(notifications__product_id=product) | Q(notifications__product__isnull=True)))\ - .filter(Q(applicable_notifications_count__gt=0) | Q(is_superuser=True)) - - # only send to authorized users or admin/superusers - logger.debug("Filtering users for the product %s", product) - - if product: - users = get_authorized_users_for_product_and_product_type(users, product, Permissions.Product_View) - - elif product_type: - users = get_authorized_users_for_product_type(users, product_type, Permissions.Product_Type_View) + def _get_notifications_object(self) -> Notifications: + """Set the system Notifications object on the class.""" + try: + return Notifications.objects.get(user=None, template=False) + except Exception: + return Notifications() + + def _get_system_settings(self) -> System_Settings: + """Set the system settings object in the class.""" + return System_Settings.objects.get() + + def _create_description(self, event: str, kwargs: dict) -> str: + if kwargs.get("description") is None: + if event == "product_added": + kwargs["description"] = _("Product %s has been created successfully.") % kwargs["title"] + elif event == "product_type_added": + kwargs["description"] = _("Product Type %s has been created successfully.") % kwargs["title"] else: - # nor product_type nor product defined, we should not make noise and send only notifications to admins - logger.debug("Product is not specified, making it silent") - users = users.filter(is_superuser=True) - - for user in users: - logger.debug("Authorized user for the product %s", user) - # send notifications to user after merging possible multiple notifications records (i.e. personal global + personal product) - # kwargs.update({'user': user}) - applicable_notifications = user.applicable_notifications - if user.is_superuser: - logger.debug("User %s is superuser", user) - # admin users get all system notifications - applicable_notifications.append(system_notifications) - - notifications_set = Notifications.merge_notifications_list(applicable_notifications) - notifications_set.user = user - process_notifications(event, notifications_set, **kwargs) - - -def create_description(event, *args, **kwargs): - if "description" not in kwargs: - if event == "product_added": - kwargs["description"] = _("Product %s has been created successfully.") % kwargs["title"] - elif event == "product_type_added": - kwargs["description"] = _("Product Type %s has been created successfully.") % kwargs["title"] - else: - kwargs["description"] = _("Event %s has occurred.") % str(event) - - return kwargs["description"] + kwargs["description"] = _("Event %s has occurred.") % str(event) + return kwargs["description"] -def create_notification_message(event, user, notification_type, *args, **kwargs): - template = f"notifications/{notification_type}/{event.replace('/', '')}.tpl" - kwargs.update({"user": user}) + def _create_notification_message( + self, + event: str, + user: Dojo_User, + notification_type: str, + kwargs: dict, + ) -> str: + template = f"notifications/{notification_type}/{event.replace('/', '')}.tpl" + kwargs.update({"user": user}) + notification_message = None - notification_message = None + # TODO: This may be deleted + # if (title := kwargs.get("title")) is not None: + # kwargs.update({"title": title}) - if (title := kwargs.get("title")) is not None: - kwargs.update({"title": title}) - - if kwargs.get("description") is None: - kwargs.update({"description": create_description(event, *args, **kwargs)}) - - try: - notification_message = render_to_string(template, kwargs) - logger.debug("Rendering from the template %s", template) - except TemplateDoesNotExist as e: - # In some cases, template includes another templates, if the interior one is missing, we will see it in "specifically" section - logger.debug(f"template not found or not implemented yet: {template} (specifically: {e.args})") - except Exception as e: - logger.error("error during rendering of template %s exception is %s", template, e) - finally: - if not notification_message: - kwargs["description"] = create_description(event, *args, **kwargs) - notification_message = render_to_string(f"notifications/{notification_type}/other.tpl", kwargs) - - return notification_message or "" - - -def process_notifications(event, notifications=None, **kwargs): - from dojo.utils import get_system_setting - - if not notifications: - logger.warning("no notifications!") - return - - logger.debug("sending notification " + ("asynchronously" if we_want_async() else "synchronously")) - logger.debug("process notifications for %s", notifications.user) - logger.debug("notifications: %s", vars(notifications)) - - slack_enabled = get_system_setting("enable_slack_notifications") - msteams_enabled = get_system_setting("enable_msteams_notifications") - mail_enabled = get_system_setting("enable_mail_notifications") - webhooks_enabled = get_system_setting("enable_webhooks_notifications") - - if slack_enabled and "slack" in getattr(notifications, event, getattr(notifications, "other")): - logger.debug("Sending Slack Notification") - send_slack_notification(event, notifications.user, **kwargs) - - if msteams_enabled and "msteams" in getattr(notifications, event, getattr(notifications, "other")): - logger.debug("Sending MSTeams Notification") - send_msteams_notification(event, notifications.user, **kwargs) - - if mail_enabled and "mail" in getattr(notifications, event, getattr(notifications, "other")): - logger.debug("Sending Mail Notification") - send_mail_notification(event, notifications.user, **kwargs) - - if webhooks_enabled and "webhooks" in getattr(notifications, event, getattr(notifications, "other")): - logger.debug("Sending Webhooks Notification") - send_webhooks_notification(event, notifications.user, **kwargs) - - if "alert" in getattr(notifications, event, getattr(notifications, "other")): - logger.debug(f"Sending Alert to {notifications.user}") - send_alert_notification(event, notifications.user, **kwargs) + if kwargs.get("description") is None: + kwargs.update({"description": self._create_description(event, kwargs)}) + try: + notification_message = render_to_string(template, kwargs) + logger.debug("Rendering from the template %s", template) + except TemplateDoesNotExist as e: + # In some cases, template includes another templates, if the interior one is missing, we will see it in "specifically" section + logger.debug( + f"template not found or not implemented yet: {template} (specifically: {e.args})", + ) + except Exception as e: + logger.error( + "error during rendering of template %s exception is %s", + template, + e, + ) + finally: + if not notification_message: + kwargs["description"] = self._create_description(event, kwargs) + notification_message = render_to_string( + f"notifications/{notification_type}/other.tpl", + kwargs, + ) -@dojo_async_task -@app.task -def send_slack_notification(event, user=None, *args, **kwargs): - from dojo.utils import get_system_setting + return notification_message or "" + + def _log_alert( + self, + exception: Exception, + notification_type: str | None = None, + **kwargs: dict, + ) -> None: + # no try catch here, if this fails we need to show an error + for user in Dojo_User.objects.filter(is_superuser=True): + alert = Alerts( + user_id=user, + url=kwargs.get("url", reverse("alerts")), + title=kwargs.get("title", "Notification issue")[:250], + description=kwargs.get("description", str(exception))[:2000], + icon="exclamation-triangle", + source=notification_type[:100] if notification_type else kwargs.get("source", "unknown")[:100], + ) + # relative urls will fail validation + alert.clean_fields(exclude=["url"]) + alert.save() + + +class SlackNotificationManger(NotificationManagerHelpers): + + """Manger for slack notifications and their helpers.""" + + @dojo_async_task + @app.task + def send_slack_notification( + self, + event: str, + user: Dojo_User | None = None, + **kwargs: dict, + ): + try: + # If the user has slack information on profile and chooses to receive slack notifications + # Will receive a DM + if user is not None: + logger.debug("personal notification to slack for user %s", user) + if hasattr(user, "usercontactinfo") and user.usercontactinfo.slack_username is not None: + slack_user_id = user.usercontactinfo.slack_user_id + if not slack_user_id: + # Lookup the slack userid the first time, then save it. + slack_user_id = self._get_slack_user_id( + user.usercontactinfo.slack_username, + ) + if slack_user_id: + slack_user_save = UserContactInfo.objects.get( + user_id=user.id, + ) + slack_user_save.slack_user_id = slack_user_id + slack_user_save.save() + # only send notification if we managed to find the slack_user_id + if slack_user_id: + channel = f"@{slack_user_id}" + self._post_slack_message(event, user, channel, **kwargs) + else: + logger.info( + "The user %s does not have a email address informed for Slack in profile.", + user, + ) + else: + # System scope slack notifications, and not personal would still see this go through + if self.system_settings.slack_channel is not None: + channel = self.system_settings.slack_channel + logger.info( + f"Sending system notification to system channel {channel}.", + ) + self._post_slack_message(event, user, channel, **kwargs) + else: + logger.debug( + "slack_channel not configured: skipping system notification", + ) + + except Exception as exception: + logger.exception(exception) + self._log_alert( + exception, + "Slack Notification", + title=kwargs["title"], + description=str(exception), + url=kwargs.get("url"), + ) + + def _get_slack_user_id(self, user_email: str) -> str: + user_id = None + res = requests.request( + method="POST", + url="https://slack.com/api/users.lookupByEmail", + data={"token": self.system_settings.slack_token, "email": user_email}, + timeout=settings.REQUESTS_TIMEOUT, + ) - def _post_slack_message(channel): + user = json.loads(res.text) + slack_user_is_found = False + if user: + if "error" in user: + logger.error("Slack is complaining. See error message below.") + logger.error(user) + raise RuntimeError("Error getting user list from Slack: " + res.text) + if "email" in user["user"]["profile"]: + if user_email == user["user"]["profile"]["email"]: + if "id" in user["user"]: + user_id = user["user"]["id"] + logger.debug(f"Slack user ID is {user_id}") + slack_user_is_found = True + else: + logger.warning( + f"A user with email {user_email} could not be found in this Slack workspace.", + ) + + if not slack_user_is_found: + logger.warning("The Slack user was not found.") + + return user_id + + def _post_slack_message( + self, + event: str, + user: Dojo_User, + channel: str, + **kwargs: dict, + ) -> None: res = requests.request( method="POST", url="https://slack.com/api/chat.postMessage", data={ - "token": get_system_setting("slack_token"), + "token": self.system_settings.slack_token, "channel": channel, - "username": get_system_setting("slack_username"), - "text": create_notification_message(event, user, "slack", *args, **kwargs), + "username": self.system_settings.slack_username, + "text": self._create_notification_message(event, user, "slack", kwargs), }, timeout=settings.REQUESTS_TIMEOUT, ) @@ -236,155 +307,560 @@ def _post_slack_message(channel): logger.error(res.text) raise RuntimeError("Error posting message to Slack: " + res.text) - try: - # If the user has slack information on profile and chooses to receive slack notifications - # Will receive a DM - if user is not None: - logger.debug("personal notification to slack for user %s", user) - if hasattr(user, "usercontactinfo") and user.usercontactinfo.slack_username is not None: - slack_user_id = user.usercontactinfo.slack_user_id - if not slack_user_id: - # Lookup the slack userid the first time, then save it. - slack_user_id = get_slack_user_id( - user.usercontactinfo.slack_username) - if slack_user_id: - slack_user_save = UserContactInfo.objects.get(user_id=user.id) - slack_user_save.slack_user_id = slack_user_id - slack_user_save.save() - - # only send notification if we managed to find the slack_user_id - if slack_user_id: - channel = f"@{slack_user_id}" - _post_slack_message(channel) - else: - logger.info("The user %s does not have a email address informed for Slack in profile.", user) +class MSTeamsNotificationManger(NotificationManagerHelpers): + + """Manger for Microsoft Teams notifications and their helpers.""" + + @dojo_async_task + @app.task + def send_msteams_notification( + self, + event: str, + user: Dojo_User | None = None, + **kwargs: dict, + ): + try: + # Microsoft Teams doesn't offer direct message functionality, so no MS Teams PM functionality here... + if user is None: + if self.system_settings.msteams_url is not None: + logger.debug("sending MSTeams message") + res = requests.request( + method="POST", + url=self.system_settings.msteams_url, + data=self._create_notification_message( + event, + None, + "msteams", + kwargs, + ), + timeout=settings.REQUESTS_TIMEOUT, + ) + if res.status_code != 200: + logger.error("Error when sending message to Microsoft Teams") + logger.error(res.status_code) + logger.error(res.text) + raise RuntimeError( + "Error posting message to Microsoft Teams: " + res.text, + ) + else: + logger.info( + "Webhook URL for Microsoft Teams not configured: skipping system notification", + ) + except Exception as exception: + logger.exception(exception) + self._log_alert( + exception, + "Microsoft Teams Notification", + title=kwargs["title"], + description=str(exception), + url=kwargs["url"], + ) + + +class EmailNotificationManger(NotificationManagerHelpers): + + """Manger for email notifications and their helpers.""" + + @dojo_async_task + @app.task + def send_mail_notification( + self, + event: str, + user: Dojo_User | None = None, + **kwargs: dict, + ): + # Attempt to get the "to" address + if (recipient := kwargs.get("recipient")) is not None: + address = recipient + elif user: + address = user.email else: - # System scope slack notifications, and not personal would still see this go through - if get_system_setting("slack_channel") is not None: - channel = get_system_setting("slack_channel") - logger.info(f"Sending system notification to system channel {channel}.") - _post_slack_message(channel) + address = self.system_settings.mail_notifications_to + + logger.debug("notification email for user %s to %s", user, address) + + try: + subject = f"{self.system_settings.team_name} notification" + if (title := kwargs.get("title")) is not None: + subject += f": {title}" + + email = EmailMessage( + subject, + self._create_notification_message(event, user, "mail", kwargs), + self.system_settings.email_from, + [address], + headers={"From": f"{self.system_settings.email_from}"}, + ) + email.content_subtype = "html" + logger.debug("sending email alert") + email.send(fail_silently=False) + + except Exception as exception: + logger.exception(exception) + self._log_alert( + exception, + "Email Notification", + title=kwargs["title"], + description=str(exception), + url=kwargs["url"], + ) + + +class WebhookNotificationManger(NotificationManagerHelpers): + + """Manger for webhook notifications and their helpers.""" + + ERROR_PERMANENT = "permanent" + ERROR_TEMPORARY = "temporary" + + @dojo_async_task + @app.task + def send_webhooks_notification( + self, + event: str, + user: Dojo_User | None = None, + **kwargs: dict, + ): + for endpoint in self._get_webhook_endpoints(user=user): + error = None + if endpoint.status not in [ + Notification_Webhooks.Status.STATUS_ACTIVE, + Notification_Webhooks.Status.STATUS_ACTIVE_TMP, + ]: + logger.info( + f"URL for Webhook '{endpoint.name}' is not active: {endpoint.get_status_display()} ({endpoint.status})", + ) + continue + + try: + logger.debug(f"Sending webhook message to endpoint '{endpoint.name}'") + res = self._webhooks_notification_request(endpoint, event, **kwargs) + if 200 <= res.status_code < 300: + logger.debug( + f"Message sent to endpoint '{endpoint.name}' successfully.", + ) + continue + # HTTP request passed successfully but we still need to check status code + if 500 <= res.status_code < 600 or res.status_code == 429: + error = self.ERROR_TEMPORARY + else: + error = self.ERROR_PERMANENT + + endpoint.note = f"Response status code: {res.status_code}" + logger.error( + f"Error when sending message to Webhooks '{endpoint.name}' (status: {res.status_code}): {res.text}", + ) + except requests.exceptions.Timeout as e: + error = self.ERROR_TEMPORARY + endpoint.note = f"Requests exception: {e}" + logger.error( + f"Timeout when sending message to Webhook '{endpoint.name}'", + ) + except Exception as exception: + error = self.ERROR_PERMANENT + endpoint.note = f"Exception: {exception}"[:1000] + logger.exception(exception) + self._log_alert(exception, "Webhooks Notification") + + now = get_current_datetime() + if error == self.ERROR_TEMPORARY: + # If endpoint is unstable for more then one day, it needs to be deactivated + if endpoint.first_error is not None and (now - endpoint.first_error).total_seconds() > 60 * 60 * 24: + endpoint.status = Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT + else: + # We need to monitor when outage started + if endpoint.status == Notification_Webhooks.Status.STATUS_ACTIVE: + endpoint.first_error = now + endpoint.status = Notification_Webhooks.Status.STATUS_INACTIVE_TMP + # In case of failure within one day, endpoint can be deactivated temporally only for one minute + self._webhook_reactivation.apply_async( + args=[self], + kwargs={"endpoint_id": endpoint.pk}, + countdown=60, + ) + # There is no reason to keep endpoint active if it is returning 4xx errors else: - logger.debug("slack_channel not configured: skipping system notification") - - except Exception as e: - logger.exception(e) - log_alert(e, "Slack Notification", title=kwargs["title"], description=str(e), url=kwargs.get("url")) - - -@dojo_async_task -@app.task -def send_msteams_notification(event, user=None, *args, **kwargs): - from dojo.utils import get_system_setting - - try: - # Microsoft Teams doesn't offer direct message functionality, so no MS Teams PM functionality here... - if user is None: - if get_system_setting("msteams_url") is not None: - logger.debug("sending MSTeams message") - res = requests.request( - method="POST", - url=get_system_setting("msteams_url"), - data=create_notification_message(event, None, "msteams", *args, **kwargs), - timeout=settings.REQUESTS_TIMEOUT, + endpoint.status = Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT + endpoint.first_error = now + + endpoint.last_error = now + endpoint.save() + + def _get_webhook_endpoints( + self, + user: Dojo_User | None = None, + ) -> QuerySet[Notification_Webhooks]: + endpoints = Notification_Webhooks.objects.filter(owner=user) + if not endpoints.exists(): + if user: + logger.info( + f"URLs for Webhooks not configured for user '{user}': skipping user notification", ) - if res.status_code != 200: - logger.error("Error when sending message to Microsoft Teams") - logger.error(res.status_code) - logger.error(res.text) - raise RuntimeError("Error posting message to Microsoft Teams: " + res.text) else: - logger.info("Webhook URL for Microsoft Teams not configured: skipping system notification") - except Exception as e: - logger.exception(e) - log_alert(e, "Microsoft Teams Notification", title=kwargs["title"], description=str(e), url=kwargs["url"]) - - -@dojo_async_task -@app.task -def send_mail_notification(event, user=None, *args, **kwargs): - from dojo.utils import get_system_setting - email_from_address = get_system_setting("email_from") - # Attempt to get the "to" address - if "recipient" in kwargs: - address = kwargs.get("recipient") - elif user: - address = user.email - else: - address = get_system_setting("mail_notifications_to") - - logger.debug("notification email for user %s to %s", user, address) - - try: - subject = f"{get_system_setting('team_name')} notification" - if "title" in kwargs: - subject += f": {kwargs['title']}" - - email = EmailMessage( - subject, - create_notification_message(event, user, "mail", *args, **kwargs), - email_from_address, - [address], - headers={"From": f"{email_from_address}"}, + logger.info( + "URLs for Webhooks not configured: skipping system notification", + ) + return Notification_Webhooks.objects.none() + return endpoints + + def _generate_request_details( + self, + endpoint: Notification_Webhooks, + event: str | None = None, + **kwargs: dict, + ) -> tuple[dict, dict]: + headers = { + "User-Agent": f"DefectDojo-{dd_version}", + "X-DefectDojo-Event": event, + "X-DefectDojo-Instance": settings.SITE_URL, + "Accept": "application/json", + } + if endpoint.header_name is not None: + headers[endpoint.header_name] = endpoint.header_value + yaml_data = self._create_notification_message( + event, + endpoint.owner, + "webhooks", + kwargs, + ) + data = yaml.safe_load(yaml_data) + + return headers, data + + def _webhooks_notification_request( + self, + endpoint: Notification_Webhooks, + event: str | None = None, + **kwargs: dict, + ) -> requests.Response: + headers, data = self._generate_request_details(endpoint, event=event, **kwargs) + return requests.request( + method="POST", + url=endpoint.url, + headers=headers, + json=data, + timeout=self.system_settings.webhooks_notifications_timeout, ) - email.content_subtype = "html" - logger.debug("sending email alert") - # logger.info(create_notification_message(event, user, 'mail', *args, **kwargs)) - email.send(fail_silently=False) - - except Exception as e: - logger.exception(e) - log_alert(e, "Email Notification", title=kwargs["title"], description=str(e), url=kwargs["url"]) - - -def webhooks_notification_request(endpoint, event, *args, **kwargs): - from dojo.utils import get_system_setting - - headers = { - "User-Agent": f"DefectDojo-{dd_version}", - "X-DefectDojo-Event": event, - "X-DefectDojo-Instance": settings.SITE_URL, - "Accept": "application/json", - } - if endpoint.header_name is not None: - headers[endpoint.header_name] = endpoint.header_value - yaml_data = create_notification_message(event, endpoint.owner, "webhooks", *args, **kwargs) - data = yaml.safe_load(yaml_data) - - timeout = get_system_setting("webhooks_notifications_timeout") - - return requests.request( - method="POST", - url=endpoint.url, - headers=headers, - json=data, - timeout=timeout, - ) + def _test_webhooks_notification(self, endpoint: Notification_Webhooks) -> None: + res = self._webhooks_notification_request( + endpoint, + "ping", + description="Test webhook notification", + ) + res.raise_for_status() + # in "send_webhooks_notification", we are doing deeper analysis, why it failed + # for now, "raise_for_status" should be enough + + @app.task(ignore_result=True) + def _webhook_reactivation(self, endpoint_id: int, **_kwargs: dict): + endpoint = Notification_Webhooks.objects.get(pk=endpoint_id) + # User already changed status of endpoint + if endpoint.status != Notification_Webhooks.Status.STATUS_INACTIVE_TMP: + return + endpoint.status = Notification_Webhooks.Status.STATUS_ACTIVE_TMP + endpoint.save() + logger.debug( + f"Webhook endpoint '{endpoint.name}' reactivated to '{Notification_Webhooks.Status.STATUS_ACTIVE_TMP}'", + ) -def test_webhooks_notification(endpoint): - res = webhooks_notification_request(endpoint, "ping", description="Test webhook notification") - res.raise_for_status() - # in "send_webhooks_notification", we are doing deeper analysis, why it failed - # for now, "raise_for_status" should be enough +class AlertNotificationManger(NotificationManagerHelpers): -@app.task(ignore_result=True) -def webhook_reactivation(endpoint_id: int, *args, **kwargs): - endpoint = Notification_Webhooks.objects.get(pk=endpoint_id) + """Manger for alert notifications and their helpers.""" - # User already changed status of endpoint - if endpoint.status != Notification_Webhooks.Status.STATUS_INACTIVE_TMP: - return + def send_alert_notification( + self, + event: str, + user: Dojo_User | None = None, + **kwargs: dict, + ): + logger.debug("sending alert notification to %s", user) + try: + # no need to differentiate between user/no user + icon = kwargs.get("icon", "info-circle") + try: + source = Notifications._meta.get_field(event).verbose_name.title()[:100] + except FieldDoesNotExist: + source = event.replace("_", " ").title()[:100] + alert = Alerts( + user_id=user, + title=kwargs.get("title")[:250], + description=self._create_notification_message( + event, + user, + "alert", + kwargs, + )[:2000], + url=kwargs.get("url", reverse("alerts")), + icon=icon[:25], + source=source, + ) + # relative urls will fail validation + alert.clean_fields(exclude=["url"]) + alert.save() + except Exception as exception: + logger.exception(exception) + self._log_alert( + exception, + "Alert Notification", + title=kwargs["title"], + description=str(exception), + url=kwargs["url"], + ) + + +class NotificationManager(NotificationManagerHelpers): + + """Manage the construction and dispatch of notifications.""" + + def __init__(self, *args: list, **kwargs: dict) -> None: + NotificationManagerHelpers.__init__(self, *args, **kwargs) + + def create_notification(self, event: str | None = None, **kwargs: dict) -> None: + # Process the notifications for a given list of recipients + if kwargs.get("recipients") is not None: + self._process_recipients(event=event, **kwargs) + else: + logger.debug("creating system notifications for event: %s", event) + # send system notifications to all admin users + self._process_objects(**kwargs) + # System notifications are sent one with user=None, which will trigger email to configured system email, to global slack channel, etc. + self._process_notifications( + event, + notifications=self.system_notifications, + **kwargs, + ) + # All admins will also receive system notifications, but as part of the person global notifications section below + # This time user is set, so will trigger email to personal email, to personal slack channel (mention), etc. + # only retrieve users which have at least one notification type enabled for this event type. + logger.debug("creating personal notifications for event: %s", event) + # There are notification like deleting a product type that shall not be sent to users. + # These notifications will have the parameter no_users=True + if kwargs.get("no_users", False) is False: + # get users with either global notifications, or a product specific notification + # and all admin/superuser, they will always be notified + for user in self._get_user_to_send_notifications_to(): + self._send_single_notification_to_user(user, event=event, **kwargs) + + def _process_recipients(self, event: str | None = None, **kwargs: dict) -> None: + # mimic existing code so that when recipients is specified, no other system or personal notifications are sent. + logger.debug("creating notifications for recipients: %s", kwargs["recipients"]) + for recipient_notifications in Notifications.objects.filter( + user__username__in=kwargs["recipients"], + user__is_active=True, + product=None, + ): + if event in settings.NOTIFICATIONS_SYSTEM_LEVEL_TRUMP: + # merge the system level notifications with the personal level + # this allows for system to trump the personal + merged_notifications = Notifications.merge_notifications_list( + [self.system_notifications, recipient_notifications], + ) + merged_notifications.user = recipient_notifications.user + logger.debug("Sent notification to %s", merged_notifications.user) + self._process_notifications( + event, + notifications=merged_notifications, + **kwargs, + ) + else: + # Do not trump user preferences and send notifications as usual + logger.debug("Sent notification to %s", recipient_notifications.user) + self._process_notifications( + event, + notifications=recipient_notifications, + **kwargs, + ) - endpoint.status = Notification_Webhooks.Status.STATUS_ACTIVE_TMP - endpoint.save() - logger.debug(f"Webhook endpoint '{endpoint.name}' reactivated to '{Notification_Webhooks.Status.STATUS_ACTIVE_TMP}'") + def _process_objects(self, **kwargs: dict) -> None: + """Extract the product and product type from the kwargs.""" + self.product_type: Product_Type = None + self.product: Product = None + if (product_type := kwargs.get("product_type")) is not None: + self.product_type = product_type + logger.debug("Defined product type %s", self.product_type) + if (product := kwargs.get("product")) is not None: + self.product = product + logger.debug("Defined product %s", self.product) + elif (engagement := kwargs.get("engagement")) is not None: + self.product = engagement.product + logger.debug("Defined product of engagement %s", self.product) + elif (test := kwargs.get("test")) is not None: + self.product = test.engagement.product + logger.debug("Defined product of test %s", self.product) + elif (finding := kwargs.get("finding")) is not None: + self.product = finding.test.engagement.product + logger.debug("Defined product of finding %s", self.product) + elif (obj := kwargs.get("obj")) is not None: + from dojo.utils import get_product + + self.product = get_product(obj) + logger.debug("Defined product of obj %s", self.product) + + def _get_user_to_send_notifications_to( + self, + ) -> QuerySet[Dojo_User]: + """Determine the users we should send notifications to based on product and product type permissions.""" + users = ( + Dojo_User.objects.filter(is_active=True) + .prefetch_related( + Prefetch( + "notifications_set", + queryset=Notifications.objects.filter( + Q(product_id=self.product) | Q(product__isnull=True), + ), + to_attr="applicable_notifications", + ), + ) + .annotate( + applicable_notifications_count=Count( + "notifications__id", + filter=Q(notifications__product_id=self.product) | Q(notifications__product__isnull=True), + ), + ) + .filter(Q(applicable_notifications_count__gt=0) | Q(is_superuser=True)) + ) + # only send to authorized users or admin/superusers + logger.debug("Filtering users for the product %s", self.product) + if self.product is not None: + users = get_authorized_users_for_product_and_product_type( + users, + self.product, + Permissions.Product_View, + ) + elif self.product_type is not None: + users = get_authorized_users_for_product_type( + users, + self.product_type, + Permissions.Product_Type_View, + ) + else: + # nor product_type nor product defined, we should not make noise and send only notifications to admins + logger.debug("Product is not specified, making it silent") + users = users.filter(is_superuser=True) + return users + + def _send_single_notification_to_user( + self, + user: Dojo_User, + event: str | None = None, + **kwargs: dict, + ) -> None: + """Send a notification to a single user.""" + logger.debug("Authorized user for the product %s", user) + # send notifications to user after merging possible multiple notifications records (i.e. personal global + personal product) + # kwargs.update({'user': user}) + applicable_notifications = user.applicable_notifications + if user.is_superuser: + # admin users get all system notifications + logger.debug("User %s is superuser", user) + applicable_notifications.append(self.system_notifications) + + notifications_set = Notifications.merge_notifications_list( + applicable_notifications, + ) + notifications_set.user = user + self._process_notifications(event, notifications=notifications_set, **kwargs) + + def _get_manager_instance( + self, + alert_type: str, + ) -> type[NotificationManagerHelpers]: + kwargs = { + "system_notifications": self.system_notifications, + "system_settings": self.system_settings, + } + if alert_type == "slack": + return SlackNotificationManger(**kwargs) + if alert_type == "msteams": + return MSTeamsNotificationManger(**kwargs) + if alert_type == "mail": + return EmailNotificationManger(**kwargs) + if alert_type == "webhooks": + return WebhookNotificationManger(**kwargs) + if alert_type == "alert": + return AlertNotificationManger(**kwargs) + + msg = f"Unsupported alert type: {alert_type}" + raise TypeError(msg) + + def _process_notifications( + self, + event: str | None, + notifications: Notifications | None = None, + **kwargs: dict, + ) -> None: + # Quick break out if we do not have any work to do + if not notifications: + logger.warning("no notifications!") + return + + logger.debug( + "sending notification " + ("asynchronously" if we_want_async() else "synchronously"), + ) + logger.debug("process notifications for %s", notifications.user) + + if self.system_settings.enable_slack_notifications and "slack" in getattr( + notifications, + event, + getattr(notifications, "other"), + ): + logger.debug("Sending Slack Notification") + self._get_manager_instance("slack").send_slack_notification( + event, + user=notifications.user, + **kwargs, + ) + + if self.system_settings.enable_msteams_notifications and "msteams" in getattr( + notifications, + event, + getattr(notifications, "other"), + ): + logger.debug("Sending MSTeams Notification") + self._get_manager_instance("msteams").send_msteams_notification( + event, + user=notifications.user, + **kwargs, + ) + + if self.system_settings.enable_mail_notifications and "mail" in getattr( + notifications, + event, + getattr(notifications, "other"), + ): + logger.debug("Sending Mail Notification") + self._get_manager_instance("mail").send_mail_notification( + event, + user=notifications.user, + **kwargs, + ) + + if self.system_settings.enable_webhooks_notifications and "webhooks" in getattr( + notifications, + event, + getattr(notifications, "other"), + ): + logger.debug("Sending Webhooks Notification") + self._get_manager_instance("webhooks").send_webhooks_notification( + event, + user=notifications.user, + **kwargs, + ) + + if "alert" in getattr(notifications, event, getattr(notifications, "other")): + logger.debug(f"Sending Alert to {notifications.user}") + self._get_manager_instance("alert").send_alert_notification( + event, + user=notifications.user, + **kwargs, + ) @app.task(ignore_result=True) -def webhook_status_cleanup(*args, **kwargs): +def webhook_status_cleanup(*_args: list, **_kwargs: dict): # If some endpoint was affected by some outage (5xx, 429, Timeout) but it was clean during last 24 hours, # we consider this endpoint as healthy so need to reset it endpoints = Notification_Webhooks.objects.filter( @@ -397,7 +873,9 @@ def webhook_status_cleanup(*args, **kwargs): endpoint.last_error = None endpoint.note = f"Reactivation from {Notification_Webhooks.Status.STATUS_ACTIVE_TMP}" endpoint.save() - logger.debug(f"Webhook endpoint '{endpoint.name}' reactivated from '{Notification_Webhooks.Status.STATUS_ACTIVE_TMP}' to '{Notification_Webhooks.Status.STATUS_ACTIVE}'") + logger.debug( + f"Webhook endpoint '{endpoint.name}' reactivated from '{Notification_Webhooks.Status.STATUS_ACTIVE_TMP}' to '{Notification_Webhooks.Status.STATUS_ACTIVE}'", + ) # Reactivation of STATUS_INACTIVE_TMP endpoints. # They should reactive automatically in 60s, however in case of some unexpected event (e.g. start of whole stack), @@ -407,180 +885,5 @@ def webhook_status_cleanup(*args, **kwargs): last_error__lt=get_current_datetime() - timedelta(minutes=5), ) for endpoint in broken_endpoints: - webhook_reactivation(endpoint_id=endpoint.pk) - - -@dojo_async_task -@app.task -def send_webhooks_notification(event, user=None, *args, **kwargs): - - ERROR_PERMANENT = "permanent" - ERROR_TEMPORARY = "temporary" - - endpoints = Notification_Webhooks.objects.filter(owner=user) - - if not endpoints: - if user: - logger.info(f"URLs for Webhooks not configured for user '{user}': skipping user notification") - else: - logger.info("URLs for Webhooks not configured: skipping system notification") - return - - for endpoint in endpoints: - - error = None - if endpoint.status not in [Notification_Webhooks.Status.STATUS_ACTIVE, Notification_Webhooks.Status.STATUS_ACTIVE_TMP]: - logger.info(f"URL for Webhook '{endpoint.name}' is not active: {endpoint.get_status_display()} ({endpoint.status})") - continue - - try: - logger.debug(f"Sending webhook message to endpoint '{endpoint.name}'") - res = webhooks_notification_request(endpoint, event, *args, **kwargs) - - if 200 <= res.status_code < 300: - logger.debug(f"Message sent to endpoint '{endpoint.name}' successfully.") - continue - - # HTTP request passed successfully but we still need to check status code - error = ERROR_TEMPORARY if 500 <= res.status_code < 600 or res.status_code == 429 else ERROR_PERMANENT - - endpoint.note = f"Response status code: {res.status_code}" - logger.error(f"Error when sending message to Webhooks '{endpoint.name}' (status: {res.status_code}): {res.text}") - - except requests.exceptions.Timeout as e: - error = ERROR_TEMPORARY - endpoint.note = f"Requests exception: {e}" - logger.error(f"Timeout when sending message to Webhook '{endpoint.name}'") - - except Exception as e: - error = ERROR_PERMANENT - endpoint.note = f"Exception: {e}"[:1000] - logger.exception(e) - log_alert(e, "Webhooks Notification") - - now = get_current_datetime() - - if error == ERROR_TEMPORARY: - - # If endpoint is unstable for more then one day, it needs to be deactivated - if endpoint.first_error is not None and (now - endpoint.first_error).total_seconds() > 60 * 60 * 24: - endpoint.status = Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT - - else: - # We need to monitor when outage started - if endpoint.status == Notification_Webhooks.Status.STATUS_ACTIVE: - endpoint.first_error = now - - endpoint.status = Notification_Webhooks.Status.STATUS_INACTIVE_TMP - - # In case of failure within one day, endpoint can be deactivated temporally only for one minute - webhook_reactivation.apply_async(kwargs={"endpoint_id": endpoint.pk}, countdown=60) - - # There is no reason to keep endpoint active if it is returning 4xx errors - else: - endpoint.status = Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT - endpoint.first_error = now - - endpoint.last_error = now - endpoint.save() - - -def send_alert_notification(event, user=None, *args, **kwargs): - logger.debug("sending alert notification to %s", user) - try: - # no need to differentiate between user/no user - icon = kwargs.get("icon", "info-circle") - try: - source = Notifications._meta.get_field(event).verbose_name.title()[:100] - except FieldDoesNotExist: - source = event.replace("_", " ").title()[:100] - alert = Alerts( - user_id=user, - title=kwargs.get("title")[:250], - description=create_notification_message(event, user, "alert", *args, **kwargs)[:2000], - url=kwargs.get("url", reverse("alerts")), - icon=icon[:25], - source=source, - ) - # relative urls will fail validation - alert.clean_fields(exclude=["url"]) - alert.save() - except Exception as e: - logger.exception(e) - log_alert(e, "Alert Notification", title=kwargs["title"], description=str(e), url=kwargs["url"]) - - -def get_slack_user_id(user_email): - - from dojo.utils import get_system_setting - - user_id = None - - res = requests.request( - method="POST", - url="https://slack.com/api/users.lookupByEmail", - data={"token": get_system_setting("slack_token"), "email": user_email}, - timeout=settings.REQUESTS_TIMEOUT, - ) - - user = json.loads(res.text) - - slack_user_is_found = False - if user: - if "error" in user: - logger.error("Slack is complaining. See error message below.") - logger.error(user) - raise RuntimeError("Error getting user list from Slack: " + res.text) - if "email" in user["user"]["profile"]: - if user_email == user["user"]["profile"]["email"]: - if "id" in user["user"]: - user_id = user["user"]["id"] - logger.debug(f"Slack user ID is {user_id}") - slack_user_is_found = True - else: - logger.warning(f"A user with email {user_email} could not be found in this Slack workspace.") - - if not slack_user_is_found: - logger.warning("The Slack user was not found.") - - return user_id - - -def log_alert(e, notification_type=None, *args, **kwargs): - # no try catch here, if this fails we need to show an error - - users = Dojo_User.objects.filter(is_superuser=True) - for user in users: - alert = Alerts( - user_id=user, - url=kwargs.get("url", reverse("alerts")), - title=kwargs.get("title", "Notification issue")[:250], - description=kwargs.get("description", str(e))[:2000], - icon="exclamation-triangle", - source=notification_type[:100] if notification_type else kwargs.get("source", "unknown")[:100]) - # relative urls will fail validation - alert.clean_fields(exclude=["url"]) - alert.save() - - -def notify_test_created(test): - title = "Test created for " + str(test.engagement.product) + ": " + str(test.engagement.name) + ": " + str(test) - create_notification(event="test_added", title=title, test=test, engagement=test.engagement, product=test.engagement.product, - url=reverse("view_test", args=(test.id,)), url_api=reverse("test-detail", args=(test.id,))) - - -def notify_scan_added(test, updated_count, new_findings=[], findings_mitigated=[], findings_reactivated=[], findings_untouched=[]): - logger.debug("Scan added notifications") - - new_findings = sorted(new_findings, key=lambda x: x.numerical_severity) - findings_mitigated = sorted(findings_mitigated, key=lambda x: x.numerical_severity) - findings_reactivated = sorted(findings_reactivated, key=lambda x: x.numerical_severity) - findings_untouched = sorted(findings_untouched, key=lambda x: x.numerical_severity) - - title = "Created/Updated " + str(updated_count) + " findings for " + str(test.engagement.product) + ": " + str(test.engagement.name) + ": " + str(test) - - event = "scan_added_empty" if updated_count == 0 else "scan_added" - - create_notification(event=event, title=title, findings_new=new_findings, findings_mitigated=findings_mitigated, findings_reactivated=findings_reactivated, - finding_count=updated_count, test=test, engagement=test.engagement, product=test.engagement.product, findings_untouched=findings_untouched, - url=reverse("view_test", args=(test.id,)), url_api=reverse("test-detail", args=(test.id,))) + manager = WebhookNotificationManger() + manager._webhook_reactivation(manager, endpoint_id=endpoint.pk) diff --git a/dojo/notifications/views.py b/dojo/notifications/views.py index 7fe5562ee7e..7fc58803fc4 100644 --- a/dojo/notifications/views.py +++ b/dojo/notifications/views.py @@ -11,7 +11,7 @@ from dojo.forms import DeleteNotificationsWebhookForm, NotificationsForm, NotificationsWebhookForm from dojo.models import Notification_Webhooks, Notifications -from dojo.notifications.helper import test_webhooks_notification +from dojo.notifications.helper import NotificationManagerHelpers from dojo.utils import add_breadcrumb, get_enabled_notifications_list, get_system_setting logger = logging.getLogger(__name__) @@ -136,6 +136,9 @@ def set_breadcrumbs(self, request: HttpRequest): class NotificationWebhooksView(View): + def get_webhook_manager_instance(self) -> type[NotificationManagerHelpers]: + return Notification_Webhooks() + def check_webhooks_enabled(self): if not get_system_setting("enable_webhooks_notifications"): raise Http404 @@ -216,7 +219,7 @@ def process_form(self, request: HttpRequest, context: dict): form = context["form"] if form.is_valid(): try: - test_webhooks_notification(form.instance) + self.get_webhook_manager_instance().test_webhooks_notification(form.instance) except requests.exceptions.RequestException as e: messages.add_message( request, @@ -305,7 +308,7 @@ def process_form(self, request: HttpRequest, nwh: Notification_Webhooks, context if form.is_valid(): try: - test_webhooks_notification(form.instance) + self.get_webhook_manager_instance().test_webhooks_notification(form.instance) except requests.exceptions.RequestException as e: messages.add_message( request, diff --git a/unittests/test_notifications.py b/unittests/test_notifications.py index cccdb2e3d6b..860c2168599 100644 --- a/unittests/test_notifications.py +++ b/unittests/test_notifications.py @@ -1,6 +1,6 @@ import datetime import logging -from unittest.mock import patch +from unittest.mock import Mock, patch from auditlog.context import set_actor from crum import impersonate @@ -10,11 +10,12 @@ from rest_framework.authtoken.models import Token from rest_framework.test import APIClient, APITestCase -import dojo.notifications.helper as notifications_helper from dojo import __version__ as dd_version +from dojo.importers.base_importer import BaseImporter from dojo.models import ( DEFAULT_NOTIFICATION, Alerts, + Development_Environment, Dojo_User, Endpoint, Engagement, @@ -31,10 +32,9 @@ get_current_datetime, ) from dojo.notifications.helper import ( + AlertNotificationManger, + WebhookNotificationManger, create_notification, - send_alert_notification, - send_webhooks_notification, - webhook_reactivation, webhook_status_cleanup, ) @@ -90,107 +90,115 @@ def test_merge_notifications_list(self): self.assertEqual(len(merged_notifications.other), 3) self.assertEqual(merged_notifications.other, {"alert", "mail", "slack"}) - @patch("dojo.notifications.helper.send_alert_notification", wraps=send_alert_notification) - def test_notifications_system_level_trump(self, mock): + # @patch("dojo.notifications.helper.AlertNotificationManger.send_alert_notification", wraps=AlertNotificationManger.send_alert_notification) + @patch("dojo.notifications.helper.NotificationManager._get_manager_instance") + def test_notifications_system_level_trump(self, mock_get_manager_instance): + mock_manager = Mock(wraps=AlertNotificationManger()) + mock_get_manager_instance.return_value = mock_manager + notif_user, _ = Notifications.objects.get_or_create(user=User.objects.get(username="admin")) notif_system, _ = Notifications.objects.get_or_create(user=None, template=False) - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user off, system off"): notif_user.user_mentioned = () # no alert notif_user.save() notif_system.user_mentioned = () # no alert notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count) - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user off, system on"): notif_user.user_mentioned = () # no alert notif_user.save() notif_system.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 1) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 1) # Small note for this test-cast: Trump works only in positive direction - system is not able to disable some kind of notification if user enabled it - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user on, system off"): notif_user.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_user.save() notif_system.user_mentioned = () # no alert notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 1) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 1) - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user on, system on"): notif_user.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_user.save() notif_system.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 1) - last_count = mock.call_count + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 1) + last_count = mock_manager.send_alert_notification.call_count + + # @patch("dojo.notifications.helper.AlertNotificationManger.send_alert_notification", wraps=AlertNotificationManger.send_alert_notification) + @patch("dojo.notifications.helper.NotificationManager._get_manager_instance") + def test_non_default_other_notifications(self, mock_get_manager_instance): + mock_manager = Mock(wraps=AlertNotificationManger()) + mock_get_manager_instance.return_value = mock_manager - @patch("dojo.notifications.helper.send_alert_notification", wraps=send_alert_notification) - def test_non_default_other_notifications(self, mock): notif_user, _ = Notifications.objects.get_or_create(user=User.objects.get(username="admin")) notif_system, _ = Notifications.objects.get_or_create(user=None, template=False) - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("do not notify other"): notif_user.other = () # no alert notif_user.save() create_notification(event="dummy_bar_event", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count) - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("notify other"): notif_user.other = DEFAULT_NOTIFICATION # alert only notif_user.save() create_notification(event="dummy_foo_event", title="title_for_dummy_foo_event", description="description_for_dummy_foo_event", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 1) - self.assertEqual(mock.call_args_list[0].args[0], "dummy_foo_event") + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 1) + self.assertEqual(mock_manager.send_alert_notification.call_args_list[0].args[0], "dummy_foo_event") alert = Alerts.objects.get(title="title_for_dummy_foo_event") self.assertEqual(alert.source, "Dummy Foo Event") - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user off, system off"): notif_user.user_mentioned = () # no alert notif_user.save() notif_system.user_mentioned = () # no alert notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 0) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 0) - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user off, system on"): notif_user.user_mentioned = () # no alert notif_user.save() notif_system.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 1) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 1) # Small note for this test-cast: Trump works only in positive direction - system is not able to disable some kind of notification if user enabled it - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user on, system off"): notif_user.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_user.save() notif_system.user_mentioned = () # no alert notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 1) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 1) - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user on, system on"): notif_user.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_user.save() notif_system.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 1) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 1) class TestNotificationTriggers(DojoTestCase): @@ -199,7 +207,7 @@ class TestNotificationTriggers(DojoTestCase): def setUp(self): self.notification_tester = Dojo_User.objects.get(username="admin") - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") def test_product_types(self, mock): last_count = mock.call_count @@ -219,7 +227,7 @@ def test_product_types(self, mock): self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The product type "notif prod type" was deleted by admin') self.assertEqual(mock.call_args_list[-1].kwargs["url"], "/product/type") - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") def test_products(self, mock): last_count = mock.call_count @@ -240,7 +248,7 @@ def test_products(self, mock): self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The product "prod name" was deleted by admin') self.assertEqual(mock.call_args_list[-1].kwargs["url"], "/product") - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") def test_engagements(self, mock): last_count = mock.call_count @@ -300,7 +308,7 @@ def test_engagements(self, mock): self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The engagement "Testing engagement" was deleted by admin') self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/product/{prod2.id}") - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") def test_endpoints(self, mock): prod_type = Product_Type.objects.first() prod1, _ = Product.objects.get_or_create(prod_type=prod_type, name="prod name 1") @@ -323,7 +331,7 @@ def test_endpoints(self, mock): self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The endpoint "host2" was deleted by admin') self.assertEqual(mock.call_args_list[-1].kwargs["url"], "/endpoint") - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") def test_tests(self, mock): prod_type = Product_Type.objects.first() prod, _ = Product.objects.get_or_create(prod_type=prod_type, name="prod name") @@ -347,7 +355,7 @@ def test_tests(self, mock): self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The test "Acunetix Scan" was deleted by admin') self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/engagement/{eng2.id}") - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") def test_finding_groups(self, mock): prod_type = Product_Type.objects.first() prod, _ = Product.objects.get_or_create(prod_type=prod_type, name="prod name") @@ -372,7 +380,7 @@ def test_finding_groups(self, mock): self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The finding group "fg test" was deleted by admin') self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/test/{test2.id}") - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") @override_settings(ENABLE_AUDITLOG=True) def test_auditlog_on(self, mock): prod_type = Product_Type.objects.create(name="notif prod type") @@ -380,7 +388,7 @@ def test_auditlog_on(self, mock): prod_type.delete() self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The product type "notif prod type" was deleted by admin') - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") @override_settings(ENABLE_AUDITLOG=False) def test_auditlog_off(self, mock): prod_type = Product_Type.objects.create(name="notif prod type") @@ -397,7 +405,7 @@ def setUp(self): self.client = APIClient() self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key) - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") @override_settings(ENABLE_AUDITLOG=True) def test_auditlog_on(self, mock): prod_type = Product_Type.objects.create(name="notif prod type API") @@ -427,26 +435,30 @@ def test_missing_system_webhook(self): # test data contains 2 entries but we need to test missing definition Notification_Webhooks.objects.all().delete() with self.assertLogs("dojo.notifications.helper", level="INFO") as cm: - send_webhooks_notification(event="dummy") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy") self.assertIn("URLs for Webhooks not configured: skipping system notification", cm.output[0]) def test_missing_personal_webhook(self): # test data contains 2 entries but we need to test missing definition Notification_Webhooks.objects.all().delete() with self.assertLogs("dojo.notifications.helper", level="INFO") as cm: - send_webhooks_notification(event="dummy", user=Dojo_User.objects.get(username="admin")) + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy", user=Dojo_User.objects.get(username="admin")) self.assertIn("URLs for Webhooks not configured for user '(admin)': skipping user notification", cm.output[0]) def test_system_webhook_inactive(self): self.sys_wh.status = Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT self.sys_wh.save() with self.assertLogs("dojo.notifications.helper", level="INFO") as cm: - send_webhooks_notification(event="dummy") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy") self.assertIn("URL for Webhook 'My webhook endpoint' is not active: Permanently inactive (inactive_permanent)", cm.output[0]) def test_system_webhook_sucessful(self): with self.assertLogs("dojo.notifications.helper", level="DEBUG") as cm: - send_webhooks_notification(event="dummy") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy") self.assertIn("Message sent to endpoint 'My webhook endpoint' successfully.", cm.output[-1]) updated_wh = Notification_Webhooks.objects.filter(owner=None).first() @@ -459,7 +471,8 @@ def test_system_webhook_4xx(self): self.sys_wh.save() with self.assertLogs("dojo.notifications.helper", level="ERROR") as cm: - send_webhooks_notification(event="dummy", title="Dummy event") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy", title="Dummy event") self.assertIn("Error when sending message to Webhooks 'My webhook endpoint' (status: 400)", cm.output[-1]) updated_wh = Notification_Webhooks.objects.all().filter(owner=None).first() @@ -472,7 +485,8 @@ def test_system_webhook_first_5xx(self): self.sys_wh.save() with self.assertLogs("dojo.notifications.helper", level="ERROR") as cm: - send_webhooks_notification(event="dummy", title="Dummy event") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy", title="Dummy event") updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_INACTIVE_TMP) @@ -490,7 +504,8 @@ def test_system_webhook_second_5xx_within_one_day(self): self.sys_wh.save() with self.assertLogs("dojo.notifications.helper", level="ERROR") as cm: - send_webhooks_notification(event="dummy", title="Dummy event") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy", title="Dummy event") updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_INACTIVE_TMP) @@ -510,7 +525,8 @@ def test_system_webhook_third_5xx_after_more_then_day(self): self.sys_wh.save() with self.assertLogs("dojo.notifications.helper", level="ERROR") as cm: - send_webhooks_notification(event="dummy", title="Dummy event") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy", title="Dummy event") updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT) @@ -522,7 +538,8 @@ def test_system_webhook_third_5xx_after_more_then_day(self): def test_webhook_reactivation(self): with self.subTest("active"): wh = Notification_Webhooks.objects.filter(owner=None).first() - webhook_reactivation(endpoint_id=wh.pk) + manager = WebhookNotificationManger() + manager._webhook_reactivation(manager, endpoint_id=wh.pk) updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_ACTIVE) @@ -540,7 +557,8 @@ def test_webhook_reactivation(self): wh.save() with self.assertLogs("dojo.notifications.helper", level="DEBUG") as cm: - webhook_reactivation(endpoint_id=wh.pk) + manager = WebhookNotificationManger() + manager._webhook_reactivation(manager, endpoint_id=wh.pk) updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_ACTIVE_TMP) @@ -640,7 +658,8 @@ def test_system_webhook_timeout(self): system_settings.save() with self.assertLogs("dojo.notifications.helper", level="ERROR") as cm: - send_webhooks_notification(event="dummy", title="Dummy event") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy", title="Dummy event") updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_INACTIVE_TMP) @@ -655,7 +674,8 @@ def test_system_webhook_wrong_fqdn(self): self.sys_wh.save() with self.assertLogs("dojo.notifications.helper", level="ERROR") as cm: - send_webhooks_notification(event="dummy", title="Dummy event") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy", title="Dummy event") updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT) @@ -751,7 +771,15 @@ def test_events_messages(self, mock): with self.subTest("test_added"): test = Test.objects.create(title="notif test", engagement=eng, target_start=timezone.now(), target_end=timezone.now(), test_type_id=Test_Type.objects.first().id) - notifications_helper.notify_test_created(test) + create_notification( + event="test_added", + title=f"Test created for {test.engagement.product}: {test.engagement.name}: {test}", + test=test, + engagement=test.engagement, + product=test.engagement.product, + url=reverse("view_test", args=(test.id,)), + url_api=reverse("test-detail", args=(test.id,)), + ) self.assertEqual(mock.call_args.kwargs["headers"]["X-DefectDojo-Event"], "test_added") self.maxDiff = None self.assertEqual(mock.call_args.kwargs["json"], { @@ -787,7 +815,10 @@ def test_events_messages(self, mock): }) with self.subTest("scan_added_empty"): - notifications_helper.notify_scan_added(test, updated_count=0) + BaseImporter( + environment=Development_Environment.objects.get_or_create(name="Development")[0], + scan_type="ZAP Scan", + ).notify_scan_added(test, updated_count=0) self.assertEqual(mock.call_args.kwargs["headers"]["X-DefectDojo-Event"], "scan_added_empty") self.maxDiff = None self.assertEqual(mock.call_args.kwargs["json"], { @@ -830,7 +861,11 @@ def test_events_messages(self, mock): }) with self.subTest("scan_added"): - notifications_helper.notify_scan_added(test, + BaseImporter( + environment=Development_Environment.objects.get_or_create(name="Development")[0], + scan_type="ZAP Scan", + ).notify_scan_added( + test, updated_count=4, new_findings=[ Finding.objects.create(test=test, title="New Finding", severity="Critical"),