...

Source file src/github.com/redhat-developer/odo/pkg/util/concurrent.go

Documentation: github.com/redhat-developer/odo/pkg/util

     1  package util
     2  
     3  import (
     4  	"sync"
     5  )
     6  
     7  // ConcurrentTask is a task to execute in a go-routine
     8  type ConcurrentTask struct {
     9  	ToRun func(errChannel chan error)
    10  }
    11  
    12  // run encapsulates the work to be done by calling the ToRun function
    13  func (ct ConcurrentTask) run(errChannel chan error, wg *sync.WaitGroup) {
    14  	defer wg.Done()
    15  	ct.ToRun(errChannel)
    16  }
    17  
    18  // ConcurrentTasks records tasks to be run concurrently with go-routines
    19  type ConcurrentTasks struct {
    20  	tasks []ConcurrentTask
    21  }
    22  
    23  // NewConcurrentTasks creates a new ConcurrentTasks instance, dimensioned to accept at least the specified number of tasks
    24  func NewConcurrentTasks(taskNumber int) *ConcurrentTasks {
    25  	return &ConcurrentTasks{tasks: make([]ConcurrentTask, 0, taskNumber)}
    26  }
    27  
    28  // Add adds the specified ConcurrentTask to the list of tasks to be run concurrently
    29  func (ct *ConcurrentTasks) Add(task ConcurrentTask) {
    30  	if len(ct.tasks) == 0 {
    31  		ct.tasks = make([]ConcurrentTask, 0, 7)
    32  	}
    33  	ct.tasks = append(ct.tasks, task)
    34  }
    35  
    36  // Run concurrently runs the added tasks failing on the first error
    37  // Based on https://garrypolley.com/2016/02/10/golang-routines-errors/
    38  func (ct *ConcurrentTasks) Run() error {
    39  	var wg sync.WaitGroup
    40  	finished := make(chan bool, 1) // this along with wg.Wait() is why the error handling works and doesn't deadlock
    41  	errChannel := make(chan error)
    42  
    43  	for _, task := range ct.tasks {
    44  		wg.Add(1)
    45  		go task.run(errChannel, &wg)
    46  	}
    47  
    48  	// Put the wait group in a go routine.
    49  	// By putting the wait group in the go routine we ensure either all pass
    50  	// and we close the "finished" channel or we wait forever for the wait group
    51  	// to finish.
    52  	//
    53  	// Waiting forever is okay because of the blocking select below.
    54  	go func() {
    55  		wg.Wait()
    56  		close(finished)
    57  	}()
    58  
    59  	// This select will block until one of the two channels returns a value.
    60  	// This means on the first failure in the go routines above the errChannel will release a
    61  	// value first. Because there is a "return" statement in the err check this function will
    62  	// exit when an error occurs.
    63  	//
    64  	// Due to the blocking on wg.Wait() the finished channel will not get a value unless all
    65  	// the go routines before were successful because not all the wg.Done() calls would have
    66  	// happened.
    67  	select {
    68  	case <-finished:
    69  	case err := <-errChannel:
    70  		if err != nil {
    71  			return err
    72  		}
    73  	}
    74  
    75  	return nil
    76  }
    77  

View as plain text