mirror of
https://github.com/seanmorley15/AdventureLog.git
synced 2025-07-23 14:59:36 +02:00
Commit 937c3c6a68
introduced a Python
syntax error, breaking the server. This fixes the issue by restoring the
probably accidental removal of one line of code.
224 lines
9.8 KiB
Python
224 lines
9.8 KiB
Python
from django.db.models import Q
|
|
from django.db.models.functions import Lower
|
|
from django.db import transaction
|
|
from rest_framework import viewsets
|
|
from rest_framework.decorators import action
|
|
from rest_framework.response import Response
|
|
from adventures.models import Collection, Adventure, Transportation, Note
|
|
from adventures.permissions import CollectionShared
|
|
from adventures.serializers import CollectionSerializer
|
|
from users.models import CustomUser as User
|
|
from adventures.utils import pagination
|
|
|
|
class CollectionViewSet(viewsets.ModelViewSet):
|
|
serializer_class = CollectionSerializer
|
|
permission_classes = [CollectionShared]
|
|
pagination_class = pagination.StandardResultsSetPagination
|
|
|
|
# def get_queryset(self):
|
|
# return Collection.objects.filter(Q(user_id=self.request.user.id) & Q(is_archived=False))
|
|
|
|
def apply_sorting(self, queryset):
|
|
order_by = self.request.query_params.get('order_by', 'name')
|
|
order_direction = self.request.query_params.get('order_direction', 'asc')
|
|
|
|
valid_order_by = ['name', 'updated_at', 'start_date']
|
|
if order_by not in valid_order_by:
|
|
order_by = 'updated_at'
|
|
|
|
if order_direction not in ['asc', 'desc']:
|
|
order_direction = 'asc'
|
|
|
|
# Apply case-insensitive sorting for the 'name' field
|
|
if order_by == 'name':
|
|
queryset = queryset.annotate(lower_name=Lower('name'))
|
|
ordering = 'lower_name'
|
|
if order_direction == 'desc':
|
|
ordering = f'-{ordering}'
|
|
elif order_by == 'start_date':
|
|
ordering = 'start_date'
|
|
if order_direction == 'asc':
|
|
ordering = 'start_date'
|
|
else:
|
|
ordering = '-start_date'
|
|
else:
|
|
order_by == 'updated_at'
|
|
ordering = 'updated_at'
|
|
if order_direction == 'asc':
|
|
ordering = '-updated_at'
|
|
|
|
#print(f"Ordering by: {ordering}") # For debugging
|
|
|
|
return queryset.order_by(ordering)
|
|
|
|
def list(self, request, *args, **kwargs):
|
|
# make sure the user is authenticated
|
|
if not request.user.is_authenticated:
|
|
return Response({"error": "User is not authenticated"}, status=400)
|
|
queryset = Collection.objects.filter(user_id=request.user.id)
|
|
queryset = self.apply_sorting(queryset)
|
|
collections = self.paginate_and_respond(queryset, request)
|
|
return collections
|
|
|
|
@action(detail=False, methods=['get'])
|
|
def all(self, request):
|
|
if not request.user.is_authenticated:
|
|
return Response({"error": "User is not authenticated"}, status=400)
|
|
|
|
queryset = Collection.objects.filter(
|
|
Q(user_id=request.user.id)
|
|
)
|
|
|
|
queryset = self.apply_sorting(queryset)
|
|
serializer = self.get_serializer(queryset, many=True)
|
|
|
|
return Response(serializer.data)
|
|
|
|
@action(detail=False, methods=['get'])
|
|
def archived(self, request):
|
|
if not request.user.is_authenticated:
|
|
return Response({"error": "User is not authenticated"}, status=400)
|
|
|
|
queryset = Collection.objects.filter(
|
|
Q(user_id=request.user.id) & Q(is_archived=True)
|
|
)
|
|
|
|
queryset = self.apply_sorting(queryset)
|
|
serializer = self.get_serializer(queryset, many=True)
|
|
|
|
return Response(serializer.data)
|
|
|
|
# this make the is_public field of the collection cascade to the adventures
|
|
@transaction.atomic
|
|
def update(self, request, *args, **kwargs):
|
|
partial = kwargs.pop('partial', False)
|
|
instance = self.get_object()
|
|
serializer = self.get_serializer(instance, data=request.data, partial=partial)
|
|
serializer.is_valid(raise_exception=True)
|
|
|
|
if 'collection' in serializer.validated_data:
|
|
new_collection = serializer.validated_data['collection']
|
|
# if the new collection is different from the old one and the user making the request is not the owner of the new collection return an error
|
|
if new_collection != instance.collection and new_collection.user_id != request.user:
|
|
return Response({"error": "User does not own the new collection"}, status=400)
|
|
|
|
# Check if the 'is_public' field is present in the update data
|
|
if 'is_public' in serializer.validated_data:
|
|
new_public_status = serializer.validated_data['is_public']
|
|
|
|
# if is_publuc has changed and the user is not the owner of the collection return an error
|
|
if new_public_status != instance.is_public and instance.user_id != request.user:
|
|
print(f"User {request.user.id} does not own the collection {instance.id} that is owned by {instance.user_id}")
|
|
return Response({"error": "User does not own the collection"}, status=400)
|
|
|
|
# Update associated adventures to match the collection's is_public status
|
|
Adventure.objects.filter(collection=instance).update(is_public=new_public_status)
|
|
|
|
# do the same for transportations
|
|
Transportation.objects.filter(collection=instance).update(is_public=new_public_status)
|
|
|
|
# do the same for notes
|
|
Note.objects.filter(collection=instance).update(is_public=new_public_status)
|
|
|
|
# Log the action (optional)
|
|
action = "public" if new_public_status else "private"
|
|
print(f"Collection {instance.id} and its adventures were set to {action}")
|
|
|
|
self.perform_update(serializer)
|
|
|
|
if getattr(instance, '_prefetched_objects_cache', None):
|
|
# If 'prefetch_related' has been applied to a queryset, we need to
|
|
# forcibly invalidate the prefetch cache on the instance.
|
|
instance._prefetched_objects_cache = {}
|
|
|
|
return Response(serializer.data)
|
|
|
|
# make an action to retreive all adventures that are shared with the user
|
|
@action(detail=False, methods=['get'])
|
|
def shared(self, request):
|
|
if not request.user.is_authenticated:
|
|
return Response({"error": "User is not authenticated"}, status=400)
|
|
queryset = Collection.objects.filter(
|
|
shared_with=request.user
|
|
)
|
|
queryset = self.apply_sorting(queryset)
|
|
serializer = self.get_serializer(queryset, many=True)
|
|
return Response(serializer.data)
|
|
|
|
# Adds a new user to the shared_with field of an adventure
|
|
@action(detail=True, methods=['post'], url_path='share/(?P<uuid>[^/.]+)')
|
|
def share(self, request, pk=None, uuid=None):
|
|
collection = self.get_object()
|
|
if not uuid:
|
|
return Response({"error": "User UUID is required"}, status=400)
|
|
try:
|
|
user = User.objects.get(uuid=uuid, public_profile=True)
|
|
except User.DoesNotExist:
|
|
return Response({"error": "User not found"}, status=404)
|
|
|
|
if user == request.user:
|
|
return Response({"error": "Cannot share with yourself"}, status=400)
|
|
|
|
if collection.shared_with.filter(id=user.id).exists():
|
|
return Response({"error": "Adventure is already shared with this user"}, status=400)
|
|
|
|
collection.shared_with.add(user)
|
|
collection.save()
|
|
return Response({"success": f"Shared with {user.username}"})
|
|
|
|
@action(detail=True, methods=['post'], url_path='unshare/(?P<uuid>[^/.]+)')
|
|
def unshare(self, request, pk=None, uuid=None):
|
|
if not request.user.is_authenticated:
|
|
return Response({"error": "User is not authenticated"}, status=400)
|
|
collection = self.get_object()
|
|
if not uuid:
|
|
return Response({"error": "User UUID is required"}, status=400)
|
|
try:
|
|
user = User.objects.get(uuid=uuid, public_profile=True)
|
|
except User.DoesNotExist:
|
|
return Response({"error": "User not found"}, status=404)
|
|
|
|
if user == request.user:
|
|
return Response({"error": "Cannot unshare with yourself"}, status=400)
|
|
|
|
if not collection.shared_with.filter(id=user.id).exists():
|
|
return Response({"error": "Collection is not shared with this user"}, status=400)
|
|
|
|
collection.shared_with.remove(user)
|
|
collection.save()
|
|
return Response({"success": f"Unshared with {user.username}"})
|
|
|
|
def get_queryset(self):
|
|
if self.action == 'destroy':
|
|
return Collection.objects.filter(user_id=self.request.user.id)
|
|
|
|
if self.action in ['update', 'partial_update']:
|
|
return Collection.objects.filter(
|
|
Q(user_id=self.request.user.id) | Q(shared_with=self.request.user)
|
|
).distinct()
|
|
|
|
if self.action == 'retrieve':
|
|
if not self.request.user.is_authenticated:
|
|
return Collection.objects.filter(is_public=True)
|
|
return Collection.objects.filter(
|
|
Q(is_public=True) | Q(user_id=self.request.user.id) | Q(shared_with=self.request.user)
|
|
).distinct()
|
|
|
|
# For list action, include collections owned by the user or shared with the user, that are not archived
|
|
return Collection.objects.filter(
|
|
(Q(user_id=self.request.user.id) | Q(shared_with=self.request.user)) & Q(is_archived=False)
|
|
).distinct()
|
|
|
|
|
|
def perform_create(self, serializer):
|
|
# This is ok because you cannot share a collection when creating it
|
|
serializer.save(user_id=self.request.user)
|
|
|
|
def paginate_and_respond(self, queryset, request):
|
|
paginator = self.pagination_class()
|
|
page = paginator.paginate_queryset(queryset, request)
|
|
if page is not None:
|
|
serializer = self.get_serializer(page, many=True)
|
|
return paginator.get_paginated_response(serializer.data)
|
|
serializer = self.get_serializer(queryset, many=True)
|
|
return Response(serializer.data)
|