mirror of
https://github.com/portainer/portainer.git
synced 2025-07-23 07:19:41 +02:00
fix(pending-actions): further refactoring [EE-7011] (#11806)
Some checks failed
Test / test-server (map[arch:arm64 platform:linux]) (push) Has been cancelled
ci / build_images (map[arch:amd64 platform:linux version:]) (push) Has been cancelled
ci / build_images (map[arch:amd64 platform:windows version:1809]) (push) Has been cancelled
ci / build_images (map[arch:amd64 platform:windows version:ltsc2022]) (push) Has been cancelled
ci / build_images (map[arch:arm platform:linux version:]) (push) Has been cancelled
ci / build_images (map[arch:arm64 platform:linux version:]) (push) Has been cancelled
ci / build_images (map[arch:ppc64le platform:linux version:]) (push) Has been cancelled
ci / build_images (map[arch:s390x platform:linux version:]) (push) Has been cancelled
/ triage (push) Has been cancelled
Lint / Run linters (push) Has been cancelled
Test / test-client (push) Has been cancelled
Test / test-server (map[arch:amd64 platform:linux]) (push) Has been cancelled
Test / test-server (map[arch:amd64 platform:windows version:1809]) (push) Has been cancelled
Test / test-server (map[arch:amd64 platform:windows version:ltsc2022]) (push) Has been cancelled
ci / build_manifests (push) Has been cancelled
Some checks failed
Test / test-server (map[arch:arm64 platform:linux]) (push) Has been cancelled
ci / build_images (map[arch:amd64 platform:linux version:]) (push) Has been cancelled
ci / build_images (map[arch:amd64 platform:windows version:1809]) (push) Has been cancelled
ci / build_images (map[arch:amd64 platform:windows version:ltsc2022]) (push) Has been cancelled
ci / build_images (map[arch:arm platform:linux version:]) (push) Has been cancelled
ci / build_images (map[arch:arm64 platform:linux version:]) (push) Has been cancelled
ci / build_images (map[arch:ppc64le platform:linux version:]) (push) Has been cancelled
ci / build_images (map[arch:s390x platform:linux version:]) (push) Has been cancelled
/ triage (push) Has been cancelled
Lint / Run linters (push) Has been cancelled
Test / test-client (push) Has been cancelled
Test / test-server (map[arch:amd64 platform:linux]) (push) Has been cancelled
Test / test-server (map[arch:amd64 platform:windows version:1809]) (push) Has been cancelled
Test / test-server (map[arch:amd64 platform:windows version:ltsc2022]) (push) Has been cancelled
ci / build_manifests (push) Has been cancelled
This commit is contained in:
parent
b4e829e8c6
commit
6a51b6b41e
6 changed files with 56 additions and 62 deletions
|
@ -486,7 +486,7 @@ func buildServer(flags *portainer.CLIFlags) portainer.Server {
|
||||||
|
|
||||||
pendingActionsService := pendingactions.NewService(dataStore, kubernetesClientFactory)
|
pendingActionsService := pendingactions.NewService(dataStore, kubernetesClientFactory)
|
||||||
pendingActionsService.RegisterHandler(actions.CleanNAPWithOverridePolicies, handlers.NewHandlerCleanNAPWithOverridePolicies(authorizationService, dataStore))
|
pendingActionsService.RegisterHandler(actions.CleanNAPWithOverridePolicies, handlers.NewHandlerCleanNAPWithOverridePolicies(authorizationService, dataStore))
|
||||||
pendingActionsService.RegisterHandler(actions.DeleteK8sRegistrySecrets, handlers.NewHandlerDeleteRegistrySecrets(authorizationService, dataStore, kubernetesClientFactory))
|
pendingActionsService.RegisterHandler(actions.DeletePortainerK8sRegistrySecrets, handlers.NewHandlerDeleteRegistrySecrets(authorizationService, dataStore, kubernetesClientFactory))
|
||||||
pendingActionsService.RegisterHandler(actions.PostInitMigrateEnvironment, handlers.NewHandlerPostInitMigrateEnvironment(authorizationService, dataStore, kubernetesClientFactory, dockerClientFactory, *flags.Assets, kubernetesDeployer))
|
pendingActionsService.RegisterHandler(actions.PostInitMigrateEnvironment, handlers.NewHandlerPostInitMigrateEnvironment(authorizationService, dataStore, kubernetesClientFactory, dockerClientFactory, *flags.Assets, kubernetesDeployer))
|
||||||
|
|
||||||
snapshotService, err := initSnapshotService(*flags.SnapshotInterval, dataStore, dockerClientFactory, kubernetesClientFactory, shutdownCtx, pendingActionsService)
|
snapshotService, err := initSnapshotService(*flags.SnapshotInterval, dataStore, dockerClientFactory, kubernetesClientFactory, shutdownCtx, pendingActionsService)
|
||||||
|
|
|
@ -2,8 +2,6 @@ package postinit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"reflect"
|
|
||||||
|
|
||||||
"github.com/docker/docker/api/types/container"
|
"github.com/docker/docker/api/types/container"
|
||||||
"github.com/docker/docker/client"
|
"github.com/docker/docker/client"
|
||||||
|
@ -74,28 +72,11 @@ func (postInitMigrator *PostInitMigrator) PostInitMigrate() error {
|
||||||
// this function exists for readability, not reusability
|
// this function exists for readability, not reusability
|
||||||
// TODO: This should be moved into pending actions as part of the pending action migration
|
// TODO: This should be moved into pending actions as part of the pending action migration
|
||||||
func (postInitMigrator *PostInitMigrator) createPostInitMigrationPendingAction(environmentID portainer.EndpointID) error {
|
func (postInitMigrator *PostInitMigrator) createPostInitMigrationPendingAction(environmentID portainer.EndpointID) error {
|
||||||
migrateEnvPendingAction := portainer.PendingAction{
|
// If there are no pending actions for the given endpoint, create one
|
||||||
|
err := postInitMigrator.dataStore.PendingActions().Create(&portainer.PendingAction{
|
||||||
EndpointID: environmentID,
|
EndpointID: environmentID,
|
||||||
Action: actions.PostInitMigrateEnvironment,
|
Action: actions.PostInitMigrateEnvironment,
|
||||||
}
|
})
|
||||||
|
|
||||||
// Get all pending actions and filter them by endpoint, action and action args that are equal to the migrateEnvPendingAction
|
|
||||||
pendingActions, err := postInitMigrator.dataStore.PendingActions().ReadAll()
|
|
||||||
if err != nil {
|
|
||||||
log.Error().Err(err).Msgf("Error retrieving pending actions")
|
|
||||||
return fmt.Errorf("failed to retrieve pending actions for environment %d: %w", environmentID, err)
|
|
||||||
}
|
|
||||||
for _, pendingAction := range pendingActions {
|
|
||||||
if pendingAction.EndpointID == environmentID &&
|
|
||||||
pendingAction.Action == migrateEnvPendingAction.Action &&
|
|
||||||
reflect.DeepEqual(pendingAction.ActionData, migrateEnvPendingAction.ActionData) {
|
|
||||||
log.Debug().Msgf("Migration pending action for environment %d already exists, skipping creating another", environmentID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If there are no pending actions for the given endpoint, create one
|
|
||||||
err = postInitMigrator.dataStore.PendingActions().Create(&migrateEnvPendingAction)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msgf("Error creating pending action for environment %d", environmentID)
|
log.Error().Err(err).Msgf("Error creating pending action for environment %d", environmentID)
|
||||||
}
|
}
|
||||||
|
|
|
@ -312,10 +312,7 @@ func updateEndpointStatus(tx dataservices.DataStoreTx, endpoint *portainer.Endpo
|
||||||
|
|
||||||
// Run the pending actions
|
// Run the pending actions
|
||||||
if latestEndpointReference.Status == portainer.EndpointStatusUp {
|
if latestEndpointReference.Status == portainer.EndpointStatusUp {
|
||||||
err = pendingActionsService.Execute(endpoint.ID)
|
pendingActionsService.Execute(endpoint.ID)
|
||||||
if err != nil {
|
|
||||||
log.Error().Err(err).Msg("background schedule error (environment snapshot), unable to execute pending actions")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,6 @@ package actions
|
||||||
|
|
||||||
const (
|
const (
|
||||||
CleanNAPWithOverridePolicies = "CleanNAPWithOverridePolicies"
|
CleanNAPWithOverridePolicies = "CleanNAPWithOverridePolicies"
|
||||||
DeleteK8sRegistrySecrets = "DeleteK8sRegistrySecrets"
|
DeletePortainerK8sRegistrySecrets = "DeletePortainerK8sRegistrySecrets"
|
||||||
PostInitMigrateEnvironment = "PostInitMigrateEnvironment"
|
PostInitMigrateEnvironment = "PostInitMigrateEnvironment"
|
||||||
)
|
)
|
||||||
|
|
|
@ -25,7 +25,7 @@ type (
|
||||||
func NewDeleteK8sRegistrySecrets(endpointID portainer.EndpointID, registryID portainer.RegistryID, namespaces []string) portainer.PendingAction {
|
func NewDeleteK8sRegistrySecrets(endpointID portainer.EndpointID, registryID portainer.RegistryID, namespaces []string) portainer.PendingAction {
|
||||||
return portainer.PendingAction{
|
return portainer.PendingAction{
|
||||||
EndpointID: endpointID,
|
EndpointID: endpointID,
|
||||||
Action: actions.DeleteK8sRegistrySecrets,
|
Action: actions.DeletePortainerK8sRegistrySecrets,
|
||||||
ActionData: &deleteK8sRegistrySecretsData{
|
ActionData: &deleteK8sRegistrySecretsData{
|
||||||
RegistryID: registryID,
|
RegistryID: registryID,
|
||||||
Namespaces: namespaces,
|
Namespaces: namespaces,
|
||||||
|
|
|
@ -2,6 +2,7 @@ package pendingactions
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
portainer "github.com/portainer/portainer/api"
|
portainer "github.com/portainer/portainer/api"
|
||||||
|
@ -34,72 +35,87 @@ func (service *PendingActionsService) RegisterHandler(name string, handler porta
|
||||||
handlers[name] = handler
|
handlers[name] = handler
|
||||||
}
|
}
|
||||||
|
|
||||||
func (service *PendingActionsService) Create(r portainer.PendingAction) error {
|
func (service *PendingActionsService) Create(action portainer.PendingAction) error {
|
||||||
return service.dataStore.PendingActions().Create(&r)
|
// Check if this pendingAction already exists
|
||||||
|
pendingActions, err := service.dataStore.PendingActions().ReadAll()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to retrieve pending actions: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, dba := range pendingActions {
|
||||||
|
// Same endpoint, same action and data, don't create a repeat
|
||||||
|
if dba.EndpointID == action.EndpointID && dba.Action == action.Action &&
|
||||||
|
reflect.DeepEqual(dba.ActionData, action.ActionData) {
|
||||||
|
log.Debug().Msgf("pending action %s already exists for environment %d, skipping...", action.Action, action.EndpointID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return service.dataStore.PendingActions().Create(&action)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (service *PendingActionsService) Execute(id portainer.EndpointID) error {
|
func (service *PendingActionsService) Execute(id portainer.EndpointID) {
|
||||||
service.mu.Lock()
|
service.mu.Lock()
|
||||||
defer service.mu.Unlock()
|
defer service.mu.Unlock()
|
||||||
|
|
||||||
endpoint, err := service.dataStore.Endpoint().Endpoint(id)
|
endpoint, err := service.dataStore.Endpoint().Endpoint(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to retrieve environment %d: %w", id, err)
|
log.Debug().Msgf("failed to retrieve environment %d: %v", id, err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
isKubernetesEndpoint := endpointutils.IsKubernetesEndpoint(endpoint) && !endpointutils.IsEdgeEndpoint(endpoint)
|
isKubernetesEndpoint := endpointutils.IsKubernetesEndpoint(endpoint) && !endpointutils.IsEdgeEndpoint(endpoint)
|
||||||
|
|
||||||
// EndpointStatusUp is only relevant for non-Kubernetes endpoints
|
// EndpointStatusUp is only relevant for non-Kubernetes endpoints
|
||||||
// Sometimes the endpoint is UP but the status is not updated in the database
|
// Sometimes the endpoint is UP but the status is not updated in the database
|
||||||
if !isKubernetesEndpoint && endpoint.Status != portainer.EndpointStatusUp {
|
if !isKubernetesEndpoint {
|
||||||
log.Debug().Msgf("Environment %q (id: %d) is not up", endpoint.Name, id)
|
if endpoint.Status != portainer.EndpointStatusUp {
|
||||||
return fmt.Errorf("environment %q (id: %d) is not up", endpoint.Name, id)
|
return
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
// For Kubernetes endpoints, we need to check if the endpoint is up by creating a kube client
|
// For Kubernetes endpoints, we need to check if the client can be created
|
||||||
if isKubernetesEndpoint {
|
if _, err := service.kubeFactory.GetKubeClient(endpoint); err != nil {
|
||||||
_, err := service.kubeFactory.GetKubeClient(endpoint)
|
return
|
||||||
if err != nil {
|
|
||||||
log.Debug().Err(err).Msgf("Environment %q (id: %d) is not up", endpoint.Name, id)
|
|
||||||
return fmt.Errorf("environment %q (id: %d) is not up", endpoint.Name, id)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pendingActions, err := service.dataStore.PendingActions().ReadAll()
|
pendingActions, err := service.dataStore.PendingActions().ReadAll()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msgf("failed to retrieve pending actions")
|
log.Warn().Msgf("failed to read pending actions: %v", err)
|
||||||
return fmt.Errorf("failed to retrieve pending actions for environment %d: %w", id, err)
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, endpointPendingAction := range pendingActions {
|
log.Debug().Msgf("Executing pending actions for environment %d", id)
|
||||||
if endpointPendingAction.EndpointID == id {
|
for _, pendingAction := range pendingActions {
|
||||||
err := service.executePendingAction(endpointPendingAction, endpoint)
|
if pendingAction.EndpointID == id {
|
||||||
|
log.Debug().Msgf("executing pending action id=%d, action=%s", pendingAction.ID, pendingAction.Action)
|
||||||
|
err := service.executePendingAction(pendingAction, endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn().Err(err).Msgf("failed to execute pending action")
|
log.Warn().Msgf("failed to execute pending action: %v", err)
|
||||||
return fmt.Errorf("failed to execute pending action: %w", err)
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = service.dataStore.PendingActions().Delete(endpointPendingAction.ID)
|
err = service.dataStore.PendingActions().Delete(pendingAction.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msgf("failed to delete pending action")
|
log.Warn().Msgf("failed to delete pending action: %v", err)
|
||||||
return fmt.Errorf("failed to delete pending action: %w", err)
|
return
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
log.Debug().Msgf("pending action %d finished", pendingAction.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (service *PendingActionsService) executePendingAction(pendingAction portainer.PendingAction, endpoint *portainer.Endpoint) error {
|
func (service *PendingActionsService) executePendingAction(pendingAction portainer.PendingAction, endpoint *portainer.Endpoint) error {
|
||||||
log.Debug().Msgf("Executing pending action %s for environment %d", pendingAction.Action, pendingAction.EndpointID)
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Debug().Msgf("End executing pending action %s for environment %d", pendingAction.Action, pendingAction.EndpointID)
|
if r := recover(); r != nil {
|
||||||
|
log.Error().Msgf("recovered from panic while executing pending action %s for environment %d: %v", pendingAction.Action, pendingAction.EndpointID, r)
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
handler, ok := handlers[pendingAction.Action]
|
handler, ok := handlers[pendingAction.Action]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Warn().Msgf("No handler found for pending action %s", pendingAction.Action)
|
log.Warn().Msgf("no handler found for pending action %s", pendingAction.Action)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue