1
0
Fork 0
mirror of https://github.com/seanmorley15/AdventureLog.git synced 2025-07-24 23:39:37 +02:00

feat: enhance transportation card and modal with image handling

- Added CardCarousel component to TransportationCard for image display.
- Implemented privacy indicator with Eye and EyeOff icons.
- Introduced image upload functionality in TransportationModal, allowing users to upload multiple images.
- Added image management features: remove image and set primary image.
- Updated Transportation and Location types to include images as ContentImage array.
- Enhanced UI for image upload and display in modal, including selected images preview and current images management.
This commit is contained in:
Sean Morley 2025-07-14 18:57:39 -04:00
parent ba162175fe
commit 7a61ba2d22
19 changed files with 3181 additions and 1549 deletions

View file

@ -64,47 +64,138 @@ class CollectionShared(permissions.BasePermission):
class IsOwnerOrSharedWithFullAccess(permissions.BasePermission):
"""
Full access for owners and users shared via collections,
read-only for others if public.
Permission class that provides access control based on ownership and sharing.
Access Rules:
- Object owners have full access (read/write)
- Users shared via collections have full access (read/write)
- Collection owners have full access to objects in their collections
- Users with direct sharing have full access
- Anonymous users get read-only access to public objects
- Authenticated users get read-only access to public objects
Supports multiple sharing patterns:
- obj.collections (many-to-many collections)
- obj.collection (single collection foreign key)
- obj.shared_with (direct sharing many-to-many)
- obj.is_public (public access flag)
"""
def has_object_permission(self, request, view, obj):
"""
Check if the user has permission to access the object.
Args:
request: The HTTP request
view: The view being accessed
obj: The object being accessed
Returns:
bool: True if access is granted, False otherwise
"""
user = request.user
is_safe_method = request.method in permissions.SAFE_METHODS
# Anonymous users only get read access to public objects
if not user or not user.is_authenticated:
return request.method in permissions.SAFE_METHODS and obj.is_public
# If safe method (read), allow if:
if request.method in permissions.SAFE_METHODS:
if obj.is_public:
return True
if obj.user == user:
return True
# If user in shared_with of any collection related to obj
if hasattr(obj, 'collections') and obj.collections.filter(shared_with=user).exists():
return True
# **FIX: Check if user OWNS any collection that contains this object**
if hasattr(obj, 'collections') and obj.collections.filter(user=user).exists():
return True
if hasattr(obj, 'collection') and obj.collection and obj.collection.shared_with.filter(id=user.id).exists():
return True
if hasattr(obj, 'collection') and obj.collection and obj.collection.user == user:
return True
if hasattr(obj, 'shared_with') and obj.shared_with.filter(id=user.id).exists():
return True
return False
# For write methods, allow if owner or shared user
if obj.user == user:
return is_safe_method and getattr(obj, 'is_public', False)
# Owner always has full access
if self._is_owner(obj, user):
return True
if hasattr(obj, 'collections') and obj.collections.filter(shared_with=user).exists():
# Check collection-based access (both ownership and sharing)
if self._has_collection_access(obj, user):
return True
# **FIX: Allow write access if user owns any collection containing this object**
if hasattr(obj, 'collections') and obj.collections.filter(user=user).exists():
# Check direct sharing
if self._has_direct_sharing_access(obj, user):
return True
if hasattr(obj, 'collection') and obj.collection and obj.collection.shared_with.filter(id=user.id).exists():
# For safe methods, check if object is public
if is_safe_method and getattr(obj, 'is_public', False):
return True
if hasattr(obj, 'collection') and obj.collection and obj.collection.user == user:
return True
if hasattr(obj, 'shared_with') and obj.shared_with.filter(id=user.id).exists():
return True
return False
return False
def _is_owner(self, obj, user):
"""
Check if the user is the owner of the object.
Args:
obj: The object to check
user: The user to check ownership for
Returns:
bool: True if user owns the object
"""
return hasattr(obj, 'user') and obj.user == user
def _has_collection_access(self, obj, user):
"""
Check if user has access via collections (either as owner or shared user).
Handles both many-to-many collections and single collection foreign keys.
Args:
obj: The object to check
user: The user to check access for
Returns:
bool: True if user has collection-based access
"""
# Check many-to-many collections (obj.collections)
if hasattr(obj, 'collections'):
collections = obj.collections.all()
if collections.exists():
# User is shared with any collection containing this object
if collections.filter(shared_with=user).exists():
return True
# User owns any collection containing this object
if collections.filter(user=user).exists():
return True
# Check single collection foreign key (obj.collection)
if hasattr(obj, 'collection') and obj.collection:
collection = obj.collection
# User is shared with the collection
if hasattr(collection, 'shared_with') and collection.shared_with.filter(id=user.id).exists():
return True
# User owns the collection
if hasattr(collection, 'user') and collection.user == user:
return True
return False
def _has_direct_sharing_access(self, obj, user):
"""
Check if user has direct sharing access to the object.
Args:
obj: The object to check
user: The user to check access for
Returns:
bool: True if user has direct sharing access
"""
return (hasattr(obj, 'shared_with') and
obj.shared_with.filter(id=user.id).exists())
def has_permission(self, request, view):
"""
Check if the user has permission to access the view.
This is called before has_object_permission and provides a way to
deny access at the view level (e.g., for unauthenticated users).
Args:
request: The HTTP request
view: The view being accessed
Returns:
bool: True if access is granted at the view level
"""
# Allow authenticated users and anonymous users for safe methods
# Individual object permissions are handled in has_object_permission
return (request.user and request.user.is_authenticated) or \
request.method in permissions.SAFE_METHODS

