Skip to content

Commit

Permalink
Merge pull request #545 from GhostManager/hotfix/fixed-dupe-ip-checks
Browse files Browse the repository at this point in the history
Hotfix/fixed dupe ip checks
  • Loading branch information
chrismaddalena authored Oct 24, 2024
2 parents d627a92 + 62e0d2c commit ef58d5f
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 31 deletions.
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

# CHANGELOG

# [4.3.3] - 21 October 2024
## [4.3.4] - 24 October 2024

### Changed

* Adjusted the duplicate IP address checks for cloud servers on a project to make them more robust to catch more edge cases

### Fixed

* Fixed an issue with creating a new cloud server on a project

## [4.3.3] - 21 October 2024

### Added

Expand Down
4 changes: 2 additions & 2 deletions VERSION
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
v4.3.3
21 October 2024
v4.3.4
24 October 2024
4 changes: 2 additions & 2 deletions config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
# 3rd Party Libraries
import environ

__version__ = "4.3.3"
__version__ = "4.3.4"
VERSION = __version__
RELEASE_DATE = "21 October 2024"
RELEASE_DATE = "24 October 2024"

ROOT_DIR = Path(__file__).resolve(strict=True).parent.parent.parent
APPS_DIR = ROOT_DIR / "ghostwriter"
Expand Down
73 changes: 73 additions & 0 deletions ghostwriter/shepherd/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from django.urls import reverse
from django.utils import timezone
from django.utils.encoding import force_str
from django.contrib.messages import get_messages

# Ghostwriter Libraries
from ghostwriter.factories import (
Expand All @@ -30,6 +31,8 @@
UserFactory,
VirusTotalConfigurationFactory,
)
from ghostwriter.shepherd.forms_server import TransientServerForm
from ghostwriter.shepherd.views import ServerHistoryCreate

logging.disable(logging.CRITICAL)

Expand Down Expand Up @@ -1177,6 +1180,7 @@ def setUpTestData(cls):
cls.mgr_user = UserFactory(password=PASSWORD, role="manager")
cls.project = ProjectFactory()
cls.uri = reverse("shepherd:vps_create", kwargs={"pk": cls.project.id})
cls.redirect_uri = "{}#infrastructure".format(reverse("rolodex:project_detail", kwargs={"pk": cls.project.id}))

def setUp(self):
self.client = Client()
Expand Down Expand Up @@ -1213,6 +1217,75 @@ def test_custom_context_exists(self):
"{}#infrastructure".format(reverse("rolodex:project_detail", kwargs={"pk": self.project.id})),
)

def form_data(
self,
ip_address=None,
aux_address=None,
name=None,
activity_type_id=None,
server_role_id=None,
server_provider_id=None,
note=None,
**kwargs,
):
return TransientServerForm(
data={
"ip_address": ip_address,
"aux_address": aux_address,
"aux_address_0": aux_address[0],
"aux_address_1": aux_address[1],
"aux_address_2": aux_address[2],
"name": name,
"activity_type": activity_type_id,
"server_role": server_role_id,
"server_provider": server_provider_id,
"note": note,
},
)

def test_duplicate_server_submission(self):
dupe_ip = "1.2.3.4"
dupe_aux_ip = ["1.2.3.5", "1.2.3.4.6", "1.2.3.4.7"]

# Create a base cloud server
vps_server = TransientServerFactory(project=self.project)

# Create a static server with the same IP address and check it out to the project
static_server = StaticServerFactory()
_ = AuxServerAddressFactory(static_server=static_server, ip_address=dupe_ip)
ServerHistoryFactory(server=static_server, project=self.project)

vps_server_dict = vps_server.__dict__.copy()
vps_server_dict["ip_address"] = dupe_ip
vps_server_dict["aux_address"] = dupe_aux_ip
data = self.form_data(**vps_server_dict)

response = self.client_mgr.post(self.uri, data.data)
messages = list(get_messages(response.wsgi_request))

