diff options
author | Felix Lange <fjl@twurst.com> | 2015-10-29 20:28:00 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2015-10-30 00:26:26 +0800 |
commit | fbdb44dcc17240a01b45e55d3aa4e4b8db0868cd (patch) | |
tree | 2363ce8738074226cfedf8ede1612e0ef3a03494 /eth/filters | |
parent | 56f8699a6c6bfe613d2ab28c47631a1f4a29e36f (diff) | |
download | go-tangerine-fbdb44dcc17240a01b45e55d3aa4e4b8db0868cd.tar.gz go-tangerine-fbdb44dcc17240a01b45e55d3aa4e4b8db0868cd.tar.zst go-tangerine-fbdb44dcc17240a01b45e55d3aa4e4b8db0868cd.zip |
cmd/utils, rpc/comms: stop XEth when IPC connection ends
There are a bunch of changes required to make this work:
- in miner: allow unregistering agents, fix RemoteAgent.Stop
- in eth/filters: make FilterSystem.Stop not crash
- in rpc/comms: move listen loop to platform-independent code
Fixes #1930. I ran the shell loop there for a few minutes and didn't see
any changes in the memory profile.
Diffstat (limited to 'eth/filters')
-rw-r--r-- | eth/filters/filter_system.go | 88 |
1 files changed, 35 insertions, 53 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index ae6093525..df3ce90c6 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -31,30 +31,32 @@ import ( // block, transaction and log events. The Filtering system can be used to listen // for specific LOG events fired by the EVM (Ethereum Virtual Machine). type FilterSystem struct { - eventMux *event.TypeMux - filterMu sync.RWMutex filterId int filters map[int]*Filter created map[int]time.Time - - quit chan struct{} + sub event.Subscription } // NewFilterSystem returns a newly allocated filter manager func NewFilterSystem(mux *event.TypeMux) *FilterSystem { fs := &FilterSystem{ - eventMux: mux, - filters: make(map[int]*Filter), - created: make(map[int]time.Time), + filters: make(map[int]*Filter), + created: make(map[int]time.Time), } + fs.sub = mux.Subscribe( + //core.PendingBlockEvent{}, + core.ChainEvent{}, + core.TxPreEvent{}, + vm.Logs(nil), + ) go fs.filterLoop() return fs } // Stop quits the filter loop required for polling events func (fs *FilterSystem) Stop() { - close(fs.quit) + fs.sub.Unsubscribe() } // Add adds a filter to the filter manager @@ -89,57 +91,37 @@ func (fs *FilterSystem) Get(id int) *Filter { // filterLoop waits for specific events from ethereum and fires their handlers // when the filter matches the requirements. func (fs *FilterSystem) filterLoop() { - // Subscribe to events - eventCh := fs.eventMux.Subscribe( - //core.PendingBlockEvent{}, - core.ChainEvent{}, - core.TxPreEvent{}, - vm.Logs(nil), - ).Chan() - -out: - for { - select { - case <-fs.quit: - break out - case event, ok := <-eventCh: - if !ok { - // Event subscription closed, set the channel to nil to stop spinning - eventCh = nil - continue - } - // A real event arrived, notify the registered filters - switch ev := event.Data.(type) { - case core.ChainEvent: - fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.BlockCallback != nil && fs.created[id].Before(event.Time) { - filter.BlockCallback(ev.Block, ev.Logs) - } + for event := range fs.sub.Chan() { + switch ev := event.Data.(type) { + case core.ChainEvent: + fs.filterMu.RLock() + for id, filter := range fs.filters { + if filter.BlockCallback != nil && fs.created[id].Before(event.Time) { + filter.BlockCallback(ev.Block, ev.Logs) } - fs.filterMu.RUnlock() + } + fs.filterMu.RUnlock() - case core.TxPreEvent: - fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) { - filter.TransactionCallback(ev.Tx) - } + case core.TxPreEvent: + fs.filterMu.RLock() + for id, filter := range fs.filters { + if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) { + filter.TransactionCallback(ev.Tx) } - fs.filterMu.RUnlock() - - case vm.Logs: - fs.filterMu.RLock() - for id, filter := range fs.filters { - if filter.LogsCallback != nil && fs.created[id].Before(event.Time) { - msgs := filter.FilterLogs(ev) - if len(msgs) > 0 { - filter.LogsCallback(msgs) - } + } + fs.filterMu.RUnlock() + + case vm.Logs: + fs.filterMu.RLock() + for id, filter := range fs.filters { + if filter.LogsCallback != nil && fs.created[id].Before(event.Time) { + msgs := filter.FilterLogs(ev) + if len(msgs) > 0 { + filter.LogsCallback(msgs) } } - fs.filterMu.RUnlock() } + fs.filterMu.RUnlock() } } } |