diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-10-16 01:44:30 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-10-16 01:44:30 +0800 |
commit | cefe5c80b1cdcab606a169c0be65d9d2ba9bc941 (patch) | |
tree | dbbb116fa71ee396fdeb743a725b14007b43e845 /eth/filters | |
parent | 2f1f2e4811a6f3094f99b55f6553fe27d83f9aad (diff) | |
parent | 402fd6e8c6a2e379351e0aae10a833fae6bcae6c (diff) | |
download | go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.gz go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.tar.zst go-tangerine-cefe5c80b1cdcab606a169c0be65d9d2ba9bc941.zip |
Merge pull request #1898 from karalabe/eventmux-post-race
core, eth, event, miner, xeth: fix event post / subscription race
Diffstat (limited to 'eth/filters')
-rw-r--r-- | eth/filters/filter_system.go | 44 |
1 files changed, 28 insertions, 16 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 4972dcd59..ae6093525 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -20,6 +20,7 @@ package filters import ( "sync" + "time" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/vm" @@ -35,6 +36,7 @@ type FilterSystem struct { filterMu sync.RWMutex filterId int filters map[int]*Filter + created map[int]time.Time quit chan struct{} } @@ -44,6 +46,7 @@ func NewFilterSystem(mux *event.TypeMux) *FilterSystem { fs := &FilterSystem{ eventMux: mux, filters: make(map[int]*Filter), + created: make(map[int]time.Time), } go fs.filterLoop() return fs @@ -60,6 +63,7 @@ func (fs *FilterSystem) Add(filter *Filter) (id int) { defer fs.filterMu.Unlock() id = fs.filterId fs.filters[id] = filter + fs.created[id] = time.Now() fs.filterId++ return id @@ -69,15 +73,16 @@ func (fs *FilterSystem) Add(filter *Filter) (id int) { func (fs *FilterSystem) Remove(id int) { fs.filterMu.Lock() defer fs.filterMu.Unlock() - if _, ok := fs.filters[id]; ok { - delete(fs.filters, id) - } + + delete(fs.filters, id) + delete(fs.created, id) } // Get retrieves a filter installed using Add The filter may not be modified. func (fs *FilterSystem) Get(id int) *Filter { fs.filterMu.RLock() defer fs.filterMu.RUnlock() + return fs.filters[id] } @@ -85,42 +90,49 @@ func (fs *FilterSystem) Get(id int) *Filter { // when the filter matches the requirements. func (fs *FilterSystem) filterLoop() { // Subscribe to events - events := fs.eventMux.Subscribe( + eventCh := fs.eventMux.Subscribe( //core.PendingBlockEvent{}, core.ChainEvent{}, core.TxPreEvent{}, - vm.Logs(nil)) + vm.Logs(nil), + ).Chan() out: for { select { case <-fs.quit: break out - case event := <-events.Chan(): - switch event := event.(type) { + case event, ok := <-eventCh: + if !ok { + // Event subscription closed, set the channel to nil to stop spinning + eventCh = nil + continue + } + // A real event arrived, notify the registered filters + switch ev := event.Data.(type) { case core.ChainEvent: fs.filterMu.RLock() - for _, filter := range fs.filters { - if filter.BlockCallback != nil { - filter.BlockCallback(event.Block, event.Logs) + for id, filter := range fs.filters { + if filter.BlockCallback != nil && fs.created[id].Before(event.Time) { + filter.BlockCallback(ev.Block, ev.Logs) } } fs.filterMu.RUnlock() case core.TxPreEvent: fs.filterMu.RLock() - for _, filter := range fs.filters { - if filter.TransactionCallback != nil { - filter.TransactionCallback(event.Tx) + for id, filter := range fs.filters { + if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) { + filter.TransactionCallback(ev.Tx) } } fs.filterMu.RUnlock() case vm.Logs: fs.filterMu.RLock() - for _, filter := range fs.filters { - if filter.LogsCallback != nil { - msgs := filter.FilterLogs(event) + for id, filter := range fs.filters { + if filter.LogsCallback != nil && fs.created[id].Before(event.Time) { + msgs := filter.FilterLogs(ev) if len(msgs) > 0 { filter.LogsCallback(msgs) } |