aboutsummaryrefslogtreecommitdiffstats
path: root/core/bloombits/matcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/bloombits/matcher.go')
-rw-r--r--core/bloombits/matcher.go51
1 files changed, 44 insertions, 7 deletions
diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go
index e33de018a..32a660337 100644
--- a/core/bloombits/matcher.go
+++ b/core/bloombits/matcher.go
@@ -18,6 +18,7 @@ package bloombits
import (
"bytes"
+ "context"
"errors"
"math"
"sort"
@@ -60,6 +61,8 @@ type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte
+ Error error
+ Context context.Context
}
// Matcher is a pipelined system of schedulers and logic matchers which perform
@@ -137,7 +140,7 @@ func (m *Matcher) addScheduler(idx uint) {
// Start starts the matching process and returns a stream of bloom matches in
// a given range of blocks. If there are no more matches in the range, the result
// channel is closed.
-func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession, error) {
+func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
// Make sure we're not creating concurrent sessions
if atomic.SwapUint32(&m.running, 1) == 1 {
return nil, errors.New("matcher already running")
@@ -149,6 +152,7 @@ func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession
matcher: m,
quit: make(chan struct{}),
kill: make(chan struct{}),
+ ctx: ctx,
}
for _, scheduler := range m.schedulers {
scheduler.reset()
@@ -502,15 +506,28 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
type MatcherSession struct {
matcher *Matcher
- quit chan struct{} // Quit channel to request pipeline termination
- kill chan struct{} // Term channel to signal non-graceful forced shutdown
- pend sync.WaitGroup
+ quit chan struct{} // Quit channel to request pipeline termination
+ kill chan struct{} // Term channel to signal non-graceful forced shutdown
+ ctx context.Context
+ err error
+ stopping bool
+ lock sync.Mutex
+ pend sync.WaitGroup
}
// Close stops the matching process and waits for all subprocesses to terminate
// before returning. The timeout may be used for graceful shutdown, allowing the
// currently running retrievals to complete before this time.
-func (s *MatcherSession) Close(timeout time.Duration) {
+func (s *MatcherSession) Close() {
+ s.lock.Lock()
+ stopping := s.stopping
+ s.stopping = true
+ s.lock.Unlock()
+ // ensure that we only close the session once
+ if stopping {
+ return
+ }
+
// Bail out if the matcher is not running
select {
case <-s.quit:
@@ -519,10 +536,26 @@ func (s *MatcherSession) Close(timeout time.Duration) {
}
// Signal termination and wait for all goroutines to tear down
close(s.quit)
- time.AfterFunc(timeout, func() { close(s.kill) })
+ time.AfterFunc(time.Second, func() { close(s.kill) })
s.pend.Wait()
}
+// setError sets an error and stops the session
+func (s *MatcherSession) setError(err error) {
+ s.lock.Lock()
+ s.err = err
+ s.lock.Unlock()
+ s.Close()
+}
+
+// Error returns an error if one has happened during the session
+func (s *MatcherSession) Error() error {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ return s.err
+}
+
// AllocateRetrieval assigns a bloom bit index to a client process that can either
// immediately reuest and fetch the section contents assigned to this bit or wait
// a little while for more sections to be requested.
@@ -618,9 +651,13 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
case mux <- request:
// Retrieval accepted, something must arrive before we're aborting
- request <- &Retrieval{Bit: bit, Sections: sections}
+ request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx}
result := <-request
+ if result.Error != nil {
+ s.setError(result.Error)
+ }
+
s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
}
}