...

Source file src/github.com/redhat-developer/odo/pkg/remotecmd/kubeexec.go

Documentation: github.com/redhat-developer/odo/pkg/remotecmd

     1  package remotecmd
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"strconv"
     7  	"strings"
     8  	"time"
     9  
    10  	"k8s.io/klog"
    11  
    12  	"github.com/redhat-developer/odo/pkg/exec"
    13  	"github.com/redhat-developer/odo/pkg/storage"
    14  	"github.com/redhat-developer/odo/pkg/task"
    15  )
    16  
// kubeExecProcessHandler implements RemoteProcessHandler by executing Devfile commands right away in the container
// (a 'kubectl exec'-like approach). Command execution is done in the background, in a separate goroutine.
// It works by storing the PID of the parent shell process in a file in the container,
// and then running the command from that same shell in the background.
// The goroutine started can then be stopped by killing the process whose PID is stored in that PID file
// (see getPidFileForCommand) in the container.
type kubeExecProcessHandler struct {
	// execClient is the client used to execute commands in the remote container.
	execClient exec.Client
}
    26  
// Compile-time check that *kubeExecProcessHandler satisfies the RemoteProcessHandler interface.
// See https://github.com/redhat-developer/odo/wiki/Dev:-Coding-Conventions#verify-interface-compliance
var _ RemoteProcessHandler = (*kubeExecProcessHandler)(nil)
    30  
    31  func NewKubeExecProcessHandler(execClient exec.Client) *kubeExecProcessHandler {
    32  	return &kubeExecProcessHandler{
    33  		execClient: execClient,
    34  	}
    35  }
    36  
    37  // GetProcessInfoForCommand returns information about the process representing the given Devfile command.
    38  // A PID of 0 denotes a Stopped process.
    39  func (k *kubeExecProcessHandler) GetProcessInfoForCommand(ctx context.Context, def CommandDefinition, podName string, containerName string) (RemoteProcessInfo, error) {
    40  	klog.V(4).Infof("GetProcessInfoForCommand for %q", def.Id)
    41  
    42  	pid, exitStatus, err := k.getRemoteProcessPID(ctx, def, podName, containerName)
    43  	if err != nil {
    44  		return RemoteProcessInfo{}, err
    45  	}
    46  
    47  	return k.getProcessInfoFromPid(ctx, pid, exitStatus, podName, containerName)
    48  }
    49  
    50  // StartProcessForCommand runs the (potentially never finishing) Devfile command in the background.
    51  // The goroutine spawned here can be stopped either by stopping the parent process (e.g., 'odo dev'),
    52  // or by stopping the underlying remote process by calling the StopProcessForCommand method.
    53  func (k *kubeExecProcessHandler) StartProcessForCommand(ctx context.Context, def CommandDefinition, podName string, containerName string, outputHandler CommandOutputHandler) error {
    54  	klog.V(4).Infof("StartProcessForCommand for %q", def.Id)
    55  
    56  	// deal with environment variables
    57  	cmdLine := def.CmdLine
    58  	envCommands := make([]string, 0, len(def.EnvVars))
    59  	for _, envVar := range def.EnvVars {
    60  		envCommands = append(envCommands, fmt.Sprintf("%s='%s'", envVar.Key, envVar.Value))
    61  	}
    62  	var setEnvCmd string
    63  	if len(envCommands) != 0 {
    64  		setEnvCmd = fmt.Sprintf("export %s &&", strings.Join(envCommands, " "))
    65  	}
    66  
    67  	var cdCmd string
    68  	if def.WorkingDir != "" {
    69  		// Change to the workdir and execute the command
    70  		cdCmd = fmt.Sprintf("cd %s &&", def.WorkingDir)
    71  	}
    72  
    73  	// since we are using /bin/sh -c, the command needs to be within a single double quote instance,
    74  	// for example "cd /tmp && pwd"
    75  	// Full command is: /bin/sh -c "[cd $workingDir && ] echo $$ > $pidFile && (envVar1='value1' envVar2='value2' $cmdLine) 1>>/proc/1/fd/1 2>>/proc/1/fd/2"
    76  	//
    77  	// "echo $$ > $pidFile" allows to store the /bin/sh parent process PID. It will allow to determine its children later on and kill them when a stop is requested.
    78  	// ($cmdLine) runs the command passed in a subshell (to handle cases where the command does more complex things like running processes in the background),
    79  	// which will be the child process of the /bin/sh one.
    80  	//
    81  	// Redirecting to /proc/1/fd/* allows to redirect the process output to the output streams of PID 1 process inside the container.
    82  	// This way, returning the container logs with 'odo logs' or 'kubectl logs' would work seamlessly.
    83  	// See https://stackoverflow.com/questions/58716574/where-exactly-do-the-logs-of-kubernetes-pods-come-from-at-the-container-level
    84  	pidFile := getPidFileForCommand(def)
    85  	cmd := []string{
    86  		ShellExecutable, "-c",
    87  		fmt.Sprintf("echo $$ > %[1]s && %s %s (%s) 1>>/proc/1/fd/1 2>>/proc/1/fd/2; echo $? >> %[1]s", pidFile, cdCmd, setEnvCmd, cmdLine),
    88  	}
    89  
    90  	//Monitoring go-routine
    91  	type event struct {
    92  		status RemoteProcessStatus
    93  		stdout []string
    94  		stderr []string
    95  		err    error
    96  	}
    97  	eventsChan := make(chan event)
    98  	eventsReceived := make(map[RemoteProcessStatus]struct{})
    99  	go func() {
   100  		for e := range eventsChan {
   101  			klog.V(5).Infof("event received for %q: %v, %v", def.Id, e.status, e.err)
   102  			if _, ok := eventsReceived[e.status]; ok {
   103  				continue
   104  			}
   105  			if outputHandler != nil {
   106  				outputHandler(e.status, e.stdout, e.stderr, e.err)
   107  			}
   108  			eventsReceived[e.status] = struct{}{}
   109  		}
   110  	}()
   111  
   112  	eventsChan <- event{status: Starting}
   113  
   114  	go func() {
   115  		eventsChan <- event{status: Running}
   116  		stdout, stderr, err := k.execClient.ExecuteCommand(ctx, cmd, podName, containerName, false, nil, nil)
   117  		if err != nil {
   118  			klog.V(2).Infof("error while running background command: %v", err)
   119  		}
   120  
   121  		processInfo, infoErr := k.GetProcessInfoForCommand(ctx, def, podName, containerName)
   122  		var status RemoteProcessStatus
   123  		if infoErr != nil {
   124  			status = Errored
   125  		} else {
   126  			status = processInfo.Status
   127  		}
   128  
   129  		eventsChan <- event{
   130  			status: status,
   131  			stdout: stdout,
   132  			stderr: stderr,
   133  			err:    err,
   134  		}
   135  
   136  		close(eventsChan)
   137  	}()
   138  
   139  	return nil
   140  }
   141  
   142  // StopProcessForCommand stops the process representing the specified Devfile command.
   143  // Because of the way this process is launched and its PID stored (see StartProcessForCommand),
   144  // we need to determine the process children (there should be only one child which is the sub-shell running the command passed to StartProcessForCommand).
   145  // Then killing those children will exit the parent 'sh' process.
   146  func (k *kubeExecProcessHandler) StopProcessForCommand(ctx context.Context, def CommandDefinition, podName string, containerName string) error {
   147  	klog.V(4).Infof("StopProcessForCommand for %q", def.Id)
   148  
   149  	kill := func(p int) error {
   150  		_, _, err := k.execClient.ExecuteCommand(ctx, []string{ShellExecutable, "-c", fmt.Sprintf("kill %d || true", p)}, podName, containerName, false, nil, nil)
   151  		if err != nil {
   152  			return err
   153  		}
   154  
   155  		//Because the process we just stopped might take longer to exit (it might have caught the signal and is performing additional cleanup),
   156  		//retry detecting its actual state till it is stopped or timeout expires
   157  		var processInfo interface{}
   158  		processInfo, err = task.NewRetryable(fmt.Sprintf("status for remote process %d", p), func() (bool, interface{}, error) {
   159  			pInfo, e := k.getProcessInfoFromPid(ctx, p, 0, podName, containerName)
   160  			return e == nil && (pInfo.Status == Stopped || pInfo.Status == Errored), pInfo, e
   161  		}).RetryWithSchedule([]time.Duration{2 * time.Second, 4 * time.Second, 8 * time.Second}, true)
   162  		if err != nil {
   163  			return err
   164  		}
   165  
   166  		pInfo, ok := processInfo.(RemoteProcessInfo)
   167  		if !ok {
   168  			klog.V(2).Infof("invalid type for remote process (%d) info, expected RemoteProcessInfo", p)
   169  			return fmt.Errorf("internal error while checking remote process status: %d", p)
   170  		}
   171  		if pInfo.Status != Stopped && pInfo.Status != Errored {
   172  			return fmt.Errorf("invalid status for remote process %d: %+v", p, processInfo)
   173  		}
   174  		return nil
   175  	}
   176  
   177  	ppid, _, err := k.getRemoteProcessPID(ctx, def, podName, containerName)
   178  	if err != nil {
   179  		return err
   180  	}
   181  	if ppid == 0 {
   182  		return nil
   183  	}
   184  	defer func() {
   185  		if kErr := kill(ppid); kErr != nil {
   186  			klog.V(3).Infof("could not kill parent process %d: %v", ppid, kErr)
   187  		}
   188  	}()
   189  
   190  	children, err := k.getProcessChildren(ctx, ppid, podName, containerName)
   191  	if err != nil {
   192  		return err
   193  	}
   194  
   195  	klog.V(3).Infof("Found %d children (either direct and indirect) for parent process %d: %v", len(children), ppid, children)
   196  
   197  	pidFile := getPidFileForCommand(def)
   198  	_, _, err = k.execClient.ExecuteCommand(ctx, []string{ShellExecutable, "-c", fmt.Sprintf("rm -f %s", pidFile)}, podName, containerName, false, nil, nil)
   199  	if err != nil {
   200  		klog.V(2).Infof("Could not remove file %q: %v", pidFile, err)
   201  	}
   202  
   203  	for _, child := range children {
   204  		if err = kill(child); err != nil {
   205  			return err
   206  		}
   207  	}
   208  
   209  	return nil
   210  }
   211  
   212  func (k *kubeExecProcessHandler) getRemoteProcessPID(ctx context.Context, def CommandDefinition, podName string, containerName string) (int, int, error) {
   213  	pidFile := getPidFileForCommand(def)
   214  	stdout, stderr, err := k.execClient.ExecuteCommand(ctx, []string{ShellExecutable, "-c", fmt.Sprintf("cat %s || true", pidFile)}, podName, containerName, false, nil, nil)
   215  
   216  	if err != nil {
   217  		return 0, 0, err
   218  	}
   219  
   220  	if len(stdout) == 0 {
   221  		//The file does not exist. We assume the process has not run yet.
   222  		return 0, 0, nil
   223  	}
   224  	if len(stdout) > 2 {
   225  		return 0, 0, fmt.Errorf(
   226  			"unable to determine command status, due to unexpected number of lines for %s, output: %v %v",
   227  			pidFile, stdout, stderr)
   228  	}
   229  
   230  	line := stdout[0]
   231  	var pid int
   232  	pid, err = strconv.Atoi(strings.TrimSpace(line))
   233  	if err != nil {
   234  		klog.V(2).Infof("error while trying to retrieve status of command: %+v", err)
   235  		return 0, 0, fmt.Errorf("unable to determine command status, due to unexpected PID content for %s: %s",
   236  			pidFile, line)
   237  	}
   238  	if len(stdout) == 1 {
   239  		return pid, 0, nil
   240  	}
   241  
   242  	line2 := stdout[1]
   243  	var exitStatus int
   244  	exitStatus, err = strconv.Atoi(strings.TrimSpace(line2))
   245  	if err != nil {
   246  		klog.V(2).Infof("error while trying to retrieve status of command: %+v", err)
   247  		return pid, 0, fmt.Errorf("unable to determine command status, due to unexpected exit status content for %s: %s",
   248  			pidFile, line2)
   249  	}
   250  
   251  	return pid, exitStatus, nil
   252  }
   253  
   254  func (k *kubeExecProcessHandler) getProcessInfoFromPid(ctx context.Context, pid int, lastKnownExitStatus int, podName string, containerName string) (RemoteProcessInfo, error) {
   255  	process := RemoteProcessInfo{Pid: pid}
   256  
   257  	if pid < 0 {
   258  		process.Status = Unknown
   259  		return process, fmt.Errorf("invalid PID value for remote process: %d", pid)
   260  	}
   261  	if pid == 0 {
   262  		process.Status = Stopped
   263  		return process, nil
   264  	}
   265  
   266  	//Now check that the PID value is a valid process
   267  	stdout, _, err := k.execClient.ExecuteCommand(ctx, []string{ShellExecutable, "-c", fmt.Sprintf("kill -0 %d; echo $?", pid)}, podName, containerName, false, nil, nil)
   268  
   269  	if err != nil {
   270  		process.Status = Unknown
   271  		return process, err
   272  	}
   273  
   274  	var killStatus int
   275  	killStatus, err = strconv.Atoi(strings.TrimSpace(stdout[0]))
   276  	if err != nil {
   277  		process.Status = Unknown
   278  		return process, err
   279  	}
   280  
   281  	if killStatus == 0 {
   282  		process.Status = Running
   283  	} else {
   284  		if lastKnownExitStatus == 0 {
   285  			process.Status = Stopped
   286  		} else {
   287  			process.Status = Errored
   288  		}
   289  	}
   290  
   291  	return process, nil
   292  }
   293  
   294  // getProcessChildren returns all the children (either direct or indirect) of the specified process in the given container.
   295  // It works by reading the /proc/<pid>/stat files, giving PPID for each PID.
   296  // The overall result is an ordered list of children PIDs obtained via a recursive post-order traversal algorithm,
   297  // so that the returned list can start with the deepest children processes.
   298  func (k *kubeExecProcessHandler) getProcessChildren(ctx context.Context, pid int, podName string, containerName string) ([]int, error) {
   299  	if pid <= 0 {
   300  		return nil, fmt.Errorf("invalid pid: %d", pid)
   301  	}
   302  
   303  	allProcesses, err := k.getAllProcesses(ctx, podName, containerName)
   304  	if err != nil {
   305  		return nil, err
   306  	}
   307  
   308  	var getProcessChildrenRec func(int) []int
   309  	getProcessChildrenRec = func(p int) []int {
   310  		var result []int
   311  		for _, child := range allProcesses[p] {
   312  			result = append(result, getProcessChildrenRec(child)...)
   313  		}
   314  		if p != pid {
   315  			// Do not include the root ppid, as we are getting only children.
   316  			result = append(result, p)
   317  		}
   318  		return result
   319  	}
   320  
   321  	return getProcessChildrenRec(pid), nil
   322  }
   323  
   324  // getAllProcesses returns a map of all the processes and their direct children:
   325  // i) the key is the process PID;
   326  // ii) and the value is a list of all its direct children.
   327  // It does so by reading all /proc/<pid>/stat files. More details on https://man7.org/linux/man-pages/man5/proc.5.html.
   328  func (k *kubeExecProcessHandler) getAllProcesses(ctx context.Context, podName string, containerName string) (map[int][]int, error) {
   329  	stdout, stderr, err := k.execClient.ExecuteCommand(ctx, []string{ShellExecutable, "-c", "cat /proc/*/stat || true"}, podName, containerName, false, nil, nil)
   330  	if err != nil {
   331  		klog.V(7).Infof("stdout: %s\n", strings.Join(stdout, "\n"))
   332  		klog.V(7).Infof("stderr: %s\n", strings.Join(stderr, "\n"))
   333  		return nil, err
   334  	}
   335  
   336  	allProcesses := make(map[int][]int)
   337  	for _, line := range stdout {
   338  		var pid int
   339  		_, err = fmt.Sscanf(line, "%d ", &pid)
   340  		if err != nil {
   341  			return nil, err
   342  		}
   343  
   344  		// Last index of the last ")" character to unambiguously parse the content.
   345  		// See https://unix.stackexchange.com/questions/558239/way-to-unambiguously-parse-proc-pid-stat-given-arbitrary-contents-of-name-fi
   346  		i := strings.LastIndex(line, ")")
   347  		if i < 0 {
   348  			continue
   349  		}
   350  
   351  		// At this point, "i" is the index of the last ")" character, and we have an additional space before the process state character, hence the "i+2".
   352  		// For example:
   353  		// 87 (main) S 0 81 81 0 -1 ...
   354  		// This is required to scan the ppid correctly.
   355  		var state byte
   356  		var ppid int
   357  		_, err = fmt.Sscanf(line[i+2:], "%c %d", &state, &ppid)
   358  		if err != nil {
   359  			return nil, err
   360  		}
   361  
   362  		allProcesses[ppid] = append(allProcesses[ppid], pid)
   363  	}
   364  
   365  	return allProcesses, nil
   366  }
   367  
   368  // getPidFileForCommand returns the path to the PID file in the remote container.
   369  // The parent folder is supposed to be existing, because it should be mounted in the container using the mandatory
   370  // shared volume (more info in the AddOdoMandatoryVolume function from the utils package).
   371  func getPidFileForCommand(def CommandDefinition) string {
   372  	parentDir := def.PidDirectory
   373  	if parentDir == "" {
   374  		parentDir = storage.SharedDataMountPath
   375  	}
   376  	return fmt.Sprintf("%s/.odo_cmd_%s.pid", strings.TrimSuffix(parentDir, "/"), def.Id)
   377  }
   378  

View as plain text