From 402fd6e8c6a2e379351e0aae10a833fae6bcae6c Mon Sep 17 00:00:00 2001 From: Péter Szilágyi Date: Mon, 12 Oct 2015 15:04:38 +0300 Subject: core, eth, event, miner, xeth: fix event post / subscription race --- eth/filters/filter_system.go | 44 ++++++++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 16 deletions(-) (limited to 'eth/filters') diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 4972dcd59..ae6093525 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -20,6 +20,7 @@ package filters import ( "sync" + "time" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/vm" @@ -35,6 +36,7 @@ type FilterSystem struct { filterMu sync.RWMutex filterId int filters map[int]*Filter + created map[int]time.Time quit chan struct{} } @@ -44,6 +46,7 @@ func NewFilterSystem(mux *event.TypeMux) *FilterSystem { fs := &FilterSystem{ eventMux: mux, filters: make(map[int]*Filter), + created: make(map[int]time.Time), } go fs.filterLoop() return fs @@ -60,6 +63,7 @@ func (fs *FilterSystem) Add(filter *Filter) (id int) { defer fs.filterMu.Unlock() id = fs.filterId fs.filters[id] = filter + fs.created[id] = time.Now() fs.filterId++ return id @@ -69,15 +73,16 @@ func (fs *FilterSystem) Add(filter *Filter) (id int) { func (fs *FilterSystem) Remove(id int) { fs.filterMu.Lock() defer fs.filterMu.Unlock() - if _, ok := fs.filters[id]; ok { - delete(fs.filters, id) - } + + delete(fs.filters, id) + delete(fs.created, id) } // Get retrieves a filter installed using Add The filter may not be modified. func (fs *FilterSystem) Get(id int) *Filter { fs.filterMu.RLock() defer fs.filterMu.RUnlock() + return fs.filters[id] } @@ -85,42 +90,49 @@ func (fs *FilterSystem) Get(id int) *Filter { // when the filter matches the requirements. func (fs *FilterSystem) filterLoop() { // Subscribe to events - events := fs.eventMux.Subscribe( + eventCh := fs.eventMux.Subscribe( //core.PendingBlockEvent{}, core.ChainEvent{}, core.TxPreEvent{}, - vm.Logs(nil)) + vm.Logs(nil), + ).Chan() out: for { select { case <-fs.quit: break out - case event := <-events.Chan(): - switch event := event.(type) { + case event, ok := <-eventCh: + if !ok { + // Event subscription closed, set the channel to nil to stop spinning + eventCh = nil + continue + } + // A real event arrived, notify the registered filters + switch ev := event.Data.(type) { case core.ChainEvent: fs.filterMu.RLock() - for _, filter := range fs.filters { - if filter.BlockCallback != nil { - filter.BlockCallback(event.Block, event.Logs) + for id, filter := range fs.filters { + if filter.BlockCallback != nil && fs.created[id].Before(event.Time) { + filter.BlockCallback(ev.Block, ev.Logs) } } fs.filterMu.RUnlock() case core.TxPreEvent: fs.filterMu.RLock() - for _, filter := range fs.filters { - if filter.TransactionCallback != nil { - filter.TransactionCallback(event.Tx) + for id, filter := range fs.filters { + if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) { + filter.TransactionCallback(ev.Tx) } } fs.filterMu.RUnlock() case vm.Logs: fs.filterMu.RLock() - for _, filter := range fs.filters { - if filter.LogsCallback != nil { - msgs := filter.FilterLogs(event) + for id, filter := range fs.filters { + if filter.LogsCallback != nil && fs.created[id].Before(event.Time) { + msgs := filter.FilterLogs(ev) if len(msgs) > 0 { filter.LogsCallback(msgs) } -- cgit