Comments
diff --git a/nautobot/dcim/templates/dcim/location_migrate_data_to_contact.html b/nautobot/dcim/templates/dcim/location_migrate_data_to_contact.html
new file mode 100644
index 0000000000..7ddcbbe861
--- /dev/null
+++ b/nautobot/dcim/templates/dcim/location_migrate_data_to_contact.html
@@ -0,0 +1,102 @@
+{% extends 'generic/object_create.html' %}
+{% load form_helpers %}
+{% load helpers %}
+
+{% block title %}Migrating contact data from {{ obj_type }} {{ obj }}{% endblock %}
+{% block form %}
+
+
+
Contact Data
+
+ {% render_field form.action %}
+ {% render_field form.location %}
+
+
+
+ {% render_field form.name %}
+ {% render_field form.phone %}
+ {% render_field form.email %}
+ {% render_field form.contact %}
+ {% render_field form.team %}
+ {% render_field form.role %}
+ {% render_field form.status %}
+
+
+
+{% endblock %}
+
+{% block buttons %}
+
+
Cancel
+{% endblock %}
+
+{% block javascript %}
+
+{% endblock %}
\ No newline at end of file
diff --git a/nautobot/dcim/tests/test_views.py b/nautobot/dcim/tests/test_views.py
index 3ae0a42fb6..51251d54db 100644
--- a/nautobot/dcim/tests/test_views.py
+++ b/nautobot/dcim/tests/test_views.py
@@ -25,6 +25,7 @@
InterfaceModeChoices,
InterfaceRedundancyGroupProtocolChoices,
InterfaceTypeChoices,
+ LocationDataToContactActionChoices,
PortTypeChoices,
PowerFeedPhaseChoices,
PowerFeedSupplyChoices,
@@ -93,6 +94,8 @@
from nautobot.extras.choices import CustomFieldTypeChoices, RelationshipTypeChoices
from nautobot.extras.models import (
ConfigContextSchema,
+ Contact,
+ ContactAssociation,
CustomField,
CustomFieldChoice,
ExternalIntegration,
@@ -102,6 +105,7 @@
SecretsGroup,
Status,
Tag,
+ Team,
)
from nautobot.ipam.choices import IPAddressTypeChoices
from nautobot.ipam.models import IPAddress, Namespace, Prefix, VLAN, VLANGroup, VRF
@@ -174,6 +178,8 @@ class LocationTestCase(ViewTestCases.PrimaryObjectViewTestCase):
@classmethod
def setUpTestData(cls):
+ cls.contact_statuses = Status.objects.get_for_model(ContactAssociation)
+ cls.contact_roles = Role.objects.get_for_model(ContactAssociation)
lt1 = LocationType.objects.get(name="Campus")
lt2 = LocationType.objects.get(name="Building")
lt3 = LocationType.objects.get(name="Floor")
@@ -252,6 +258,137 @@ def test_create_child_location_under_a_non_globally_unique_named_parent_location
self.assertHttpStatus(self.client.post(**request), 302)
self.assertEqual(Location.objects.get(name="Root 3").parent.pk, site_1.pk)
+ @override_settings(EXEMPT_VIEW_PERMISSIONS=["*"])
+ def test_migrate_location_data_from_location_assign(self):
+ self.add_permissions("dcim.change_location")
+ location = Location.objects.first()
+ location.contact_name = "Should be unique Contact Name"
+ location.contact_phone = "123123123"
+ location.contact_email = "helloword@example.com"
+ location.physical_address = "418 Brown Locks Barrettchester, NM 85792"
+ location.shipping_address = "53 blue Locks manchester, NY 12124"
+ similar_contact = Contact.objects.first()
+ role = self.contact_roles.first().pk
+ status = self.contact_statuses.first().pk
+ form_data = {
+ "action": LocationDataToContactActionChoices.ASSOCIATE_EXISTING_CONTACT,
+ "contact": similar_contact.pk,
+ "role": role,
+ "status": status,
+ }
+ request = {
+ "path": reverse("dcim:location_migrate_data_to_contact", kwargs={"pk": location.pk}),
+ "data": post_data(form_data),
+ }
+ # Assert permission checks are triggered
+ self.assertHttpStatus(self.client.post(**request), 200)
+ self.add_permissions("extras.add_contactassociation")
+ self.assertHttpStatus(self.client.post(**request), 302)
+ # assert ContactAssociation is created correctly
+ created_contact_association = ContactAssociation.objects.order_by("created").last()
+ self.assertEqual(created_contact_association.associated_object_id, location.pk)
+ self.assertEqual(created_contact_association.contact.pk, similar_contact.pk)
+ self.assertEqual(created_contact_association.role.pk, role)
+ self.assertEqual(created_contact_association.status.pk, status)
+
+ # assert location data is cleared out
+ location.refresh_from_db()
+ self.assertEqual(location.contact_name, "")
+ self.assertEqual(location.contact_phone, "")
+ self.assertEqual(location.contact_email, "")
+
+ @override_settings(EXEMPT_VIEW_PERMISSIONS=["*"])
+ def test_migrate_location_data_from_location_new_contact(self):
+ self.add_permissions("dcim.change_location")
+ location = Location.objects.first()
+ location.contact_name = "Should be unique Contact Name"
+ location.contact_phone = "123123123"
+ location.contact_email = "helloword@example.com"
+ location.physical_address = "418 Brown Locks Barrettchester, NM 85792"
+ location.shipping_address = "53 blue Locks manchester, NY 12124"
+ role = self.contact_roles.first().pk
+ status = self.contact_statuses.first().pk
+ form_data = {
+ "action": LocationDataToContactActionChoices.CREATE_AND_ASSIGN_NEW_CONTACT,
+ "name": "Should be unique Contact Name",
+ "phone": "123123123",
+ "email": "helloword@example.com",
+ "role": role,
+ "status": status,
+ }
+ request = {
+ "path": reverse("dcim:location_migrate_data_to_contact", kwargs={"pk": location.pk}),
+ "data": post_data(form_data),
+ }
+ # Assert permission checks are triggered
+ self.assertHttpStatus(self.client.post(**request), 200)
+ self.add_permissions("extras.add_contactassociation")
+ self.add_permissions("extras.add_contact")
+ self.assertHttpStatus(self.client.post(**request), 302)
+ # assert a new contact is created successfully
+ contact = Contact.objects.get(name="Should be unique Contact Name")
+ self.assertEqual(contact.name, form_data["name"])
+ self.assertEqual(contact.phone, form_data["phone"])
+ self.assertEqual(contact.email, form_data["email"])
+ # assert ContactAssociation is created correctly
+ created_contact_association = ContactAssociation.objects.order_by("created").last()
+ self.assertEqual(created_contact_association.associated_object_id, location.pk)
+ self.assertEqual(created_contact_association.contact.pk, contact.pk)
+ self.assertEqual(created_contact_association.role.pk, role)
+ self.assertEqual(created_contact_association.status.pk, status)
+
+ # assert location data is cleared out
+ location.refresh_from_db()
+ self.assertEqual(location.contact_name, "")
+ self.assertEqual(location.contact_phone, "")
+ self.assertEqual(location.contact_email, "")
+
+ @override_settings(EXEMPT_VIEW_PERMISSIONS=["*"])
+ def test_migrate_location_data_from_location_new_team(self):
+ self.add_permissions("dcim.change_location")
+ location = Location.objects.first()
+ location.contact_name = "Should be unique Team Name"
+ location.contact_phone = "123123123"
+ location.contact_email = "helloword@example.com"
+ location.physical_address = "418 Brown Locks Barrettchester, NM 85792"
+ location.shipping_address = "53 blue Locks manchester, NY 12124"
+ role = self.contact_roles.first().pk
+ status = self.contact_statuses.first().pk
+ form_data = {
+ "action": LocationDataToContactActionChoices.CREATE_AND_ASSIGN_NEW_TEAM,
+ "name": "Should be unique Team Name",
+ "phone": "123123123",
+ "email": "helloword@example.com",
+ "role": role,
+ "status": status,
+ }
+ request = {
+ "path": reverse("dcim:location_migrate_data_to_contact", kwargs={"pk": location.pk}),
+ "data": post_data(form_data),
+ }
+ # Assert permission checks are triggered
+ self.assertHttpStatus(self.client.post(**request), 200)
+ self.add_permissions("extras.add_contactassociation")
+ self.add_permissions("extras.add_team")
+ self.assertHttpStatus(self.client.post(**request), 302)
+ # assert a new team is created successfully
+ team = Team.objects.get(name="Should be unique Team Name")
+ self.assertEqual(team.name, form_data["name"])
+ self.assertEqual(team.phone, form_data["phone"])
+ self.assertEqual(team.email, form_data["email"])
+ # assert ContactAssociation is created correctly
+ created_contact_association = ContactAssociation.objects.order_by("created").last()
+ self.assertEqual(created_contact_association.associated_object_id, location.pk)
+ self.assertEqual(created_contact_association.team.pk, team.pk)
+ self.assertEqual(created_contact_association.role.pk, role)
+ self.assertEqual(created_contact_association.status.pk, status)
+
+ # assert location data is cleared out
+ location.refresh_from_db()
+ self.assertEqual(location.contact_name, "")
+ self.assertEqual(location.contact_phone, "")
+ self.assertEqual(location.contact_email, "")
+
class RackGroupTestCase(ViewTestCases.OrganizationalObjectViewTestCase):
model = RackGroup
diff --git a/nautobot/dcim/urls.py b/nautobot/dcim/urls.py
index 983cb301f7..990c372df7 100644
--- a/nautobot/dcim/urls.py
+++ b/nautobot/dcim/urls.py
@@ -84,6 +84,11 @@
name="location_notes",
kwargs={"model": Location},
),
+ path(
+ "locations/
/migrate-data-to-contact/",
+ views.MigrateLocationDataToContactView.as_view(),
+ name="location_migrate_data_to_contact",
+ ),
path(
"locations//images/add/",
ImageAttachmentEditView.as_view(),
diff --git a/nautobot/dcim/views.py b/nautobot/dcim/views.py
index 84fa188b0c..98006c65c7 100644
--- a/nautobot/dcim/views.py
+++ b/nautobot/dcim/views.py
@@ -1,8 +1,10 @@
from collections import OrderedDict
+import logging
import uuid
from django.contrib import messages
from django.contrib.contenttypes.models import ContentType
+from django.core.exceptions import ObjectDoesNotExist, PermissionDenied
from django.core.paginator import EmptyPage, PageNotAnInteger
from django.db import transaction
from django.db.models import F, Prefetch
@@ -12,15 +14,19 @@
MultipleHiddenInput,
)
from django.shortcuts import get_object_or_404, HttpResponse, redirect, render
+from django.utils.encoding import iri_to_uri
from django.utils.functional import cached_property
from django.utils.html import format_html
+from django.utils.http import url_has_allowed_host_and_scheme
from django.views.generic import View
from django_tables2 import RequestConfig
from nautobot.circuits.models import Circuit
-from nautobot.core.forms import ConfirmationForm
+from nautobot.core.forms import ConfirmationForm, restrict_form_fields
from nautobot.core.models.querysets import count_related
+from nautobot.core.templatetags.helpers import has_perms
from nautobot.core.utils.permissions import get_permission_for_model
+from nautobot.core.utils.requests import normalize_querydict
from nautobot.core.views import generic
from nautobot.core.views.mixins import (
GetReturnURLMixin,
@@ -30,7 +36,10 @@
)
from nautobot.core.views.paginator import EnhancedPaginator, get_paginate_count
from nautobot.core.views.viewsets import NautobotUIViewSet
+from nautobot.dcim.choices import LocationDataToContactActionChoices
+from nautobot.dcim.forms import LocationMigrateDataToContactForm
from nautobot.dcim.utils import get_all_network_driver_mappings, get_network_driver_mapping_tool_names
+from nautobot.extras.models import Contact, ContactAssociation, Role, Status, Team
from nautobot.extras.views import ObjectChangeLogView, ObjectConfigContextView, ObjectDynamicGroupsView
from nautobot.ipam.models import IPAddress, Prefix, Service, VLAN
from nautobot.ipam.tables import InterfaceIPAddressTable, InterfaceVLANTable, VRFDeviceAssignmentTable
@@ -83,6 +92,8 @@
VirtualChassis,
)
+logger = logging.getLogger(__name__)
+
class BulkDisconnectView(GetReturnURLMixin, ObjectPermissionRequiredMixin, View):
"""
@@ -283,6 +294,9 @@ def get_extra_context(self, request, instance):
"children_table": children_table,
"rack_groups": rack_groups,
"stats": stats,
+ "contact_association_permission": ["extras.add_contactassociation"],
+ # show the button if any of these fields have non-empty value.
+ "show_convert_to_contact_button": instance.contact_name or instance.contact_phone or instance.contact_email,
}
@@ -314,6 +328,140 @@ class LocationBulkDeleteView(generic.BulkDeleteView):
table = tables.LocationTable
+class MigrateLocationDataToContactView(generic.ObjectEditView):
+ queryset = Location.objects.all()
+ model_form = LocationMigrateDataToContactForm
+ template_name = "dcim/location_migrate_data_to_contact.html"
+
+ def get(self, request, *args, **kwargs):
+ obj = self.alter_obj(self.get_object(kwargs), request, args, kwargs)
+
+ initial_data = normalize_querydict(request.GET, form_class=self.model_form)
+ # remove status from the location itself
+ initial_data["status"] = None
+ initial_data["location"] = obj.pk
+
+ # populate contact tab fields initial data
+ initial_data["name"] = obj.contact_name
+ initial_data["phone"] = obj.contact_phone
+ initial_data["email"] = obj.contact_email
+ form = self.model_form(instance=obj, initial=initial_data)
+ restrict_form_fields(form, request.user)
+ return render(
+ request,
+ self.template_name,
+ {
+ "obj": obj,
+ "obj_type": self.queryset.model._meta.verbose_name,
+ "form": form,
+ "return_url": self.get_return_url(request, obj),
+ "editing": obj.present_in_database,
+ "active_tab": "assign",
+ **self.get_extra_context(request, obj),
+ },
+ )
+
+ def post(self, request, *args, **kwargs):
+ obj = self.alter_obj(self.get_object(kwargs), request, args, kwargs)
+ form = self.model_form(data=request.POST, files=request.FILES, instance=obj)
+ restrict_form_fields(form, request.user)
+
+ associated_object_id = obj.pk
+ associated_object_content_type = ContentType.objects.get_for_model(Location)
+ action = request.POST.get("action")
+ try:
+ with transaction.atomic():
+ if not has_perms(request.user, ["extras.add_contactassociation"]):
+ raise PermissionDenied(
+ "ObjectPermission extras.add_contactassociation is needed to perform this action"
+ )
+ contact = None
+ team = None
+ if action == LocationDataToContactActionChoices.CREATE_AND_ASSIGN_NEW_CONTACT:
+ if not has_perms(request.user, ["extras.add_contact"]):
+ raise PermissionDenied("ObjectPermission extras.add_contact is needed to perform this action")
+ contact = Contact(
+ name=request.POST.get("name"),
+ phone=request.POST.get("phone"),
+ email=request.POST.get("email"),
+ )
+ contact.validated_save()
+ # Trigger permission check
+ Contact.objects.restrict(request.user, "view").get(pk=contact.pk)
+ elif action == LocationDataToContactActionChoices.CREATE_AND_ASSIGN_NEW_TEAM:
+ if not has_perms(request.user, ["extras.add_team"]):
+ raise PermissionDenied("ObjectPermission extras.add_team is needed to perform this action")
+ team = Team(
+ name=request.POST.get("name"),
+ phone=request.POST.get("phone"),
+ email=request.POST.get("email"),
+ )
+ team.validated_save()
+ # Trigger permission check
+ Team.objects.restrict(request.user, "view").get(pk=team.pk)
+ elif action == LocationDataToContactActionChoices.ASSOCIATE_EXISTING_CONTACT:
+ contact = Contact.objects.restrict(request.user, "view").get(pk=request.POST.get("contact"))
+ elif action == LocationDataToContactActionChoices.ASSOCIATE_EXISTING_TEAM:
+ team = Team.objects.restrict(request.user, "view").get(pk=request.POST.get("team"))
+ else:
+ raise ValueError(f"Invalid action {action} passed from the form")
+
+ association = ContactAssociation(
+ contact=contact,
+ team=team,
+ associated_object_type=associated_object_content_type,
+ associated_object_id=associated_object_id,
+ status=Status.objects.get(pk=request.POST.get("status")),
+ role=Role.objects.get(pk=request.POST.get("role")),
+ )
+ association.validated_save()
+ # Trigger permission check
+ ContactAssociation.objects.restrict(request.user, "view").get(pk=association.pk)
+
+ # Clear out contact fields from location
+ location = self.get_object(kwargs)
+ location.contact_name = ""
+ location.contact_phone = ""
+ location.contact_email = ""
+ location.validated_save()
+
+ object_created = not form.instance.present_in_database
+
+ self.successful_post(request, obj, object_created, logger)
+
+ return_url = request.POST.get("return_url")
+ if url_has_allowed_host_and_scheme(url=return_url, allowed_hosts=request.get_host()):
+ return redirect(iri_to_uri(return_url))
+ else:
+ return redirect(self.get_return_url(request, obj))
+
+ except ObjectDoesNotExist:
+ msg = "Object save failed due to object-level permissions violation"
+ logger.debug(msg)
+ form.add_error(None, msg)
+ except PermissionDenied as e:
+ msg = e
+ logger.debug(msg)
+ form.add_error(None, msg)
+ except ValueError:
+ msg = f"Invalid action {action} passed from the form"
+ logger.debug(msg)
+ form.add_error(None, msg)
+
+ return render(
+ request,
+ self.template_name,
+ {
+ "obj": obj,
+ "obj_type": self.queryset.model._meta.verbose_name,
+ "form": form,
+ "return_url": self.get_return_url(request, obj),
+ "editing": obj.present_in_database,
+ **self.get_extra_context(request, obj),
+ },
+ )
+
+
#
# Rack groups
#
diff --git a/nautobot/extras/views.py b/nautobot/extras/views.py
index 26a1d99685..9b1c13e618 100644
--- a/nautobot/extras/views.py
+++ b/nautobot/extras/views.py
@@ -404,10 +404,8 @@ class ContactAssociationUIViewSet(
non_filter_params = ("export", "page", "per_page", "sort")
-class ObjectNewContactView(generic.ObjectEditView):
- queryset = Contact.objects.all()
- model_form = forms.ObjectNewContactForm
- template_name = "extras/object_new_contact.html"
+class ObjectContactTeamMixin:
+ """Mixin that contains a custom post() method to create a new contact/team and assign it to an existing object"""
def post(self, request, *args, **kwargs):
obj = self.alter_obj(self.get_object(kwargs), request, args, kwargs)
@@ -428,13 +426,22 @@ def post(self, request, *args, **kwargs):
if hasattr(form, "save_note") and callable(form.save_note):
form.save_note(instance=obj, user=request.user)
- association = ContactAssociation(
- contact=obj,
- associated_object_type=ContentType.objects.get(id=request.POST.get("associated_object_type")),
- associated_object_id=request.POST.get("associated_object_id"),
- status=Status.objects.get(id=request.POST.get("status")),
- role=Role.objects.get(id=request.POST.get("role")) if request.POST.get("role") else None,
- )
+ if isinstance(obj, Contact):
+ association = ContactAssociation(
+ contact=obj,
+ associated_object_type=ContentType.objects.get(id=request.POST.get("associated_object_type")),
+ associated_object_id=request.POST.get("associated_object_id"),
+ status=Status.objects.get(id=request.POST.get("status")),
+ role=Role.objects.get(id=request.POST.get("role")) if request.POST.get("role") else None,
+ )
+ else:
+ association = ContactAssociation(
+ team=obj,
+ associated_object_type=ContentType.objects.get(id=request.POST.get("associated_object_type")),
+ associated_object_id=request.POST.get("associated_object_id"),
+ status=Status.objects.get(id=request.POST.get("status")),
+ role=Role.objects.get(id=request.POST.get("role")) if request.POST.get("role") else None,
+ )
association.validated_save()
self.successful_post(request, obj, object_created, logger)
@@ -474,75 +481,17 @@ def post(self, request, *args, **kwargs):
)
-class ObjectNewTeamView(generic.ObjectEditView):
+class ObjectNewContactView(ObjectContactTeamMixin, generic.ObjectEditView):
+ queryset = Contact.objects.all()
+ model_form = forms.ObjectNewContactForm
+ template_name = "extras/object_new_contact.html"
+
+
+class ObjectNewTeamView(ObjectContactTeamMixin, generic.ObjectEditView):
queryset = Team.objects.all()
model_form = forms.ObjectNewTeamForm
template_name = "extras/object_new_team.html"
- def post(self, request, *args, **kwargs):
- obj = self.alter_obj(self.get_object(kwargs), request, args, kwargs)
- form = self.model_form(data=request.POST, files=request.FILES, instance=obj)
- restrict_form_fields(form, request.user)
-
- if form.is_valid():
- logger.debug("Form validation was successful")
-
- try:
- with transaction.atomic():
- object_created = not form.instance.present_in_database
- obj = form.save()
-
- # Check that the new object conforms with any assigned object-level permissions
- self.queryset.get(pk=obj.pk)
-
- if hasattr(form, "save_note") and callable(form.save_note):
- form.save_note(instance=obj, user=request.user)
-
- association = ContactAssociation(
- team=obj,
- associated_object_type=ContentType.objects.get(id=request.POST.get("associated_object_type")),
- associated_object_id=request.POST.get("associated_object_id"),
- status=Status.objects.get(id=request.POST.get("status")),
- role=Role.objects.get(id=request.POST.get("role")) if request.POST.get("role") else None,
- )
- association.validated_save()
- self.successful_post(request, obj, object_created, logger)
-
- if "_addanother" in request.POST:
- # If the object has clone_fields, pre-populate a new instance of the form
- if hasattr(obj, "clone_fields"):
- url = f"{request.path}?{prepare_cloned_fields(obj)}"
- return redirect(url)
-
- return redirect(request.get_full_path())
-
- return_url = form.cleaned_data.get("return_url")
- if url_has_allowed_host_and_scheme(url=return_url, allowed_hosts=request.get_host()):
- return redirect(iri_to_uri(return_url))
- else:
- return redirect(self.get_return_url(request, obj))
-
- except ObjectDoesNotExist:
- msg = "Object save failed due to object-level permissions violation"
- logger.debug(msg)
- form.add_error(None, msg)
-
- else:
- logger.debug("Form validation failed")
-
- return render(
- request,
- self.template_name,
- {
- "obj": obj,
- "obj_type": self.queryset.model._meta.verbose_name,
- "form": form,
- "return_url": self.get_return_url(request, obj),
- "editing": obj.present_in_database,
- **self.get_extra_context(request, obj),
- },
- )
-
class ObjectAssignContactOrTeamView(generic.ObjectEditView):
queryset = ContactAssociation.objects.all()