aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--eth/backend.go63
-rw-r--r--event/filter/old_filter.go94
2 files changed, 100 insertions, 57 deletions
diff --git a/eth/backend.go b/eth/backend.go
index 5b7dc6435..fb401a68d 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -14,7 +14,6 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/pow/ezp"
"github.com/ethereum/go-ethereum/rpc"
- "github.com/ethereum/go-ethereum/state"
"github.com/ethereum/go-ethereum/whisper"
)
@@ -75,7 +74,6 @@ func New(db ethutil.Database, identity p2p.ClientIdentity, keyManager *crypto.Ke
clientIdentity: identity,
blacklist: p2p.NewBlacklist(),
eventMux: &event.TypeMux{},
- filters: make(map[int]*core.Filter),
}
eth.txPool = core.NewTxPool(eth)
@@ -83,6 +81,7 @@ func New(db ethutil.Database, identity p2p.ClientIdentity, keyManager *crypto.Ke
eth.blockManager = core.NewBlockManager(eth)
eth.chainManager.SetProcessor(eth.blockManager)
eth.whisper = whisper.New()
+ eth.filterManager = filter.NewFilterManager(eth.EventMux())
hasBlock := eth.chainManager.HasBlock
insertChain := eth.chainManager.InsertChain
@@ -164,8 +163,7 @@ func (s *Ethereum) Start(seed bool) error {
}
s.blockPool.Start()
s.whisper.Start()
-
- go s.filterLoop()
+ s.filterManager.Start()
// broadcast transactions
s.txSub = s.eventMux.Subscribe(core.TxPreEvent{})
@@ -267,58 +265,9 @@ func saveProtocolVersion(db ethutil.Database) {
}
}
-// InstallFilter adds filter for blockchain events.
-// The filter's callbacks will run for matching blocks and messages.
-// The filter should not be modified after it has been installed.
+// XXX Refactor me & MOVE
func (self *Ethereum) InstallFilter(filter *core.Filter) (id int) {
- self.filterMu.Lock()
- id = self.filterId
- self.filters[id] = filter
- self.filterId++
- self.filterMu.Unlock()
- return id
-}
-
-func (self *Ethereum) UninstallFilter(id int) {
- self.filterMu.Lock()
- delete(self.filters, id)
- self.filterMu.Unlock()
-}
-
-// GetFilter retrieves a filter installed using InstallFilter.
-// The filter may not be modified.
-func (self *Ethereum) GetFilter(id int) *core.Filter {
- self.filterMu.RLock()
- defer self.filterMu.RUnlock()
- return self.filters[id]
-}
-
-func (self *Ethereum) filterLoop() {
- // Subscribe to events
- events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil))
- for event := range events.Chan() {
- switch event.(type) {
- case core.NewBlockEvent:
- self.filterMu.RLock()
- for _, filter := range self.filters {
- if filter.BlockCallback != nil {
- e := event.(core.NewBlockEvent)
- filter.BlockCallback(e.Block)
- }
- }
- self.filterMu.RUnlock()
- case state.Messages:
- self.filterMu.RLock()
- for _, filter := range self.filters {
- if filter.MessageCallback != nil {
- e := event.(state.Messages)
- msgs := filter.FilterMessages(e)
- if len(msgs) > 0 {
- filter.MessageCallback(msgs)
- }
- }
- }
- self.filterMu.RUnlock()
- }
- }
+ return self.filterManager.InstallFilter(filter)
}
+func (self *Ethereum) UninstallFilter(id int) { self.filterManager.UninstallFilter(id) }
+func (self *Ethereum) GetFilter(id int) *core.Filter { return self.filterManager.GetFilter(id) }
diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go
new file mode 100644
index 000000000..1a9a88173
--- /dev/null
+++ b/event/filter/old_filter.go
@@ -0,0 +1,94 @@
+// XXX This is the old filter system specifically for messages. This is till in used and could use some refactoring
+package filter
+
+import (
+ "sync"
+
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/state"
+)
+
+type FilterManager struct {
+ eventMux *event.TypeMux
+
+ filterMu sync.RWMutex
+ filterId int
+ filters map[int]*core.Filter
+
+ quit chan struct{}
+}
+
+func NewFilterManager(mux *event.TypeMux) *FilterManager {
+ return &FilterManager{
+ eventMux: mux,
+ filters: make(map[int]*core.Filter),
+ }
+}
+
+func (self *FilterManager) Start() {
+ go self.filterLoop()
+}
+
+func (self *FilterManager) Stop() {
+ close(self.quit)
+}
+
+func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) {
+ self.filterMu.Lock()
+ id = self.filterId
+ self.filters[id] = filter
+ self.filterId++
+ self.filterMu.Unlock()
+ return id
+}
+
+func (self *FilterManager) UninstallFilter(id int) {
+ self.filterMu.Lock()
+ delete(self.filters, id)
+ self.filterMu.Unlock()
+}
+
+// GetFilter retrieves a filter installed using InstallFilter.
+// The filter may not be modified.
+func (self *FilterManager) GetFilter(id int) *core.Filter {
+ self.filterMu.RLock()
+ defer self.filterMu.RUnlock()
+ return self.filters[id]
+}
+
+func (self *FilterManager) filterLoop() {
+ // Subscribe to events
+ events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil))
+
+out:
+ for {
+ select {
+ case <-self.quit:
+ break out
+ case event := <-events.Chan():
+ switch event := event.(type) {
+ case core.NewBlockEvent:
+ self.filterMu.RLock()
+ for _, filter := range self.filters {
+ if filter.BlockCallback != nil {
+ filter.BlockCallback(event.Block)
+ }
+ }
+ self.filterMu.RUnlock()
+
+ case state.Messages:
+ self.filterMu.RLock()
+ for _, filter := range self.filters {
+ if filter.MessageCallback != nil {
+ msgs := filter.FilterMessages(event)
+ if len(msgs) > 0 {
+ filter.MessageCallback(msgs)
+ }
+ }
+ }
+ self.filterMu.RUnlock()
+ }
+ }
+ }
+}