...
1 package kclient
2
3 import (
4 "context"
5 "fmt"
6 "io"
7
8 batchv1 "k8s.io/api/batch/v1"
9 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10 "k8s.io/apimachinery/pkg/fields"
11 "k8s.io/apimachinery/pkg/labels"
12 "k8s.io/klog"
13 )
14
15
// Identifiers of the Kubernetes Job resource used by this package.
const (
	// JobsKind is the resource kind string for Jobs.
	JobsKind = "Job"
	// JobsAPIVersion is the API group/version string for Jobs.
	JobsAPIVersion = "batch/v1"
)

// JobNameOdoMaxLength is a length limit (60) for Job names.
// NOTE(review): presumably the cap applied to Job names generated by this
// tool; its consumers are not visible in this file — confirm against callers.
const JobNameOdoMaxLength = 60
25
26 func (c *Client) ListJobs(selector string) (*batchv1.JobList, error) {
27 return c.KubeClient.BatchV1().Jobs(c.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector})
28 }
29
30
31 func (c *Client) CreateJob(job batchv1.Job, namespace string) (*batchv1.Job, error) {
32 if namespace == "" {
33 namespace = c.Namespace
34 }
35 createdJob, err := c.KubeClient.BatchV1().Jobs(namespace).Create(context.TODO(), &job, metav1.CreateOptions{FieldManager: FieldManager})
36 if err != nil {
37 return nil, fmt.Errorf("unable to create Jobs: %w", err)
38 }
39 return createdJob, nil
40 }
41
42
43 func (c *Client) WaitForJobToComplete(job *batchv1.Job) (*batchv1.Job, error) {
44 klog.V(3).Infof("Waiting for Job %s to complete successfully", job.Name)
45
46 w, err := c.KubeClient.BatchV1().Jobs(c.Namespace).Watch(context.TODO(), metav1.ListOptions{
47 FieldSelector: fields.Set{"metadata.name": job.Name}.AsSelector().String(),
48 })
49 if err != nil {
50 return nil, fmt.Errorf("unable to watch job: %w", err)
51 }
52 defer w.Stop()
53
54 for {
55 val, ok := <-w.ResultChan()
56 if !ok {
57 break
58 }
59
60 wJob, ok := val.Object.(*batchv1.Job)
61 if !ok {
62 klog.V(4).Infof("did not receive job object, received: %v", val)
63 continue
64 }
65 for _, condition := range wJob.Status.Conditions {
66 if condition.Type == batchv1.JobFailed {
67 klog.V(4).Infof("Failed to execute the job, reason: %s", condition.String())
68
69 return wJob, fmt.Errorf("failed to execute the job")
70 }
71 if condition.Type == batchv1.JobComplete {
72 return wJob, nil
73 }
74 }
75 }
76 return nil, nil
77 }
78
79
80 func (c *Client) GetJobLogs(job *batchv1.Job, containerName string) (io.ReadCloser, error) {
81
82
83 selector := labels.Set{"controller-uid": string(job.UID), "job-name": job.Name}.AsSelector().String()
84 pods, err := c.GetPodsMatchingSelector(selector)
85 if err != nil {
86 return nil, err
87 }
88 if len(pods.Items) == 0 {
89 return nil, fmt.Errorf("no pod found for job %q", job.Name)
90 }
91 pod := pods.Items[0]
92 return c.GetPodLogs(pod.Name, containerName, false)
93 }
94
95 func (c *Client) DeleteJob(jobName string) error {
96 propagationPolicy := metav1.DeletePropagationBackground
97 return c.KubeClient.BatchV1().Jobs(c.Namespace).Delete(context.Background(), jobName, metav1.DeleteOptions{PropagationPolicy: &propagationPolicy})
98 }
99
View as plain text