1
0
Fork 0
mirror of https://github.com/portainer/portainer.git synced 2025-07-19 13:29:41 +02:00

feat(kubernetes): support for jobs and cron jobs - r8s-182 (#260)

Co-authored-by: James Carppe <85850129+jamescarppe@users.noreply.github.com>
Co-authored-by: Anthony Lapenna <anthony.lapenna@portainer.io>
Co-authored-by: andres-portainer <91705312+andres-portainer@users.noreply.github.com>
Co-authored-by: Oscar Zhou <100548325+oscarzhou-portainer@users.noreply.github.com>
Co-authored-by: Yajith Dayarathna <yajith.dayarathna@portainer.io>
Co-authored-by: LP B <xAt0mZ@users.noreply.github.com>
Co-authored-by: oscarzhou <oscar.zhou@portainer.io>
Co-authored-by: testA113 <aliharriss1995@gmail.com>
This commit is contained in:
Steven Kang 2025-01-10 13:21:27 +13:00 committed by GitHub
parent 24fdb1f600
commit d32b0f8b7e
51 changed files with 1786 additions and 22 deletions

View file

@ -0,0 +1,123 @@
package cli
import (
"context"
"strings"
models "github.com/portainer/portainer/api/http/models/kubernetes"
"github.com/portainer/portainer/api/internal/errorlist"
batchv1 "k8s.io/api/batch/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// GetCronJobs returns all cronjobs in the given namespace
// If the user is a kube admin, it returns all cronjobs in the namespace
// Otherwise, it returns only the cronjobs in the non-admin namespaces
func (kcl *KubeClient) GetCronJobs(namespace string) ([]models.K8sCronJob, error) {
if kcl.IsKubeAdmin {
return kcl.fetchCronJobs(namespace)
}
return kcl.fetchCronJobsForNonAdmin(namespace)
}
// fetchCronJobsForNonAdmin returns all cronjobs in the given namespace
// It returns only the cronjobs in the non-admin namespaces
func (kcl *KubeClient) fetchCronJobsForNonAdmin(namespace string) ([]models.K8sCronJob, error) {
cronJobs, err := kcl.fetchCronJobs(namespace)
if err != nil {
return nil, err
}
nonAdminNamespaceSet := kcl.buildNonAdminNamespacesMap()
results := make([]models.K8sCronJob, 0)
for _, cronJob := range cronJobs {
if _, ok := nonAdminNamespaceSet[cronJob.Namespace]; ok {
results = append(results, cronJob)
}
}
return results, nil
}
// fetchCronJobs returns all cronjobs in the given namespace
// It returns all cronjobs in the namespace
func (kcl *KubeClient) fetchCronJobs(namespace string) ([]models.K8sCronJob, error) {
cronJobs, err := kcl.cli.BatchV1().CronJobs(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}
jobs, err := kcl.cli.BatchV1().Jobs(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}
results := make([]models.K8sCronJob, 0)
for _, cronJob := range cronJobs.Items {
results = append(results, kcl.parseCronJob(cronJob, jobs))
}
return results, nil
}
// parseCronJob converts a batchv1.CronJob object to a models.K8sCronJob object.
func (kcl *KubeClient) parseCronJob(cronJob batchv1.CronJob, jobsList *batchv1.JobList) models.K8sCronJob {
jobs, err := kcl.getCronJobExecutions(cronJob.Name, jobsList)
if err != nil {
return models.K8sCronJob{}
}
timezone := "<none>"
if cronJob.Spec.TimeZone != nil {
timezone = *cronJob.Spec.TimeZone
}
suspend := false
if cronJob.Spec.Suspend != nil {
suspend = *cronJob.Spec.Suspend
}
return models.K8sCronJob{
Id: string(cronJob.UID),
Name: cronJob.Name,
Namespace: cronJob.Namespace,
Command: strings.Join(cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command, " "),
Schedule: cronJob.Spec.Schedule,
Timezone: timezone,
Suspend: suspend,
Jobs: jobs,
IsSystem: kcl.isSystemCronJob(cronJob.Namespace),
}
}
func (kcl *KubeClient) isSystemCronJob(namespace string) bool {
return kcl.isSystemNamespace(namespace)
}
// DeleteCronJobs deletes the provided list of cronjobs in its namespace
// it returns an error if any of the cronjobs are not found or if there is an error deleting the cronjobs
func (kcl *KubeClient) DeleteCronJobs(payload models.K8sCronJobDeleteRequests) error {
var errors []error
for namespace := range payload {
for _, cronJobName := range payload[namespace] {
client := kcl.cli.BatchV1().CronJobs(namespace)
_, err := client.Get(context.Background(), cronJobName, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
continue
}
errors = append(errors, err)
}
if err := client.Delete(context.Background(), cronJobName, metav1.DeleteOptions{}); err != nil {
errors = append(errors, err)
}
}
}
return errorlist.Combine(errors)
}

View file

@ -0,0 +1,66 @@
package cli
import (
"context"
"testing"
models "github.com/portainer/portainer/api/http/models/kubernetes"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kfake "k8s.io/client-go/kubernetes/fake"
)
// TestFetchCronJobs tests the fetchCronJobs method for both admin and non-admin clients
// It creates a fake Kubernetes client and passes it to the fetchCronJobs method
// It then logs the fetched Cron Jobs
// non-admin client will have access to the default namespace only
func (kcl *KubeClient) TestFetchCronJobs(t *testing.T) {
t.Run("admin client can fetch Cron Jobs from all namespaces", func(t *testing.T) {
kcl.cli = kfake.NewSimpleClientset()
kcl.instanceID = "test"
kcl.IsKubeAdmin = true
cronJobs, err := kcl.GetCronJobs("")
if err != nil {
t.Fatalf("Failed to fetch Cron Jobs: %v", err)
}
t.Logf("Fetched Cron Jobs: %v", cronJobs)
})
t.Run("non-admin client can fetch Cron Jobs from the default namespace only", func(t *testing.T) {
kcl.cli = kfake.NewSimpleClientset()
kcl.instanceID = "test"
kcl.IsKubeAdmin = false
kcl.NonAdminNamespaces = []string{"default"}
cronJobs, err := kcl.GetCronJobs("")
if err != nil {
t.Fatalf("Failed to fetch Cron Jobs: %v", err)
}
t.Logf("Fetched Cron Jobs: %v", cronJobs)
})
t.Run("delete Cron Jobs", func(t *testing.T) {
kcl.cli = kfake.NewSimpleClientset()
kcl.instanceID = "test"
_, err := kcl.cli.BatchV1().CronJobs("default").Create(context.Background(), &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{Name: "test-cronjob"},
}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create cron job: %v", err)
}
err = kcl.DeleteCronJobs(models.K8sCronJobDeleteRequests{
"default": []string{"test-cronjob"},
})
if err != nil {
t.Fatalf("Failed to delete Cron Jobs: %v", err)
}
t.Logf("Deleted Cron Jobs")
})
}

