From 62128d1069e2bfd073cf70e391e2f73eff601f86 Mon Sep 17 00:00:00 2001 From: andres-portainer <91705312+andres-portainer@users.noreply.github.com> Date: Mon, 10 Apr 2023 15:59:34 -0300 Subject: [PATCH] fix(edgejobs): migrate to transactional code EE-5324 (#8747) --- api/http/handler/edgejobs/edgejob_create.go | 127 ++++++++---------- api/http/handler/edgejobs/edgejob_delete.go | 34 ++++- .../edgejobs/edgejob_tasklogs_clear.go | 79 ++++++++--- .../edgejobs/edgejob_tasklogs_collect.go | 2 +- .../handler/edgejobs/edgejob_tasks_list.go | 31 +++-- api/http/handler/edgejobs/edgejob_update.go | 64 +++++---- api/http/handler/edgejobs/handler.go | 17 ++- api/portainer.go | 8 +- 8 files changed, 229 insertions(+), 133 deletions(-) diff --git a/api/http/handler/edgejobs/edgejob_create.go b/api/http/handler/edgejobs/edgejob_create.go index 41b9baacd..75cbe2bab 100644 --- a/api/http/handler/edgejobs/edgejob_create.go +++ b/api/http/handler/edgejobs/edgejob_create.go @@ -7,16 +7,26 @@ import ( "strings" "time" - "github.com/asaskevich/govalidator" httperror "github.com/portainer/libhttp/error" "github.com/portainer/libhttp/request" - "github.com/portainer/libhttp/response" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/endpointutils" "github.com/portainer/portainer/api/internal/maps" + "github.com/portainer/portainer/pkg/featureflags" + + "github.com/asaskevich/govalidator" ) +type edgeJobBasePayload struct { + Name string + CronExpression string + Recurring bool + Endpoints []portainer.EndpointID + EdgeGroups []portainer.EdgeGroupID +} + // @id EdgeJobCreate // @summary Create an EdgeJob // @description **Access policy**: administrator @@ -48,12 +58,8 @@ func (handler *Handler) edgeJobCreate(w http.ResponseWriter, r *http.Request) *h } type edgeJobCreateFromFileContentPayload struct { - Name string - CronExpression string - Recurring bool - Endpoints []portainer.EndpointID - EdgeGroups []portainer.EdgeGroupID - FileContent string + edgeJobBasePayload + FileContent string } func (payload *edgeJobCreateFromFileContentPayload) Validate(r *http.Request) error { @@ -87,32 +93,44 @@ func (handler *Handler) createEdgeJobFromFileContent(w http.ResponseWriter, r *h return httperror.BadRequest("Invalid request payload", err) } - edgeJob := handler.createEdgeJobObjectFromFileContentPayload(&payload) + var edgeJob *portainer.EdgeJob + if featureflags.IsEnabled(portainer.FeatureNoTx) { + edgeJob, err = handler.createEdgeJob(handler.DataStore, &payload.edgeJobBasePayload, []byte(payload.FileContent)) + } else { + err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { + edgeJob, err = handler.createEdgeJob(tx, &payload.edgeJobBasePayload, []byte(payload.FileContent)) + + return err + }) + } + + return txResponse(w, edgeJob, err) +} + +func (handler *Handler) createEdgeJob(tx dataservices.DataStoreTx, payload *edgeJobBasePayload, fileContent []byte) (*portainer.EdgeJob, error) { + var err error + + edgeJob := handler.createEdgeJobObjectFromPayload(tx, payload) var endpoints []portainer.EndpointID if len(edgeJob.EdgeGroups) > 0 { - endpoints, err = edge.GetEndpointsFromEdgeGroups(payload.EdgeGroups, handler.DataStore) + endpoints, err = edge.GetEndpointsFromEdgeGroups(payload.EdgeGroups, tx) if err != nil { - return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) + return nil, httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) } } - err = handler.addAndPersistEdgeJob(edgeJob, []byte(payload.FileContent), endpoints) - + err = handler.addAndPersistEdgeJob(tx, edgeJob, fileContent, endpoints) if err != nil { - return httperror.InternalServerError("Unable to schedule Edge job", err) + return nil, httperror.InternalServerError("Unable to schedule Edge job", err) } - return response.JSON(w, edgeJob) + return edgeJob, nil } type edgeJobCreateFromFilePayload struct { - Name string - CronExpression string - Recurring bool - Endpoints []portainer.EndpointID - EdgeGroups []portainer.EdgeGroupID - File []byte + edgeJobBasePayload + File []byte } func (payload *edgeJobCreateFromFilePayload) Validate(r *http.Request) error { @@ -166,66 +184,35 @@ func (handler *Handler) createEdgeJobFromFile(w http.ResponseWriter, r *http.Req return httperror.BadRequest("Invalid request payload", err) } - edgeJob := handler.createEdgeJobObjectFromFilePayload(payload) + var edgeJob *portainer.EdgeJob + if featureflags.IsEnabled(portainer.FeatureNoTx) { + edgeJob, err = handler.createEdgeJob(handler.DataStore, &payload.edgeJobBasePayload, payload.File) + } else { + err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { + edgeJob, err = handler.createEdgeJob(tx, &payload.edgeJobBasePayload, payload.File) - var endpoints []portainer.EndpointID - if len(edgeJob.EdgeGroups) > 0 { - endpoints, err = edge.GetEndpointsFromEdgeGroups(payload.EdgeGroups, handler.DataStore) - if err != nil { - return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) - } + return err + }) } - err = handler.addAndPersistEdgeJob(edgeJob, payload.File, endpoints) - - if err != nil { - return httperror.InternalServerError("Unable to schedule Edge job", err) - } - - return response.JSON(w, edgeJob) + return txResponse(w, edgeJob, err) } -func (handler *Handler) createEdgeJobObjectFromFilePayload(payload *edgeJobCreateFromFilePayload) *portainer.EdgeJob { - edgeJobIdentifier := portainer.EdgeJobID(handler.DataStore.EdgeJob().GetNextIdentifier()) - - endpoints := convertEndpointsToMetaObject(payload.Endpoints) - - edgeJob := &portainer.EdgeJob{ - ID: edgeJobIdentifier, +func (handler *Handler) createEdgeJobObjectFromPayload(tx dataservices.DataStoreTx, payload *edgeJobBasePayload) *portainer.EdgeJob { + return &portainer.EdgeJob{ + ID: portainer.EdgeJobID(tx.EdgeJob().GetNextIdentifier()), Name: payload.Name, CronExpression: payload.CronExpression, Recurring: payload.Recurring, Created: time.Now().Unix(), - Endpoints: endpoints, + Endpoints: convertEndpointsToMetaObject(payload.Endpoints), EdgeGroups: payload.EdgeGroups, Version: 1, GroupLogsCollection: map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{}, } - - return edgeJob } -func (handler *Handler) createEdgeJobObjectFromFileContentPayload(payload *edgeJobCreateFromFileContentPayload) *portainer.EdgeJob { - edgeJobIdentifier := portainer.EdgeJobID(handler.DataStore.EdgeJob().GetNextIdentifier()) - - endpoints := convertEndpointsToMetaObject(payload.Endpoints) - - edgeJob := &portainer.EdgeJob{ - ID: edgeJobIdentifier, - Name: payload.Name, - CronExpression: payload.CronExpression, - Recurring: payload.Recurring, - Created: time.Now().Unix(), - Endpoints: endpoints, - EdgeGroups: payload.EdgeGroups, - Version: 1, - GroupLogsCollection: map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{}, - } - - return edgeJob -} - -func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file []byte, endpointsFromGroups []portainer.EndpointID) error { +func (handler *Handler) addAndPersistEdgeJob(tx dataservices.DataStoreTx, edgeJob *portainer.EdgeJob, file []byte, endpointsFromGroups []portainer.EndpointID) error { edgeCronExpression := strings.Split(edgeJob.CronExpression, " ") if len(edgeCronExpression) == 6 { edgeCronExpression = edgeCronExpression[1:] @@ -233,7 +220,7 @@ func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file [] edgeJob.CronExpression = strings.Join(edgeCronExpression, " ") for ID := range edgeJob.Endpoints { - endpoint, err := handler.DataStore.Endpoint().Endpoint(ID) + endpoint, err := tx.Endpoint().Endpoint(ID) if err != nil { return err } @@ -254,7 +241,7 @@ func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file [] endpointsMap = convertEndpointsToMetaObject(endpointsFromGroups) for ID := range endpointsMap { - endpoint, err := handler.DataStore.Endpoint().Endpoint(ID) + endpoint, err := tx.Endpoint().Endpoint(ID) if err != nil { return err } @@ -274,7 +261,7 @@ func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file [] } for endpointID := range endpointsMap { - endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID) + endpoint, err := tx.Endpoint().Endpoint(endpointID) if err != nil { return err } @@ -282,5 +269,5 @@ func (handler *Handler) addAndPersistEdgeJob(edgeJob *portainer.EdgeJob, file [] handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob) } - return handler.DataStore.EdgeJob().Create(edgeJob.ID, edgeJob) + return tx.EdgeJob().Create(edgeJob.ID, edgeJob) } diff --git a/api/http/handler/edgejobs/edgejob_delete.go b/api/http/handler/edgejobs/edgejob_delete.go index 471d374dc..55c4c6ad4 100644 --- a/api/http/handler/edgejobs/edgejob_delete.go +++ b/api/http/handler/edgejobs/edgejob_delete.go @@ -8,8 +8,10 @@ import ( "github.com/portainer/libhttp/request" "github.com/portainer/libhttp/response" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/maps" + "github.com/portainer/portainer/pkg/featureflags" "github.com/rs/zerolog/log" ) @@ -31,14 +33,34 @@ func (handler *Handler) edgeJobDelete(w http.ResponseWriter, r *http.Request) *h return httperror.BadRequest("Invalid Edge job identifier route variable", err) } - edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) - if handler.DataStore.IsErrObjectNotFound(err) { + if featureflags.IsEnabled(portainer.FeatureNoTx) { + err = handler.deleteEdgeJob(handler.DataStore, portainer.EdgeJobID(edgeJobID)) + } else { + err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { + return handler.deleteEdgeJob(tx, portainer.EdgeJobID(edgeJobID)) + }) + } + + if err != nil { + if httpErr, ok := err.(*httperror.HandlerError); ok { + return httpErr + } + + return httperror.InternalServerError("Unexpected error", err) + } + + return response.Empty(w) +} + +func (handler *Handler) deleteEdgeJob(tx dataservices.DataStoreTx, edgeJobID portainer.EdgeJobID) error { + edgeJob, err := tx.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) + if tx.IsErrObjectNotFound(err) { return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err) } else if err != nil { return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err) } - edgeJobFolder := handler.FileService.GetEdgeJobFolder(strconv.Itoa(edgeJobID)) + edgeJobFolder := handler.FileService.GetEdgeJobFolder(strconv.Itoa(int(edgeJobID))) err = handler.FileService.RemoveDirectory(edgeJobFolder) if err != nil { log.Warn().Err(err).Msg("Unable to remove the files associated to the Edge job on the filesystem") @@ -48,7 +70,7 @@ func (handler *Handler) edgeJobDelete(w http.ResponseWriter, r *http.Request) *h var endpointsMap map[portainer.EndpointID]portainer.EdgeJobEndpointMeta if len(edgeJob.EdgeGroups) > 0 { - endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore) + endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, tx) if err != nil { return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) } @@ -63,10 +85,10 @@ func (handler *Handler) edgeJobDelete(w http.ResponseWriter, r *http.Request) *h handler.ReverseTunnelService.RemoveEdgeJobFromEndpoint(endpointID, edgeJob.ID) } - err = handler.DataStore.EdgeJob().DeleteEdgeJob(edgeJob.ID) + err = tx.EdgeJob().DeleteEdgeJob(edgeJob.ID) if err != nil { return httperror.InternalServerError("Unable to remove the Edge job from the database", err) } - return response.Empty(w) + return nil } diff --git a/api/http/handler/edgejobs/edgejob_tasklogs_clear.go b/api/http/handler/edgejobs/edgejob_tasklogs_clear.go index 109907f27..67ee57296 100644 --- a/api/http/handler/edgejobs/edgejob_tasklogs_clear.go +++ b/api/http/handler/edgejobs/edgejob_tasklogs_clear.go @@ -8,8 +8,10 @@ import ( "github.com/portainer/libhttp/request" "github.com/portainer/libhttp/response" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/slices" + "github.com/portainer/portainer/pkg/featureflags" ) // @id EdgeJobTasksClear @@ -37,53 +39,86 @@ func (handler *Handler) edgeJobTasksClear(w http.ResponseWriter, r *http.Request return httperror.BadRequest("Invalid Task identifier route variable", err) } - edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) - if handler.DataStore.IsErrObjectNotFound(err) { + mutationFn := func(edgeJob *portainer.EdgeJob, endpointID portainer.EndpointID, endpointsFromGroups []portainer.EndpointID) { + if slices.Contains(endpointsFromGroups, endpointID) { + edgeJob.GroupLogsCollection[endpointID] = portainer.EdgeJobEndpointMeta{ + CollectLogs: false, + LogsStatus: portainer.EdgeJobLogsStatusIdle, + } + } else { + meta := edgeJob.Endpoints[endpointID] + meta.CollectLogs = false + meta.LogsStatus = portainer.EdgeJobLogsStatusIdle + edgeJob.Endpoints[endpointID] = meta + } + } + + if featureflags.IsEnabled(portainer.FeatureNoTx) { + + updateEdgeJobFn := func(edgeJob *portainer.EdgeJob, endpointID portainer.EndpointID, endpointsFromGroups []portainer.EndpointID) error { + return handler.DataStore.EdgeJob().UpdateEdgeJobFunc(edgeJob.ID, func(j *portainer.EdgeJob) { + mutationFn(j, endpointID, endpointsFromGroups) + }) + } + + err = handler.clearEdgeJobTaskLogs(handler.DataStore, portainer.EdgeJobID(edgeJobID), portainer.EndpointID(taskID), updateEdgeJobFn) + } else { + err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { + updateEdgeJobFn := func(edgeJob *portainer.EdgeJob, endpointID portainer.EndpointID, endpointsFromGroups []portainer.EndpointID) error { + mutationFn(edgeJob, endpointID, endpointsFromGroups) + + return tx.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob) + } + + return handler.clearEdgeJobTaskLogs(tx, portainer.EdgeJobID(edgeJobID), portainer.EndpointID(taskID), updateEdgeJobFn) + }) + } + + if err != nil { + if httpErr, ok := err.(*httperror.HandlerError); ok { + return httpErr + } + + return httperror.InternalServerError("Unexpected error", err) + } + + return response.Empty(w) +} + +func (handler *Handler) clearEdgeJobTaskLogs(tx dataservices.DataStoreTx, edgeJobID portainer.EdgeJobID, endpointID portainer.EndpointID, updateEdgeJob func(*portainer.EdgeJob, portainer.EndpointID, []portainer.EndpointID) error) error { + edgeJob, err := tx.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) + if tx.IsErrObjectNotFound(err) { return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err) } else if err != nil { return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err) } - err = handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(edgeJobID), strconv.Itoa(taskID)) + err = handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(int(edgeJobID)), strconv.Itoa(int(endpointID))) if err != nil { return httperror.InternalServerError("Unable to clear log file from disk", err) } - endpointID := portainer.EndpointID(taskID) - endpointsFromGroups, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore) + endpointsFromGroups, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, tx) if err != nil { return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) } - err = handler.DataStore.EdgeJob().UpdateEdgeJobFunc(edgeJob.ID, func(j *portainer.EdgeJob) { - if slices.Contains(endpointsFromGroups, endpointID) { - j.GroupLogsCollection[endpointID] = portainer.EdgeJobEndpointMeta{ - CollectLogs: false, - LogsStatus: portainer.EdgeJobLogsStatusIdle, - } - } else { - meta := j.Endpoints[endpointID] - meta.CollectLogs = false - meta.LogsStatus = portainer.EdgeJobLogsStatusIdle - j.Endpoints[endpointID] = meta - } - }) - + err = updateEdgeJob(edgeJob, endpointID, endpointsFromGroups) if err != nil { return httperror.InternalServerError("Unable to persist Edge job changes in the database", err) } - err = handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(edgeJobID), strconv.Itoa(taskID)) + err = handler.FileService.ClearEdgeJobTaskLogs(strconv.Itoa(int(edgeJobID)), strconv.Itoa(int(endpointID))) if err != nil { return httperror.InternalServerError("Unable to clear log file from disk", err) } - endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID) + endpoint, err := tx.Endpoint().Endpoint(endpointID) if err != nil { return httperror.NotFound("Unable to retrieve environment from the database", err) } handler.ReverseTunnelService.AddEdgeJob(endpoint, edgeJob) - return response.Empty(w) + return nil } diff --git a/api/http/handler/edgejobs/edgejob_tasklogs_collect.go b/api/http/handler/edgejobs/edgejob_tasklogs_collect.go index f6045ec0e..c182f82f5 100644 --- a/api/http/handler/edgejobs/edgejob_tasklogs_collect.go +++ b/api/http/handler/edgejobs/edgejob_tasklogs_collect.go @@ -39,7 +39,7 @@ func (handler *Handler) edgeJobTasksCollect(w http.ResponseWriter, r *http.Reque err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { edgeJob, err := tx.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) - if handler.DataStore.IsErrObjectNotFound(err) { + if tx.IsErrObjectNotFound(err) { return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err) } else if err != nil { return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err) diff --git a/api/http/handler/edgejobs/edgejob_tasks_list.go b/api/http/handler/edgejobs/edgejob_tasks_list.go index 8d30716b3..b8a19d134 100644 --- a/api/http/handler/edgejobs/edgejob_tasks_list.go +++ b/api/http/handler/edgejobs/edgejob_tasks_list.go @@ -6,10 +6,11 @@ import ( httperror "github.com/portainer/libhttp/error" "github.com/portainer/libhttp/request" - "github.com/portainer/libhttp/response" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/maps" + "github.com/portainer/portainer/pkg/featureflags" ) type taskContainer struct { @@ -37,20 +38,34 @@ func (handler *Handler) edgeJobTasksList(w http.ResponseWriter, r *http.Request) return httperror.BadRequest("Invalid Edge job identifier route variable", err) } - edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) - if handler.DataStore.IsErrObjectNotFound(err) { - return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err) + var tasks []taskContainer + if featureflags.IsEnabled(portainer.FeatureNoTx) { + tasks, err = listEdgeJobTasks(handler.DataStore, portainer.EdgeJobID(edgeJobID)) + } else { + err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { + tasks, err = listEdgeJobTasks(tx, portainer.EdgeJobID(edgeJobID)) + return err + }) + } + + return txResponse(w, tasks, err) +} + +func listEdgeJobTasks(tx dataservices.DataStoreTx, edgeJobID portainer.EdgeJobID) ([]taskContainer, error) { + edgeJob, err := tx.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) + if tx.IsErrObjectNotFound(err) { + return nil, httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err) } else if err != nil { - return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err) + return nil, httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err) } tasks := make([]taskContainer, 0) endpointsMap := map[portainer.EndpointID]portainer.EdgeJobEndpointMeta{} if len(edgeJob.EdgeGroups) > 0 { - endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore) + endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, tx) if err != nil { - return httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) + return nil, httperror.InternalServerError("Unable to get Endpoints from EdgeGroups", err) } endpointsMap = convertEndpointsToMetaObject(endpoints) @@ -67,5 +82,5 @@ func (handler *Handler) edgeJobTasksList(w http.ResponseWriter, r *http.Request) }) } - return response.JSON(w, tasks) + return tasks, nil } diff --git a/api/http/handler/edgejobs/edgejob_update.go b/api/http/handler/edgejobs/edgejob_update.go index 99a95d499..90ca51854 100644 --- a/api/http/handler/edgejobs/edgejob_update.go +++ b/api/http/handler/edgejobs/edgejob_update.go @@ -7,12 +7,13 @@ import ( httperror "github.com/portainer/libhttp/error" "github.com/portainer/libhttp/request" - "github.com/portainer/libhttp/response" portainer "github.com/portainer/portainer/api" + "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/internal/edge" "github.com/portainer/portainer/api/internal/endpointutils" "github.com/portainer/portainer/api/internal/maps" "github.com/portainer/portainer/api/internal/slices" + "github.com/portainer/portainer/pkg/featureflags" "github.com/asaskevich/govalidator" ) @@ -30,6 +31,7 @@ func (payload *edgeJobUpdatePayload) Validate(r *http.Request) error { if payload.Name != nil && !govalidator.Matches(*payload.Name, `^[a-zA-Z0-9][a-zA-Z0-9_.-]+$`) { return errors.New("invalid Edge job name format. Allowed characters are: [a-zA-Z0-9_.-]") } + return nil } @@ -60,27 +62,41 @@ func (handler *Handler) edgeJobUpdate(w http.ResponseWriter, r *http.Request) *h return httperror.BadRequest("Invalid request payload", err) } - edgeJob, err := handler.DataStore.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) - if handler.DataStore.IsErrObjectNotFound(err) { - return httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err) - } else if err != nil { - return httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err) + var edgeJob *portainer.EdgeJob + if featureflags.IsEnabled(portainer.FeatureNoTx) { + edgeJob, err = handler.updateEdgeJob(handler.DataStore, portainer.EdgeJobID(edgeJobID), payload) + } else { + err = handler.DataStore.UpdateTx(func(tx dataservices.DataStoreTx) error { + edgeJob, err = handler.updateEdgeJob(tx, portainer.EdgeJobID(edgeJobID), payload) + return err + }) } - err = handler.updateEdgeSchedule(edgeJob, &payload) - if err != nil { - return httperror.InternalServerError("Unable to update Edge job", err) - } - - err = handler.DataStore.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob) - if err != nil { - return httperror.InternalServerError("Unable to persist Edge job changes inside the database", err) - } - - return response.JSON(w, edgeJob) + return txResponse(w, edgeJob, err) } -func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload *edgeJobUpdatePayload) error { +func (handler *Handler) updateEdgeJob(tx dataservices.DataStoreTx, edgeJobID portainer.EdgeJobID, payload edgeJobUpdatePayload) (*portainer.EdgeJob, error) { + edgeJob, err := tx.EdgeJob().EdgeJob(portainer.EdgeJobID(edgeJobID)) + if tx.IsErrObjectNotFound(err) { + return nil, httperror.NotFound("Unable to find an Edge job with the specified identifier inside the database", err) + } else if err != nil { + return nil, httperror.InternalServerError("Unable to find an Edge job with the specified identifier inside the database", err) + } + + err = handler.updateEdgeSchedule(tx, edgeJob, &payload) + if err != nil { + return nil, httperror.InternalServerError("Unable to update Edge job", err) + } + + err = tx.EdgeJob().UpdateEdgeJob(edgeJob.ID, edgeJob) + if err != nil { + return nil, httperror.InternalServerError("Unable to persist Edge job changes inside the database", err) + } + + return edgeJob, nil +} + +func (handler *Handler) updateEdgeSchedule(tx dataservices.DataStoreTx, edgeJob *portainer.EdgeJob, payload *edgeJobUpdatePayload) error { if payload.Name != nil { edgeJob.Name = *payload.Name } @@ -99,7 +115,7 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload * } for _, endpointID := range payload.Endpoints { - endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID) + endpoint, err := tx.Endpoint().Endpoint(endpointID) if err != nil { return err } @@ -120,7 +136,7 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload * } if len(payload.EdgeGroups) == 0 && len(edgeJob.EdgeGroups) > 0 { - endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, handler.DataStore) + endpoints, err := edge.GetEndpointsFromEdgeGroups(edgeJob.EdgeGroups, tx) if err != nil { return errors.New("unable to get endpoints from edge groups") } @@ -138,7 +154,7 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload * if len(payload.EdgeGroups) > 0 { for _, edgeGroupID := range payload.EdgeGroups { - _, err := handler.DataStore.EdgeGroup().EdgeGroup(edgeGroupID) + _, err := tx.EdgeGroup().EdgeGroup(edgeGroupID) if err != nil { return err } @@ -148,7 +164,7 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload * } } - endpointsFromGroupsToAdd, err := edge.GetEndpointsFromEdgeGroups(edgeGroupsToAdd, handler.DataStore) + endpointsFromGroupsToAdd, err := edge.GetEndpointsFromEdgeGroups(edgeGroupsToAdd, tx) if err != nil { return errors.New("unable to get endpoints from edge groups") } @@ -165,7 +181,7 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload * } } - endpointsFromGroupsToRemove, err := edge.GetEndpointsFromEdgeGroups(edgeGroupsToRemove, handler.DataStore) + endpointsFromGroupsToRemove, err := edge.GetEndpointsFromEdgeGroups(edgeGroupsToRemove, tx) if err != nil { return errors.New("unable to get endpoints from edge groups") } @@ -212,7 +228,7 @@ func (handler *Handler) updateEdgeSchedule(edgeJob *portainer.EdgeJob, payload * maps.Copy(endpointsFromGroupsToAddMap, edgeJob.Endpoints) for endpointID := range endpointsFromGroupsToAddMap { - endpoint, err := handler.DataStore.Endpoint().Endpoint(endpointID) + endpoint, err := tx.Endpoint().Endpoint(endpointID) if err != nil { return err } diff --git a/api/http/handler/edgejobs/handler.go b/api/http/handler/edgejobs/handler.go index f7a7edfd1..3f409db6b 100644 --- a/api/http/handler/edgejobs/handler.go +++ b/api/http/handler/edgejobs/handler.go @@ -3,11 +3,13 @@ package edgejobs import ( "net/http" - "github.com/gorilla/mux" httperror "github.com/portainer/libhttp/error" + "github.com/portainer/libhttp/response" portainer "github.com/portainer/portainer/api" "github.com/portainer/portainer/api/dataservices" "github.com/portainer/portainer/api/http/security" + + "github.com/gorilla/mux" ) // Handler is the HTTP handler used to handle Edge job operations. @@ -44,6 +46,7 @@ func NewHandler(bouncer *security.RequestBouncer) *Handler { bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobTasksCollect)))).Methods(http.MethodPost) h.Handle("/edge_jobs/{id}/tasks/{taskID}/logs", bouncer.AdminAccess(bouncer.EdgeComputeOperation(httperror.LoggerHandler(h.edgeJobTasksClear)))).Methods(http.MethodDelete) + return h } @@ -56,3 +59,15 @@ func convertEndpointsToMetaObject(endpoints []portainer.EndpointID) map[portaine return endpointsMap } + +func txResponse(w http.ResponseWriter, r any, err error) *httperror.HandlerError { + if err != nil { + if httpErr, ok := err.(*httperror.HandlerError); ok { + return httpErr + } + + return httperror.InternalServerError("Unexpected error", err) + } + + return response.JSON(w, r) +} diff --git a/api/portainer.go b/api/portainer.go index 0d1771cfb..9b0aee208 100644 --- a/api/portainer.go +++ b/api/portainer.go @@ -1565,8 +1565,14 @@ const ( ) // List of supported features +const ( + FeatureFdo = "fdo" + FeatureNoTx = "noTx" +) + var SupportedFeatureFlags = []featureflags.Feature{ - "fdo", + FeatureFdo, + FeatureNoTx, } const (