Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync new Django admin commands #1821

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions siteapp/management/commands/import_control_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import os.path
import json
import sys

from controls.models import Element
from controls.oscal import CatalogData

from django.core.management import call_command
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, models
#from django.db.utils import OperationalError
#from django.conf import settings

class Command(BaseCommand):
help = 'Load Control Catalog into Database'

def add_arguments(self, parser):
parser.add_argument('--username', nargs='?')
parser.add_argument('--catalog_key', nargs='?')
parser.add_argument('--catalog_file', nargs='?')
parser.add_argument('--baseline', nargs='?')
parser.add_argument('--debug', nargs='?')

@transaction.atomic
def handle(self, *args, **options):
"""Load control catalog data into database"""
debug = True
try:
# Get catalog/baseline from command line argparse
catalog_key = options['catalog_key']
catalog_json = options['catalog_file']
baselines_json = options['baseline']
if (options['debug']):
debug = eval(options['debug'])
print(f'Request parameters catalog_key: {catalog_key}, catalog_file: catalog_json, '
f'baseline: baselines_json, debug: {debug}') if debug else False
if catalog_key == None:
raise Exception('No catalog_key specified')
if catalog_json == None:
raise Exception('No catalog_file specified')
if baselines_json == None:
raise Exception('No baseline specified')
except Exception as e:
print(f'Exception: {e}')
exit()

try:
for cf in (catalog_json, baselines_json):
# It's json, but is it really a catalog?
is_json = json.dumps(cf)
if not is_json:
raise Exception(f'{is_json}')
except Exception as e:
print(f'Parsing Exception: {e}')
exit()

try:
catalog, created = CatalogData.objects.get_or_create(
catalog_key = catalog_key,
catalog_json = catalog_json,
baselines_json = baselines_json
)
if created:
print(f"{catalog_key} record created into database")
print(f"CATALOG created: {catalog} ")
else:
print(f"{catalog_key} record found in database")
except Exception as e:
print(f'Catalog Exception: {e}')
exit()
114 changes: 114 additions & 0 deletions siteapp/management/commands/load_app_template.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""
load_app_template.py
Create an initial SSP project using the organization's default template.

Usage: manage.py load_app_template username path/to/template project_name
Example: manage.py load_app_template govready laurasia/JSIG_SSP "System Security Plan"
"""

import sys
import os.path

from django.core.management import call_command
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, models
from django.db.utils import OperationalError
from django.conf import settings

from guidedmodules.models import AppSource, Module, AppVersion
from siteapp.models import User, Organization, Portfolio
from django.contrib.auth.management.commands import createsuperuser
from siteapp.models import User, Project, Organization, Portfolio, Folder
from controls.models import System, Element, ElementControl

import fs, fs.errors


class Command(BaseCommand):
help = 'Load Initial SSP Project'

def add_arguments(self, parser):
parser.add_argument('username', nargs='?')
parser.add_argument('template', nargs='?')
parser.add_argument('project_name', nargs='?')

def handle(self, *args, **options):
try:
# Get user from command line argparse
user = User.objects.get(username=options['username'])

# Get user portfolio
portfolio = Portfolio.objects.get(id=user.default_portfolio_id)

# Get SSP project name or use default
project_name = options['project_name']
if not project_name:
new_name = "System Security Plan"
print(f'Using project name default {new_name}')
else:
new_name = project_name

# Verify valid template exists or use default
template = options['template']
if not template:
template = 'laurasia/JSIG_SSP'
template_exists = ()

for app in AppVersion.objects.all():
this_app = str(app.source) + "/" + str(app.appname)
if this_app == template:
template_exists = 1
if not template_exists:
print(f'No such template {template}')
exit()

except Exception as e:
print(f'(Exception: {e}')
exit()

# Check if project already exists
project_exists = ()
for p in Project.objects.all():
if p.title == new_name:
project_exists = 1
existing_project_id = p.id
if project_exists:
print(f'Unique contraint violation. Project #{existing_project_id} with name "{new_name}" already exists.')
# We're done
else:
# Our project does not exist, so load default Project
# Set defaults for testing
self.org = Organization.objects.first()
#print(self.org.slug)
username = user

from loadtesting.web import WebClient
client = WebClient(username, "main")

# Create project
print("Adding project to portfolio: {} (#{}).".format(portfolio.title, portfolio.id))
client.post("/store/{}?portfolio={}".format(template, portfolio.id), {"organization":self.org.slug})
#print(client.response.url)

# Get newly created project
project = Project.objects.get(id=client.response.url.split('/')[2])
print(f'Project created as: {project}')

# Rename project
if project:
# Double check project name does not exist
project_exists = ()
for p in Project.objects.all():
if p.title == new_name:
project_exists = 1
if not project_exists:
project.root_task.title_override = new_name
project.root_task.save()
project.root_task.on_answer_changed()
if project.system is not None:
project.system.root_element.name = new_name
project.system.root_element.save()
print(f"Project renamed to {new_name}.")
else:
print(f'Unique contraint violation. Project with name "{new_name}" already exists. \n\
Not renaming "{project.title}"')
123 changes: 123 additions & 0 deletions siteapp/management/commands/load_component_from_library.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
load_a component from library

Move to controls/management/commands
"""
import sys
import os.path

