Skip to content

Commit

Permalink
feat[bckend-RBAS-ABAS]:Added Role based permission system and integra…
Browse files Browse the repository at this point in the history
…ted action based access for services.
  • Loading branch information
shikharpa committed May 13, 2024
1 parent 1454f00 commit 4c1b161
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 41 deletions.
2 changes: 1 addition & 1 deletion services/api/db/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class ProfileAPIView(APIView):
permission_classes = [IsAuthenticated, HasRolePermission]
permission_name = 'calender'
service_name = 'blogs'

def get(self, request):
try:
Expand Down
77 changes: 62 additions & 15 deletions services/api/permission/models.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,95 @@
from django.db import models

class CustomPermission(models.Model):

class AssociatedViews(models.Model):
name = models.CharField(max_length=100, unique=True)
def __str__(self):
return self.name
class Meta:
verbose_name = 'AssociatedViews'
verbose_name_plural = 'AssociatedViews'
db_table = "AssociatedViews"

class Service(models.Model):
name = models.CharField(max_length=100, unique=True)
description = models.TextField(blank=True)
parent = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True, related_name='children')
parent_service = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True, related_name='children')

def __str__(self):
return self.name

class Meta:
verbose_name = 'Custom Permission'
verbose_name_plural = 'Custom Permissions'
db_table = "custom_permission"
verbose_name = 'Service'
verbose_name_plural = 'Services'
db_table = "service"

def get_all_children(self):
children = list(self.children.all())
for child in self.children.all():
children.extend(child.get_all_children())
return children

def get_all_parents(self):
parents = []
parent = self.parent_service
while parent is not None:
parents.insert(0, parent)
parent = parent.parent_service
parents.insert(0, self) # Insert the current service at the beginning
return parents

class CustomPermission(models.Model):
name = models.CharField(max_length=100, unique=True)
service = models.ForeignKey(Service, on_delete=models.CASCADE)
AssociatedViews = models.ManyToManyField(AssociatedViews, blank=True)

def __str__(self):
return self.name

class Meta:
verbose_name = 'CustomPermission'
verbose_name_plural = 'CustomPermissions'
db_table = "CustomPermission"

def add_AssociatedViews(self, AccessViews):
for AccessView in AccessViews:
if AssociatedViews.objects.filter(name=AccessView).exists():
method = AssociatedViews.objects.get(name=AccessView)
self.AssociatedViews.add(method)
else:
raise ValueError("name of class view does not exist in code base.")

class Role(models.Model):
name = models.CharField(max_length=100, unique=True)
services = models.ManyToManyField(Service, blank=True)
custom_permissions = models.ManyToManyField(CustomPermission, blank=True)

def __str__(self):
return self.name

def add_custom_permission(self, permission_data):
for permission in permission_data:
permission_name = permission.get('name')
for permission_name in permission_data:
if CustomPermission.objects.filter(name=permission_name).exists():
permission = CustomPermission.objects.get(name=permission_name)
self.custom_permissions.add(permission)
else:
raise ValueError("Custom Permission does not exist.")

def add_services(self, services_data):
for service_name in services_data:
if Service.objects.filter(name=service_name).exists():
service = Service.objects.get(name=service_name)
self.services.add(service)
else:
raise ValueError("Service does not exist.")

def has_permission_recursive(self, permission_name):
# Check if the current role has the permission
if self.custom_permissions.filter(name=permission_name).exists():
def has_service_recursive(self, service_name):
# Check if the current role has the services
if self.services.filter(name=service_name).exists():
return True
# Check permissions from children roles
for child_permission in self.custom_permissions.all():
for child in child_permission.get_all_children():
if child.name == permission_name:
# Check services from children roles
for child_service in self.services.all():
for child in child_service.get_all_children():
if child.name == service_name:
return True
return False

