...
1 package sse
2
3 import (
4 "context"
5 "net/http"
6 "sync"
7 "time"
8
9 "k8s.io/klog"
10
11 openapi "github.com/redhat-developer/odo/pkg/apiserver-gen/go"
12 "github.com/redhat-developer/odo/pkg/testingutil/filesystem"
13 )
14
15 type Notifier struct {
16 fsys filesystem.Filesystem
17
18 devfilePath string
19
20
21
22
23 eventsChan chan Event
24
25
26 subscribers []chan Event
27
28
29
30 newSubscriptionChan chan chan Event
31
32
33 cancelSubscriptionChan chan (<-chan Event)
34 }
35
36 func NewNotifier(ctx context.Context, fsys filesystem.Filesystem, devfilePath string, devfileFiles []string) (*Notifier, error) {
37 notifier := Notifier{
38 fsys: fsys,
39 devfilePath: devfilePath,
40 eventsChan: make(chan Event),
41 subscribers: make([]chan Event, 0),
42 newSubscriptionChan: make(chan chan Event),
43 cancelSubscriptionChan: make(chan (<-chan Event)),
44 }
45
46 err := notifier.watchDevfileChanges(ctx, devfileFiles)
47 if err != nil {
48 return nil, err
49 }
50
51 go notifier.manageSubscriptions(ctx)
52
53
54 go func() {
55 ticker := time.NewTicker(7 * time.Second)
56 for {
57 select {
58 case <-ctx.Done():
59 return
60 case <-ticker.C:
61 notifier.eventsChan <- Event{
62 eventType: Heartbeat,
63 }
64 }
65 }
66 }()
67
68 return ¬ifier, nil
69 }
70
71 func (n *Notifier) manageSubscriptions(ctx context.Context) {
72 defer func() {
73 for _, listener := range n.subscribers {
74 if listener != nil {
75 close(listener)
76 }
77 }
78 }()
79
80 go func() {
81 for {
82 select {
83 case <-ctx.Done():
84 return
85 case newSubscriber := <-n.newSubscriptionChan:
86 n.subscribers = append(n.subscribers, newSubscriber)
87 }
88 }
89 }()
90
91 go func() {
92 for {
93 select {
94 case <-ctx.Done():
95 return
96 case subscriberToRemove := <-n.cancelSubscriptionChan:
97 for i, ch := range n.subscribers {
98 if ch == subscriberToRemove {
99 n.subscribers[i] = n.subscribers[len(n.subscribers)-1]
100 n.subscribers = n.subscribers[:len(n.subscribers)-1]
101 close(ch)
102 break
103 }
104 }
105 }
106 }
107 }()
108
109 for {
110 select {
111 case <-ctx.Done():
112 return
113 case val, ok := <-n.eventsChan:
114 if !ok {
115 return
116 }
117 var wg sync.WaitGroup
118 for _, subscriber := range n.subscribers {
119 subscriber := subscriber
120 if subscriber == nil {
121 continue
122 }
123 wg.Add(1)
124 go func() {
125 defer wg.Done()
126 select {
127 case subscriber <- val:
128 case <-ctx.Done():
129 return
130 }
131 }()
132 }
133 wg.Wait()
134 }
135 }
136 }
137
138 func (n *Notifier) Routes() openapi.Routes {
139 return openapi.Routes{
140 {
141 Name: "ServerSentEvents",
142 Method: http.MethodGet,
143 Pattern: "/api/v1/notifications",
144 HandlerFunc: n.handler,
145 },
146 }
147 }
148
149 func (n *Notifier) handler(w http.ResponseWriter, r *http.Request) {
150 flusher, ok := w.(http.Flusher)
151
152 if !ok {
153 http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
154 return
155 }
156
157 newListener := make(chan Event)
158 n.newSubscriptionChan <- newListener
159 defer func() {
160 n.cancelSubscriptionChan <- newListener
161 }()
162
163 w.Header().Set("Content-Type", "text/event-stream")
164 w.Header().Set("Cache-Control", "no-cache")
165 w.Header().Set("Connection", "keep-alive")
166 w.Header().Set("Access-Control-Allow-Origin", "*")
167
168 flusher.Flush()
169
170 for {
171 select {
172 case ev := <-newListener:
173 func() {
174 defer flusher.Flush()
175 dataToWrite, err := ev.toSseString()
176 if err != nil {
177 klog.V(2).Infof("error writing notification data: %v", err)
178 return
179 }
180 _, err = w.Write([]byte(dataToWrite))
181 if err != nil {
182 klog.V(2).Infof("error writing notification data: %v", err)
183 return
184 }
185 }()
186
187 case <-r.Context().Done():
188 klog.V(8).Infof("Connection closed!")
189 return
190 }
191 }
192 }
193
View as plain text