1 package watch
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "io"
8 "os"
9 "path/filepath"
10 "reflect"
11 "time"
12
13 "github.com/redhat-developer/odo/pkg/dev"
14 "github.com/redhat-developer/odo/pkg/dev/common"
15 "github.com/redhat-developer/odo/pkg/informer"
16
17 "github.com/redhat-developer/odo/pkg/kclient"
18 "github.com/redhat-developer/odo/pkg/labels"
19 "github.com/redhat-developer/odo/pkg/libdevfile"
20 "github.com/redhat-developer/odo/pkg/log"
21 odocontext "github.com/redhat-developer/odo/pkg/odo/context"
22
23 "github.com/fsnotify/fsnotify"
24 gitignore "github.com/sabhiram/go-gitignore"
25
26 appsv1 "k8s.io/api/apps/v1"
27 corev1 "k8s.io/api/core/v1"
28 kerrors "k8s.io/apimachinery/pkg/api/errors"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/watch"
31 "k8s.io/klog"
32 )
33
const (
	// PushErrorString is the prefix printed in front of the error message
	// when a push fails with a non-fatal error (see processEvents).
	PushErrorString = "Error occurred on Push"
)
38
// WatchClient holds the watchers and the small amount of state used by the
// dev-mode watch loop (WatchAndPush / eventWatcher).
type WatchClient struct {
	// kubeClient creates the cluster watchers and is used to refresh the
	// Kubernetes configuration after an authorization failure.
	kubeClient kclient.ClientInterface
	// informerClient provides the text printed by printInfoMessage.
	informerClient *informer.InformerClient

	// sourcesWatcher reports file system events on the component sources.
	sourcesWatcher *fsnotify.Watcher
	// deploymentWatcher reports events on the component's deployments.
	deploymentWatcher watch.Interface
	// devfileWatcher reports file system events on the devfile and the local
	// files it references.
	devfileWatcher *fsnotify.Watcher
	// podWatcher reports events on the component's pods.
	podWatcher watch.Interface
	// warningsWatcher reports cluster Events; only those involving a pod of
	// the component are displayed.
	warningsWatcher watch.Interface
	// keyWatcher delivers bytes from getKeyWatcher; the byte 'p' requests a
	// forced sync.
	keyWatcher <-chan byte

	// forceSync is set when a sync was explicitly requested (key press or
	// PushWatcher); the next sources sync then runs even without detected
	// file changes. It is cleared after that sync.
	forceSync bool

	// deploymentGeneration and readyReplicas store the values seen in the
	// last deployment event that triggered a re-processing, so that events
	// which change neither of them are ignored.
	deploymentGeneration int64
	readyReplicas        int32
}
57
// Compile-time check that *WatchClient satisfies the Client interface.
var _ Client = (*WatchClient)(nil)
59
60 func NewWatchClient(
61 kubeClient kclient.ClientInterface,
62 informerClient *informer.InformerClient,
63 ) *WatchClient {
64 return &WatchClient{
65 kubeClient: kubeClient,
66 informerClient: informerClient,
67 }
68 }
69
70
// WatchParameters groups the inputs of WatchAndPush.
type WatchParameters struct {
	// StartOptions carries the dev session options. In this file it supplies
	// the output writer (Out), the WatchFiles flag, the IgnorePaths list and
	// the PushWatcher channel, and it is forwarded as-is to the push handler.
	StartOptions dev.StartOptions

	// DevfileWatchHandler is the function invoked by processEvents to perform
	// the actual push. It receives the push parameters (including the changed
	// and deleted files) and may update the given component status.
	DevfileWatchHandler func(context.Context, common.PushParameters, *ComponentStatus) error

	// Show is not read in this file.
	// NOTE(review): presumably controls displaying command output - confirm with callers.
	Show bool

	// DebugPort is not read in this file.
	// NOTE(review): presumably the debug port to use when pushing - confirm with callers.
	DebugPort int

	// WatchCluster enables watching cluster resources (deployments, pods and
	// warning events). When false, no-op watchers are used instead.
	WatchCluster bool
}
86
87
88
89
// evaluateChangesFunc is the signature of the handler called by eventWatcher
// to turn the accumulated file system events into the list of changed files
// and the list of deleted paths. evaluateFileChanges is the implementation
// passed by WatchAndPush.
type evaluateChangesFunc func(events []fsnotify.Event, path string, fileIgnores []string, watcher *fsnotify.Watcher) (changedFiles, deletedPaths []string)
91
92
93
// processEventsFunc is the signature of the handler called by eventWatcher
// each time a push must be performed. A non-nil error returned by the handler
// terminates the watch loop. WatchClient.processEvents is the implementation
// passed by WatchAndPush.
type processEventsFunc func(ctx context.Context, parameters WatchParameters, changedFiles, deletedPaths []string, componentStatus *ComponentStatus) error
95
96 func (o *WatchClient) WatchAndPush(ctx context.Context, parameters WatchParameters, componentStatus ComponentStatus) error {
97 var (
98 devfileObj = odocontext.GetEffectiveDevfileObj(ctx)
99 devfilePath = odocontext.GetDevfilePath(ctx)
100 path = filepath.Dir(devfilePath)
101 componentName = odocontext.GetComponentName(ctx)
102 appName = odocontext.GetApplication(ctx)
103 )
104
105 klog.V(4).Infof("starting WatchAndPush, path: %s, component: %s, ignores %s", path, componentName, parameters.StartOptions.IgnorePaths)
106
107 var err error
108 if parameters.StartOptions.WatchFiles {
109 o.sourcesWatcher, err = getFullSourcesWatcher(path, parameters.StartOptions.IgnorePaths)
110 if err != nil {
111 return err
112 }
113 } else {
114 o.sourcesWatcher, err = fsnotify.NewWatcher()
115 if err != nil {
116 return err
117 }
118 }
119 defer o.sourcesWatcher.Close()
120
121 if parameters.WatchCluster {
122 selector := labels.GetSelector(componentName, appName, labels.ComponentDevMode, true)
123 o.deploymentWatcher, err = o.kubeClient.DeploymentWatcher(ctx, selector)
124 if err != nil {
125 return fmt.Errorf("error watching deployment: %v", err)
126 }
127
128 o.podWatcher, err = o.kubeClient.PodWatcher(ctx, selector)
129 if err != nil {
130 return err
131 }
132 } else {
133 o.deploymentWatcher = NewNoOpWatcher()
134 o.podWatcher = NewNoOpWatcher()
135 }
136
137 o.devfileWatcher, err = fsnotify.NewWatcher()
138 if err != nil {
139 return err
140 }
141 if parameters.StartOptions.WatchFiles {
142 var devfileFiles []string
143 devfileFiles, err = libdevfile.GetReferencedLocalFiles(*devfileObj)
144 if err != nil {
145 return err
146 }
147 devfileFiles = append(devfileFiles, devfilePath)
148 for _, f := range devfileFiles {
149 err = o.devfileWatcher.Add(f)
150 if err != nil {
151 klog.V(4).Infof("error adding watcher for path %s: %v", f, err)
152 }
153 }
154 }
155
156 if parameters.WatchCluster {
157 var isForbidden bool
158 o.warningsWatcher, isForbidden, err = o.kubeClient.PodWarningEventWatcher(ctx)
159 if err != nil {
160 return err
161 }
162 if isForbidden {
163 log.Fwarning(parameters.StartOptions.Out, "Unable to watch Events resource, warning Events won't be displayed")
164 }
165 } else {
166 o.warningsWatcher = NewNoOpWatcher()
167 }
168
169 o.keyWatcher = getKeyWatcher(ctx, parameters.StartOptions.Out)
170
171 err = o.processEvents(ctx, parameters, nil, nil, &componentStatus)
172 if err != nil {
173 return err
174 }
175
176 return o.eventWatcher(ctx, parameters, evaluateFileChanges, o.processEvents, componentStatus)
177 }
178
179
180
181
// eventWatcher is the main loop of the dev mode. It multiplexes, in a single
// select, the events coming from the watchers stored in the WatchClient
// (sources, devfile, deployments, pods, warning events, key presses, push
// requests) and calls processEventsHandler when a push is needed.
//
// File system and deployment events are debounced with timers: an event only
// (re)arms the matching timer, and the work is done when the timer fires.
//
// evaluateChangesHandler converts the accumulated sources events into changed
// files and deleted paths. processEventsHandler performs the push; any error
// it returns ends the loop and is returned to the caller.
//
// The function returns nil when ctx is done, and a non-nil error when a
// watcher reports an error, when a pod event cannot be decoded, when checking
// a pod name against the selector fails, or when processEventsHandler fails.
func (o *WatchClient) eventWatcher(
	ctx context.Context,
	parameters WatchParameters,
	evaluateChangesHandler evaluateChangesFunc,
	processEventsHandler processEventsFunc,
	componentStatus ComponentStatus,
) error {

	var (
		devfilePath   = odocontext.GetDevfilePath(ctx)
		path          = filepath.Dir(devfilePath)
		componentName = odocontext.GetComponentName(ctx)
		appName       = odocontext.GetApplication(ctx)
		out           = parameters.StartOptions.Out
	)

	// events accumulates the sources events received since the last
	// successful sync.
	var events []fsnotify.Event

	// Each timer below is created with a 1ms duration and its channel is
	// drained right away, so that it starts expired and empty: it will only
	// fire again after an explicit Reset.

	// sourcesTimer debounces sources events and forced sync requests.
	sourcesTimer := time.NewTimer(time.Millisecond)
	<-sourcesTimer.C

	// devfileTimer debounces devfile events.
	devfileTimer := time.NewTimer(time.Millisecond)
	<-devfileTimer.C

	// deployTimer debounces deployment events.
	deployTimer := time.NewTimer(time.Millisecond)
	<-deployTimer.C

	podsPhases := NewPodPhases()

	for {
		select {
		case event := <-o.sourcesWatcher.Events:
			// Record the event and wait for 100ms without any new event
			// before syncing.
			events = append(events, event)

			sourcesTimer.Reset(100 * time.Millisecond)

		case <-sourcesTimer.C:
			// The accumulated events are kept for a later attempt when the
			// component is not in a state allowing to sync files.
			if !componentCanSyncFile(componentStatus.GetState()) {
				klog.V(4).Infof("State of component is %q, don't sync sources", componentStatus.GetState())
				continue
			}

			var changedFiles, deletedPaths []string
			if !o.forceSync {
				// Not a forced sync: only push when at least one file has
				// effectively changed or been deleted.
				changedFiles, deletedPaths = evaluateChangesHandler(events, path, parameters.StartOptions.IgnorePaths, o.sourcesWatcher)

				if len(changedFiles) == 0 && len(deletedPaths) == 0 {
					continue
				}
			}

			componentStatus.SetState(StateSyncOutdated)
			fmt.Fprintf(out, "Pushing files...\n\n")
			err := processEventsHandler(ctx, parameters, changedFiles, deletedPaths, &componentStatus)
			// The forced sync request is consumed whatever the result.
			o.forceSync = false
			if err != nil {
				return err
			}

			// The events are discarded only once the component is ready
			// again; otherwise they are re-evaluated on the next timer tick.
			if componentStatus.GetState() == StateReady {
				events = []fsnotify.Event{}
			}

		case watchErr := <-o.sourcesWatcher.Errors:
			return watchErr

		case key := <-o.keyWatcher:
			// The 'p' key requests a forced sync; other keys are ignored.
			if key == 'p' {
				o.forceSync = true
				sourcesTimer.Reset(100 * time.Millisecond)
			}

		case <-parameters.StartOptions.PushWatcher:
			// A value received on PushWatcher requests a forced sync.
			o.forceSync = true
			sourcesTimer.Reset(100 * time.Millisecond)

		case ev := <-o.deploymentWatcher.ResultChan():
			switch obj := ev.Object.(type) {
			case *appsv1.Deployment:
				klog.V(4).Infof("deployment watcher Event: Type: %s, name: %s, rv: %s, generation: %d, pods: %d\n",
					ev.Type, obj.GetName(), obj.GetResourceVersion(), obj.GetGeneration(), obj.Status.ReadyReplicas)
				// Only react when the generation increased or the number of
				// ready replicas changed since the last recorded event.
				if obj.GetGeneration() > o.deploymentGeneration || obj.Status.ReadyReplicas != o.readyReplicas {
					o.deploymentGeneration = obj.GetGeneration()
					o.readyReplicas = obj.Status.ReadyReplicas
					deployTimer.Reset(300 * time.Millisecond)
				}

			case *metav1.Status:
				klog.V(4).Infof("Status: %+v\n", obj)
			}

		case <-deployTimer.C:
			// Re-process the component, without any changed file, 300ms
			// after the last relevant deployment event.
			err := processEventsHandler(ctx, parameters, nil, nil, &componentStatus)
			if err != nil {
				return err
			}

		case <-o.devfileWatcher.Events:
			// The event content is not used: any event on a watched devfile
			// file schedules an update.
			devfileTimer.Reset(100 * time.Millisecond)

		case <-devfileTimer.C:
			fmt.Fprintf(out, "Updating Component...\n\n")
			err := processEventsHandler(ctx, parameters, nil, nil, &componentStatus)
			if err != nil {
				return err
			}

		case ev := <-o.podWatcher.ResultChan():
			// Keep podsPhases in sync with the pods of the component. Event
			// types other than Deleted, Added and Modified are ignored.
			switch ev.Type {
			case watch.Deleted:
				pod, ok := ev.Object.(*corev1.Pod)
				if !ok {
					return errors.New("unable to decode watch event")
				}
				podsPhases.Delete(out, pod)
			case watch.Added, watch.Modified:
				pod, ok := ev.Object.(*corev1.Pod)
				if !ok {
					return errors.New("unable to decode watch event")
				}
				podsPhases.Add(out, pod.GetCreationTimestamp(), pod)
			}

		case ev := <-o.warningsWatcher.ResultChan():
			switch kevent := ev.Object.(type) {
			case *corev1.Event:
				// Display the message of the Event only when the involved
				// pod matches the selector of the component.
				podName := kevent.InvolvedObject.Name
				selector := labels.GetSelector(componentName, appName, labels.ComponentDevMode, true)
				matching, err := o.kubeClient.IsPodNameMatchingSelector(ctx, podName, selector)
				if err != nil {
					return err
				}
				if matching {
					log.Fwarning(out, kevent.Message)
				}
			}

		case watchErr := <-o.devfileWatcher.Errors:
			return watchErr

		case <-ctx.Done():
			// The context being done is not reported as an error.
			klog.V(2).Info("Dev mode interrupted by user")
			return nil
		}
	}
}
338
339
340
341 func evaluateFileChanges(events []fsnotify.Event, path string, fileIgnores []string, watcher *fsnotify.Watcher) ([]string, []string) {
342 var changedFiles []string
343 var deletedPaths []string
344
345 ignoreMatcher := gitignore.CompileIgnoreLines(fileIgnores...)
346
347 for _, event := range events {
348 klog.V(4).Infof("filesystem watch event: %s", event)
349 isIgnoreEvent := shouldIgnoreEvent(event)
350
351
352 alreadyInChangedFiles := false
353 for _, cfile := range changedFiles {
354 if cfile == event.Name {
355 alreadyInChangedFiles = true
356 break
357 }
358 }
359
360
361
362
363
364
365 var watchError error
366 rel, err := filepath.Rel(path, event.Name)
367 if err != nil {
368 watchError = fmt.Errorf("unable to get relative path of %q on %q", event.Name, path)
369 }
370 matched := ignoreMatcher.MatchesPath(rel)
371 if !alreadyInChangedFiles && !matched && !isIgnoreEvent {
372
373 if event.Op&fsnotify.Remove != fsnotify.Remove {
374 changedFiles = append(changedFiles, event.Name)
375 }
376 }
377
378
379
380 if event.Op&fsnotify.Remove == fsnotify.Remove || event.Op&fsnotify.Rename == fsnotify.Rename {
381
382 if e := watcher.Remove(event.Name); e != nil {
383 klog.V(4).Infof("error removing watch for %s: %v", event.Name, e)
384 }
385
386
387
388
389 if !alreadyInChangedFiles && !matched && event.Name != "" {
390 deletedPaths = append(deletedPaths, event.Name)
391 }
392 } else {
393
394 if e := addRecursiveWatch(watcher, path, event.Name, fileIgnores); e != nil && watchError == nil {
395 klog.V(4).Infof("Error occurred in addRecursiveWatch, setting watchError to %v", e)
396 watchError = e
397 }
398 }
399 }
400 deletedPaths = removeDuplicates(deletedPaths)
401
402 return changedFiles, deletedPaths
403 }
404
405 func (o *WatchClient) processEvents(
406 ctx context.Context,
407 parameters WatchParameters,
408 changedFiles, deletedPaths []string,
409 componentStatus *ComponentStatus,
410 ) error {
411 var (
412 devfilePath = odocontext.GetDevfilePath(ctx)
413 path = filepath.Dir(devfilePath)
414 out = parameters.StartOptions.Out
415 )
416
417 for _, file := range removeDuplicates(append(changedFiles, deletedPaths...)) {
418 fmt.Fprintf(out, "\nFile %s changed\n", file)
419 }
420
421 var hasFirstSuccessfulPushOccurred bool
422
423 klog.V(4).Infof("Copying files %s to pod", changedFiles)
424
425 pushParams := common.PushParameters{
426 StartOptions: parameters.StartOptions,
427 WatchFiles: changedFiles,
428 WatchDeletedFiles: deletedPaths,
429 DevfileScanIndexForWatch: !hasFirstSuccessfulPushOccurred,
430 }
431 oldStatus := *componentStatus
432 err := parameters.DevfileWatchHandler(ctx, pushParams, componentStatus)
433 if err != nil {
434 if isFatal(err) {
435 return err
436 }
437 klog.V(4).Infof("Error from Push: %v", err)
438
439
440 if kerrors.IsUnauthorized(err) || kerrors.IsForbidden(err) {
441 fmt.Fprintf(out, "Error connecting to the cluster. Please log in again\n\n")
442 var refreshed bool
443 refreshed, err = o.kubeClient.Refresh()
444 if err != nil {
445 fmt.Fprintf(out, "Error updating Kubernetes config: %s\n", err)
446 } else if refreshed {
447 fmt.Fprintf(out, "Updated Kubernetes config\n")
448 }
449 } else {
450 fmt.Fprintf(out, "%s - %s\n\n", PushErrorString, err.Error())
451 o.printInfoMessage(out, path, parameters.StartOptions.WatchFiles)
452 }
453 return nil
454 }
455 if oldStatus.GetState() != StateReady && componentStatus.GetState() == StateReady ||
456 !reflect.DeepEqual(oldStatus.EndpointsForwarded, componentStatus.EndpointsForwarded) {
457
458 o.printInfoMessage(out, path, parameters.StartOptions.WatchFiles)
459 }
460 return nil
461 }
462
463 func shouldIgnoreEvent(event fsnotify.Event) (ignoreEvent bool) {
464 if !(event.Op&fsnotify.Remove == fsnotify.Remove || event.Op&fsnotify.Rename == fsnotify.Rename) {
465 stat, err := os.Lstat(event.Name)
466 if err != nil {
467
468
469
470 klog.V(4).Infof("Failed getting details of the changed file %s. Ignoring the change", event.Name)
471 }
472
473
474
475 if stat == nil {
476 klog.V(4).Infof("Ignoring event for file %s as details about the file couldn't be fetched", event.Name)
477 ignoreEvent = true
478 }
479
480
481
482
483
484 if ignoreEvent || (stat.IsDir() && event.Op&fsnotify.Write == fsnotify.Write) {
485 ignoreEvent = true
486 }
487 }
488 return ignoreEvent
489 }
490
// removeDuplicates returns the distinct values of input, in the order of
// their first appearance. The order is deterministic, so that callers
// displaying or comparing the result get stable output.
//
// The returned slice is always non-nil, even for a nil or empty input, and
// does not share its storage with input.
func removeDuplicates(input []string) []string {
	seen := make(map[string]struct{}, len(input))
	result := make([]string, 0, len(input))
	for _, str := range input {
		if _, dup := seen[str]; dup {
			continue
		}
		seen[str] = struct{}{}
		result = append(result, str)
	}
	return result
}
503
504 func (o *WatchClient) printInfoMessage(out io.Writer, path string, watchFiles bool) {
505 log.Sectionf("Dev mode")
506 if watchFiles {
507 fmt.Fprintf(
508 out,
509 " %s\n Watching for changes in the current directory %s\n\n",
510 log.Sbold("Status:"),
511 path,
512 )
513 }
514 fmt.Fprint(
515 out,
516 o.informerClient.GetInfo(),
517 )
518 }
519
520 func isFatal(err error) bool {
521 return errors.As(err, &common.ErrPortForward{})
522 }
523
View as plain text