Expand Down
31 changes: 24 additions & 7 deletions services/api/permission/permission.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,33 @@
from rest_framework.permissions import BasePermission
from rest_framework.exceptions import PermissionDenied

from permission.models import Service
class HasRolePermission(BasePermission):
"""
Custom permission class that checks if a user has a specific role permission.
"""
def has_permission(self, request, view):
permission_name = getattr(view, 'permission_name', None)
if not permission_name:
raise ValueError("Permission name not provided in the view.")
view_class_name = view.__class__.__name__
service_name = getattr(view, 'service_name', None)
if not service_name:
return True
if request.user.is_anonymous:
return False
if request.user.role and not request.user.role.has_permission_recursive(permission_name):
raise PermissionDenied("You do not have permission to access this view.")
return True
if request.user.role:
role = request.user.role
if role.has_service_recursive(service_name):
service = Service.objects.get(name=service_name)
parent_services = service.get_all_parents()
for parent_service in parent_services:
# Check if the requested view is in role.custom_permissions.AssociatedViews
for permission in role.custom_permissions.filter(service__name=parent_service.name):
for AssociatedView in permission.AssociatedViews.all():
if view_class_name == AssociatedView.name:
return True
return False

def has_object_permission(self, request, view, obj):
# Check if the object has an owner field
if not hasattr(obj, 'owner'):
return False
# Allow access only if the request user is the object's owner
return obj.owner == request.user or request.user.is_superuser
40 changes: 30 additions & 10 deletions services/api/permission/serializers.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,45 @@
from rest_framework import serializers
from django.contrib.auth.models import User
from permission.models import Role, CustomPermission
from permission.models import Role, CustomPermission, Service, AssociatedViews

class CustomPermissionSerializer(serializers.ModelSerializer):
parent_name = serializers.CharField(write_only=True, required=False)
class AssociatedViewsSerializer(serializers.ModelSerializer):
class Meta:
model = AssociatedViews
fields = '__all__'

class ServiceSerializer(serializers.ModelSerializer):
parent_service = serializers.CharField(write_only=True, required=False)

class Meta:
model = CustomPermission
fields = ['id', 'name', 'description', 'parent_name']
model = Service
fields = '__all__'

def create(self, validated_data):
parent_name = validated_data.pop('parent_name', None)
if parent_name:
parent = CustomPermission.objects.get(name=parent_name)
validated_data['parent'] = parent
parent_service = validated_data.pop('parent_service', None)
if parent_service:
parent = Service.objects.get(name=parent_service)
validated_data['parent_service'] = parent
return super().create(validated_data)

class CustomPermissionSerializer(serializers.ModelSerializer):
service = serializers.CharField(write_only=True, required=False)
AssociatedViews = AssociatedViewsSerializer(many=True, required=False, allow_empty=True)
class Meta:
model = CustomPermission
fields = '__all__'

def create(self, validated_data):
service = validated_data.pop('service', None)
if service:
service = Service.objects.get(name=service)
validated_data['service'] = service
return super().create(validated_data)

class RoleSerializer(serializers.ModelSerializer):
services = ServiceSerializer(many=True, required=False, allow_empty=True)
custom_permissions = CustomPermissionSerializer(many=True, required=False, allow_empty=True)

class Meta:
model = Role
fields = ('id', 'name', 'custom_permissions')
fields = '__all__'

6 changes: 5 additions & 1 deletion services/api/permission/urls.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from django.urls import path
from permission.views import CustomPermissionViewSet, RoleViewSet, AssignRoleToUserView
from permission.views import CustomPermissionViewSet, RoleViewSet, AssignRoleToUserView, ServiceViewSet, AssociatedViewsViewSet

