1 package port
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "net"
8 "regexp"
9 "strconv"
10 "strings"
11 "time"
12
13 "github.com/segmentio/backo-go"
14 "golang.org/x/sync/errgroup"
15 "k8s.io/klog"
16
17 "github.com/redhat-developer/odo/pkg/api"
18 "github.com/redhat-developer/odo/pkg/exec"
19 "github.com/redhat-developer/odo/pkg/remotecmd"
20 )
21
22
23
// connectionStates lists the names of the connection states, ordered by their
// numeric code: stateToString maps the code N to the entry at index N-1.
// The order of the entries is therefore significant and must not be changed.
//
// NOTE(review): the names look like the TCP states enumerated by the Linux
// kernel (include/net/tcp_states.h) — confirm against the kernel headers
// before adding or reordering entries.
var connectionStates = []string{
	"ESTABLISHED",  // 0x01
	"SYN_SENT",     // 0x02
	"SYN_RECV",     // 0x03
	"FIN_WAIT1",    // 0x04
	"FIN_WAIT2",    // 0x05
	"TIME_WAIT",    // 0x06
	"CLOSE",        // 0x07
	"CLOSE_WAIT",   // 0x08
	"LAST_ACK",     // 0x09
	"LISTEN",       // 0x0A
	"CLOSING",      // 0x0B
	"NEW_SYN_RECV", // 0x0C

	// 0x0D. NOTE(review): presumably an upper-bound marker rather than a real
	// connection state — confirm.
	"MAX_STATES",
}
40
41
// ipv4HexRegExp matches any two consecutive characters (other than newline).
// GetConnections uses it to cut an 8-character hexadecimal IPv4 address into
// its four 2-character octets.
var ipv4HexRegExp = regexp.MustCompile(".{2}")
43
// Connection describes one socket entry: its two endpoints and its state.
type Connection struct {
	// LocalAddress is the textual IP address of the local endpoint.
	LocalAddress string
	// LocalPort is the port number of the local endpoint.
	LocalPort int
	// RemoteAddress is the textual IP address of the remote endpoint.
	RemoteAddress string
	// RemotePort is the port number of the remote endpoint.
	RemotePort int
	// State is the name of the connection state (for example "LISTEN").
	State string
}

