1
0
Fork 0
mirror of https://github.com/portainer/portainer.git synced 2025-07-19 13:29:41 +02:00

fix(edgestacks): add a status update coordinator to increase performance BE-11572 (#337)

This commit is contained in:
andres-portainer 2025-01-22 20:24:54 -03:00 committed by GitHub
parent d35d8a7307
commit 9a86737caa
3 changed files with 182 additions and 39 deletions

View file

@ -8,7 +8,6 @@ import (
"time"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
httperror "github.com/portainer/portainer/pkg/libhttp/error"
"github.com/portainer/portainer/pkg/libhttp/request"
"github.com/portainer/portainer/pkg/libhttp/response"
@ -69,15 +68,17 @@ func (handler *Handler) edgeStackStatusUpdate(w http.ResponseWriter, r *http.Req
return httperror.BadRequest("Invalid request payload", fmt.Errorf("edge polling error: %w. Environment ID: %d", err, payload.EndpointID))
}
var stack *portainer.EdgeStack
if err := handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
if err := r.Context().Err(); err != nil {
return err
}
endpoint, err := handler.DataStore.Endpoint().Endpoint(payload.EndpointID)
if err != nil {
return handlerDBErr(fmt.Errorf("unable to find the environment from the database: %w. Environment ID: %d", err, payload.EndpointID), "unable to find the environment")
}
stack, err = handler.updateEdgeStackStatus(tx, r, portainer.EdgeStackID(stackID), payload)
return err
}); err != nil {
if err := handler.requestBouncer.AuthorizedEdgeEndpointOperation(r, endpoint); err != nil {
return httperror.Forbidden("Permission denied to access environment", fmt.Errorf("unauthorized edge endpoint operation: %w. Environment name: %s", err, endpoint.Name))
}
stack, err := handler.stackCoordinator.UpdateStatus(r, endpoint, portainer.EdgeStackID(stackID), payload)
if err != nil {
var httpErr *httperror.HandlerError
if errors.As(err, &httpErr) {
return httpErr
@ -93,36 +94,11 @@ func (handler *Handler) edgeStackStatusUpdate(w http.ResponseWriter, r *http.Req
return response.JSON(w, stack)
}
func (handler *Handler) updateEdgeStackStatus(tx dataservices.DataStoreTx, r *http.Request, stackID portainer.EdgeStackID, payload updateStatusPayload) (*portainer.EdgeStack, error) {
stack, err := tx.EdgeStack().EdgeStack(stackID)
if err != nil {
if dataservices.IsErrObjectNotFound(err) {
// Skip error because agent tries to report on deleted stack
log.Debug().
Err(err).
Int("stackID", int(stackID)).
Int("status", int(*payload.Status)).
Msg("Unable to find a stack inside the database, skipping error")
return nil, nil
}
return nil, fmt.Errorf("unable to retrieve Edge stack from the database: %w. Environment ID: %d", err, payload.EndpointID)
}
func (handler *Handler) updateEdgeStackStatus(stack *portainer.EdgeStack, endpoint *portainer.Endpoint, r *http.Request, stackID portainer.EdgeStackID, payload updateStatusPayload) (*portainer.EdgeStack, error) {
if payload.Version > 0 && payload.Version < stack.Version {
return stack, nil
}
endpoint, err := tx.Endpoint().Endpoint(payload.EndpointID)
if err != nil {
return nil, handlerDBErr(fmt.Errorf("unable to find the environment from the database: %w. Environment ID: %d", err, payload.EndpointID), "unable to find the environment")
}
if err := handler.requestBouncer.AuthorizedEdgeEndpointOperation(r, endpoint); err != nil {
return nil, httperror.Forbidden("Permission denied to access environment", fmt.Errorf("unauthorized edge endpoint operation: %w. Environment name: %s", err, endpoint.Name))
}
status := *payload.Status
log.Debug().
@ -138,10 +114,6 @@ func (handler *Handler) updateEdgeStackStatus(tx dataservices.DataStoreTx, r *ht
updateEnvStatus(payload.EndpointID, stack, deploymentStatus)
if err := tx.EdgeStack().UpdateEdgeStack(stackID, stack); err != nil {
return nil, handlerDBErr(fmt.Errorf("unable to update Edge stack to the database: %w. Environment name: %s", err, endpoint.Name), "unable to update Edge stack")
}
return stack, nil
}

View file

@ -0,0 +1,166 @@
package edgestacks
import (
"errors"
"fmt"
"net/http"
portainer "github.com/portainer/portainer/api"
"github.com/portainer/portainer/api/dataservices"
"github.com/rs/zerolog/log"
)
type statusRequest struct {
r *http.Request
respCh chan statusResponse
endpoint *portainer.Endpoint
stackID portainer.EdgeStackID
payload updateStatusPayload
}
type statusResponse struct {
Stack *portainer.EdgeStack
Error error
}
type statusUpdateFn func(stack *portainer.EdgeStack, endpoint *portainer.Endpoint, r *http.Request, stackID portainer.EdgeStackID, payload updateStatusPayload) (*portainer.EdgeStack, error)
type EdgeStackStatusUpdateCoordinator struct {
updateCh chan statusRequest
dataStore dataservices.DataStore
statusUpdateFn statusUpdateFn
}
var errAnotherStackUpdateInProgress = errors.New("another stack update is in progress")
func NewEdgeStackStatusUpdateCoordinator(
dataStore dataservices.DataStore,
statusUpdateFn statusUpdateFn,
) *EdgeStackStatusUpdateCoordinator {
return &EdgeStackStatusUpdateCoordinator{
updateCh: make(chan statusRequest),
dataStore: dataStore,
statusUpdateFn: statusUpdateFn,
}
}
func (c *EdgeStackStatusUpdateCoordinator) Start() {
for {
c.loop()
}
}
func (c *EdgeStackStatusUpdateCoordinator) loop() {
u := <-c.updateCh
respChs := []chan statusResponse{u.respCh}
var stack *portainer.EdgeStack
err := c.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
// 1. Load the edge stack
var err error
stack, err = loadEdgeStack(tx, u.stackID)
if err != nil {
return err
}
// 2. Mutate the edge stack opportunistically until there are no more pending updates
for {
stack, err = c.statusUpdateFn(stack, u.endpoint, u.r, stack.ID, u.payload)
if err != nil {
return err
}
if m, ok := c.getNextUpdate(stack.ID); ok {
u = m
} else {
break
}
respChs = append(respChs, u.respCh)
}
// 3. Save the changes back to the database
if err := tx.EdgeStack().UpdateEdgeStack(stack.ID, stack); err != nil {
return handlerDBErr(fmt.Errorf("unable to update Edge stack: %w.", err), "Unable to persist the stack changes inside the database")
}
return nil
})
// 4. Send back the responses
for _, ch := range respChs {
ch <- statusResponse{Stack: stack, Error: err}
}
}
func loadEdgeStack(tx dataservices.DataStoreTx, stackID portainer.EdgeStackID) (*portainer.EdgeStack, error) {
stack, err := tx.EdgeStack().EdgeStack(stackID)
if err != nil {
if dataservices.IsErrObjectNotFound(err) {
// Skip the error when the agent tries to update the status on a deleted stack
log.Debug().
Err(err).
Int("stackID", int(stackID)).
Msg("Unable to find a stack inside the database, skipping error")
return nil, nil
}
return nil, fmt.Errorf("unable to retrieve Edge stack from the database: %w.", err)
}
return stack, nil
}
func (c *EdgeStackStatusUpdateCoordinator) getNextUpdate(stackID portainer.EdgeStackID) (statusRequest, bool) {
for {
select {
case u := <-c.updateCh:
// Discard the update and let the agent retry
if u.stackID != stackID {
u.respCh <- statusResponse{Error: errAnotherStackUpdateInProgress}
continue
}
return u, true
default:
return statusRequest{}, false
}
}
}
func (c *EdgeStackStatusUpdateCoordinator) UpdateStatus(
r *http.Request,
endpoint *portainer.Endpoint,
stackID portainer.EdgeStackID,
payload updateStatusPayload) (
*portainer.EdgeStack,
error,
) {
respCh := make(chan statusResponse)
defer close(respCh)
msg := statusRequest{
respCh: respCh,
r: r,
endpoint: endpoint,
stackID: stackID,
payload: payload,
}
select {
case c.updateCh <- msg:
r := <-respCh
return r.Stack, r.Error
case <-r.Context().Done():
return nil, r.Context().Err()
}
}

View file

@ -22,6 +22,7 @@ type Handler struct {
GitService portainer.GitService
edgeStacksService *edgestackservice.Service
KubernetesDeployer portainer.KubernetesDeployer
stackCoordinator *EdgeStackStatusUpdateCoordinator
}
// NewHandler creates a handler to manage environment(endpoint) group operations.
@ -33,6 +34,10 @@ func NewHandler(bouncer security.BouncerService, dataStore dataservices.DataStor
edgeStacksService: edgeStacksService,
}
h.stackCoordinator = NewEdgeStackStatusUpdateCoordinator(dataStore, h.updateEdgeStackStatus)
go h.stackCoordinator.Start()
h.Handle("/edge_stacks/create/{method}",
bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeStackCreate)))).Methods(http.MethodPost)
h.Handle("/edge_stacks",