...

Source file src/github.com/redhat-developer/odo/pkg/port/port.go

Documentation: github.com/redhat-developer/odo/pkg/port

     1  package port
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"net"
     8  	"regexp"
     9  	"strconv"
    10  	"strings"
    11  	"time"
    12  
    13  	"github.com/segmentio/backo-go"
    14  	"golang.org/x/sync/errgroup"
    15  	"k8s.io/klog"
    16  
    17  	"github.com/redhat-developer/odo/pkg/api"
    18  	"github.com/redhat-developer/odo/pkg/exec"
    19  	"github.com/redhat-developer/odo/pkg/remotecmd"
    20  )
    21  
    22  // Order of values in the TCP States enum.
    23  // See https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/net/tcp_states.h#n12
    24  var connectionStates = []string{
    25  	"ESTABLISHED",
    26  	"SYN_SENT",
    27  	"SYN_RECV",
    28  	"FIN_WAIT1",
    29  	"FIN_WAIT2",
    30  	"TIME_WAIT",
    31  	"CLOSE",
    32  	"CLOSE_WAIT",
    33  	"LAST_ACK",
    34  	"LISTEN",
    35  	"CLOSING",
    36  	"NEW_SYN_RECV",
    37  
    38  	"MAX_STATES", // Leave at the end!
    39  }
    40  
    41  // every 2 other characters
    42  var ipv4HexRegExp = regexp.MustCompile(".{2}")
    43  
    44  type Connection struct {
    45  	LocalAddress  string
    46  	LocalPort     int
    47  	RemoteAddress string
    48  	RemotePort    int
    49  	State         string
    50  }
    51  
    52  func (c Connection) String() string {
    53  	return fmt.Sprintf("[%s] %s:%d -> %s:%d", c.State, c.LocalAddress, c.LocalPort, c.RemoteAddress, c.RemotePort)
    54  }
    55  
    56  // DetectRemotePortsBoundOnLoopback filters the given ports by returning only those that are actually bound to the loopback interface in the specified container.
    57  func DetectRemotePortsBoundOnLoopback(ctx context.Context, execClient exec.Client, podName string, containerName string, ports []api.ForwardedPort) ([]api.ForwardedPort, error) {
    58  	if len(ports) == 0 {
    59  		return nil, nil
    60  	}
    61  
    62  	listening, err := GetListeningConnections(ctx, execClient, podName, containerName)
    63  	if err != nil {
    64  		return nil, err
    65  	}
    66  	var boundToLocalhost []api.ForwardedPort
    67  	for _, p := range ports {
    68  		for _, conn := range listening {
    69  			if p.ContainerPort != conn.LocalPort {
    70  				continue
    71  			}
    72  			klog.V(6).Infof("found listening connection matching container port %d: %s", p.ContainerPort, conn.String())
    73  			ip := net.ParseIP(conn.LocalAddress)
    74  			if ip == nil {
    75  				klog.V(6).Infof("invalid IP address: %q", conn.LocalAddress)
    76  				continue
    77  			}
    78  			if ip.IsLoopback() {
    79  				boundToLocalhost = append(boundToLocalhost, p)
    80  				break
    81  			}
    82  		}
    83  	}
    84  	return boundToLocalhost, nil
    85  }
    86  
    87  // GetListeningConnections retrieves information about ports being listened and on which local address in the specified container.
    88  // It works by parsing information from the /proc/net/{tcp,tcp6,udp,udp6} files, and is able to parse both IPv4 and IPv6 addresses.
    89  // See https://www.kernel.org/doc/Documentation/networking/proc_net_tcp.txt for more information about the structure of these files.
    90  func GetListeningConnections(ctx context.Context, execClient exec.Client, podName string, containerName string) ([]Connection, error) {
    91  	return GetConnections(ctx, execClient, podName, containerName, func(state int) bool {
    92  		return stateToString(state) == "LISTEN"
    93  	})
    94  }
    95  
    96  // GetConnections retrieves information about connections in the specified container.
    97  // It works by parsing information from the /proc/net/{tcp,tcp6,udp,udp6} files, and is able to parse both IPv4 and IPv6 addresses.
    98  // See https://www.kernel.org/doc/Documentation/networking/proc_net_tcp.txt for more information about the structure of these files.
    99  // The specified predicate allows to filter the connections based on the state.
   100  func GetConnections(ctx context.Context, execClient exec.Client, podName string, containerName string, statePredicate func(state int) bool) ([]Connection, error) {
   101  	cmd := []string{
   102  		remotecmd.ShellExecutable, "-c",
   103  		// /proc/net/{tc,ud}p6 files might be missing if IPv6 is disabled in the host networking stack.
   104  		// Actually /proc/net/{tc,ud}p* files might be totally missing if network stats are disabled.
   105  		"cat /proc/net/tcp /proc/net/udp /proc/net/tcp6 /proc/net/udp6 || true",
   106  	}
   107  	stdout, _, err := execClient.ExecuteCommand(ctx, cmd, podName, containerName, false, nil, nil)
   108  	if err != nil {
   109  		return nil, err
   110  	}
   111  
   112  	hexToInt := func(hex string) (int, error) {
   113  		i, parseErr := strconv.ParseInt(hex, 16, 32)
   114  		if parseErr != nil {
   115  			return 0, parseErr
   116  		}
   117  		return int(i), nil
   118  	}
   119  
   120  	hexRevIpV4ToString := func(hex string) (string, error) {
   121  		parts := ipv4HexRegExp.FindAllString(hex, -1)
   122  		result := make([]string, 0, len(parts))
   123  		for i := len(parts) - 1; i >= 0; i-- {
   124  			toInt, parseErr := hexToInt(parts[i])
   125  			if parseErr != nil {
   126  				return "", parseErr
   127  			}
   128  			result = append(result, fmt.Sprintf("%d", toInt))
   129  		}
   130  		return strings.Join(result, "."), nil
   131  	}
   132  
   133  	hexRevIpV6ToString := func(hex string) (string, error) {
   134  		// In IPv6, each group of the address is 2 bytes long (4 hex characters).
   135  		// See https://www.rfc-editor.org/rfc/rfc4291#page-4
   136  		i := []string{
   137  			hex[30:32],
   138  			hex[28:30],
   139  			hex[26:28],
   140  			hex[24:26],
   141  			hex[22:24],
   142  			hex[20:22],
   143  			hex[18:20],
   144  			hex[16:18],
   145  			hex[14:16],
   146  			hex[12:14],
   147  			hex[10:12],
   148  			hex[8:10],
   149  			hex[6:8],
   150  			hex[4:6],
   151  			hex[2:4],
   152  			hex[0:2],
   153  		}
   154  		return fmt.Sprintf("%s%s:%s%s:%s%s:%s%s:%s%s:%s%s:%s%s:%s%s",
   155  			i[12], i[13], i[14], i[15],
   156  			i[8], i[9], i[10], i[11],
   157  			i[4], i[5], i[7], i[7],
   158  			i[0], i[1], i[2], i[3]), nil
   159  	}
   160  
   161  	parseAddrAndPort := func(s string) (addr string, port int, err error) {
   162  		addrPortList := strings.Split(s, ":")
   163  		if len(addrPortList) != 2 {
   164  			return "", 0, fmt.Errorf("invalid format - must be <addr>:<port>, but was %q", s)
   165  		}
   166  
   167  		addrHex := addrPortList[0]
   168  		switch len(addrHex) {
   169  		case 8:
   170  			addr, err = hexRevIpV4ToString(addrHex)
   171  		case 32:
   172  			addr, err = hexRevIpV6ToString(addrHex)
   173  		default:
   174  			err = fmt.Errorf("length must be 8 (IPv4) or 32 (IPv6), but was %d", len(addrHex))
   175  		}
   176  		if err != nil {
   177  			return "", 0, fmt.Errorf("could not decode address info from %q: %w", s, err)
   178  		}
   179  
   180  		portHex := addrPortList[1]
   181  		port, err = hexToInt(portHex)
   182  		if err != nil {
   183  			return "", 0, fmt.Errorf("could not decode port info from %q: %w", s, err)
   184  		}
   185  		return addr, port, nil
   186  	}
   187  
   188  	var connections []Connection
   189  	for _, l := range stdout {
   190  		if strings.Contains(l, "local_address") {
   191  			// ignore header lines
   192  			continue
   193  		}
   194  
   195  		/*
   196  			We are interested only in the first 4 values, which provide information about the local address, port and the connection state.
   197  			See https://www.kernel.org/doc/Documentation/networking/proc_net_tcp.txt
   198  
   199  					   46: 010310AC:9C4C 030310AC:1770 01
   200  					   |      |      |      |      |   |--> connection state
   201  					   |      |      |      |      |------> remote TCP port number
   202  					   |      |      |      |-------------> remote IPv4 address
   203  					   |      |      |--------------------> local TCP port number
   204  					   |      |---------------------------> local IPv4 address
   205  					   |----------------------------------> number of entry
   206  		*/
   207  		split := strings.SplitN(strings.TrimSpace(l), " ", 5)
   208  		if len(split) < 4 {
   209  			klog.V(5).Infof("ignored line %q because it has less than 4 space-separated elements", l)
   210  			continue
   211  		}
   212  		stateHex := split[3]
   213  		state, err := hexToInt(stateHex)
   214  		if err != nil {
   215  			klog.V(5).Infof("[warn] could not decode state info from line %q: %v", l, err)
   216  			continue
   217  		}
   218  		if statePredicate != nil && !statePredicate(state) {
   219  			klog.V(5).Infof("ignored line because state value does not pass predicate: %q", l)
   220  			continue
   221  		}
   222  
   223  		localAddr, localPort, err := parseAddrAndPort(split[1])
   224  		if err != nil {
   225  			klog.V(5).Infof("ignored line because it is not possible to determine local addr and port: %q", l)
   226  			continue
   227  		}
   228  		remoteAddr, remotePort, err := parseAddrAndPort(split[2])
   229  		if err != nil {
   230  			klog.V(5).Infof("ignored line because it is not possible to determine remote addr and port: %q", l)
   231  			continue
   232  		}
   233  
   234  		connections = append(connections, Connection{
   235  			LocalAddress:  localAddr,
   236  			LocalPort:     localPort,
   237  			RemoteAddress: remoteAddr,
   238  			RemotePort:    remotePort,
   239  			State:         stateToString(state),
   240  		})
   241  	}
   242  
   243  	return connections, nil
   244  }
   245  
   246  // CheckAppPortsListening checks whether all the specified ports are really opened and in LISTEN mode in each corresponding container
   247  // of the pod specified.
   248  // It does so by periodically looking inside the container for listening connections until it finds each of the specified ports,
   249  // or until the specified timeout has elapsed.
   250  func CheckAppPortsListening(
   251  	ctx context.Context,
   252  	execClient exec.Client,
   253  	podName string,
   254  	containerPortMapping map[string][]int,
   255  	timeout time.Duration,
   256  ) error {
   257  	if len(containerPortMapping) == 0 {
   258  		return nil
   259  	}
   260  
   261  	backOffBase := 1 * time.Second
   262  	if timeout <= backOffBase {
   263  		return fmt.Errorf("invalid timeout: %v, must be strictly greater than %v", timeout, backOffBase)
   264  	}
   265  
   266  	ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
   267  	defer cancel()
   268  
   269  	hasPortFn := func(connections []Connection, p int) bool {
   270  		for _, c := range connections {
   271  			if p == c.LocalPort {
   272  				return true
   273  			}
   274  		}
   275  		return false
   276  	}
   277  
   278  	notListeningChan := make(chan map[string][]int)
   279  
   280  	g := new(errgroup.Group)
   281  	for container, ports := range containerPortMapping {
   282  		container := container
   283  		ports := ports
   284  
   285  		if len(ports) == 0 {
   286  			continue
   287  		}
   288  
   289  		g.Go(func() error {
   290  			b := backo.NewBacko(backOffBase, 2, 0, 10*time.Second)
   291  			ticker := b.NewTicker()
   292  			portsNotListening := make(map[int]struct{})
   293  
   294  			for {
   295  				select {
   296  				case <-ctxWithTimeout.Done():
   297  					if len(portsNotListening) != 0 {
   298  						m := make(map[string][]int)
   299  						for p := range portsNotListening {
   300  							m[container] = append(m[container], p)
   301  						}
   302  						notListeningChan <- m
   303  					}
   304  					return ctxWithTimeout.Err()
   305  
   306  				case <-ticker.C:
   307  					connections, err := GetListeningConnections(ctx, execClient, podName, container)
   308  					if err != nil {
   309  						klog.V(3).Infof("error getting listening connections in container %q: %v", container, err)
   310  						for _, p := range ports {
   311  							portsNotListening[p] = struct{}{}
   312  						}
   313  					} else {
   314  						for _, p := range ports {
   315  							if hasPortFn(connections, p) {
   316  								delete(portsNotListening, p)
   317  								continue
   318  							}
   319  							klog.V(3).Infof("port %d not listening in container %q", p, container)
   320  							portsNotListening[p] = struct{}{}
   321  						}
   322  						if len(portsNotListening) == 0 {
   323  							// no error and all ports expected to be opened are opened at this point
   324  							return nil
   325  						}
   326  					}
   327  				}
   328  			}
   329  		})
   330  	}
   331  
   332  	// Buffer of 1 because we want to close notListeningChan (because we are iterating over it).
   333  	errChan := make(chan error, 1)
   334  	go func() {
   335  		errChan <- g.Wait()
   336  		close(notListeningChan)
   337  	}()
   338  
   339  	notListening := make(map[string][]int)
   340  	for e := range notListeningChan {
   341  		for c, ports := range e {
   342  			notListening[c] = append(notListening[c], ports...)
   343  		}
   344  	}
   345  
   346  	klog.V(4).Infof("ports not listening: %v", notListening)
   347  
   348  	if err := <-errChan; err != nil {
   349  		msg := "error"
   350  		if errors.Is(err, context.DeadlineExceeded) {
   351  			msg = "timeout"
   352  		}
   353  		msg += " while checking for ports"
   354  		if len(notListening) == 0 {
   355  			klog.V(4).Infof("%s and no unreachable port detected: %v", msg, err)
   356  			return nil
   357  		}
   358  		var msgList []string
   359  		for c, ports := range notListening {
   360  			var l []string
   361  			for _, p := range ports {
   362  				l = append(l, strconv.Itoa(p))
   363  			}
   364  			msgList = append(msgList, fmt.Sprintf("%s in container %q", strings.Join(l, ", "), c))
   365  		}
   366  		msg += fmt.Sprintf("; ports not listening: (%s)", strings.Join(msgList, "; "))
   367  		return fmt.Errorf("%s: %w", msg, err)
   368  	}
   369  
   370  	return nil
   371  }
   372  
   373  func stateToString(state int) string {
   374  	if state < 1 || state > len(connectionStates) {
   375  		return ""
   376  	}
   377  	return connectionStates[state-1]
   378  }
   379  

View as plain text