1
0
Fork 0
mirror of https://github.com/portainer/portainer.git synced 2025-07-19 21:39:40 +02:00

feat(api): revamp scheduling to introduce system schedules (#2433)

* feat(api): revamp scheduling to introduce system schedules

* fix(api): fix linting issues

* fix(api): fix lint issues

* refactor(api): fix lint issues
This commit is contained in:
Anthony Lapenna 2018-11-06 22:49:48 +13:00 committed by GitHub
parent dbbea0a20f
commit 110fcc46a6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 475 additions and 297 deletions

View file

@ -77,6 +77,32 @@ func (service *Service) Schedules() ([]portainer.Schedule, error) {
return schedules, err return schedules, err
} }
// SchedulesByJobType return a array containing all the schedules
// with the specified JobType.
func (service *Service) SchedulesByJobType(jobType portainer.JobType) ([]portainer.Schedule, error) {
var schedules = make([]portainer.Schedule, 0)
err := service.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(BucketName))
cursor := bucket.Cursor()
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
var schedule portainer.Schedule
err := internal.UnmarshalObject(v, &schedule)
if err != nil {
return err
}
if schedule.JobType == jobType {
schedules = append(schedules, schedule)
}
}
return nil
})
return schedules, err
}
// CreateSchedule assign an ID to a new schedule and saves it. // CreateSchedule assign an ID to a new schedule and saves it.
func (service *Service) CreateSchedule(schedule *portainer.Schedule) error { func (service *Service) CreateSchedule(schedule *portainer.Schedule) error {
return service.db.Update(func(tx *bolt.Tx) error { return service.db.Update(func(tx *bolt.Tx) error {

View file

@ -110,39 +110,80 @@ func initSnapshotter(clientFactory *docker.ClientFactory) portainer.Snapshotter
return docker.NewSnapshotter(clientFactory) return docker.NewSnapshotter(clientFactory)
} }
func initJobScheduler(endpointService portainer.EndpointService, snapshotter portainer.Snapshotter, flags *portainer.CLIFlags) (portainer.JobScheduler, error) { func initJobScheduler() portainer.JobScheduler {
jobScheduler := cron.NewJobScheduler() return cron.NewJobScheduler()
}
func loadSnapshotSystemSchedule(jobScheduler portainer.JobScheduler, snapshotter portainer.Snapshotter, scheduleService portainer.ScheduleService, endpointService portainer.EndpointService, flags *portainer.CLIFlags) error {
if !*flags.Snapshot {
return nil
}
schedules, err := scheduleService.SchedulesByJobType(portainer.SnapshotJobType)
if err != nil {
return err
}
if len(schedules) != 0 {
return nil
}
snapshotJob := &portainer.SnapshotJob{}
snapshotSchedule := &portainer.Schedule{
ID: portainer.ScheduleID(scheduleService.GetNextIdentifier()),
Name: "system_snapshot",
CronExpression: "@every " + *flags.SnapshotInterval,
JobType: portainer.SnapshotJobType,
SnapshotJob: snapshotJob,
}
snapshotJobContext := cron.NewSnapshotJobContext(endpointService, snapshotter)
snapshotJobRunner := cron.NewSnapshotJobRunner(snapshotJob, snapshotJobContext)
err = jobScheduler.CreateSchedule(snapshotSchedule, snapshotJobRunner)
if err != nil {
return err
}
return scheduleService.CreateSchedule(snapshotSchedule)
}
func loadEndpointSyncSystemSchedule(jobScheduler portainer.JobScheduler, scheduleService portainer.ScheduleService, endpointService portainer.EndpointService, flags *portainer.CLIFlags) error {
if *flags.ExternalEndpoints == "" {
return nil
}
if *flags.ExternalEndpoints != "" {
log.Println("Using external endpoint definition. Endpoint management via the API will be disabled.") log.Println("Using external endpoint definition. Endpoint management via the API will be disabled.")
endpointSyncTaskContext := &cron.EndpointSyncTaskContext{ schedules, err := scheduleService.SchedulesByJobType(portainer.EndpointSyncJobType)
EndpointService: endpointService,
EndpointFilePath: *flags.ExternalEndpoints,
}
endpointSyncTask := cron.NewEndpointSyncTask(endpointSyncTaskContext)
err := jobScheduler.ScheduleTask("@every "+*flags.SyncInterval, endpointSyncTask)
if err != nil { if err != nil {
return nil, err return err
}
} }
if *flags.Snapshot { if len(schedules) != 0 {
return nil
endpointSnapshotTaskContext := &cron.SnapshotTaskContext{
EndpointService: endpointService,
Snapshotter: snapshotter,
} }
endpointSnapshotTask := cron.NewSnapshotTask(endpointSnapshotTaskContext)
err := jobScheduler.ScheduleTask("@every "+*flags.SnapshotInterval, endpointSnapshotTask) endpointSyncJob := &portainer.EndpointSyncJob{}
endointSyncSchedule := &portainer.Schedule{
ID: portainer.ScheduleID(scheduleService.GetNextIdentifier()),
Name: "system_endpointsync",
CronExpression: "@every " + *flags.SyncInterval,
JobType: portainer.EndpointSyncJobType,
EndpointSyncJob: endpointSyncJob,
}
endpointSyncJobContext := cron.NewEndpointSyncJobContext(endpointService, *flags.ExternalEndpoints)
endpointSyncJobRunner := cron.NewEndpointSyncJobRunner(endpointSyncJob, endpointSyncJobContext)
err = jobScheduler.CreateSchedule(endointSyncSchedule, endpointSyncJobRunner)
if err != nil { if err != nil {
return nil, err return err
}
} }
return jobScheduler, nil return scheduleService.CreateSchedule(endointSyncSchedule)
} }
func loadSchedulesFromDatabase(jobScheduler portainer.JobScheduler, jobService portainer.JobService, scheduleService portainer.ScheduleService, endpointService portainer.EndpointService, fileService portainer.FileService) error { func loadSchedulesFromDatabase(jobScheduler portainer.JobScheduler, jobService portainer.JobService, scheduleService portainer.ScheduleService, endpointService portainer.EndpointService, fileService portainer.FileService) error {
@ -152,16 +193,11 @@ func loadSchedulesFromDatabase(jobScheduler portainer.JobScheduler, jobService p
} }
for _, schedule := range schedules { for _, schedule := range schedules {
taskContext := &cron.ScriptTaskContext{
JobService: jobService,
EndpointService: endpointService,
FileService: fileService,
ScheduleID: schedule.ID,
TargetEndpoints: schedule.Endpoints,
}
schedule.Task.(cron.ScriptTask).SetContext(taskContext) jobContext := cron.NewScriptExecutionJobContext(jobService, endpointService, fileService)
err = jobScheduler.ScheduleTask(schedule.CronExpression, schedule.Task) jobRunner := cron.NewScriptExecutionJobRunner(schedule.ScriptExecutionJob, jobContext)
err = jobScheduler.CreateSchedule(&schedule, jobRunner)
if err != nil { if err != nil {
return err return err
} }
@ -455,12 +491,19 @@ func main() {
snapshotter := initSnapshotter(clientFactory) snapshotter := initSnapshotter(clientFactory)
jobScheduler, err := initJobScheduler(store.EndpointService, snapshotter, flags) jobScheduler := initJobScheduler()
err = loadSchedulesFromDatabase(jobScheduler, jobService, store.ScheduleService, store.EndpointService, fileService)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
err = loadSchedulesFromDatabase(jobScheduler, jobService, store.ScheduleService, store.EndpointService, fileService) err = loadEndpointSyncSystemSchedule(jobScheduler, store.ScheduleService, store.EndpointService, flags)
if err != nil {
log.Fatal(err)
}
err = loadSnapshotSystemSchedule(jobScheduler, snapshotter, store.ScheduleService, store.EndpointService, flags)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View file

@ -9,27 +9,41 @@ import (
"github.com/portainer/portainer" "github.com/portainer/portainer"
) )
type ( // EndpointSyncJobRunner is used to run a EndpointSyncJob
// EndpointSyncTask represents a task used to synchronize endpoints type EndpointSyncJobRunner struct {
// based on an external file. It can be scheduled. job *portainer.EndpointSyncJob
EndpointSyncTask struct { context *EndpointSyncJobContext
context *EndpointSyncTaskContext
} }
// EndpointSyncTaskContext represents the context required for the execution // EndpointSyncJobContext represents the context of execution of a EndpointSyncJob
// of an EndpointSyncTask. type EndpointSyncJobContext struct {
EndpointSyncTaskContext struct { endpointService portainer.EndpointService
EndpointService portainer.EndpointService endpointFilePath string
EndpointFilePath string
} }
synchronization struct { // NewEndpointSyncJobContext returns a new context that can be used to execute a EndpointSyncJob
func NewEndpointSyncJobContext(endpointService portainer.EndpointService, endpointFilePath string) *EndpointSyncJobContext {
return &EndpointSyncJobContext{
endpointService: endpointService,
endpointFilePath: endpointFilePath,
}
}
// NewEndpointSyncJobRunner returns a new runner that can be scheduled
func NewEndpointSyncJobRunner(job *portainer.EndpointSyncJob, context *EndpointSyncJobContext) *EndpointSyncJobRunner {
return &EndpointSyncJobRunner{
job: job,
context: context,
}
}
type synchronization struct {
endpointsToCreate []*portainer.Endpoint endpointsToCreate []*portainer.Endpoint
endpointsToUpdate []*portainer.Endpoint endpointsToUpdate []*portainer.Endpoint
endpointsToDelete []*portainer.Endpoint endpointsToDelete []*portainer.Endpoint
} }
fileEndpoint struct { type fileEndpoint struct {
Name string `json:"Name"` Name string `json:"Name"`
URL string `json:"URL"` URL string `json:"URL"`
TLS bool `json:"TLS,omitempty"` TLS bool `json:"TLS,omitempty"`
@ -38,19 +52,25 @@ type (
TLSCert string `json:"TLSCert,omitempty"` TLSCert string `json:"TLSCert,omitempty"`
TLSKey string `json:"TLSKey,omitempty"` TLSKey string `json:"TLSKey,omitempty"`
} }
)
// NewEndpointSyncTask creates a new EndpointSyncTask using the specified // GetScheduleID returns the schedule identifier associated to the runner
// context. func (runner *EndpointSyncJobRunner) GetScheduleID() portainer.ScheduleID {
func NewEndpointSyncTask(context *EndpointSyncTaskContext) EndpointSyncTask { return runner.job.ScheduleID
return EndpointSyncTask{
context: context,
} }
// SetScheduleID sets the schedule identifier associated to the runner
func (runner *EndpointSyncJobRunner) SetScheduleID(ID portainer.ScheduleID) {
runner.job.ScheduleID = ID
}
// GetJobType returns the job type associated to the runner
func (runner *EndpointSyncJobRunner) GetJobType() portainer.JobType {
return portainer.EndpointSyncJobType
} }
// Run triggers the execution of the endpoint synchronization process. // Run triggers the execution of the endpoint synchronization process.
func (task EndpointSyncTask) Run() { func (runner *EndpointSyncJobRunner) Run() {
data, err := ioutil.ReadFile(task.context.EndpointFilePath) data, err := ioutil.ReadFile(runner.context.endpointFilePath)
if endpointSyncError(err) { if endpointSyncError(err) {
return return
} }
@ -62,11 +82,11 @@ func (task EndpointSyncTask) Run() {
} }
if len(fileEndpoints) == 0 { if len(fileEndpoints) == 0 {
log.Println("background task error (endpoint synchronization). External endpoint source is empty") log.Println("background job error (endpoint synchronization). External endpoint source is empty")
return return
} }
storedEndpoints, err := task.context.EndpointService.Endpoints() storedEndpoints, err := runner.context.endpointService.Endpoints()
if endpointSyncError(err) { if endpointSyncError(err) {
return return
} }
@ -75,7 +95,7 @@ func (task EndpointSyncTask) Run() {
sync := prepareSyncData(storedEndpoints, convertedFileEndpoints) sync := prepareSyncData(storedEndpoints, convertedFileEndpoints)
if sync.requireSync() { if sync.requireSync() {
err = task.context.EndpointService.Synchronize(sync.endpointsToCreate, sync.endpointsToUpdate, sync.endpointsToDelete) err = runner.context.endpointService.Synchronize(sync.endpointsToCreate, sync.endpointsToUpdate, sync.endpointsToDelete)
if endpointSyncError(err) { if endpointSyncError(err) {
return return
} }
@ -85,7 +105,7 @@ func (task EndpointSyncTask) Run() {
func endpointSyncError(err error) bool { func endpointSyncError(err error) bool {
if err != nil { if err != nil {
log.Printf("background task error (endpoint synchronization). Unable to synchronize endpoints (err=%s)\n", err) log.Printf("background job error (endpoint synchronization). Unable to synchronize endpoints (err=%s)\n", err)
return true return true
} }
return false return false

View file

@ -0,0 +1,76 @@
package cron
import (
"log"
"github.com/portainer/portainer"
)
// ScriptExecutionJobRunner is used to run a ScriptExecutionJob
type ScriptExecutionJobRunner struct {
job *portainer.ScriptExecutionJob
context *ScriptExecutionJobContext
}
// ScriptExecutionJobContext represents the context of execution of a ScriptExecutionJob
type ScriptExecutionJobContext struct {
jobService portainer.JobService
endpointService portainer.EndpointService
fileService portainer.FileService
}
// NewScriptExecutionJobContext returns a new context that can be used to execute a ScriptExecutionJob
func NewScriptExecutionJobContext(jobService portainer.JobService, endpointService portainer.EndpointService, fileService portainer.FileService) *ScriptExecutionJobContext {
return &ScriptExecutionJobContext{
jobService: jobService,
endpointService: endpointService,
fileService: fileService,
}
}
// NewScriptExecutionJobRunner returns a new runner that can be scheduled
func NewScriptExecutionJobRunner(job *portainer.ScriptExecutionJob, context *ScriptExecutionJobContext) *ScriptExecutionJobRunner {
return &ScriptExecutionJobRunner{
job: job,
context: context,
}
}
// Run triggers the execution of the job.
// It will iterate through all the endpoints specified in the context to
// execute the script associated to the job.
func (runner *ScriptExecutionJobRunner) Run() {
scriptFile, err := runner.context.fileService.GetFileContent(runner.job.ScriptPath)
if err != nil {
log.Printf("scheduled job error (script execution). Unable to retrieve script file (err=%s)\n", err)
return
}
for _, endpointID := range runner.job.Endpoints {
endpoint, err := runner.context.endpointService.Endpoint(endpointID)
if err != nil {
log.Printf("scheduled job error (script execution). Unable to retrieve information about endpoint (id=%d) (err=%s)\n", endpointID, err)
return
}
err = runner.context.jobService.Execute(endpoint, "", runner.job.Image, scriptFile)
if err != nil {
log.Printf("scheduled job error (script execution). Unable to execute scrtip (endpoint=%s) (err=%s)\n", endpoint.Name, err)
}
}
}
// GetScheduleID returns the schedule identifier associated to the runner
func (runner *ScriptExecutionJobRunner) GetScheduleID() portainer.ScheduleID {
return runner.job.ScheduleID
}
// SetScheduleID sets the schedule identifier associated to the runner
func (runner *ScriptExecutionJobRunner) SetScheduleID(ID portainer.ScheduleID) {
runner.job.ScheduleID = ID
}
// GetJobType returns the job type associated to the runner
func (runner *ScriptExecutionJobRunner) GetJobType() portainer.JobType {
return portainer.ScriptExecutionJobType
}

84
api/cron/job_snapshot.go Normal file
View file

@ -0,0 +1,84 @@
package cron
import (
"log"
"github.com/portainer/portainer"
)
// SnapshotJobRunner is used to run a SnapshotJob
type SnapshotJobRunner struct {
job *portainer.SnapshotJob
context *SnapshotJobContext
}
// SnapshotJobContext represents the context of execution of a SnapshotJob
type SnapshotJobContext struct {
endpointService portainer.EndpointService
snapshotter portainer.Snapshotter
}
// NewSnapshotJobContext returns a new context that can be used to execute a SnapshotJob
func NewSnapshotJobContext(endpointService portainer.EndpointService, snapshotter portainer.Snapshotter) *SnapshotJobContext {
return &SnapshotJobContext{
endpointService: endpointService,
snapshotter: snapshotter,
}
}
// NewSnapshotJobRunner returns a new runner that can be scheduled
func NewSnapshotJobRunner(job *portainer.SnapshotJob, context *SnapshotJobContext) *SnapshotJobRunner {
return &SnapshotJobRunner{
job: job,
context: context,
}
}
// GetScheduleID returns the schedule identifier associated to the runner
func (runner *SnapshotJobRunner) GetScheduleID() portainer.ScheduleID {
return runner.job.ScheduleID
}
// SetScheduleID sets the schedule identifier associated to the runner
func (runner *SnapshotJobRunner) SetScheduleID(ID portainer.ScheduleID) {
runner.job.ScheduleID = ID
}
// GetJobType returns the job type associated to the runner
func (runner *SnapshotJobRunner) GetJobType() portainer.JobType {
return portainer.EndpointSyncJobType
}
// Run triggers the execution of the job.
// It will iterate through all the endpoints available in the database to
// create a snapshot of each one of them.
func (runner *SnapshotJobRunner) Run() {
endpoints, err := runner.context.endpointService.Endpoints()
if err != nil {
log.Printf("background job error (endpoint snapshot). Unable to retrieve endpoint list (err=%s)\n", err)
return
}
for _, endpoint := range endpoints {
if endpoint.Type == portainer.AzureEnvironment {
continue
}
snapshot, err := runner.context.snapshotter.CreateSnapshot(&endpoint)
endpoint.Status = portainer.EndpointStatusUp
if err != nil {
log.Printf("background job error (endpoint snapshot). Unable to create snapshot (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err)
endpoint.Status = portainer.EndpointStatusDown
}
if snapshot != nil {
endpoint.Snapshots = []portainer.Snapshot{*snapshot}
}
err = runner.context.endpointService.UpdateEndpoint(endpoint.ID, &endpoint)
if err != nil {
log.Printf("background job error (endpoint snapshot). Unable to update endpoint (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err)
return
}
}
}

View file

@ -5,52 +5,49 @@ import (
"github.com/robfig/cron" "github.com/robfig/cron"
) )
// JobScheduler represents a service for managing crons. // JobScheduler represents a service for managing crons
type JobScheduler struct { type JobScheduler struct {
cron *cron.Cron cron *cron.Cron
} }
// NewJobScheduler initializes a new service. // NewJobScheduler initializes a new service
func NewJobScheduler() *JobScheduler { func NewJobScheduler() *JobScheduler {
return &JobScheduler{ return &JobScheduler{
cron: cron.New(), cron: cron.New(),
} }
} }
// UpdateScheduledTask updates a specific scheduled task by re-creating a new cron // CreateSchedule schedules the execution of a job via a runner
// and adding all the existing jobs. It will then re-schedule the new task func (scheduler *JobScheduler) CreateSchedule(schedule *portainer.Schedule, runner portainer.JobRunner) error {
// based on the updatedTask parameter. runner.SetScheduleID(schedule.ID)
return scheduler.cron.AddJob(schedule.CronExpression, runner)
}
// UpdateSchedule updates a specific scheduled job by re-creating a new cron
// and adding all the existing jobs. It will then re-schedule the new job
// via the specified JobRunner parameter.
// NOTE: the cron library do not support updating schedules directly // NOTE: the cron library do not support updating schedules directly
// hence the work-around. // hence the work-around
func (scheduler *JobScheduler) UpdateScheduledTask(scheduleID portainer.ScheduleID, cronExpression string, updatedTask portainer.Task) error { func (scheduler *JobScheduler) UpdateSchedule(schedule *portainer.Schedule, runner portainer.JobRunner) error {
jobs := scheduler.cron.Entries() cronEntries := scheduler.cron.Entries()
newCron := cron.New() newCron := cron.New()
for _, job := range jobs { for _, entry := range cronEntries {
switch task := job.Job.(type) { if entry.Job.(portainer.JobRunner).GetScheduleID() == schedule.ID {
case ScriptTask:
if task.context.ScheduleID == scheduleID { var jobRunner cron.Job = runner
err := newCron.AddJob(cronExpression, updatedTask) if entry.Job.(portainer.JobRunner).GetJobType() == portainer.SnapshotJobType {
jobRunner = entry.Job
}
err := newCron.AddJob(schedule.CronExpression, jobRunner)
if err != nil { if err != nil {
return err return err
} }
continue
}
case SnapshotTask:
_, ok := updatedTask.(SnapshotTask)
if ok {
err := newCron.AddJob(cronExpression, job.Job)
if err != nil {
return err
} }
continue newCron.Schedule(entry.Schedule, entry.Job)
}
}
newCron.Schedule(job.Schedule, job.Job)
} }
scheduler.cron.Stop() scheduler.cron.Stop()
@ -59,25 +56,21 @@ func (scheduler *JobScheduler) UpdateScheduledTask(scheduleID portainer.Schedule
return nil return nil
} }
// UnscheduleTask remove a schedule by re-creating a new cron // RemoveSchedule remove a scheduled job by re-creating a new cron
// and adding all the existing jobs except for the one specified via scheduleID. // and adding all the existing jobs except for the one specified via scheduleID.
// NOTE: the cron library do not support removing schedules directly // NOTE: the cron library do not support removing schedules directly
// hence the work-around. // hence the work-around
func (scheduler *JobScheduler) UnscheduleTask(scheduleID portainer.ScheduleID) { func (scheduler *JobScheduler) RemoveSchedule(scheduleID portainer.ScheduleID) {
jobs := scheduler.cron.Entries() cronEntries := scheduler.cron.Entries()
newCron := cron.New() newCron := cron.New()
for _, job := range jobs { for _, entry := range cronEntries {
switch task := job.Job.(type) { if entry.Job.(portainer.JobRunner).GetScheduleID() == scheduleID {
case ScriptTask:
if task.context.ScheduleID == scheduleID {
continue continue
} }
}
newCron.Schedule(job.Schedule, job.Job) newCron.Schedule(entry.Schedule, entry.Job)
} }
scheduler.cron.Stop() scheduler.cron.Stop()
@ -85,11 +78,6 @@ func (scheduler *JobScheduler) UnscheduleTask(scheduleID portainer.ScheduleID) {
scheduler.cron.Start() scheduler.cron.Start()
} }
// ScheduleTask adds a new task to be scheduled in the cron.
func (scheduler *JobScheduler) ScheduleTask(cronExpression string, task portainer.Task) error {
return scheduler.cron.AddJob(cronExpression, task)
}
// Start starts the scheduled jobs // Start starts the scheduled jobs
func (scheduler *JobScheduler) Start() { func (scheduler *JobScheduler) Start() {
if len(scheduler.cron.Entries()) > 0 { if len(scheduler.cron.Entries()) > 0 {

View file

@ -1,63 +0,0 @@
package cron
import (
"log"
"github.com/portainer/portainer"
)
// ScriptTaskContext represents the context required for the execution
// of a ScriptTask.
type ScriptTaskContext struct {
JobService portainer.JobService
EndpointService portainer.EndpointService
FileService portainer.FileService
ScheduleID portainer.ScheduleID
TargetEndpoints []portainer.EndpointID
}
// ScriptTask represents a task used to execute a script inside a privileged
// container. It can be scheduled.
type ScriptTask struct {
Image string
ScriptPath string
context *ScriptTaskContext
}
// NewScriptTask creates a new ScriptTask using the specified context.
func NewScriptTask(image, scriptPath string, context *ScriptTaskContext) ScriptTask {
return ScriptTask{
Image: image,
ScriptPath: scriptPath,
context: context,
}
}
// SetContext can be used to set/override the task context
func (task ScriptTask) SetContext(context *ScriptTaskContext) {
task.context = context
}
// Run triggers the execution of the task.
// It will iterate through all the endpoints specified in the context to
// execute the script associated to the task.
func (task ScriptTask) Run() {
scriptFile, err := task.context.FileService.GetFileContent(task.ScriptPath)
if err != nil {
log.Printf("scheduled task error (script execution). Unable to retrieve script file (err=%s)\n", err)
return
}
for _, endpointID := range task.context.TargetEndpoints {
endpoint, err := task.context.EndpointService.Endpoint(endpointID)
if err != nil {
log.Printf("scheduled task error (script execution). Unable to retrieve information about endpoint (id=%d) (err=%s)\n", endpointID, err)
return
}
err = task.context.JobService.Execute(endpoint, "", task.Image, scriptFile)
if err != nil {
log.Printf("scheduled task error (script execution). Unable to execute scrtip (endpoint=%s) (err=%s)\n", endpoint.Name, err)
}
}
}

View file

@ -1,61 +0,0 @@
package cron
import (
"log"
"github.com/portainer/portainer"
)
// SnapshotTaskContext represents the context required for the execution
// of a SnapshotTask.
type SnapshotTaskContext struct {
EndpointService portainer.EndpointService
Snapshotter portainer.Snapshotter
}
// SnapshotTask represents a task used to create endpoint snapshots.
// It can be scheduled.
type SnapshotTask struct {
context *SnapshotTaskContext
}
// NewSnapshotTask creates a new ScriptTask using the specified context.
func NewSnapshotTask(context *SnapshotTaskContext) SnapshotTask {
return SnapshotTask{
context: context,
}
}
// Run triggers the execution of the task.
// It will iterate through all the endpoints available in the database to
// create a snapshot of each one of them.
func (task SnapshotTask) Run() {
endpoints, err := task.context.EndpointService.Endpoints()
if err != nil {
log.Printf("background task error (endpoint snapshot). Unable to retrieve endpoint list (err=%s)\n", err)
return
}
for _, endpoint := range endpoints {
if endpoint.Type == portainer.AzureEnvironment {
continue
}
snapshot, err := task.context.Snapshotter.CreateSnapshot(&endpoint)
endpoint.Status = portainer.EndpointStatusUp
if err != nil {
log.Printf("background task error (endpoint snapshot). Unable to create snapshot (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err)
endpoint.Status = portainer.EndpointStatusDown
}
if snapshot != nil {
endpoint.Snapshots = []portainer.Snapshot{*snapshot}
}
err = task.context.EndpointService.UpdateEndpoint(endpoint.ID, &endpoint)
if err != nil {
log.Printf("background task error (endpoint snapshot). Unable to update endpoint (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err)
return
}
}
}

View file

@ -6,7 +6,6 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
httperror "github.com/portainer/libhttp/error" httperror "github.com/portainer/libhttp/error"
"github.com/portainer/portainer" "github.com/portainer/portainer"
"github.com/portainer/portainer/cron"
"github.com/portainer/portainer/http/security" "github.com/portainer/portainer/http/security"
) )
@ -39,13 +38,3 @@ func NewHandler(bouncer *security.RequestBouncer) *Handler {
return h return h
} }
func (handler *Handler) createTaskExecutionContext(scheduleID portainer.ScheduleID, endpoints []portainer.EndpointID) *cron.ScriptTaskContext {
return &cron.ScriptTaskContext{
JobService: handler.JobService,
EndpointService: handler.EndpointService,
FileService: handler.FileService,
ScheduleID: scheduleID,
TargetEndpoints: endpoints,
}
}

View file

@ -142,20 +142,27 @@ func (handler *Handler) createSchedule(name, image, cronExpression string, endpo
return nil, err return nil, err
} }
taskContext := handler.createTaskExecutionContext(scheduleIdentifier, endpoints) job := &portainer.ScriptExecutionJob{
task := cron.NewScriptTask(image, scriptPath, taskContext) Endpoints: endpoints,
Image: image,
err = handler.JobScheduler.ScheduleTask(cronExpression, task) ScriptPath: scriptPath,
if err != nil { ScheduleID: scheduleIdentifier,
return nil, err
} }
schedule := &portainer.Schedule{ schedule := &portainer.Schedule{
ID: scheduleIdentifier, ID: scheduleIdentifier,
Name: name, Name: name,
Endpoints: endpoints,
CronExpression: cronExpression, CronExpression: cronExpression,
Task: task, JobType: portainer.ScriptExecutionJobType,
ScriptExecutionJob: job,
}
jobContext := cron.NewScriptExecutionJobContext(handler.JobService, handler.EndpointService, handler.FileService)
jobRunner := cron.NewScriptExecutionJobRunner(job, jobContext)
err = handler.JobScheduler.CreateSchedule(schedule, jobRunner)
if err != nil {
return nil, err
} }
err = handler.ScheduleService.CreateSchedule(schedule) err = handler.ScheduleService.CreateSchedule(schedule)

View file

@ -1,6 +1,7 @@
package schedules package schedules
import ( import (
"errors"
"net/http" "net/http"
httperror "github.com/portainer/libhttp/error" httperror "github.com/portainer/libhttp/error"
@ -15,7 +16,16 @@ func (handler *Handler) scheduleDelete(w http.ResponseWriter, r *http.Request) *
return &httperror.HandlerError{http.StatusBadRequest, "Invalid schedule identifier route variable", err} return &httperror.HandlerError{http.StatusBadRequest, "Invalid schedule identifier route variable", err}
} }
handler.JobScheduler.UnscheduleTask(portainer.ScheduleID(scheduleID)) schedule, err := handler.ScheduleService.Schedule(portainer.ScheduleID(scheduleID))
if err == portainer.ErrObjectNotFound {
return &httperror.HandlerError{http.StatusNotFound, "Unable to find a schedule with the specified identifier inside the database", err}
} else if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err}
}
if schedule.JobType == portainer.SnapshotJobType || schedule.JobType == portainer.EndpointSyncJobType {
return &httperror.HandlerError{http.StatusBadRequest, "Cannot remove system schedules", errors.New("Cannot remove system schedule")}
}
scheduleFolder := handler.FileService.GetScheduleFolder(portainer.ScheduleID(scheduleID)) scheduleFolder := handler.FileService.GetScheduleFolder(portainer.ScheduleID(scheduleID))
err = handler.FileService.RemoveDirectory(scheduleFolder) err = handler.FileService.RemoveDirectory(scheduleFolder)

View file

@ -40,14 +40,14 @@ func (handler *Handler) scheduleUpdate(w http.ResponseWriter, r *http.Request) *
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err} return &httperror.HandlerError{http.StatusInternalServerError, "Unable to find a schedule with the specified identifier inside the database", err}
} }
updateTaskSchedule := updateSchedule(schedule, &payload) updateJobSchedule := updateSchedule(schedule, &payload)
if updateTaskSchedule { if updateJobSchedule {
taskContext := handler.createTaskExecutionContext(schedule.ID, schedule.Endpoints)
schedule.Task.(cron.ScriptTask).SetContext(taskContext)
err := handler.JobScheduler.UpdateScheduledTask(schedule.ID, schedule.CronExpression, schedule.Task) jobContext := cron.NewScriptExecutionJobContext(handler.JobService, handler.EndpointService, handler.FileService)
jobRunner := cron.NewScriptExecutionJobRunner(schedule.ScriptExecutionJob, jobContext)
err := handler.JobScheduler.UpdateSchedule(schedule, jobRunner)
if err != nil { if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update task scheduler", err} return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update job scheduler", err}
} }
} }
@ -60,28 +60,26 @@ func (handler *Handler) scheduleUpdate(w http.ResponseWriter, r *http.Request) *
} }
func updateSchedule(schedule *portainer.Schedule, payload *scheduleUpdatePayload) bool { func updateSchedule(schedule *portainer.Schedule, payload *scheduleUpdatePayload) bool {
updateTaskSchedule := false updateJobSchedule := false
if payload.Name != nil { if payload.Name != nil {
schedule.Name = *payload.Name schedule.Name = *payload.Name
} }
if payload.Endpoints != nil { if payload.Endpoints != nil {
schedule.Endpoints = payload.Endpoints schedule.ScriptExecutionJob.Endpoints = payload.Endpoints
updateTaskSchedule = true updateJobSchedule = true
} }
if payload.CronExpression != nil { if payload.CronExpression != nil {
schedule.CronExpression = *payload.CronExpression schedule.CronExpression = *payload.CronExpression
updateTaskSchedule = true updateJobSchedule = true
} }
if payload.Image != nil { if payload.Image != nil {
t := schedule.Task.(cron.ScriptTask) schedule.ScriptExecutionJob.Image = *payload.Image
t.Image = *payload.Image updateJobSchedule = true
updateTaskSchedule = true
} }
return updateTaskSchedule return updateJobSchedule
} }

View file

@ -16,6 +16,7 @@ type Handler struct {
LDAPService portainer.LDAPService LDAPService portainer.LDAPService
FileService portainer.FileService FileService portainer.FileService
JobScheduler portainer.JobScheduler JobScheduler portainer.JobScheduler
ScheduleService portainer.ScheduleService
} }
// NewHandler creates a handler to manage settings operations. // NewHandler creates a handler to manage settings operations.

View file

@ -8,7 +8,6 @@ import (
"github.com/portainer/libhttp/request" "github.com/portainer/libhttp/request"
"github.com/portainer/libhttp/response" "github.com/portainer/libhttp/response"
"github.com/portainer/portainer" "github.com/portainer/portainer"
"github.com/portainer/portainer/cron"
"github.com/portainer/portainer/filesystem" "github.com/portainer/portainer/filesystem"
) )
@ -78,11 +77,9 @@ func (handler *Handler) settingsUpdate(w http.ResponseWriter, r *http.Request) *
} }
if payload.SnapshotInterval != nil && *payload.SnapshotInterval != settings.SnapshotInterval { if payload.SnapshotInterval != nil && *payload.SnapshotInterval != settings.SnapshotInterval {
settings.SnapshotInterval = *payload.SnapshotInterval err := handler.updateSnapshotInterval(settings, *payload.SnapshotInterval)
err := handler.JobScheduler.UpdateScheduledTask(0, "@every "+*payload.SnapshotInterval, cron.NewSnapshotTask(nil))
if err != nil { if err != nil {
return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update task scheduler", err} return &httperror.HandlerError{http.StatusInternalServerError, "Unable to update snapshot interval", err}
} }
} }
@ -99,6 +96,27 @@ func (handler *Handler) settingsUpdate(w http.ResponseWriter, r *http.Request) *
return response.JSON(w, settings) return response.JSON(w, settings)
} }
func (handler *Handler) updateSnapshotInterval(settings *portainer.Settings, snapshotInterval string) error {
settings.SnapshotInterval = snapshotInterval
schedules, err := handler.ScheduleService.SchedulesByJobType(portainer.SnapshotJobType)
if err != nil {
return err
}
if len(schedules) != 0 {
snapshotSchedule := schedules[0]
snapshotSchedule.CronExpression = "@every " + snapshotInterval
err := handler.JobScheduler.UpdateSchedule(&snapshotSchedule, nil)
if err != nil {
return err
}
}
return nil
}
func (handler *Handler) updateTLS(settings *portainer.Settings) *httperror.HandlerError { func (handler *Handler) updateTLS(settings *portainer.Settings) *httperror.HandlerError {
if (settings.LDAPSettings.TLSConfig.TLS || settings.LDAPSettings.StartTLS) && !settings.LDAPSettings.TLSConfig.TLSSkipVerify { if (settings.LDAPSettings.TLSConfig.TLS || settings.LDAPSettings.StartTLS) && !settings.LDAPSettings.TLSConfig.TLSSkipVerify {
caCertPath, _ := handler.FileService.GetPathForTLSFile(filesystem.LDAPStorePath, portainer.TLSFileCA) caCertPath, _ := handler.FileService.GetPathForTLSFile(filesystem.LDAPStorePath, portainer.TLSFileCA)

View file

@ -146,6 +146,7 @@ func (server *Server) Start() error {
settingsHandler.LDAPService = server.LDAPService settingsHandler.LDAPService = server.LDAPService
settingsHandler.FileService = server.FileService settingsHandler.FileService = server.FileService
settingsHandler.JobScheduler = server.JobScheduler settingsHandler.JobScheduler = server.JobScheduler
settingsHandler.ScheduleService = server.ScheduleService
var stackHandler = stacks.NewHandler(requestBouncer) var stackHandler = stacks.NewHandler(requestBouncer)
stackHandler.FileService = server.FileService stackHandler.FileService = server.FileService

View file

@ -223,13 +223,38 @@ type (
// ScheduleID represents a schedule identifier. // ScheduleID represents a schedule identifier.
ScheduleID int ScheduleID int
// Schedule represents a task that is scheduled on one or multiple endpoints. // JobType represents a job type
JobType int
// ScriptExecutionJob represents a scheduled job that can execute a script via a privileged container
ScriptExecutionJob struct {
ScheduleID ScheduleID `json:"ScheduleId"`
Endpoints []EndpointID
Image string
ScriptPath string
}
// SnapshotJob represents a scheduled job that can create endpoint snapshots
SnapshotJob struct {
ScheduleID ScheduleID `json:"ScheduleId"`
}
// EndpointSyncJob represents a scheduled job that synchronize endpoints based on an external file
EndpointSyncJob struct {
ScheduleID ScheduleID `json:"ScheduleId"`
}
// Schedule represents a scheduled job.
// It only contains a pointer to one of the JobRunner implementations
// based on the JobType
Schedule struct { Schedule struct {
ID ScheduleID `json:"Id"` ID ScheduleID `json:"Id"`
Name string `json:"Name"` Name string
Endpoints []EndpointID `json:"Endpoints"` CronExpression string
CronExpression string `json:"Schedule"` JobType JobType
Task Task `json:"Task"` ScriptExecutionJob *ScriptExecutionJob
SnapshotJob *SnapshotJob
EndpointSyncJob *EndpointSyncJob
} }
// WebhookID represents a webhook identifier. // WebhookID represents a webhook identifier.
@ -568,6 +593,7 @@ type (
ScheduleService interface { ScheduleService interface {
Schedule(ID ScheduleID) (*Schedule, error) Schedule(ID ScheduleID) (*Schedule, error)
Schedules() ([]Schedule, error) Schedules() ([]Schedule, error)
SchedulesByJobType(jobType JobType) ([]Schedule, error)
CreateSchedule(schedule *Schedule) error CreateSchedule(schedule *Schedule) error
UpdateSchedule(ID ScheduleID, schedule *Schedule) error UpdateSchedule(ID ScheduleID, schedule *Schedule) error
DeleteSchedule(ID ScheduleID) error DeleteSchedule(ID ScheduleID) error
@ -639,15 +665,18 @@ type (
// JobScheduler represents a service to run jobs on a periodic basis // JobScheduler represents a service to run jobs on a periodic basis
JobScheduler interface { JobScheduler interface {
ScheduleTask(cronExpression string, task Task) error CreateSchedule(schedule *Schedule, runner JobRunner) error
UpdateScheduledTask(ID ScheduleID, cronExpression string, updatedTask Task) error UpdateSchedule(schedule *Schedule, runner JobRunner) error
UnscheduleTask(ID ScheduleID) RemoveSchedule(ID ScheduleID)
Start() Start()
} }
// Task represents a process that can be scheduled // JobRunner represents a service that can be used to run a job
Task interface { JobRunner interface {
Run() Run()
GetScheduleID() ScheduleID
SetScheduleID(ID ScheduleID)
GetJobType() JobType
} }
// Snapshotter represents a service used to create endpoint snapshots // Snapshotter represents a service used to create endpoint snapshots
@ -808,3 +837,15 @@ const (
// ServiceWebhook is a webhook for restarting a docker service // ServiceWebhook is a webhook for restarting a docker service
ServiceWebhook ServiceWebhook
) )
const (
_ JobType = iota
// ScriptExecutionJobType is a non-system job used to execute a script against a list of
// endpoints via privileged containers
ScriptExecutionJobType
// SnapshotJobType is a system job used to create endpoint snapshots
SnapshotJobType
// EndpointSyncJobType is a system job used to synchronize endpoints from
// an external definition store
EndpointSyncJobType
)