self.assertEqual(response.status_code, 302)
self.assertRedirects(response, self.redirect_uri)
self.assertEqual(len(messages), 2)
self.assertTrue(any(
"Server successfully added to the project." in str(message) for message in
messages))
self.assertTrue(any("You have 2 server(s) that share one or more of the provided IP addresses." in str(message) for message in messages))

# Try to add a cloud server with shared IP addresses
vps_server_dict = vps_server.__dict__.copy()
vps_server_dict["ip_address"] = dupe_ip
data = self.form_data(**vps_server_dict)

response = self.client_mgr.post(self.uri, data.data)
messages = list(get_messages(response.wsgi_request))

self.assertEqual(response.status_code, 302)
self.assertRedirects(response, self.redirect_uri)
self.assertEqual(len(messages), 2)
# The total is now 4 because the cloud server shares an IP with the static server, the original VPS, and the previous test VPS
self.assertTrue(any(
"You have 4 server(s) that share one or more of the provided IP addresses." in str(message) for message in
messages))

class TransientServerUpdateViewTests(TestCase):
"""Collection of tests for :view:`shepherd.TransientServerUpdate`."""
Expand Down
50 changes: 24 additions & 26 deletions ghostwriter/shepherd/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1661,6 +1661,28 @@ def get_context_data(self, **kwargs):
return ctx


def check_duplicate_ip(request, project, ip_address, aux_address):
"""Check for duplicate IP addresses in other servers and cloud servers."""
# Put all IP addresses into a list to cover checking any appearances (as the IP or an aux address)
all_ips = [ip_address]
for address in aux_address:
all_ips.append(address)

# Query servers that share one or more of the IPs in the list
cloud_servers = TransientServer.objects.filter(Q(project=project) & Q(ip_address__in=all_ips) | Q(aux_address__overlap=all_ips))
static_servers = ServerHistory.objects.filter(
Q(project=project) & (Q(server__ip_address__in=all_ips) | Q(server__auxserveraddress__ip_address__in=all_ips))
)

# Add a warning to the request if any servers share the IP addresses
sharing_servers = len(cloud_servers) + len(static_servers)
if sharing_servers >= 1:
messages.warning(
request,
f'You have {sharing_servers} server(s) that share one or more of the provided IP addresses.',
)


class TransientServerCreate(RoleBasedAccessControlMixin, CreateView):
"""
Create an individual :model:`shepherd.TransientServer`.
Expand Down Expand Up @@ -1703,18 +1725,7 @@ def form_valid(self, form):
obj.project = self.project
obj.operator = self.request.user
obj.save()
cloud_servers = TransientServer.objects.filter(project=self.project, ip_address=obj.ip_address)
if len(cloud_servers) > 1:
messages.warning(
self.request,
f'You have {len(cloud_servers)} cloud servers sharing the IP address "{obj.ip_address}" for this project.',
)
static_servers = StaticServer.objects.filter(project=self.project, ip_address=obj.ip_address)
if len(static_servers) > 1:
messages.warning(
self.request,
f'You have checked out server that shares the provided IP address "{obj.ip_address}" on this project.',
)
check_duplicate_ip(self.request, self.project, obj.ip_address, obj.aux_address)
return super().form_valid(form)

def get_context_data(self, **kwargs):
Expand Down Expand Up @@ -1766,20 +1777,7 @@ def get_context_data(self, **kwargs):
return ctx

def form_valid(self, form):
cloud_servers = TransientServer.objects.filter(project=self.object.project, ip_address=self.object.ip_address)
if len(cloud_servers) > 1:
messages.warning(
self.request,
f'You have {len(cloud_servers)} cloud servers sharing the IP address "{self.object.ip_address}" for this project',
)
static_servers = ServerHistory.objects.filter(
project=self.object.project, server__ip_address=self.object.ip_address
)
if len(static_servers) > 1:
messages.warning(
self.request,
f'You have checked out server that shares the provided IP address "{self.object.ip_address}" on this project.',
)
check_duplicate_ip(self.request, self.object.project, self.object.ip_address, self.object.aux_address)
return super().form_valid(form)


Expand Down

0 comments on commit ef58d5f

Please sign in to comment.