...

Source file src/github.com/redhat-developer/odo/pkg/logs/logs.go

Documentation: github.com/redhat-developer/odo/pkg/logs

     1  package logs
     2  
     3  import (
     4  	"bufio"
     5  	"context"
     6  	"fmt"
     7  	"io"
     8  	"strconv"
     9  	"strings"
    10  	"sync"
    11  	"sync/atomic"
    12  
    13  	corev1 "k8s.io/api/core/v1"
    14  	"k8s.io/apimachinery/pkg/watch"
    15  
    16  	"github.com/fatih/color"
    17  	odolabels "github.com/redhat-developer/odo/pkg/labels"
    18  	"github.com/redhat-developer/odo/pkg/log"
    19  	odocontext "github.com/redhat-developer/odo/pkg/odo/context"
    20  	"github.com/redhat-developer/odo/pkg/platform"
    21  )
    22  
    23  type LogsClient struct {
    24  	platformClient platform.Client
    25  }
    26  
    27  type ContainerLogs struct {
    28  	PodName       string
    29  	ContainerName string
    30  	Logs          io.ReadCloser
    31  }
    32  
    33  type Events struct {
    34  	// channel to put the container logs on
    35  	Logs chan ContainerLogs
    36  	// channel to put an error on, if any
    37  	Err chan error
    38  	// channel to indicate that logs for all pods have been grabbed; not to be populated if --follow is used
    39  	Done chan struct{}
    40  }
    41  
    42  var _ Client = (*LogsClient)(nil)
    43  
    44  func NewLogsClient(platformClient platform.Client) *LogsClient {
    45  	return &LogsClient{
    46  		platformClient: platformClient,
    47  	}
    48  }
    49  
    50  var _ Client = (*LogsClient)(nil)
    51  
    52  func (o *LogsClient) DisplayLogs(
    53  	ctx context.Context,
    54  	mode string,
    55  	componentName string,
    56  	namespace string,
    57  	follow bool,
    58  	out io.Writer,
    59  ) error {
    60  	events, err := o.GetLogsForMode(
    61  		ctx,
    62  		mode,
    63  		componentName,
    64  		namespace,
    65  		follow,
    66  	)
    67  	if err != nil {
    68  		return err
    69  	}
    70  
    71  	uniqueContainerNames := map[string]struct{}{}
    72  	var goroutines struct{ count int64 } // keep a track of running goroutines so that we don't exit prematurely
    73  	errChan := make(chan error)          // errors are put on this channel
    74  	var mu sync.Mutex
    75  
    76  	displayedLogs := map[string]struct{}{}
    77  	for {
    78  		select {
    79  		case containerLogs := <-events.Logs:
    80  			podContainerName := fmt.Sprintf("%s-%s", containerLogs.PodName, containerLogs.ContainerName)
    81  			if _, ok := displayedLogs[podContainerName]; ok {
    82  				continue
    83  			}
    84  			displayedLogs[podContainerName] = struct{}{}
    85  
    86  			uniqueName := getUniqueContainerName(containerLogs.ContainerName, uniqueContainerNames)
    87  			uniqueContainerNames[uniqueName] = struct{}{}
    88  			colour := log.ColorPicker()
    89  			logs := containerLogs.Logs
    90  
    91  			func() {
    92  				mu.Lock()
    93  				defer mu.Unlock()
    94  				color.Set(colour)
    95  				defer color.Unset()
    96  				help := ""
    97  				if uniqueName != containerLogs.ContainerName {
    98  					help = fmt.Sprintf(" (%s)", uniqueName)
    99  				}
   100  				_, err = fmt.Fprintf(out, "--> Logs for %s / %s%s\n", containerLogs.PodName, containerLogs.ContainerName, help)
   101  				if err != nil {
   102  					errChan <- err
   103  				}
   104  			}()
   105  
   106  			if follow {
   107  				atomic.AddInt64(&goroutines.count, 1)
   108  				go func(out io.Writer) {
   109  					defer func() {
   110  						atomic.AddInt64(&goroutines.count, -1)
   111  					}()
   112  					err = printLogs(uniqueName, logs, out, colour, &mu)
   113  					if err != nil {
   114  						errChan <- err
   115  					}
   116  					delete(displayedLogs, podContainerName)
   117  					events.Done <- struct{}{}
   118  				}(out)
   119  			} else {
   120  				err = printLogs(uniqueName, logs, out, colour, &mu)
   121  				if err != nil {
   122  					return err
   123  				}
   124  			}
   125  		case err = <-errChan:
   126  			return err
   127  		case err = <-events.Err:
   128  			return err
   129  		case <-events.Done:
   130  			if !follow && goroutines.count == 0 {
   131  				if len(uniqueContainerNames) == 0 {
   132  					// This will be the case when:
   133  					// 1. user specifies --dev flag, but the component's running in Deploy mode
   134  					// 2. user specified --deploy flag, but the component's running in Dev mode
   135  					// 3. user passes no flag, but component is running in neither Dev nor Deploy mode
   136  					fmt.Fprintf(out, "no containers running in the specified mode for the component %q\n", componentName)
   137  				}
   138  				return nil
   139  			}
   140  		case <-ctx.Done():
   141  			return nil
   142  		}
   143  	}
   144  }
   145  
   146  func getUniqueContainerName(name string, uniqueNames map[string]struct{}) string {
   147  	if _, ok := uniqueNames[name]; ok {
   148  		// name already present in uniqueNames; find another name
   149  		// first check if last character in name is a number; if so increment it, else append name with [1]
   150  		var numStr string
   151  		var last int
   152  		var err error
   153  
   154  		split := strings.Split(name, "[")
   155  		if len(split) == 2 {
   156  			numStr = strings.Trim(split[1], "]")
   157  			last, err = strconv.Atoi(numStr)
   158  			if err != nil {
   159  				return ""
   160  			}
   161  			last++
   162  		} else {
   163  			last = 1
   164  		}
   165  		name = fmt.Sprintf("%s[%d]", split[0], last)
   166  		return getUniqueContainerName(name, uniqueNames)
   167  	}
   168  	return name
   169  }
   170  
   171  // printLogs prints the logs of the containers with container name prefixed to the log message
   172  func printLogs(containerName string, rd io.ReadCloser, out io.Writer, colour color.Attribute, mu *sync.Mutex) error {
   173  	scanner := bufio.NewScanner(rd)
   174  	scanner.Split(bufio.ScanLines)
   175  
   176  	for scanner.Scan() {
   177  		line := scanner.Text()
   178  		err := func() error {
   179  			mu.Lock()
   180  			defer mu.Unlock()
   181  			color.Set(colour)
   182  			defer color.Unset()
   183  
   184  			_, err := fmt.Fprintln(out, containerName+": "+line)
   185  			return err
   186  		}()
   187  		if err != nil {
   188  			return err
   189  		}
   190  	}
   191  
   192  	return nil
   193  }
   194  
   195  func (o *LogsClient) GetLogsForMode(
   196  	ctx context.Context,
   197  	mode string,
   198  	componentName string,
   199  	namespace string,
   200  	follow bool,
   201  ) (Events, error) {
   202  	events := Events{
   203  		Logs: make(chan ContainerLogs),
   204  		Err:  make(chan error),
   205  		Done: make(chan struct{}),
   206  	}
   207  
   208  	go o.getLogsForMode(ctx, events, mode, componentName, namespace, follow)
   209  	return events, nil
   210  }
   211  
   212  func (o *LogsClient) getLogsForMode(
   213  	ctx context.Context,
   214  	events Events,
   215  	mode string,
   216  	componentName string,
   217  	namespace string,
   218  	follow bool,
   219  ) {
   220  	var selector string
   221  	podChan := make(chan corev1.Pod) // grab the logs of the pod put on this channel
   222  	errChan := make(chan error)
   223  	doneChan := make(chan struct{}) // because populating doneChan directly would cause odo logs to exit prematurely.
   224  
   225  	go func() {
   226  		// this go routine gets the logs of the pods put on the podChan
   227  		for {
   228  			select {
   229  			case pod := <-podChan:
   230  				for _, container := range pod.Spec.Containers {
   231  					containerLogs, err := o.platformClient.GetPodLogs(pod.Name, container.Name, follow)
   232  					if err != nil {
   233  						events.Err <- fmt.Errorf("failed to get logs for container %s; error: %v", container.Name, err)
   234  					}
   235  					events.Logs <- ContainerLogs{
   236  						PodName:       pod.GetName(),
   237  						ContainerName: container.Name,
   238  						Logs:          containerLogs,
   239  					}
   240  				}
   241  			case err := <-errChan:
   242  				events.Err <- err
   243  			case <-doneChan:
   244  				events.Done <- struct{}{}
   245  			}
   246  		}
   247  	}()
   248  
   249  	appname := odocontext.GetApplication(ctx)
   250  
   251  	getPods := func() error {
   252  		if mode == odolabels.ComponentDevMode || mode == odolabels.ComponentAnyMode {
   253  			selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDevMode, false)
   254  			err := o.getPodsForSelector(selector, namespace, podChan)
   255  			if err != nil {
   256  				return err
   257  			}
   258  		}
   259  		if mode == odolabels.ComponentDeployMode || mode == odolabels.ComponentAnyMode {
   260  			selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDeployMode, false)
   261  			err := o.getPodsForSelector(selector, namespace, podChan)
   262  			if err != nil {
   263  				return err
   264  			}
   265  		}
   266  		return nil
   267  	}
   268  
   269  	err := getPods()
   270  	if err != nil {
   271  		errChan <- err
   272  	}
   273  
   274  	if follow {
   275  		podWatcher, err := o.platformClient.PodWatcher(ctx, "")
   276  		if err != nil {
   277  			errChan <- err
   278  		}
   279  	outer:
   280  		for {
   281  			select {
   282  			case ev := <-podWatcher.ResultChan():
   283  				switch ev.Type {
   284  				case watch.Added, watch.Modified:
   285  					err = getPods()
   286  					if err != nil {
   287  						errChan <- err
   288  					}
   289  				}
   290  
   291  			case <-ctx.Done():
   292  				break outer
   293  			}
   294  		}
   295  	}
   296  
   297  	doneChan <- struct{}{}
   298  }
   299  
   300  // getPodsForSelector gets pods for the resources matching selector in the namespace; Pods found by this method will be
   301  // put on podChan so that caller function can fetch its logs
   302  func (o *LogsClient) getPodsForSelector(
   303  	selector string,
   304  	namespace string,
   305  	podChan chan corev1.Pod,
   306  ) error {
   307  	// set of unique Pods with Pod name as key; these are the Pods whose logs we want to get from the cluster
   308  	pods := map[string]struct{}{}
   309  
   310  	podList, err := o.platformClient.GetPodsMatchingSelector(selector)
   311  	if err != nil {
   312  		return err
   313  	}
   314  	for _, pod := range podList.Items {
   315  		if pod.Status.Phase == "Running" {
   316  			pods[pod.GetName()] = struct{}{}
   317  		}
   318  	}
   319  
   320  	// get all pods in the namespace
   321  	podsInNs, err := o.platformClient.GetAllPodsInNamespaceMatchingSelector(selector, namespace)
   322  	if err != nil {
   323  		return err
   324  	}
   325  
   326  	for _, pod := range podsInNs.Items {
   327  		if _, ok := pods[pod.GetName()]; ok {
   328  			// Pod's logs have already been displayed to user
   329  			continue
   330  		}
   331  		if pod.Status.Phase == "Running" {
   332  			podList.Items = append(podList.Items, pod)
   333  		}
   334  	}
   335  
   336  	for _, pod := range podList.Items {
   337  		if pod.Status.Phase == "Running" {
   338  			podChan <- pod
   339  		}
   340  	}
   341  
   342  	return nil
   343  }
   344  

View as plain text