From a46db61c4c7a3570b0495cc2caa8df4ddc7b9fa5 Mon Sep 17 00:00:00 2001 From: andres-portainer <91705312+andres-portainer@users.noreply.github.com> Date: Mon, 28 Jul 2025 13:19:05 -0300 Subject: [PATCH] fix(endpointrelation): optimize updateEdgeStacksAfterRelationChange() BE-12092 (#941) --- api/dataservices/edgestack/edgestack_test.go | 50 +++++++++ .../endpointrelation/endpointrelation.go | 97 +--------------- .../endpointrelation/endpointrelation_test.go | 104 ++++++++++++++++++ api/dataservices/endpointrelation/tx.go | 66 ++++++----- api/datastore/services.go | 2 +- 5 files changed, 192 insertions(+), 127 deletions(-) create mode 100644 api/dataservices/edgestack/edgestack_test.go create mode 100644 api/dataservices/endpointrelation/endpointrelation_test.go diff --git a/api/dataservices/edgestack/edgestack_test.go b/api/dataservices/edgestack/edgestack_test.go new file mode 100644 index 000000000..debb4652e --- /dev/null +++ b/api/dataservices/edgestack/edgestack_test.go @@ -0,0 +1,50 @@ +package edgestack + +import ( + "testing" + + portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/database/boltdb" + + "github.com/stretchr/testify/require" +) + +func TestUpdate(t *testing.T) { + var conn portainer.Connection = &boltdb.DbConnection{Path: t.TempDir()} + err := conn.Open() + require.NoError(t, err) + + defer conn.Close() + + service, err := NewService(conn, func(portainer.Transaction, portainer.EdgeStackID) {}) + require.NoError(t, err) + + const edgeStackID = 1 + edgeStack := &portainer.EdgeStack{ + ID: edgeStackID, + Name: "Test Stack", + } + + err = service.Create(edgeStackID, edgeStack) + require.NoError(t, err) + + err = service.UpdateEdgeStackFunc(edgeStackID, func(edgeStack *portainer.EdgeStack) { + edgeStack.Name = "Updated Stack" + }) + require.NoError(t, err) + + updatedStack, err := service.EdgeStack(edgeStackID) + require.NoError(t, err) + require.Equal(t, "Updated Stack", updatedStack.Name) + + err = conn.UpdateTx(func(tx portainer.Transaction) error { + return service.UpdateEdgeStackFuncTx(tx, edgeStackID, func(edgeStack *portainer.EdgeStack) { + edgeStack.Name = "Updated Stack Again" + }) + }) + require.NoError(t, err) + + updatedStack, err = service.EdgeStack(edgeStackID) + require.NoError(t, err) + require.Equal(t, "Updated Stack Again", updatedStack.Name) +} diff --git a/api/dataservices/endpointrelation/endpointrelation.go b/api/dataservices/endpointrelation/endpointrelation.go index 556a046bb..91c00f05a 100644 --- a/api/dataservices/endpointrelation/endpointrelation.go +++ b/api/dataservices/endpointrelation/endpointrelation.go @@ -6,8 +6,6 @@ import ( portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/internal/edge/cache" - - "github.com/rs/zerolog/log" ) // BucketName represents the name of the bucket where this service stores data. @@ -16,7 +14,6 @@ const BucketName = "endpoint_relations" // Service represents a service for managing environment(endpoint) relation data. type Service struct { connection portainer.Connection - updateStackFn func(ID portainer.EdgeStackID, updateFunc func(edgeStack *portainer.EdgeStack)) error updateStackFnTx func(tx portainer.Transaction, ID portainer.EdgeStackID, updateFunc func(edgeStack *portainer.EdgeStack)) error endpointRelationsCache []portainer.EndpointRelation mu sync.Mutex @@ -29,10 +26,8 @@ func (service *Service) BucketName() string { } func (service *Service) RegisterUpdateStackFunction( - updateFunc func(portainer.EdgeStackID, func(*portainer.EdgeStack)) error, updateFuncTx func(portainer.Transaction, portainer.EdgeStackID, func(*portainer.EdgeStack)) error, ) { - service.updateStackFn = updateFunc service.updateStackFnTx = updateFuncTx } @@ -91,24 +86,9 @@ func (service *Service) Create(endpointRelation *portainer.EndpointRelation) err // UpdateEndpointRelation updates an Environment(Endpoint) relation object func (service *Service) UpdateEndpointRelation(endpointID portainer.EndpointID, endpointRelation *portainer.EndpointRelation) error { - previousRelationState, _ := service.EndpointRelation(endpointID) - - identifier := service.connection.ConvertToKey(int(endpointID)) - err := service.connection.UpdateObject(BucketName, identifier, endpointRelation) - cache.Del(endpointID) - if err != nil { - return err - } - - updatedRelationState, _ := service.EndpointRelation(endpointID) - - service.mu.Lock() - service.endpointRelationsCache = nil - service.mu.Unlock() - - service.updateEdgeStacksAfterRelationChange(previousRelationState, updatedRelationState) - - return nil + return service.connection.UpdateTx(func(tx portainer.Transaction) error { + return service.Tx(tx).UpdateEndpointRelation(endpointID, endpointRelation) + }) } func (service *Service) AddEndpointRelationsForEdgeStack(endpointIDs []portainer.EndpointID, edgeStackID portainer.EdgeStackID) error { @@ -125,72 +105,7 @@ func (service *Service) RemoveEndpointRelationsForEdgeStack(endpointIDs []portai // DeleteEndpointRelation deletes an Environment(Endpoint) relation object func (service *Service) DeleteEndpointRelation(endpointID portainer.EndpointID) error { - deletedRelation, _ := service.EndpointRelation(endpointID) - - identifier := service.connection.ConvertToKey(int(endpointID)) - err := service.connection.DeleteObject(BucketName, identifier) - cache.Del(endpointID) - if err != nil { - return err - } - - service.mu.Lock() - service.endpointRelationsCache = nil - service.mu.Unlock() - - service.updateEdgeStacksAfterRelationChange(deletedRelation, nil) - - return nil -} - -func (service *Service) updateEdgeStacksAfterRelationChange(previousRelationState *portainer.EndpointRelation, updatedRelationState *portainer.EndpointRelation) { - relations, _ := service.EndpointRelations() - - stacksToUpdate := map[portainer.EdgeStackID]bool{} - - if previousRelationState != nil { - for stackId, enabled := range previousRelationState.EdgeStacks { - // flag stack for update if stack is not in the updated relation state - // = stack has been removed for this relation - // or this relation has been deleted - if enabled && (updatedRelationState == nil || !updatedRelationState.EdgeStacks[stackId]) { - stacksToUpdate[stackId] = true - } - } - } - - if updatedRelationState != nil { - for stackId, enabled := range updatedRelationState.EdgeStacks { - // flag stack for update if stack is not in the previous relation state - // = stack has been added for this relation - if enabled && (previousRelationState == nil || !previousRelationState.EdgeStacks[stackId]) { - stacksToUpdate[stackId] = true - } - } - } - - // for each stack referenced by the updated relation - // list how many time this stack is referenced in all relations - // in order to update the stack deployments count - for refStackId, refStackEnabled := range stacksToUpdate { - if !refStackEnabled { - continue - } - - numDeployments := 0 - - for _, r := range relations { - for sId, enabled := range r.EdgeStacks { - if enabled && sId == refStackId { - numDeployments += 1 - } - } - } - - if err := service.updateStackFn(refStackId, func(edgeStack *portainer.EdgeStack) { - edgeStack.NumDeployments = numDeployments - }); err != nil { - log.Error().Err(err).Msg("could not update the number of deployments") - } - } + return service.connection.UpdateTx(func(tx portainer.Transaction) error { + return service.Tx(tx).DeleteEndpointRelation(endpointID) + }) } diff --git a/api/dataservices/endpointrelation/endpointrelation_test.go b/api/dataservices/endpointrelation/endpointrelation_test.go new file mode 100644 index 000000000..f1ead0919 --- /dev/null +++ b/api/dataservices/endpointrelation/endpointrelation_test.go @@ -0,0 +1,104 @@ +package endpointrelation + +import ( + "testing" + + portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/database/boltdb" + "github.com/portainer/portainer/api/internal/edge/cache" + + "github.com/stretchr/testify/require" +) + +func TestUpdateRelation(t *testing.T) { + const endpointID = 1 + const edgeStackID1 = 1 + const edgeStackID2 = 2 + + var conn portainer.Connection = &boltdb.DbConnection{Path: t.TempDir()} + err := conn.Open() + require.NoError(t, err) + + defer conn.Close() + + service, err := NewService(conn) + require.NoError(t, err) + + updateStackFnTxCalled := false + + edgeStacks := make(map[portainer.EdgeStackID]portainer.EdgeStack) + edgeStacks[edgeStackID1] = portainer.EdgeStack{ID: edgeStackID1} + edgeStacks[edgeStackID2] = portainer.EdgeStack{ID: edgeStackID2} + + service.RegisterUpdateStackFunction(func(tx portainer.Transaction, ID portainer.EdgeStackID, updateFunc func(edgeStack *portainer.EdgeStack)) error { + updateStackFnTxCalled = true + + s, ok := edgeStacks[ID] + require.True(t, ok) + + updateFunc(&s) + edgeStacks[ID] = s + + return nil + }) + + // Nil relation + + cache.Set(endpointID, []byte("value")) + + err = service.UpdateEndpointRelation(endpointID, nil) + _, cacheKeyExists := cache.Get(endpointID) + require.NoError(t, err) + require.False(t, updateStackFnTxCalled) + require.False(t, cacheKeyExists) + + // Add a relation to two edge stacks + + cache.Set(endpointID, []byte("value")) + + err = service.UpdateEndpointRelation(endpointID, &portainer.EndpointRelation{ + EndpointID: endpointID, + EdgeStacks: map[portainer.EdgeStackID]bool{ + edgeStackID1: true, + edgeStackID2: true, + }, + }) + _, cacheKeyExists = cache.Get(endpointID) + require.NoError(t, err) + require.True(t, updateStackFnTxCalled) + require.False(t, cacheKeyExists) + require.Equal(t, 1, edgeStacks[edgeStackID1].NumDeployments) + require.Equal(t, 1, edgeStacks[edgeStackID2].NumDeployments) + + // Remove a relation to one edge stack + + updateStackFnTxCalled = false + cache.Set(endpointID, []byte("value")) + + err = service.UpdateEndpointRelation(endpointID, &portainer.EndpointRelation{ + EndpointID: endpointID, + EdgeStacks: map[portainer.EdgeStackID]bool{ + 2: true, + }, + }) + _, cacheKeyExists = cache.Get(endpointID) + require.NoError(t, err) + require.True(t, updateStackFnTxCalled) + require.False(t, cacheKeyExists) + require.Equal(t, 0, edgeStacks[edgeStackID1].NumDeployments) + require.Equal(t, 1, edgeStacks[edgeStackID2].NumDeployments) + + // Delete the relation + + updateStackFnTxCalled = false + cache.Set(endpointID, []byte("value")) + + err = service.DeleteEndpointRelation(endpointID) + + _, cacheKeyExists = cache.Get(endpointID) + require.NoError(t, err) + require.True(t, updateStackFnTxCalled) + require.False(t, cacheKeyExists) + require.Equal(t, 0, edgeStacks[edgeStackID1].NumDeployments) + require.Equal(t, 0, edgeStacks[edgeStackID2].NumDeployments) +} diff --git a/api/dataservices/endpointrelation/tx.go b/api/dataservices/endpointrelation/tx.go index bc58678fd..54e66a31b 100644 --- a/api/dataservices/endpointrelation/tx.go +++ b/api/dataservices/endpointrelation/tx.go @@ -186,53 +186,49 @@ func (service ServiceTx) cachedEndpointRelations() ([]portainer.EndpointRelation } func (service ServiceTx) updateEdgeStacksAfterRelationChange(previousRelationState *portainer.EndpointRelation, updatedRelationState *portainer.EndpointRelation) { - relations, _ := service.EndpointRelations() - - stacksToUpdate := map[portainer.EdgeStackID]bool{} - if previousRelationState != nil { for stackId, enabled := range previousRelationState.EdgeStacks { // flag stack for update if stack is not in the updated relation state // = stack has been removed for this relation // or this relation has been deleted if enabled && (updatedRelationState == nil || !updatedRelationState.EdgeStacks[stackId]) { - stacksToUpdate[stackId] = true - } - } - } + if err := service.service.updateStackFnTx(service.tx, stackId, func(edgeStack *portainer.EdgeStack) { + // Sanity check + if edgeStack.NumDeployments <= 0 { + log.Error(). + Int("edgestack_id", int(edgeStack.ID)). + Int("endpoint_id", int(previousRelationState.EndpointID)). + Int("num_deployments", edgeStack.NumDeployments). + Msg("cannot decrement the number of deployments for an edge stack with zero deployments") - if updatedRelationState != nil { - for stackId, enabled := range updatedRelationState.EdgeStacks { - // flag stack for update if stack is not in the previous relation state - // = stack has been added for this relation - if enabled && (previousRelationState == nil || !previousRelationState.EdgeStacks[stackId]) { - stacksToUpdate[stackId] = true - } - } - } + return + } - // for each stack referenced by the updated relation - // list how many time this stack is referenced in all relations - // in order to update the stack deployments count - for refStackId, refStackEnabled := range stacksToUpdate { - if !refStackEnabled { - continue - } - - numDeployments := 0 - - for _, r := range relations { - for sId, enabled := range r.EdgeStacks { - if enabled && sId == refStackId { - numDeployments += 1 + edgeStack.NumDeployments-- + }); err != nil { + log.Error().Err(err).Msg("could not update the number of deployments") } + + cache.Del(previousRelationState.EndpointID) } } + } - if err := service.service.updateStackFnTx(service.tx, refStackId, func(edgeStack *portainer.EdgeStack) { - edgeStack.NumDeployments = numDeployments - }); err != nil { - log.Error().Err(err).Msg("could not update the number of deployments") + if updatedRelationState == nil { + return + } + + for stackId, enabled := range updatedRelationState.EdgeStacks { + // flag stack for update if stack is not in the previous relation state + // = stack has been added for this relation + if enabled && (previousRelationState == nil || !previousRelationState.EdgeStacks[stackId]) { + if err := service.service.updateStackFnTx(service.tx, stackId, func(edgeStack *portainer.EdgeStack) { + edgeStack.NumDeployments++ + }); err != nil { + log.Error().Err(err).Msg("could not update the number of deployments") + } + + cache.Del(updatedRelationState.EndpointID) } } } diff --git a/api/datastore/services.go b/api/datastore/services.go index b0570fa67..7413d6c03 100644 --- a/api/datastore/services.go +++ b/api/datastore/services.go @@ -111,7 +111,7 @@ func (store *Store) initServices() error { return err } store.EdgeStackService = edgeStackService - endpointRelationService.RegisterUpdateStackFunction(edgeStackService.UpdateEdgeStackFunc, edgeStackService.UpdateEdgeStackFuncTx) + endpointRelationService.RegisterUpdateStackFunction(edgeStackService.UpdateEdgeStackFuncTx) edgeStackStatusService, err := edgestackstatus.NewService(store.connection) if err != nil {