...

Source file src/github.com/redhat-developer/odo/pkg/apiserver-impl/sse/notifications.go

Documentation: github.com/redhat-developer/odo/pkg/apiserver-impl/sse

     1  package sse
     2  
     3  import (
     4  	"context"
     5  	"net/http"
     6  	"sync"
     7  	"time"
     8  
     9  	"k8s.io/klog"
    10  
    11  	openapi "github.com/redhat-developer/odo/pkg/apiserver-gen/go"
    12  	"github.com/redhat-developer/odo/pkg/testingutil/filesystem"
    13  )
    14  
    15  type Notifier struct {
    16  	fsys filesystem.Filesystem
    17  
    18  	devfilePath string
    19  
    20  	// eventsChan is a channel for all events that will be broadcast to all subscribers.
    21  	// Because it is not natively possible to read a same value twice from a Go channel,
    22  	// we are storing the list of channels to broadcast the event to into the subscribers list.
    23  	eventsChan chan Event
    24  
    25  	// subscribers is a list of all channels where any event from eventsChan will be broadcast to.
    26  	subscribers []chan Event
    27  
    28  	// newSubscriptionChan is a channel where new subscribers can register the channel on which they wish to be notified.
    29  	// Such channels are stored into the subscribers list.
    30  	newSubscriptionChan chan chan Event
    31  
    32  	// cancelSubscriptionChan is a write-only channel where subscribers can cancel their registration and stop being broadcast new events coming from eventsChan.
    33  	cancelSubscriptionChan chan (<-chan Event)
    34  }
    35  
    36  func NewNotifier(ctx context.Context, fsys filesystem.Filesystem, devfilePath string, devfileFiles []string) (*Notifier, error) {
    37  	notifier := Notifier{
    38  		fsys:                   fsys,
    39  		devfilePath:            devfilePath,
    40  		eventsChan:             make(chan Event),
    41  		subscribers:            make([]chan Event, 0),
    42  		newSubscriptionChan:    make(chan chan Event),
    43  		cancelSubscriptionChan: make(chan (<-chan Event)),
    44  	}
    45  
    46  	err := notifier.watchDevfileChanges(ctx, devfileFiles)
    47  	if err != nil {
    48  		return nil, err
    49  	}
    50  
    51  	go notifier.manageSubscriptions(ctx)
    52  
    53  	// Heartbeat as a keep-alive mechanism to prevent some clients from closing inactive connections (notifications might not be sent regularly).
    54  	go func() {
    55  		ticker := time.NewTicker(7 * time.Second)
    56  		for {
    57  			select {
    58  			case <-ctx.Done():
    59  				return
    60  			case <-ticker.C:
    61  				notifier.eventsChan <- Event{
    62  					eventType: Heartbeat,
    63  				}
    64  			}
    65  		}
    66  	}()
    67  
    68  	return &notifier, nil
    69  }
    70  
    71  func (n *Notifier) manageSubscriptions(ctx context.Context) {
    72  	defer func() {
    73  		for _, listener := range n.subscribers {
    74  			if listener != nil {
    75  				close(listener)
    76  			}
    77  		}
    78  	}()
    79  
    80  	go func() {
    81  		for {
    82  			select {
    83  			case <-ctx.Done():
    84  				return
    85  			case newSubscriber := <-n.newSubscriptionChan:
    86  				n.subscribers = append(n.subscribers, newSubscriber)
    87  			}
    88  		}
    89  	}()
    90  
    91  	go func() {
    92  		for {
    93  			select {
    94  			case <-ctx.Done():
    95  				return
    96  			case subscriberToRemove := <-n.cancelSubscriptionChan:
    97  				for i, ch := range n.subscribers {
    98  					if ch == subscriberToRemove {
    99  						n.subscribers[i] = n.subscribers[len(n.subscribers)-1]
   100  						n.subscribers = n.subscribers[:len(n.subscribers)-1]
   101  						close(ch)
   102  						break
   103  					}
   104  				}
   105  			}
   106  		}
   107  	}()
   108  
   109  	for {
   110  		select {
   111  		case <-ctx.Done():
   112  			return
   113  		case val, ok := <-n.eventsChan:
   114  			if !ok {
   115  				return
   116  			}
   117  			var wg sync.WaitGroup
   118  			for _, subscriber := range n.subscribers {
   119  				subscriber := subscriber
   120  				if subscriber == nil {
   121  					continue
   122  				}
   123  				wg.Add(1)
   124  				go func() {
   125  					defer wg.Done()
   126  					select {
   127  					case subscriber <- val:
   128  					case <-ctx.Done():
   129  						return
   130  					}
   131  				}()
   132  			}
   133  			wg.Wait()
   134  		}
   135  	}
   136  }
   137  
   138  func (n *Notifier) Routes() openapi.Routes {
   139  	return openapi.Routes{
   140  		{
   141  			Name:        "ServerSentEvents",
   142  			Method:      http.MethodGet,
   143  			Pattern:     "/api/v1/notifications",
   144  			HandlerFunc: n.handler,
   145  		},
   146  	}
   147  }
   148  
   149  func (n *Notifier) handler(w http.ResponseWriter, r *http.Request) {
   150  	flusher, ok := w.(http.Flusher)
   151  
   152  	if !ok {
   153  		http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
   154  		return
   155  	}
   156  
   157  	newListener := make(chan Event)
   158  	n.newSubscriptionChan <- newListener
   159  	defer func() {
   160  		n.cancelSubscriptionChan <- newListener
   161  	}()
   162  
   163  	w.Header().Set("Content-Type", "text/event-stream")
   164  	w.Header().Set("Cache-Control", "no-cache")
   165  	w.Header().Set("Connection", "keep-alive")
   166  	w.Header().Set("Access-Control-Allow-Origin", "*")
   167  	// Headers sent back as early as possible to clients
   168  	flusher.Flush()
   169  
   170  	for {
   171  		select {
   172  		case ev := <-newListener:
   173  			func() {
   174  				defer flusher.Flush()
   175  				dataToWrite, err := ev.toSseString()
   176  				if err != nil {
   177  					klog.V(2).Infof("error writing notification data: %v", err)
   178  					return
   179  				}
   180  				_, err = w.Write([]byte(dataToWrite))
   181  				if err != nil {
   182  					klog.V(2).Infof("error writing notification data: %v", err)
   183  					return
   184  				}
   185  			}()
   186  
   187  		case <-r.Context().Done():
   188  			klog.V(8).Infof("Connection closed!")
   189  			return
   190  		}
   191  	}
   192  }
   193  

View as plain text