aboutsummaryrefslogtreecommitdiffstats
path: root/eth/filters
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2015-10-29 20:28:00 +0800
committerFelix Lange <fjl@twurst.com>2015-10-30 00:26:26 +0800
commitfbdb44dcc17240a01b45e55d3aa4e4b8db0868cd (patch)
tree2363ce8738074226cfedf8ede1612e0ef3a03494 /eth/filters
parent56f8699a6c6bfe613d2ab28c47631a1f4a29e36f (diff)
downloadgo-tangerine-fbdb44dcc17240a01b45e55d3aa4e4b8db0868cd.tar.gz
go-tangerine-fbdb44dcc17240a01b45e55d3aa4e4b8db0868cd.tar.zst
go-tangerine-fbdb44dcc17240a01b45e55d3aa4e4b8db0868cd.zip
cmd/utils, rpc/comms: stop XEth when IPC connection ends
There are a bunch of changes required to make this work: - in miner: allow unregistering agents, fix RemoteAgent.Stop - in eth/filters: make FilterSystem.Stop not crash - in rpc/comms: move listen loop to platform-independent code Fixes #1930. I ran the shell loop there for a few minutes and didn't see any changes in the memory profile.
Diffstat (limited to 'eth/filters')
-rw-r--r--eth/filters/filter_system.go88
1 files changed, 35 insertions, 53 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index ae6093525..df3ce90c6 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -31,30 +31,32 @@ import (
// block, transaction and log events. The Filtering system can be used to listen
// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
type FilterSystem struct {
- eventMux *event.TypeMux
-
filterMu sync.RWMutex
filterId int
filters map[int]*Filter
created map[int]time.Time
-
- quit chan struct{}
+ sub event.Subscription
}
// NewFilterSystem returns a newly allocated filter manager
func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{
- eventMux: mux,
- filters: make(map[int]*Filter),
- created: make(map[int]time.Time),
+ filters: make(map[int]*Filter),
+ created: make(map[int]time.Time),
}
+ fs.sub = mux.Subscribe(
+ //core.PendingBlockEvent{},
+ core.ChainEvent{},
+ core.TxPreEvent{},
+ vm.Logs(nil),
+ )
go fs.filterLoop()
return fs
}
// Stop quits the filter loop required for polling events
func (fs *FilterSystem) Stop() {
- close(fs.quit)
+ fs.sub.Unsubscribe()
}
// Add adds a filter to the filter manager
@@ -89,57 +91,37 @@ func (fs *FilterSystem) Get(id int) *Filter {
// filterLoop waits for specific events from ethereum and fires their handlers
// when the filter matches the requirements.
func (fs *FilterSystem) filterLoop() {
- // Subscribe to events
- eventCh := fs.eventMux.Subscribe(
- //core.PendingBlockEvent{},
- core.ChainEvent{},
- core.TxPreEvent{},
- vm.Logs(nil),
- ).Chan()
-
-out:
- for {
- select {
- case <-fs.quit:
- break out
- case event, ok := <-eventCh:
- if !ok {
- // Event subscription closed, set the channel to nil to stop spinning
- eventCh = nil
- continue
- }
- // A real event arrived, notify the registered filters
- switch ev := event.Data.(type) {
- case core.ChainEvent:
- fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
- filter.BlockCallback(ev.Block, ev.Logs)
- }
+ for event := range fs.sub.Chan() {
+ switch ev := event.Data.(type) {
+ case core.ChainEvent:
+ fs.filterMu.RLock()
+ for id, filter := range fs.filters {
+ if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
+ filter.BlockCallback(ev.Block, ev.Logs)
}
- fs.filterMu.RUnlock()
+ }
+ fs.filterMu.RUnlock()
- case core.TxPreEvent:
- fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
- filter.TransactionCallback(ev.Tx)
- }
+ case core.TxPreEvent:
+ fs.filterMu.RLock()
+ for id, filter := range fs.filters {
+ if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
+ filter.TransactionCallback(ev.Tx)
}
- fs.filterMu.RUnlock()
-
- case vm.Logs:
- fs.filterMu.RLock()
- for id, filter := range fs.filters {
- if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
- msgs := filter.FilterLogs(ev)
- if len(msgs) > 0 {
- filter.LogsCallback(msgs)
- }
+ }
+ fs.filterMu.RUnlock()
+
+ case vm.Logs:
+ fs.filterMu.RLock()
+ for id, filter := range fs.filters {
+ if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
+ msgs := filter.FilterLogs(ev)
+ if len(msgs) > 0 {
+ filter.LogsCallback(msgs)
}
}
- fs.filterMu.RUnlock()
}
+ fs.filterMu.RUnlock()
}
}
}