urlpatterns = [
path('permissions/', CustomPermissionViewSet.as_view({'get': 'list', 'post': 'create'}), name='permissions-list'),
path('permissions/<int:pk>/', CustomPermissionViewSet.as_view({'get': 'retrieve', 'put': 'update', 'delete': 'destroy'}), name='permission-detail'),
path('roles/', RoleViewSet.as_view({'get': 'list', 'post': 'create'}), name='roles-list'),
path('roles/<int:pk>/', RoleViewSet.as_view({'get': 'retrieve', 'put': 'update', 'delete': 'destroy'}), name='role-detail'),
path('services/', ServiceViewSet.as_view({'get': 'list', 'post': 'create'}), name='services-list'),
path('services/<int:pk>/', ServiceViewSet.as_view({'get': 'retrieve', 'put': 'update', 'delete': 'destroy'}), name='service-detail'),
path('AccessControlViews/', AssociatedViewsViewSet.as_view({'get': 'list', 'post': 'create'}), name='http-methods-list'),
path('AccessControlViews/<int:pk>/', AssociatedViewsViewSet.as_view({'get': 'retrieve', 'put': 'update', 'delete': 'destroy'}), name='http-method-detail'),
path('users/<int:user_id>/assign-role/', AssignRoleToUserView.as_view(), name='assign-role')
]
53 changes: 46 additions & 7 deletions services/api/permission/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,81 @@
from rest_framework.views import APIView
from django.db import transaction
from db.models import User
from permission.models import CustomPermission, Role
from permission.serializers import CustomPermissionSerializer, RoleSerializer
from permission.models import CustomPermission, Role, AssociatedViews, Service
from permission.serializers import CustomPermissionSerializer, RoleSerializer, AssociatedViewsSerializer, ServiceSerializer
from db.serializers import UserProfileSerialiser

class AssociatedViewsViewSet(viewsets.ModelViewSet):
queryset = AssociatedViews.objects.all()
serializer_class = AssociatedViewsSerializer

class ServiceViewSet(viewsets.ModelViewSet):
queryset = Service.objects.all()
serializer_class = ServiceSerializer

class CustomPermissionViewSet(viewsets.ModelViewSet):
queryset = CustomPermission.objects.all()
serializer_class = CustomPermissionSerializer
# permission_classes = [IsAdminUser] # Only authenticated users can manage permissions
def create(self, request, *args, **kwargs):
AccessViews = request.data.pop('AccessViews', [])
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
try:
with transaction.atomic():
role = serializer.save()
role.add_AssociatedViews(AccessViews)
return Response(serializer.data, status=status.HTTP_201_CREATED)
except ValueError as e:
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

def update(self, request, *args, **kwargs):
AccessViews = request.data.pop('AccessViews', [])
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data)
if serializer.is_valid(raise_exception=True):
try:
with transaction.atomic():
self.perform_update(serializer)
instance.add_AssociatedViews(AccessViews)
return Response(serializer.data)
except ValueError as e:
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

class RoleViewSet(viewsets.ModelViewSet):
queryset = Role.objects.all()
serializer_class = RoleSerializer

def create(self, request, *args, **kwargs):
custom_permissions_data = request.data.pop('custom_permissions', [])
permissions_data = request.data.pop('permissions_data', [])
services_data = request.data.pop('services_data', [])
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
try:
with transaction.atomic():
role = serializer.save()
role.add_custom_permission(custom_permissions_data)
role.add_custom_permission(permissions_data)
role.add_services(services_data)
return Response(serializer.data, status=status.HTTP_201_CREATED)
except ValueError as e:
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

def update(self, request, *args, **kwargs):
custom_permissions_data = request.data.pop('custom_permissions', [])
permissions_data = request.data.pop('permissions_data', [])
services_data = request.data.pop('services_data', [])
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data)
if serializer.is_valid(raise_exception=True):
try:
with transaction.atomic():
self.perform_update(serializer)
instance.add_custom_permission(custom_permissions_data)
instance.add_custom_permission(permissions_data)
instance.add_services(services_data)
return Response(serializer.data)
except ValueError as e:
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
Expand Down

0 comments on commit 4c1b161

Please sign in to comment.