From 75f165d1fff4561ed287e3123b1565adce40183c Mon Sep 17 00:00:00 2001 From: andres-portainer <91705312+andres-portainer@users.noreply.github.com> Date: Thu, 5 Jun 2025 19:46:10 -0300 Subject: [PATCH] feat(edgestackstatus): optimize the Edge Stack structures BE-11740 (#756) --- .../edgestackstatus/edgestackstatus.go | 89 ++++++++++ api/dataservices/edgestackstatus/tx.go | 95 +++++++++++ api/dataservices/interface.go | 15 +- api/datastore/migrate_data.go | 10 +- api/datastore/migrator/migrate_2_31_0.go | 31 ++++ api/datastore/migrator/migrator.go | 9 +- api/datastore/services.go | 14 ++ api/datastore/services_tx.go | 4 + .../test_data/output_24_to_latest.json | 1 + .../edgestacks/edgestack_create_file.go | 3 +- .../edgestacks/edgestack_create_git.go | 12 +- .../edgestacks/edgestack_create_string.go | 10 +- .../edgestacks/edgestack_create_test.go | 31 ++-- .../handler/edgestacks/edgestack_delete.go | 5 +- .../edgestacks/edgestack_delete_test.go | 33 ++-- .../handler/edgestacks/edgestack_inspect.go | 31 ++++ api/http/handler/edgestacks/edgestack_list.go | 6 + .../edgestacks/edgestack_status_update.go | 59 +++---- .../edgestack_status_update_coordinator.go | 155 ------------------ .../edgestack_status_update_test.go | 31 +--- api/http/handler/edgestacks/edgestack_test.go | 42 ++--- .../handler/edgestacks/edgestack_update.go | 6 +- .../edgestacks/edgestack_update_test.go | 40 ++--- api/http/handler/edgestacks/handler.go | 4 +- .../edgestacks/utils_update_stack_version.go | 11 +- .../endpointedge_status_inspect_test.go | 7 +- api/http/handler/endpoints/endpoint_delete.go | 11 +- api/http/handler/endpoints/filter.go | 17 +- api/http/server.go | 5 +- api/internal/edge/edgestacks/service.go | 13 +- api/internal/edge/edgestacks/status.go | 26 --- api/internal/testhelpers/datastore.go | 8 +- api/portainer.go | 9 + 33 files changed, 452 insertions(+), 391 deletions(-) create mode 100644 api/dataservices/edgestackstatus/edgestackstatus.go create mode 100644 api/dataservices/edgestackstatus/tx.go create mode 100644 api/datastore/migrator/migrate_2_31_0.go delete mode 100644 api/http/handler/edgestacks/edgestack_status_update_coordinator.go delete mode 100644 api/internal/edge/edgestacks/status.go diff --git a/api/dataservices/edgestackstatus/edgestackstatus.go b/api/dataservices/edgestackstatus/edgestackstatus.go new file mode 100644 index 000000000..7d063ba49 --- /dev/null +++ b/api/dataservices/edgestackstatus/edgestackstatus.go @@ -0,0 +1,89 @@ +package edgestackstatus + +import ( + portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/dataservices" +) + +var _ dataservices.EdgeStackStatusService = &Service{} + +const BucketName = "edge_stack_status" + +type Service struct { + conn portainer.Connection +} + +func (service *Service) BucketName() string { + return BucketName +} + +func NewService(connection portainer.Connection) (*Service, error) { + if err := connection.SetServiceName(BucketName); err != nil { + return nil, err + } + + return &Service{conn: connection}, nil +} + +func (s *Service) Tx(tx portainer.Transaction) ServiceTx { + return ServiceTx{ + service: s, + tx: tx, + } +} + +func (s *Service) Create(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID, status *portainer.EdgeStackStatusForEnv) error { + return s.conn.UpdateTx(func(tx portainer.Transaction) error { + return s.Tx(tx).Create(edgeStackID, endpointID, status) + }) +} + +func (s *Service) Read(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID) (*portainer.EdgeStackStatusForEnv, error) { + var element *portainer.EdgeStackStatusForEnv + + return element, s.conn.ViewTx(func(tx portainer.Transaction) error { + var err error + element, err = s.Tx(tx).Read(edgeStackID, endpointID) + + return err + }) +} + +func (s *Service) ReadAll(edgeStackID portainer.EdgeStackID) ([]portainer.EdgeStackStatusForEnv, error) { + var collection = make([]portainer.EdgeStackStatusForEnv, 0) + + return collection, s.conn.ViewTx(func(tx portainer.Transaction) error { + var err error + collection, err = s.Tx(tx).ReadAll(edgeStackID) + + return err + }) +} + +func (s *Service) Update(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID, status *portainer.EdgeStackStatusForEnv) error { + return s.conn.UpdateTx(func(tx portainer.Transaction) error { + return s.Tx(tx).Update(edgeStackID, endpointID, status) + }) +} + +func (s *Service) Delete(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID) error { + return s.conn.UpdateTx(func(tx portainer.Transaction) error { + return s.Tx(tx).Delete(edgeStackID, endpointID) + }) +} + +func (s *Service) DeleteAll(edgeStackID portainer.EdgeStackID) error { + return s.conn.UpdateTx(func(tx portainer.Transaction) error { + return s.Tx(tx).DeleteAll(edgeStackID) + }) +} + +func (s *Service) Clear(edgeStackID portainer.EdgeStackID, relatedEnvironmentsIDs []portainer.EndpointID) error { + return s.conn.UpdateTx(func(tx portainer.Transaction) error { + return s.Tx(tx).Clear(edgeStackID, relatedEnvironmentsIDs) + }) +} + +func (s *Service) key(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID) []byte { + return append(s.conn.ConvertToKey(int(edgeStackID)), s.conn.ConvertToKey(int(endpointID))...) +} diff --git a/api/dataservices/edgestackstatus/tx.go b/api/dataservices/edgestackstatus/tx.go new file mode 100644 index 000000000..b0dc14856 --- /dev/null +++ b/api/dataservices/edgestackstatus/tx.go @@ -0,0 +1,95 @@ +package edgestackstatus + +import ( + "fmt" + + portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/dataservices" +) + +var _ dataservices.EdgeStackStatusService = &Service{} + +type ServiceTx struct { + service *Service + tx portainer.Transaction +} + +func (service ServiceTx) Create(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID, status *portainer.EdgeStackStatusForEnv) error { + identifier := service.service.key(edgeStackID, endpointID) + return service.tx.CreateObjectWithStringId(BucketName, identifier, status) +} + +func (s ServiceTx) Read(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID) (*portainer.EdgeStackStatusForEnv, error) { + var status portainer.EdgeStackStatusForEnv + identifier := s.service.key(edgeStackID, endpointID) + + if err := s.tx.GetObject(BucketName, identifier, &status); err != nil { + return nil, err + } + + return &status, nil +} + +func (s ServiceTx) ReadAll(edgeStackID portainer.EdgeStackID) ([]portainer.EdgeStackStatusForEnv, error) { + keyPrefix := s.service.conn.ConvertToKey(int(edgeStackID)) + + statuses := make([]portainer.EdgeStackStatusForEnv, 0) + + if err := s.tx.GetAllWithKeyPrefix(BucketName, keyPrefix, &portainer.EdgeStackStatusForEnv{}, dataservices.AppendFn(&statuses)); err != nil { + return nil, fmt.Errorf("unable to retrieve EdgeStackStatus for EdgeStack %d: %w", edgeStackID, err) + } + + return statuses, nil +} + +func (s ServiceTx) Update(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID, status *portainer.EdgeStackStatusForEnv) error { + identifier := s.service.key(edgeStackID, endpointID) + return s.tx.UpdateObject(BucketName, identifier, status) +} + +func (s ServiceTx) Delete(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID) error { + identifier := s.service.key(edgeStackID, endpointID) + return s.tx.DeleteObject(BucketName, identifier) +} + +func (s ServiceTx) DeleteAll(edgeStackID portainer.EdgeStackID) error { + keyPrefix := s.service.conn.ConvertToKey(int(edgeStackID)) + + statuses := make([]portainer.EdgeStackStatusForEnv, 0) + + if err := s.tx.GetAllWithKeyPrefix(BucketName, keyPrefix, &portainer.EdgeStackStatusForEnv{}, dataservices.AppendFn(&statuses)); err != nil { + return fmt.Errorf("unable to retrieve EdgeStackStatus for EdgeStack %d: %w", edgeStackID, err) + } + + for _, status := range statuses { + if err := s.tx.DeleteObject(BucketName, s.service.key(edgeStackID, status.EndpointID)); err != nil { + return fmt.Errorf("unable to delete EdgeStackStatus for EdgeStack %d and Endpoint %d: %w", edgeStackID, status.EndpointID, err) + } + } + + return nil +} + +func (s ServiceTx) Clear(edgeStackID portainer.EdgeStackID, relatedEnvironmentsIDs []portainer.EndpointID) error { + for _, envID := range relatedEnvironmentsIDs { + existingStatus, err := s.Read(edgeStackID, envID) + if err != nil && !dataservices.IsErrObjectNotFound(err) { + return fmt.Errorf("unable to retrieve status for environment %d: %w", envID, err) + } + + var deploymentInfo portainer.StackDeploymentInfo + if existingStatus != nil { + deploymentInfo = existingStatus.DeploymentInfo + } + + if err := s.Update(edgeStackID, envID, &portainer.EdgeStackStatusForEnv{ + EndpointID: envID, + Status: []portainer.EdgeStackDeploymentStatus{}, + DeploymentInfo: deploymentInfo, + }); err != nil { + return err + } + } + + return nil +} diff --git a/api/dataservices/interface.go b/api/dataservices/interface.go index 8ba55531c..d330d4959 100644 --- a/api/dataservices/interface.go +++ b/api/dataservices/interface.go @@ -12,6 +12,7 @@ type ( EdgeGroup() EdgeGroupService EdgeJob() EdgeJobService EdgeStack() EdgeStackService + EdgeStackStatus() EdgeStackStatusService Endpoint() EndpointService EndpointGroup() EndpointGroupService EndpointRelation() EndpointRelationService @@ -39,8 +40,8 @@ type ( Open() (newStore bool, err error) Init() error Close() error - UpdateTx(func(DataStoreTx) error) error - ViewTx(func(DataStoreTx) error) error + UpdateTx(func(tx DataStoreTx) error) error + ViewTx(func(tx DataStoreTx) error) error MigrateData() error Rollback(force bool) error CheckCurrentEdition() error @@ -89,6 +90,16 @@ type ( BucketName() string } + EdgeStackStatusService interface { + Create(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID, status *portainer.EdgeStackStatusForEnv) error + Read(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID) (*portainer.EdgeStackStatusForEnv, error) + ReadAll(edgeStackID portainer.EdgeStackID) ([]portainer.EdgeStackStatusForEnv, error) + Update(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID, status *portainer.EdgeStackStatusForEnv) error + Delete(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID) error + DeleteAll(edgeStackID portainer.EdgeStackID) error + Clear(edgeStackID portainer.EdgeStackID, relatedEnvironmentsIDs []portainer.EndpointID) error + } + // EndpointService represents a service for managing environment(endpoint) data EndpointService interface { Endpoint(ID portainer.EndpointID) (*portainer.Endpoint, error) diff --git a/api/datastore/migrate_data.go b/api/datastore/migrate_data.go index 8047274d1..409936db8 100644 --- a/api/datastore/migrate_data.go +++ b/api/datastore/migrate_data.go @@ -40,13 +40,11 @@ func (store *Store) MigrateData() error { } // before we alter anything in the DB, create a backup - _, err = store.Backup("") - if err != nil { + if _, err := store.Backup(""); err != nil { return errors.Wrap(err, "while backing up database") } - err = store.FailSafeMigrate(migrator, version) - if err != nil { + if err := store.FailSafeMigrate(migrator, version); err != nil { err = errors.Wrap(err, "failed to migrate database") log.Warn().Err(err).Msg("migration failed, restoring database to previous version") @@ -85,6 +83,7 @@ func (store *Store) newMigratorParameters(version *models.Version, flags *portai DockerhubService: store.DockerHubService, AuthorizationService: authorization.NewService(store), EdgeStackService: store.EdgeStackService, + EdgeStackStatusService: store.EdgeStackStatusService, EdgeJobService: store.EdgeJobService, TunnelServerService: store.TunnelServerService, PendingActionsService: store.PendingActionsService, @@ -140,8 +139,7 @@ func (store *Store) connectionRollback(force bool) error { } } - err := store.Restore() - if err != nil { + if err := store.Restore(); err != nil { return err } diff --git a/api/datastore/migrator/migrate_2_31_0.go b/api/datastore/migrator/migrate_2_31_0.go new file mode 100644 index 000000000..7afea9802 --- /dev/null +++ b/api/datastore/migrator/migrate_2_31_0.go @@ -0,0 +1,31 @@ +package migrator + +import portainer "github.com/portainer/portainer/api" + +func (m *Migrator) migrateEdgeStacksStatuses_2_31_0() error { + edgeStacks, err := m.edgeStackService.EdgeStacks() + if err != nil { + return err + } + + for _, edgeStack := range edgeStacks { + for envID, status := range edgeStack.Status { + if err := m.edgeStackStatusService.Create(edgeStack.ID, envID, &portainer.EdgeStackStatusForEnv{ + EndpointID: envID, + Status: status.Status, + DeploymentInfo: status.DeploymentInfo, + ReadyRePullImage: status.ReadyRePullImage, + }); err != nil { + return err + } + } + + edgeStack.Status = nil + + if err := m.edgeStackService.UpdateEdgeStack(edgeStack.ID, &edgeStack); err != nil { + return err + } + } + + return nil +} diff --git a/api/datastore/migrator/migrator.go b/api/datastore/migrator/migrator.go index dc92006ad..992dd0b9d 100644 --- a/api/datastore/migrator/migrator.go +++ b/api/datastore/migrator/migrator.go @@ -3,12 +3,12 @@ package migrator import ( "errors" - "github.com/Masterminds/semver" portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/database/models" "github.com/portainer/portainer/api/dataservices/dockerhub" "github.com/portainer/portainer/api/dataservices/edgejob" "github.com/portainer/portainer/api/dataservices/edgestack" + "github.com/portainer/portainer/api/dataservices/edgestackstatus" "github.com/portainer/portainer/api/dataservices/endpoint" "github.com/portainer/portainer/api/dataservices/endpointgroup" "github.com/portainer/portainer/api/dataservices/endpointrelation" @@ -27,6 +27,8 @@ import ( "github.com/portainer/portainer/api/dataservices/user" "github.com/portainer/portainer/api/dataservices/version" "github.com/portainer/portainer/api/internal/authorization" + + "github.com/Masterminds/semver" "github.com/rs/zerolog/log" ) @@ -56,6 +58,7 @@ type ( authorizationService *authorization.Service dockerhubService *dockerhub.Service edgeStackService *edgestack.Service + edgeStackStatusService *edgestackstatus.Service edgeJobService *edgejob.Service TunnelServerService *tunnelserver.Service pendingActionsService *pendingactions.Service @@ -84,6 +87,7 @@ type ( AuthorizationService *authorization.Service DockerhubService *dockerhub.Service EdgeStackService *edgestack.Service + EdgeStackStatusService *edgestackstatus.Service EdgeJobService *edgejob.Service TunnelServerService *tunnelserver.Service PendingActionsService *pendingactions.Service @@ -114,6 +118,7 @@ func NewMigrator(parameters *MigratorParameters) *Migrator { authorizationService: parameters.AuthorizationService, dockerhubService: parameters.DockerhubService, edgeStackService: parameters.EdgeStackService, + edgeStackStatusService: parameters.EdgeStackStatusService, edgeJobService: parameters.EdgeJobService, TunnelServerService: parameters.TunnelServerService, pendingActionsService: parameters.PendingActionsService, @@ -242,6 +247,8 @@ func (m *Migrator) initMigrations() { m.migratePendingActionsDataForDB130, ) + m.addMigrations("2.31.0", m.migrateEdgeStacksStatuses_2_31_0) + // Add new migrations above... // One function per migration, each versions migration funcs in the same file. } diff --git a/api/datastore/services.go b/api/datastore/services.go index b5363afe9..b0570fa67 100644 --- a/api/datastore/services.go +++ b/api/datastore/services.go @@ -13,6 +13,7 @@ import ( "github.com/portainer/portainer/api/dataservices/edgegroup" "github.com/portainer/portainer/api/dataservices/edgejob" "github.com/portainer/portainer/api/dataservices/edgestack" + "github.com/portainer/portainer/api/dataservices/edgestackstatus" "github.com/portainer/portainer/api/dataservices/endpoint" "github.com/portainer/portainer/api/dataservices/endpointgroup" "github.com/portainer/portainer/api/dataservices/endpointrelation" @@ -39,6 +40,8 @@ import ( "github.com/segmentio/encoding/json" ) +var _ dataservices.DataStore = &Store{} + // Store defines the implementation of portainer.DataStore using // BoltDB as the storage system. type Store struct { @@ -51,6 +54,7 @@ type Store struct { EdgeGroupService *edgegroup.Service EdgeJobService *edgejob.Service EdgeStackService *edgestack.Service + EdgeStackStatusService *edgestackstatus.Service EndpointGroupService *endpointgroup.Service EndpointService *endpoint.Service EndpointRelationService *endpointrelation.Service @@ -109,6 +113,12 @@ func (store *Store) initServices() error { store.EdgeStackService = edgeStackService endpointRelationService.RegisterUpdateStackFunction(edgeStackService.UpdateEdgeStackFunc, edgeStackService.UpdateEdgeStackFuncTx) + edgeStackStatusService, err := edgestackstatus.NewService(store.connection) + if err != nil { + return err + } + store.EdgeStackStatusService = edgeStackStatusService + edgeGroupService, err := edgegroup.NewService(store.connection) if err != nil { return err @@ -269,6 +279,10 @@ func (store *Store) EdgeStack() dataservices.EdgeStackService { return store.EdgeStackService } +func (store *Store) EdgeStackStatus() dataservices.EdgeStackStatusService { + return store.EdgeStackStatusService +} + // Environment(Endpoint) gives access to the Environment(Endpoint) data management layer func (store *Store) Endpoint() dataservices.EndpointService { return store.EndpointService diff --git a/api/datastore/services_tx.go b/api/datastore/services_tx.go index ddedf20cc..cf9f868f4 100644 --- a/api/datastore/services_tx.go +++ b/api/datastore/services_tx.go @@ -32,6 +32,10 @@ func (tx *StoreTx) EdgeStack() dataservices.EdgeStackService { return tx.store.EdgeStackService.Tx(tx.tx) } +func (tx *StoreTx) EdgeStackStatus() dataservices.EdgeStackStatusService { + return tx.store.EdgeStackStatusService.Tx(tx.tx) +} + func (tx *StoreTx) Endpoint() dataservices.EndpointService { return tx.store.EndpointService.Tx(tx.tx) } diff --git a/api/datastore/test_data/output_24_to_latest.json b/api/datastore/test_data/output_24_to_latest.json index 40971c519..41494fbee 100644 --- a/api/datastore/test_data/output_24_to_latest.json +++ b/api/datastore/test_data/output_24_to_latest.json @@ -8,6 +8,7 @@ } ], "edge_stack": null, + "edge_stack_status": null, "edgegroups": null, "edgejobs": null, "endpoint_groups": [ diff --git a/api/http/handler/edgestacks/edgestack_create_file.go b/api/http/handler/edgestacks/edgestack_create_file.go index 555418835..a0bc2995f 100644 --- a/api/http/handler/edgestacks/edgestack_create_file.go +++ b/api/http/handler/edgestacks/edgestack_create_file.go @@ -101,8 +101,7 @@ func (payload *edgeStackFromFileUploadPayload) Validate(r *http.Request) error { // @router /edge_stacks/create/file [post] func (handler *Handler) createEdgeStackFromFileUpload(r *http.Request, tx dataservices.DataStoreTx, dryrun bool) (*portainer.EdgeStack, error) { payload := &edgeStackFromFileUploadPayload{} - err := payload.Validate(r) - if err != nil { + if err := payload.Validate(r); err != nil { return nil, err } diff --git a/api/http/handler/edgestacks/edgestack_create_git.go b/api/http/handler/edgestacks/edgestack_create_git.go index 2da816481..d20e5b5c2 100644 --- a/api/http/handler/edgestacks/edgestack_create_git.go +++ b/api/http/handler/edgestacks/edgestack_create_git.go @@ -103,8 +103,7 @@ func (payload *edgeStackFromGitRepositoryPayload) Validate(r *http.Request) erro // @router /edge_stacks/create/repository [post] func (handler *Handler) createEdgeStackFromGitRepository(r *http.Request, tx dataservices.DataStoreTx, dryrun bool, userID portainer.UserID) (*portainer.EdgeStack, error) { var payload edgeStackFromGitRepositoryPayload - err := request.DecodeAndValidateJSONPayload(r, &payload) - if err != nil { + if err := request.DecodeAndValidateJSONPayload(r, &payload); err != nil { return nil, err } @@ -137,11 +136,9 @@ func (handler *Handler) createEdgeStackFromGitRepository(r *http.Request, tx dat } func (handler *Handler) storeManifestFromGitRepository(tx dataservices.DataStoreTx, stackFolder string, relatedEndpointIds []portainer.EndpointID, deploymentType portainer.EdgeStackDeploymentType, currentUserID portainer.UserID, repositoryConfig gittypes.RepoConfig) (composePath, manifestPath, projectPath string, err error) { - hasWrongType, err := hasWrongEnvironmentType(tx.Endpoint(), relatedEndpointIds, deploymentType) - if err != nil { + if hasWrongType, err := hasWrongEnvironmentType(tx.Endpoint(), relatedEndpointIds, deploymentType); err != nil { return "", "", "", fmt.Errorf("unable to check for existence of non fitting environments: %w", err) - } - if hasWrongType { + } else if hasWrongType { return "", "", "", errors.New("edge stack with config do not match the environment type") } @@ -153,8 +150,7 @@ func (handler *Handler) storeManifestFromGitRepository(tx dataservices.DataStore repositoryPassword = repositoryConfig.Authentication.Password } - err = handler.GitService.CloneRepository(projectPath, repositoryConfig.URL, repositoryConfig.ReferenceName, repositoryUsername, repositoryPassword, repositoryConfig.TLSSkipVerify) - if err != nil { + if err := handler.GitService.CloneRepository(projectPath, repositoryConfig.URL, repositoryConfig.ReferenceName, repositoryUsername, repositoryPassword, repositoryConfig.TLSSkipVerify); err != nil { return "", "", "", err } diff --git a/api/http/handler/edgestacks/edgestack_create_string.go b/api/http/handler/edgestacks/edgestack_create_string.go index 556633fae..5e3fb57b8 100644 --- a/api/http/handler/edgestacks/edgestack_create_string.go +++ b/api/http/handler/edgestacks/edgestack_create_string.go @@ -76,8 +76,7 @@ func (payload *edgeStackFromStringPayload) Validate(r *http.Request) error { // @router /edge_stacks/create/string [post] func (handler *Handler) createEdgeStackFromFileContent(r *http.Request, tx dataservices.DataStoreTx, dryrun bool) (*portainer.EdgeStack, error) { var payload edgeStackFromStringPayload - err := request.DecodeAndValidateJSONPayload(r, &payload) - if err != nil { + if err := request.DecodeAndValidateJSONPayload(r, &payload); err != nil { return nil, err } @@ -96,11 +95,9 @@ func (handler *Handler) createEdgeStackFromFileContent(r *http.Request, tx datas } func (handler *Handler) storeFileContent(tx dataservices.DataStoreTx, stackFolder string, deploymentType portainer.EdgeStackDeploymentType, relatedEndpointIds []portainer.EndpointID, fileContent []byte) (composePath, manifestPath, projectPath string, err error) { - hasWrongType, err := hasWrongEnvironmentType(tx.Endpoint(), relatedEndpointIds, deploymentType) - if err != nil { + if hasWrongType, err := hasWrongEnvironmentType(tx.Endpoint(), relatedEndpointIds, deploymentType); err != nil { return "", "", "", fmt.Errorf("unable to check for existence of non fitting environments: %w", err) - } - if hasWrongType { + } else if hasWrongType { return "", "", "", errors.New("edge stack with config do not match the environment type") } @@ -124,7 +121,6 @@ func (handler *Handler) storeFileContent(tx dataservices.DataStoreTx, stackFolde } return "", manifestPath, projectPath, nil - } errMessage := fmt.Sprintf("invalid deployment type: %d", deploymentType) diff --git a/api/http/handler/edgestacks/edgestack_create_test.go b/api/http/handler/edgestacks/edgestack_create_test.go index 32158d300..486cc09d0 100644 --- a/api/http/handler/edgestacks/edgestack_create_test.go +++ b/api/http/handler/edgestacks/edgestack_create_test.go @@ -8,6 +8,7 @@ import ( "testing" portainer "github.com/portainer/portainer/api" + "github.com/stretchr/testify/require" "github.com/segmentio/encoding/json" ) @@ -28,9 +29,7 @@ func TestCreateAndInspect(t *testing.T) { } err := handler.DataStore.EdgeGroup().Create(&edgeGroup) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) endpointRelation := portainer.EndpointRelation{ EndpointID: endpoint.ID, @@ -38,9 +37,7 @@ func TestCreateAndInspect(t *testing.T) { } err = handler.DataStore.EndpointRelation().Create(&endpointRelation) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) payload := edgeStackFromStringPayload{ Name: "test-stack", @@ -50,16 +47,14 @@ func TestCreateAndInspect(t *testing.T) { } jsonPayload, err := json.Marshal(payload) - if err != nil { - t.Fatal("JSON marshal error:", err) - } + require.NoError(t, err) + r := bytes.NewBuffer(jsonPayload) // Create EdgeStack req, err := http.NewRequest(http.MethodPost, "/edge_stacks/create/string", r) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) + req.Header.Add("x-api-key", rawAPIKey) rec := httptest.NewRecorder() handler.ServeHTTP(rec, req) @@ -70,15 +65,11 @@ func TestCreateAndInspect(t *testing.T) { data := portainer.EdgeStack{} err = json.NewDecoder(rec.Body).Decode(&data) - if err != nil { - t.Fatal("error decoding response:", err) - } + require.NoError(t, err) // Inspect req, err = http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", data.ID), nil) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) req.Header.Add("x-api-key", rawAPIKey) rec = httptest.NewRecorder() @@ -90,9 +81,7 @@ func TestCreateAndInspect(t *testing.T) { data = portainer.EdgeStack{} err = json.NewDecoder(rec.Body).Decode(&data) - if err != nil { - t.Fatal("error decoding response:", err) - } + require.NoError(t, err) if payload.Name != data.Name { t.Fatalf("expected EdgeStack Name %s, found %s", payload.Name, data.Name) diff --git a/api/http/handler/edgestacks/edgestack_delete.go b/api/http/handler/edgestacks/edgestack_delete.go index 3d71f2bce..0e6307684 100644 --- a/api/http/handler/edgestacks/edgestack_delete.go +++ b/api/http/handler/edgestacks/edgestack_delete.go @@ -30,10 +30,9 @@ func (handler *Handler) edgeStackDelete(w http.ResponseWriter, r *http.Request) return httperror.BadRequest("Invalid edge stack identifier route variable", err) } - err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { + if err := handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { return handler.deleteEdgeStack(tx, portainer.EdgeStackID(edgeStackID)) - }) - if err != nil { + }); err != nil { var httpErr *httperror.HandlerError if errors.As(err, &httpErr) { return httpErr diff --git a/api/http/handler/edgestacks/edgestack_delete_test.go b/api/http/handler/edgestacks/edgestack_delete_test.go index ef25ae45c..ca334c7ce 100644 --- a/api/http/handler/edgestacks/edgestack_delete_test.go +++ b/api/http/handler/edgestacks/edgestack_delete_test.go @@ -8,9 +8,10 @@ import ( "testing" portainer "github.com/portainer/portainer/api" - "github.com/stretchr/testify/assert" "github.com/segmentio/encoding/json" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // Delete @@ -23,9 +24,7 @@ func TestDeleteAndInspect(t *testing.T) { // Inspect req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) req.Header.Add("x-api-key", rawAPIKey) rec := httptest.NewRecorder() @@ -37,9 +36,7 @@ func TestDeleteAndInspect(t *testing.T) { data := portainer.EdgeStack{} err = json.NewDecoder(rec.Body).Decode(&data) - if err != nil { - t.Fatal("error decoding response:", err) - } + require.NoError(t, err) if data.ID != edgeStack.ID { t.Fatalf("expected EdgeStackID %d, found %d", int(edgeStack.ID), data.ID) @@ -47,9 +44,7 @@ func TestDeleteAndInspect(t *testing.T) { // Delete req, err = http.NewRequest(http.MethodDelete, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) req.Header.Add("x-api-key", rawAPIKey) rec = httptest.NewRecorder() @@ -61,9 +56,7 @@ func TestDeleteAndInspect(t *testing.T) { // Inspect req, err = http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) req.Header.Add("x-api-key", rawAPIKey) rec = httptest.NewRecorder() @@ -117,15 +110,12 @@ func TestDeleteEdgeStack_RemoveProjectFolder(t *testing.T) { } var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(payload); err != nil { - t.Fatal("error encoding payload:", err) - } + err := json.NewEncoder(&buf).Encode(payload) + require.NoError(t, err) // Create req, err := http.NewRequest(http.MethodPost, "/edge_stacks/create/string", &buf) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) req.Header.Add("x-api-key", rawAPIKey) rec := httptest.NewRecorder() @@ -138,9 +128,8 @@ func TestDeleteEdgeStack_RemoveProjectFolder(t *testing.T) { assert.DirExists(t, handler.FileService.GetEdgeStackProjectPath("1")) // Delete - if req, err = http.NewRequest(http.MethodDelete, "/edge_stacks/1", nil); err != nil { - t.Fatal("request error:", err) - } + req, err = http.NewRequest(http.MethodDelete, "/edge_stacks/1", nil) + require.NoError(t, err) req.Header.Add("x-api-key", rawAPIKey) rec = httptest.NewRecorder() diff --git a/api/http/handler/edgestacks/edgestack_inspect.go b/api/http/handler/edgestacks/edgestack_inspect.go index 06c118835..2936f320e 100644 --- a/api/http/handler/edgestacks/edgestack_inspect.go +++ b/api/http/handler/edgestacks/edgestack_inspect.go @@ -4,6 +4,7 @@ import ( "net/http" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/dataservices" httperror "github.com/portainer/portainer/pkg/libhttp/error" "github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/response" @@ -33,5 +34,35 @@ func (handler *Handler) edgeStackInspect(w http.ResponseWriter, r *http.Request) return handlerDBErr(err, "Unable to find an edge stack with the specified identifier inside the database") } + if err := fillEdgeStackStatus(handler.DataStore, edgeStack); err != nil { + return handlerDBErr(err, "Unable to retrieve edge stack status from the database") + } + return response.JSON(w, edgeStack) } + +func fillEdgeStackStatus(tx dataservices.DataStoreTx, edgeStack *portainer.EdgeStack) error { + status, err := tx.EdgeStackStatus().ReadAll(edgeStack.ID) + if err != nil { + return err + } + + edgeStack.Status = make(map[portainer.EndpointID]portainer.EdgeStackStatus, len(status)) + + emptyStatus := make([]portainer.EdgeStackDeploymentStatus, 0) + + for _, s := range status { + if s.Status == nil { + s.Status = emptyStatus + } + + edgeStack.Status[s.EndpointID] = portainer.EdgeStackStatus{ + Status: s.Status, + EndpointID: s.EndpointID, + DeploymentInfo: s.DeploymentInfo, + ReadyRePullImage: s.ReadyRePullImage, + } + } + + return nil +} diff --git a/api/http/handler/edgestacks/edgestack_list.go b/api/http/handler/edgestacks/edgestack_list.go index 26fd7da05..b0df238c3 100644 --- a/api/http/handler/edgestacks/edgestack_list.go +++ b/api/http/handler/edgestacks/edgestack_list.go @@ -25,5 +25,11 @@ func (handler *Handler) edgeStackList(w http.ResponseWriter, r *http.Request) *h return httperror.InternalServerError("Unable to retrieve edge stacks from the database", err) } + for i := range edgeStacks { + if err := fillEdgeStackStatus(handler.DataStore, &edgeStacks[i]); err != nil { + return handlerDBErr(err, "Unable to retrieve edge stack status from the database") + } + } + return response.JSON(w, edgeStacks) } diff --git a/api/http/handler/edgestacks/edgestack_status_update.go b/api/http/handler/edgestacks/edgestack_status_update.go index fef5a6927..4f99e7ab3 100644 --- a/api/http/handler/edgestacks/edgestack_status_update.go +++ b/api/http/handler/edgestacks/edgestack_status_update.go @@ -9,11 +9,10 @@ import ( "time" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/dataservices" httperror "github.com/portainer/portainer/pkg/libhttp/error" "github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/response" - - "github.com/rs/zerolog/log" ) type updateStatusPayload struct { @@ -78,12 +77,25 @@ func (handler *Handler) edgeStackStatusUpdate(w http.ResponseWriter, r *http.Req return httperror.Forbidden("Permission denied to access environment", fmt.Errorf("unauthorized edge endpoint operation: %w. Environment name: %s", err, endpoint.Name)) } - updateFn := func(stack *portainer.EdgeStack) (*portainer.EdgeStack, error) { - return handler.updateEdgeStackStatus(stack, stack.ID, payload) - } + var stack *portainer.EdgeStack - stack, err := handler.stackCoordinator.UpdateStatus(r, portainer.EdgeStackID(stackID), updateFn) - if err != nil { + if err := handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { + var err error + stack, err = tx.EdgeStack().EdgeStack(portainer.EdgeStackID(stackID)) + if err != nil { + if dataservices.IsErrObjectNotFound(err) { + return nil + } + + return httperror.InternalServerError("Unable to retrieve Edge stack from the database", err) + } + + if err := handler.updateEdgeStackStatus(tx, stack, stack.ID, payload); err != nil { + return httperror.InternalServerError("Unable to update Edge stack status", err) + } + + return nil + }); err != nil { var httpErr *httperror.HandlerError if errors.As(err, &httpErr) { return httpErr @@ -96,43 +108,34 @@ func (handler *Handler) edgeStackStatusUpdate(w http.ResponseWriter, r *http.Req return nil } + if err := fillEdgeStackStatus(handler.DataStore, stack); err != nil { + return handlerDBErr(err, "Unable to retrieve edge stack status from the database") + } + return response.JSON(w, stack) } -func (handler *Handler) updateEdgeStackStatus(stack *portainer.EdgeStack, stackID portainer.EdgeStackID, payload updateStatusPayload) (*portainer.EdgeStack, error) { +func (handler *Handler) updateEdgeStackStatus(tx dataservices.DataStoreTx, stack *portainer.EdgeStack, stackID portainer.EdgeStackID, payload updateStatusPayload) error { if payload.Version > 0 && payload.Version < stack.Version { - return stack, nil + return nil } status := *payload.Status - log.Debug(). - Int("stackID", int(stackID)). - Int("status", int(status)). - Msg("Updating stack status") - deploymentStatus := portainer.EdgeStackDeploymentStatus{ Type: status, Error: payload.Error, Time: payload.Time, } - updateEnvStatus(payload.EndpointID, stack, deploymentStatus) - - return stack, nil -} - -func updateEnvStatus(environmentId portainer.EndpointID, stack *portainer.EdgeStack, deploymentStatus portainer.EdgeStackDeploymentStatus) { if deploymentStatus.Type == portainer.EdgeStackStatusRemoved { - delete(stack.Status, environmentId) - - return + return tx.EdgeStackStatus().Delete(stackID, payload.EndpointID) } - environmentStatus, ok := stack.Status[environmentId] - if !ok { - environmentStatus = portainer.EdgeStackStatus{ - EndpointID: environmentId, + environmentStatus, err := tx.EdgeStackStatus().Read(stackID, payload.EndpointID) + if err != nil { + environmentStatus = &portainer.EdgeStackStatusForEnv{ + EndpointID: payload.EndpointID, Status: []portainer.EdgeStackDeploymentStatus{}, } } @@ -143,5 +146,5 @@ func updateEnvStatus(environmentId portainer.EndpointID, stack *portainer.EdgeSt environmentStatus.Status = append(environmentStatus.Status, deploymentStatus) } - stack.Status[environmentId] = environmentStatus + return tx.EdgeStackStatus().Update(stackID, payload.EndpointID, environmentStatus) } diff --git a/api/http/handler/edgestacks/edgestack_status_update_coordinator.go b/api/http/handler/edgestacks/edgestack_status_update_coordinator.go deleted file mode 100644 index 885b4c6da..000000000 --- a/api/http/handler/edgestacks/edgestack_status_update_coordinator.go +++ /dev/null @@ -1,155 +0,0 @@ -package edgestacks - -import ( - "errors" - "fmt" - "net/http" - - portainer "github.com/portainer/portainer/api" - "github.com/portainer/portainer/api/dataservices" - - "github.com/rs/zerolog/log" -) - -type statusRequest struct { - respCh chan statusResponse - stackID portainer.EdgeStackID - updateFn statusUpdateFn -} - -type statusResponse struct { - Stack *portainer.EdgeStack - Error error -} - -type statusUpdateFn func(*portainer.EdgeStack) (*portainer.EdgeStack, error) - -type EdgeStackStatusUpdateCoordinator struct { - updateCh chan statusRequest - dataStore dataservices.DataStore -} - -var errAnotherStackUpdateInProgress = errors.New("another stack update is in progress") - -func NewEdgeStackStatusUpdateCoordinator(dataStore dataservices.DataStore) *EdgeStackStatusUpdateCoordinator { - return &EdgeStackStatusUpdateCoordinator{ - updateCh: make(chan statusRequest), - dataStore: dataStore, - } -} - -func (c *EdgeStackStatusUpdateCoordinator) Start() { - for { - c.loop() - } -} - -func (c *EdgeStackStatusUpdateCoordinator) loop() { - u := <-c.updateCh - - respChs := []chan statusResponse{u.respCh} - - var stack *portainer.EdgeStack - - err := c.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { - // 1. Load the edge stack - var err error - - stack, err = loadEdgeStack(tx, u.stackID) - if err != nil { - return err - } - - // Return early when the agent tries to update the status on a deleted stack - if stack == nil { - return nil - } - - // 2. Mutate the edge stack opportunistically until there are no more pending updates - for { - stack, err = u.updateFn(stack) - if err != nil { - return err - } - - if m, ok := c.getNextUpdate(stack.ID); ok { - u = m - } else { - break - } - - respChs = append(respChs, u.respCh) - } - - // 3. Save the changes back to the database - if err := tx.EdgeStack().UpdateEdgeStack(stack.ID, stack); err != nil { - return handlerDBErr(fmt.Errorf("unable to update Edge stack: %w.", err), "Unable to persist the stack changes inside the database") - } - - return nil - }) - - // 4. Send back the responses - for _, ch := range respChs { - ch <- statusResponse{Stack: stack, Error: err} - } -} - -func loadEdgeStack(tx dataservices.DataStoreTx, stackID portainer.EdgeStackID) (*portainer.EdgeStack, error) { - stack, err := tx.EdgeStack().EdgeStack(stackID) - if err != nil { - if dataservices.IsErrObjectNotFound(err) { - // Skip the error when the agent tries to update the status on a deleted stack - log.Debug(). - Err(err). - Int("stackID", int(stackID)). - Msg("Unable to find a stack inside the database, skipping error") - - return nil, nil - } - - return nil, fmt.Errorf("unable to retrieve Edge stack from the database: %w.", err) - } - - return stack, nil -} - -func (c *EdgeStackStatusUpdateCoordinator) getNextUpdate(stackID portainer.EdgeStackID) (statusRequest, bool) { - for { - select { - case u := <-c.updateCh: - // Discard the update and let the agent retry - if u.stackID != stackID { - u.respCh <- statusResponse{Error: errAnotherStackUpdateInProgress} - - continue - } - - return u, true - - default: - return statusRequest{}, false - } - } -} - -func (c *EdgeStackStatusUpdateCoordinator) UpdateStatus(r *http.Request, stackID portainer.EdgeStackID, updateFn statusUpdateFn) (*portainer.EdgeStack, error) { - respCh := make(chan statusResponse) - defer close(respCh) - - msg := statusRequest{ - respCh: respCh, - stackID: stackID, - updateFn: updateFn, - } - - select { - case c.updateCh <- msg: - r := <-respCh - - return r.Stack, r.Error - - case <-r.Context().Done(): - return nil, r.Context().Err() - } -} diff --git a/api/http/handler/edgestacks/edgestack_status_update_test.go b/api/http/handler/edgestacks/edgestack_status_update_test.go index 50a0863d4..4d94368fe 100644 --- a/api/http/handler/edgestacks/edgestack_status_update_test.go +++ b/api/http/handler/edgestacks/edgestack_status_update_test.go @@ -10,6 +10,7 @@ import ( portainer "github.com/portainer/portainer/api" "github.com/segmentio/encoding/json" + "github.com/stretchr/testify/require" ) // Update Status @@ -28,15 +29,11 @@ func TestUpdateStatusAndInspect(t *testing.T) { } jsonPayload, err := json.Marshal(payload) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) r := bytes.NewBuffer(jsonPayload) req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/edge_stacks/%d/status", edgeStack.ID), r) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) req.Header.Set(portainer.PortainerAgentEdgeIDHeader, endpoint.EdgeID) rec := httptest.NewRecorder() @@ -48,9 +45,7 @@ func TestUpdateStatusAndInspect(t *testing.T) { // Get updated edge stack req, err = http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) req.Header.Add("x-api-key", rawAPIKey) rec = httptest.NewRecorder() @@ -62,14 +57,10 @@ func TestUpdateStatusAndInspect(t *testing.T) { updatedStack := portainer.EdgeStack{} err = json.NewDecoder(rec.Body).Decode(&updatedStack) - if err != nil { - t.Fatal("error decoding response:", err) - } + require.NoError(t, err) endpointStatus, ok := updatedStack.Status[payload.EndpointID] - if !ok { - t.Fatal("Missing status") - } + require.True(t, ok) lastStatus := endpointStatus.Status[len(endpointStatus.Status)-1] @@ -84,8 +75,8 @@ func TestUpdateStatusAndInspect(t *testing.T) { if endpointStatus.EndpointID != payload.EndpointID { t.Fatalf("expected EndpointID %d, found %d", payload.EndpointID, endpointStatus.EndpointID) } - } + func TestUpdateStatusWithInvalidPayload(t *testing.T) { handler, _ := setupHandler(t) @@ -136,15 +127,11 @@ func TestUpdateStatusWithInvalidPayload(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { jsonPayload, err := json.Marshal(tc.Payload) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) r := bytes.NewBuffer(jsonPayload) req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/edge_stacks/%d/status", edgeStack.ID), r) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) req.Header.Set(portainer.PortainerAgentEdgeIDHeader, endpoint.EdgeID) rec := httptest.NewRecorder() diff --git a/api/http/handler/edgestacks/edgestack_test.go b/api/http/handler/edgestacks/edgestack_test.go index ce1e9b659..91600117b 100644 --- a/api/http/handler/edgestacks/edgestack_test.go +++ b/api/http/handler/edgestacks/edgestack_test.go @@ -17,6 +17,7 @@ import ( "github.com/portainer/portainer/api/jwt" "github.com/pkg/errors" + "github.com/stretchr/testify/require" ) // Helpers @@ -51,27 +52,21 @@ func setupHandler(t *testing.T) (*Handler, string) { t.Fatal(err) } - coord := NewEdgeStackStatusUpdateCoordinator(store) - go coord.Start() - handler := NewHandler( security.NewRequestBouncer(store, jwtService, apiKeyService), store, edgestacks.NewService(store), - coord, ) handler.FileService = fs settings, err := handler.DataStore.Settings().Settings() - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + settings.EnableEdgeComputeFeatures = true - if err := handler.DataStore.Settings().UpdateSettings(settings); err != nil { - t.Fatal(err) - } + err = handler.DataStore.Settings().UpdateSettings(settings) + require.NoError(t, err) handler.GitService = testhelpers.NewGitService(errors.New("Clone error"), "git-service-id") @@ -90,9 +85,8 @@ func createEndpointWithId(t *testing.T, store dataservices.DataStore, endpointID LastCheckInDate: time.Now().Unix(), } - if err := store.Endpoint().Create(&endpoint); err != nil { - t.Fatal(err) - } + err := store.Endpoint().Create(&endpoint) + require.NoError(t, err) return endpoint } @@ -113,15 +107,13 @@ func createEdgeStack(t *testing.T, store dataservices.DataStore, endpointID port PartialMatch: false, } - if err := store.EdgeGroup().Create(&edgeGroup); err != nil { - t.Fatal(err) - } + err := store.EdgeGroup().Create(&edgeGroup) + require.NoError(t, err) edgeStackID := portainer.EdgeStackID(14) edgeStack := portainer.EdgeStack{ ID: edgeStackID, Name: "test-edge-stack-" + strconv.Itoa(int(edgeStackID)), - Status: map[portainer.EndpointID]portainer.EdgeStackStatus{}, CreationDate: time.Now().Unix(), EdgeGroups: []portainer.EdgeGroupID{edgeGroup.ID}, ProjectPath: "/project/path", @@ -138,13 +130,11 @@ func createEdgeStack(t *testing.T, store dataservices.DataStore, endpointID port }, } - if err := store.EdgeStack().Create(edgeStack.ID, &edgeStack); err != nil { - t.Fatal(err) - } + err = store.EdgeStack().Create(edgeStack.ID, &edgeStack) + require.NoError(t, err) - if err := store.EndpointRelation().Create(&endpointRelation); err != nil { - t.Fatal(err) - } + err = store.EndpointRelation().Create(&endpointRelation) + require.NoError(t, err) return edgeStack } @@ -155,8 +145,8 @@ func createEdgeGroup(t *testing.T, store dataservices.DataStore) portainer.EdgeG Name: "EdgeGroup 1", } - if err := store.EdgeGroup().Create(&edgeGroup); err != nil { - t.Fatal(err) - } + err := store.EdgeGroup().Create(&edgeGroup) + require.NoError(t, err) + return edgeGroup } diff --git a/api/http/handler/edgestacks/edgestack_update.go b/api/http/handler/edgestacks/edgestack_update.go index a3d59abb8..db896d0eb 100644 --- a/api/http/handler/edgestacks/edgestack_update.go +++ b/api/http/handler/edgestacks/edgestack_update.go @@ -74,6 +74,10 @@ func (handler *Handler) edgeStackUpdate(w http.ResponseWriter, r *http.Request) return httperror.InternalServerError("Unexpected error", err) } + if err := fillEdgeStackStatus(handler.DataStore, stack); err != nil { + return handlerDBErr(err, "Unable to retrieve edge stack status from the database") + } + return response.JSON(w, stack) } @@ -120,7 +124,7 @@ func (handler *Handler) updateEdgeStack(tx dataservices.DataStoreTx, stackID por stack.EdgeGroups = groupsIds if payload.UpdateVersion { - if err := handler.updateStackVersion(stack, payload.DeploymentType, []byte(payload.StackFileContent), "", relatedEndpointIds); err != nil { + if err := handler.updateStackVersion(tx, stack, payload.DeploymentType, []byte(payload.StackFileContent), "", relatedEndpointIds); err != nil { return nil, httperror.InternalServerError("Unable to update stack version", err) } } diff --git a/api/http/handler/edgestacks/edgestack_update_test.go b/api/http/handler/edgestacks/edgestack_update_test.go index 7e4a9b23c..68baa4129 100644 --- a/api/http/handler/edgestacks/edgestack_update_test.go +++ b/api/http/handler/edgestacks/edgestack_update_test.go @@ -25,9 +25,8 @@ func TestUpdateAndInspect(t *testing.T) { endpointID := portainer.EndpointID(6) newEndpoint := createEndpointWithId(t, handler.DataStore, endpointID) - if err := handler.DataStore.Endpoint().Create(&newEndpoint); err != nil { - t.Fatal(err) - } + err := handler.DataStore.Endpoint().Create(&newEndpoint) + require.NoError(t, err) endpointRelation := portainer.EndpointRelation{ EndpointID: endpointID, @@ -36,9 +35,8 @@ func TestUpdateAndInspect(t *testing.T) { }, } - if err := handler.DataStore.EndpointRelation().Create(&endpointRelation); err != nil { - t.Fatal(err) - } + err = handler.DataStore.EndpointRelation().Create(&endpointRelation) + require.NoError(t, err) newEdgeGroup := portainer.EdgeGroup{ ID: 2, @@ -49,9 +47,8 @@ func TestUpdateAndInspect(t *testing.T) { PartialMatch: false, } - if err := handler.DataStore.EdgeGroup().Create(&newEdgeGroup); err != nil { - t.Fatal(err) - } + err = handler.DataStore.EdgeGroup().Create(&newEdgeGroup) + require.NoError(t, err) payload := updateEdgeStackPayload{ StackFileContent: "update-test", @@ -61,15 +58,11 @@ func TestUpdateAndInspect(t *testing.T) { } jsonPayload, err := json.Marshal(payload) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) r := bytes.NewBuffer(jsonPayload) req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), r) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) req.Header.Add("x-api-key", rawAPIKey) rec := httptest.NewRecorder() @@ -81,9 +74,7 @@ func TestUpdateAndInspect(t *testing.T) { // Get updated edge stack req, err = http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) req.Header.Add("x-api-key", rawAPIKey) rec = httptest.NewRecorder() @@ -94,9 +85,8 @@ func TestUpdateAndInspect(t *testing.T) { } updatedStack := portainer.EdgeStack{} - if err := json.NewDecoder(rec.Body).Decode(&updatedStack); err != nil { - t.Fatal("error decoding response:", err) - } + err = json.NewDecoder(rec.Body).Decode(&updatedStack) + require.NoError(t, err) if payload.UpdateVersion && updatedStack.Version != edgeStack.Version+1 { t.Fatalf("expected EdgeStack version %d, found %d", edgeStack.Version+1, updatedStack.Version+1) @@ -226,15 +216,11 @@ func TestUpdateWithInvalidPayload(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { jsonPayload, err := json.Marshal(tc.Payload) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) r := bytes.NewBuffer(jsonPayload) req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), r) - if err != nil { - t.Fatal("request error:", err) - } + require.NoError(t, err) req.Header.Add("x-api-key", rawAPIKey) rec := httptest.NewRecorder() diff --git a/api/http/handler/edgestacks/handler.go b/api/http/handler/edgestacks/handler.go index 9fa90776f..78df853a6 100644 --- a/api/http/handler/edgestacks/handler.go +++ b/api/http/handler/edgestacks/handler.go @@ -22,17 +22,15 @@ type Handler struct { GitService portainer.GitService edgeStacksService *edgestackservice.Service KubernetesDeployer portainer.KubernetesDeployer - stackCoordinator *EdgeStackStatusUpdateCoordinator } // NewHandler creates a handler to manage environment(endpoint) group operations. -func NewHandler(bouncer security.BouncerService, dataStore dataservices.DataStore, edgeStacksService *edgestackservice.Service, stackCoordinator *EdgeStackStatusUpdateCoordinator) *Handler { +func NewHandler(bouncer security.BouncerService, dataStore dataservices.DataStore, edgeStacksService *edgestackservice.Service) *Handler { h := &Handler{ Router: mux.NewRouter(), requestBouncer: bouncer, DataStore: dataStore, edgeStacksService: edgeStacksService, - stackCoordinator: stackCoordinator, } h.Handle("/edge_stacks/create/{method}", diff --git a/api/http/handler/edgestacks/utils_update_stack_version.go b/api/http/handler/edgestacks/utils_update_stack_version.go index 2502a88f6..78ac5002f 100644 --- a/api/http/handler/edgestacks/utils_update_stack_version.go +++ b/api/http/handler/edgestacks/utils_update_stack_version.go @@ -5,15 +5,18 @@ import ( "strconv" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/filesystem" - edgestackutils "github.com/portainer/portainer/api/internal/edge/edgestacks" "github.com/rs/zerolog/log" ) -func (handler *Handler) updateStackVersion(stack *portainer.EdgeStack, deploymentType portainer.EdgeStackDeploymentType, config []byte, oldGitHash string, relatedEnvironmentsIDs []portainer.EndpointID) error { - stack.Version = stack.Version + 1 - stack.Status = edgestackutils.NewStatus(stack.Status, relatedEnvironmentsIDs) +func (handler *Handler) updateStackVersion(tx dataservices.DataStoreTx, stack *portainer.EdgeStack, deploymentType portainer.EdgeStackDeploymentType, config []byte, oldGitHash string, relatedEnvironmentsIDs []portainer.EndpointID) error { + stack.Version++ + + if err := tx.EdgeStackStatus().Clear(stack.ID, relatedEnvironmentsIDs); err != nil { + return err + } return handler.storeStackFile(stack, deploymentType, config) } diff --git a/api/http/handler/endpointedge/endpointedge_status_inspect_test.go b/api/http/handler/endpointedge/endpointedge_status_inspect_test.go index 8bfaa9814..ca9b12723 100644 --- a/api/http/handler/endpointedge/endpointedge_status_inspect_test.go +++ b/api/http/handler/endpointedge/endpointedge_status_inspect_test.go @@ -287,11 +287,8 @@ func TestEdgeStackStatus(t *testing.T) { edgeStackID := portainer.EdgeStackID(17) edgeStack := portainer.EdgeStack{ - ID: edgeStackID, - Name: "test-edge-stack-17", - Status: map[portainer.EndpointID]portainer.EdgeStackStatus{ - endpointID: {}, - }, + ID: edgeStackID, + Name: "test-edge-stack-17", CreationDate: time.Now().Unix(), EdgeGroups: []portainer.EdgeGroupID{1, 2}, ProjectPath: "/project/path", diff --git a/api/http/handler/endpoints/endpoint_delete.go b/api/http/handler/endpoints/endpoint_delete.go index 4752364ec..f26b9dd13 100644 --- a/api/http/handler/endpoints/endpoint_delete.go +++ b/api/http/handler/endpoints/endpoint_delete.go @@ -214,14 +214,9 @@ func (handler *Handler) deleteEndpoint(tx dataservices.DataStoreTx, endpointID p log.Warn().Err(err).Msg("Unable to retrieve edge stacks from the database") } - for idx := range edgeStacks { - edgeStack := &edgeStacks[idx] - if _, ok := edgeStack.Status[endpoint.ID]; ok { - delete(edgeStack.Status, endpoint.ID) - - if err := tx.EdgeStack().UpdateEdgeStack(edgeStack.ID, edgeStack); err != nil { - log.Warn().Err(err).Msg("Unable to update edge stack") - } + for _, edgeStack := range edgeStacks { + if err := tx.EdgeStackStatus().Delete(edgeStack.ID, endpoint.ID); err != nil { + log.Warn().Err(err).Msg("Unable to delete edge stack status") } } diff --git a/api/http/handler/endpoints/filter.go b/api/http/handler/endpoints/filter.go index 6dc41b0bd..9b6004d1c 100644 --- a/api/http/handler/endpoints/filter.go +++ b/api/http/handler/endpoints/filter.go @@ -247,19 +247,17 @@ func (handler *Handler) filterEndpointsByQuery( return filteredEndpoints, totalAvailableEndpoints, nil } -func endpointStatusInStackMatchesFilter(edgeStackStatus map[portainer.EndpointID]portainer.EdgeStackStatus, envId portainer.EndpointID, statusFilter portainer.EdgeStackStatusType) bool { - status, ok := edgeStackStatus[envId] - +func endpointStatusInStackMatchesFilter(stackStatus *portainer.EdgeStackStatusForEnv, envId portainer.EndpointID, statusFilter portainer.EdgeStackStatusType) bool { // consider that if the env has no status in the stack it is in Pending state if statusFilter == portainer.EdgeStackStatusPending { - return !ok || len(status.Status) == 0 + return stackStatus == nil || len(stackStatus.Status) == 0 } - if !ok { + if stackStatus == nil { return false } - return slices.ContainsFunc(status.Status, func(s portainer.EdgeStackDeploymentStatus) bool { + return slices.ContainsFunc(stackStatus.Status, func(s portainer.EdgeStackDeploymentStatus) bool { return s.Type == statusFilter }) } @@ -291,7 +289,12 @@ func filterEndpointsByEdgeStack(endpoints []portainer.Endpoint, edgeStackId port if statusFilter != nil { n := 0 for _, envId := range envIds { - if endpointStatusInStackMatchesFilter(stack.Status, envId, *statusFilter) { + edgeStackStatus, err := datastore.EdgeStackStatus().Read(edgeStackId, envId) + if err != nil { + return nil, errors.WithMessagef(err, "Unable to retrieve edge stack status for environment %d", envId) + } + + if endpointStatusInStackMatchesFilter(edgeStackStatus, envId, *statusFilter) { envIds[n] = envId n++ } diff --git a/api/http/server.go b/api/http/server.go index 183a78c04..88d131650 100644 --- a/api/http/server.go +++ b/api/http/server.go @@ -161,10 +161,7 @@ func (server *Server) Start() error { edgeJobsHandler.FileService = server.FileService edgeJobsHandler.ReverseTunnelService = server.ReverseTunnelService - edgeStackCoordinator := edgestacks.NewEdgeStackStatusUpdateCoordinator(server.DataStore) - go edgeStackCoordinator.Start() - - var edgeStacksHandler = edgestacks.NewHandler(requestBouncer, server.DataStore, server.EdgeStacksService, edgeStackCoordinator) + var edgeStacksHandler = edgestacks.NewHandler(requestBouncer, server.DataStore, server.EdgeStacksService) edgeStacksHandler.FileService = server.FileService edgeStacksHandler.GitService = server.GitService edgeStacksHandler.KubernetesDeployer = server.KubernetesDeployer diff --git a/api/internal/edge/edgestacks/service.go b/api/internal/edge/edgestacks/service.go index 6986a6917..5932a5ec8 100644 --- a/api/internal/edge/edgestacks/service.go +++ b/api/internal/edge/edgestacks/service.go @@ -49,7 +49,6 @@ func (service *Service) BuildEdgeStack( DeploymentType: deploymentType, CreationDate: time.Now().Unix(), EdgeGroups: edgeGroups, - Status: make(map[portainer.EndpointID]portainer.EdgeStackStatus, 0), Version: 1, UseManifestNamespaces: useManifestNamespaces, }, nil @@ -104,6 +103,14 @@ func (service *Service) PersistEdgeStack( return nil, err } + for _, endpointID := range relatedEndpointIds { + status := &portainer.EdgeStackStatusForEnv{EndpointID: endpointID} + + if err := tx.EdgeStackStatus().Create(stack.ID, endpointID, status); err != nil { + return nil, err + } + } + if err := tx.EndpointRelation().AddEndpointRelationsForEdgeStack(relatedEndpointIds, stack.ID); err != nil { return nil, fmt.Errorf("unable to add endpoint relations: %w", err) } @@ -158,5 +165,9 @@ func (service *Service) DeleteEdgeStack(tx dataservices.DataStoreTx, edgeStackID return errors.WithMessage(err, "Unable to remove the edge stack from the database") } + if err := tx.EdgeStackStatus().DeleteAll(edgeStackID); err != nil { + return errors.WithMessage(err, "unable to remove edge stack statuses from the database") + } + return nil } diff --git a/api/internal/edge/edgestacks/status.go b/api/internal/edge/edgestacks/status.go deleted file mode 100644 index 25629d958..000000000 --- a/api/internal/edge/edgestacks/status.go +++ /dev/null @@ -1,26 +0,0 @@ -package edgestacks - -import ( - portainer "github.com/portainer/portainer/api" -) - -// NewStatus returns a new status object for an Edge stack -func NewStatus(oldStatus map[portainer.EndpointID]portainer.EdgeStackStatus, relatedEnvironmentIDs []portainer.EndpointID) map[portainer.EndpointID]portainer.EdgeStackStatus { - status := map[portainer.EndpointID]portainer.EdgeStackStatus{} - - for _, environmentID := range relatedEnvironmentIDs { - newEnvStatus := portainer.EdgeStackStatus{ - Status: []portainer.EdgeStackDeploymentStatus{}, - EndpointID: environmentID, - } - - oldEnvStatus, ok := oldStatus[environmentID] - if ok { - newEnvStatus.DeploymentInfo = oldEnvStatus.DeploymentInfo - } - - status[environmentID] = newEnvStatus - } - - return status -} diff --git a/api/internal/testhelpers/datastore.go b/api/internal/testhelpers/datastore.go index d4a29ae09..392f21e97 100644 --- a/api/internal/testhelpers/datastore.go +++ b/api/internal/testhelpers/datastore.go @@ -16,6 +16,7 @@ type testDatastore struct { edgeGroup dataservices.EdgeGroupService edgeJob dataservices.EdgeJobService edgeStack dataservices.EdgeStackService + edgeStackStatus dataservices.EdgeStackStatusService endpoint dataservices.EndpointService endpointGroup dataservices.EndpointGroupService endpointRelation dataservices.EndpointRelationService @@ -53,8 +54,11 @@ func (d *testDatastore) CustomTemplate() dataservices.CustomTemplateService { re func (d *testDatastore) EdgeGroup() dataservices.EdgeGroupService { return d.edgeGroup } func (d *testDatastore) EdgeJob() dataservices.EdgeJobService { return d.edgeJob } func (d *testDatastore) EdgeStack() dataservices.EdgeStackService { return d.edgeStack } -func (d *testDatastore) Endpoint() dataservices.EndpointService { return d.endpoint } -func (d *testDatastore) EndpointGroup() dataservices.EndpointGroupService { return d.endpointGroup } +func (d *testDatastore) EdgeStackStatus() dataservices.EdgeStackStatusService { + return d.edgeStackStatus +} +func (d *testDatastore) Endpoint() dataservices.EndpointService { return d.endpoint } +func (d *testDatastore) EndpointGroup() dataservices.EndpointGroupService { return d.endpointGroup } func (d *testDatastore) EndpointRelation() dataservices.EndpointRelationService { return d.endpointRelation diff --git a/api/portainer.go b/api/portainer.go index bb383e44c..f81a5c767 100644 --- a/api/portainer.go +++ b/api/portainer.go @@ -336,6 +336,15 @@ type ( UseManifestNamespaces bool } + EdgeStackStatusForEnv struct { + EndpointID EndpointID + Status []EdgeStackDeploymentStatus + // EE only feature + DeploymentInfo StackDeploymentInfo + // ReadyRePullImage is a flag to indicate whether the auto update is trigger to re-pull image + ReadyRePullImage bool `json:"ReadyRePullImage,omitempty"` + } + EdgeStackDeploymentType int // EdgeStackID represents an edge stack id