from django.core.management import call_command
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, models
from django.db.utils import OperationalError
from django.conf import settings

from guidedmodules.models import AppSource, Module, AppVersion
from siteapp.models import User, Organization, Portfolio
from django.contrib.auth.management.commands import createsuperuser
from siteapp.models import User, Project, Organization, Portfolio, Folder
from controls.models import System, Element, ElementControl, Statement

#schaadm additions
from django.contrib import messages
from controls.enums.statements import StatementTypeEnum
from django.db import transaction


class Command(BaseCommand):
help = 'Load Library Components into SSP'

def add_arguments(self, parser):
parser.add_argument('--username', nargs='?')
parser.add_argument('--component', nargs='?')
parser.add_argument('--project_name', nargs='?')
parser.add_argument('--debug', nargs='?')

@transaction.atomic
def handle(self, *args, **options):
debug = False
try:
# Get user from command line argparse
user = User.objects.get(username=options['username'])
component = options['component']
project_name = options['project_name']
if (options['debug']):
debug = eval(options['debug'])
print(f'Request parameters username: {user}, component: {component}, project_name: {project_name}, debug: {debug}') if debug else False
except Exception as e:
print(f'Exception missing parameter: {e}')
exit()

"""Add an existing element and its statements to a system"""

# extract producer_elment.id and require_approval boolean val
try:
producer_element = Element.objects.filter(name=component).first() # or [0]
if producer_element == None:
raise Exception(component)
except Exception as e:
print(f'Exception: component not found: {e}')
exit()

# Does requested project match an existing project name?
try:
project = Project.objects.filter(system__root_element__name=project_name).first()
system = project.system
#system = System.objects.get(pk=system_id)
except Exception as e:
print(f'Exception finding project name: {e}')
exit()

# Does user have permission to add element?
# Check user permissions
try:
project_member = False
members = User.objects.filter(projectmembership__project=project)
for member in members:
print(f'Comparing project membership for {member.username}:{user}') if debug else False
if member.username == user.username:
project_member = member.username
if not project_member:
raise Exception(user.username)
except Exception as e:
print(f"Forbidden: user is not a member of project. {e}")
exit()

# DEBUG
print(f"Atempting to add {producer_element.name} (id:{producer_element.id}) to system_id {system.id}") if debug else False

# Get system's existing components selected
elements_selected = system.producer_elements
elements_selected_ids = [e.id for e in elements_selected]

# Add element to system's selected components
# Look up the element rto add
# producer_element = Element.objects.get(pk=producer_element_id)

# Component already added to system. Do not add the component (element) to the system again.
if producer_element.id in elements_selected_ids:
print(f'Component "{producer_element.name}" already exists in selected components.')
exit()

smts = Statement.objects.filter(producer_element_id = producer_element.id, statement_type=StatementTypeEnum.CONTROL_IMPLEMENTATION_PROTOTYPE.name)

# Component does not have any statements of type control_implementation_prototype to
# add to system. So we cannot add the component (element) to the system.
if len(smts) == 0:
print(f"Add component error: {producer_element.name} does not have any control implementation statements.")
exit()

# Loop through all element's prototype statements and add to control implementation statements.
# System's selected controls will filter what controls and control statements to display.
for smt in smts:
smt.create_system_control_smt_from_component_prototype_smt(system.root_element.id)

# Make sure some controls were added to the system. Report error otherwise.
smts_added = Statement.objects.filter(producer_element_id = producer_element.id, consumer_element_id = system.root_element.id, statement_type=StatementTypeEnum.CONTROL_IMPLEMENTATION.name)

smts_added_count = len(smts_added)
if smts_added_count > 0:
print(f'Added "{producer_element.name}" and its {smts_added_count} control implementation statements to the system.')
else:
print(f'Error: 0 controls added for component "{producer_element.name}".')

Loading