1 package util 2 3 import ( 4 "sync" 5 ) 6 7 // ConcurrentTask is a task to execute in a go-routine 8 type ConcurrentTask struct { 9 ToRun func(errChannel chan error) 10 } 11 12 // run encapsulates the work to be done by calling the ToRun function 13 func (ct ConcurrentTask) run(errChannel chan error, wg *sync.WaitGroup) { 14 defer wg.Done() 15 ct.ToRun(errChannel) 16 } 17 18 // ConcurrentTasks records tasks to be run concurrently with go-routines 19 type ConcurrentTasks struct { 20 tasks []ConcurrentTask 21 } 22 23 // NewConcurrentTasks creates a new ConcurrentTasks instance, dimensioned to accept at least the specified number of tasks 24 func NewConcurrentTasks(taskNumber int) *ConcurrentTasks { 25 return &ConcurrentTasks{tasks: make([]ConcurrentTask, 0, taskNumber)} 26 } 27 28 // Add adds the specified ConcurrentTask to the list of tasks to be run concurrently 29 func (ct *ConcurrentTasks) Add(task ConcurrentTask) { 30 if len(ct.tasks) == 0 { 31 ct.tasks = make([]ConcurrentTask, 0, 7) 32 } 33 ct.tasks = append(ct.tasks, task) 34 } 35 36 // Run concurrently runs the added tasks failing on the first error 37 // Based on https://garrypolley.com/2016/02/10/golang-routines-errors/ 38 func (ct *ConcurrentTasks) Run() error { 39 var wg sync.WaitGroup 40 finished := make(chan bool, 1) // this along with wg.Wait() is why the error handling works and doesn't deadlock 41 errChannel := make(chan error) 42 43 for _, task := range ct.tasks { 44 wg.Add(1) 45 go task.run(errChannel, &wg) 46 } 47 48 // Put the wait group in a go routine. 49 // By putting the wait group in the go routine we ensure either all pass 50 // and we close the "finished" channel or we wait forever for the wait group 51 // to finish. 52 // 53 // Waiting forever is okay because of the blocking select below. 54 go func() { 55 wg.Wait() 56 close(finished) 57 }() 58 59 // This select will block until one of the two channels returns a value. 60 // This means on the first failure in the go routines above the errChannel will release a 61 // value first. Because there is a "return" statement in the err check this function will 62 // exit when an error occurs. 63 // 64 // Due to the blocking on wg.Wait() the finished channel will not get a value unless all 65 // the go routines before were successful because not all the wg.Done() calls would have 66 // happened. 67 select { 68 case <-finished: 69 case err := <-errChannel: 70 if err != nil { 71 return err 72 } 73 } 74 75 return nil 76 } 77