From a09fe7e10c4c6a2b21bed2d8fb31315e7b5651f9 Mon Sep 17 00:00:00 2001 From: matias-portainer <104775949+matias-portainer@users.noreply.github.com> Date: Thu, 26 Jan 2023 11:32:11 -0300 Subject: [PATCH] chore(edgejobs): AddEdgeJob disregards async mode EE-4855 (#8287) --- api/chisel/schedules.go | 10 ++++-- api/chisel/tunnel.go | 5 +++ api/datastore/migrate_data.go | 1 + api/datastore/migrator/migrate_dbversion81.go | 36 +++++++++++++++++++ api/datastore/migrator/migrator.go | 6 +++- .../test_data/output_24_to_latest.json | 2 +- .../handler/edgegroups/edgegroup_update.go | 8 ++--- api/http/handler/edgejobs/edgejob_create.go | 7 +++- .../edgejobs/edgejob_tasklogs_clear.go | 7 +++- api/http/handler/edgejobs/edgejob_update.go | 7 +++- .../endpointedge/endpoint_edgejob_logs.go | 2 +- .../endpointedge_status_inspect_test.go | 2 +- api/internal/edge/edgejob.go | 7 +++- api/portainer.go | 2 +- 14 files changed, 86 insertions(+), 16 deletions(-) create mode 100644 api/datastore/migrator/migrate_dbversion81.go diff --git a/api/chisel/schedules.go b/api/chisel/schedules.go index f6c950bca..4d3267edf 100644 --- a/api/chisel/schedules.go +++ b/api/chisel/schedules.go @@ -6,9 +6,13 @@ import ( ) // AddEdgeJob register an EdgeJob inside the tunnel details associated to an environment(endpoint). -func (service *Service) AddEdgeJob(endpointID portainer.EndpointID, edgeJob *portainer.EdgeJob) { +func (service *Service) AddEdgeJob(endpoint *portainer.Endpoint, edgeJob *portainer.EdgeJob) { + if endpoint.Edge.AsyncMode { + return + } + service.mu.Lock() - tunnel := service.getTunnelDetails(endpointID) + tunnel := service.getTunnelDetails(endpoint.ID) existingJobIndex := -1 for idx, existingJob := range tunnel.Jobs { @@ -24,7 +28,7 @@ func (service *Service) AddEdgeJob(endpointID portainer.EndpointID, edgeJob *por tunnel.Jobs[existingJobIndex] = *edgeJob } - cache.Del(endpointID) + cache.Del(endpoint.ID) service.mu.Unlock() } diff --git a/api/chisel/tunnel.go b/api/chisel/tunnel.go index 41ca079b9..64e0afeb9 100644 --- a/api/chisel/tunnel.go +++ b/api/chisel/tunnel.go @@ -2,6 +2,7 @@ package chisel import ( "encoding/base64" + "errors" "fmt" "math/rand" "strings" @@ -66,6 +67,10 @@ func (service *Service) GetTunnelDetails(endpointID portainer.EndpointID) portai // GetActiveTunnel retrieves an active tunnel which allows communicating with edge agent func (service *Service) GetActiveTunnel(endpoint *portainer.Endpoint) (portainer.TunnelDetails, error) { + if endpoint.Edge.AsyncMode { + return portainer.TunnelDetails{}, errors.New("cannot open tunnel on async endpoint") + } + tunnel := service.GetTunnelDetails(endpoint.ID) if tunnel.Status == portainer.EdgeAgentActive { diff --git a/api/datastore/migrate_data.go b/api/datastore/migrate_data.go index a17b55742..15cfa4638 100644 --- a/api/datastore/migrate_data.go +++ b/api/datastore/migrate_data.go @@ -82,6 +82,7 @@ func (store *Store) newMigratorParameters(version *models.Version) *migrator.Mig DockerhubService: store.DockerHubService, AuthorizationService: authorization.NewService(store), EdgeStackService: store.EdgeStackService, + EdgeJobService: store.EdgeJobService, } } diff --git a/api/datastore/migrator/migrate_dbversion81.go b/api/datastore/migrator/migrate_dbversion81.go new file mode 100644 index 000000000..1e28096e1 --- /dev/null +++ b/api/datastore/migrator/migrate_dbversion81.go @@ -0,0 +1,36 @@ +package migrator + +import ( + "github.com/rs/zerolog/log" + + portainerDsErrors "github.com/portainer/portainer/api/dataservices/errors" +) + +func (m *Migrator) migrateDBVersionToDB81() error { + return m.updateEdgeStackStatusForDB81() +} + +func (m *Migrator) updateEdgeStackStatusForDB81() error { + log.Info().Msg("clean up deleted endpoints from edge jobs") + + edgeJobs, err := m.edgeJobService.EdgeJobs() + if err != nil { + return err + } + + for _, edgeJob := range edgeJobs { + for endpointId := range edgeJob.Endpoints { + _, err := m.endpointService.Endpoint(endpointId) + if err == portainerDsErrors.ErrObjectNotFound { + delete(edgeJob.Endpoints, endpointId) + + err = m.edgeJobService.UpdateEdgeJob(edgeJob.ID, &edgeJob) + if err != nil { + return err + } + } + } + } + + return nil +} diff --git a/api/datastore/migrator/migrator.go b/api/datastore/migrator/migrator.go index 747c059d1..cd2a07749 100644 --- a/api/datastore/migrator/migrator.go +++ b/api/datastore/migrator/migrator.go @@ -3,6 +3,7 @@ package migrator import ( "errors" + "github.com/portainer/portainer/api/dataservices/edgejob" "github.com/portainer/portainer/api/dataservices/edgestack" "github.com/Masterminds/semver" @@ -56,6 +57,7 @@ type ( authorizationService *authorization.Service dockerhubService *dockerhub.Service edgeStackService *edgestack.Service + edgeJobService *edgejob.Service } // MigratorParameters represents the required parameters to create a new Migrator instance. @@ -81,6 +83,7 @@ type ( AuthorizationService *authorization.Service DockerhubService *dockerhub.Service EdgeStackService *edgestack.Service + EdgeJobService *edgejob.Service } ) @@ -108,6 +111,7 @@ func NewMigrator(parameters *MigratorParameters) *Migrator { authorizationService: parameters.AuthorizationService, dockerhubService: parameters.DockerhubService, edgeStackService: parameters.EdgeStackService, + edgeJobService: parameters.EdgeJobService, } migrator.initMigrations() @@ -205,7 +209,7 @@ func (m *Migrator) initMigrations() { m.addMigrations("2.16", m.migrateDBVersionToDB70) m.addMigrations("2.16.1", m.migrateDBVersionToDB71) m.addMigrations("2.17", m.migrateDBVersionToDB80) - m.addMigrations("2.18") + m.addMigrations("2.18", m.migrateDBVersionToDB81) // Add new migrations below... // One function per migration, each versions migration funcs in the same file. diff --git a/api/datastore/test_data/output_24_to_latest.json b/api/datastore/test_data/output_24_to_latest.json index 2a9a8ead3..ff4a682b9 100644 --- a/api/datastore/test_data/output_24_to_latest.json +++ b/api/datastore/test_data/output_24_to_latest.json @@ -934,6 +934,6 @@ } ], "version": { - "VERSION": "{\"SchemaVersion\":\"2.18.0\",\"MigratorCount\":0,\"Edition\":1,\"InstanceID\":\"463d5c47-0ea5-4aca-85b1-405ceefee254\"}" + "VERSION": "{\"SchemaVersion\":\"2.18.0\",\"MigratorCount\":1,\"Edition\":1,\"InstanceID\":\"463d5c47-0ea5-4aca-85b1-405ceefee254\"}" } } \ No newline at end of file diff --git a/api/http/handler/edgegroups/edgegroup_update.go b/api/http/handler/edgegroups/edgegroup_update.go index a0ce97860..ee1b8b3a5 100644 --- a/api/http/handler/edgegroups/edgegroup_update.go +++ b/api/http/handler/edgegroups/edgegroup_update.go @@ -153,7 +153,7 @@ func (handler *Handler) edgeGroupUpdate(w http.ResponseWriter, r *http.Request) continue } - err = handler.updateEndpointEdgeJobs(edgeGroup.ID, endpointID, edgeJobs, operation) + err = handler.updateEndpointEdgeJobs(edgeGroup.ID, endpoint, edgeJobs, operation) if err != nil { return httperror.InternalServerError("Unable to persist Environment Edge Jobs changes inside the database", err) } @@ -200,7 +200,7 @@ func (handler *Handler) updateEndpointStacks(endpointID portainer.EndpointID) er return handler.DataStore.EndpointRelation().UpdateEndpointRelation(endpoint.ID, relation) } -func (handler *Handler) updateEndpointEdgeJobs(edgeGroupID portainer.EdgeGroupID, endpointID portainer.EndpointID, edgeJobs []portainer.EdgeJob, operation string) error { +func (handler *Handler) updateEndpointEdgeJobs(edgeGroupID portainer.EdgeGroupID, endpoint *portainer.Endpoint, edgeJobs []portainer.EdgeJob, operation string) error { for _, edgeJob := range edgeJobs { if !slices.Contains(edgeJob.EdgeGroups, edgeGroupID) { continue @@ -208,9 +208,9 @@ func (handler *Handler) updateEndpointEdgeJobs(edgeGroupID portainer.EdgeGroupID switch operation { case "add": - handler.ReverseTunnelService.AddEdgeJob(endpointID, &edgeJob) + handler.ReverseTunnelService.AddEdgeJob(endpoint, &edgeJob) case "remove": - handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpointID, edgeJob.ID) + handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpoint.ID, edgeJob.ID) } } diff --git a/api/http/handler/edgejobs/edgejob_create.go b/api/http/handler/edgejobs/edgejob_create.go index 1412db4d0..41b9baacd 100644 --- a/api/http/handler/edgejobs/edgejob_create.go +++ b/api/http/handler/edgejobs/edgejob_create.go @@ -274,7 +274,12 @@ func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file [] } for endpointID := range endpointsMap { - handler.ReverseTunnelService.AddEdgeJob(endpointID, edgeJob) + endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID) + if err != nil { + return err + } + + handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob) } return handler.DataStore.EdgeJob().Create(edgeJob.ID, edgeJob) diff --git a/api/http/handler/edgejobs/edgejob_tasklogs_clear.go b/api/http/handler/edgejobs/edgejob_tasklogs_clear.go index e4d3cdced..7d339b9e8 100644 --- a/api/http/handler/edgejobs/edgejob_tasklogs_clear.go +++ b/api/http/handler/edgejobs/edgejob_tasklogs_clear.go @@ -67,7 +67,12 @@ func (handler *Handler) edgeJobTasksClear(w http.ResponseWriter, r *http.Request return httperror.InternalServerError("Unable to clear log file from disk", err) } - handler.ReverseTunnelService.AddEdgeJob(endpointID, edgeJob) + endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID) + if err != nil { + return httperror.NotFound("Unable to retrieve environment from the database", err) + } + + handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob) err = handler.DataStore.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob) if err != nil { diff --git a/api/http/handler/edgejobs/edgejob_update.go b/api/http/handler/edgejobs/edgejob_update.go index 21dd2608e..00d3bdae9 100644 --- a/api/http/handler/edgejobs/edgejob_update.go +++ b/api/http/handler/edgejobs/edgejob_update.go @@ -212,7 +212,12 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload * maps.Copy(endpointsFromGroupsToAddMap, edgeJob.Endpoints) for endpointID := range endpointsFromGroupsToAddMap { - handler.ReverseTunnelService.AddEdgeJob(endpointID, edgeJob) + endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID) + if err != nil { + return err + } + + handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob) } for endpointID := range endpointsToRemove { diff --git a/api/http/handler/endpointedge/endpoint_edgejob_logs.go b/api/http/handler/endpointedge/endpoint_edgejob_logs.go index 1a38e8928..a06c11fed 100644 --- a/api/http/handler/endpointedge/endpoint_edgejob_logs.go +++ b/api/http/handler/endpointedge/endpoint_edgejob_logs.go @@ -74,7 +74,7 @@ func (handler *Handler) endpointEdgeJobsLogs(w http.ResponseWriter, r *http.Requ err = handler.DataStore.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob) - handler.ReverseTunnelService.AddEdgeJob(endpoint.ID, edgeJob) + handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob) if err != nil { return httperror.InternalServerError("Unable to persist edge job changes to the database", err) diff --git a/api/http/handler/endpointedge/endpointedge_status_inspect_test.go b/api/http/handler/endpointedge/endpointedge_status_inspect_test.go index 788c03cc5..442f86807 100644 --- a/api/http/handler/endpointedge/endpointedge_status_inspect_test.go +++ b/api/http/handler/endpointedge/endpointedge_status_inspect_test.go @@ -416,7 +416,7 @@ func TestEdgeJobsResponse(t *testing.T) { Version: 57, } - handler.ReverseTunnelService.AddEdgeJob(endpoint.ID, &edgeJob) + handler.ReverseTunnelService.AddEdgeJob(&endpoint, &edgeJob) req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("/api/endpoints/%d/edge/status", endpoint.ID), nil) if err != nil { diff --git a/api/internal/edge/edgejob.go b/api/internal/edge/edgejob.go index ab72500ba..6869ecc8b 100644 --- a/api/internal/edge/edgejob.go +++ b/api/internal/edge/edgejob.go @@ -14,7 +14,12 @@ func LoadEdgeJobs(dataStore dataservices.DataStore, reverseTunnelService portain for _, edgeJob := range edgeJobs { for endpointID := range edgeJob.Endpoints { - reverseTunnelService.AddEdgeJob(endpointID, &edgeJob) + endpoint, err := dataStore.Endpoint().Endpoint(endpointID) + if err != nil { + return err + } + + reverseTunnelService.AddEdgeJob(endpoint, &edgeJob) } } diff --git a/api/portainer.go b/api/portainer.go index 5ca163066..146c72aa7 100644 --- a/api/portainer.go +++ b/api/portainer.go @@ -1454,7 +1454,7 @@ type ( KeepTunnelAlive(endpointID EndpointID, ctx context.Context, maxKeepAlive time.Duration) GetTunnelDetails(endpointID EndpointID) TunnelDetails GetActiveTunnel(endpoint *Endpoint) (TunnelDetails, error) - AddEdgeJob(endpointID EndpointID, edgeJob *EdgeJob) + AddEdgeJob(endpoint *Endpoint, edgeJob *EdgeJob) RemoveEdgeJob(edgeJobID EdgeJobID) RemoveEdgeJobFromEndpoint(endpointID EndpointID, edgeJobID EdgeJobID) }