diff options
Diffstat (limited to 'core/bloombits/matcher.go')
-rw-r--r-- | core/bloombits/matcher.go | 51 |
1 files changed, 44 insertions, 7 deletions
diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go index e33de018a..32a660337 100644 --- a/core/bloombits/matcher.go +++ b/core/bloombits/matcher.go @@ -18,6 +18,7 @@ package bloombits import ( "bytes" + "context" "errors" "math" "sort" @@ -60,6 +61,8 @@ type Retrieval struct { Bit uint Sections []uint64 Bitsets [][]byte + Error error + Context context.Context } // Matcher is a pipelined system of schedulers and logic matchers which perform @@ -137,7 +140,7 @@ func (m *Matcher) addScheduler(idx uint) { // Start starts the matching process and returns a stream of bloom matches in // a given range of blocks. If there are no more matches in the range, the result // channel is closed. -func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession, error) { +func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) { // Make sure we're not creating concurrent sessions if atomic.SwapUint32(&m.running, 1) == 1 { return nil, errors.New("matcher already running") @@ -149,6 +152,7 @@ func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession matcher: m, quit: make(chan struct{}), kill: make(chan struct{}), + ctx: ctx, } for _, scheduler := range m.schedulers { scheduler.reset() @@ -502,15 +506,28 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) { type MatcherSession struct { matcher *Matcher - quit chan struct{} // Quit channel to request pipeline termination - kill chan struct{} // Term channel to signal non-graceful forced shutdown - pend sync.WaitGroup + quit chan struct{} // Quit channel to request pipeline termination + kill chan struct{} // Term channel to signal non-graceful forced shutdown + ctx context.Context + err error + stopping bool + lock sync.Mutex + pend sync.WaitGroup } // Close stops the matching process and waits for all subprocesses to terminate // before returning. The timeout may be used for graceful shutdown, allowing the // currently running retrievals to complete before this time. -func (s *MatcherSession) Close(timeout time.Duration) { +func (s *MatcherSession) Close() { + s.lock.Lock() + stopping := s.stopping + s.stopping = true + s.lock.Unlock() + // ensure that we only close the session once + if stopping { + return + } + // Bail out if the matcher is not running select { case <-s.quit: @@ -519,10 +536,26 @@ func (s *MatcherSession) Close(timeout time.Duration) { } // Signal termination and wait for all goroutines to tear down close(s.quit) - time.AfterFunc(timeout, func() { close(s.kill) }) + time.AfterFunc(time.Second, func() { close(s.kill) }) s.pend.Wait() } +// setError sets an error and stops the session +func (s *MatcherSession) setError(err error) { + s.lock.Lock() + s.err = err + s.lock.Unlock() + s.Close() +} + +// Error returns an error if one has happened during the session +func (s *MatcherSession) Error() error { + s.lock.Lock() + defer s.lock.Unlock() + + return s.err +} + // AllocateRetrieval assigns a bloom bit index to a client process that can either // immediately reuest and fetch the section contents assigned to this bit or wait // a little while for more sections to be requested. @@ -618,9 +651,13 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan case mux <- request: // Retrieval accepted, something must arrive before we're aborting - request <- &Retrieval{Bit: bit, Sections: sections} + request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx} result := <-request + if result.Error != nil { + s.setError(result.Error) + } + s.DeliverSections(result.Bit, result.Sections, result.Bitsets) } } |