mirror of
https://github.com/portainer/portainer.git
synced 2025-08-06 22:35:23 +02:00
fix(edgestacks): fix edge stacks cache invalidation EE-4909 (#8400)
This commit is contained in:
parent
2216d2cdd2
commit
1000d57cd2
12 changed files with 111 additions and 146 deletions
|
@ -5,7 +5,6 @@ import (
|
|||
"sync"
|
||||
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
"github.com/portainer/portainer/api/internal/edge/cache"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
@ -17,9 +16,10 @@ const (
|
|||
|
||||
// Service represents a service for managing Edge stack data.
|
||||
type Service struct {
|
||||
connection portainer.Connection
|
||||
idxVersion map[portainer.EdgeStackID]int
|
||||
mu sync.RWMutex
|
||||
connection portainer.Connection
|
||||
idxVersion map[portainer.EdgeStackID]int
|
||||
mu sync.RWMutex
|
||||
cacheInvalidationFn func(portainer.EdgeStackID)
|
||||
}
|
||||
|
||||
func (service *Service) BucketName() string {
|
||||
|
@ -27,15 +27,20 @@ func (service *Service) BucketName() string {
|
|||
}
|
||||
|
||||
// NewService creates a new instance of a service.
|
||||
func NewService(connection portainer.Connection) (*Service, error) {
|
||||
func NewService(connection portainer.Connection, cacheInvalidationFn func(portainer.EdgeStackID)) (*Service, error) {
|
||||
err := connection.SetServiceName(BucketName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s := &Service{
|
||||
connection: connection,
|
||||
idxVersion: make(map[portainer.EdgeStackID]int),
|
||||
connection: connection,
|
||||
idxVersion: make(map[portainer.EdgeStackID]int),
|
||||
cacheInvalidationFn: cacheInvalidationFn,
|
||||
}
|
||||
|
||||
if s.cacheInvalidationFn == nil {
|
||||
s.cacheInvalidationFn = func(portainer.EdgeStackID) {}
|
||||
}
|
||||
|
||||
es, err := s.EdgeStacks()
|
||||
|
@ -109,12 +114,9 @@ func (service *Service) Create(id portainer.EdgeStackID, edgeStack *portainer.Ed
|
|||
|
||||
service.mu.Lock()
|
||||
service.idxVersion[id] = edgeStack.Version
|
||||
service.cacheInvalidationFn(id)
|
||||
service.mu.Unlock()
|
||||
|
||||
for endpointID := range edgeStack.Status {
|
||||
cache.Del(endpointID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -123,37 +125,15 @@ func (service *Service) UpdateEdgeStack(ID portainer.EdgeStackID, edgeStack *por
|
|||
service.mu.Lock()
|
||||
defer service.mu.Unlock()
|
||||
|
||||
prevEdgeStack, err := service.EdgeStack(ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
identifier := service.connection.ConvertToKey(int(ID))
|
||||
|
||||
err = service.connection.UpdateObject(BucketName, identifier, edgeStack)
|
||||
err := service.connection.UpdateObject(BucketName, identifier, edgeStack)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
service.idxVersion[ID] = edgeStack.Version
|
||||
|
||||
// Invalidate cache for removed environments
|
||||
for endpointID := range prevEdgeStack.Status {
|
||||
if _, ok := edgeStack.Status[endpointID]; !ok {
|
||||
cache.Del(endpointID)
|
||||
}
|
||||
}
|
||||
|
||||
// Invalidate cache when version changes and for added environments
|
||||
for endpointID := range edgeStack.Status {
|
||||
if prevEdgeStack.Version == edgeStack.Version {
|
||||
if _, ok := prevEdgeStack.Status[endpointID]; ok {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
cache.Del(endpointID)
|
||||
}
|
||||
service.cacheInvalidationFn(ID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -167,35 +147,10 @@ func (service *Service) UpdateEdgeStackFunc(ID portainer.EdgeStackID, updateFunc
|
|||
defer service.mu.Unlock()
|
||||
|
||||
return service.connection.UpdateObjectFunc(BucketName, id, edgeStack, func() {
|
||||
prevEndpoints := make(map[portainer.EndpointID]struct{}, len(edgeStack.Status))
|
||||
for endpointID := range edgeStack.Status {
|
||||
if _, ok := edgeStack.Status[endpointID]; !ok {
|
||||
prevEndpoints[endpointID] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
updateFunc(edgeStack)
|
||||
|
||||
prevVersion := service.idxVersion[ID]
|
||||
service.idxVersion[ID] = edgeStack.Version
|
||||
|
||||
// Invalidate cache for removed environments
|
||||
for endpointID := range prevEndpoints {
|
||||
if _, ok := edgeStack.Status[endpointID]; !ok {
|
||||
cache.Del(endpointID)
|
||||
}
|
||||
}
|
||||
|
||||
// Invalidate cache when version changes and for added environments
|
||||
for endpointID := range edgeStack.Status {
|
||||
if prevVersion == edgeStack.Version {
|
||||
if _, ok := prevEndpoints[endpointID]; ok {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
cache.Del(endpointID)
|
||||
}
|
||||
service.cacheInvalidationFn(ID)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -204,23 +159,16 @@ func (service *Service) DeleteEdgeStack(ID portainer.EdgeStackID) error {
|
|||
service.mu.Lock()
|
||||
defer service.mu.Unlock()
|
||||
|
||||
edgeStack, err := service.EdgeStack(ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
identifier := service.connection.ConvertToKey(int(ID))
|
||||
|
||||
err = service.connection.DeleteObject(BucketName, identifier)
|
||||
err := service.connection.DeleteObject(BucketName, identifier)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
delete(service.idxVersion, ID)
|
||||
|
||||
for endpointID := range edgeStack.Status {
|
||||
cache.Del(endpointID)
|
||||
}
|
||||
service.cacheInvalidationFn(ID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -78,7 +78,7 @@ func (service *Service) Create(endpointRelation *portainer.EndpointRelation) err
|
|||
return err
|
||||
}
|
||||
|
||||
// UpdateEndpointRelation updates an Environment(Endpoint) relation object
|
||||
// Deprecated: Use UpdateEndpointRelationFunc instead.
|
||||
func (service *Service) UpdateEndpointRelation(endpointID portainer.EndpointID, endpointRelation *portainer.EndpointRelation) error {
|
||||
identifier := service.connection.ConvertToKey(int(endpointID))
|
||||
err := service.connection.UpdateObject(BucketName, identifier, endpointRelation)
|
||||
|
@ -87,6 +87,18 @@ func (service *Service) UpdateEndpointRelation(endpointID portainer.EndpointID,
|
|||
return err
|
||||
}
|
||||
|
||||
// UpdateEndpointRelationFunc updates an Environment(Endpoint) relation object
|
||||
func (service *Service) UpdateEndpointRelationFunc(endpointID portainer.EndpointID, updateFunc func(endpointRelation *portainer.EndpointRelation)) error {
|
||||
id := service.connection.ConvertToKey(int(endpointID))
|
||||
endpointRelation := &portainer.EndpointRelation{}
|
||||
|
||||
return service.connection.UpdateObjectFunc(BucketName, id, endpointRelation, func() {
|
||||
updateFunc(endpointRelation)
|
||||
|
||||
cache.Del(endpointID)
|
||||
})
|
||||
}
|
||||
|
||||
// DeleteEndpointRelation deletes an Environment(Endpoint) relation object
|
||||
func (service *Service) DeleteEndpointRelation(endpointID portainer.EndpointID) error {
|
||||
identifier := service.connection.ConvertToKey(int(endpointID))
|
||||
|
@ -95,3 +107,19 @@ func (service *Service) DeleteEndpointRelation(endpointID portainer.EndpointID)
|
|||
|
||||
return err
|
||||
}
|
||||
|
||||
func (service *Service) InvalidateEdgeCacheForEdgeStack(edgeStackID portainer.EdgeStackID) {
|
||||
rels, err := service.EndpointRelations()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("cannot retrieve endpoint relations")
|
||||
return
|
||||
}
|
||||
|
||||
for _, rel := range rels {
|
||||
for id := range rel.EdgeStacks {
|
||||
if edgeStackID == id {
|
||||
cache.Del(rel.EndpointID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -126,6 +126,7 @@ type (
|
|||
EndpointRelation(EndpointID portainer.EndpointID) (*portainer.EndpointRelation, error)
|
||||
Create(endpointRelation *portainer.EndpointRelation) error
|
||||
UpdateEndpointRelation(EndpointID portainer.EndpointID, endpointRelation *portainer.EndpointRelation) error
|
||||
UpdateEndpointRelationFunc(EndpointID portainer.EndpointID, updateFunc func(*portainer.EndpointRelation)) error
|
||||
DeleteEndpointRelation(EndpointID portainer.EndpointID) error
|
||||
BucketName() string
|
||||
}
|
||||
|
|
|
@ -93,7 +93,13 @@ func (store *Store) initServices() error {
|
|||
}
|
||||
store.DockerHubService = dockerhubService
|
||||
|
||||
edgeStackService, err := edgestack.NewService(store.connection)
|
||||
endpointRelationService, err := endpointrelation.NewService(store.connection)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
store.EndpointRelationService = endpointRelationService
|
||||
|
||||
edgeStackService, err := edgestack.NewService(store.connection, endpointRelationService.InvalidateEdgeCacheForEdgeStack)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -123,12 +129,6 @@ func (store *Store) initServices() error {
|
|||
}
|
||||
store.EndpointService = endpointService
|
||||
|
||||
endpointRelationService, err := endpointrelation.NewService(store.connection)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
store.EndpointRelationService = endpointRelationService
|
||||
|
||||
extensionService, err := extension.NewService(store.connection)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -163,11 +163,6 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request)
|
|||
}
|
||||
|
||||
func (handler *Handler) updateEndpointStacks(endpointID portainer.EndpointID) error {
|
||||
relation, err := handler.DataStore.EndpointRelation().EndpointRelation(endpointID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -195,9 +190,9 @@ func (handler *Handler) updateEndpointStacks(endpointID portainer.EndpointID) er
|
|||
edgeStackSet[edgeStackID] = true
|
||||
}
|
||||
|
||||
relation.EdgeStacks = edgeStackSet
|
||||
|
||||
return handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpoint.ID, relation)
|
||||
return handler.DataStore.EndpointRelation().UpdateEndpointRelationFunc(endpoint.ID, func(relation *portainer.EndpointRelation) {
|
||||
relation.EdgeStacks = edgeStackSet
|
||||
})
|
||||
}
|
||||
|
||||
func (handler *Handler) updateEndpointEdgeJobs(edgeGroupID portainer.EdgeGroupID, endpointID portainer.EndpointID, edgeJobs []portainer.EdgeJob, operation string) error {
|
||||
|
|
|
@ -38,17 +38,14 @@ func (handler *Handler) edgeStackStatusDelete(w http.ResponseWriter, r *http.Req
|
|||
return httperror.Forbidden("Permission denied to access environment", err)
|
||||
}
|
||||
|
||||
stack, err := handler.DataStore.EdgeStack().EdgeStack(portainer.EdgeStackID(stackID))
|
||||
var edgeStack *portainer.EdgeStack
|
||||
err = handler.DataStore.EdgeStack().UpdateEdgeStackFunc(portainer.EdgeStackID(stackID), func(stack *portainer.EdgeStack) {
|
||||
delete(stack.Status, endpoint.ID)
|
||||
edgeStack = stack
|
||||
})
|
||||
if err != nil {
|
||||
return handler.handlerDBErr(err, "Unable to find a stack with the specified identifier inside the database")
|
||||
return handler.handlerDBErr(err, "Unable to persist the stack changes inside the database")
|
||||
}
|
||||
|
||||
delete(stack.Status, endpoint.ID)
|
||||
|
||||
err = handler.DataStore.EdgeStack().UpdateEdgeStack(stack.ID, stack)
|
||||
if err != nil {
|
||||
return httperror.InternalServerError("Unable to persist the stack changes inside the database", err)
|
||||
}
|
||||
|
||||
return response.JSON(w, stack)
|
||||
return response.JSON(w, edgeStack)
|
||||
}
|
||||
|
|
|
@ -94,15 +94,12 @@ func (handler *Handler) edgeStackUpdate(w http.ResponseWriter, r *http.Request)
|
|||
}
|
||||
|
||||
for endpointID := range endpointsToRemove {
|
||||
relation, err := handler.DataStore.EndpointRelation().EndpointRelation(endpointID)
|
||||
if err != nil {
|
||||
err = handler.DataStore.EndpointRelation().UpdateEndpointRelationFunc(endpointID, func(relation *portainer.EndpointRelation) {
|
||||
delete(relation.EdgeStacks, stack.ID)
|
||||
})
|
||||
if handler.DataStore.IsErrObjectNotFound(err) {
|
||||
return httperror.InternalServerError("Unable to find environment relation in database", err)
|
||||
}
|
||||
|
||||
delete(relation.EdgeStacks, stack.ID)
|
||||
|
||||
err = handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpointID, relation)
|
||||
if err != nil {
|
||||
} else if err != nil {
|
||||
return httperror.InternalServerError("Unable to persist environment relation in database", err)
|
||||
}
|
||||
}
|
||||
|
@ -114,15 +111,12 @@ func (handler *Handler) edgeStackUpdate(w http.ResponseWriter, r *http.Request)
|
|||
}
|
||||
|
||||
for endpointID := range endpointsToAdd {
|
||||
relation, err := handler.DataStore.EndpointRelation().EndpointRelation(endpointID)
|
||||
if err != nil {
|
||||
err = handler.DataStore.EndpointRelation().UpdateEndpointRelationFunc(endpointID, func(relation *portainer.EndpointRelation) {
|
||||
relation.EdgeStacks[stack.ID] = true
|
||||
})
|
||||
if handler.DataStore.IsErrObjectNotFound(err) {
|
||||
return httperror.InternalServerError("Unable to find environment relation in database", err)
|
||||
}
|
||||
|
||||
relation.EdgeStacks[stack.ID] = true
|
||||
|
||||
err = handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpointID, relation)
|
||||
if err != nil {
|
||||
} else if err != nil {
|
||||
return httperror.InternalServerError("Unable to persist environment relation in database", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,11 +19,6 @@ func (handler *Handler) updateEndpointRelations(endpoint *portainer.Endpoint, en
|
|||
endpointGroup = unassignedGroup
|
||||
}
|
||||
|
||||
endpointRelation, err := handler.DataStore.EndpointRelation().EndpointRelation(endpoint.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
edgeGroups, err := handler.DataStore.EdgeGroup().EdgeGroups()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -39,7 +34,8 @@ func (handler *Handler) updateEndpointRelations(endpoint *portainer.Endpoint, en
|
|||
for _, edgeStackID := range endpointStacks {
|
||||
stacksSet[edgeStackID] = true
|
||||
}
|
||||
endpointRelation.EdgeStacks = stacksSet
|
||||
|
||||
return handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpoint.ID, endpointRelation)
|
||||
return handler.DataStore.EndpointRelation().UpdateEndpointRelationFunc(endpoint.ID, func(relation *portainer.EndpointRelation) {
|
||||
relation.EdgeStacks = stacksSet
|
||||
})
|
||||
}
|
||||
|
|
|
@ -101,8 +101,9 @@ func (handler *Handler) endpointDelete(w http.ResponseWriter, r *http.Request) *
|
|||
for idx := range edgeStacks {
|
||||
edgeStack := &edgeStacks[idx]
|
||||
if _, ok := edgeStack.Status[endpoint.ID]; ok {
|
||||
delete(edgeStack.Status, endpoint.ID)
|
||||
err = handler.DataStore.EdgeStack().UpdateEdgeStack(edgeStack.ID, edgeStack)
|
||||
err = handler.DataStore.EdgeStack().UpdateEdgeStackFunc(edgeStack.ID, func(stack *portainer.EdgeStack) {
|
||||
delete(stack.Status, endpoint.ID)
|
||||
})
|
||||
if err != nil {
|
||||
return httperror.InternalServerError("Unable to update edge stack", err)
|
||||
}
|
||||
|
|
|
@ -106,11 +106,6 @@ func (handler *Handler) tagDelete(w http.ResponseWriter, r *http.Request) *httpe
|
|||
}
|
||||
|
||||
func (handler *Handler) updateEndpointRelations(endpoint portainer.Endpoint, edgeGroups []portainer.EdgeGroup, edgeStacks []portainer.EdgeStack) error {
|
||||
endpointRelation, err := handler.DataStore.EndpointRelation().EndpointRelation(endpoint.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
endpointGroup, err := handler.DataStore.EndpointGroup().EndpointGroup(endpoint.GroupID)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -121,9 +116,10 @@ func (handler *Handler) updateEndpointRelations(endpoint portainer.Endpoint, edg
|
|||
for _, edgeStackID := range endpointStacks {
|
||||
stacksSet[edgeStackID] = true
|
||||
}
|
||||
endpointRelation.EdgeStacks = stacksSet
|
||||
|
||||
return handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpoint.ID, endpointRelation)
|
||||
return handler.DataStore.EndpointRelation().UpdateEndpointRelationFunc(endpoint.ID, func(relation *portainer.EndpointRelation) {
|
||||
relation.EdgeStacks = stacksSet
|
||||
})
|
||||
}
|
||||
|
||||
func removeElement(slice []portainer.TagID, elem portainer.TagID) []portainer.TagID {
|
||||
|
|
|
@ -6,11 +6,13 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
"github.com/portainer/portainer/api/dataservices"
|
||||
"github.com/portainer/portainer/api/internal/edge"
|
||||
edgetypes "github.com/portainer/portainer/api/internal/edge/types"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// Service represents a service for managing edge stacks.
|
||||
|
@ -112,15 +114,12 @@ func (service *Service) updateEndpointRelations(edgeStackID portainer.EdgeStackI
|
|||
endpointRelationService := service.dataStore.EndpointRelation()
|
||||
|
||||
for _, endpointID := range relatedEndpointIds {
|
||||
relation, err := endpointRelationService.EndpointRelation(endpointID)
|
||||
if err != nil {
|
||||
err := endpointRelationService.UpdateEndpointRelationFunc(endpointID, func(relation *portainer.EndpointRelation) {
|
||||
relation.EdgeStacks[edgeStackID] = true
|
||||
})
|
||||
if service.dataStore.IsErrObjectNotFound(err) {
|
||||
return fmt.Errorf("unable to find endpoint relation in database: %w", err)
|
||||
}
|
||||
|
||||
relation.EdgeStacks[edgeStackID] = true
|
||||
|
||||
err = endpointRelationService.UpdateEndpointRelation(endpointID, relation)
|
||||
if err != nil {
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("unable to persist endpoint relation in database: %w", err)
|
||||
}
|
||||
}
|
||||
|
@ -142,15 +141,15 @@ func (service *Service) DeleteEdgeStack(edgeStackID portainer.EdgeStackID, relat
|
|||
}
|
||||
|
||||
for _, endpointID := range relatedEndpointIds {
|
||||
relation, err := service.dataStore.EndpointRelation().EndpointRelation(endpointID)
|
||||
if err != nil {
|
||||
return errors.WithMessage(err, "Unable to find environment relation in database")
|
||||
}
|
||||
|
||||
delete(relation.EdgeStacks, edgeStackID)
|
||||
|
||||
err = service.dataStore.EndpointRelation().UpdateEndpointRelation(endpointID, relation)
|
||||
if err != nil {
|
||||
service.dataStore.EndpointRelation().UpdateEndpointRelationFunc(endpointID, func(relation *portainer.EndpointRelation) {
|
||||
delete(relation.EdgeStacks, edgeStackID)
|
||||
})
|
||||
if service.dataStore.IsErrObjectNotFound(err) {
|
||||
log.Warn().
|
||||
Int("endpoint_id", int(endpointID)).
|
||||
Msg("Unable to find endpoint relation in database, skipping")
|
||||
continue
|
||||
} else if err != nil {
|
||||
return errors.WithMessage(err, "Unable to persist environment relation in database")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -277,6 +277,16 @@ func (s *stubEndpointService) UpdateEndpoint(ID portainer.EndpointID, endpoint *
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *stubEndpointRelationService) UpdateEndpointRelationFunc(ID portainer.EndpointID, updateFunc func(relation *portainer.EndpointRelation)) error {
|
||||
for i, r := range s.relations {
|
||||
if r.EndpointID == ID {
|
||||
updateFunc(&s.relations[i])
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *stubEndpointService) DeleteEndpoint(ID portainer.EndpointID) error {
|
||||
endpoints := []portainer.Endpoint{}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue