From e26a607d282a6c1be05fd20613fc20c2bb2c1b49 Mon Sep 17 00:00:00 2001 From: andres-portainer <91705312+andres-portainer@users.noreply.github.com> Date: Wed, 23 Nov 2022 21:36:17 -0300 Subject: [PATCH] fix(edgegroups): avoid a last-write-wins situation when updating edge groups concurrently EE-3732 (#8101) --- api/chisel/schedules.go | 1 - api/dataservices/edgegroup/edgegroup.go | 18 ++- api/dataservices/interface.go | 1 + api/http/handler/endpoints/endpoint_delete.go | 35 +++--- .../handler/endpoints/endpoint_delete_test.go | 109 ++++++++++++++++++ api/http/handler/tags/tag_delete.go | 57 ++++----- api/http/handler/tags/tag_delete_test.go | 105 +++++++++++++++++ api/http/proxy/manager.go | 5 +- 8 files changed, 266 insertions(+), 65 deletions(-) create mode 100644 api/http/handler/endpoints/endpoint_delete_test.go create mode 100644 api/http/handler/tags/tag_delete_test.go diff --git a/api/chisel/schedules.go b/api/chisel/schedules.go index a00562d9a..0afa587bd 100644 --- a/api/chisel/schedules.go +++ b/api/chisel/schedules.go @@ -31,7 +31,6 @@ func (service *Service) RemoveEdgeJob(edgeJobID portainer.EdgeJobID) { service.mu.Lock() for _, tunnel := range service.tunnelDetailsMap { - // Filter in-place n := 0 for _, edgeJob := range tunnel.Jobs { if edgeJob.ID != edgeJobID { diff --git a/api/dataservices/edgegroup/edgegroup.go b/api/dataservices/edgegroup/edgegroup.go index 22272add8..d65b501c0 100644 --- a/api/dataservices/edgegroup/edgegroup.go +++ b/api/dataservices/edgegroup/edgegroup.go @@ -8,10 +8,8 @@ import ( "github.com/rs/zerolog/log" ) -const ( - // BucketName represents the name of the bucket where this service stores data. - BucketName = "edgegroups" -) +// BucketName represents the name of the bucket where this service stores data. +const BucketName = "edgegroups" // Service represents a service for managing Edge group data. type Service struct { @@ -68,12 +66,22 @@ func (service *Service) EdgeGroup(ID portainer.EdgeGroupID) (*portainer.EdgeGrou return &group, nil } -// UpdateEdgeGroup updates an Edge group. +// Deprecated: Use UpdateEdgeGroupFunc instead. func (service *Service) UpdateEdgeGroup(ID portainer.EdgeGroupID, group *portainer.EdgeGroup) error { identifier := service.connection.ConvertToKey(int(ID)) return service.connection.UpdateObject(BucketName, identifier, group) } +// UpdateEdgeGroupFunc updates an edge group inside a transaction avoiding data races. +func (service *Service) UpdateEdgeGroupFunc(ID portainer.EdgeGroupID, updateFunc func(edgeGroup *portainer.EdgeGroup)) error { + id := service.connection.ConvertToKey(int(ID)) + edgeGroup := &portainer.EdgeGroup{} + + return service.connection.UpdateObjectFunc(BucketName, id, edgeGroup, func() { + updateFunc(edgeGroup) + }) +} + // DeleteEdgeGroup deletes an Edge group. func (service *Service) DeleteEdgeGroup(ID portainer.EdgeGroupID) error { identifier := service.connection.ConvertToKey(int(ID)) diff --git a/api/dataservices/interface.go b/api/dataservices/interface.go index 0dbc99dba..fae9c0c43 100644 --- a/api/dataservices/interface.go +++ b/api/dataservices/interface.go @@ -69,6 +69,7 @@ type ( EdgeGroup(ID portainer.EdgeGroupID) (*portainer.EdgeGroup, error) Create(group *portainer.EdgeGroup) error UpdateEdgeGroup(ID portainer.EdgeGroupID, group *portainer.EdgeGroup) error + UpdateEdgeGroupFunc(ID portainer.EdgeGroupID, updateFunc func(group *portainer.EdgeGroup)) error DeleteEdgeGroup(ID portainer.EdgeGroupID) error BucketName() string } diff --git a/api/http/handler/endpoints/endpoint_delete.go b/api/http/handler/endpoints/endpoint_delete.go index ac4e967fd..ab9d92d6d 100644 --- a/api/http/handler/endpoints/endpoint_delete.go +++ b/api/http/handler/endpoints/endpoint_delete.go @@ -83,15 +83,12 @@ func (handler *Handler) endpointDelete(w http.ResponseWriter, r *http.Request) * return httperror.InternalServerError("Unable to retrieve edge groups from the database", err) } - for idx := range edgeGroups { - edgeGroup := &edgeGroups[idx] - endpointIdx := findEndpointIndex(edgeGroup.Endpoints, endpoint.ID) - if endpointIdx != -1 { - edgeGroup.Endpoints = removeElement(edgeGroup.Endpoints, endpointIdx) - err = handler.DataStore.EdgeGroup().UpdateEdgeGroup(edgeGroup.ID, edgeGroup) - if err != nil { - return httperror.InternalServerError("Unable to update edge group", err) - } + for _, edgeGroup := range edgeGroups { + err = handler.DataStore.EdgeGroup().UpdateEdgeGroupFunc(edgeGroup.ID, func(g *portainer.EdgeGroup) { + g.Endpoints = removeElement(g.Endpoints, endpoint.ID) + }) + if err != nil { + return httperror.InternalServerError("Unable to update edge group", err) } } @@ -130,20 +127,14 @@ func (handler *Handler) endpointDelete(w http.ResponseWriter, r *http.Request) * return response.Empty(w) } -func findEndpointIndex(tags []portainer.EndpointID, searchEndpointID portainer.EndpointID) int { - for idx, tagID := range tags { - if searchEndpointID == tagID { - return idx +func removeElement(slice []portainer.EndpointID, elem portainer.EndpointID) []portainer.EndpointID { + for i, id := range slice { + if id == elem { + slice[i] = slice[len(slice)-1] + + return slice[:len(slice)-1] } } - return -1 -} -func removeElement(arr []portainer.EndpointID, index int) []portainer.EndpointID { - if index < 0 { - return arr - } - lastTagIdx := len(arr) - 1 - arr[index] = arr[lastTagIdx] - return arr[:lastTagIdx] + return slice } diff --git a/api/http/handler/endpoints/endpoint_delete_test.go b/api/http/handler/endpoints/endpoint_delete_test.go new file mode 100644 index 000000000..7865a19c0 --- /dev/null +++ b/api/http/handler/endpoints/endpoint_delete_test.go @@ -0,0 +1,109 @@ +package endpoints + +import ( + "net/http" + "net/http/httptest" + "strconv" + "sync" + "testing" + + portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/apikey" + "github.com/portainer/portainer/api/datastore" + "github.com/portainer/portainer/api/demo" + "github.com/portainer/portainer/api/http/proxy" + "github.com/portainer/portainer/api/http/security" + "github.com/portainer/portainer/api/jwt" +) + +func TestEndpointDeleteEdgeGroupsConcurrently(t *testing.T) { + const endpointsCount = 100 + + _, store, teardown := datastore.MustNewTestStore(t, true, false) + defer teardown() + + user := &portainer.User{ID: 2, Username: "admin", Role: portainer.AdministratorRole} + err := store.User().Create(user) + if err != nil { + t.Fatal("could not create admin user:", err) + } + + jwtService, err := jwt.NewService("1h", store) + if err != nil { + t.Fatal("could not initialize the JWT service:", err) + } + + apiKeyService := apikey.NewAPIKeyService(store.APIKeyRepository(), store.User()) + rawAPIKey, _, err := apiKeyService.GenerateApiKey(*user, "test") + if err != nil { + t.Fatal("could not generate API key:", err) + } + + bouncer := security.NewRequestBouncer(store, jwtService, apiKeyService) + + handler := NewHandler(bouncer, demo.NewService()) + handler.DataStore = store + handler.ProxyManager = proxy.NewManager(nil, nil, nil, nil, nil, nil, nil) + + // Create all the environments and add them to the same edge group + + var endpointIDs []portainer.EndpointID + + for i := 0; i < endpointsCount; i++ { + endpointID := portainer.EndpointID(i) + 1 + + err = store.Endpoint().Create(&portainer.Endpoint{ + ID: endpointID, + Name: "env-" + strconv.Itoa(int(endpointID)), + Type: portainer.EdgeAgentOnDockerEnvironment, + }) + if err != nil { + t.Fatal("could not create endpoint:", err) + } + + endpointIDs = append(endpointIDs, endpointID) + } + + err = store.EdgeGroup().Create(&portainer.EdgeGroup{ + ID: 1, + Name: "edgegroup-1", + Endpoints: endpointIDs, + }) + if err != nil { + t.Fatal("could not create edge group:", err) + } + + // Remove the environments concurrently + + var wg sync.WaitGroup + wg.Add(len(endpointIDs)) + + for _, endpointID := range endpointIDs { + go func(ID portainer.EndpointID) { + defer wg.Done() + + req, err := http.NewRequest(http.MethodDelete, "/endpoints/"+strconv.Itoa(int(ID)), nil) + if err != nil { + t.Fail() + return + } + req.Header.Add("X-Api-Key", rawAPIKey) + + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + }(endpointID) + } + + wg.Wait() + + // Check that the edge group is consistent + + edgeGroup, err := handler.DataStore.EdgeGroup().EdgeGroup(1) + if err != nil { + t.Fatal("could not retrieve the edge group:", err) + } + + if len(edgeGroup.Endpoints) > 0 { + t.Fatal("the edge group is not consistent") + } +} diff --git a/api/http/handler/tags/tag_delete.go b/api/http/handler/tags/tag_delete.go index b87a7f3d3..40a0efd9f 100644 --- a/api/http/handler/tags/tag_delete.go +++ b/api/http/handler/tags/tag_delete.go @@ -44,13 +44,10 @@ func (handler *Handler) tagDelete(w http.ResponseWriter, r *http.Request) *httpe return httperror.InternalServerError("Unable to retrieve environment from the database", err) } - tagIdx := findTagIndex(endpoint.TagIDs, tagID) - if tagIdx != -1 { - endpoint.TagIDs = removeElement(endpoint.TagIDs, tagIdx) - err = handler.DataStore.Endpoint().UpdateEndpoint(endpoint.ID, endpoint) - if err != nil { - return httperror.InternalServerError("Unable to update environment", err) - } + endpoint.TagIDs = removeElement(endpoint.TagIDs, tagID) + err = handler.DataStore.Endpoint().UpdateEndpoint(endpoint.ID, endpoint) + if err != nil { + return httperror.InternalServerError("Unable to update environment", err) } } @@ -60,13 +57,10 @@ func (handler *Handler) tagDelete(w http.ResponseWriter, r *http.Request) *httpe return httperror.InternalServerError("Unable to retrieve environment group from the database", err) } - tagIdx := findTagIndex(endpointGroup.TagIDs, tagID) - if tagIdx != -1 { - endpointGroup.TagIDs = removeElement(endpointGroup.TagIDs, tagIdx) - err = handler.DataStore.EndpointGroup().UpdateEndpointGroup(endpointGroup.ID, endpointGroup) - if err != nil { - return httperror.InternalServerError("Unable to update environment group", err) - } + endpointGroup.TagIDs = removeElement(endpointGroup.TagIDs, tagID) + err = handler.DataStore.EndpointGroup().UpdateEndpointGroup(endpointGroup.ID, endpointGroup) + if err != nil { + return httperror.InternalServerError("Unable to update environment group", err) } } @@ -94,15 +88,12 @@ func (handler *Handler) tagDelete(w http.ResponseWriter, r *http.Request) *httpe } } - for idx := range edgeGroups { - edgeGroup := &edgeGroups[idx] - tagIdx := findTagIndex(edgeGroup.TagIDs, tagID) - if tagIdx != -1 { - edgeGroup.TagIDs = removeElement(edgeGroup.TagIDs, tagIdx) - err = handler.DataStore.EdgeGroup().UpdateEdgeGroup(edgeGroup.ID, edgeGroup) - if err != nil { - return httperror.InternalServerError("Unable to update environment group", err) - } + for _, edgeGroup := range edgeGroups { + err = handler.DataStore.EdgeGroup().UpdateEdgeGroupFunc(edgeGroup.ID, func(g *portainer.EdgeGroup) { + g.TagIDs = removeElement(g.TagIDs, tagID) + }) + if err != nil { + return httperror.InternalServerError("Unable to update edge group", err) } } @@ -135,20 +126,14 @@ func (handler *Handler) updateEndpointRelations(endpoint portainer.Endpoint, edg return handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpoint.ID, endpointRelation) } -func findTagIndex(tags []portainer.TagID, searchTagID portainer.TagID) int { - for idx, tagID := range tags { - if searchTagID == tagID { - return idx +func removeElement(slice []portainer.TagID, elem portainer.TagID) []portainer.TagID { + for i, id := range slice { + if id == elem { + slice[i] = slice[len(slice)-1] + + return slice[:len(slice)-1] } } - return -1 -} -func removeElement(arr []portainer.TagID, index int) []portainer.TagID { - if index < 0 { - return arr - } - lastTagIdx := len(arr) - 1 - arr[index] = arr[lastTagIdx] - return arr[:lastTagIdx] + return slice } diff --git a/api/http/handler/tags/tag_delete_test.go b/api/http/handler/tags/tag_delete_test.go new file mode 100644 index 000000000..ccb283184 --- /dev/null +++ b/api/http/handler/tags/tag_delete_test.go @@ -0,0 +1,105 @@ +package tags + +import ( + "net/http" + "net/http/httptest" + "strconv" + "sync" + "testing" + + portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/apikey" + "github.com/portainer/portainer/api/datastore" + "github.com/portainer/portainer/api/http/security" + "github.com/portainer/portainer/api/jwt" +) + +func TestTagDeleteEdgeGroupsConcurrently(t *testing.T) { + const tagsCount = 100 + + _, store, teardown := datastore.MustNewTestStore(t, true, false) + defer teardown() + + user := &portainer.User{ID: 2, Username: "admin", Role: portainer.AdministratorRole} + err := store.User().Create(user) + if err != nil { + t.Fatal("could not create admin user:", err) + } + + jwtService, err := jwt.NewService("1h", store) + if err != nil { + t.Fatal("could not initialize the JWT service:", err) + } + + apiKeyService := apikey.NewAPIKeyService(store.APIKeyRepository(), store.User()) + rawAPIKey, _, err := apiKeyService.GenerateApiKey(*user, "test") + if err != nil { + t.Fatal("could not generate API key:", err) + } + + bouncer := security.NewRequestBouncer(store, jwtService, apiKeyService) + + handler := NewHandler(bouncer) + handler.DataStore = store + + // Create all the tags and add them to the same edge group + + var tagIDs []portainer.TagID + + for i := 0; i < tagsCount; i++ { + tagID := portainer.TagID(i) + 1 + + err = store.Tag().Create(&portainer.Tag{ + ID: tagID, + Name: "tag-" + strconv.Itoa(int(tagID)), + }) + if err != nil { + t.Fatal("could not create tag:", err) + } + + tagIDs = append(tagIDs, tagID) + } + + err = store.EdgeGroup().Create(&portainer.EdgeGroup{ + ID: 1, + Name: "edgegroup-1", + TagIDs: tagIDs, + }) + if err != nil { + t.Fatal("could not create edge group:", err) + } + + // Remove the tags concurrently + + var wg sync.WaitGroup + wg.Add(len(tagIDs)) + + for _, tagID := range tagIDs { + go func(ID portainer.TagID) { + defer wg.Done() + + req, err := http.NewRequest(http.MethodDelete, "/tags/"+strconv.Itoa(int(ID)), nil) + if err != nil { + t.Fail() + return + } + req.Header.Add("X-Api-Key", rawAPIKey) + + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + }(tagID) + } + + wg.Wait() + + // Check that the edge group is consistent + + edgeGroup, err := handler.DataStore.EdgeGroup().EdgeGroup(1) + if err != nil { + t.Fatal("could not retrieve the edge group:", err) + } + + if len(edgeGroup.TagIDs) > 0 { + t.Fatal("the edge group is not consistent") + } +} diff --git a/api/http/proxy/manager.go b/api/http/proxy/manager.go index e077e39c6..1795eeb64 100644 --- a/api/http/proxy/manager.go +++ b/api/http/proxy/manager.go @@ -66,7 +66,10 @@ func (manager *Manager) GetEndpointProxy(endpoint *portainer.Endpoint) http.Hand // is currently only called for edge connection clean up and when endpoint is updated func (manager *Manager) DeleteEndpointProxy(endpointID portainer.EndpointID) { manager.endpointProxies.Remove(fmt.Sprint(endpointID)) - manager.k8sClientFactory.RemoveKubeClient(endpointID) + + if manager.k8sClientFactory != nil { + manager.k8sClientFactory.RemoveKubeClient(endpointID) + } } // CreateGitlabProxy creates a new HTTP reverse proxy that can be used to send requests to the Gitlab API