diff options
Diffstat (limited to 'xeth/xeth.go')
-rw-r--r-- | xeth/xeth.go | 205 |
1 files changed, 204 insertions, 1 deletions
diff --git a/xeth/xeth.go b/xeth/xeth.go index 115bd787a..922fce8f1 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -6,6 +6,8 @@ import ( "encoding/json" "fmt" "math/big" + "sync" + "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" @@ -13,13 +15,17 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/event/filter" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/state" "github.com/ethereum/go-ethereum/whisper" ) -var pipelogger = logger.NewLogger("XETH") +var ( + pipelogger = logger.NewLogger("XETH") + filterTickerTime = 5 * time.Minute +) // to resolve the import cycle type Backend interface { @@ -71,6 +77,15 @@ type XEth struct { whisper *Whisper frontend Frontend + + quit chan struct{} + filterManager *filter.FilterManager + + logMut sync.RWMutex + logs map[int]*logFilter + + messagesMut sync.RWMutex + messages map[int]*whisperFilter } // dummyFrontend is a non-interactive frontend that allows all @@ -90,15 +105,55 @@ func New(eth Backend, frontend Frontend) *XEth { chainManager: eth.ChainManager(), accountManager: eth.AccountManager(), whisper: NewWhisper(eth.Whisper()), + quit: make(chan struct{}), + filterManager: filter.NewFilterManager(eth.EventMux()), frontend: frontend, + logs: make(map[int]*logFilter), + messages: make(map[int]*whisperFilter), } if frontend == nil { xeth.frontend = dummyFrontend{} } xeth.state = NewState(xeth, xeth.chainManager.TransState()) + go xeth.start() + go xeth.filterManager.Start() + return xeth } +func (self *XEth) start() { + timer := time.NewTicker(2 * time.Second) +done: + for { + select { + case <-timer.C: + self.logMut.Lock() + self.messagesMut.Lock() + for id, filter := range self.logs { + if time.Since(filter.timeout) > filterTickerTime { + self.filterManager.UninstallFilter(id) + delete(self.logs, id) + } + } + + for id, filter := range self.messages { + if time.Since(filter.timeout) > filterTickerTime { + self.Whisper().Unwatch(id) + delete(self.messages, id) + } + } + self.messagesMut.Unlock() + self.logMut.Unlock() + case <-self.quit: + break done + } + } +} + +func (self *XEth) stop() { + close(self.quit) +} + func (self *XEth) Backend() Backend { return self.eth } func (self *XEth) WithState(statedb *state.StateDB) *XEth { xeth := &XEth{ @@ -241,6 +296,121 @@ func (self *XEth) SecretToAddress(key string) string { return common.ToHex(pair.Address()) } +func (self *XEth) RegisterFilter(args *core.FilterOptions) int { + var id int + filter := core.NewFilter(self.Backend()) + filter.SetOptions(args) + filter.LogsCallback = func(logs state.Logs) { + self.logMut.Lock() + defer self.logMut.Unlock() + + self.logs[id].add(logs...) + } + id = self.filterManager.InstallFilter(filter) + self.logs[id] = &logFilter{timeout: time.Now()} + + return id +} + +func (self *XEth) UninstallFilter(id int) bool { + if _, ok := self.logs[id]; ok { + delete(self.logs, id) + self.filterManager.UninstallFilter(id) + return true + } + + return false +} + +func (self *XEth) NewFilterString(word string) int { + var id int + filter := core.NewFilter(self.Backend()) + + callback := func(block *types.Block, logs state.Logs) { + self.logMut.Lock() + defer self.logMut.Unlock() + + for _, log := range logs { + self.logs[id].add(log) + } + self.logs[id].add(&state.StateLog{}) + } + + switch word { + case "pending": + filter.PendingCallback = callback + case "latest": + filter.BlockCallback = callback + } + + id = self.filterManager.InstallFilter(filter) + self.logs[id] = &logFilter{timeout: time.Now()} + + return id +} + +func (self *XEth) FilterChanged(id int) state.Logs { + self.logMut.Lock() + defer self.logMut.Unlock() + + if self.logs[id] != nil { + return self.logs[id].get() + } + + return nil +} + +func (self *XEth) Logs(id int) state.Logs { + self.logMut.Lock() + defer self.logMut.Unlock() + + filter := self.filterManager.GetFilter(id) + if filter != nil { + return filter.Find() + } + + return nil +} + +func (self *XEth) AllLogs(args *core.FilterOptions) state.Logs { + filter := core.NewFilter(self.Backend()) + filter.SetOptions(args) + + return filter.Find() +} + +func (p *XEth) NewWhisperFilter(opts *Options) int { + var id int + opts.Fn = func(msg WhisperMessage) { + p.messagesMut.Lock() + defer p.messagesMut.Unlock() + p.messages[id].add(msg) // = append(p.messages[id], msg) + } + id = p.Whisper().Watch(opts) + p.messages[id] = &whisperFilter{timeout: time.Now()} + return id +} + +func (p *XEth) UninstallWhisperFilter(id int) bool { + if _, ok := p.messages[id]; ok { + delete(p.messages, id) + return true + } + + return false +} + +func (self *XEth) MessagesChanged(id int) []WhisperMessage { + self.messagesMut.Lock() + defer self.messagesMut.Unlock() + + if self.messages[id] != nil { + return self.messages[id].get() + } + + return nil +} + type KeyVal struct { Key string `json:"key"` Value string `json:"value"` @@ -411,3 +581,36 @@ func (m callmsg) GasPrice() *big.Int { return m.gasPrice } func (m callmsg) Gas() *big.Int { return m.gas } func (m callmsg) Value() *big.Int { return m.value } func (m callmsg) Data() []byte { return m.data } + +type whisperFilter struct { + messages []WhisperMessage + timeout time.Time + id int +} + +func (w *whisperFilter) add(msgs ...WhisperMessage) { + w.messages = append(w.messages, msgs...) +} +func (w *whisperFilter) get() []WhisperMessage { + w.timeout = time.Now() + tmp := w.messages + w.messages = nil + return tmp +} + +type logFilter struct { + logs state.Logs + timeout time.Time + id int +} + +func (l *logFilter) add(logs ...state.Log) { + l.logs = append(l.logs, logs...) +} + +func (l *logFilter) get() state.Logs { + l.timeout = time.Now() + tmp := l.logs + l.logs = nil + return tmp +} |