// String renders the connection as "[<state>] <local addr>:<local port> -> <remote addr>:<remote port>".
func (c Connection) String() string {
	local := fmt.Sprintf("%s:%d", c.LocalAddress, c.LocalPort)
	remote := fmt.Sprintf("%s:%d", c.RemoteAddress, c.RemotePort)
	return fmt.Sprintf("[%s] %s -> %s", c.State, local, remote)
}
55
56
57 func DetectRemotePortsBoundOnLoopback(ctx context.Context, execClient exec.Client, podName string, containerName string, ports []api.ForwardedPort) ([]api.ForwardedPort, error) {
58 if len(ports) == 0 {
59 return nil, nil
60 }
61
62 listening, err := GetListeningConnections(ctx, execClient, podName, containerName)
63 if err != nil {
64 return nil, err
65 }
66 var boundToLocalhost []api.ForwardedPort
67 for _, p := range ports {
68 for _, conn := range listening {
69 if p.ContainerPort != conn.LocalPort {
70 continue
71 }
72 klog.V(6).Infof("found listening connection matching container port %d: %s", p.ContainerPort, conn.String())
73 ip := net.ParseIP(conn.LocalAddress)
74 if ip == nil {
75 klog.V(6).Infof("invalid IP address: %q", conn.LocalAddress)
76 continue
77 }
78 if ip.IsLoopback() {
79 boundToLocalhost = append(boundToLocalhost, p)
80 break
81 }
82 }
83 }
84 return boundToLocalhost, nil
85 }
86
87
88
89
90 func GetListeningConnections(ctx context.Context, execClient exec.Client, podName string, containerName string) ([]Connection, error) {
91 return GetConnections(ctx, execClient, podName, containerName, func(state int) bool {
92 return stateToString(state) == "LISTEN"
93 })
94 }
95
96
97
98
99
100 func GetConnections(ctx context.Context, execClient exec.Client, podName string, containerName string, statePredicate func(state int) bool) ([]Connection, error) {
101 cmd := []string{
102 remotecmd.ShellExecutable, "-c",
103
104
105 "cat /proc/net/tcp /proc/net/udp /proc/net/tcp6 /proc/net/udp6 || true",
106 }
107 stdout, _, err := execClient.ExecuteCommand(ctx, cmd, podName, containerName, false, nil, nil)
108 if err != nil {
109 return nil, err
110 }
111
112 hexToInt := func(hex string) (int, error) {
113 i, parseErr := strconv.ParseInt(hex, 16, 32)
114 if parseErr != nil {
115 return 0, parseErr
116 }
117 return int(i), nil
118 }
119
120 hexRevIpV4ToString := func(hex string) (string, error) {
121 parts := ipv4HexRegExp.FindAllString(hex, -1)
122 result := make([]string, 0, len(parts))
123 for i := len(parts) - 1; i >= 0; i-- {
124 toInt, parseErr := hexToInt(parts[i])
125 if parseErr != nil {
126 return "", parseErr
127 }
128 result = append(result, fmt.Sprintf("%d", toInt))
129 }
130 return strings.Join(result, "."), nil
131 }
132
133 hexRevIpV6ToString := func(hex string) (string, error) {
134
135
136 i := []string{
137 hex[30:32],
138 hex[28:30],
139 hex[26:28],
140 hex[24:26],
141 hex[22:24],
142 hex[20:22],
143 hex[18:20],
144 hex[16:18],
145 hex[14:16],
146 hex[12:14],
147 hex[10:12],
148 hex[8:10],
149 hex[6:8],
150 hex[4:6],
151 hex[2:4],
152 hex[0:2],
153 }
154 return fmt.Sprintf("%s%s:%s%s:%s%s:%s%s:%s%s:%s%s:%s%s:%s%s",
155 i[12], i[13], i[14], i[15],
156 i[8], i[9], i[10], i[11],
157 i[4], i[5], i[7], i[7],
158 i[0], i[1], i[2], i[3]), nil
159 }
160
161 parseAddrAndPort := func(s string) (addr string, port int, err error) {
162 addrPortList := strings.Split(s, ":")
163 if len(addrPortList) != 2 {
164 return "", 0, fmt.Errorf("invalid format - must be <addr>:<port>, but was %q", s)
165 }
166
167 addrHex := addrPortList[0]
168 switch len(addrHex) {
169 case 8:
170 addr, err = hexRevIpV4ToString(addrHex)
171 case 32:
172 addr, err = hexRevIpV6ToString(addrHex)
173 default:
174 err = fmt.Errorf("length must be 8 (IPv4) or 32 (IPv6), but was %d", len(addrHex))
175 }
176 if err != nil {
177 return "", 0, fmt.Errorf("could not decode address info from %q: %w", s, err)
178 }
179
180 portHex := addrPortList[1]
181 port, err = hexToInt(portHex)
182 if err != nil {
183 return "", 0, fmt.Errorf("could not decode port info from %q: %w", s, err)
184 }
185 return addr, port, nil
186 }
187
188 var connections []Connection
189 for _, l := range stdout {
190 if strings.Contains(l, "local_address") {
191
192 continue
193 }
194
195
207 split := strings.SplitN(strings.TrimSpace(l), " ", 5)
208 if len(split) < 4 {
209 klog.V(5).Infof("ignored line %q because it has less than 4 space-separated elements", l)
210 continue
211 }
212 stateHex := split[3]
213 state, err := hexToInt(stateHex)
214 if err != nil {
215 klog.V(5).Infof("[warn] could not decode state info from line %q: %v", l, err)
216 continue
217 }
218 if statePredicate != nil && !statePredicate(state) {
219 klog.V(5).Infof("ignored line because state value does not pass predicate: %q", l)
220 continue
221 }
222
223 localAddr, localPort, err := parseAddrAndPort(split[1])
224 if err != nil {
225 klog.V(5).Infof("ignored line because it is not possible to determine local addr and port: %q", l)
226 continue
227 }
228 remoteAddr, remotePort, err := parseAddrAndPort(split[2])
229 if err != nil {
230 klog.V(5).Infof("ignored line because it is not possible to determine remote addr and port: %q", l)
231 continue
232 }
233
234 connections = append(connections, Connection{
235 LocalAddress: localAddr,
236 LocalPort: localPort,
237 RemoteAddress: remoteAddr,
238 RemotePort: remotePort,
239 State: stateToString(state),
240 })
241 }
242
243 return connections, nil
244 }
245
246
247
248
249
250 func CheckAppPortsListening(
251 ctx context.Context,
252 execClient exec.Client,
253 podName string,
254 containerPortMapping map[string][]int,
255 timeout time.Duration,
256 ) error {
257 if len(containerPortMapping) == 0 {
258 return nil
259 }
260
261 backOffBase := 1 * time.Second
262 if timeout <= backOffBase {
263 return fmt.Errorf("invalid timeout: %v, must be strictly greater than %v", timeout, backOffBase)
264 }
265
266 ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
267 defer cancel()
268
269 hasPortFn := func(connections []Connection, p int) bool {
270 for _, c := range connections {
271 if p == c.LocalPort {
272 return true
273 }
274 }
275 return false
276 }
277
278 notListeningChan := make(chan map[string][]int)
279
280 g := new(errgroup.Group)
281 for container, ports := range containerPortMapping {
282 container := container
283 ports := ports
284
285 if len(ports) == 0 {
286 continue
287 }
288
289 g.Go(func() error {
290 b := backo.NewBacko(backOffBase, 2, 0, 10*time.Second)
291 ticker := b.NewTicker()
292 portsNotListening := make(map[int]struct{})
293
294 for {
295 select {
296 case <-ctxWithTimeout.Done():
297 if len(portsNotListening) != 0 {
298 m := make(map[string][]int)
299 for p := range portsNotListening {
300 m[container] = append(m[container], p)
301 }
302 notListeningChan <- m
303 }
304 return ctxWithTimeout.Err()
305
306 case <-ticker.C:
307 connections, err := GetListeningConnections(ctx, execClient, podName, container)
308 if err != nil {
309 klog.V(3).Infof("error getting listening connections in container %q: %v", container, err)
310 for _, p := range ports {
311 portsNotListening[p] = struct{}{}
312 }
313 } else {
314 for _, p := range ports {
315 if hasPortFn(connections, p) {
316 delete(portsNotListening, p)
317 continue
318 }
319 klog.V(3).Infof("port %d not listening in container %q", p, container)
320 portsNotListening[p] = struct{}{}
321 }
322 if len(portsNotListening) == 0 {
323
324 return nil
325 }
326 }
327 }
328 }
329 })
330 }
331
332
333 errChan := make(chan error, 1)
334 go func() {
335 errChan <- g.Wait()
336 close(notListeningChan)
337 }()
338
339 notListening := make(map[string][]int)
340 for e := range notListeningChan {
341 for c, ports := range e {
342 notListening[c] = append(notListening[c], ports...)
343 }
344 }
345
346 klog.V(4).Infof("ports not listening: %v", notListening)
347
348 if err := <-errChan; err != nil {
349 msg := "error"
350 if errors.Is(err, context.DeadlineExceeded) {
351 msg = "timeout"
352 }
353 msg += " while checking for ports"
354 if len(notListening) == 0 {
355 klog.V(4).Infof("%s and no unreachable port detected: %v", msg, err)
356 return nil
357 }
358 var msgList []string
359 for c, ports := range notListening {
360 var l []string
361 for _, p := range ports {
362 l = append(l, strconv.Itoa(p))
363 }
364 msgList = append(msgList, fmt.Sprintf("%s in container %q", strings.Join(l, ", "), c))
365 }
366 msg += fmt.Sprintf("; ports not listening: (%s)", strings.Join(msgList, "; "))
367 return fmt.Errorf("%s: %w", msg, err)
368 }
369
370 return nil
371 }
372
373 func stateToString(state int) string {
374 if state < 1 || state > len(connectionStates) {
375 return ""
376 }
377 return connectionStates[state-1]
378 }
379
View as plain text