diff options
author | obscuren <geffobscura@gmail.com> | 2015-04-28 17:17:41 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-04-28 17:17:41 +0800 |
commit | 99027c79fe7406919d654ab482d8ad37fcf098ce (patch) | |
tree | 9f6625b30b9d63d28f7e86c69d31991daac4fbf7 /xeth/xeth.go | |
parent | bac455c0117f7095ee9c60ac75a249ddd66c2660 (diff) | |
parent | a05c420371aa56657b86ba3dce6ebb087adb708d (diff) | |
download | dexon-99027c79fe7406919d654ab482d8ad37fcf098ce.tar.gz dexon-99027c79fe7406919d654ab482d8ad37fcf098ce.tar.zst dexon-99027c79fe7406919d654ab482d8ad37fcf098ce.zip |
Merge branch 'develop' of github.com-obscure:ethereum/go-ethereum into develop
Conflicts:
rpc/api.go
Diffstat (limited to 'xeth/xeth.go')
-rw-r--r-- | xeth/xeth.go | 68 |
1 files changed, 39 insertions, 29 deletions
diff --git a/xeth/xeth.go b/xeth/xeth.go index 710fec5c5..692fb338c 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -97,7 +97,7 @@ done: } for id, filter := range self.messages { - if time.Since(filter.timeout) > filterTickerTime { + if time.Since(filter.activity()) > filterTickerTime { self.Whisper().Unwatch(id) delete(self.messages, id) } @@ -456,35 +456,61 @@ func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []strin return filter.Find() } -func (p *XEth) NewWhisperFilter(opts *Options) int { +// NewWhisperFilter creates and registers a new message filter to watch for +// inbound whisper messages. All parameters at this point are assumed to be +// HEX encoded. +func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int { + // Pre-define the id to be filled later var id int - opts.Fn = func(msg WhisperMessage) { - p.messagesMut.Lock() - defer p.messagesMut.Unlock() - p.messages[id].add(msg) // = append(p.messages[id], msg) + + // Callback to delegate core whisper messages to this xeth filter + callback := func(msg WhisperMessage) { + p.messagesMut.RLock() // Only read lock to the filter pool + defer p.messagesMut.RUnlock() + p.messages[id].insert(msg) } - id = p.Whisper().Watch(opts) - p.messages[id] = &whisperFilter{timeout: time.Now()} + // Initialize the core whisper filter and wrap into xeth + id = p.Whisper().Watch(to, from, topics, callback) + + p.messagesMut.Lock() + p.messages[id] = newWhisperFilter(id, p.Whisper()) + p.messagesMut.Unlock() + return id } +// UninstallWhisperFilter disables and removes an existing filter. func (p *XEth) UninstallWhisperFilter(id int) bool { + p.messagesMut.Lock() + defer p.messagesMut.Unlock() + if _, ok := p.messages[id]; ok { delete(p.messages, id) return true } - return false } -func (self *XEth) MessagesChanged(id int) []WhisperMessage { - self.messagesMut.Lock() - defer self.messagesMut.Unlock() +// WhisperMessages retrieves all the known messages that match a specific filter. +func (self *XEth) WhisperMessages(id int) []WhisperMessage { + self.messagesMut.RLock() + defer self.messagesMut.RUnlock() if self.messages[id] != nil { - return self.messages[id].get() + return self.messages[id].messages() } + return nil +} + +// WhisperMessagesChanged retrieves all the new messages matched by a filter +// since the last retrieval +func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage { + self.messagesMut.RLock() + defer self.messagesMut.RUnlock() + if self.messages[id] != nil { + return self.messages[id].retrieve() + } return nil } @@ -735,22 +761,6 @@ func (m callmsg) Gas() *big.Int { return m.gas } func (m callmsg) Value() *big.Int { return m.value } func (m callmsg) Data() []byte { return m.data } -type whisperFilter struct { - messages []WhisperMessage - timeout time.Time - id int -} - -func (w *whisperFilter) add(msgs ...WhisperMessage) { - w.messages = append(w.messages, msgs...) -} -func (w *whisperFilter) get() []WhisperMessage { - w.timeout = time.Now() - tmp := w.messages - w.messages = nil - return tmp -} - type logFilter struct { logs state.Logs timeout time.Time |