1
0
Fork 0
mirror of https://github.com/portainer/portainer.git synced 2025-07-25 08:19:40 +02:00

feat(schedules): add the schedule API

* feat(jobs): add job service interface

* feat(jobs): create job execution api

* style(jobs): remove comment

* feat(jobs): add bindings

* feat(jobs): validate payload different cases

* refactor(jobs): rename endpointJob method

* refactor(jobs): return original error

* feat(jobs): pull image before creating container

* feat(jobs): run jobs with sh

* style(jobs): remove comment

* refactor(jobs): change error names

* feat(jobs): sync pull image

* fix(jobs): close image reader after error check

* style(jobs): remove comment and add docs

* refactor(jobs): inline script command

* fix(jobs): handle pul image error

* refactor(jobs): handle image pull output

* fix(docker): set http client timeout to 100s

* feat(api): create schedule type

* feat(agent): add basic schedule api

* feat(schedules): add schedule service in bolt

* feat(schedule): add schedule service to handler

* feat(schedule): add and list schedules from db

* feat(agent): get schedule from db

* feat(schedule): update schedule in db

* feat(agent): delete schedule

* fix(bolt): remove sync method from scheduleService

* feat(schedules): save/delete script in fs

* feat(schedules): schedules cron service implementation

* feat(schedule): integrate handler with cron

* feat(schedules): schedules API overhaul

* refactor(project): remove .idea folder

* fix(schedules): fix script task execute call

* refactor(schedules): refactor/fix golint issues

* refactor(schedules): update SnapshotTask documentation

* refactor(schedules): validate image name in ScheduleCreate operation
This commit is contained in:
Chaim Lev-Ari 2018-11-05 22:58:15 +02:00 committed by Anthony Lapenna
parent e94d6ad6b2
commit dbbea0a20f
20 changed files with 873 additions and 174 deletions

View file

