...

Source file src/github.com/redhat-developer/odo/pkg/kclient/pods.go

Documentation: github.com/redhat-developer/odo/pkg/kclient

     1  package kclient
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"io"
     8  
     9  	// api resource types
    10  
    11  	corev1 "k8s.io/api/core/v1"
    12  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    13  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    14  	"k8s.io/apimachinery/pkg/runtime/schema"
    15  	"k8s.io/apimachinery/pkg/watch"
    16  	"k8s.io/client-go/kubernetes/scheme"
    17  	"k8s.io/client-go/tools/remotecommand"
    18  
    19  	"github.com/redhat-developer/odo/pkg/platform"
    20  )
    21  
    22  // ExecCMDInContainer execute command in the container of a pod, pass an empty string for containerName to execute in the first container of the pod
    23  func (c *Client) ExecCMDInContainer(ctx context.Context, containerName, podName string, cmd []string, stdout, stderr io.Writer, stdin io.Reader, tty bool) error {
    24  	podExecOptions := corev1.PodExecOptions{
    25  		Command: cmd,
    26  		Stdin:   stdin != nil,
    27  		Stdout:  stdout != nil,
    28  		Stderr:  stderr != nil,
    29  		TTY:     tty,
    30  	}
    31  
    32  	// If a container name was passed in, set it in the exec options, otherwise leave it blank
    33  	if containerName != "" {
    34  		podExecOptions.Container = containerName
    35  	}
    36  
    37  	req := c.KubeClient.CoreV1().RESTClient().
    38  		Post().
    39  		Namespace(c.Namespace).
    40  		Resource("pods").
    41  		Name(podName).
    42  		SubResource("exec").
    43  		VersionedParams(&podExecOptions, scheme.ParameterCodec)
    44  
    45  	config, err := c.KubeConfig.ClientConfig()
    46  	if err != nil {
    47  		return fmt.Errorf("unable to get Kubernetes client config: %w", err)
    48  	}
    49  
    50  	// Connect to url (constructed from req) using SPDY (HTTP/2) protocol which allows bidirectional streams.
    51  	exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
    52  	if err != nil {
    53  		return fmt.Errorf("unable execute command via SPDY: %w", err)
    54  	}
    55  	// initialize the transport of the standard shell streams
    56  	err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
    57  		Stdin:  stdin,
    58  		Stdout: stdout,
    59  		Stderr: stderr,
    60  		Tty:    tty,
    61  	})
    62  	if err != nil {
    63  		return fmt.Errorf("error while streaming command: %w", err)
    64  	}
    65  
    66  	return nil
    67  }
    68  
    69  // GetPodUsingComponentName gets a pod using the component name
    70  func (c *Client) GetPodUsingComponentName(componentName string) (*corev1.Pod, error) {
    71  	podSelector := fmt.Sprintf("component=%s", componentName)
    72  	return c.GetRunningPodFromSelector(podSelector)
    73  }
    74  
    75  // GetRunningPodFromSelector gets a pod from the selector
    76  func (c *Client) GetRunningPodFromSelector(selector string) (*corev1.Pod, error) {
    77  	pods, err := c.KubeClient.CoreV1().Pods(c.Namespace).List(context.TODO(), metav1.ListOptions{
    78  		LabelSelector: selector,
    79  		FieldSelector: "status.phase=Running",
    80  	})
    81  	if err != nil {
    82  		// Don't wrap error since we want to know if it's a forbidden error
    83  		// if we wrap, we lose the err status reason and callers of this api rely on it
    84  		return nil, err
    85  	}
    86  	numPods := len(pods.Items)
    87  	if numPods == 0 {
    88  		return nil, &platform.PodNotFoundError{Selector: selector}
    89  	} else if numPods > 1 {
    90  		return nil, fmt.Errorf("multiple Pods exist for the selector: %v. Only one must be present", selector)
    91  	}
    92  
    93  	// check if the pod is in the terminating state
    94  	if pods.Items[0].DeletionTimestamp != nil {
    95  		return nil, &platform.PodNotFoundError{Selector: selector}
    96  	}
    97  
    98  	return &pods.Items[0], nil
    99  }
   100  
   101  // GetPodLogs prints the log from pod to stdout
   102  func (c *Client) GetPodLogs(podName, containerName string, followLog bool) (io.ReadCloser, error) {
   103  
   104  	// Set standard log options
   105  	podLogOptions := corev1.PodLogOptions{Follow: false, Container: containerName}
   106  
   107  	// If the log is being followed, set it to follow / don't wait
   108  	if followLog {
   109  		podLogOptions = corev1.PodLogOptions{
   110  			Follow:    true,
   111  			Previous:  false,
   112  			Container: containerName,
   113  		}
   114  	}
   115  
   116  	// RESTClient call to kubernetes
   117  	rd, err := c.KubeClient.CoreV1().RESTClient().Get().
   118  		Namespace(c.Namespace).
   119  		Name(podName).
   120  		Resource("pods").
   121  		SubResource("log").
   122  		VersionedParams(&podLogOptions, scheme.ParameterCodec).
   123  		Stream(context.TODO())
   124  
   125  	return rd, err
   126  }
   127  
   128  func (c *Client) GetAllPodsInNamespaceMatchingSelector(selector string, ns string) (*corev1.PodList, error) {
   129  	podList, err := c.KubeClient.CoreV1().Pods(c.Namespace).List(context.TODO(), metav1.ListOptions{})
   130  	if err != nil {
   131  		return nil, err
   132  	}
   133  
   134  	resources, err := c.GetAllResourcesFromSelector(selector, ns)
   135  	if err != nil {
   136  		return nil, err
   137  	}
   138  
   139  	var list corev1.PodList
   140  	// match pod ownerReference (if any) with resources matching the selector
   141  	for _, pod := range podList.Items {
   142  		for _, owner := range pod.GetOwnerReferences() {
   143  			var match bool
   144  			match, err = matchOwnerReferenceWithResources(c, owner, resources)
   145  			if err != nil {
   146  				return nil, err
   147  			}
   148  			if match {
   149  				list.Items = append(list.Items, pod)
   150  				break // because we don't need to check other owner references of the pod anymore
   151  			}
   152  		}
   153  	}
   154  
   155  	return &list, err
   156  }
   157  
   158  // matchOwnerReferenceWithResources recursively checks if the owner reference passed to it matches any of the resources
   159  // This is useful when trying to find if a pod is owned by any of the ReplicaSet or Deployment in the cluster.
   160  func matchOwnerReferenceWithResources(c ClientInterface, owner metav1.OwnerReference, resources []unstructured.Unstructured) (bool, error) {
   161  	// first, check if ownerReference belongs to any of the resources
   162  	for _, resource := range resources {
   163  		if resource.GetUID() != "" && owner.UID != "" && resource.GetUID() == owner.UID {
   164  			return true, nil
   165  		}
   166  	}
   167  	// second, get the resource indicated by ownerReference and check its ownerReferences field
   168  	restMapping, err := c.GetRestMappingFromGVK(schema.FromAPIVersionAndKind(owner.APIVersion, owner.Kind))
   169  	if err != nil {
   170  		return false, err
   171  	}
   172  	resource, err := c.GetDynamicResource(restMapping.Resource, owner.Name)
   173  	if err != nil {
   174  		return false, err
   175  	}
   176  	ownerReferences := resource.GetOwnerReferences()
   177  	// recursively check if ownerReference matches any of the resources' UID
   178  	for _, ownerReference := range ownerReferences {
   179  		return matchOwnerReferenceWithResources(c, ownerReference, resources)
   180  	}
   181  	return false, nil
   182  }
   183  
   184  func (c *Client) GetPodsMatchingSelector(selector string) (*corev1.PodList, error) {
   185  	return c.KubeClient.CoreV1().Pods(c.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector})
   186  }
   187  
   188  func (c *Client) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) {
   189  	ns := c.GetCurrentNamespace()
   190  	w, err := c.GetClient().CoreV1().Pods(ns).
   191  		Watch(ctx, metav1.ListOptions{
   192  			LabelSelector: selector,
   193  		})
   194  	if err != nil {
   195  		return nil, err
   196  	}
   197  	if w == nil {
   198  		return nil, errors.New("got a nil pod watcher, which can happen in some edge cases, " +
   199  			"such as when there is a configuration issue or network failure during the creation of the watcher object")
   200  	}
   201  	return w, nil
   202  }
   203  
   204  func (c *Client) IsPodNameMatchingSelector(ctx context.Context, podname string, selector string) (bool, error) {
   205  	ns := c.GetCurrentNamespace()
   206  	list, err := c.GetClient().CoreV1().Pods(ns).List(ctx, metav1.ListOptions{
   207  		FieldSelector: "metadata.name=" + podname,
   208  		LabelSelector: selector,
   209  	})
   210  	if err != nil {
   211  		return false, err
   212  	}
   213  	return len(list.Items) > 0, nil
   214  }
   215  

View as plain text