From cdb2ebbdfa510294b8443e33c32f9e0ec414f78e Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 15 Dec 2014 12:08:10 +0100 Subject: Added old filter. Needs some refactoring --- event/filter/old_filter.go | 94 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 event/filter/old_filter.go (limited to 'event/filter') diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go new file mode 100644 index 000000000..1a9a88173 --- /dev/null +++ b/event/filter/old_filter.go @@ -0,0 +1,94 @@ +// XXX This is the old filter system specifically for messages. This is till in used and could use some refactoring +package filter + +import ( + "sync" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/state" +) + +type FilterManager struct { + eventMux *event.TypeMux + + filterMu sync.RWMutex + filterId int + filters map[int]*core.Filter + + quit chan struct{} +} + +func NewFilterManager(mux *event.TypeMux) *FilterManager { + return &FilterManager{ + eventMux: mux, + filters: make(map[int]*core.Filter), + } +} + +func (self *FilterManager) Start() { + go self.filterLoop() +} + +func (self *FilterManager) Stop() { + close(self.quit) +} + +func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) { + self.filterMu.Lock() + id = self.filterId + self.filters[id] = filter + self.filterId++ + self.filterMu.Unlock() + return id +} + +func (self *FilterManager) UninstallFilter(id int) { + self.filterMu.Lock() + delete(self.filters, id) + self.filterMu.Unlock() +} + +// GetFilter retrieves a filter installed using InstallFilter. +// The filter may not be modified. +func (self *FilterManager) GetFilter(id int) *core.Filter { + self.filterMu.RLock() + defer self.filterMu.RUnlock() + return self.filters[id] +} + +func (self *FilterManager) filterLoop() { + // Subscribe to events + events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil)) + +out: + for { + select { + case <-self.quit: + break out + case event := <-events.Chan(): + switch event := event.(type) { + case core.NewBlockEvent: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.BlockCallback != nil { + filter.BlockCallback(event.Block) + } + } + self.filterMu.RUnlock() + + case state.Messages: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.MessageCallback != nil { + msgs := filter.FilterMessages(event) + if len(msgs) > 0 { + filter.MessageCallback(msgs) + } + } + } + self.filterMu.RUnlock() + } + } + } +} -- cgit From 52b54631a47dfa46742635be178f2f8d33dd9f41 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 16 Dec 2014 19:55:57 +0100 Subject: Whisper watches fixes --- event/filter/generic_filter.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) (limited to 'event/filter') diff --git a/event/filter/generic_filter.go b/event/filter/generic_filter.go index b04b4801e..2ce0f0642 100644 --- a/event/filter/generic_filter.go +++ b/event/filter/generic_filter.go @@ -2,19 +2,29 @@ package filter type Generic struct { Str1, Str2, Str3 string + Data map[string]struct{} Fn func(data interface{}) } +// self = registered, f = incoming func (self Generic) Compare(f Filter) bool { + var strMatch, dataMatch = true, true + filter := f.(Generic) - if (len(self.Str1) == 0 || filter.Str1 == self.Str1) && - (len(self.Str2) == 0 || filter.Str2 == self.Str2) && - (len(self.Str3) == 0 || filter.Str3 == self.Str3) { - return true + if (len(self.Str1) > 0 && filter.Str1 != self.Str1) || + (len(self.Str2) > 0 && filter.Str2 != self.Str2) || + (len(self.Str3) > 0 && filter.Str3 != self.Str3) { + strMatch = false + } + + for k, _ := range self.Data { + if _, ok := filter.Data[k]; !ok { + return false + } } - return false + return strMatch && dataMatch } func (self Generic) Trigger(data interface{}) { -- cgit From e3da85faedf21a3ddb73a0fa29decf65364e6c39 Mon Sep 17 00:00:00 2001 From: obscuren Date: Sat, 10 Jan 2015 00:51:56 +0100 Subject: Implemented filter for ws + fixes * proper 0xhex * filters fixed * start of filter manager * accounts for ws. Closes #246 --- event/filter/old_filter.go | 2 ++ 1 file changed, 2 insertions(+) (limited to 'event/filter') diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go index 1a9a88173..6c7f053d4 100644 --- a/event/filter/old_filter.go +++ b/event/filter/old_filter.go @@ -2,6 +2,7 @@ package filter import ( + "fmt" "sync" "github.com/ethereum/go-ethereum/core" @@ -78,6 +79,7 @@ out: self.filterMu.RUnlock() case state.Messages: + fmt.Println("got messages") self.filterMu.RLock() for _, filter := range self.filters { if filter.MessageCallback != nil { -- cgit From 35fe4313d57e1df6c3c8af0bc0b530bd7033e21b Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 12 Jan 2015 10:19:27 +0100 Subject: pre-pow --- event/filter/old_filter.go | 2 -- 1 file changed, 2 deletions(-) (limited to 'event/filter') diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go index 6c7f053d4..1a9a88173 100644 --- a/event/filter/old_filter.go +++ b/event/filter/old_filter.go @@ -2,7 +2,6 @@ package filter import ( - "fmt" "sync" "github.com/ethereum/go-ethereum/core" @@ -79,7 +78,6 @@ out: self.filterMu.RUnlock() case state.Messages: - fmt.Println("got messages") self.filterMu.RLock() for _, filter := range self.filters { if filter.MessageCallback != nil { -- cgit From 34689cb3f369ad71164b81d0c05238d78cb67945 Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 12 Jan 2015 20:36:45 +0100 Subject: Added manual triggering of filters --- event/filter/filter.go | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'event/filter') diff --git a/event/filter/filter.go b/event/filter/filter.go index 9817d5782..ca767f413 100644 --- a/event/filter/filter.go +++ b/event/filter/filter.go @@ -68,3 +68,11 @@ out: } } } + +func (self *Filters) Match(a, b Filter) bool { + return reflect.TypeOf(a) == reflect.TypeOf(b) && a.Compare(b) +} + +func (self *Filters) Get(i int) Filter { + return self.watchers[i] +} -- cgit From f3e78c8f3cd2196ef70a41f298b6df556543d581 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 28 Jan 2015 10:23:18 +0100 Subject: reworking messages => log --- event/filter/old_filter.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'event/filter') diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go index 1a9a88173..c30a7e584 100644 --- a/event/filter/old_filter.go +++ b/event/filter/old_filter.go @@ -77,13 +77,13 @@ out: } self.filterMu.RUnlock() - case state.Messages: + case state.Logs: self.filterMu.RLock() for _, filter := range self.filters { - if filter.MessageCallback != nil { - msgs := filter.FilterMessages(event) + if filter.LogsCallback != nil { + msgs := filter.FilterLogs(event) if len(msgs) > 0 { - filter.MessageCallback(msgs) + filter.LogsCallback(msgs) } } } -- cgit From 65158d39b0632226c168b9a3415365ca8f072cbf Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 4 Feb 2015 15:05:47 -0800 Subject: Filtering --- event/filter/old_filter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'event/filter') diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go index c30a7e584..4c01572db 100644 --- a/event/filter/old_filter.go +++ b/event/filter/old_filter.go @@ -59,7 +59,7 @@ func (self *FilterManager) GetFilter(id int) *core.Filter { func (self *FilterManager) filterLoop() { // Subscribe to events - events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Messages(nil)) + events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Logs(nil)) out: for { -- cgit From c64852dbccd0c8eb57cab994aefd0243c65b351b Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 5 Feb 2015 11:55:03 -0800 Subject: pending / chain event --- event/filter/old_filter.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'event/filter') diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go index 4c01572db..ab0127ffb 100644 --- a/event/filter/old_filter.go +++ b/event/filter/old_filter.go @@ -59,7 +59,7 @@ func (self *FilterManager) GetFilter(id int) *core.Filter { func (self *FilterManager) filterLoop() { // Subscribe to events - events := self.eventMux.Subscribe(core.NewBlockEvent{}, state.Logs(nil)) + events := self.eventMux.Subscribe(core.PendingBlockEvent{}, core.NewBlockEvent{}, state.Logs(nil)) out: for { @@ -77,6 +77,15 @@ out: } self.filterMu.RUnlock() + case core.PendingBlockEvent: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.PendingCallback != nil { + filter.PendingCallback(event.Block) + } + } + self.filterMu.RUnlock() + case state.Logs: self.filterMu.RLock() for _, filter := range self.filters { -- cgit From 44eafb15e0581ef37c3e3cfeccb703381acc2ae2 Mon Sep 17 00:00:00 2001 From: obscuren Date: Sat, 7 Feb 2015 17:03:12 +0100 Subject: Renamed filter --- event/filter/eth_filter.go | 104 +++++++++++++++++++++++++++++++++++++++++++++ event/filter/old_filter.go | 103 -------------------------------------------- 2 files changed, 104 insertions(+), 103 deletions(-) create mode 100644 event/filter/eth_filter.go delete mode 100644 event/filter/old_filter.go (limited to 'event/filter') diff --git a/event/filter/eth_filter.go b/event/filter/eth_filter.go new file mode 100644 index 000000000..295fcfbbf --- /dev/null +++ b/event/filter/eth_filter.go @@ -0,0 +1,104 @@ +package filter + +// TODO make use of the generic filtering system + +import ( + "sync" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/state" +) + +type FilterManager struct { + eventMux *event.TypeMux + + filterMu sync.RWMutex + filterId int + filters map[int]*core.Filter + + quit chan struct{} +} + +func NewFilterManager(mux *event.TypeMux) *FilterManager { + return &FilterManager{ + eventMux: mux, + filters: make(map[int]*core.Filter), + } +} + +func (self *FilterManager) Start() { + go self.filterLoop() +} + +func (self *FilterManager) Stop() { + close(self.quit) +} + +func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) { + self.filterMu.Lock() + id = self.filterId + self.filters[id] = filter + self.filterId++ + self.filterMu.Unlock() + return id +} + +func (self *FilterManager) UninstallFilter(id int) { + self.filterMu.Lock() + delete(self.filters, id) + self.filterMu.Unlock() +} + +// GetFilter retrieves a filter installed using InstallFilter. +// The filter may not be modified. +func (self *FilterManager) GetFilter(id int) *core.Filter { + self.filterMu.RLock() + defer self.filterMu.RUnlock() + return self.filters[id] +} + +func (self *FilterManager) filterLoop() { + // Subscribe to events + events := self.eventMux.Subscribe(core.PendingBlockEvent{}, core.NewBlockEvent{}, state.Logs(nil)) + +out: + for { + select { + case <-self.quit: + break out + case event := <-events.Chan(): + switch event := event.(type) { + case core.NewBlockEvent: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.BlockCallback != nil { + filter.BlockCallback(event.Block) + } + } + self.filterMu.RUnlock() + + case core.PendingBlockEvent: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.PendingCallback != nil { + filter.PendingCallback(event.Block) + } + } + self.filterMu.RUnlock() + + case state.Logs: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.LogsCallback != nil { + msgs := filter.FilterLogs(event) + if len(msgs) > 0 { + filter.LogsCallback(msgs) + } + } + } + self.filterMu.RUnlock() + } + } + } +} diff --git a/event/filter/old_filter.go b/event/filter/old_filter.go deleted file mode 100644 index ab0127ffb..000000000 --- a/event/filter/old_filter.go +++ /dev/null @@ -1,103 +0,0 @@ -// XXX This is the old filter system specifically for messages. This is till in used and could use some refactoring -package filter - -import ( - "sync" - - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/state" -) - -type FilterManager struct { - eventMux *event.TypeMux - - filterMu sync.RWMutex - filterId int - filters map[int]*core.Filter - - quit chan struct{} -} - -func NewFilterManager(mux *event.TypeMux) *FilterManager { - return &FilterManager{ - eventMux: mux, - filters: make(map[int]*core.Filter), - } -} - -func (self *FilterManager) Start() { - go self.filterLoop() -} - -func (self *FilterManager) Stop() { - close(self.quit) -} - -func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) { - self.filterMu.Lock() - id = self.filterId - self.filters[id] = filter - self.filterId++ - self.filterMu.Unlock() - return id -} - -func (self *FilterManager) UninstallFilter(id int) { - self.filterMu.Lock() - delete(self.filters, id) - self.filterMu.Unlock() -} - -// GetFilter retrieves a filter installed using InstallFilter. -// The filter may not be modified. -func (self *FilterManager) GetFilter(id int) *core.Filter { - self.filterMu.RLock() - defer self.filterMu.RUnlock() - return self.filters[id] -} - -func (self *FilterManager) filterLoop() { - // Subscribe to events - events := self.eventMux.Subscribe(core.PendingBlockEvent{}, core.NewBlockEvent{}, state.Logs(nil)) - -out: - for { - select { - case <-self.quit: - break out - case event := <-events.Chan(): - switch event := event.(type) { - case core.NewBlockEvent: - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.BlockCallback != nil { - filter.BlockCallback(event.Block) - } - } - self.filterMu.RUnlock() - - case core.PendingBlockEvent: - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.PendingCallback != nil { - filter.PendingCallback(event.Block) - } - } - self.filterMu.RUnlock() - - case state.Logs: - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.LogsCallback != nil { - msgs := filter.FilterLogs(event) - if len(msgs) > 0 { - filter.LogsCallback(msgs) - } - } - } - self.filterMu.RUnlock() - } - } - } -} -- cgit From 7fc9b5b3f9ca0111cc4bc1b2a6b4bb2eccd3e048 Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 17 Feb 2015 22:20:47 +0100 Subject: Changed to ChainEvent and fixed a nil pointer in transact --- event/filter/eth_filter.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'event/filter') diff --git a/event/filter/eth_filter.go b/event/filter/eth_filter.go index 295fcfbbf..d298d914d 100644 --- a/event/filter/eth_filter.go +++ b/event/filter/eth_filter.go @@ -60,7 +60,10 @@ func (self *FilterManager) GetFilter(id int) *core.Filter { func (self *FilterManager) filterLoop() { // Subscribe to events - events := self.eventMux.Subscribe(core.PendingBlockEvent{}, core.NewBlockEvent{}, state.Logs(nil)) + events := self.eventMux.Subscribe( + core.PendingBlockEvent{}, + core.ChainEvent{}, + state.Logs(nil)) out: for { @@ -69,7 +72,7 @@ out: break out case event := <-events.Chan(): switch event := event.(type) { - case core.NewBlockEvent: + case core.ChainEvent: self.filterMu.RLock() for _, filter := range self.filters { if filter.BlockCallback != nil { -- cgit