View file

@ -128,6 +128,7 @@ class LocationSerializer(CustomModelSerializer):
def validate_collections(self, collections):
"""Validate that collections are compatible with the location being created/updated"""
if not collections:
return collections
@ -136,17 +137,54 @@ class LocationSerializer(CustomModelSerializer):
# Get the location being updated (if this is an update operation)
location_owner = getattr(self.instance, 'user', None) if self.instance else user
for collection in collections:
# For updates, we need to check if collections are being added or removed
current_collections = set(self.instance.collections.all()) if self.instance else set()
new_collections_set = set(collections)
collections_to_add = new_collections_set - current_collections
collections_to_remove = current_collections - new_collections_set
# Validate collections being added
for collection in collections_to_add:
# Check if user has permission to use this collection
if collection.user != user and not collection.shared_with.filter(id=user.id).exists():
user_has_shared_access = collection.shared_with.filter(id=user.id).exists()
if collection.user != user and not user_has_shared_access:
raise serializers.ValidationError(
f"Collection '{collection.name}' does not belong to the current user."
f"The requested collection does not belong to the current user."
)
# Check if the location owner is compatible with the collection
if collection.user != location_owner and not collection.shared_with.filter(id=location_owner.id).exists():
# Check location owner compatibility - both directions
if collection.user != location_owner:
# If user owns the collection but not the location, location owner must have shared access
if collection.user == user:
location_owner_has_shared_access = collection.shared_with.filter(id=location_owner.id).exists() if location_owner else False
if not location_owner_has_shared_access:
raise serializers.ValidationError(
f"Locations must be associated with collections owned by the same user or shared collections. Collection owner: {collection.user.username} Location owner: {location_owner.username if location_owner else 'None'}"
)
# If using someone else's collection, location owner must have shared access
else:
location_owner_has_shared_access = collection.shared_with.filter(id=location_owner.id).exists() if location_owner else False
if not location_owner_has_shared_access:
raise serializers.ValidationError(
"Location cannot be added to collection unless the location owner has shared access to the collection."
)
# Validate collections being removed - allow if user owns the collection OR owns the location
for collection in collections_to_remove:
user_owns_collection = collection.user == user
user_owns_location = location_owner == user if location_owner else False
user_has_shared_access = collection.shared_with.filter(id=user.id).exists()
if not (user_owns_collection or user_owns_location or user_has_shared_access):
raise serializers.ValidationError(
f"Location owned by '{location_owner.username}' cannot be added to collection '{collection.name}' owned by '{collection.user.username}' unless the location owner has shared access to the collection."
"You don't have permission to remove this location from one of the collections it's linked to."
)
return collections
@ -267,6 +305,7 @@ class LocationSerializer(CustomModelSerializer):
class TransportationSerializer(CustomModelSerializer):
distance = serializers.SerializerMethodField()
images = serializers.SerializerMethodField()
class Meta:
model = Transportation
@ -275,10 +314,15 @@ class TransportationSerializer(CustomModelSerializer):
'link', 'date', 'flight_number', 'from_location', 'to_location',
'is_public', 'collection', 'created_at', 'updated_at', 'end_date',
'origin_latitude', 'origin_longitude', 'destination_latitude', 'destination_longitude',
'start_timezone', 'end_timezone', 'distance'
'start_timezone', 'end_timezone', 'distance', 'images'
]
read_only_fields = ['id', 'created_at', 'updated_at', 'user', 'distance']
def get_images(self, obj):
serializer = ContentImageSerializer(obj.images.all(), many=True, context=self.context)
# Filter out None values from the serialized data
return [image for image in serializer.data if image is not None]
def get_distance(self, obj):
if (
obj.origin_latitude and obj.origin_longitude and
@ -293,16 +337,22 @@ class TransportationSerializer(CustomModelSerializer):
return None
class LodgingSerializer(CustomModelSerializer):
images = serializers.SerializerMethodField()
class Meta:
model = Lodging
fields = [
'id', 'user', 'name', 'description', 'rating', 'link', 'check_in', 'check_out',
'reservation_number', 'price', 'latitude', 'longitude', 'location', 'is_public',
'collection', 'created_at', 'updated_at', 'type', 'timezone'
'reservation_number', 'price', 'latitude', 'longitude', 'location', 'is_public',
'collection', 'created_at', 'updated_at', 'type', 'timezone', 'images'
]
read_only_fields = ['id', 'created_at', 'updated_at', 'user']
def get_images(self, obj):
serializer = ContentImageSerializer(obj.images.all(), many=True, context=self.context)
# Filter out None values from the serialized data
return [image for image in serializer.data if image is not None]
class NoteSerializer(CustomModelSerializer):
class Meta:

View file

@ -1,5 +1,7 @@
from adventures.models import ContentImage, ContentAttachment
from adventures.models import Visit
protected_paths = ['images/', 'attachments/']
def checkFilePermission(fileId, user, mediaType):
@ -14,7 +16,14 @@ def checkFilePermission(fileId, user, mediaType):
# Get the content object (could be Location, Transportation, Note, etc.)
content_object = content_image.content_object
# handle differently when content_object is a Visit, get the location instead
if isinstance(content_object, Visit):
# check visit.location
if content_object.location:
# continue with the location check
content_object = content_object.location
# Check if content object is public
if hasattr(content_object, 'is_public') and content_object.is_public:
return True

View file

@ -15,6 +15,7 @@ from rest_framework.response import Response
from rest_framework.parsers import MultiPartParser
from rest_framework.permissions import IsAuthenticated
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from adventures.models import (
Location, Collection, Transportation, Note, Checklist, ChecklistItem,
@ -128,7 +129,7 @@ class BackupViewSet(viewsets.ViewSet):
image_data = {
'immich_id': image.immich_id,
'is_primary': image.is_primary,
'filename': None
'filename': None,
}
if image.image:
image_data['filename'] = image.image.name.split('/')[-1]
@ -348,7 +349,7 @@ class BackupViewSet(viewsets.ViewSet):
# Delete location-related data
user.contentimage_set.all().delete()
user.attachment_set.all().delete()
user.contentattachment_set.all().delete()
# Visits are deleted via cascade when locations are deleted
user.location_set.all().delete()
@ -468,7 +469,7 @@ class BackupViewSet(viewsets.ViewSet):
)
location.save(_skip_geocode=True) # Skip geocoding for now
# Add to collections using export_ids
# Add to collections using export_ids - MUST be done after save()
for collection_export_id in adv_data.get('collection_export_ids', []):
if collection_export_id in collection_map:
location.collections.add(collection_map[collection_export_id])
@ -484,14 +485,17 @@ class BackupViewSet(viewsets.ViewSet):
)
# Import images
content_type = ContentType.objects.get(model='location')
for img_data in adv_data.get('images', []):
immich_id = img_data.get('immich_id')
if immich_id:
ContentImage.objects.create(
user=user,
location=location,
immich_id=immich_id,
is_primary=img_data.get('is_primary', False)
is_primary=img_data.get('is_primary', False),
content_type=content_type,
object_id=location.id
)
summary['images'] += 1
else:
@ -502,9 +506,10 @@ class BackupViewSet(viewsets.ViewSet):
img_file = ContentFile(img_content, name=filename)
ContentImage.objects.create(
user=user,
location=location,
image=img_file,
is_primary=img_data.get('is_primary', False)
is_primary=img_data.get('is_primary', False),
content_type=content_type,
object_id=location.id
)
summary['images'] += 1
except KeyError:
@ -519,9 +524,10 @@ class BackupViewSet(viewsets.ViewSet):
att_file = ContentFile(att_content, name=filename)
ContentAttachment.objects.create(
user=user,
location=location,
file=att_file,
name=att_data.get('name')
name=att_data.get('name'),
content_type=content_type,
object_id=location.id
)
summary['attachments'] += 1
except KeyError:

View file

@ -1,6 +1,5 @@
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from django.db.models import Q
from django.core.files.base import ContentFile
@ -8,12 +7,96 @@ from django.contrib.contenttypes.models import ContentType
from adventures.models import Location, Transportation, Note, Lodging, Visit, ContentImage
from adventures.serializers import ContentImageSerializer
from integrations.models import ImmichIntegration
import uuid
from adventures.permissions import IsOwnerOrSharedWithFullAccess # Your existing permission class
import requests
class ContentImagePermission(IsOwnerOrSharedWithFullAccess):
"""
Specialized permission for ContentImage objects that checks permissions
on the related content object.
"""
def has_object_permission(self, request, view, obj):
"""
For ContentImage objects, check permissions on the related content object.
"""
if not request.user or not request.user.is_authenticated:
return False
# Get the related content object
content_object = obj.content_object
if not content_object:
return False
# Use the parent permission class to check access to the content object
return super().has_object_permission(request, view, content_object)
class ContentImageViewSet(viewsets.ModelViewSet):
serializer_class = ContentImageSerializer
permission_classes = [IsAuthenticated]
permission_classes = [ContentImagePermission]
def get_queryset(self):
"""Get all images the user has access to"""
if not self.request.user.is_authenticated:
return ContentImage.objects.none()
# Import here to avoid circular imports
from adventures.models import Location, Transportation, Note, Lodging, Visit
# Build a single query with all conditions
return ContentImage.objects.filter(
# User owns the image directly (if user field exists on ContentImage)
Q(user=self.request.user) |
# Or user has access to the content object
(
# Locations owned by user
Q(content_type=ContentType.objects.get_for_model(Location)) &
Q(object_id__in=Location.objects.filter(user=self.request.user).values_list('id', flat=True))
) |
(
# Shared locations
Q(content_type=ContentType.objects.get_for_model(Location)) &
Q(object_id__in=Location.objects.filter(collections__shared_with=self.request.user).values_list('id', flat=True))
) |
(
# Collections owned by user containing locations
Q(content_type=ContentType.objects.get_for_model(Location)) &
Q(object_id__in=Location.objects.filter(collections__user=self.request.user).values_list('id', flat=True))
) |
(
# Transportation owned by user
Q(content_type=ContentType.objects.get_for_model(Transportation)) &
Q(object_id__in=Transportation.objects.filter(user=self.request.user).values_list('id', flat=True))
) |
(
# Notes owned by user
Q(content_type=ContentType.objects.get_for_model(Note)) &
Q(object_id__in=Note.objects.filter(user=self.request.user).values_list('id', flat=True))
) |
(
# Lodging owned by user
Q(content_type=ContentType.objects.get_for_model(Lodging)) &
Q(object_id__in=Lodging.objects.filter(user=self.request.user).values_list('id', flat=True))
) |
(
# Visits - access through location's user
Q(content_type=ContentType.objects.get_for_model(Visit)) &
Q(object_id__in=Visit.objects.filter(location__user=self.request.user).values_list('id', flat=True))
) |
(
# Visits - access through shared locations
Q(content_type=ContentType.objects.get_for_model(Visit)) &
Q(object_id__in=Visit.objects.filter(location__collections__shared_with=self.request.user).values_list('id', flat=True))
) |
(
# Visits - access through collections owned by user
Q(content_type=ContentType.objects.get_for_model(Visit)) &
Q(object_id__in=Visit.objects.filter(location__collections__user=self.request.user).values_list('id', flat=True))
)
).distinct()
@action(detail=True, methods=['post'])
def image_delete(self, request, *args, **kwargs):
@ -21,19 +104,14 @@ class ContentImageViewSet(viewsets.ModelViewSet):
@action(detail=True, methods=['post'])
def toggle_primary(self, request, *args, **kwargs):
if not request.user.is_authenticated:
return Response({"error": "User is not authenticated"}, status=status.HTTP_401_UNAUTHORIZED)
instance = self.get_object()
content_object = instance.content_object
# Check ownership based on content type
if hasattr(content_object, 'user') and content_object.user != request.user:
return Response({"error": "User does not own this content"}, status=status.HTTP_403_FORBIDDEN)
# Check if the image is already the primary image
if instance.is_primary:
return Response({"error": "Image is already the primary image"}, status=status.HTTP_400_BAD_REQUEST)
return Response(
{"error": "Image is already the primary image"},
status=status.HTTP_400_BAD_REQUEST
)
# Set other images of the same content object to not primary
ContentImage.objects.filter(
@ -48,9 +126,6 @@ class ContentImageViewSet(viewsets.ModelViewSet):
return Response({"success": "Image set as primary image"})
def create(self, request, *args, **kwargs):
if not request.user.is_authenticated:
return Response({"error": "User is not authenticated"}, status=status.HTTP_401_UNAUTHORIZED)
# Get content type and object ID from request
content_type_name = request.data.get('content_type')
object_id = request.data.get('object_id')
@ -60,6 +135,26 @@ class ContentImageViewSet(viewsets.ModelViewSet):
"error": "content_type and object_id are required"
}, status=status.HTTP_400_BAD_REQUEST)
# Get the content object and validate permissions
content_object = self._get_and_validate_content_object(content_type_name, object_id)
if isinstance(content_object, Response): # Error response
return content_object
content_type = ContentType.objects.get_for_model(content_object.__class__)
# Handle Immich ID for shared users by downloading the image
if (hasattr(content_object, 'user') and
request.user != content_object.user and
'immich_id' in request.data and
request.data.get('immich_id')):
return self._handle_immich_image_creation(request, content_object, content_type, object_id)
# Standard image creation
return self._create_standard_image(request, content_object, content_type, object_id)
def _get_and_validate_content_object(self, content_type_name, object_id):
"""Get and validate the content object exists and user has access"""
# Map content type names to model classes
content_type_map = {
'location': Location,
@ -74,119 +169,100 @@ class ContentImageViewSet(viewsets.ModelViewSet):
"error": f"Invalid content_type. Must be one of: {', '.join(content_type_map.keys())}"
}, status=status.HTTP_400_BAD_REQUEST)
# Get the content type and object
# Get the content object
try:
content_type = ContentType.objects.get_for_model(content_type_map[content_type_name])
content_object = content_type_map[content_type_name].objects.get(id=object_id)
except (ValueError, content_type_map[content_type_name].DoesNotExist):
return Response({
"error": f"{content_type_name} not found"
}, status=status.HTTP_404_NOT_FOUND)
# Check permissions based on content type
if hasattr(content_object, 'user'):
if content_object.user != request.user:
# For Location, check if user has shared access
if content_type_name == 'location':
if content_object.collections.exists():
user_has_access = False
for collection in content_object.collections.all():
if collection.shared_with.filter(id=request.user.id).exists() or collection.user == request.user:
user_has_access = True
break
if not user_has_access:
return Response({
"error": "User does not have permission to access this content"
}, status=status.HTTP_403_FORBIDDEN)
else:
return Response({
"error": "User does not own this content"
}, status=status.HTTP_403_FORBIDDEN)
else:
return Response({
"error": "User does not own this content"
}, status=status.HTTP_403_FORBIDDEN)
# Check permissions using the permission class
permission_checker = IsOwnerOrSharedWithFullAccess()
if not permission_checker.has_object_permission(self.request, self, content_object):
return Response({
"error": "User does not have permission to access this content"
}, status=status.HTTP_403_FORBIDDEN)
# Handle Immich ID for shared users by downloading the image
if (hasattr(content_object, 'user') and
request.user != content_object.user and
'immich_id' in request.data and
request.data.get('immich_id')):
return content_object
def _handle_immich_image_creation(self, request, content_object, content_type, object_id):
"""Handle creation of image from Immich for shared users"""
immich_id = request.data.get('immich_id')
# Get the shared user's Immich integration
try:
user_integration = ImmichIntegration.objects.get(user=request.user)
except ImmichIntegration.DoesNotExist:
return Response({
"error": "No Immich integration found for your account. Please set up Immich integration first.",
"code": "immich_integration_not_found"
}, status=status.HTTP_400_BAD_REQUEST)
# Download the image from the shared user's Immich server
try:
immich_response = requests.get(
f'{user_integration.server_url}/assets/{immich_id}/thumbnail?size=preview',
headers={'x-api-key': user_integration.api_key},
timeout=10
)
immich_response.raise_for_status()
immich_id = request.data.get('immich_id')
# Get the shared user's Immich integration
try:
user_integration = ImmichIntegration.objects.get(user=request.user)
except ImmichIntegration.DoesNotExist:
# Create a temporary file with the downloaded content
content_type_header = immich_response.headers.get('Content-Type', 'image/jpeg')
if not content_type_header.startswith('image/'):
return Response({
"error": "No Immich integration found for your account. Please set up Immich integration first.",
"code": "immich_integration_not_found"
"error": "Invalid content type returned from Immich server.",
"code": "invalid_content_type"
}, status=status.HTTP_400_BAD_REQUEST)
# Download the image from the shared user's Immich server
try:
immich_response = requests.get(
f'{user_integration.server_url}/assets/{immich_id}/thumbnail?size=preview',
headers={'x-api-key': user_integration.api_key},
timeout=10
)
immich_response.raise_for_status()
# Create a temporary file with the downloaded content
content_type_header = immich_response.headers.get('Content-Type', 'image/jpeg')
if not content_type_header.startswith('image/'):
return Response({
"error": "Invalid content type returned from Immich server.",
"code": "invalid_content_type"
}, status=status.HTTP_400_BAD_REQUEST)
# Determine file extension from content type
ext_map = {
'image/jpeg': '.jpg',
'image/png': '.png',
'image/webp': '.webp',
'image/gif': '.gif'
}
file_ext = ext_map.get(content_type_header, '.jpg')
filename = f"immich_{immich_id}{file_ext}"
# Create a Django ContentFile from the downloaded image
image_file = ContentFile(immich_response.content, name=filename)
# Modify request data to use the downloaded image instead of immich_id
request_data = request.data.copy()
request_data.pop('immich_id', None) # Remove immich_id
request_data['image'] = image_file # Add the image file
request_data['content_type'] = content_type.id
request_data['object_id'] = object_id
# Create the serializer with the modified data
serializer = self.get_serializer(data=request_data)
serializer.is_valid(raise_exception=True)
# Save with the downloaded image
serializer.save(
user=content_object.user if hasattr(content_object, 'user') else request.user,
image=image_file,
content_type=content_type,
object_id=object_id
)
return Response(serializer.data, status=status.HTTP_201_CREATED)
except requests.exceptions.RequestException:
return Response({
"error": f"Failed to fetch image from Immich server",
"code": "immich_fetch_failed"
}, status=status.HTTP_502_BAD_GATEWAY)
except Exception:
return Response({
"error": f"Unexpected error processing Immich image",
"code": "immich_processing_error"
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# Determine file extension from content type
ext_map = {
'image/jpeg': '.jpg',
'image/png': '.png',
'image/webp': '.webp',
'image/gif': '.gif'
}
file_ext = ext_map.get(content_type_header, '.jpg')
filename = f"immich_{immich_id}{file_ext}"
# Create a Django ContentFile from the downloaded image
image_file = ContentFile(immich_response.content, name=filename)
# Modify request data to use the downloaded image instead of immich_id
request_data = request.data.copy()
request_data.pop('immich_id', None) # Remove immich_id
request_data['image'] = image_file # Add the image file
request_data['content_type'] = content_type.id
request_data['object_id'] = object_id
# Create the serializer with the modified data
serializer = self.get_serializer(data=request_data)
serializer.is_valid(raise_exception=True)
# Save with the downloaded image
serializer.save(
user=content_object.user if hasattr(content_object, 'user') else request.user,
image=image_file,
content_type=content_type,
object_id=object_id
)
return Response(serializer.data, status=status.HTTP_201_CREATED)
except requests.exceptions.RequestException:
return Response({
"error": f"Failed to fetch image from Immich server",
"code": "immich_fetch_failed"
}, status=status.HTTP_502_BAD_GATEWAY)
except Exception:
return Response({
"error": f"Unexpected error processing Immich image",
"code": "immich_processing_error"
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def _create_standard_image(self, request, content_object, content_type, object_id):
"""Handle standard image creation"""
# Add content type and object ID to request data
request_data = request.data.copy()
request_data['content_type'] = content_type.id
@ -205,63 +281,6 @@ class ContentImageViewSet(viewsets.ModelViewSet):
return Response(serializer.data, status=status.HTTP_201_CREATED)
def update(self, request, *args, **kwargs):
if not request.user.is_authenticated:
return Response({"error": "User is not authenticated"}, status=status.HTTP_401_UNAUTHORIZED)
instance = self.get_object()
content_object = instance.content_object
# Check ownership
if hasattr(content_object, 'user') and content_object.user != request.user:
return Response({"error": "User does not own this content"}, status=status.HTTP_403_FORBIDDEN)
return super().update(request, *args, **kwargs)
def destroy(self, request, *args, **kwargs):
if not request.user.is_authenticated:
return Response({"error": "User is not authenticated"}, status=status.HTTP_401_UNAUTHORIZED)
instance = self.get_object()
content_object = instance.content_object
# Check ownership
if hasattr(content_object, 'user') and content_object.user != request.user:
return Response({"error": "User does not own this content"}, status=status.HTTP_403_FORBIDDEN)
return super().destroy(request, *args, **kwargs)
def partial_update(self, request, *args, **kwargs):
if not request.user.is_authenticated:
return Response({"error": "User is not authenticated"}, status=status.HTTP_401_UNAUTHORIZED)
instance = self.get_object()
content_object = instance.content_object
# Check ownership
if hasattr(content_object, 'user') and content_object.user != request.user:
return Response({"error": "User does not own this content"}, status=status.HTTP_403_FORBIDDEN)
return super().partial_update(request, *args, **kwargs)
def get_queryset(self):
"""Get all images the user has access to"""
if not self.request.user.is_authenticated:
return ContentImage.objects.none()
# Get content type for Location to handle shared access
location_content_type = ContentType.objects.get_for_model(Location)
# Build queryset with proper permissions
queryset = ContentImage.objects.filter(
Q(content_object__user=self.request.user) | # User owns the content
Q(content_type=location_content_type, content_object__collections__shared_with=self.request.user) # Shared locations
).distinct()
return queryset
def perform_create(self, serializer):
# The content_type and object_id are already set in the create method
# Just ensure the user is set correctly

View file

@ -263,7 +263,7 @@ class LocationViewSet(viewsets.ModelViewSet):
# Check if user has shared access to the collection
if not collection.shared_with.filter(uuid=self.request.user.uuid).exists():
raise PermissionDenied(
f"You don't have permission to remove location from collection '{collection.name}'"
f"You don't have permission to remove this location from one of the collections it's linked to.'"
)
def _validate_collection_permissions(self, collections):

View file

@ -255,7 +255,7 @@ class ImmichIntegrationView(viewsets.ViewSet):
Access levels (in order of priority):
1. Public locations: accessible by anyone
2. Private locations in public collections: accessible by anyone
3. Private locations in private collections shared with user: accessible by shared users
3. Private locations in private collections shared with user: accessible by shared users, and the collection owner
4. Private locations: accessible only to the owner
5. No ContentImage: owner can still view via integration
"""
@ -333,10 +333,11 @@ class ImmichIntegrationView(viewsets.ViewSet):
elif request.user.is_authenticated and request.user == owner_id:
is_authorized = True
# Level 4: Shared collection access
# Level 4: Shared collection access or collection owner access
elif (request.user.is_authenticated and
any(collection.shared_with.filter(id=request.user.id).exists()
for collection in collections)):
(any(collection.shared_with.filter(id=request.user.id).exists()
for collection in collections) or
any(collection.user == request.user for collection in collections))):
is_authorized = True
else:
# Location without collections - owner access only