227
api/kubernetes/cli/job.go Normal file
View file

@ -0,0 +1,227 @@
package cli
import (
"context"
"fmt"
"sort"
"strings"
"time"
models "github.com/portainer/portainer/api/http/models/kubernetes"
"github.com/portainer/portainer/api/internal/errorlist"
"github.com/rs/zerolog/log"
batchv1 "k8s.io/api/batch/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// GetJobs returns all jobs in the given namespace
// If the user is a kube admin, it returns all jobs in the namespace
// Otherwise, it returns only the jobs in the non-admin namespaces
func (kcl *KubeClient) GetJobs(namespace string, includeCronJobChildren bool) ([]models.K8sJob, error) {
if kcl.IsKubeAdmin {
return kcl.fetchJobs(namespace, includeCronJobChildren)
}
return kcl.fetchJobsForNonAdmin(namespace, includeCronJobChildren)
}
// fetchJobsForNonAdmin returns all jobs in the given namespace
// It returns only the jobs in the non-admin namespaces
func (kcl *KubeClient) fetchJobsForNonAdmin(namespace string, includeCronJobChildren bool) ([]models.K8sJob, error) {
jobs, err := kcl.fetchJobs(namespace, includeCronJobChildren)
if err != nil {
return nil, err
}
nonAdminNamespaceSet := kcl.buildNonAdminNamespacesMap()
results := make([]models.K8sJob, 0)
for _, job := range jobs {
if _, ok := nonAdminNamespaceSet[job.Namespace]; ok {
results = append(results, job)
}
}
return results, nil
}
// fetchJobs returns all jobs in the given namespace
// It returns all jobs in the namespace
func (kcl *KubeClient) fetchJobs(namespace string, includeCronJobChildren bool) ([]models.K8sJob, error) {
jobs, err := kcl.cli.BatchV1().Jobs(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}
results := make([]models.K8sJob, 0)
for _, job := range jobs.Items {
if !includeCronJobChildren && checkCronJobOwner(job) {
continue
}
results = append(results, kcl.parseJob(job))
}
return results, nil
}
// checkCronJobOwner checks if the job has a cronjob owner
// it returns true if the job has a cronjob owner
// otherwise, it returns false
func checkCronJobOwner(job batchv1.Job) bool {
for _, owner := range job.OwnerReferences {
if owner.Kind == "CronJob" {
return true
}
}
return false
}
// parseJob converts a batchv1.Job object to a models.K8sJob object.
func (kcl *KubeClient) parseJob(job batchv1.Job) models.K8sJob {
times := parseJobTimes(job)
status, failedReason := determineJobStatus(job)
podName := getJobPodName(kcl, job)
return models.K8sJob{
ID: string(job.UID),
Namespace: job.Namespace,
Name: job.Name,
PodName: podName,
Command: strings.Join(job.Spec.Template.Spec.Containers[0].Command, " "),
Container: job.Spec.Template.Spec.Containers[0],
BackoffLimit: *job.Spec.BackoffLimit,
Completions: *job.Spec.Completions,
StartTime: times.start,
FinishTime: times.finish,
Duration: times.duration,
Status: status,
FailedReason: failedReason,
IsSystem: kcl.isSystemJob(job.Namespace),
}
}
func (kcl *KubeClient) isSystemJob(namespace string) bool {
return kcl.isSystemNamespace(namespace)
}
type jobTimes struct {
start string
finish string
duration string
}
func parseJobTimes(job batchv1.Job) jobTimes {
times := jobTimes{
start: "N/A",
finish: "N/A",
duration: "N/A",
}
if st := job.Status.StartTime; st != nil {
times.start = st.Time.Format(time.RFC3339)
times.duration = time.Since(st.Time).Truncate(time.Minute).String()
if ct := job.Status.CompletionTime; ct != nil {
times.finish = ct.Time.Format(time.RFC3339)
times.duration = ct.Time.Sub(st.Time).String()
}
}
return times
}
func determineJobStatus(job batchv1.Job) (status, failedReason string) {
failedReason = "N/A"
switch {
case job.Status.Failed > 0:
return "Failed", getLatestJobCondition(job.Status.Conditions)
case job.Status.Succeeded > 0:
return "Succeeded", failedReason
case job.Status.Active == 0:
return "Completed", failedReason
default:
return "Running", failedReason
}
}
func getJobPodName(kcl *KubeClient, job batchv1.Job) string {
pod, err := kcl.getLatestJobPod(job.Namespace, job.Name)
if err != nil {
log.Warn().Err(err).
Str("job", job.Name).
Str("namespace", job.Namespace).
Msg("Failed to get latest job pod")
return ""
}
if pod != nil {
return pod.Name
}
return ""
}
// getCronJobExecutions returns the jobs for a given cronjob
// it returns the jobs for the cronjob
func (kcl *KubeClient) getCronJobExecutions(cronJobName string, jobs *batchv1.JobList) ([]models.K8sJob, error) {
maxItems := 5
results := make([]models.K8sJob, 0)
for _, job := range jobs.Items {
for _, owner := range job.OwnerReferences {
if owner.Kind == "CronJob" && owner.Name == cronJobName {
results = append(results, kcl.parseJob(job))
if len(results) >= maxItems {
return results, nil
}
}
}
}
return results, nil
}
// DeleteJobs deletes the provided list of jobs
// it returns an error if any of the jobs are not found or if there is an error deleting the jobs
func (kcl *KubeClient) DeleteJobs(payload models.K8sJobDeleteRequests) error {
var errors []error
for namespace := range payload {
for _, jobName := range payload[namespace] {
client := kcl.cli.BatchV1().Jobs(namespace)
_, err := client.Get(context.Background(), jobName, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
continue
}
errors = append(errors, err)
}
if err := client.Delete(context.Background(), jobName, metav1.DeleteOptions{}); err != nil {
errors = append(errors, err)
}
}
}
return errorlist.Combine(errors)
}
// getLatestJobCondition returns the latest condition of the job
// it returns the latest condition of the job
// this is only used for the failed reason
func getLatestJobCondition(conditions []batchv1.JobCondition) string {
if len(conditions) == 0 {
return "No conditions"
}
sort.Slice(conditions, func(i, j int) bool {
return conditions[i].LastTransitionTime.After(conditions[j].LastTransitionTime.Time)
})
latest := conditions[0]
return fmt.Sprintf("%s: %s", latest.Type, latest.Message)
}

