diff options
Diffstat (limited to 'les/execqueue.go')
-rw-r--r-- | les/execqueue.go | 86 |
1 files changed, 56 insertions, 30 deletions
diff --git a/les/execqueue.go b/les/execqueue.go index ac779003b..614721bf0 100644 --- a/les/execqueue.go +++ b/les/execqueue.go @@ -16,56 +16,82 @@ package les -import ( - "sync/atomic" -) +import "sync" -// ExecQueue implements a queue that executes function calls in a single thread, +// execQueue implements a queue that executes function calls in a single thread, // in the same order as they have been queued. type execQueue struct { - chn chan func() - cnt, stop, capacity int32 + mu sync.Mutex + cond *sync.Cond + funcs []func() + closeWait chan struct{} } -// NewExecQueue creates a new execution queue. -func newExecQueue(capacity int32) *execQueue { - q := &execQueue{ - chn: make(chan func(), capacity), - capacity: capacity, - } +// newExecQueue creates a new execution queue. +func newExecQueue(capacity int) *execQueue { + q := &execQueue{funcs: make([]func(), 0, capacity)} + q.cond = sync.NewCond(&q.mu) go q.loop() return q } func (q *execQueue) loop() { - for f := range q.chn { - atomic.AddInt32(&q.cnt, -1) - if atomic.LoadInt32(&q.stop) != 0 { - return - } + for f := q.waitNext(false); f != nil; f = q.waitNext(true) { f() } + close(q.closeWait) } -// CanQueue returns true if more function calls can be added to the execution queue. +func (q *execQueue) waitNext(drop bool) (f func()) { + q.mu.Lock() + if drop { + // Remove the function that just executed. We do this here instead of when + // dequeuing so len(q.funcs) includes the function that is running. + q.funcs = append(q.funcs[:0], q.funcs[1:]...) + } + for !q.isClosed() { + if len(q.funcs) > 0 { + f = q.funcs[0] + break + } + q.cond.Wait() + } + q.mu.Unlock() + return f +} + +func (q *execQueue) isClosed() bool { + return q.closeWait != nil +} + +// canQueue returns true if more function calls can be added to the execution queue. func (q *execQueue) canQueue() bool { - return atomic.LoadInt32(&q.stop) == 0 && atomic.LoadInt32(&q.cnt) < q.capacity + q.mu.Lock() + ok := !q.isClosed() && len(q.funcs) < cap(q.funcs) + q.mu.Unlock() + return ok } -// Queue adds a function call to the execution queue. Returns true if successful. +// queue adds a function call to the execution queue. Returns true if successful. func (q *execQueue) queue(f func()) bool { - if atomic.LoadInt32(&q.stop) != 0 { - return false + q.mu.Lock() + ok := !q.isClosed() && len(q.funcs) < cap(q.funcs) + if ok { + q.funcs = append(q.funcs, f) + q.cond.Signal() } - if atomic.AddInt32(&q.cnt, 1) > q.capacity { - atomic.AddInt32(&q.cnt, -1) - return false - } - q.chn <- f - return true + q.mu.Unlock() + return ok } -// Stop stops the exec queue. +// quit stops the exec queue. +// quit waits for the current execution to finish before returning. func (q *execQueue) quit() { - atomic.StoreInt32(&q.stop, 1) + q.mu.Lock() + if !q.isClosed() { + q.closeWait = make(chan struct{}) + q.cond.Signal() + } + q.mu.Unlock() + <-q.closeWait } |