1 package kclient
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "io"
8
9
10
11 corev1 "k8s.io/api/core/v1"
12 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
14 "k8s.io/apimachinery/pkg/runtime/schema"
15 "k8s.io/apimachinery/pkg/watch"
16 "k8s.io/client-go/kubernetes/scheme"
17 "k8s.io/client-go/tools/remotecommand"
18
19 "github.com/redhat-developer/odo/pkg/platform"
20 )
21
22
23 func (c *Client) ExecCMDInContainer(ctx context.Context, containerName, podName string, cmd []string, stdout, stderr io.Writer, stdin io.Reader, tty bool) error {
24 podExecOptions := corev1.PodExecOptions{
25 Command: cmd,
26 Stdin: stdin != nil,
27 Stdout: stdout != nil,
28 Stderr: stderr != nil,
29 TTY: tty,
30 }
31
32
33 if containerName != "" {
34 podExecOptions.Container = containerName
35 }
36
37 req := c.KubeClient.CoreV1().RESTClient().
38 Post().
39 Namespace(c.Namespace).
40 Resource("pods").
41 Name(podName).
42 SubResource("exec").
43 VersionedParams(&podExecOptions, scheme.ParameterCodec)
44
45 config, err := c.KubeConfig.ClientConfig()
46 if err != nil {
47 return fmt.Errorf("unable to get Kubernetes client config: %w", err)
48 }
49
50
51 exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
52 if err != nil {
53 return fmt.Errorf("unable execute command via SPDY: %w", err)
54 }
55
56 err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
57 Stdin: stdin,
58 Stdout: stdout,
59 Stderr: stderr,
60 Tty: tty,
61 })
62 if err != nil {
63 return fmt.Errorf("error while streaming command: %w", err)
64 }
65
66 return nil
67 }
68
69
70 func (c *Client) GetPodUsingComponentName(componentName string) (*corev1.Pod, error) {
71 podSelector := fmt.Sprintf("component=%s", componentName)
72 return c.GetRunningPodFromSelector(podSelector)
73 }
74
75
76 func (c *Client) GetRunningPodFromSelector(selector string) (*corev1.Pod, error) {
77 pods, err := c.KubeClient.CoreV1().Pods(c.Namespace).List(context.TODO(), metav1.ListOptions{
78 LabelSelector: selector,
79 FieldSelector: "status.phase=Running",
80 })
81 if err != nil {
82
83
84 return nil, err
85 }
86 numPods := len(pods.Items)
87 if numPods == 0 {
88 return nil, &platform.PodNotFoundError{Selector: selector}
89 } else if numPods > 1 {
90 return nil, fmt.Errorf("multiple Pods exist for the selector: %v. Only one must be present", selector)
91 }
92
93
94 if pods.Items[0].DeletionTimestamp != nil {
95 return nil, &platform.PodNotFoundError{Selector: selector}
96 }
97
98 return &pods.Items[0], nil
99 }
100
101
102 func (c *Client) GetPodLogs(podName, containerName string, followLog bool) (io.ReadCloser, error) {
103
104
105 podLogOptions := corev1.PodLogOptions{Follow: false, Container: containerName}
106
107
108 if followLog {
109 podLogOptions = corev1.PodLogOptions{
110 Follow: true,
111 Previous: false,
112 Container: containerName,
113 }
114 }
115
116
117 rd, err := c.KubeClient.CoreV1().RESTClient().Get().
118 Namespace(c.Namespace).
119 Name(podName).
120 Resource("pods").
121 SubResource("log").
122 VersionedParams(&podLogOptions, scheme.ParameterCodec).
123 Stream(context.TODO())
124
125 return rd, err
126 }
127
128 func (c *Client) GetAllPodsInNamespaceMatchingSelector(selector string, ns string) (*corev1.PodList, error) {
129 podList, err := c.KubeClient.CoreV1().Pods(c.Namespace).List(context.TODO(), metav1.ListOptions{})
130 if err != nil {
131 return nil, err
132 }
133
134 resources, err := c.GetAllResourcesFromSelector(selector, ns)
135 if err != nil {
136 return nil, err
137 }
138
139 var list corev1.PodList
140
141 for _, pod := range podList.Items {
142 for _, owner := range pod.GetOwnerReferences() {
143 var match bool
144 match, err = matchOwnerReferenceWithResources(c, owner, resources)
145 if err != nil {
146 return nil, err
147 }
148 if match {
149 list.Items = append(list.Items, pod)
150 break
151 }
152 }
153 }
154
155 return &list, err
156 }
157
158
159
160 func matchOwnerReferenceWithResources(c ClientInterface, owner metav1.OwnerReference, resources []unstructured.Unstructured) (bool, error) {
161
162 for _, resource := range resources {
163 if resource.GetUID() != "" && owner.UID != "" && resource.GetUID() == owner.UID {
164 return true, nil
165 }
166 }
167
168 restMapping, err := c.GetRestMappingFromGVK(schema.FromAPIVersionAndKind(owner.APIVersion, owner.Kind))
169 if err != nil {
170 return false, err
171 }
172 resource, err := c.GetDynamicResource(restMapping.Resource, owner.Name)
173 if err != nil {
174 return false, err
175 }
176 ownerReferences := resource.GetOwnerReferences()
177
178 for _, ownerReference := range ownerReferences {
179 return matchOwnerReferenceWithResources(c, ownerReference, resources)
180 }
181 return false, nil
182 }
183
184 func (c *Client) GetPodsMatchingSelector(selector string) (*corev1.PodList, error) {
185 return c.KubeClient.CoreV1().Pods(c.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector})
186 }
187
188 func (c *Client) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) {
189 ns := c.GetCurrentNamespace()
190 w, err := c.GetClient().CoreV1().Pods(ns).
191 Watch(ctx, metav1.ListOptions{
192 LabelSelector: selector,
193 })
194 if err != nil {
195 return nil, err
196 }
197 if w == nil {
198 return nil, errors.New("got a nil pod watcher, which can happen in some edge cases, " +
199 "such as when there is a configuration issue or network failure during the creation of the watcher object")
200 }
201 return w, nil
202 }
203
204 func (c *Client) IsPodNameMatchingSelector(ctx context.Context, podname string, selector string) (bool, error) {
205 ns := c.GetCurrentNamespace()
206 list, err := c.GetClient().CoreV1().Pods(ns).List(ctx, metav1.ListOptions{
207 FieldSelector: "metadata.name=" + podname,
208 LabelSelector: selector,
209 })
210 if err != nil {
211 return false, err
212 }
213 return len(list.Items) > 0, nil
214 }
215
View as plain text