View file

@ -0,0 +1,64 @@
package cli
import (
"context"
"testing"
models "github.com/portainer/portainer/api/http/models/kubernetes"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kfake "k8s.io/client-go/kubernetes/fake"
)
// TestFetchJobs tests the fetchJobs method for both admin and non-admin clients
// It creates a fake Kubernetes client and passes it to the fetchJobs method
// It then logs the fetched jobs
// non-admin client will have access to the default namespace only
func (kcl *KubeClient) TestFetchJobs(t *testing.T) {
t.Run("admin client can fetch jobs from all namespaces", func(t *testing.T) {
kcl.cli = kfake.NewSimpleClientset()
kcl.instanceID = "test"
kcl.IsKubeAdmin = true
jobs, err := kcl.GetJobs("", false)
if err != nil {
t.Fatalf("Failed to fetch jobs: %v", err)
}
t.Logf("Fetched jobs: %v", jobs)
})
t.Run("non-admin client can fetch jobs from the default namespace only", func(t *testing.T) {
kcl.cli = kfake.NewSimpleClientset()
kcl.instanceID = "test"
kcl.IsKubeAdmin = false
kcl.NonAdminNamespaces = []string{"default"}
jobs, err := kcl.GetJobs("", false)
if err != nil {
t.Fatalf("Failed to fetch jobs: %v", err)
}
t.Logf("Fetched jobs: %v", jobs)
})
t.Run("delete jobs", func(t *testing.T) {
kcl.cli = kfake.NewSimpleClientset()
kcl.instanceID = "test"
_, err := kcl.cli.BatchV1().Jobs("default").Create(context.Background(), &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: "test-job"},
}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create job: %v", err)
}
err = kcl.DeleteJobs(models.K8sJobDeleteRequests{
"default": []string{"test-job"},
})
if err != nil {
t.Fatalf("Failed to delete jobs: %v", err)
}
})
}

View file

@ -275,3 +275,22 @@ func isPodUsingSecret(pod *corev1.Pod, secretName string) bool {
return false
}
// getLatestJobPod returns the pods that are owned by a job
// it returns an error if there is an error fetching the pods
func (kcl *KubeClient) getLatestJobPod(namespace string, jobName string) (*corev1.Pod, error) {
pods, err := kcl.cli.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}
for _, pod := range pods.Items {
for _, owner := range pod.OwnerReferences {
if owner.Kind == "Job" && owner.Name == jobName {
return &pod, nil
}
}
}
return nil, nil
}