1
0
Fork 0
mirror of https://github.com/portainer/portainer.git synced 2025-07-19 21:39:40 +02:00

feat(edgestackstatus): optimize the Edge Stack structures BE-11740 (#756)

This commit is contained in:
andres-portainer 2025-06-05 19:46:10 -03:00 committed by GitHub
parent eaf0deb2f6
commit 75f165d1ff
33 changed files with 452 additions and 391 deletions

View file

@ -0,0 +1,89 @@
package edgestackstatus
import (
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
)
var _ dataservices.EdgeStackStatusService = &Service{}
const BucketName = "edge_stack_status"
type Service struct {
conn portainer.Connection
}
func (service *Service) BucketName() string {
return BucketName
}
func NewService(connection portainer.Connection) (*Service, error) {
if err := connection.SetServiceName(BucketName); err != nil {
return nil, err
}
return &Service{conn: connection}, nil
}
func (s *Service) Tx(tx portainer.Transaction) ServiceTx {
return ServiceTx{
service: s,
tx: tx,
}
}
func (s *Service) Create(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID, status *portainer.EdgeStackStatusForEnv) error {
return s.conn.UpdateTx(func(tx portainer.Transaction) error {
return s.Tx(tx).Create(edgeStackID, endpointID, status)
})
}
func (s *Service) Read(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID) (*portainer.EdgeStackStatusForEnv, error) {
var element *portainer.EdgeStackStatusForEnv
return element, s.conn.ViewTx(func(tx portainer.Transaction) error {
var err error
element, err = s.Tx(tx).Read(edgeStackID, endpointID)
return err
})
}
func (s *Service) ReadAll(edgeStackID portainer.EdgeStackID) ([]portainer.EdgeStackStatusForEnv, error) {
var collection = make([]portainer.EdgeStackStatusForEnv, 0)
return collection, s.conn.ViewTx(func(tx portainer.Transaction) error {
var err error
collection, err = s.Tx(tx).ReadAll(edgeStackID)
return err
})
}
func (s *Service) Update(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID, status *portainer.EdgeStackStatusForEnv) error {
return s.conn.UpdateTx(func(tx portainer.Transaction) error {
return s.Tx(tx).Update(edgeStackID, endpointID, status)
})
}
func (s *Service) Delete(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID) error {
return s.conn.UpdateTx(func(tx portainer.Transaction) error {
return s.Tx(tx).Delete(edgeStackID, endpointID)
})
}
func (s *Service) DeleteAll(edgeStackID portainer.EdgeStackID) error {
return s.conn.UpdateTx(func(tx portainer.Transaction) error {
return s.Tx(tx).DeleteAll(edgeStackID)
})
}
func (s *Service) Clear(edgeStackID portainer.EdgeStackID, relatedEnvironmentsIDs []portainer.EndpointID) error {
return s.conn.UpdateTx(func(tx portainer.Transaction) error {
return s.Tx(tx).Clear(edgeStackID, relatedEnvironmentsIDs)
})
}
func (s *Service) key(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID) []byte {
return append(s.conn.ConvertToKey(int(edgeStackID)), s.conn.ConvertToKey(int(endpointID))...)
}

View file

@ -0,0 +1,95 @@
package edgestackstatus
import (
"fmt"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
)
var _ dataservices.EdgeStackStatusService = &Service{}
type ServiceTx struct {
service *Service
tx portainer.Transaction
}
func (service ServiceTx) Create(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID, status *portainer.EdgeStackStatusForEnv) error {
identifier := service.service.key(edgeStackID, endpointID)
return service.tx.CreateObjectWithStringId(BucketName, identifier, status)
}
func (s ServiceTx) Read(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID) (*portainer.EdgeStackStatusForEnv, error) {
var status portainer.EdgeStackStatusForEnv
identifier := s.service.key(edgeStackID, endpointID)
if err := s.tx.GetObject(BucketName, identifier, &status); err != nil {
return nil, err
}
return &status, nil
}
func (s ServiceTx) ReadAll(edgeStackID portainer.EdgeStackID) ([]portainer.EdgeStackStatusForEnv, error) {
keyPrefix := s.service.conn.ConvertToKey(int(edgeStackID))
statuses := make([]portainer.EdgeStackStatusForEnv, 0)
if err := s.tx.GetAllWithKeyPrefix(BucketName, keyPrefix, &portainer.EdgeStackStatusForEnv{}, dataservices.AppendFn(&statuses)); err != nil {
return nil, fmt.Errorf("unable to retrieve EdgeStackStatus for EdgeStack %d: %w", edgeStackID, err)
}
return statuses, nil
}
func (s ServiceTx) Update(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID, status *portainer.EdgeStackStatusForEnv) error {
identifier := s.service.key(edgeStackID, endpointID)
return s.tx.UpdateObject(BucketName, identifier, status)
}
func (s ServiceTx) Delete(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID) error {
identifier := s.service.key(edgeStackID, endpointID)
return s.tx.DeleteObject(BucketName, identifier)
}
func (s ServiceTx) DeleteAll(edgeStackID portainer.EdgeStackID) error {
keyPrefix := s.service.conn.ConvertToKey(int(edgeStackID))
statuses := make([]portainer.EdgeStackStatusForEnv, 0)
if err := s.tx.GetAllWithKeyPrefix(BucketName, keyPrefix, &portainer.EdgeStackStatusForEnv{}, dataservices.AppendFn(&statuses)); err != nil {
return fmt.Errorf("unable to retrieve EdgeStackStatus for EdgeStack %d: %w", edgeStackID, err)
}
for _, status := range statuses {
if err := s.tx.DeleteObject(BucketName, s.service.key(edgeStackID, status.EndpointID)); err != nil {
return fmt.Errorf("unable to delete EdgeStackStatus for EdgeStack %d and Endpoint %d: %w", edgeStackID, status.EndpointID, err)
}
}
return nil
}
func (s ServiceTx) Clear(edgeStackID portainer.EdgeStackID, relatedEnvironmentsIDs []portainer.EndpointID) error {
for _, envID := range relatedEnvironmentsIDs {
existingStatus, err := s.Read(edgeStackID, envID)
if err != nil && !dataservices.IsErrObjectNotFound(err) {
return fmt.Errorf("unable to retrieve status for environment %d: %w", envID, err)
}
var deploymentInfo portainer.StackDeploymentInfo
if existingStatus != nil {
deploymentInfo = existingStatus.DeploymentInfo
}
if err := s.Update(edgeStackID, envID, &portainer.EdgeStackStatusForEnv{
EndpointID: envID,
Status: []portainer.EdgeStackDeploymentStatus{},
DeploymentInfo: deploymentInfo,
}); err != nil {
return err
}
}
return nil
}

View file

@ -12,6 +12,7 @@ type (
EdgeGroup() EdgeGroupService EdgeGroup() EdgeGroupService
EdgeJob() EdgeJobService EdgeJob() EdgeJobService
EdgeStack() EdgeStackService EdgeStack() EdgeStackService
EdgeStackStatus() EdgeStackStatusService
Endpoint() EndpointService Endpoint() EndpointService
EndpointGroup() EndpointGroupService EndpointGroup() EndpointGroupService
EndpointRelation() EndpointRelationService EndpointRelation() EndpointRelationService
@ -39,8 +40,8 @@ type (
Open() (newStore bool, err error) Open() (newStore bool, err error)
Init() error Init() error
Close() error Close() error
UpdateTx(func(DataStoreTx) error) error UpdateTx(func(tx DataStoreTx) error) error
ViewTx(func(DataStoreTx) error) error ViewTx(func(tx DataStoreTx) error) error
MigrateData() error MigrateData() error
Rollback(force bool) error Rollback(force bool) error
CheckCurrentEdition() error CheckCurrentEdition() error
@ -89,6 +90,16 @@ type (
BucketName() string BucketName() string
} }
EdgeStackStatusService interface {
Create(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID, status *portainer.EdgeStackStatusForEnv) error
Read(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID) (*portainer.EdgeStackStatusForEnv, error)
ReadAll(edgeStackID portainer.EdgeStackID) ([]portainer.EdgeStackStatusForEnv, error)
Update(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID, status *portainer.EdgeStackStatusForEnv) error
Delete(edgeStackID portainer.EdgeStackID, endpointID portainer.EndpointID) error
DeleteAll(edgeStackID portainer.EdgeStackID) error
Clear(edgeStackID portainer.EdgeStackID, relatedEnvironmentsIDs []portainer.EndpointID) error
}
// EndpointService represents a service for managing environment(endpoint) data // EndpointService represents a service for managing environment(endpoint) data
EndpointService interface { EndpointService interface {
Endpoint(ID portainer.EndpointID) (*portainer.Endpoint, error) Endpoint(ID portainer.EndpointID) (*portainer.Endpoint, error)

View file

@ -40,13 +40,11 @@ func (store *Store) MigrateData() error {
} }
// before we alter anything in the DB, create a backup // before we alter anything in the DB, create a backup
_, err = store.Backup("") if _, err := store.Backup(""); err != nil {
if err != nil {
return errors.Wrap(err, "while backing up database") return errors.Wrap(err, "while backing up database")
} }
err = store.FailSafeMigrate(migrator, version) if err := store.FailSafeMigrate(migrator, version); err != nil {
if err != nil {
err = errors.Wrap(err, "failed to migrate database") err = errors.Wrap(err, "failed to migrate database")
log.Warn().Err(err).Msg("migration failed, restoring database to previous version") log.Warn().Err(err).Msg("migration failed, restoring database to previous version")
@ -85,6 +83,7 @@ func (store *Store) newMigratorParameters(version *models.Version, flags *portai
DockerhubService: store.DockerHubService, DockerhubService: store.DockerHubService,
AuthorizationService: authorization.NewService(store), AuthorizationService: authorization.NewService(store),
EdgeStackService: store.EdgeStackService, EdgeStackService: store.EdgeStackService,
EdgeStackStatusService: store.EdgeStackStatusService,
EdgeJobService: store.EdgeJobService, EdgeJobService: store.EdgeJobService,
TunnelServerService: store.TunnelServerService, TunnelServerService: store.TunnelServerService,
PendingActionsService: store.PendingActionsService, PendingActionsService: store.PendingActionsService,
@ -140,8 +139,7 @@ func (store *Store) connectionRollback(force bool) error {
} }
} }
err := store.Restore() if err := store.Restore(); err != nil {
if err != nil {
return err return err
} }

View file

@ -0,0 +1,31 @@
package migrator
import portainer "github.com/portainer/portainer/api"
func (m *Migrator) migrateEdgeStacksStatuses_2_31_0() error {
edgeStacks, err := m.edgeStackService.EdgeStacks()
if err != nil {
return err
}
for _, edgeStack := range edgeStacks {
for envID, status := range edgeStack.Status {
if err := m.edgeStackStatusService.Create(edgeStack.ID, envID, &portainer.EdgeStackStatusForEnv{
EndpointID: envID,
Status: status.Status,
DeploymentInfo: status.DeploymentInfo,
ReadyRePullImage: status.ReadyRePullImage,
}); err != nil {
return err
}
}
edgeStack.Status = nil
if err := m.edgeStackService.UpdateEdgeStack(edgeStack.ID, &edgeStack); err != nil {
return err
}
}
return nil
}

View file

@ -3,12 +3,12 @@ package migrator
import ( import (
"errors" "errors"
"github.com/Masterminds/semver"
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/database/models" "github.com/portainer/portainer/api/database/models"
"github.com/portainer/portainer/api/dataservices/dockerhub" "github.com/portainer/portainer/api/dataservices/dockerhub"
"github.com/portainer/portainer/api/dataservices/edgejob" "github.com/portainer/portainer/api/dataservices/edgejob"
"github.com/portainer/portainer/api/dataservices/edgestack" "github.com/portainer/portainer/api/dataservices/edgestack"
"github.com/portainer/portainer/api/dataservices/edgestackstatus"
"github.com/portainer/portainer/api/dataservices/endpoint" "github.com/portainer/portainer/api/dataservices/endpoint"
"github.com/portainer/portainer/api/dataservices/endpointgroup" "github.com/portainer/portainer/api/dataservices/endpointgroup"
"github.com/portainer/portainer/api/dataservices/endpointrelation" "github.com/portainer/portainer/api/dataservices/endpointrelation"
@ -27,6 +27,8 @@ import (
"github.com/portainer/portainer/api/dataservices/user" "github.com/portainer/portainer/api/dataservices/user"
"github.com/portainer/portainer/api/dataservices/version" "github.com/portainer/portainer/api/dataservices/version"
"github.com/portainer/portainer/api/internal/authorization" "github.com/portainer/portainer/api/internal/authorization"
"github.com/Masterminds/semver"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
@ -56,6 +58,7 @@ type (
authorizationService *authorization.Service authorizationService *authorization.Service
dockerhubService *dockerhub.Service dockerhubService *dockerhub.Service
edgeStackService *edgestack.Service edgeStackService *edgestack.Service
edgeStackStatusService *edgestackstatus.Service
edgeJobService *edgejob.Service edgeJobService *edgejob.Service
TunnelServerService *tunnelserver.Service TunnelServerService *tunnelserver.Service
pendingActionsService *pendingactions.Service pendingActionsService *pendingactions.Service
@ -84,6 +87,7 @@ type (
AuthorizationService *authorization.Service AuthorizationService *authorization.Service
DockerhubService *dockerhub.Service DockerhubService *dockerhub.Service
EdgeStackService *edgestack.Service EdgeStackService *edgestack.Service
EdgeStackStatusService *edgestackstatus.Service
EdgeJobService *edgejob.Service EdgeJobService *edgejob.Service
TunnelServerService *tunnelserver.Service TunnelServerService *tunnelserver.Service
PendingActionsService *pendingactions.Service PendingActionsService *pendingactions.Service
@ -114,6 +118,7 @@ func NewMigrator(parameters *MigratorParameters) *Migrator {
authorizationService: parameters.AuthorizationService, authorizationService: parameters.AuthorizationService,
dockerhubService: parameters.DockerhubService, dockerhubService: parameters.DockerhubService,
edgeStackService: parameters.EdgeStackService, edgeStackService: parameters.EdgeStackService,
edgeStackStatusService: parameters.EdgeStackStatusService,
edgeJobService: parameters.EdgeJobService, edgeJobService: parameters.EdgeJobService,
TunnelServerService: parameters.TunnelServerService, TunnelServerService: parameters.TunnelServerService,
pendingActionsService: parameters.PendingActionsService, pendingActionsService: parameters.PendingActionsService,
@ -242,6 +247,8 @@ func (m *Migrator) initMigrations() {
m.migratePendingActionsDataForDB130, m.migratePendingActionsDataForDB130,
) )
m.addMigrations("2.31.0", m.migrateEdgeStacksStatuses_2_31_0)
// Add new migrations above... // Add new migrations above...
// One function per migration, each versions migration funcs in the same file. // One function per migration, each versions migration funcs in the same file.
} }

View file

@ -13,6 +13,7 @@ import (
"github.com/portainer/portainer/api/dataservices/edgegroup" "github.com/portainer/portainer/api/dataservices/edgegroup"
"github.com/portainer/portainer/api/dataservices/edgejob" "github.com/portainer/portainer/api/dataservices/edgejob"
"github.com/portainer/portainer/api/dataservices/edgestack" "github.com/portainer/portainer/api/dataservices/edgestack"
"github.com/portainer/portainer/api/dataservices/edgestackstatus"
"github.com/portainer/portainer/api/dataservices/endpoint" "github.com/portainer/portainer/api/dataservices/endpoint"
"github.com/portainer/portainer/api/dataservices/endpointgroup" "github.com/portainer/portainer/api/dataservices/endpointgroup"
"github.com/portainer/portainer/api/dataservices/endpointrelation" "github.com/portainer/portainer/api/dataservices/endpointrelation"
@ -39,6 +40,8 @@ import (
"github.com/segmentio/encoding/json" "github.com/segmentio/encoding/json"
) )
var _ dataservices.DataStore = &Store{}
// Store defines the implementation of portainer.DataStore using // Store defines the implementation of portainer.DataStore using
// BoltDB as the storage system. // BoltDB as the storage system.
type Store struct { type Store struct {
@ -51,6 +54,7 @@ type Store struct {
EdgeGroupService *edgegroup.Service EdgeGroupService *edgegroup.Service
EdgeJobService *edgejob.Service EdgeJobService *edgejob.Service
EdgeStackService *edgestack.Service EdgeStackService *edgestack.Service
EdgeStackStatusService *edgestackstatus.Service
EndpointGroupService *endpointgroup.Service EndpointGroupService *endpointgroup.Service
EndpointService *endpoint.Service EndpointService *endpoint.Service
EndpointRelationService *endpointrelation.Service EndpointRelationService *endpointrelation.Service
@ -109,6 +113,12 @@ func (store *Store) initServices() error {
store.EdgeStackService = edgeStackService store.EdgeStackService = edgeStackService
endpointRelationService.RegisterUpdateStackFunction(edgeStackService.UpdateEdgeStackFunc, edgeStackService.UpdateEdgeStackFuncTx) endpointRelationService.RegisterUpdateStackFunction(edgeStackService.UpdateEdgeStackFunc, edgeStackService.UpdateEdgeStackFuncTx)
edgeStackStatusService, err := edgestackstatus.NewService(store.connection)
if err != nil {
return err
}
store.EdgeStackStatusService = edgeStackStatusService
edgeGroupService, err := edgegroup.NewService(store.connection) edgeGroupService, err := edgegroup.NewService(store.connection)
if err != nil { if err != nil {
return err return err
@ -269,6 +279,10 @@ func (store *Store) EdgeStack() dataservices.EdgeStackService {
return store.EdgeStackService return store.EdgeStackService
} }
func (store *Store) EdgeStackStatus() dataservices.EdgeStackStatusService {
return store.EdgeStackStatusService
}
// Environment(Endpoint) gives access to the Environment(Endpoint) data management layer // Environment(Endpoint) gives access to the Environment(Endpoint) data management layer
func (store *Store) Endpoint() dataservices.EndpointService { func (store *Store) Endpoint() dataservices.EndpointService {
return store.EndpointService return store.EndpointService

View file

@ -32,6 +32,10 @@ func (tx *StoreTx) EdgeStack() dataservices.EdgeStackService {
return tx.store.EdgeStackService.Tx(tx.tx) return tx.store.EdgeStackService.Tx(tx.tx)
} }
func (tx *StoreTx) EdgeStackStatus() dataservices.EdgeStackStatusService {
return tx.store.EdgeStackStatusService.Tx(tx.tx)
}
func (tx *StoreTx) Endpoint() dataservices.EndpointService { func (tx *StoreTx) Endpoint() dataservices.EndpointService {
return tx.store.EndpointService.Tx(tx.tx) return tx.store.EndpointService.Tx(tx.tx)
} }

View file

@ -8,6 +8,7 @@
} }
], ],
"edge_stack": null, "edge_stack": null,
"edge_stack_status": null,
"edgegroups": null, "edgegroups": null,
"edgejobs": null, "edgejobs": null,
"endpoint_groups": [ "endpoint_groups": [

View file

@ -101,8 +101,7 @@ func (payload *edgeStackFromFileUploadPayload) Validate(r *http.Request) error {
// @router /edge_stacks/create/file [post] // @router /edge_stacks/create/file [post]
func (handler *Handler) createEdgeStackFromFileUpload(r *http.Request, tx dataservices.DataStoreTx, dryrun bool) (*portainer.EdgeStack, error) { func (handler *Handler) createEdgeStackFromFileUpload(r *http.Request, tx dataservices.DataStoreTx, dryrun bool) (*portainer.EdgeStack, error) {
payload := &edgeStackFromFileUploadPayload{} payload := &edgeStackFromFileUploadPayload{}
err := payload.Validate(r) if err := payload.Validate(r); err != nil {
if err != nil {
return nil, err return nil, err
} }

View file

@ -103,8 +103,7 @@ func (payload *edgeStackFromGitRepositoryPayload) Validate(r *http.Request) erro
// @router /edge_stacks/create/repository [post] // @router /edge_stacks/create/repository [post]
func (handler *Handler) createEdgeStackFromGitRepository(r *http.Request, tx dataservices.DataStoreTx, dryrun bool, userID portainer.UserID) (*portainer.EdgeStack, error) { func (handler *Handler) createEdgeStackFromGitRepository(r *http.Request, tx dataservices.DataStoreTx, dryrun bool, userID portainer.UserID) (*portainer.EdgeStack, error) {
var payload edgeStackFromGitRepositoryPayload var payload edgeStackFromGitRepositoryPayload
err := request.DecodeAndValidateJSONPayload(r, &payload) if err := request.DecodeAndValidateJSONPayload(r, &payload); err != nil {
if err != nil {
return nil, err return nil, err
} }
@ -137,11 +136,9 @@ func (handler *Handler) createEdgeStackFromGitRepository(r *http.Request, tx dat
} }
func (handler *Handler) storeManifestFromGitRepository(tx dataservices.DataStoreTx, stackFolder string, relatedEndpointIds []portainer.EndpointID, deploymentType portainer.EdgeStackDeploymentType, currentUserID portainer.UserID, repositoryConfig gittypes.RepoConfig) (composePath, manifestPath, projectPath string, err error) { func (handler *Handler) storeManifestFromGitRepository(tx dataservices.DataStoreTx, stackFolder string, relatedEndpointIds []portainer.EndpointID, deploymentType portainer.EdgeStackDeploymentType, currentUserID portainer.UserID, repositoryConfig gittypes.RepoConfig) (composePath, manifestPath, projectPath string, err error) {
hasWrongType, err := hasWrongEnvironmentType(tx.Endpoint(), relatedEndpointIds, deploymentType) if hasWrongType, err := hasWrongEnvironmentType(tx.Endpoint(), relatedEndpointIds, deploymentType); err != nil {
if err != nil {
return "", "", "", fmt.Errorf("unable to check for existence of non fitting environments: %w", err) return "", "", "", fmt.Errorf("unable to check for existence of non fitting environments: %w", err)
} } else if hasWrongType {
if hasWrongType {
return "", "", "", errors.New("edge stack with config do not match the environment type") return "", "", "", errors.New("edge stack with config do not match the environment type")
} }
@ -153,8 +150,7 @@ func (handler *Handler) storeManifestFromGitRepository(tx dataservices.DataStore
repositoryPassword = repositoryConfig.Authentication.Password repositoryPassword = repositoryConfig.Authentication.Password
} }
err = handler.GitService.CloneRepository(projectPath, repositoryConfig.URL, repositoryConfig.ReferenceName, repositoryUsername, repositoryPassword, repositoryConfig.TLSSkipVerify) if err := handler.GitService.CloneRepository(projectPath, repositoryConfig.URL, repositoryConfig.ReferenceName, repositoryUsername, repositoryPassword, repositoryConfig.TLSSkipVerify); err != nil {
if err != nil {
return "", "", "", err return "", "", "", err
} }

View file

@ -76,8 +76,7 @@ func (payload *edgeStackFromStringPayload) Validate(r *http.Request) error {
// @router /edge_stacks/create/string [post] // @router /edge_stacks/create/string [post]
func (handler *Handler) createEdgeStackFromFileContent(r *http.Request, tx dataservices.DataStoreTx, dryrun bool) (*portainer.EdgeStack, error) { func (handler *Handler) createEdgeStackFromFileContent(r *http.Request, tx dataservices.DataStoreTx, dryrun bool) (*portainer.EdgeStack, error) {
var payload edgeStackFromStringPayload var payload edgeStackFromStringPayload
err := request.DecodeAndValidateJSONPayload(r, &payload) if err := request.DecodeAndValidateJSONPayload(r, &payload); err != nil {
if err != nil {
return nil, err return nil, err
} }
@ -96,11 +95,9 @@ func (handler *Handler) createEdgeStackFromFileContent(r *http.Request, tx datas
} }
func (handler *Handler) storeFileContent(tx dataservices.DataStoreTx, stackFolder string, deploymentType portainer.EdgeStackDeploymentType, relatedEndpointIds []portainer.EndpointID, fileContent []byte) (composePath, manifestPath, projectPath string, err error) { func (handler *Handler) storeFileContent(tx dataservices.DataStoreTx, stackFolder string, deploymentType portainer.EdgeStackDeploymentType, relatedEndpointIds []portainer.EndpointID, fileContent []byte) (composePath, manifestPath, projectPath string, err error) {
hasWrongType, err := hasWrongEnvironmentType(tx.Endpoint(), relatedEndpointIds, deploymentType) if hasWrongType, err := hasWrongEnvironmentType(tx.Endpoint(), relatedEndpointIds, deploymentType); err != nil {
if err != nil {
return "", "", "", fmt.Errorf("unable to check for existence of non fitting environments: %w", err) return "", "", "", fmt.Errorf("unable to check for existence of non fitting environments: %w", err)
} } else if hasWrongType {
if hasWrongType {
return "", "", "", errors.New("edge stack with config do not match the environment type") return "", "", "", errors.New("edge stack with config do not match the environment type")
} }
@ -124,7 +121,6 @@ func (handler *Handler) storeFileContent(tx dataservices.DataStoreTx, stackFolde
} }
return "", manifestPath, projectPath, nil return "", manifestPath, projectPath, nil
} }
errMessage := fmt.Sprintf("invalid deployment type: %d", deploymentType) errMessage := fmt.Sprintf("invalid deployment type: %d", deploymentType)

View file

@ -8,6 +8,7 @@ import (
"testing" "testing"
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/stretchr/testify/require"
"github.com/segmentio/encoding/json" "github.com/segmentio/encoding/json"
) )
@ -28,9 +29,7 @@ func TestCreateAndInspect(t *testing.T) {
} }
err := handler.DataStore.EdgeGroup().Create(&edgeGroup) err := handler.DataStore.EdgeGroup().Create(&edgeGroup)
if err != nil { require.NoError(t, err)
t.Fatal(err)
}
endpointRelation := portainer.EndpointRelation{ endpointRelation := portainer.EndpointRelation{
EndpointID: endpoint.ID, EndpointID: endpoint.ID,
@ -38,9 +37,7 @@ func TestCreateAndInspect(t *testing.T) {
} }
err = handler.DataStore.EndpointRelation().Create(&endpointRelation) err = handler.DataStore.EndpointRelation().Create(&endpointRelation)
if err != nil { require.NoError(t, err)
t.Fatal(err)
}
payload := edgeStackFromStringPayload{ payload := edgeStackFromStringPayload{
Name: "test-stack", Name: "test-stack",
@ -50,16 +47,14 @@ func TestCreateAndInspect(t *testing.T) {
} }
jsonPayload, err := json.Marshal(payload) jsonPayload, err := json.Marshal(payload)
if err != nil { require.NoError(t, err)
t.Fatal("JSON marshal error:", err)
}
r := bytes.NewBuffer(jsonPayload) r := bytes.NewBuffer(jsonPayload)
// Create EdgeStack // Create EdgeStack
req, err := http.NewRequest(http.MethodPost, "/edge_stacks/create/string", r) req, err := http.NewRequest(http.MethodPost, "/edge_stacks/create/string", r)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
req.Header.Add("x-api-key", rawAPIKey) req.Header.Add("x-api-key", rawAPIKey)
rec := httptest.NewRecorder() rec := httptest.NewRecorder()
handler.ServeHTTP(rec, req) handler.ServeHTTP(rec, req)
@ -70,15 +65,11 @@ func TestCreateAndInspect(t *testing.T) {
data := portainer.EdgeStack{} data := portainer.EdgeStack{}
err = json.NewDecoder(rec.Body).Decode(&data) err = json.NewDecoder(rec.Body).Decode(&data)
if err != nil { require.NoError(t, err)
t.Fatal("error decoding response:", err)
}
// Inspect // Inspect
req, err = http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", data.ID), nil) req, err = http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", data.ID), nil)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
req.Header.Add("x-api-key", rawAPIKey) req.Header.Add("x-api-key", rawAPIKey)
rec = httptest.NewRecorder() rec = httptest.NewRecorder()
@ -90,9 +81,7 @@ func TestCreateAndInspect(t *testing.T) {
data = portainer.EdgeStack{} data = portainer.EdgeStack{}
err = json.NewDecoder(rec.Body).Decode(&data) err = json.NewDecoder(rec.Body).Decode(&data)
if err != nil { require.NoError(t, err)
t.Fatal("error decoding response:", err)
}
if payload.Name != data.Name { if payload.Name != data.Name {
t.Fatalf("expected EdgeStack Name %s, found %s", payload.Name, data.Name) t.Fatalf("expected EdgeStack Name %s, found %s", payload.Name, data.Name)

View file

@ -30,10 +30,9 @@ func (handler *Handler) edgeStackDelete(w http.ResponseWriter, r *http.Request)
return httperror.BadRequest("Invalid edge stack identifier route variable", err) return httperror.BadRequest("Invalid edge stack identifier route variable", err)
} }
err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { if err := handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
return handler.deleteEdgeStack(tx, portainer.EdgeStackID(edgeStackID)) return handler.deleteEdgeStack(tx, portainer.EdgeStackID(edgeStackID))
}) }); err != nil {
if err != nil {
var httpErr *httperror.HandlerError var httpErr *httperror.HandlerError
if errors.As(err, &httpErr) { if errors.As(err, &httpErr) {
return httpErr return httpErr

View file

@ -8,9 +8,10 @@ import (
"testing" "testing"
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/stretchr/testify/assert"
"github.com/segmentio/encoding/json" "github.com/segmentio/encoding/json"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
// Delete // Delete
@ -23,9 +24,7 @@ func TestDeleteAndInspect(t *testing.T) {
// Inspect // Inspect
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil) req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
req.Header.Add("x-api-key", rawAPIKey) req.Header.Add("x-api-key", rawAPIKey)
rec := httptest.NewRecorder() rec := httptest.NewRecorder()
@ -37,9 +36,7 @@ func TestDeleteAndInspect(t *testing.T) {
data := portainer.EdgeStack{} data := portainer.EdgeStack{}
err = json.NewDecoder(rec.Body).Decode(&data) err = json.NewDecoder(rec.Body).Decode(&data)
if err != nil { require.NoError(t, err)
t.Fatal("error decoding response:", err)
}
if data.ID != edgeStack.ID { if data.ID != edgeStack.ID {
t.Fatalf("expected EdgeStackID %d, found %d", int(edgeStack.ID), data.ID) t.Fatalf("expected EdgeStackID %d, found %d", int(edgeStack.ID), data.ID)
@ -47,9 +44,7 @@ func TestDeleteAndInspect(t *testing.T) {
// Delete // Delete
req, err = http.NewRequest(http.MethodDelete, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil) req, err = http.NewRequest(http.MethodDelete, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
req.Header.Add("x-api-key", rawAPIKey) req.Header.Add("x-api-key", rawAPIKey)
rec = httptest.NewRecorder() rec = httptest.NewRecorder()
@ -61,9 +56,7 @@ func TestDeleteAndInspect(t *testing.T) {
// Inspect // Inspect
req, err = http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil) req, err = http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
req.Header.Add("x-api-key", rawAPIKey) req.Header.Add("x-api-key", rawAPIKey)
rec = httptest.NewRecorder() rec = httptest.NewRecorder()
@ -117,15 +110,12 @@ func TestDeleteEdgeStack_RemoveProjectFolder(t *testing.T) {
} }
var buf bytes.Buffer var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(payload); err != nil { err := json.NewEncoder(&buf).Encode(payload)
t.Fatal("error encoding payload:", err) require.NoError(t, err)
}
// Create // Create
req, err := http.NewRequest(http.MethodPost, "/edge_stacks/create/string", &buf) req, err := http.NewRequest(http.MethodPost, "/edge_stacks/create/string", &buf)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
req.Header.Add("x-api-key", rawAPIKey) req.Header.Add("x-api-key", rawAPIKey)
rec := httptest.NewRecorder() rec := httptest.NewRecorder()
@ -138,9 +128,8 @@ func TestDeleteEdgeStack_RemoveProjectFolder(t *testing.T) {
assert.DirExists(t, handler.FileService.GetEdgeStackProjectPath("1")) assert.DirExists(t, handler.FileService.GetEdgeStackProjectPath("1"))
// Delete // Delete
if req, err = http.NewRequest(http.MethodDelete, "/edge_stacks/1", nil); err != nil { req, err = http.NewRequest(http.MethodDelete, "/edge_stacks/1", nil)
t.Fatal("request error:", err) require.NoError(t, err)
}
req.Header.Add("x-api-key", rawAPIKey) req.Header.Add("x-api-key", rawAPIKey)
rec = httptest.NewRecorder() rec = httptest.NewRecorder()

View file

@ -4,6 +4,7 @@ import (
"net/http" "net/http"
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/request"
"github.com/portainer/portainer/pkg/libhttp/response" "github.com/portainer/portainer/pkg/libhttp/response"
@ -33,5 +34,35 @@ func (handler *Handler) edgeStackInspect(w http.ResponseWriter, r *http.Request)
return handlerDBErr(err, "Unable to find an edge stack with the specified identifier inside the database") return handlerDBErr(err, "Unable to find an edge stack with the specified identifier inside the database")
} }
if err := fillEdgeStackStatus(handler.DataStore, edgeStack); err != nil {
return handlerDBErr(err, "Unable to retrieve edge stack status from the database")
}
return response.JSON(w, edgeStack) return response.JSON(w, edgeStack)
} }
func fillEdgeStackStatus(tx dataservices.DataStoreTx, edgeStack *portainer.EdgeStack) error {
status, err := tx.EdgeStackStatus().ReadAll(edgeStack.ID)
if err != nil {
return err
}
edgeStack.Status = make(map[portainer.EndpointID]portainer.EdgeStackStatus, len(status))
emptyStatus := make([]portainer.EdgeStackDeploymentStatus, 0)
for _, s := range status {
if s.Status == nil {
s.Status = emptyStatus
}
edgeStack.Status[s.EndpointID] = portainer.EdgeStackStatus{
Status: s.Status,
EndpointID: s.EndpointID,
DeploymentInfo: s.DeploymentInfo,
ReadyRePullImage: s.ReadyRePullImage,
}
}
return nil
}

View file

@ -25,5 +25,11 @@ func (handler *Handler) edgeStackList(w http.ResponseWriter, r *http.Request) *h
return httperror.InternalServerError("Unable to retrieve edge stacks from the database", err) return httperror.InternalServerError("Unable to retrieve edge stacks from the database", err)
} }
for i := range edgeStacks {
if err := fillEdgeStackStatus(handler.DataStore, &edgeStacks[i]); err != nil {
return handlerDBErr(err, "Unable to retrieve edge stack status from the database")
}
}
return response.JSON(w, edgeStacks) return response.JSON(w, edgeStacks)
} }

View file

@ -9,11 +9,10 @@ import (
"time" "time"
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
httperror "github.com/portainer/portainer/pkg/libhttp/error" httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request" "github.com/portainer/portainer/pkg/libhttp/request"
"github.com/portainer/portainer/pkg/libhttp/response" "github.com/portainer/portainer/pkg/libhttp/response"
"github.com/rs/zerolog/log"
) )
type updateStatusPayload struct { type updateStatusPayload struct {
@ -78,12 +77,25 @@ func (handler *Handler) edgeStackStatusUpdate(w http.ResponseWriter, r *http.Req
return httperror.Forbidden("Permission denied to access environment", fmt.Errorf("unauthorized edge endpoint operation: %w. Environment name: %s", err, endpoint.Name)) return httperror.Forbidden("Permission denied to access environment", fmt.Errorf("unauthorized edge endpoint operation: %w. Environment name: %s", err, endpoint.Name))
} }
updateFn := func(stack *portainer.EdgeStack) (*portainer.EdgeStack, error) { var stack *portainer.EdgeStack
return handler.updateEdgeStackStatus(stack, stack.ID, payload)
}
stack, err := handler.stackCoordinator.UpdateStatus(r, portainer.EdgeStackID(stackID), updateFn) if err := handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
if err != nil { var err error
stack, err = tx.EdgeStack().EdgeStack(portainer.EdgeStackID(stackID))
if err != nil {
if dataservices.IsErrObjectNotFound(err) {
return nil
}
return httperror.InternalServerError("Unable to retrieve Edge stack from the database", err)
}
if err := handler.updateEdgeStackStatus(tx, stack, stack.ID, payload); err != nil {
return httperror.InternalServerError("Unable to update Edge stack status", err)
}
return nil
}); err != nil {
var httpErr *httperror.HandlerError var httpErr *httperror.HandlerError
if errors.As(err, &httpErr) { if errors.As(err, &httpErr) {
return httpErr return httpErr
@ -96,43 +108,34 @@ func (handler *Handler) edgeStackStatusUpdate(w http.ResponseWriter, r *http.Req
return nil return nil
} }
if err := fillEdgeStackStatus(handler.DataStore, stack); err != nil {
return handlerDBErr(err, "Unable to retrieve edge stack status from the database")
}
return response.JSON(w, stack) return response.JSON(w, stack)
} }
func (handler *Handler) updateEdgeStackStatus(stack *portainer.EdgeStack, stackID portainer.EdgeStackID, payload updateStatusPayload) (*portainer.EdgeStack, error) { func (handler *Handler) updateEdgeStackStatus(tx dataservices.DataStoreTx, stack *portainer.EdgeStack, stackID portainer.EdgeStackID, payload updateStatusPayload) error {
if payload.Version > 0 && payload.Version < stack.Version { if payload.Version > 0 && payload.Version < stack.Version {
return stack, nil return nil
} }
status := *payload.Status status := *payload.Status
log.Debug().
Int("stackID", int(stackID)).
Int("status", int(status)).
Msg("Updating stack status")
deploymentStatus := portainer.EdgeStackDeploymentStatus{ deploymentStatus := portainer.EdgeStackDeploymentStatus{
Type: status, Type: status,
Error: payload.Error, Error: payload.Error,
Time: payload.Time, Time: payload.Time,
} }
updateEnvStatus(payload.EndpointID, stack, deploymentStatus)
return stack, nil
}
func updateEnvStatus(environmentId portainer.EndpointID, stack *portainer.EdgeStack, deploymentStatus portainer.EdgeStackDeploymentStatus) {
if deploymentStatus.Type == portainer.EdgeStackStatusRemoved { if deploymentStatus.Type == portainer.EdgeStackStatusRemoved {
delete(stack.Status, environmentId) return tx.EdgeStackStatus().Delete(stackID, payload.EndpointID)
return
} }
environmentStatus, ok := stack.Status[environmentId] environmentStatus, err := tx.EdgeStackStatus().Read(stackID, payload.EndpointID)
if !ok { if err != nil {
environmentStatus = portainer.EdgeStackStatus{ environmentStatus = &portainer.EdgeStackStatusForEnv{
EndpointID: environmentId, EndpointID: payload.EndpointID,
Status: []portainer.EdgeStackDeploymentStatus{}, Status: []portainer.EdgeStackDeploymentStatus{},
} }
} }
@ -143,5 +146,5 @@ func updateEnvStatus(environmentId portainer.EndpointID, stack *portainer.EdgeSt
environmentStatus.Status = append(environmentStatus.Status, deploymentStatus) environmentStatus.Status = append(environmentStatus.Status, deploymentStatus)
} }
stack.Status[environmentId] = environmentStatus return tx.EdgeStackStatus().Update(stackID, payload.EndpointID, environmentStatus)
} }

View file

@ -1,155 +0,0 @@
package edgestacks
import (
"errors"
"fmt"
"net/http"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/rs/zerolog/log"
)
type statusRequest struct {
respCh chan statusResponse
stackID portainer.EdgeStackID
updateFn statusUpdateFn
}
type statusResponse struct {
Stack *portainer.EdgeStack
Error error
}
type statusUpdateFn func(*portainer.EdgeStack) (*portainer.EdgeStack, error)
type EdgeStackStatusUpdateCoordinator struct {
updateCh chan statusRequest
dataStore dataservices.DataStore
}
var errAnotherStackUpdateInProgress = errors.New("another stack update is in progress")
func NewEdgeStackStatusUpdateCoordinator(dataStore dataservices.DataStore) *EdgeStackStatusUpdateCoordinator {
return &EdgeStackStatusUpdateCoordinator{
updateCh: make(chan statusRequest),
dataStore: dataStore,
}
}
func (c *EdgeStackStatusUpdateCoordinator) Start() {
for {
c.loop()
}
}
func (c *EdgeStackStatusUpdateCoordinator) loop() {
u := <-c.updateCh
respChs := []chan statusResponse{u.respCh}
var stack *portainer.EdgeStack
err := c.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
// 1. Load the edge stack
var err error
stack, err = loadEdgeStack(tx, u.stackID)
if err != nil {
return err
}
// Return early when the agent tries to update the status on a deleted stack
if stack == nil {
return nil
}
// 2. Mutate the edge stack opportunistically until there are no more pending updates
for {
stack, err = u.updateFn(stack)
if err != nil {
return err
}
if m, ok := c.getNextUpdate(stack.ID); ok {
u = m
} else {
break
}
respChs = append(respChs, u.respCh)
}
// 3. Save the changes back to the database
if err := tx.EdgeStack().UpdateEdgeStack(stack.ID, stack); err != nil {
return handlerDBErr(fmt.Errorf("unable to update Edge stack: %w.", err), "Unable to persist the stack changes inside the database")
}
return nil
})
// 4. Send back the responses
for _, ch := range respChs {
ch <- statusResponse{Stack: stack, Error: err}
}
}
func loadEdgeStack(tx dataservices.DataStoreTx, stackID portainer.EdgeStackID) (*portainer.EdgeStack, error) {
stack, err := tx.EdgeStack().EdgeStack(stackID)
if err != nil {
if dataservices.IsErrObjectNotFound(err) {
// Skip the error when the agent tries to update the status on a deleted stack
log.Debug().
Err(err).
Int("stackID", int(stackID)).
Msg("Unable to find a stack inside the database, skipping error")
return nil, nil
}
return nil, fmt.Errorf("unable to retrieve Edge stack from the database: %w.", err)
}
return stack, nil
}
func (c *EdgeStackStatusUpdateCoordinator) getNextUpdate(stackID portainer.EdgeStackID) (statusRequest, bool) {
for {
select {
case u := <-c.updateCh:
// Discard the update and let the agent retry
if u.stackID != stackID {
u.respCh <- statusResponse{Error: errAnotherStackUpdateInProgress}
continue
}
return u, true
default:
return statusRequest{}, false
}
}
}
func (c *EdgeStackStatusUpdateCoordinator) UpdateStatus(r *http.Request, stackID portainer.EdgeStackID, updateFn statusUpdateFn) (*portainer.EdgeStack, error) {
respCh := make(chan statusResponse)
defer close(respCh)
msg := statusRequest{
respCh: respCh,
stackID: stackID,
updateFn: updateFn,
}
select {
case c.updateCh <- msg:
r := <-respCh
return r.Stack, r.Error
case <-r.Context().Done():
return nil, r.Context().Err()
}
}

View file

@ -10,6 +10,7 @@ import (
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/segmentio/encoding/json" "github.com/segmentio/encoding/json"
"github.com/stretchr/testify/require"
) )
// Update Status // Update Status
@ -28,15 +29,11 @@ func TestUpdateStatusAndInspect(t *testing.T) {
} }
jsonPayload, err := json.Marshal(payload) jsonPayload, err := json.Marshal(payload)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
r := bytes.NewBuffer(jsonPayload) r := bytes.NewBuffer(jsonPayload)
req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/edge_stacks/%d/status", edgeStack.ID), r) req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/edge_stacks/%d/status", edgeStack.ID), r)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
req.Header.Set(portainer.PortainerAgentEdgeIDHeader, endpoint.EdgeID) req.Header.Set(portainer.PortainerAgentEdgeIDHeader, endpoint.EdgeID)
rec := httptest.NewRecorder() rec := httptest.NewRecorder()
@ -48,9 +45,7 @@ func TestUpdateStatusAndInspect(t *testing.T) {
// Get updated edge stack // Get updated edge stack
req, err = http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil) req, err = http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
req.Header.Add("x-api-key", rawAPIKey) req.Header.Add("x-api-key", rawAPIKey)
rec = httptest.NewRecorder() rec = httptest.NewRecorder()
@ -62,14 +57,10 @@ func TestUpdateStatusAndInspect(t *testing.T) {
updatedStack := portainer.EdgeStack{} updatedStack := portainer.EdgeStack{}
err = json.NewDecoder(rec.Body).Decode(&updatedStack) err = json.NewDecoder(rec.Body).Decode(&updatedStack)
if err != nil { require.NoError(t, err)
t.Fatal("error decoding response:", err)
}
endpointStatus, ok := updatedStack.Status[payload.EndpointID] endpointStatus, ok := updatedStack.Status[payload.EndpointID]
if !ok { require.True(t, ok)
t.Fatal("Missing status")
}
lastStatus := endpointStatus.Status[len(endpointStatus.Status)-1] lastStatus := endpointStatus.Status[len(endpointStatus.Status)-1]
@ -84,8 +75,8 @@ func TestUpdateStatusAndInspect(t *testing.T) {
if endpointStatus.EndpointID != payload.EndpointID { if endpointStatus.EndpointID != payload.EndpointID {
t.Fatalf("expected EndpointID %d, found %d", payload.EndpointID, endpointStatus.EndpointID) t.Fatalf("expected EndpointID %d, found %d", payload.EndpointID, endpointStatus.EndpointID)
} }
} }
func TestUpdateStatusWithInvalidPayload(t *testing.T) { func TestUpdateStatusWithInvalidPayload(t *testing.T) {
handler, _ := setupHandler(t) handler, _ := setupHandler(t)
@ -136,15 +127,11 @@ func TestUpdateStatusWithInvalidPayload(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) { t.Run(tc.Name, func(t *testing.T) {
jsonPayload, err := json.Marshal(tc.Payload) jsonPayload, err := json.Marshal(tc.Payload)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
r := bytes.NewBuffer(jsonPayload) r := bytes.NewBuffer(jsonPayload)
req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/edge_stacks/%d/status", edgeStack.ID), r) req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/edge_stacks/%d/status", edgeStack.ID), r)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
req.Header.Set(portainer.PortainerAgentEdgeIDHeader, endpoint.EdgeID) req.Header.Set(portainer.PortainerAgentEdgeIDHeader, endpoint.EdgeID)
rec := httptest.NewRecorder() rec := httptest.NewRecorder()

View file

@ -17,6 +17,7 @@ import (
"github.com/portainer/portainer/api/jwt" "github.com/portainer/portainer/api/jwt"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/stretchr/testify/require"
) )
// Helpers // Helpers
@ -51,27 +52,21 @@ func setupHandler(t *testing.T) (*Handler, string) {
t.Fatal(err) t.Fatal(err)
} }
coord := NewEdgeStackStatusUpdateCoordinator(store)
go coord.Start()
handler := NewHandler( handler := NewHandler(
security.NewRequestBouncer(store, jwtService, apiKeyService), security.NewRequestBouncer(store, jwtService, apiKeyService),
store, store,
edgestacks.NewService(store), edgestacks.NewService(store),
coord,
) )
handler.FileService = fs handler.FileService = fs
settings, err := handler.DataStore.Settings().Settings() settings, err := handler.DataStore.Settings().Settings()
if err != nil { require.NoError(t, err)
t.Fatal(err)
}
settings.EnableEdgeComputeFeatures = true settings.EnableEdgeComputeFeatures = true
if err := handler.DataStore.Settings().UpdateSettings(settings); err != nil { err = handler.DataStore.Settings().UpdateSettings(settings)
t.Fatal(err) require.NoError(t, err)
}
handler.GitService = testhelpers.NewGitService(errors.New("Clone error"), "git-service-id") handler.GitService = testhelpers.NewGitService(errors.New("Clone error"), "git-service-id")
@ -90,9 +85,8 @@ func createEndpointWithId(t *testing.T, store dataservices.DataStore, endpointID
LastCheckInDate: time.Now().Unix(), LastCheckInDate: time.Now().Unix(),
} }
if err := store.Endpoint().Create(&endpoint); err != nil { err := store.Endpoint().Create(&endpoint)
t.Fatal(err) require.NoError(t, err)
}
return endpoint return endpoint
} }
@ -113,15 +107,13 @@ func createEdgeStack(t *testing.T, store dataservices.DataStore, endpointID port
PartialMatch: false, PartialMatch: false,
} }
if err := store.EdgeGroup().Create(&edgeGroup); err != nil { err := store.EdgeGroup().Create(&edgeGroup)
t.Fatal(err) require.NoError(t, err)
}
edgeStackID := portainer.EdgeStackID(14) edgeStackID := portainer.EdgeStackID(14)
edgeStack := portainer.EdgeStack{ edgeStack := portainer.EdgeStack{
ID: edgeStackID, ID: edgeStackID,
Name: "test-edge-stack-" + strconv.Itoa(int(edgeStackID)), Name: "test-edge-stack-" + strconv.Itoa(int(edgeStackID)),
Status: map[portainer.EndpointID]portainer.EdgeStackStatus{},
CreationDate: time.Now().Unix(), CreationDate: time.Now().Unix(),
EdgeGroups: []portainer.EdgeGroupID{edgeGroup.ID}, EdgeGroups: []portainer.EdgeGroupID{edgeGroup.ID},
ProjectPath: "/project/path", ProjectPath: "/project/path",
@ -138,13 +130,11 @@ func createEdgeStack(t *testing.T, store dataservices.DataStore, endpointID port
}, },
} }
if err := store.EdgeStack().Create(edgeStack.ID, &edgeStack); err != nil { err = store.EdgeStack().Create(edgeStack.ID, &edgeStack)
t.Fatal(err) require.NoError(t, err)
}
if err := store.EndpointRelation().Create(&endpointRelation); err != nil { err = store.EndpointRelation().Create(&endpointRelation)
t.Fatal(err) require.NoError(t, err)
}
return edgeStack return edgeStack
} }
@ -155,8 +145,8 @@ func createEdgeGroup(t *testing.T, store dataservices.DataStore) portainer.EdgeG
Name: "EdgeGroup 1", Name: "EdgeGroup 1",
} }
if err := store.EdgeGroup().Create(&edgeGroup); err != nil { err := store.EdgeGroup().Create(&edgeGroup)
t.Fatal(err) require.NoError(t, err)
}
return edgeGroup return edgeGroup
} }

