...

Source file src/github.com/redhat-developer/odo/pkg/watch/watch.go

Documentation: github.com/redhat-developer/odo/pkg/watch

     1  package watch
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"io"
     8  	"os"
     9  	"path/filepath"
    10  	"reflect"
    11  	"time"
    12  
    13  	"github.com/redhat-developer/odo/pkg/dev"
    14  	"github.com/redhat-developer/odo/pkg/dev/common"
    15  	"github.com/redhat-developer/odo/pkg/informer"
    16  
    17  	"github.com/redhat-developer/odo/pkg/kclient"
    18  	"github.com/redhat-developer/odo/pkg/labels"
    19  	"github.com/redhat-developer/odo/pkg/libdevfile"
    20  	"github.com/redhat-developer/odo/pkg/log"
    21  	odocontext "github.com/redhat-developer/odo/pkg/odo/context"
    22  
    23  	"github.com/fsnotify/fsnotify"
    24  	gitignore "github.com/sabhiram/go-gitignore"
    25  
    26  	appsv1 "k8s.io/api/apps/v1"
    27  	corev1 "k8s.io/api/core/v1"
    28  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/watch"
    31  	"k8s.io/klog"
    32  )
    33  
    34  const (
    35  	// PushErrorString is the string that is printed when an error occurs during watch's Push operation
    36  	PushErrorString = "Error occurred on Push"
    37  )
    38  
    39  type WatchClient struct {
    40  	kubeClient     kclient.ClientInterface
    41  	informerClient *informer.InformerClient
    42  
    43  	sourcesWatcher    *fsnotify.Watcher
    44  	deploymentWatcher watch.Interface
    45  	devfileWatcher    *fsnotify.Watcher
    46  	podWatcher        watch.Interface
    47  	warningsWatcher   watch.Interface
    48  	keyWatcher        <-chan byte
    49  
    50  	// true to force sync, used when manual sync
    51  	forceSync bool
    52  
    53  	// deploymentGeneration indicates the generation of the latest observed Deployment
    54  	deploymentGeneration int64
    55  	readyReplicas        int32
    56  }
    57  
    58  var _ Client = (*WatchClient)(nil)
    59  
    60  func NewWatchClient(
    61  	kubeClient kclient.ClientInterface,
    62  	informerClient *informer.InformerClient,
    63  ) *WatchClient {
    64  	return &WatchClient{
    65  		kubeClient:     kubeClient,
    66  		informerClient: informerClient,
    67  	}
    68  }
    69  
    70  // WatchParameters is designed to hold the controllables and attributes that the watch function works on
    71  type WatchParameters struct {
    72  	StartOptions dev.StartOptions
    73  
    74  	// Custom function that can be used to push detected changes to remote pod. For more info about what each of the parameters to this function, please refer, pkg/component/component.go#PushLocal
    75  	// WatchHandler func(kclient.ClientInterface, string, string, string, io.Writer, []string, []string, bool, []string, bool) error
    76  	// Custom function that can be used to push detected changes to remote devfile pod. For more info about what each of the parameters to this function, please refer, pkg/devfile/adapters/interface.go#PlatformAdapter
    77  	DevfileWatchHandler func(context.Context, common.PushParameters, *ComponentStatus) error
    78  	// Parameter whether or not to show build logs
    79  	Show bool
    80  	// DebugPort indicates which debug port to use for pushing after sync
    81  	DebugPort int
    82  
    83  	// WatchCluster indicates to watch Cluster-related objects (Deployment, Pod, etc)
    84  	WatchCluster bool
    85  }
    86  
    87  // evaluateChangesFunc evaluates any file changes for the events by ignoring the files in fileIgnores slice and removes
    88  // any deleted paths from the watcher. It returns a slice of changed files (if any) and paths that are deleted (if any)
    89  // by the events
    90  type evaluateChangesFunc func(events []fsnotify.Event, path string, fileIgnores []string, watcher *fsnotify.Watcher) (changedFiles, deletedPaths []string)
    91  
    92  // processEventsFunc processes the events received on the watcher. It uses the WatchParameters to trigger watch handler and writes to out
    93  // It returns a Duration after which to recall in case of error
    94  type processEventsFunc func(ctx context.Context, parameters WatchParameters, changedFiles, deletedPaths []string, componentStatus *ComponentStatus) error
    95  
    96  func (o *WatchClient) WatchAndPush(ctx context.Context, parameters WatchParameters, componentStatus ComponentStatus) error {
    97  	var (
    98  		devfileObj    = odocontext.GetEffectiveDevfileObj(ctx)
    99  		devfilePath   = odocontext.GetDevfilePath(ctx)
   100  		path          = filepath.Dir(devfilePath)
   101  		componentName = odocontext.GetComponentName(ctx)
   102  		appName       = odocontext.GetApplication(ctx)
   103  	)
   104  
   105  	klog.V(4).Infof("starting WatchAndPush, path: %s, component: %s, ignores %s", path, componentName, parameters.StartOptions.IgnorePaths)
   106  
   107  	var err error
   108  	if parameters.StartOptions.WatchFiles {
   109  		o.sourcesWatcher, err = getFullSourcesWatcher(path, parameters.StartOptions.IgnorePaths)
   110  		if err != nil {
   111  			return err
   112  		}
   113  	} else {
   114  		o.sourcesWatcher, err = fsnotify.NewWatcher()
   115  		if err != nil {
   116  			return err
   117  		}
   118  	}
   119  	defer o.sourcesWatcher.Close()
   120  
   121  	if parameters.WatchCluster {
   122  		selector := labels.GetSelector(componentName, appName, labels.ComponentDevMode, true)
   123  		o.deploymentWatcher, err = o.kubeClient.DeploymentWatcher(ctx, selector)
   124  		if err != nil {
   125  			return fmt.Errorf("error watching deployment: %v", err)
   126  		}
   127  
   128  		o.podWatcher, err = o.kubeClient.PodWatcher(ctx, selector)
   129  		if err != nil {
   130  			return err
   131  		}
   132  	} else {
   133  		o.deploymentWatcher = NewNoOpWatcher()
   134  		o.podWatcher = NewNoOpWatcher()
   135  	}
   136  
   137  	o.devfileWatcher, err = fsnotify.NewWatcher()
   138  	if err != nil {
   139  		return err
   140  	}
   141  	if parameters.StartOptions.WatchFiles {
   142  		var devfileFiles []string
   143  		devfileFiles, err = libdevfile.GetReferencedLocalFiles(*devfileObj)
   144  		if err != nil {
   145  			return err
   146  		}
   147  		devfileFiles = append(devfileFiles, devfilePath)
   148  		for _, f := range devfileFiles {
   149  			err = o.devfileWatcher.Add(f)
   150  			if err != nil {
   151  				klog.V(4).Infof("error adding watcher for path %s: %v", f, err)
   152  			}
   153  		}
   154  	}
   155  
   156  	if parameters.WatchCluster {
   157  		var isForbidden bool
   158  		o.warningsWatcher, isForbidden, err = o.kubeClient.PodWarningEventWatcher(ctx)
   159  		if err != nil {
   160  			return err
   161  		}
   162  		if isForbidden {
   163  			log.Fwarning(parameters.StartOptions.Out, "Unable to watch Events resource, warning Events won't be displayed")
   164  		}
   165  	} else {
   166  		o.warningsWatcher = NewNoOpWatcher()
   167  	}
   168  
   169  	o.keyWatcher = getKeyWatcher(ctx, parameters.StartOptions.Out)
   170  
   171  	err = o.processEvents(ctx, parameters, nil, nil, &componentStatus)
   172  	if err != nil {
   173  		return err
   174  	}
   175  
   176  	return o.eventWatcher(ctx, parameters, evaluateFileChanges, o.processEvents, componentStatus)
   177  }
   178  
   179  // eventWatcher loops till the context's Done channel indicates it to stop looping, at which point it performs cleanup.
   180  // While looping, it listens for filesystem events and processes these events using the WatchParameters to push to the remote pod.
   181  // It outputs any logs to the out io Writer
   182  func (o *WatchClient) eventWatcher(
   183  	ctx context.Context,
   184  	parameters WatchParameters,
   185  	evaluateChangesHandler evaluateChangesFunc,
   186  	processEventsHandler processEventsFunc,
   187  	componentStatus ComponentStatus,
   188  ) error {
   189  
   190  	var (
   191  		devfilePath   = odocontext.GetDevfilePath(ctx)
   192  		path          = filepath.Dir(devfilePath)
   193  		componentName = odocontext.GetComponentName(ctx)
   194  		appName       = odocontext.GetApplication(ctx)
   195  		out           = parameters.StartOptions.Out
   196  	)
   197  
   198  	var events []fsnotify.Event
   199  
   200  	// sourcesTimer helps collect multiple events that happen in a quick succession. We start with 1ms as we don't care much
   201  	// at this point. In the select block, however, every time we receive an event, we reset the sourcesTimer to watch for
   202  	// 100ms since receiving that event. This is done because a single filesystem event by the user triggers multiple
   203  	// events for fsnotify. It's a known-issue, but not really bug. For more info look at below issues:
   204  	//    - https://github.com/fsnotify/fsnotify/issues/122
   205  	//    - https://github.com/fsnotify/fsnotify/issues/344
   206  	sourcesTimer := time.NewTimer(time.Millisecond)
   207  	<-sourcesTimer.C
   208  
   209  	// devfileTimer has the same usage as sourcesTimer, for file events coming from devfileWatcher
   210  	devfileTimer := time.NewTimer(time.Millisecond)
   211  	<-devfileTimer.C
   212  
   213  	// deployTimer has the same usage as sourcesTimer, for events coming from watching Deployments, from deploymentWatcher
   214  	deployTimer := time.NewTimer(time.Millisecond)
   215  	<-deployTimer.C
   216  
   217  	podsPhases := NewPodPhases()
   218  
   219  	for {
   220  		select {
   221  		case event := <-o.sourcesWatcher.Events:
   222  			events = append(events, event)
   223  			// We are waiting for more events in this interval
   224  			sourcesTimer.Reset(100 * time.Millisecond)
   225  
   226  		case <-sourcesTimer.C:
   227  			// timer has fired
   228  			if !componentCanSyncFile(componentStatus.GetState()) {
   229  				klog.V(4).Infof("State of component is %q, don't sync sources", componentStatus.GetState())
   230  				continue
   231  			}
   232  
   233  			var changedFiles, deletedPaths []string
   234  			if !o.forceSync {
   235  				// first find the files that have changed (also includes the ones newly created) or deleted
   236  				changedFiles, deletedPaths = evaluateChangesHandler(events, path, parameters.StartOptions.IgnorePaths, o.sourcesWatcher)
   237  				// process the changes and sync files with remote pod
   238  				if len(changedFiles) == 0 && len(deletedPaths) == 0 {
   239  					continue
   240  				}
   241  			}
   242  
   243  			componentStatus.SetState(StateSyncOutdated)
   244  			fmt.Fprintf(out, "Pushing files...\n\n")
   245  			err := processEventsHandler(ctx, parameters, changedFiles, deletedPaths, &componentStatus)
   246  			o.forceSync = false
   247  			if err != nil {
   248  				return err
   249  			}
   250  			// empty the events to receive new events
   251  			if componentStatus.GetState() == StateReady {
   252  				events = []fsnotify.Event{} // empty the events slice to capture new events
   253  			}
   254  
   255  		case watchErr := <-o.sourcesWatcher.Errors:
   256  			return watchErr
   257  
   258  		case key := <-o.keyWatcher:
   259  			if key == 'p' {
   260  				o.forceSync = true
   261  				sourcesTimer.Reset(100 * time.Millisecond)
   262  			}
   263  
   264  		case <-parameters.StartOptions.PushWatcher:
   265  			o.forceSync = true
   266  			sourcesTimer.Reset(100 * time.Millisecond)
   267  
   268  		case ev := <-o.deploymentWatcher.ResultChan():
   269  			switch obj := ev.Object.(type) {
   270  			case *appsv1.Deployment:
   271  				klog.V(4).Infof("deployment watcher Event: Type: %s, name: %s, rv: %s, generation: %d, pods: %d\n",
   272  					ev.Type, obj.GetName(), obj.GetResourceVersion(), obj.GetGeneration(), obj.Status.ReadyReplicas)
   273  				if obj.GetGeneration() > o.deploymentGeneration || obj.Status.ReadyReplicas != o.readyReplicas {
   274  					o.deploymentGeneration = obj.GetGeneration()
   275  					o.readyReplicas = obj.Status.ReadyReplicas
   276  					deployTimer.Reset(300 * time.Millisecond)
   277  				}
   278  
   279  			case *metav1.Status:
   280  				klog.V(4).Infof("Status: %+v\n", obj)
   281  			}
   282  
   283  		case <-deployTimer.C:
   284  			err := processEventsHandler(ctx, parameters, nil, nil, &componentStatus)
   285  			if err != nil {
   286  				return err
   287  			}
   288  
   289  		case <-o.devfileWatcher.Events:
   290  			devfileTimer.Reset(100 * time.Millisecond)
   291  
   292  		case <-devfileTimer.C:
   293  			fmt.Fprintf(out, "Updating Component...\n\n")
   294  			err := processEventsHandler(ctx, parameters, nil, nil, &componentStatus)
   295  			if err != nil {
   296  				return err
   297  			}
   298  
   299  		case ev := <-o.podWatcher.ResultChan():
   300  			switch ev.Type {
   301  			case watch.Deleted:
   302  				pod, ok := ev.Object.(*corev1.Pod)
   303  				if !ok {
   304  					return errors.New("unable to decode watch event")
   305  				}
   306  				podsPhases.Delete(out, pod)
   307  			case watch.Added, watch.Modified:
   308  				pod, ok := ev.Object.(*corev1.Pod)
   309  				if !ok {
   310  					return errors.New("unable to decode watch event")
   311  				}
   312  				podsPhases.Add(out, pod.GetCreationTimestamp(), pod)
   313  			}
   314  
   315  		case ev := <-o.warningsWatcher.ResultChan():
   316  			switch kevent := ev.Object.(type) {
   317  			case *corev1.Event:
   318  				podName := kevent.InvolvedObject.Name
   319  				selector := labels.GetSelector(componentName, appName, labels.ComponentDevMode, true)
   320  				matching, err := o.kubeClient.IsPodNameMatchingSelector(ctx, podName, selector)
   321  				if err != nil {
   322  					return err
   323  				}
   324  				if matching {
   325  					log.Fwarning(out, kevent.Message)
   326  				}
   327  			}
   328  
   329  		case watchErr := <-o.devfileWatcher.Errors:
   330  			return watchErr
   331  
   332  		case <-ctx.Done():
   333  			klog.V(2).Info("Dev mode interrupted by user")
   334  			return nil
   335  		}
   336  	}
   337  }
   338  
   339  // evaluateFileChanges evaluates any file changes for the events. It ignores the files in fileIgnores slice related to path, and removes
   340  // any deleted paths from the watcher
   341  func evaluateFileChanges(events []fsnotify.Event, path string, fileIgnores []string, watcher *fsnotify.Watcher) ([]string, []string) {
   342  	var changedFiles []string
   343  	var deletedPaths []string
   344  
   345  	ignoreMatcher := gitignore.CompileIgnoreLines(fileIgnores...)
   346  
   347  	for _, event := range events {
   348  		klog.V(4).Infof("filesystem watch event: %s", event)
   349  		isIgnoreEvent := shouldIgnoreEvent(event)
   350  
   351  		// add file name to changedFiles only once
   352  		alreadyInChangedFiles := false
   353  		for _, cfile := range changedFiles {
   354  			if cfile == event.Name {
   355  				alreadyInChangedFiles = true
   356  				break
   357  			}
   358  		}
   359  
   360  		// Filter out anything in ignores list from the list of changed files
   361  		// This is important in spite of not watching the
   362  		// ignores paths because, when a directory that is ignored, is deleted,
   363  		// because its parent is watched, the fsnotify automatically raises an event
   364  		// for it.
   365  		var watchError error
   366  		rel, err := filepath.Rel(path, event.Name)
   367  		if err != nil {
   368  			watchError = fmt.Errorf("unable to get relative path of %q on %q", event.Name, path)
   369  		}
   370  		matched := ignoreMatcher.MatchesPath(rel)
   371  		if !alreadyInChangedFiles && !matched && !isIgnoreEvent {
   372  			// Append the new file change event to changedFiles if and only if the event is not a file remove event
   373  			if event.Op&fsnotify.Remove != fsnotify.Remove {
   374  				changedFiles = append(changedFiles, event.Name)
   375  			}
   376  		}
   377  
   378  		// Rename operation triggers RENAME event on old path + CREATE event for renamed path so delete old path in case of rename
   379  		// Also weirdly, fsnotify raises a RENAME event for deletion of files/folders with space in their name so even that should be handled here
   380  		if event.Op&fsnotify.Remove == fsnotify.Remove || event.Op&fsnotify.Rename == fsnotify.Rename {
   381  			// On remove/rename, stop watching the resource
   382  			if e := watcher.Remove(event.Name); e != nil {
   383  				klog.V(4).Infof("error removing watch for %s: %v", event.Name, e)
   384  			}
   385  			// Append the file to list of deleted files
   386  			// When a file/folder is deleted, it raises 2 events:
   387  			//	a. RENAME with event.Name empty
   388  			//	b. REMOVE with event.Name as file name
   389  			if !alreadyInChangedFiles && !matched && event.Name != "" {
   390  				deletedPaths = append(deletedPaths, event.Name)
   391  			}
   392  		} else {
   393  			// On other ops, recursively watch the resource (if applicable)
   394  			if e := addRecursiveWatch(watcher, path, event.Name, fileIgnores); e != nil && watchError == nil {
   395  				klog.V(4).Infof("Error occurred in addRecursiveWatch, setting watchError to %v", e)
   396  				watchError = e
   397  			}
   398  		}
   399  	}
   400  	deletedPaths = removeDuplicates(deletedPaths)
   401  
   402  	return changedFiles, deletedPaths
   403  }
   404  
   405  func (o *WatchClient) processEvents(
   406  	ctx context.Context,
   407  	parameters WatchParameters,
   408  	changedFiles, deletedPaths []string,
   409  	componentStatus *ComponentStatus,
   410  ) error {
   411  	var (
   412  		devfilePath = odocontext.GetDevfilePath(ctx)
   413  		path        = filepath.Dir(devfilePath)
   414  		out         = parameters.StartOptions.Out
   415  	)
   416  
   417  	for _, file := range removeDuplicates(append(changedFiles, deletedPaths...)) {
   418  		fmt.Fprintf(out, "\nFile %s changed\n", file)
   419  	}
   420  
   421  	var hasFirstSuccessfulPushOccurred bool
   422  
   423  	klog.V(4).Infof("Copying files %s to pod", changedFiles)
   424  
   425  	pushParams := common.PushParameters{
   426  		StartOptions:             parameters.StartOptions,
   427  		WatchFiles:               changedFiles,
   428  		WatchDeletedFiles:        deletedPaths,
   429  		DevfileScanIndexForWatch: !hasFirstSuccessfulPushOccurred,
   430  	}
   431  	oldStatus := *componentStatus
   432  	err := parameters.DevfileWatchHandler(ctx, pushParams, componentStatus)
   433  	if err != nil {
   434  		if isFatal(err) {
   435  			return err
   436  		}
   437  		klog.V(4).Infof("Error from Push: %v", err)
   438  		// Log and output, but intentionally not exiting on error here.
   439  		// We don't want to break watch when push failed, it might be fixed with the next push.
   440  		if kerrors.IsUnauthorized(err) || kerrors.IsForbidden(err) {
   441  			fmt.Fprintf(out, "Error connecting to the cluster. Please log in again\n\n")
   442  			var refreshed bool
   443  			refreshed, err = o.kubeClient.Refresh()
   444  			if err != nil {
   445  				fmt.Fprintf(out, "Error updating Kubernetes config: %s\n", err)
   446  			} else if refreshed {
   447  				fmt.Fprintf(out, "Updated Kubernetes config\n")
   448  			}
   449  		} else {
   450  			fmt.Fprintf(out, "%s - %s\n\n", PushErrorString, err.Error())
   451  			o.printInfoMessage(out, path, parameters.StartOptions.WatchFiles)
   452  		}
   453  		return nil
   454  	}
   455  	if oldStatus.GetState() != StateReady && componentStatus.GetState() == StateReady ||
   456  		!reflect.DeepEqual(oldStatus.EndpointsForwarded, componentStatus.EndpointsForwarded) {
   457  
   458  		o.printInfoMessage(out, path, parameters.StartOptions.WatchFiles)
   459  	}
   460  	return nil
   461  }
   462  
   463  func shouldIgnoreEvent(event fsnotify.Event) (ignoreEvent bool) {
   464  	if !(event.Op&fsnotify.Remove == fsnotify.Remove || event.Op&fsnotify.Rename == fsnotify.Rename) {
   465  		stat, err := os.Lstat(event.Name)
   466  		if err != nil {
   467  			// Some of the editors like vim and gedit, generate temporary buffer files during update to the file and deletes it soon after exiting from the editor
   468  			// So, its better to log the error rather than feeding it to error handler via `watchError = fmt.Errorf("unable to watch changes: %w", err)`,
   469  			// which will terminate the watch
   470  			klog.V(4).Infof("Failed getting details of the changed file %s. Ignoring the change", event.Name)
   471  		}
   472  		// Some of the editors generate temporary buffer files during update to the file and deletes it soon after exiting from the editor
   473  		// So, its better to log the error rather than feeding it to error handler via `watchError = fmt.Errorf("unable to watch changes: %w", err)`,
   474  		// which will terminate the watch
   475  		if stat == nil {
   476  			klog.V(4).Infof("Ignoring event for file %s as details about the file couldn't be fetched", event.Name)
   477  			ignoreEvent = true
   478  		}
   479  
   480  		// In windows, every new file created under a sub-directory of the watched directory, raises 2 events:
   481  		// 1. Write event for the directory under which the file was created
   482  		// 2. Create event for the file that was created
   483  		// Ignore 1 to avoid duplicate events.
   484  		if ignoreEvent || (stat.IsDir() && event.Op&fsnotify.Write == fsnotify.Write) {
   485  			ignoreEvent = true
   486  		}
   487  	}
   488  	return ignoreEvent
   489  }
   490  
   491  func removeDuplicates(input []string) []string {
   492  	valueMap := map[string]string{}
   493  	for _, str := range input {
   494  		valueMap[str] = str
   495  	}
   496  
   497  	result := []string{}
   498  	for str := range valueMap {
   499  		result = append(result, str)
   500  	}
   501  	return result
   502  }
   503  
   504  func (o *WatchClient) printInfoMessage(out io.Writer, path string, watchFiles bool) {
   505  	log.Sectionf("Dev mode")
   506  	if watchFiles {
   507  		fmt.Fprintf(
   508  			out,
   509  			" %s\n Watching for changes in the current directory %s\n\n",
   510  			log.Sbold("Status:"),
   511  			path,
   512  		)
   513  	}
   514  	fmt.Fprint(
   515  		out,
   516  		o.informerClient.GetInfo(),
   517  	)
   518  }
   519  
   520  func isFatal(err error) bool {
   521  	return errors.As(err, &common.ErrPortForward{})
   522  }
   523  

View as plain text