@ -1,60 +0,0 @@
package cron
import (
"log"
"github.com/portainer/portainer"
)
type (
endpointSnapshotJob struct {
endpointService portainer.EndpointService
snapshotter portainer.Snapshotter
}
)
func newEndpointSnapshotJob(endpointService portainer.EndpointService, snapshotter portainer.Snapshotter) endpointSnapshotJob {
return endpointSnapshotJob{
endpointService: endpointService,
snapshotter: snapshotter,
}
}
func (job endpointSnapshotJob) Snapshot() error {
endpoints, err := job.endpointService.Endpoints()
if err != nil {
return err
}
for _, endpoint := range endpoints {
if endpoint.Type == portainer.AzureEnvironment {
continue
}
snapshot, err := job.snapshotter.CreateSnapshot(&endpoint)
endpoint.Status = portainer.EndpointStatusUp
if err != nil {
log.Printf("cron error: endpoint snapshot error (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err)
endpoint.Status = portainer.EndpointStatusDown
}
if snapshot != nil {
endpoint.Snapshots = []portainer.Snapshot{*snapshot}
}
err = job.endpointService.UpdateEndpoint(endpoint.ID, &endpoint)
if err != nil {
return err
}
}
return nil
}
func (job endpointSnapshotJob) Run() {
err := job.Snapshot()
if err != nil {
log.Printf("cron error: snapshot job error (err=%s)\n", err)
}
}

View file

@ -1,77 +1,93 @@
package cron
import (
"log"
"github.com/portainer/portainer"
"github.com/robfig/cron"
)
// JobScheduler represents a service for managing crons.
type JobScheduler struct {
cron *cron.Cron
endpointService portainer.EndpointService
snapshotter portainer.Snapshotter
endpointFilePath string
endpointSyncInterval string
cron *cron.Cron
}
// NewJobScheduler initializes a new service.
func NewJobScheduler(endpointService portainer.EndpointService, snapshotter portainer.Snapshotter) *JobScheduler {
func NewJobScheduler() *JobScheduler {
return &JobScheduler{
cron: cron.New(),
endpointService: endpointService,
snapshotter: snapshotter,
cron: cron.New(),
}
}
// ScheduleEndpointSyncJob schedules a cron job to synchronize the endpoints from a file
func (scheduler *JobScheduler) ScheduleEndpointSyncJob(endpointFilePath string, interval string) error {
scheduler.endpointFilePath = endpointFilePath
scheduler.endpointSyncInterval = interval
job := newEndpointSyncJob(endpointFilePath, scheduler.endpointService)
err := job.Sync()
if err != nil {
return err
}
return scheduler.cron.AddJob("@every "+interval, job)
}
// ScheduleSnapshotJob schedules a cron job to create endpoint snapshots
func (scheduler *JobScheduler) ScheduleSnapshotJob(interval string) error {
job := newEndpointSnapshotJob(scheduler.endpointService, scheduler.snapshotter)
go job.Snapshot()
return scheduler.cron.AddJob("@every "+interval, job)
}
// UpdateSnapshotJob will update the schedules to match the new snapshot interval
func (scheduler *JobScheduler) UpdateSnapshotJob(interval string) {
// TODO: the cron library do not support removing/updating schedules.
// As a work-around we need to re-create the cron and reschedule the jobs.
// We should update the library.
// UpdateScheduledTask updates a specific scheduled task by re-creating a new cron
// and adding all the existing jobs. It will then re-schedule the new task
// based on the updatedTask parameter.
// NOTE: the cron library do not support updating schedules directly
// hence the work-around.
func (scheduler *JobScheduler) UpdateScheduledTask(scheduleID portainer.ScheduleID, cronExpression string, updatedTask portainer.Task) error {
jobs := scheduler.cron.Entries()
scheduler.cron.Stop()
scheduler.cron = cron.New()
newCron := cron.New()
for _, job := range jobs {
switch job.Job.(type) {
case endpointSnapshotJob:
scheduler.ScheduleSnapshotJob(interval)
case endpointSyncJob:
scheduler.ScheduleEndpointSyncJob(scheduler.endpointFilePath, scheduler.endpointSyncInterval)
default:
log.Println("Unsupported job")
switch task := job.Job.(type) {
case ScriptTask:
if task.context.ScheduleID == scheduleID {
err := newCron.AddJob(cronExpression, updatedTask)
if err != nil {
return err
}
continue
}
case SnapshotTask:
_, ok := updatedTask.(SnapshotTask)
if ok {
err := newCron.AddJob(cronExpression, job.Job)
if err != nil {
return err
}
continue
}
}
newCron.Schedule(job.Schedule, job.Job)
}
scheduler.cron.Stop()
scheduler.cron = newCron
scheduler.cron.Start()
return nil
}
// UnscheduleTask remove a schedule by re-creating a new cron
// and adding all the existing jobs except for the one specified via scheduleID.
// NOTE: the cron library do not support removing schedules directly
// hence the work-around.
func (scheduler *JobScheduler) UnscheduleTask(scheduleID portainer.ScheduleID) {
jobs := scheduler.cron.Entries()
newCron := cron.New()
for _, job := range jobs {
switch task := job.Job.(type) {
case ScriptTask:
if task.context.ScheduleID == scheduleID {
continue
}
}
newCron.Schedule(job.Schedule, job.Job)
}
scheduler.cron.Stop()
scheduler.cron = newCron
scheduler.cron.Start()
}
// ScheduleTask adds a new task to be scheduled in the cron.
func (scheduler *JobScheduler) ScheduleTask(cronExpression string, task portainer.Task) error {
return scheduler.cron.AddJob(cronExpression, task)
}
// Start starts the scheduled jobs

View file

@ -10,9 +10,17 @@ import (
)
type (
endpointSyncJob struct {
endpointService portainer.EndpointService
endpointFilePath string
// EndpointSyncTask represents a task used to synchronize endpoints
// based on an external file. It can be scheduled.
EndpointSyncTask struct {
context *EndpointSyncTaskContext
}
// EndpointSyncTaskContext represents the context required for the execution
// of an EndpointSyncTask.
EndpointSyncTaskContext struct {
EndpointService portainer.EndpointService
EndpointFilePath string
}
synchronization struct {
@ -32,21 +40,52 @@ type (
}
)
const (
// ErrEmptyEndpointArray is an error raised when the external endpoint source array is empty.
ErrEmptyEndpointArray = portainer.Error("External endpoint source is empty")
)
// NewEndpointSyncTask creates a new EndpointSyncTask using the specified
// context.
func NewEndpointSyncTask(context *EndpointSyncTaskContext) EndpointSyncTask {
return EndpointSyncTask{
context: context,
}
}
func newEndpointSyncJob(endpointFilePath string, endpointService portainer.EndpointService) endpointSyncJob {
return endpointSyncJob{
endpointService: endpointService,
endpointFilePath: endpointFilePath,
// Run triggers the execution of the endpoint synchronization process.
func (task EndpointSyncTask) Run() {
data, err := ioutil.ReadFile(task.context.EndpointFilePath)
if endpointSyncError(err) {
return
}
var fileEndpoints []fileEndpoint
err = json.Unmarshal(data, &fileEndpoints)
if endpointSyncError(err) {
return
}
if len(fileEndpoints) == 0 {
log.Println("background task error (endpoint synchronization). External endpoint source is empty")
return
}
storedEndpoints, err := task.context.EndpointService.Endpoints()
if endpointSyncError(err) {
return
}
convertedFileEndpoints := convertFileEndpoints(fileEndpoints)
sync := prepareSyncData(storedEndpoints, convertedFileEndpoints)
if sync.requireSync() {
err = task.context.EndpointService.Synchronize(sync.endpointsToCreate, sync.endpointsToUpdate, sync.endpointsToDelete)
if endpointSyncError(err) {
return
}
log.Printf("Endpoint synchronization ended. [created: %v] [updated: %v] [deleted: %v]", len(sync.endpointsToCreate), len(sync.endpointsToUpdate), len(sync.endpointsToDelete))
}
}
func endpointSyncError(err error) bool {
if err != nil {
log.Printf("cron error: synchronization job error (err=%s)\n", err)
log.Printf("background task error (endpoint synchronization). Unable to synchronize endpoints (err=%s)\n", err)
return true
}
return false
@ -126,8 +165,7 @@ func (sync synchronization) requireSync() bool {
return false
}
// TMP: endpointSyncJob method to access logger, should be generic
func (job endpointSyncJob) prepareSyncData(storedEndpoints, fileEndpoints []portainer.Endpoint) *synchronization {
func prepareSyncData(storedEndpoints, fileEndpoints []portainer.Endpoint) *synchronization {
endpointsToCreate := make([]*portainer.Endpoint, 0)
endpointsToUpdate := make([]*portainer.Endpoint, 0)
endpointsToDelete := make([]*portainer.Endpoint, 0)
@ -164,43 +202,3 @@ func (job endpointSyncJob) prepareSyncData(storedEndpoints, fileEndpoints []port
endpointsToDelete: endpointsToDelete,
}
}
func (job endpointSyncJob) Sync() error {
data, err := ioutil.ReadFile(job.endpointFilePath)
if endpointSyncError(err) {
return err
}
var fileEndpoints []fileEndpoint
err = json.Unmarshal(data, &fileEndpoints)
if endpointSyncError(err) {
return err
}
if len(fileEndpoints) == 0 {
return ErrEmptyEndpointArray
}
storedEndpoints, err := job.endpointService.Endpoints()
if endpointSyncError(err) {
return err
}
convertedFileEndpoints := convertFileEndpoints(fileEndpoints)
sync := job.prepareSyncData(storedEndpoints, convertedFileEndpoints)
if sync.requireSync() {
err = job.endpointService.Synchronize(sync.endpointsToCreate, sync.endpointsToUpdate, sync.endpointsToDelete)
if endpointSyncError(err) {
return err
}
log.Printf("Endpoint synchronization ended. [created: %v] [updated: %v] [deleted: %v]", len(sync.endpointsToCreate), len(sync.endpointsToUpdate), len(sync.endpointsToDelete))
}
return nil
}
func (job endpointSyncJob) Run() {
log.Println("cron: synchronization job started")
err := job.Sync()
endpointSyncError(err)
}

63
api/cron/task_script.go Normal file
View file

@ -0,0 +1,63 @@
package cron
import (
"log"
"github.com/portainer/portainer"
)
// ScriptTaskContext represents the context required for the execution
// of a ScriptTask.
type ScriptTaskContext struct {
JobService portainer.JobService
EndpointService portainer.EndpointService
FileService portainer.FileService
ScheduleID portainer.ScheduleID
TargetEndpoints []portainer.EndpointID
}
// ScriptTask represents a task used to execute a script inside a privileged
// container. It can be scheduled.
type ScriptTask struct {
Image string
ScriptPath string
context *ScriptTaskContext
}
// NewScriptTask creates a new ScriptTask using the specified context.
func NewScriptTask(image, scriptPath string, context *ScriptTaskContext) ScriptTask {
return ScriptTask{
Image: image,
ScriptPath: scriptPath,
context: context,
}
}
// SetContext can be used to set/override the task context
func (task ScriptTask) SetContext(context *ScriptTaskContext) {
task.context = context
}
// Run triggers the execution of the task.
// It will iterate through all the endpoints specified in the context to
// execute the script associated to the task.
func (task ScriptTask) Run() {
scriptFile, err := task.context.FileService.GetFileContent(task.ScriptPath)
if err != nil {
log.Printf("scheduled task error (script execution). Unable to retrieve script file (err=%s)\n", err)
return
}
for _, endpointID := range task.context.TargetEndpoints {
endpoint, err := task.context.EndpointService.Endpoint(endpointID)
if err != nil {
log.Printf("scheduled task error (script execution). Unable to retrieve information about endpoint (id=%d) (err=%s)\n", endpointID, err)
return
}
err = task.context.JobService.Execute(endpoint, "", task.Image, scriptFile)
if err != nil {
log.Printf("scheduled task error (script execution). Unable to execute scrtip (endpoint=%s) (err=%s)\n", endpoint.Name, err)
}
}
}

61
api/cron/task_snapshot.go Normal file
View file

@ -0,0 +1,61 @@
package cron
import (
"log"
"github.com/portainer/portainer"
)
// SnapshotTaskContext represents the context required for the execution
// of a SnapshotTask.
type SnapshotTaskContext struct {
EndpointService portainer.EndpointService
Snapshotter portainer.Snapshotter
}
// SnapshotTask represents a task used to create endpoint snapshots.
// It can be scheduled.
type SnapshotTask struct {
context *SnapshotTaskContext
}
// NewSnapshotTask creates a new ScriptTask using the specified context.
func NewSnapshotTask(context *SnapshotTaskContext) SnapshotTask {
return SnapshotTask{
context: context,
}
}
// Run triggers the execution of the task.
// It will iterate through all the endpoints available in the database to
// create a snapshot of each one of them.
func (task SnapshotTask) Run() {
endpoints, err := task.context.EndpointService.Endpoints()
if err != nil {
log.Printf("background task error (endpoint snapshot). Unable to retrieve endpoint list (err=%s)\n", err)
return
}
for _, endpoint := range endpoints {
if endpoint.Type == portainer.AzureEnvironment {
continue
}
snapshot, err := task.context.Snapshotter.CreateSnapshot(&endpoint)
endpoint.Status = portainer.EndpointStatusUp
if err != nil {
log.Printf("background task error (endpoint snapshot). Unable to create snapshot (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err)
endpoint.Status = portainer.EndpointStatusDown
}
if snapshot != nil {
endpoint.Snapshots = []portainer.Snapshot{*snapshot}
}
err = task.context.EndpointService.UpdateEndpoint(endpoint.ID, &endpoint)
if err != nil {
log.Printf("background task error (endpoint snapshot). Unable to update endpoint (endpoint=%s, URL=%s) (err=%s)\n", endpoint.Name, endpoint.URL, err)
return
}
}
}