1
0
Fork 0
mirror of https://github.com/seanmorley15/AdventureLog.git synced 2025-07-19 12:59:36 +02:00
AdventureLog/backend/server/adventures/views/collection_view.py

225 lines
9.8 KiB
Python
Raw Normal View History

from django.db.models import Q
from django.db.models.functions import Lower
from django.db import transaction
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from adventures.models import Collection, Adventure, Transportation, Note
from adventures.permissions import CollectionShared
from adventures.serializers import CollectionSerializer
from users.models import CustomUser as User
from adventures.utils import pagination
class CollectionViewSet(viewsets.ModelViewSet):
    """CRUD, listing and sharing endpoints for ``Collection`` objects.

    Besides the standard ``ModelViewSet`` routes, this exposes the extra
    collection-level actions ``all``, ``archived`` and ``shared`` and the
    detail-level actions ``share/<uuid>`` and ``unshare/<uuid>``.
    """

    serializer_class = CollectionSerializer
    permission_classes = [CollectionShared]
    pagination_class = pagination.StandardResultsSetPagination

    @staticmethod
    def _unauthenticated_response():
        """Build the error response returned to anonymous callers."""
        return Response({"error": "User is not authenticated"}, status=400)

    def apply_sorting(self, queryset):
        """Order ``queryset`` according to the request's query parameters.

        Reads ``order_by`` (``name``, ``updated_at`` or ``start_date``;
        defaults to ``name``, unknown values fall back to ``updated_at``) and
        ``order_direction`` (``asc`` or ``desc``; defaults to and falls back
        to ``asc``).

        Returns:
            The queryset with the ordering applied. Sorting by name is
            case-insensitive.
        """
        params = self.request.query_params
        order_by = params.get('order_by', 'name')
        order_direction = params.get('order_direction', 'asc')

        if order_by not in ('name', 'updated_at', 'start_date'):
            order_by = 'updated_at'
        if order_direction not in ('asc', 'desc'):
            order_direction = 'asc'

        descending = order_direction == 'desc'

        if order_by == 'name':
            # Annotate a lower-cased copy so the sort ignores letter case.
            queryset = queryset.annotate(lower_name=Lower('name'))
            field = 'lower_name'
        elif order_by == 'start_date':
            field = 'start_date'
        else:
            field = 'updated_at'
            # For updated_at the direction is reversed: 'asc' yields
            # '-updated_at' and 'desc' yields 'updated_at'.
            # NOTE(review): presumably so the default shows the most recently
            # updated collections first - confirm against the client.
            descending = not descending

        ordering = f'-{field}' if descending else field
        return queryset.order_by(ordering)

    def list(self, request, *args, **kwargs):
        """Return a paginated, sorted list of the caller's own collections.

        Only collections whose ``user_id`` matches the requesting user are
        returned; no ``is_archived`` filter is applied here.
        """
        if not request.user.is_authenticated:
            return self._unauthenticated_response()

        queryset = Collection.objects.filter(user_id=request.user.id)
        queryset = self.apply_sorting(queryset)
        return self.paginate_and_respond(queryset, request)

    @action(detail=False, methods=['get'])
    def all(self, request):
        """Return every collection owned by the caller, sorted, unpaginated."""
        if not request.user.is_authenticated:
            return self._unauthenticated_response()

        queryset = Collection.objects.filter(Q(user_id=request.user.id))
        queryset = self.apply_sorting(queryset)
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)

    @action(detail=False, methods=['get'])
    def archived(self, request):
        """Return the caller's archived collections, sorted, unpaginated."""
        if not request.user.is_authenticated:
            return self._unauthenticated_response()

        queryset = Collection.objects.filter(
            Q(user_id=request.user.id) & Q(is_archived=True)
        )
        queryset = self.apply_sorting(queryset)
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)

    @transaction.atomic
    def update(self, request, *args, **kwargs):
        """Update a collection and cascade ``is_public`` to its contents.

        When ``is_public`` is part of the validated payload, the adventures,
        transportations and notes attached to the collection are set to the
        same value. Only the owner may change ``is_public``; other callers
        receive a 400 response. The whole operation runs in one transaction.
        """
        partial = kwargs.pop('partial', False)
        instance = self.get_object()
        serializer = self.get_serializer(instance, data=request.data, partial=partial)
        serializer.is_valid(raise_exception=True)
        validated = serializer.validated_data

        # NOTE(review): this branch reads ``instance.collection``; it looks
        # carried over from an adventure view. Confirm the serializer can
        # actually emit a 'collection' key for a Collection instance.
        if 'collection' in validated:
            new_collection = validated['collection']
            # Moving to a different collection requires owning the target.
            if new_collection != instance.collection and new_collection.user_id != request.user:
                return Response({"error": "User does not own the new collection"}, status=400)

        if 'is_public' in validated:
            new_public_status = validated['is_public']

            # Only the owner may flip the visibility of the collection.
            if new_public_status != instance.is_public and instance.user_id != request.user:
                print(f"User {request.user.id} does not own the collection {instance.id} that is owned by {instance.user_id}")
                return Response({"error": "User does not own the collection"}, status=400)

            # Keep every item in the collection in step with its visibility.
            for model in (Adventure, Transportation, Note):
                model.objects.filter(collection=instance).update(is_public=new_public_status)

            visibility = "public" if new_public_status else "private"
            print(f"Collection {instance.id} and its adventures were set to {visibility}")

        self.perform_update(serializer)

        if getattr(instance, '_prefetched_objects_cache', None):
            # If 'prefetch_related' has been applied to a queryset, we need to
            # forcibly invalidate the prefetch cache on the instance.
            instance._prefetched_objects_cache = {}

        return Response(serializer.data)

    @action(detail=False, methods=['get'])
    def shared(self, request):
        """Return the collections shared with the caller, sorted, unpaginated."""
        if not request.user.is_authenticated:
            return self._unauthenticated_response()

        queryset = Collection.objects.filter(shared_with=request.user)
        queryset = self.apply_sorting(queryset)
        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)

    @action(detail=True, methods=['post'], url_path='share/(?P<uuid>[^/.]+)')
    def share(self, request, pk=None, uuid=None):
        """Add the user identified by ``uuid`` to the collection's ``shared_with``.

        The target user must exist and have ``public_profile=True``. Responds
        with 404 when no such user is found and with 400 when the caller is
        anonymous, the UUID is missing, the target is the caller, or the
        collection is already shared with the target.
        """
        # Same guard as ``unshare``: reject anonymous callers up front.
        if not request.user.is_authenticated:
            return self._unauthenticated_response()

        collection = self.get_object()
        if not uuid:
            return Response({"error": "User UUID is required"}, status=400)

        try:
            user = User.objects.get(uuid=uuid, public_profile=True)
        except User.DoesNotExist:
            return Response({"error": "User not found"}, status=404)

        if user == request.user:
            return Response({"error": "Cannot share with yourself"}, status=400)

        if collection.shared_with.filter(id=user.id).exists():
            return Response({"error": "Collection is already shared with this user"}, status=400)

        collection.shared_with.add(user)
        collection.save()
        return Response({"success": f"Shared with {user.username}"})

    @action(detail=True, methods=['post'], url_path='unshare/(?P<uuid>[^/.]+)')
    def unshare(self, request, pk=None, uuid=None):
        """Remove the user identified by ``uuid`` from the collection's ``shared_with``.

        The target user must exist and have ``public_profile=True``. Responds
        with 404 when no such user is found and with 400 when the caller is
        anonymous, the UUID is missing, the target is the caller, or the
        collection is not shared with the target.
        """
        if not request.user.is_authenticated:
            return self._unauthenticated_response()

        collection = self.get_object()
        if not uuid:
            return Response({"error": "User UUID is required"}, status=400)

        try:
            user = User.objects.get(uuid=uuid, public_profile=True)
        except User.DoesNotExist:
            return Response({"error": "User not found"}, status=404)

        if user == request.user:
            return Response({"error": "Cannot unshare with yourself"}, status=400)

        if not collection.shared_with.filter(id=user.id).exists():
            return Response({"error": "Collection is not shared with this user"}, status=400)

        collection.shared_with.remove(user)
        collection.save()
        return Response({"success": f"Unshared with {user.username}"})

    def get_queryset(self):
        """Return the collections visible to the caller for the current action.

        * anonymous callers: public collections for ``retrieve``, nothing
          for any other action;
        * ``destroy``: collections owned by the caller;
        * ``update`` / ``partial_update``: owned by or shared with the caller;
        * ``retrieve``: public, owned by, or shared with the caller;
        * anything else: owned by or shared with the caller and not archived.
        """
        user = self.request.user

        if not user.is_authenticated:
            if self.action == 'retrieve':
                return Collection.objects.filter(is_public=True)
            # An anonymous user cannot be used in the ownership/sharing
            # lookups below, so expose nothing rather than build them.
            return Collection.objects.none()

        owned = Q(user_id=user.id)
        shared = Q(shared_with=user)

        if self.action == 'destroy':
            return Collection.objects.filter(owned)

        if self.action in ('update', 'partial_update'):
            return Collection.objects.filter(owned | shared).distinct()

        if self.action == 'retrieve':
            return Collection.objects.filter(
                Q(is_public=True) | owned | shared
            ).distinct()

        # Default (e.g. list and the extra detail actions): owned or shared,
        # excluding archived collections.
        return Collection.objects.filter(
            (owned | shared) & Q(is_archived=False)
        ).distinct()

    def perform_create(self, serializer):
        """Save a new collection with the requesting user as its owner."""
        # This is ok because you cannot share a collection when creating it
        serializer.save(user_id=self.request.user)

    def paginate_and_respond(self, queryset, request):
        """Serialize ``queryset`` and return it, paginated when possible.

        Uses a fresh instance of ``pagination_class``; if the paginator
        returns no page, the full queryset is serialized instead.
        """
        paginator = self.pagination_class()
        page = paginator.paginate_queryset(queryset, request)
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            return paginator.get_paginated_response(serializer.data)

        serializer = self.get_serializer(queryset, many=True)
        return Response(serializer.data)