1 package remotecmd
2
3 import (
4 "context"
5 "fmt"
6 "strconv"
7 "strings"
8 "time"
9
10 "k8s.io/klog"
11
12 "github.com/redhat-developer/odo/pkg/exec"
13 "github.com/redhat-developer/odo/pkg/storage"
14 "github.com/redhat-developer/odo/pkg/task"
15 )
16
17
18
19
20
21
22
// kubeExecProcessHandler manages remote processes by running shell commands
// inside a container through an exec.Client. It is asserted to implement
// RemoteProcessHandler elsewhere in this file.
type kubeExecProcessHandler struct {
	// execClient runs every command this handler issues in the remote container.
	execClient exec.Client
}
26
27
28
// Compile-time check that *kubeExecProcessHandler satisfies RemoteProcessHandler.
var _ RemoteProcessHandler = (*kubeExecProcessHandler)(nil)
30
31 func NewKubeExecProcessHandler(execClient exec.Client) *kubeExecProcessHandler {
32 return &kubeExecProcessHandler{
33 execClient: execClient,
34 }
35 }
36
37
38
39 func (k *kubeExecProcessHandler) GetProcessInfoForCommand(ctx context.Context, def CommandDefinition, podName string, containerName string) (RemoteProcessInfo, error) {
40 klog.V(4).Infof("GetProcessInfoForCommand for %q", def.Id)
41
42 pid, exitStatus, err := k.getRemoteProcessPID(ctx, def, podName, containerName)
43 if err != nil {
44 return RemoteProcessInfo{}, err
45 }
46
47 return k.getProcessInfoFromPid(ctx, pid, exitStatus, podName, containerName)
48 }
49
50
51
52
53 func (k *kubeExecProcessHandler) StartProcessForCommand(ctx context.Context, def CommandDefinition, podName string, containerName string, outputHandler CommandOutputHandler) error {
54 klog.V(4).Infof("StartProcessForCommand for %q", def.Id)
55
56
57 cmdLine := def.CmdLine
58 envCommands := make([]string, 0, len(def.EnvVars))
59 for _, envVar := range def.EnvVars {
60 envCommands = append(envCommands, fmt.Sprintf("%s='%s'", envVar.Key, envVar.Value))
61 }
62 var setEnvCmd string
63 if len(envCommands) != 0 {
64 setEnvCmd = fmt.Sprintf("export %s &&", strings.Join(envCommands, " "))
65 }
66
67 var cdCmd string
68 if def.WorkingDir != "" {
69
70 cdCmd = fmt.Sprintf("cd %s &&", def.WorkingDir)
71 }
72
73
74
75
76
77
78
79
80
81
82
83
84 pidFile := getPidFileForCommand(def)
85 cmd := []string{
86 ShellExecutable, "-c",
87 fmt.Sprintf("echo $$ > %[1]s && %s %s (%s) 1>>/proc/1/fd/1 2>>/proc/1/fd/2; echo $? >> %[1]s", pidFile, cdCmd, setEnvCmd, cmdLine),
88 }
89
90
91 type event struct {
92 status RemoteProcessStatus
93 stdout []string
94 stderr []string
95 err error
96 }
97 eventsChan := make(chan event)
98 eventsReceived := make(map[RemoteProcessStatus]struct{})
99 go func() {
100 for e := range eventsChan {
101 klog.V(5).Infof("event received for %q: %v, %v", def.Id, e.status, e.err)
102 if _, ok := eventsReceived[e.status]; ok {
103 continue
104 }
105 if outputHandler != nil {
106 outputHandler(e.status, e.stdout, e.stderr, e.err)
107 }
108 eventsReceived[e.status] = struct{}{}
109 }
110 }()
111
112 eventsChan <- event{status: Starting}
113
114 go func() {
115 eventsChan <- event{status: Running}
116 stdout, stderr, err := k.execClient.ExecuteCommand(ctx, cmd, podName, containerName, false, nil, nil)
117 if err != nil {
118 klog.V(2).Infof("error while running background command: %v", err)
119 }
120
121 processInfo, infoErr := k.GetProcessInfoForCommand(ctx, def, podName, containerName)
122 var status RemoteProcessStatus
123 if infoErr != nil {
124 status = Errored
125 } else {
126 status = processInfo.Status
127 }
128
129 eventsChan <- event{
130 status: status,
131 stdout: stdout,
132 stderr: stderr,
133 err: err,
134 }
135
136 close(eventsChan)
137 }()
138
139 return nil
140 }
141
142
143
144
145
146 func (k *kubeExecProcessHandler) StopProcessForCommand(ctx context.Context, def CommandDefinition, podName string, containerName string) error {
147 klog.V(4).Infof("StopProcessForCommand for %q", def.Id)
148
149 kill := func(p int) error {
150 _, _, err := k.execClient.ExecuteCommand(ctx, []string{ShellExecutable, "-c", fmt.Sprintf("kill %d || true", p)}, podName, containerName, false, nil, nil)
151 if err != nil {
152 return err
153 }
154
155
156
157 var processInfo interface{}
158 processInfo, err = task.NewRetryable(fmt.Sprintf("status for remote process %d", p), func() (bool, interface{}, error) {
159 pInfo, e := k.getProcessInfoFromPid(ctx, p, 0, podName, containerName)
160 return e == nil && (pInfo.Status == Stopped || pInfo.Status == Errored), pInfo, e
161 }).RetryWithSchedule([]time.Duration{2 * time.Second, 4 * time.Second, 8 * time.Second}, true)
162 if err != nil {
163 return err
164 }
165
166 pInfo, ok := processInfo.(RemoteProcessInfo)
167 if !ok {
168 klog.V(2).Infof("invalid type for remote process (%d) info, expected RemoteProcessInfo", p)
169 return fmt.Errorf("internal error while checking remote process status: %d", p)
170 }
171 if pInfo.Status != Stopped && pInfo.Status != Errored {
172 return fmt.Errorf("invalid status for remote process %d: %+v", p, processInfo)
173 }
174 return nil
175 }
176
177 ppid, _, err := k.getRemoteProcessPID(ctx, def, podName, containerName)
178 if err != nil {
179 return err
180 }
181 if ppid == 0 {
182 return nil
183 }
184 defer func() {
185 if kErr := kill(ppid); kErr != nil {
186 klog.V(3).Infof("could not kill parent process %d: %v", ppid, kErr)
187 }
188 }()
189
190 children, err := k.getProcessChildren(ctx, ppid, podName, containerName)
191 if err != nil {
192 return err
193 }
194
195 klog.V(3).Infof("Found %d children (either direct and indirect) for parent process %d: %v", len(children), ppid, children)
196
197 pidFile := getPidFileForCommand(def)
198 _, _, err = k.execClient.ExecuteCommand(ctx, []string{ShellExecutable, "-c", fmt.Sprintf("rm -f %s", pidFile)}, podName, containerName, false, nil, nil)
199 if err != nil {
200 klog.V(2).Infof("Could not remove file %q: %v", pidFile, err)
201 }
202
203 for _, child := range children {
204 if err = kill(child); err != nil {
205 return err
206 }
207 }
208
209 return nil
210 }
211
212 func (k *kubeExecProcessHandler) getRemoteProcessPID(ctx context.Context, def CommandDefinition, podName string, containerName string) (int, int, error) {
213 pidFile := getPidFileForCommand(def)
214 stdout, stderr, err := k.execClient.ExecuteCommand(ctx, []string{ShellExecutable, "-c", fmt.Sprintf("cat %s || true", pidFile)}, podName, containerName, false, nil, nil)
215
216 if err != nil {
217 return 0, 0, err
218 }
219
220 if len(stdout) == 0 {
221
222 return 0, 0, nil
223 }
224 if len(stdout) > 2 {
225 return 0, 0, fmt.Errorf(
226 "unable to determine command status, due to unexpected number of lines for %s, output: %v %v",
227 pidFile, stdout, stderr)
228 }
229
230 line := stdout[0]
231 var pid int
232 pid, err = strconv.Atoi(strings.TrimSpace(line))
233 if err != nil {
234 klog.V(2).Infof("error while trying to retrieve status of command: %+v", err)
235 return 0, 0, fmt.Errorf("unable to determine command status, due to unexpected PID content for %s: %s",
236 pidFile, line)
237 }
238 if len(stdout) == 1 {
239 return pid, 0, nil
240 }
241
242 line2 := stdout[1]
243 var exitStatus int
244 exitStatus, err = strconv.Atoi(strings.TrimSpace(line2))
245 if err != nil {
246 klog.V(2).Infof("error while trying to retrieve status of command: %+v", err)
247 return pid, 0, fmt.Errorf("unable to determine command status, due to unexpected exit status content for %s: %s",
248 pidFile, line2)
249 }
250
251 return pid, exitStatus, nil
252 }
253
254 func (k *kubeExecProcessHandler) getProcessInfoFromPid(ctx context.Context, pid int, lastKnownExitStatus int, podName string, containerName string) (RemoteProcessInfo, error) {
255 process := RemoteProcessInfo{Pid: pid}
256
257 if pid < 0 {
258 process.Status = Unknown
259 return process, fmt.Errorf("invalid PID value for remote process: %d", pid)
260 }
261 if pid == 0 {
262 process.Status = Stopped
263 return process, nil
264 }
265
266
267 stdout, _, err := k.execClient.ExecuteCommand(ctx, []string{ShellExecutable, "-c", fmt.Sprintf("kill -0 %d; echo $?", pid)}, podName, containerName, false, nil, nil)
268
269 if err != nil {
270 process.Status = Unknown
271 return process, err
272 }
273
274 var killStatus int
275 killStatus, err = strconv.Atoi(strings.TrimSpace(stdout[0]))
276 if err != nil {
277 process.Status = Unknown
278 return process, err
279 }
280
281 if killStatus == 0 {
282 process.Status = Running
283 } else {
284 if lastKnownExitStatus == 0 {
285 process.Status = Stopped
286 } else {
287 process.Status = Errored
288 }
289 }
290
291 return process, nil
292 }
293
294
295
296
297
298 func (k *kubeExecProcessHandler) getProcessChildren(ctx context.Context, pid int, podName string, containerName string) ([]int, error) {
299 if pid <= 0 {
300 return nil, fmt.Errorf("invalid pid: %d", pid)
301 }
302
303 allProcesses, err := k.getAllProcesses(ctx, podName, containerName)
304 if err != nil {
305 return nil, err
306 }
307
308 var getProcessChildrenRec func(int) []int
309 getProcessChildrenRec = func(p int) []int {
310 var result []int
311 for _, child := range allProcesses[p] {
312 result = append(result, getProcessChildrenRec(child)...)
313 }
314 if p != pid {
315
316 result = append(result, p)
317 }
318 return result
319 }
320
321 return getProcessChildrenRec(pid), nil
322 }
323
324
325
326
327
328 func (k *kubeExecProcessHandler) getAllProcesses(ctx context.Context, podName string, containerName string) (map[int][]int, error) {
329 stdout, stderr, err := k.execClient.ExecuteCommand(ctx, []string{ShellExecutable, "-c", "cat /proc/*/stat || true"}, podName, containerName, false, nil, nil)
330 if err != nil {
331 klog.V(7).Infof("stdout: %s\n", strings.Join(stdout, "\n"))
332 klog.V(7).Infof("stderr: %s\n", strings.Join(stderr, "\n"))
333 return nil, err
334 }
335
336 allProcesses := make(map[int][]int)
337 for _, line := range stdout {
338 var pid int
339 _, err = fmt.Sscanf(line, "%d ", &pid)
340 if err != nil {
341 return nil, err
342 }
343
344
345
346 i := strings.LastIndex(line, ")")
347 if i < 0 {
348 continue
349 }
350
351
352
353
354
355 var state byte
356 var ppid int
357 _, err = fmt.Sscanf(line[i+2:], "%c %d", &state, &ppid)
358 if err != nil {
359 return nil, err
360 }
361
362 allProcesses[ppid] = append(allProcesses[ppid], pid)
363 }
364
365 return allProcesses, nil
366 }
367
368
369
370
371 func getPidFileForCommand(def CommandDefinition) string {
372 parentDir := def.PidDirectory
373 if parentDir == "" {
374 parentDir = storage.SharedDataMountPath
375 }
376 return fmt.Sprintf("%s/.odo_cmd_%s.pid", strings.TrimSuffix(parentDir, "/"), def.Id)
377 }
378
View as plain text