1
0
Fork 0
mirror of https://github.com/portainer/portainer.git synced 2025-08-05 05:45:22 +02:00

feat(schedules): add the ability to list tasks from snapshots (#2458)

* feat(schedules): add the ability to list tasks from snapshots

* feat(schedules): update schedules

* refactor(schedules): fix linting issue
This commit is contained in:
Anthony Lapenna 2018-11-13 14:39:26 +13:00 committed by GitHub
parent a2d9f591a7
commit 64c29f7402
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 440 additions and 149 deletions

View file

@ -11,8 +11,8 @@ import (
// EndpointSyncJobRunner is used to run a EndpointSyncJob
type EndpointSyncJobRunner struct {
job *portainer.EndpointSyncJob
context *EndpointSyncJobContext
schedule *portainer.Schedule
context *EndpointSyncJobContext
}
// EndpointSyncJobContext represents the context of execution of a EndpointSyncJob
@ -30,10 +30,10 @@ func NewEndpointSyncJobContext(endpointService portainer.EndpointService, endpoi
}
// NewEndpointSyncJobRunner returns a new runner that can be scheduled
func NewEndpointSyncJobRunner(job *portainer.EndpointSyncJob, context *EndpointSyncJobContext) *EndpointSyncJobRunner {
func NewEndpointSyncJobRunner(schedule *portainer.Schedule, context *EndpointSyncJobContext) *EndpointSyncJobRunner {
return &EndpointSyncJobRunner{
job: job,
context: context,
schedule: schedule,
context: context,
}
}
@ -53,19 +53,9 @@ type fileEndpoint struct {
TLSKey string `json:"TLSKey,omitempty"`
}
// GetScheduleID returns the schedule identifier associated to the runner
func (runner *EndpointSyncJobRunner) GetScheduleID() portainer.ScheduleID {
return runner.job.ScheduleID
}
// SetScheduleID sets the schedule identifier associated to the runner
func (runner *EndpointSyncJobRunner) SetScheduleID(ID portainer.ScheduleID) {
runner.job.ScheduleID = ID
}
// GetJobType returns the job type associated to the runner
func (runner *EndpointSyncJobRunner) GetJobType() portainer.JobType {
return portainer.EndpointSyncJobType
// GetSchedule returns the schedule associated to the runner
func (runner *EndpointSyncJobRunner) GetSchedule() *portainer.Schedule {
return runner.schedule
}
// Run triggers the execution of the endpoint synchronization process.

View file

@ -9,8 +9,8 @@ import (
// ScriptExecutionJobRunner is used to run a ScriptExecutionJob
type ScriptExecutionJobRunner struct {
job *portainer.ScriptExecutionJob
context *ScriptExecutionJobContext
schedule *portainer.Schedule
context *ScriptExecutionJobContext
}
// ScriptExecutionJobContext represents the context of execution of a ScriptExecutionJob
@ -30,10 +30,10 @@ func NewScriptExecutionJobContext(jobService portainer.JobService, endpointServi
}
// NewScriptExecutionJobRunner returns a new runner that can be scheduled
func NewScriptExecutionJobRunner(job *portainer.ScriptExecutionJob, context *ScriptExecutionJobContext) *ScriptExecutionJobRunner {
func NewScriptExecutionJobRunner(schedule *portainer.Schedule, context *ScriptExecutionJobContext) *ScriptExecutionJobRunner {
return &ScriptExecutionJobRunner{
job: job,
context: context,
schedule: schedule,
context: context,
}
}
@ -41,14 +41,14 @@ func NewScriptExecutionJobRunner(job *portainer.ScriptExecutionJob, context *Scr
// It will iterate through all the endpoints specified in the context to
// execute the script associated to the job.
func (runner *ScriptExecutionJobRunner) Run() {
scriptFile, err := runner.context.fileService.GetFileContent(runner.job.ScriptPath)
scriptFile, err := runner.context.fileService.GetFileContent(runner.schedule.ScriptExecutionJob.ScriptPath)
if err != nil {
log.Printf("scheduled job error (script execution). Unable to retrieve script file (err=%s)\n", err)
return
}
targets := make([]*portainer.Endpoint, 0)
for _, endpointID := range runner.job.Endpoints {
for _, endpointID := range runner.schedule.ScriptExecutionJob.Endpoints {
endpoint, err := runner.context.endpointService.Endpoint(endpointID)
if err != nil {
log.Printf("scheduled job error (script execution). Unable to retrieve information about endpoint (id=%d) (err=%s)\n", endpointID, err)
@ -65,7 +65,7 @@ func (runner *ScriptExecutionJobRunner) executeAndRetry(endpoints []*portainer.E
retryTargets := make([]*portainer.Endpoint, 0)
for _, endpoint := range endpoints {
err := runner.context.jobService.Execute(endpoint, "", runner.job.Image, script)
err := runner.context.jobService.ExecuteScript(endpoint, "", runner.schedule.ScriptExecutionJob.Image, script, runner.schedule)
if err == portainer.ErrUnableToPingEndpoint {
retryTargets = append(retryTargets, endpoint)
} else if err != nil {
@ -74,26 +74,16 @@ func (runner *ScriptExecutionJobRunner) executeAndRetry(endpoints []*portainer.E
}
retryCount++
if retryCount >= runner.job.RetryCount {
if retryCount >= runner.schedule.ScriptExecutionJob.RetryCount {
return
}
time.Sleep(time.Duration(runner.job.RetryInterval) * time.Second)
time.Sleep(time.Duration(runner.schedule.ScriptExecutionJob.RetryInterval) * time.Second)
runner.executeAndRetry(retryTargets, script, retryCount)
}
// GetScheduleID returns the schedule identifier associated to the runner
func (runner *ScriptExecutionJobRunner) GetScheduleID() portainer.ScheduleID {
return runner.job.ScheduleID
}
// SetScheduleID sets the schedule identifier associated to the runner
func (runner *ScriptExecutionJobRunner) SetScheduleID(ID portainer.ScheduleID) {
runner.job.ScheduleID = ID
}
// GetJobType returns the job type associated to the runner
func (runner *ScriptExecutionJobRunner) GetJobType() portainer.JobType {
return portainer.ScriptExecutionJobType
// GetSchedule returns the schedule associated to the runner
func (runner *ScriptExecutionJobRunner) GetSchedule() *portainer.Schedule {
return runner.schedule
}

View file

@ -8,8 +8,8 @@ import (
// SnapshotJobRunner is used to run a SnapshotJob
type SnapshotJobRunner struct {
job *portainer.SnapshotJob
context *SnapshotJobContext
schedule *portainer.Schedule
context *SnapshotJobContext
}
// SnapshotJobContext represents the context of execution of a SnapshotJob
@ -27,35 +27,25 @@ func NewSnapshotJobContext(endpointService portainer.EndpointService, snapshotte
}
// NewSnapshotJobRunner returns a new runner that can be scheduled
func NewSnapshotJobRunner(job *portainer.SnapshotJob, context *SnapshotJobContext) *SnapshotJobRunner {
func NewSnapshotJobRunner(schedule *portainer.Schedule, context *SnapshotJobContext) *SnapshotJobRunner {
return &SnapshotJobRunner{
job: job,
context: context,
schedule: schedule,
context: context,
}
}
// GetScheduleID returns the schedule identifier associated to the runner
func (runner *SnapshotJobRunner) GetScheduleID() portainer.ScheduleID {
return runner.job.ScheduleID
// GetSchedule returns the schedule associated to the runner
func (runner *SnapshotJobRunner) GetSchedule() *portainer.Schedule {
return runner.schedule
}
// SetScheduleID sets the schedule identifier associated to the runner
func (runner *SnapshotJobRunner) SetScheduleID(ID portainer.ScheduleID) {
runner.job.ScheduleID = ID
}
// GetJobType returns the job type associated to the runner
func (runner *SnapshotJobRunner) GetJobType() portainer.JobType {
return portainer.EndpointSyncJobType
}
// Run triggers the execution of the job.
// Run triggers the execution of the schedule.
// It will iterate through all the endpoints available in the database to
// create a snapshot of each one of them.
func (runner *SnapshotJobRunner) Run() {
endpoints, err := runner.context.endpointService.Endpoints()
if err != nil {
log.Printf("background job error (endpoint snapshot). Unable to retrieve endpoint list (err=%s)\n", err)
log.Printf("background schedule error (endpoint snapshot). Unable to retrieve endpoint list (err=%s)\n", err)
return
}
@ -67,7 +57,7 @@ func (runner *SnapshotJobRunner) Run() {
snapshot, err := runner.context.snapshotter.CreateSnapshot(&endpoint)
endpoint.Status = portainer.EndpointStatusUp
if err != nil {
log.Printf("background job error (endpoint snapshot). Unable to create snapshot (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err)
log.Printf("background schedule error (endpoint snapshot). Unable to create snapshot (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err)
endpoint.Status = portainer.EndpointStatusDown
}
@ -77,7 +67,7 @@ func (runner *SnapshotJobRunner) Run() {
err = runner.context.endpointService.UpdateEndpoint(endpoint.ID, &endpoint)
if err != nil {
log.Printf("background job error (endpoint snapshot). Unable to update endpoint (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err)
log.Printf("background schedule error (endpoint snapshot). Unable to update endpoint (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err)
return
}
}

View file

@ -17,31 +17,25 @@ func NewJobScheduler() *JobScheduler {
}
}
// CreateSchedule schedules the execution of a job via a runner
func (scheduler *JobScheduler) CreateSchedule(schedule *portainer.Schedule, runner portainer.JobRunner) error {
runner.SetScheduleID(schedule.ID)
return scheduler.cron.AddJob(schedule.CronExpression, runner)
// ScheduleJob schedules the execution of a job via a runner
func (scheduler *JobScheduler) ScheduleJob(runner portainer.JobRunner) error {
return scheduler.cron.AddJob(runner.GetSchedule().CronExpression, runner)
}
// UpdateSchedule updates a specific scheduled job by re-creating a new cron
// UpdateSystemJobSchedule updates the first occurence of the specified
// scheduled job based on the specified job type.
// It does so by re-creating a new cron
// and adding all the existing jobs. It will then re-schedule the new job
// via the specified JobRunner parameter.
// with the update cron expression passed in parameter.
// NOTE: the cron library do not support updating schedules directly
// hence the work-around
func (scheduler *JobScheduler) UpdateSchedule(schedule *portainer.Schedule, runner portainer.JobRunner) error {
func (scheduler *JobScheduler) UpdateSystemJobSchedule(jobType portainer.JobType, newCronExpression string) error {
cronEntries := scheduler.cron.Entries()
newCron := cron.New()
for _, entry := range cronEntries {
if entry.Job.(portainer.JobRunner).GetScheduleID() == schedule.ID {
var jobRunner cron.Job = runner
if entry.Job.(portainer.JobRunner).GetJobType() == portainer.SnapshotJobType {
jobRunner = entry.Job
}
err := newCron.AddJob(schedule.CronExpression, jobRunner)
if entry.Job.(portainer.JobRunner).GetSchedule().JobType == jobType {
err := newCron.AddJob(newCronExpression, entry.Job)
if err != nil {
return err
}
@ -56,17 +50,50 @@ func (scheduler *JobScheduler) UpdateSchedule(schedule *portainer.Schedule, runn
return nil
}
// RemoveSchedule remove a scheduled job by re-creating a new cron
// and adding all the existing jobs except for the one specified via scheduleID.
// NOTE: the cron library do not support removing schedules directly
// UpdateJobSchedule updates a specific scheduled job by re-creating a new cron
// and adding all the existing jobs. It will then re-schedule the new job
// via the specified JobRunner parameter.
// NOTE: the cron library do not support updating schedules directly
// hence the work-around
func (scheduler *JobScheduler) RemoveSchedule(scheduleID portainer.ScheduleID) {
func (scheduler *JobScheduler) UpdateJobSchedule(runner portainer.JobRunner) error {
cronEntries := scheduler.cron.Entries()
newCron := cron.New()
for _, entry := range cronEntries {
if entry.Job.(portainer.JobRunner).GetScheduleID() == scheduleID {
if entry.Job.(portainer.JobRunner).GetSchedule().ID == runner.GetSchedule().ID {
var jobRunner cron.Job = runner
if entry.Job.(portainer.JobRunner).GetSchedule().JobType == portainer.SnapshotJobType {
jobRunner = entry.Job
}
err := newCron.AddJob(runner.GetSchedule().CronExpression, jobRunner)
if err != nil {
return err
}
}
newCron.Schedule(entry.Schedule, entry.Job)
}
scheduler.cron.Stop()
scheduler.cron = newCron
scheduler.cron.Start()
return nil
}
// UnscheduleJob remove a scheduled job by re-creating a new cron
// and adding all the existing jobs except for the one specified via scheduleID.
// NOTE: the cron library do not support removing schedules directly
// hence the work-around
func (scheduler *JobScheduler) UnscheduleJob(scheduleID portainer.ScheduleID) {
cronEntries := scheduler.cron.Entries()
newCron := cron.New()
for _, entry := range cronEntries {
if entry.Job.(portainer.JobRunner).GetSchedule().ID == scheduleID {
continue
}