1 package logs
2
3 import (
4 "bufio"
5 "context"
6 "fmt"
7 "io"
8 "strconv"
9 "strings"
10 "sync"
11 "sync/atomic"
12
13 corev1 "k8s.io/api/core/v1"
14 "k8s.io/apimachinery/pkg/watch"
15
16 "github.com/fatih/color"
17 odolabels "github.com/redhat-developer/odo/pkg/labels"
18 "github.com/redhat-developer/odo/pkg/log"
19 odocontext "github.com/redhat-developer/odo/pkg/odo/context"
20 "github.com/redhat-developer/odo/pkg/platform"
21 )
22
23 type LogsClient struct {
24 platformClient platform.Client
25 }
26
27 type ContainerLogs struct {
28 PodName string
29 ContainerName string
30 Logs io.ReadCloser
31 }
32
33 type Events struct {
34
35 Logs chan ContainerLogs
36
37 Err chan error
38
39 Done chan struct{}
40 }
41
42 var _ Client = (*LogsClient)(nil)
43
44 func NewLogsClient(platformClient platform.Client) *LogsClient {
45 return &LogsClient{
46 platformClient: platformClient,
47 }
48 }
49
50 var _ Client = (*LogsClient)(nil)
51
52 func (o *LogsClient) DisplayLogs(
53 ctx context.Context,
54 mode string,
55 componentName string,
56 namespace string,
57 follow bool,
58 out io.Writer,
59 ) error {
60 events, err := o.GetLogsForMode(
61 ctx,
62 mode,
63 componentName,
64 namespace,
65 follow,
66 )
67 if err != nil {
68 return err
69 }
70
71 uniqueContainerNames := map[string]struct{}{}
72 var goroutines struct{ count int64 }
73 errChan := make(chan error)
74 var mu sync.Mutex
75
76 displayedLogs := map[string]struct{}{}
77 for {
78 select {
79 case containerLogs := <-events.Logs:
80 podContainerName := fmt.Sprintf("%s-%s", containerLogs.PodName, containerLogs.ContainerName)
81 if _, ok := displayedLogs[podContainerName]; ok {
82 continue
83 }
84 displayedLogs[podContainerName] = struct{}{}
85
86 uniqueName := getUniqueContainerName(containerLogs.ContainerName, uniqueContainerNames)
87 uniqueContainerNames[uniqueName] = struct{}{}
88 colour := log.ColorPicker()
89 logs := containerLogs.Logs
90
91 func() {
92 mu.Lock()
93 defer mu.Unlock()
94 color.Set(colour)
95 defer color.Unset()
96 help := ""
97 if uniqueName != containerLogs.ContainerName {
98 help = fmt.Sprintf(" (%s)", uniqueName)
99 }
100 _, err = fmt.Fprintf(out, "--> Logs for %s / %s%s\n", containerLogs.PodName, containerLogs.ContainerName, help)
101 if err != nil {
102 errChan <- err
103 }
104 }()
105
106 if follow {
107 atomic.AddInt64(&goroutines.count, 1)
108 go func(out io.Writer) {
109 defer func() {
110 atomic.AddInt64(&goroutines.count, -1)
111 }()
112 err = printLogs(uniqueName, logs, out, colour, &mu)
113 if err != nil {
114 errChan <- err
115 }
116 delete(displayedLogs, podContainerName)
117 events.Done <- struct{}{}
118 }(out)
119 } else {
120 err = printLogs(uniqueName, logs, out, colour, &mu)
121 if err != nil {
122 return err
123 }
124 }
125 case err = <-errChan:
126 return err
127 case err = <-events.Err:
128 return err
129 case <-events.Done:
130 if !follow && goroutines.count == 0 {
131 if len(uniqueContainerNames) == 0 {
132
133
134
135
136 fmt.Fprintf(out, "no containers running in the specified mode for the component %q\n", componentName)
137 }
138 return nil
139 }
140 case <-ctx.Done():
141 return nil
142 }
143 }
144 }
145
146 func getUniqueContainerName(name string, uniqueNames map[string]struct{}) string {
147 if _, ok := uniqueNames[name]; ok {
148
149
150 var numStr string
151 var last int
152 var err error
153
154 split := strings.Split(name, "[")
155 if len(split) == 2 {
156 numStr = strings.Trim(split[1], "]")
157 last, err = strconv.Atoi(numStr)
158 if err != nil {
159 return ""
160 }
161 last++
162 } else {
163 last = 1
164 }
165 name = fmt.Sprintf("%s[%d]", split[0], last)
166 return getUniqueContainerName(name, uniqueNames)
167 }
168 return name
169 }
170
171
172 func printLogs(containerName string, rd io.ReadCloser, out io.Writer, colour color.Attribute, mu *sync.Mutex) error {
173 scanner := bufio.NewScanner(rd)
174 scanner.Split(bufio.ScanLines)
175
176 for scanner.Scan() {
177 line := scanner.Text()
178 err := func() error {
179 mu.Lock()
180 defer mu.Unlock()
181 color.Set(colour)
182 defer color.Unset()
183
184 _, err := fmt.Fprintln(out, containerName+": "+line)
185 return err
186 }()
187 if err != nil {
188 return err
189 }
190 }
191
192 return nil
193 }
194
195 func (o *LogsClient) GetLogsForMode(
196 ctx context.Context,
197 mode string,
198 componentName string,
199 namespace string,
200 follow bool,
201 ) (Events, error) {
202 events := Events{
203 Logs: make(chan ContainerLogs),
204 Err: make(chan error),
205 Done: make(chan struct{}),
206 }
207
208 go o.getLogsForMode(ctx, events, mode, componentName, namespace, follow)
209 return events, nil
210 }
211
212 func (o *LogsClient) getLogsForMode(
213 ctx context.Context,
214 events Events,
215 mode string,
216 componentName string,
217 namespace string,
218 follow bool,
219 ) {
220 var selector string
221 podChan := make(chan corev1.Pod)
222 errChan := make(chan error)
223 doneChan := make(chan struct{})
224
225 go func() {
226
227 for {
228 select {
229 case pod := <-podChan:
230 for _, container := range pod.Spec.Containers {
231 containerLogs, err := o.platformClient.GetPodLogs(pod.Name, container.Name, follow)
232 if err != nil {
233 events.Err <- fmt.Errorf("failed to get logs for container %s; error: %v", container.Name, err)
234 }
235 events.Logs <- ContainerLogs{
236 PodName: pod.GetName(),
237 ContainerName: container.Name,
238 Logs: containerLogs,
239 }
240 }
241 case err := <-errChan:
242 events.Err <- err
243 case <-doneChan:
244 events.Done <- struct{}{}
245 }
246 }
247 }()
248
249 appname := odocontext.GetApplication(ctx)
250
251 getPods := func() error {
252 if mode == odolabels.ComponentDevMode || mode == odolabels.ComponentAnyMode {
253 selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDevMode, false)
254 err := o.getPodsForSelector(selector, namespace, podChan)
255 if err != nil {
256 return err
257 }
258 }
259 if mode == odolabels.ComponentDeployMode || mode == odolabels.ComponentAnyMode {
260 selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDeployMode, false)
261 err := o.getPodsForSelector(selector, namespace, podChan)
262 if err != nil {
263 return err
264 }
265 }
266 return nil
267 }
268
269 err := getPods()
270 if err != nil {
271 errChan <- err
272 }
273
274 if follow {
275 podWatcher, err := o.platformClient.PodWatcher(ctx, "")
276 if err != nil {
277 errChan <- err
278 }
279 outer:
280 for {
281 select {
282 case ev := <-podWatcher.ResultChan():
283 switch ev.Type {
284 case watch.Added, watch.Modified:
285 err = getPods()
286 if err != nil {
287 errChan <- err
288 }
289 }
290
291 case <-ctx.Done():
292 break outer
293 }
294 }
295 }
296
297 doneChan <- struct{}{}
298 }
299
300
301
302 func (o *LogsClient) getPodsForSelector(
303 selector string,
304 namespace string,
305 podChan chan corev1.Pod,
306 ) error {
307
308 pods := map[string]struct{}{}
309
310 podList, err := o.platformClient.GetPodsMatchingSelector(selector)
311 if err != nil {
312 return err
313 }
314 for _, pod := range podList.Items {
315 if pod.Status.Phase == "Running" {
316 pods[pod.GetName()] = struct{}{}
317 }
318 }
319
320
321 podsInNs, err := o.platformClient.GetAllPodsInNamespaceMatchingSelector(selector, namespace)
322 if err != nil {
323 return err
324 }
325
326 for _, pod := range podsInNs.Items {
327 if _, ok := pods[pod.GetName()]; ok {
328
329 continue
330 }
331 if pod.Status.Phase == "Running" {
332 podList.Items = append(podList.Items, pod)
333 }
334 }
335
336 for _, pod := range podList.Items {
337 if pod.Status.Phase == "Running" {
338 podChan <- pod
339 }
340 }
341
342 return nil
343 }
344
View as plain text