View file

@ -74,6 +74,10 @@ func (handler *Handler) edgeStackUpdate(w http.ResponseWriter, r *http.Request)
return httperror.InternalServerError("Unexpected error", err) return httperror.InternalServerError("Unexpected error", err)
} }
if err := fillEdgeStackStatus(handler.DataStore, stack); err != nil {
return handlerDBErr(err, "Unable to retrieve edge stack status from the database")
}
return response.JSON(w, stack) return response.JSON(w, stack)
} }
@ -120,7 +124,7 @@ func (handler *Handler) updateEdgeStack(tx dataservices.DataStoreTx, stackID por
stack.EdgeGroups = groupsIds stack.EdgeGroups = groupsIds
if payload.UpdateVersion { if payload.UpdateVersion {
if err := handler.updateStackVersion(stack, payload.DeploymentType, []byte(payload.StackFileContent), "", relatedEndpointIds); err != nil { if err := handler.updateStackVersion(tx, stack, payload.DeploymentType, []byte(payload.StackFileContent), "", relatedEndpointIds); err != nil {
return nil, httperror.InternalServerError("Unable to update stack version", err) return nil, httperror.InternalServerError("Unable to update stack version", err)
} }
} }

View file

@ -25,9 +25,8 @@ func TestUpdateAndInspect(t *testing.T) {
endpointID := portainer.EndpointID(6) endpointID := portainer.EndpointID(6)
newEndpoint := createEndpointWithId(t, handler.DataStore, endpointID) newEndpoint := createEndpointWithId(t, handler.DataStore, endpointID)
if err := handler.DataStore.Endpoint().Create(&newEndpoint); err != nil { err := handler.DataStore.Endpoint().Create(&newEndpoint)
t.Fatal(err) require.NoError(t, err)
}
endpointRelation := portainer.EndpointRelation{ endpointRelation := portainer.EndpointRelation{
EndpointID: endpointID, EndpointID: endpointID,
@ -36,9 +35,8 @@ func TestUpdateAndInspect(t *testing.T) {
}, },
} }
if err := handler.DataStore.EndpointRelation().Create(&endpointRelation); err != nil { err = handler.DataStore.EndpointRelation().Create(&endpointRelation)
t.Fatal(err) require.NoError(t, err)
}
newEdgeGroup := portainer.EdgeGroup{ newEdgeGroup := portainer.EdgeGroup{
ID: 2, ID: 2,
@ -49,9 +47,8 @@ func TestUpdateAndInspect(t *testing.T) {
PartialMatch: false, PartialMatch: false,
} }
if err := handler.DataStore.EdgeGroup().Create(&newEdgeGroup); err != nil { err = handler.DataStore.EdgeGroup().Create(&newEdgeGroup)
t.Fatal(err) require.NoError(t, err)
}
payload := updateEdgeStackPayload{ payload := updateEdgeStackPayload{
StackFileContent: "update-test", StackFileContent: "update-test",
@ -61,15 +58,11 @@ func TestUpdateAndInspect(t *testing.T) {
} }
jsonPayload, err := json.Marshal(payload) jsonPayload, err := json.Marshal(payload)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
r := bytes.NewBuffer(jsonPayload) r := bytes.NewBuffer(jsonPayload)
req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), r) req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), r)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
req.Header.Add("x-api-key", rawAPIKey) req.Header.Add("x-api-key", rawAPIKey)
rec := httptest.NewRecorder() rec := httptest.NewRecorder()
@ -81,9 +74,7 @@ func TestUpdateAndInspect(t *testing.T) {
// Get updated edge stack // Get updated edge stack
req, err = http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil) req, err = http.NewRequest(http.MethodGet, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), nil)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
req.Header.Add("x-api-key", rawAPIKey) req.Header.Add("x-api-key", rawAPIKey)
rec = httptest.NewRecorder() rec = httptest.NewRecorder()
@ -94,9 +85,8 @@ func TestUpdateAndInspect(t *testing.T) {
} }
updatedStack := portainer.EdgeStack{} updatedStack := portainer.EdgeStack{}
if err := json.NewDecoder(rec.Body).Decode(&updatedStack); err != nil { err = json.NewDecoder(rec.Body).Decode(&updatedStack)
t.Fatal("error decoding response:", err) require.NoError(t, err)
}
if payload.UpdateVersion && updatedStack.Version != edgeStack.Version+1 { if payload.UpdateVersion && updatedStack.Version != edgeStack.Version+1 {
t.Fatalf("expected EdgeStack version %d, found %d", edgeStack.Version+1, updatedStack.Version+1) t.Fatalf("expected EdgeStack version %d, found %d", edgeStack.Version+1, updatedStack.Version+1)
@ -226,15 +216,11 @@ func TestUpdateWithInvalidPayload(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) { t.Run(tc.Name, func(t *testing.T) {
jsonPayload, err := json.Marshal(tc.Payload) jsonPayload, err := json.Marshal(tc.Payload)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
r := bytes.NewBuffer(jsonPayload) r := bytes.NewBuffer(jsonPayload)
req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), r) req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/edge_stacks/%d", edgeStack.ID), r)
if err != nil { require.NoError(t, err)
t.Fatal("request error:", err)
}
req.Header.Add("x-api-key", rawAPIKey) req.Header.Add("x-api-key", rawAPIKey)
rec := httptest.NewRecorder() rec := httptest.NewRecorder()

View file

@ -22,17 +22,15 @@ type Handler struct {
GitService portainer.GitService GitService portainer.GitService
edgeStacksService *edgestackservice.Service edgeStacksService *edgestackservice.Service
KubernetesDeployer portainer.KubernetesDeployer KubernetesDeployer portainer.KubernetesDeployer
stackCoordinator *EdgeStackStatusUpdateCoordinator
} }
// NewHandler creates a handler to manage environment(endpoint) group operations. // NewHandler creates a handler to manage environment(endpoint) group operations.
func NewHandler(bouncer security.BouncerService, dataStore dataservices.DataStore, edgeStacksService *edgestackservice.Service, stackCoordinator *EdgeStackStatusUpdateCoordinator) *Handler { func NewHandler(bouncer security.BouncerService, dataStore dataservices.DataStore, edgeStacksService *edgestackservice.Service) *Handler {
h := &Handler{ h := &Handler{
Router: mux.NewRouter(), Router: mux.NewRouter(),
requestBouncer: bouncer, requestBouncer: bouncer,
DataStore: dataStore, DataStore: dataStore,
edgeStacksService: edgeStacksService, edgeStacksService: edgeStacksService,
stackCoordinator: stackCoordinator,
} }
h.Handle("/edge_stacks/create/{method}", h.Handle("/edge_stacks/create/{method}",

View file

@ -5,15 +5,18 @@ import (
"strconv" "strconv"
portainer "github.com/portainer/portainer/api" portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/portainer/portainer/api/filesystem" "github.com/portainer/portainer/api/filesystem"
edgestackutils "github.com/portainer/portainer/api/internal/edge/edgestacks"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
func (handler *Handler) updateStackVersion(stack *portainer.EdgeStack, deploymentType portainer.EdgeStackDeploymentType, config []byte, oldGitHash string, relatedEnvironmentsIDs []portainer.EndpointID) error { func (handler *Handler) updateStackVersion(tx dataservices.DataStoreTx, stack *portainer.EdgeStack, deploymentType portainer.EdgeStackDeploymentType, config []byte, oldGitHash string, relatedEnvironmentsIDs []portainer.EndpointID) error {
stack.Version = stack.Version + 1 stack.Version++
stack.Status = edgestackutils.NewStatus(stack.Status, relatedEnvironmentsIDs)
if err := tx.EdgeStackStatus().Clear(stack.ID, relatedEnvironmentsIDs); err != nil {
return err
}
return handler.storeStackFile(stack, deploymentType, config) return handler.storeStackFile(stack, deploymentType, config)
} }

View file

@ -287,11 +287,8 @@ func TestEdgeStackStatus(t *testing.T) {
edgeStackID := portainer.EdgeStackID(17) edgeStackID := portainer.EdgeStackID(17)
edgeStack := portainer.EdgeStack{ edgeStack := portainer.EdgeStack{
ID: edgeStackID, ID: edgeStackID,
Name: "test-edge-stack-17", Name: "test-edge-stack-17",
Status: map[portainer.EndpointID]portainer.EdgeStackStatus{
endpointID: {},
},
CreationDate: time.Now().Unix(), CreationDate: time.Now().Unix(),
EdgeGroups: []portainer.EdgeGroupID{1, 2}, EdgeGroups: []portainer.EdgeGroupID{1, 2},
ProjectPath: "/project/path", ProjectPath: "/project/path",

View file

@ -214,14 +214,9 @@ func (handler *Handler) deleteEndpoint(tx dataservices.DataStoreTx, endpointID p
log.Warn().Err(err).Msg("Unable to retrieve edge stacks from the database") log.Warn().Err(err).Msg("Unable to retrieve edge stacks from the database")
} }
for idx := range edgeStacks { for _, edgeStack := range edgeStacks {
edgeStack := &edgeStacks[idx] if err := tx.EdgeStackStatus().Delete(edgeStack.ID, endpoint.ID); err != nil {
if _, ok := edgeStack.Status[endpoint.ID]; ok { log.Warn().Err(err).Msg("Unable to delete edge stack status")
delete(edgeStack.Status, endpoint.ID)
if err := tx.EdgeStack().UpdateEdgeStack(edgeStack.ID, edgeStack); err != nil {
log.Warn().Err(err).Msg("Unable to update edge stack")
}
} }
} }

View file

@ -247,19 +247,17 @@ func (handler *Handler) filterEndpointsByQuery(
return filteredEndpoints, totalAvailableEndpoints, nil return filteredEndpoints, totalAvailableEndpoints, nil
} }
func endpointStatusInStackMatchesFilter(edgeStackStatus map[portainer.EndpointID]portainer.EdgeStackStatus, envId portainer.EndpointID, statusFilter portainer.EdgeStackStatusType) bool { func endpointStatusInStackMatchesFilter(stackStatus *portainer.EdgeStackStatusForEnv, envId portainer.EndpointID, statusFilter portainer.EdgeStackStatusType) bool {
status, ok := edgeStackStatus[envId]
// consider that if the env has no status in the stack it is in Pending state // consider that if the env has no status in the stack it is in Pending state
if statusFilter == portainer.EdgeStackStatusPending { if statusFilter == portainer.EdgeStackStatusPending {
return !ok || len(status.Status) == 0 return stackStatus == nil || len(stackStatus.Status) == 0
} }
if !ok { if stackStatus == nil {
return false return false
} }
return slices.ContainsFunc(status.Status, func(s portainer.EdgeStackDeploymentStatus) bool { return slices.ContainsFunc(stackStatus.Status, func(s portainer.EdgeStackDeploymentStatus) bool {
return s.Type == statusFilter return s.Type == statusFilter
}) })
} }
@ -291,7 +289,12 @@ func filterEndpointsByEdgeStack(endpoints []portainer.Endpoint, edgeStackId port
if statusFilter != nil { if statusFilter != nil {
n := 0 n := 0
for _, envId := range envIds { for _, envId := range envIds {
if endpointStatusInStackMatchesFilter(stack.Status, envId, *statusFilter) { edgeStackStatus, err := datastore.EdgeStackStatus().Read(edgeStackId, envId)
if err != nil {
return nil, errors.WithMessagef(err, "Unable to retrieve edge stack status for environment %d", envId)
}
if endpointStatusInStackMatchesFilter(edgeStackStatus, envId, *statusFilter) {
envIds[n] = envId envIds[n] = envId
n++ n++
} }

View file

@ -161,10 +161,7 @@ func (server *Server) Start() error {
edgeJobsHandler.FileService = server.FileService edgeJobsHandler.FileService = server.FileService
edgeJobsHandler.ReverseTunnelService = server.ReverseTunnelService edgeJobsHandler.ReverseTunnelService = server.ReverseTunnelService
edgeStackCoordinator := edgestacks.NewEdgeStackStatusUpdateCoordinator(server.DataStore) var edgeStacksHandler = edgestacks.NewHandler(requestBouncer, server.DataStore, server.EdgeStacksService)
go edgeStackCoordinator.Start()
var edgeStacksHandler = edgestacks.NewHandler(requestBouncer, server.DataStore, server.EdgeStacksService, edgeStackCoordinator)
edgeStacksHandler.FileService = server.FileService edgeStacksHandler.FileService = server.FileService
edgeStacksHandler.GitService = server.GitService edgeStacksHandler.GitService = server.GitService
edgeStacksHandler.KubernetesDeployer = server.KubernetesDeployer edgeStacksHandler.KubernetesDeployer = server.KubernetesDeployer

View file

@ -49,7 +49,6 @@ func (service *Service) BuildEdgeStack(
DeploymentType: deploymentType, DeploymentType: deploymentType,
CreationDate: time.Now().Unix(), CreationDate: time.Now().Unix(),
EdgeGroups: edgeGroups, EdgeGroups: edgeGroups,
Status: make(map[portainer.EndpointID]portainer.EdgeStackStatus, 0),
Version: 1, Version: 1,
UseManifestNamespaces: useManifestNamespaces, UseManifestNamespaces: useManifestNamespaces,
}, nil }, nil
@ -104,6 +103,14 @@ func (service *Service) PersistEdgeStack(
return nil, err return nil, err
} }
for _, endpointID := range relatedEndpointIds {
status := &portainer.EdgeStackStatusForEnv{EndpointID: endpointID}
if err := tx.EdgeStackStatus().Create(stack.ID, endpointID, status); err != nil {
return nil, err
}
}
if err := tx.EndpointRelation().AddEndpointRelationsForEdgeStack(relatedEndpointIds, stack.ID); err != nil { if err := tx.EndpointRelation().AddEndpointRelationsForEdgeStack(relatedEndpointIds, stack.ID); err != nil {
return nil, fmt.Errorf("unable to add endpoint relations: %w", err) return nil, fmt.Errorf("unable to add endpoint relations: %w", err)
} }
@ -158,5 +165,9 @@ func (service *Service) DeleteEdgeStack(tx dataservices.DataStoreTx, edgeStackID
return errors.WithMessage(err, "Unable to remove the edge stack from the database") return errors.WithMessage(err, "Unable to remove the edge stack from the database")
} }
if err := tx.EdgeStackStatus().DeleteAll(edgeStackID); err != nil {
return errors.WithMessage(err, "unable to remove edge stack statuses from the database")
}
return nil return nil
} }

View file

@ -1,26 +0,0 @@
package edgestacks
import (
portainer "github.com/portainer/portainer/api"
)
// NewStatus returns a new status object for an Edge stack
func NewStatus(oldStatus map[portainer.EndpointID]portainer.EdgeStackStatus, relatedEnvironmentIDs []portainer.EndpointID) map[portainer.EndpointID]portainer.EdgeStackStatus {
status := map[portainer.EndpointID]portainer.EdgeStackStatus{}
for _, environmentID := range relatedEnvironmentIDs {
newEnvStatus := portainer.EdgeStackStatus{
Status: []portainer.EdgeStackDeploymentStatus{},
EndpointID: environmentID,
}
oldEnvStatus, ok := oldStatus[environmentID]
if ok {
newEnvStatus.DeploymentInfo = oldEnvStatus.DeploymentInfo
}
status[environmentID] = newEnvStatus
}
return status
}

View file

@ -16,6 +16,7 @@ type testDatastore struct {
edgeGroup dataservices.EdgeGroupService edgeGroup dataservices.EdgeGroupService
edgeJob dataservices.EdgeJobService edgeJob dataservices.EdgeJobService
edgeStack dataservices.EdgeStackService edgeStack dataservices.EdgeStackService
edgeStackStatus dataservices.EdgeStackStatusService
endpoint dataservices.EndpointService endpoint dataservices.EndpointService
endpointGroup dataservices.EndpointGroupService endpointGroup dataservices.EndpointGroupService
endpointRelation dataservices.EndpointRelationService endpointRelation dataservices.EndpointRelationService
@ -53,8 +54,11 @@ func (d *testDatastore) CustomTemplate() dataservices.CustomTemplateService { re
func (d *testDatastore) EdgeGroup() dataservices.EdgeGroupService { return d.edgeGroup } func (d *testDatastore) EdgeGroup() dataservices.EdgeGroupService { return d.edgeGroup }
func (d *testDatastore) EdgeJob() dataservices.EdgeJobService { return d.edgeJob } func (d *testDatastore) EdgeJob() dataservices.EdgeJobService { return d.edgeJob }
func (d *testDatastore) EdgeStack() dataservices.EdgeStackService { return d.edgeStack } func (d *testDatastore) EdgeStack() dataservices.EdgeStackService { return d.edgeStack }
func (d *testDatastore) Endpoint() dataservices.EndpointService { return d.endpoint } func (d *testDatastore) EdgeStackStatus() dataservices.EdgeStackStatusService {
func (d *testDatastore) EndpointGroup() dataservices.EndpointGroupService { return d.endpointGroup } return d.edgeStackStatus
}
func (d *testDatastore) Endpoint() dataservices.EndpointService { return d.endpoint }
func (d *testDatastore) EndpointGroup() dataservices.EndpointGroupService { return d.endpointGroup }
func (d *testDatastore) EndpointRelation() dataservices.EndpointRelationService { func (d *testDatastore) EndpointRelation() dataservices.EndpointRelationService {
return d.endpointRelation return d.endpointRelation

View file

@ -336,6 +336,15 @@ type (
UseManifestNamespaces bool UseManifestNamespaces bool
} }
EdgeStackStatusForEnv struct {
EndpointID EndpointID
Status []EdgeStackDeploymentStatus
// EE only feature
DeploymentInfo StackDeploymentInfo
// ReadyRePullImage is a flag to indicate whether the auto update is trigger to re-pull image
ReadyRePullImage bool `json:"ReadyRePullImage,omitempty"`
}
EdgeStackDeploymentType int EdgeStackDeploymentType int
// EdgeStackID represents an edge stack id // EdgeStackID represents an edge stack id