diff --git a/api/dataservices/edgestack/edgestack.go b/api/dataservices/edgestack/edgestack.go index 52ffd23a3..fb434f388 100644 --- a/api/dataservices/edgestack/edgestack.go +++ b/api/dataservices/edgestack/edgestack.go @@ -5,7 +5,6 @@ import ( "sync" portainer "github.com/portainer/portainer/api" - "github.com/portainer/portainer/api/internal/edge/cache" "github.com/rs/zerolog/log" ) @@ -17,9 +16,10 @@ const ( // Service represents a service for managing Edge stack data. type Service struct { - connection portainer.Connection - idxVersion map[portainer.EdgeStackID]int - mu sync.RWMutex + connection portainer.Connection + idxVersion map[portainer.EdgeStackID]int + mu sync.RWMutex + cacheInvalidationFn func(portainer.EdgeStackID) } func (service *Service) BucketName() string { @@ -27,15 +27,20 @@ func (service *Service) BucketName() string { } // NewService creates a new instance of a service. -func NewService(connection portainer.Connection) (*Service, error) { +func NewService(connection portainer.Connection, cacheInvalidationFn func(portainer.EdgeStackID)) (*Service, error) { err := connection.SetServiceName(BucketName) if err != nil { return nil, err } s := &Service{ - connection: connection, - idxVersion: make(map[portainer.EdgeStackID]int), + connection: connection, + idxVersion: make(map[portainer.EdgeStackID]int), + cacheInvalidationFn: cacheInvalidationFn, + } + + if s.cacheInvalidationFn == nil { + s.cacheInvalidationFn = func(portainer.EdgeStackID) {} } es, err := s.EdgeStacks() @@ -109,12 +114,9 @@ func (service *Service) Create(id portainer.EdgeStackID, edgeStack *portainer.Ed service.mu.Lock() service.idxVersion[id] = edgeStack.Version + service.cacheInvalidationFn(id) service.mu.Unlock() - for endpointID := range edgeStack.Status { - cache.Del(endpointID) - } - return nil } @@ -123,37 +125,15 @@ func (service *Service) UpdateEdgeStack(ID portainer.EdgeStackID, edgeStack *por service.mu.Lock() defer service.mu.Unlock() - prevEdgeStack, err := service.EdgeStack(ID) - if err != nil { - return err - } - identifier := service.connection.ConvertToKey(int(ID)) - err = service.connection.UpdateObject(BucketName, identifier, edgeStack) + err := service.connection.UpdateObject(BucketName, identifier, edgeStack) if err != nil { return err } service.idxVersion[ID] = edgeStack.Version - - // Invalidate cache for removed environments - for endpointID := range prevEdgeStack.Status { - if _, ok := edgeStack.Status[endpointID]; !ok { - cache.Del(endpointID) - } - } - - // Invalidate cache when version changes and for added environments - for endpointID := range edgeStack.Status { - if prevEdgeStack.Version == edgeStack.Version { - if _, ok := prevEdgeStack.Status[endpointID]; ok { - continue - } - } - - cache.Del(endpointID) - } + service.cacheInvalidationFn(ID) return nil } @@ -167,35 +147,10 @@ func (service *Service) UpdateEdgeStackFunc(ID portainer.EdgeStackID, updateFunc defer service.mu.Unlock() return service.connection.UpdateObjectFunc(BucketName, id, edgeStack, func() { - prevEndpoints := make(map[portainer.EndpointID]struct{}, len(edgeStack.Status)) - for endpointID := range edgeStack.Status { - if _, ok := edgeStack.Status[endpointID]; !ok { - prevEndpoints[endpointID] = struct{}{} - } - } - updateFunc(edgeStack) - prevVersion := service.idxVersion[ID] service.idxVersion[ID] = edgeStack.Version - - // Invalidate cache for removed environments - for endpointID := range prevEndpoints { - if _, ok := edgeStack.Status[endpointID]; !ok { - cache.Del(endpointID) - } - } - - // Invalidate cache when version changes and for added environments - for endpointID := range edgeStack.Status { - if prevVersion == edgeStack.Version { - if _, ok := prevEndpoints[endpointID]; ok { - continue - } - } - - cache.Del(endpointID) - } + service.cacheInvalidationFn(ID) }) } @@ -204,23 +159,16 @@ func (service *Service) DeleteEdgeStack(ID portainer.EdgeStackID) error { service.mu.Lock() defer service.mu.Unlock() - edgeStack, err := service.EdgeStack(ID) - if err != nil { - return err - } - identifier := service.connection.ConvertToKey(int(ID)) - err = service.connection.DeleteObject(BucketName, identifier) + err := service.connection.DeleteObject(BucketName, identifier) if err != nil { return err } delete(service.idxVersion, ID) - for endpointID := range edgeStack.Status { - cache.Del(endpointID) - } + service.cacheInvalidationFn(ID) return nil } diff --git a/api/dataservices/endpointrelation/endpointrelation.go b/api/dataservices/endpointrelation/endpointrelation.go index 7f76ffd17..afb5643ed 100644 --- a/api/dataservices/endpointrelation/endpointrelation.go +++ b/api/dataservices/endpointrelation/endpointrelation.go @@ -78,7 +78,7 @@ func (service *Service) Create(endpointRelation *portainer.EndpointRelation) err return err } -// UpdateEndpointRelation updates an Environment(Endpoint) relation object +// Deprecated: Use UpdateEndpointRelationFunc instead. func (service *Service) UpdateEndpointRelation(endpointID portainer.EndpointID, endpointRelation *portainer.EndpointRelation) error { identifier := service.connection.ConvertToKey(int(endpointID)) err := service.connection.UpdateObject(BucketName, identifier, endpointRelation) @@ -87,6 +87,18 @@ func (service *Service) UpdateEndpointRelation(endpointID portainer.EndpointID, return err } +// UpdateEndpointRelationFunc updates an Environment(Endpoint) relation object +func (service *Service) UpdateEndpointRelationFunc(endpointID portainer.EndpointID, updateFunc func(endpointRelation *portainer.EndpointRelation)) error { + id := service.connection.ConvertToKey(int(endpointID)) + endpointRelation := &portainer.EndpointRelation{} + + return service.connection.UpdateObjectFunc(BucketName, id, endpointRelation, func() { + updateFunc(endpointRelation) + + cache.Del(endpointID) + }) +} + // DeleteEndpointRelation deletes an Environment(Endpoint) relation object func (service *Service) DeleteEndpointRelation(endpointID portainer.EndpointID) error { identifier := service.connection.ConvertToKey(int(endpointID)) @@ -95,3 +107,19 @@ func (service *Service) DeleteEndpointRelation(endpointID portainer.EndpointID) return err } + +func (service *Service) InvalidateEdgeCacheForEdgeStack(edgeStackID portainer.EdgeStackID) { + rels, err := service.EndpointRelations() + if err != nil { + log.Error().Err(err).Msg("cannot retrieve endpoint relations") + return + } + + for _, rel := range rels { + for id := range rel.EdgeStacks { + if edgeStackID == id { + cache.Del(rel.EndpointID) + } + } + } +} diff --git a/api/dataservices/interface.go b/api/dataservices/interface.go index 452993ad0..2d53c1800 100644 --- a/api/dataservices/interface.go +++ b/api/dataservices/interface.go @@ -126,6 +126,7 @@ type ( EndpointRelation(EndpointID portainer.EndpointID) (*portainer.EndpointRelation, error) Create(endpointRelation *portainer.EndpointRelation) error UpdateEndpointRelation(EndpointID portainer.EndpointID, endpointRelation *portainer.EndpointRelation) error + UpdateEndpointRelationFunc(EndpointID portainer.EndpointID, updateFunc func(*portainer.EndpointRelation)) error DeleteEndpointRelation(EndpointID portainer.EndpointID) error BucketName() string } diff --git a/api/datastore/services.go b/api/datastore/services.go index 749067a22..e687fd0e6 100644 --- a/api/datastore/services.go +++ b/api/datastore/services.go @@ -93,7 +93,13 @@ func (store *Store) initServices() error { } store.DockerHubService = dockerhubService - edgeStackService, err := edgestack.NewService(store.connection) + endpointRelationService, err := endpointrelation.NewService(store.connection) + if err != nil { + return err + } + store.EndpointRelationService = endpointRelationService + + edgeStackService, err := edgestack.NewService(store.connection, endpointRelationService.InvalidateEdgeCacheForEdgeStack) if err != nil { return err } @@ -123,12 +129,6 @@ func (store *Store) initServices() error { } store.EndpointService = endpointService - endpointRelationService, err := endpointrelation.NewService(store.connection) - if err != nil { - return err - } - store.EndpointRelationService = endpointRelationService - extensionService, err := extension.NewService(store.connection) if err != nil { return err diff --git a/api/http/handler/edgegroups/edgegroup_update.go b/api/http/handler/edgegroups/edgegroup_update.go index a0ce97860..94cba9e06 100644 --- a/api/http/handler/edgegroups/edgegroup_update.go +++ b/api/http/handler/edgegroups/edgegroup_update.go @@ -163,11 +163,6 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request) } func (handler *Handler) updateEndpointStacks(endpointID portainer.EndpointID) error { - relation, err := handler.DataStore.EndpointRelation().EndpointRelation(endpointID) - if err != nil { - return err - } - endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID) if err != nil { return err @@ -195,9 +190,9 @@ func (handler *Handler) updateEndpointStacks(endpointID portainer.EndpointID) er edgeStackSet[edgeStackID] = true } - relation.EdgeStacks = edgeStackSet - - return handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpoint.ID, relation) + return handler.DataStore.EndpointRelation().UpdateEndpointRelationFunc(endpoint.ID, func(relation *portainer.EndpointRelation) { + relation.EdgeStacks = edgeStackSet + }) } func (handler *Handler) updateEndpointEdgeJobs(edgeGroupID portainer.EdgeGroupID, endpointID portainer.EndpointID, edgeJobs []portainer.EdgeJob, operation string) error { diff --git a/api/http/handler/edgestacks/edgestack_status_delete.go b/api/http/handler/edgestacks/edgestack_status_delete.go index 1ffdabe83..f12b65db7 100644 --- a/api/http/handler/edgestacks/edgestack_status_delete.go +++ b/api/http/handler/edgestacks/edgestack_status_delete.go @@ -38,17 +38,14 @@ func (handler *Handler) edgeStackStatusDelete(w http.ResponseWriter, r *http.Req return httperror.Forbidden("Permission denied to access environment", err) } - stack, err := handler.DataStore.EdgeStack().EdgeStack(portainer.EdgeStackID(stackID)) + var edgeStack *portainer.EdgeStack + err = handler.DataStore.EdgeStack().UpdateEdgeStackFunc(portainer.EdgeStackID(stackID), func(stack *portainer.EdgeStack) { + delete(stack.Status, endpoint.ID) + edgeStack = stack + }) if err != nil { - return handler.handlerDBErr(err, "Unable to find a stack with the specified identifier inside the database") + return handler.handlerDBErr(err, "Unable to persist the stack changes inside the database") } - delete(stack.Status, endpoint.ID) - - err = handler.DataStore.EdgeStack().UpdateEdgeStack(stack.ID, stack) - if err != nil { - return httperror.InternalServerError("Unable to persist the stack changes inside the database", err) - } - - return response.JSON(w, stack) + return response.JSON(w, edgeStack) } diff --git a/api/http/handler/edgestacks/edgestack_update.go b/api/http/handler/edgestacks/edgestack_update.go index eb4603aa0..6e7161d45 100644 --- a/api/http/handler/edgestacks/edgestack_update.go +++ b/api/http/handler/edgestacks/edgestack_update.go @@ -94,15 +94,12 @@ func (handler *Handler) edgeStackUpdate(w http.ResponseWriter, r *http.Request) } for endpointID := range endpointsToRemove { - relation, err := handler.DataStore.EndpointRelation().EndpointRelation(endpointID) - if err != nil { + err = handler.DataStore.EndpointRelation().UpdateEndpointRelationFunc(endpointID, func(relation *portainer.EndpointRelation) { + delete(relation.EdgeStacks, stack.ID) + }) + if handler.DataStore.IsErrObjectNotFound(err) { return httperror.InternalServerError("Unable to find environment relation in database", err) - } - - delete(relation.EdgeStacks, stack.ID) - - err = handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpointID, relation) - if err != nil { + } else if err != nil { return httperror.InternalServerError("Unable to persist environment relation in database", err) } } @@ -114,15 +111,12 @@ func (handler *Handler) edgeStackUpdate(w http.ResponseWriter, r *http.Request) } for endpointID := range endpointsToAdd { - relation, err := handler.DataStore.EndpointRelation().EndpointRelation(endpointID) - if err != nil { + err = handler.DataStore.EndpointRelation().UpdateEndpointRelationFunc(endpointID, func(relation *portainer.EndpointRelation) { + relation.EdgeStacks[stack.ID] = true + }) + if handler.DataStore.IsErrObjectNotFound(err) { return httperror.InternalServerError("Unable to find environment relation in database", err) - } - - relation.EdgeStacks[stack.ID] = true - - err = handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpointID, relation) - if err != nil { + } else if err != nil { return httperror.InternalServerError("Unable to persist environment relation in database", err) } } diff --git a/api/http/handler/endpointgroups/endpoints.go b/api/http/handler/endpointgroups/endpoints.go index c1757d3c7..36d162f4e 100644 --- a/api/http/handler/endpointgroups/endpoints.go +++ b/api/http/handler/endpointgroups/endpoints.go @@ -19,11 +19,6 @@ func (handler *Handler) updateEndpointRelations(endpoint *portainer.Endpoint, en endpointGroup = unassignedGroup } - endpointRelation, err := handler.DataStore.EndpointRelation().EndpointRelation(endpoint.ID) - if err != nil { - return err - } - edgeGroups, err := handler.DataStore.EdgeGroup().EdgeGroups() if err != nil { return err @@ -39,7 +34,8 @@ func (handler *Handler) updateEndpointRelations(endpoint *portainer.Endpoint, en for _, edgeStackID := range endpointStacks { stacksSet[edgeStackID] = true } - endpointRelation.EdgeStacks = stacksSet - return handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpoint.ID, endpointRelation) + return handler.DataStore.EndpointRelation().UpdateEndpointRelationFunc(endpoint.ID, func(relation *portainer.EndpointRelation) { + relation.EdgeStacks = stacksSet + }) } diff --git a/api/http/handler/endpoints/endpoint_delete.go b/api/http/handler/endpoints/endpoint_delete.go index 5777616da..b82bd9f02 100644 --- a/api/http/handler/endpoints/endpoint_delete.go +++ b/api/http/handler/endpoints/endpoint_delete.go @@ -101,8 +101,9 @@ func (handler *Handler) endpointDelete(w http.ResponseWriter, r *http.Request) * for idx := range edgeStacks { edgeStack := &edgeStacks[idx] if _, ok := edgeStack.Status[endpoint.ID]; ok { - delete(edgeStack.Status, endpoint.ID) - err = handler.DataStore.EdgeStack().UpdateEdgeStack(edgeStack.ID, edgeStack) + err = handler.DataStore.EdgeStack().UpdateEdgeStackFunc(edgeStack.ID, func(stack *portainer.EdgeStack) { + delete(stack.Status, endpoint.ID) + }) if err != nil { return httperror.InternalServerError("Unable to update edge stack", err) } diff --git a/api/http/handler/tags/tag_delete.go b/api/http/handler/tags/tag_delete.go index 40a0efd9f..9d0dd8e34 100644 --- a/api/http/handler/tags/tag_delete.go +++ b/api/http/handler/tags/tag_delete.go @@ -106,11 +106,6 @@ func (handler *Handler) tagDelete(w http.ResponseWriter, r *http.Request) *httpe } func (handler *Handler) updateEndpointRelations(endpoint portainer.Endpoint, edgeGroups []portainer.EdgeGroup, edgeStacks []portainer.EdgeStack) error { - endpointRelation, err := handler.DataStore.EndpointRelation().EndpointRelation(endpoint.ID) - if err != nil { - return err - } - endpointGroup, err := handler.DataStore.EndpointGroup().EndpointGroup(endpoint.GroupID) if err != nil { return err @@ -121,9 +116,10 @@ func (handler *Handler) updateEndpointRelations(endpoint portainer.Endpoint, edg for _, edgeStackID := range endpointStacks { stacksSet[edgeStackID] = true } - endpointRelation.EdgeStacks = stacksSet - return handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpoint.ID, endpointRelation) + return handler.DataStore.EndpointRelation().UpdateEndpointRelationFunc(endpoint.ID, func(relation *portainer.EndpointRelation) { + relation.EdgeStacks = stacksSet + }) } func removeElement(slice []portainer.TagID, elem portainer.TagID) []portainer.TagID { diff --git a/api/internal/edge/edgestacks/service.go b/api/internal/edge/edgestacks/service.go index a9a90235b..c0cce63c1 100644 --- a/api/internal/edge/edgestacks/service.go +++ b/api/internal/edge/edgestacks/service.go @@ -6,11 +6,13 @@ import ( "strings" "time" - "github.com/pkg/errors" portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/internal/edge" edgetypes "github.com/portainer/portainer/api/internal/edge/types" + + "github.com/pkg/errors" + "github.com/rs/zerolog/log" ) // Service represents a service for managing edge stacks. @@ -112,15 +114,12 @@ func (service *Service) updateEndpointRelations(edgeStackID portainer.EdgeStackI endpointRelationService := service.dataStore.EndpointRelation() for _, endpointID := range relatedEndpointIds { - relation, err := endpointRelationService.EndpointRelation(endpointID) - if err != nil { + err := endpointRelationService.UpdateEndpointRelationFunc(endpointID, func(relation *portainer.EndpointRelation) { + relation.EdgeStacks[edgeStackID] = true + }) + if service.dataStore.IsErrObjectNotFound(err) { return fmt.Errorf("unable to find endpoint relation in database: %w", err) - } - - relation.EdgeStacks[edgeStackID] = true - - err = endpointRelationService.UpdateEndpointRelation(endpointID, relation) - if err != nil { + } else if err != nil { return fmt.Errorf("unable to persist endpoint relation in database: %w", err) } } @@ -142,15 +141,15 @@ func (service *Service) DeleteEdgeStack(edgeStackID portainer.EdgeStackID, relat } for _, endpointID := range relatedEndpointIds { - relation, err := service.dataStore.EndpointRelation().EndpointRelation(endpointID) - if err != nil { - return errors.WithMessage(err, "Unable to find environment relation in database") - } - - delete(relation.EdgeStacks, edgeStackID) - - err = service.dataStore.EndpointRelation().UpdateEndpointRelation(endpointID, relation) - if err != nil { + service.dataStore.EndpointRelation().UpdateEndpointRelationFunc(endpointID, func(relation *portainer.EndpointRelation) { + delete(relation.EdgeStacks, edgeStackID) + }) + if service.dataStore.IsErrObjectNotFound(err) { + log.Warn(). + Int("endpoint_id", int(endpointID)). + Msg("Unable to find endpoint relation in database, skipping") + continue + } else if err != nil { return errors.WithMessage(err, "Unable to persist environment relation in database") } } diff --git a/api/internal/testhelpers/datastore.go b/api/internal/testhelpers/datastore.go index d283853ff..744e6f040 100644 --- a/api/internal/testhelpers/datastore.go +++ b/api/internal/testhelpers/datastore.go @@ -277,6 +277,16 @@ func (s *stubEndpointService) UpdateEndpoint(ID portainer.EndpointID, endpoint * return nil } +func (s *stubEndpointRelationService) UpdateEndpointRelationFunc(ID portainer.EndpointID, updateFunc func(relation *portainer.EndpointRelation)) error { + for i, r := range s.relations { + if r.EndpointID == ID { + updateFunc(&s.relations[i]) + } + } + + return nil +} + func (s *stubEndpointService) DeleteEndpoint(ID portainer.EndpointID) error { endpoints := []portainer.Endpoint{}