From 6c04c19eb4506efa5f6de47561025b3702619f79 Mon Sep 17 00:00:00 2001 From: Taylor Gerring Date: Thu, 19 Mar 2015 22:58:07 -0400 Subject: Reorg filter logic to XEth --- xeth/xeth.go | 205 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 204 insertions(+), 1 deletion(-) (limited to 'xeth/xeth.go') diff --git a/xeth/xeth.go b/xeth/xeth.go index 115bd787a..922fce8f1 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -6,6 +6,8 @@ import ( "encoding/json" "fmt" "math/big" + "sync" + "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" @@ -13,13 +15,17 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/event/filter" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/state" "github.com/ethereum/go-ethereum/whisper" ) -var pipelogger = logger.NewLogger("XETH") +var ( + pipelogger = logger.NewLogger("XETH") + filterTickerTime = 5 * time.Minute +) // to resolve the import cycle type Backend interface { @@ -71,6 +77,15 @@ type XEth struct { whisper *Whisper frontend Frontend + + quit chan struct{} + filterManager *filter.FilterManager + + logMut sync.RWMutex + logs map[int]*logFilter + + messagesMut sync.RWMutex + messages map[int]*whisperFilter } // dummyFrontend is a non-interactive frontend that allows all @@ -90,15 +105,55 @@ func New(eth Backend, frontend Frontend) *XEth { chainManager: eth.ChainManager(), accountManager: eth.AccountManager(), whisper: NewWhisper(eth.Whisper()), + quit: make(chan struct{}), + filterManager: filter.NewFilterManager(eth.EventMux()), frontend: frontend, + logs: make(map[int]*logFilter), + messages: make(map[int]*whisperFilter), } if frontend == nil { xeth.frontend = dummyFrontend{} } xeth.state = NewState(xeth, xeth.chainManager.TransState()) + go xeth.start() + go xeth.filterManager.Start() + return xeth } +func (self *XEth) start() { + timer := time.NewTicker(2 * time.Second) +done: + for { + select { + case <-timer.C: + self.logMut.Lock() + self.messagesMut.Lock() + for id, filter := range self.logs { + if time.Since(filter.timeout) > filterTickerTime { + self.filterManager.UninstallFilter(id) + delete(self.logs, id) + } + } + + for id, filter := range self.messages { + if time.Since(filter.timeout) > filterTickerTime { + self.Whisper().Unwatch(id) + delete(self.messages, id) + } + } + self.messagesMut.Unlock() + self.logMut.Unlock() + case <-self.quit: + break done + } + } +} + +func (self *XEth) stop() { + close(self.quit) +} + func (self *XEth) Backend() Backend { return self.eth } func (self *XEth) WithState(statedb *state.StateDB) *XEth { xeth := &XEth{ @@ -241,6 +296,121 @@ func (self *XEth) SecretToAddress(key string) string { return common.ToHex(pair.Address()) } +func (self *XEth) RegisterFilter(args *core.FilterOptions) int { + var id int + filter := core.NewFilter(self.Backend()) + filter.SetOptions(args) + filter.LogsCallback = func(logs state.Logs) { + self.logMut.Lock() + defer self.logMut.Unlock() + + self.logs[id].add(logs...) + } + id = self.filterManager.InstallFilter(filter) + self.logs[id] = &logFilter{timeout: time.Now()} + + return id +} + +func (self *XEth) UninstallFilter(id int) bool { + if _, ok := self.logs[id]; ok { + delete(self.logs, id) + self.filterManager.UninstallFilter(id) + return true + } + + return false +} + +func (self *XEth) NewFilterString(word string) int { + var id int + filter := core.NewFilter(self.Backend()) + + callback := func(block *types.Block, logs state.Logs) { + self.logMut.Lock() + defer self.logMut.Unlock() + + for _, log := range logs { + self.logs[id].add(log) + } + self.logs[id].add(&state.StateLog{}) + } + + switch word { + case "pending": + filter.PendingCallback = callback + case "latest": + filter.BlockCallback = callback + } + + id = self.filterManager.InstallFilter(filter) + self.logs[id] = &logFilter{timeout: time.Now()} + + return id +} + +func (self *XEth) FilterChanged(id int) state.Logs { + self.logMut.Lock() + defer self.logMut.Unlock() + + if self.logs[id] != nil { + return self.logs[id].get() + } + + return nil +} + +func (self *XEth) Logs(id int) state.Logs { + self.logMut.Lock() + defer self.logMut.Unlock() + + filter := self.filterManager.GetFilter(id) + if filter != nil { + return filter.Find() + } + + return nil +} + +func (self *XEth) AllLogs(args *core.FilterOptions) state.Logs { + filter := core.NewFilter(self.Backend()) + filter.SetOptions(args) + + return filter.Find() +} + +func (p *XEth) NewWhisperFilter(opts *Options) int { + var id int + opts.Fn = func(msg WhisperMessage) { + p.messagesMut.Lock() + defer p.messagesMut.Unlock() + p.messages[id].add(msg) // = append(p.messages[id], msg) + } + id = p.Whisper().Watch(opts) + p.messages[id] = &whisperFilter{timeout: time.Now()} + return id +} + +func (p *XEth) UninstallWhisperFilter(id int) bool { + if _, ok := p.messages[id]; ok { + delete(p.messages, id) + return true + } + + return false +} + +func (self *XEth) MessagesChanged(id int) []WhisperMessage { + self.messagesMut.Lock() + defer self.messagesMut.Unlock() + + if self.messages[id] != nil { + return self.messages[id].get() + } + + return nil +} + type KeyVal struct { Key string `json:"key"` Value string `json:"value"` @@ -411,3 +581,36 @@ func (m callmsg) GasPrice() *big.Int { return m.gasPrice } func (m callmsg) Gas() *big.Int { return m.gas } func (m callmsg) Value() *big.Int { return m.value } func (m callmsg) Data() []byte { return m.data } + +type whisperFilter struct { + messages []WhisperMessage + timeout time.Time + id int +} + +func (w *whisperFilter) add(msgs ...WhisperMessage) { + w.messages = append(w.messages, msgs...) +} +func (w *whisperFilter) get() []WhisperMessage { + w.timeout = time.Now() + tmp := w.messages + w.messages = nil + return tmp +} + +type logFilter struct { + logs state.Logs + timeout time.Time + id int +} + +func (l *logFilter) add(logs ...state.Log) { + l.logs = append(l.logs, logs...) +} + +func (l *logFilter) get() state.Logs { + l.timeout = time.Now() + tmp := l.logs + l.logs = nil + return tmp +} -- cgit From 19360c00795d356d052a379663c3f36aedba3f9e Mon Sep 17 00:00:00 2001 From: Taylor Gerring Date: Thu, 19 Mar 2015 23:28:45 -0400 Subject: Move stateAt func to XEth --- xeth/xeth.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) (limited to 'xeth/xeth.go') diff --git a/xeth/xeth.go b/xeth/xeth.go index 922fce8f1..504a93f58 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -154,6 +154,24 @@ func (self *XEth) stop() { close(self.quit) } +func (self *XEth) AtStateNum(num int64) *XEth { + chain := self.Backend().ChainManager() + var block *types.Block + + if num < 0 { + num = chain.CurrentBlock().Number().Int64() + num + 1 + } + block = chain.GetBlockByNumber(uint64(num)) + + var st *state.StateDB + if block != nil { + st = state.New(block.Root(), self.Backend().StateDb()) + } else { + st = chain.State() + } + return self.WithState(st) +} + func (self *XEth) Backend() Backend { return self.eth } func (self *XEth) WithState(statedb *state.StateDB) *XEth { xeth := &XEth{ -- cgit From 754160afea7fc230c3236d5494beefeb03b94140 Mon Sep 17 00:00:00 2001 From: Taylor Gerring Date: Fri, 20 Mar 2015 00:23:48 -0400 Subject: Move gas defaults to XEth --- xeth/xeth.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'xeth/xeth.go') diff --git a/xeth/xeth.go b/xeth/xeth.go index 504a93f58..690db5135 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -25,6 +25,8 @@ import ( var ( pipelogger = logger.NewLogger("XETH") filterTickerTime = 5 * time.Minute + defaultGasPrice = big.NewInt(10000000000000) //150000000000 + defaultGas = big.NewInt(90000) //500000 ) // to resolve the import cycle @@ -154,6 +156,9 @@ func (self *XEth) stop() { close(self.quit) } +func (self *XEth) DefaultGas() *big.Int { return defaultGas } +func (self *XEth) DefaultGasPrice() *big.Int { return defaultGasPrice } + func (self *XEth) AtStateNum(num int64) *XEth { chain := self.Backend().ChainManager() var block *types.Block @@ -486,11 +491,6 @@ func (self *XEth) PushTx(encodedTx string) (string, error) { return common.ToHex(tx.Hash()), nil } -var ( - defaultGasPrice = big.NewInt(10000000000000) - defaultGas = big.NewInt(90000) -) - func (self *XEth) Call(fromStr, toStr, valueStr, gasStr, gasPriceStr, dataStr string) (string, error) { statedb := self.State().State() //self.chainManager.TransState() msg := callmsg{ -- cgit From b56e20be2760343147f72ca62a8db8bd216903bf Mon Sep 17 00:00:00 2001 From: Taylor Gerring Date: Fri, 20 Mar 2015 00:24:23 -0400 Subject: Reorg for clarity --- xeth/xeth.go | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) (limited to 'xeth/xeth.go') diff --git a/xeth/xeth.go b/xeth/xeth.go index 690db5135..3d44e292c 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -70,6 +70,13 @@ type Frontend interface { ConfirmTransaction(tx *types.Transaction) bool } +// dummyFrontend is a non-interactive frontend that allows all +// transactions but cannot not unlock any keys. +type dummyFrontend struct{} + +func (dummyFrontend) UnlockAccount([]byte) bool { return false } +func (dummyFrontend) ConfirmTransaction(*types.Transaction) bool { return true } + type XEth struct { eth Backend blockProcessor *core.BlockProcessor @@ -90,13 +97,6 @@ type XEth struct { messages map[int]*whisperFilter } -// dummyFrontend is a non-interactive frontend that allows all -// transactions but cannot not unlock any keys. -type dummyFrontend struct{} - -func (dummyFrontend) UnlockAccount([]byte) bool { return false } -func (dummyFrontend) ConfirmTransaction(*types.Transaction) bool { return true } - // New creates an XEth that uses the given frontend. // If a nil Frontend is provided, a default frontend which // confirms all transactions will be used. @@ -527,6 +527,28 @@ func (self *XEth) Transact(fromStr, toStr, valueStr, gasStr, gasPriceStr, codeSt contractCreation bool ) + // TODO if no_private_key then + //if _, exists := p.register[args.From]; exists { + // p.register[args.From] = append(p.register[args.From], args) + //} else { + /* + account := accounts.Get(common.FromHex(args.From)) + if account != nil { + if account.Unlocked() { + if !unlockAccount(account) { + return + } + } + + result, _ := account.Transact(common.FromHex(args.To), common.FromHex(args.Value), common.FromHex(args.Gas), common.FromHex(args.GasPrice), common.FromHex(args.Data)) + if len(result) > 0 { + *reply = common.ToHex(result) + } + } else if _, exists := p.register[args.From]; exists { + p.register[ags.From] = append(p.register[args.From], args) + } + */ + from = common.FromHex(fromStr) data = common.FromHex(codeStr) to = common.FromHex(toStr) -- cgit From aa3918efa711a51c241a70d675b27fc0f0c01ec3 Mon Sep 17 00:00:00 2001 From: Taylor Gerring Date: Fri, 20 Mar 2015 07:13:29 +0100 Subject: Move transact gas check to XEth --- xeth/xeth.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) (limited to 'xeth/xeth.go') diff --git a/xeth/xeth.go b/xeth/xeth.go index 3d44e292c..e1e25ba09 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -521,8 +521,8 @@ func (self *XEth) Transact(fromStr, toStr, valueStr, gasStr, gasPriceStr, codeSt from []byte to []byte value = common.NewValue(valueStr) - gas = common.NewValue(gasStr) - price = common.NewValue(gasPriceStr) + gas = common.Big(gasStr) + price = common.Big(gasPriceStr) data []byte contractCreation bool ) @@ -549,6 +549,16 @@ func (self *XEth) Transact(fromStr, toStr, valueStr, gasStr, gasPriceStr, codeSt } */ + // TODO: align default values to have the same type, e.g. not depend on + // common.Value conversions later on + if gas.Cmp(big.NewInt(0)) == 0 { + gas = defaultGas + } + + if price.Cmp(big.NewInt(0)) == 0 { + price = defaultGasPrice + } + from = common.FromHex(fromStr) data = common.FromHex(codeStr) to = common.FromHex(toStr) @@ -558,9 +568,9 @@ func (self *XEth) Transact(fromStr, toStr, valueStr, gasStr, gasPriceStr, codeSt var tx *types.Transaction if contractCreation { - tx = types.NewContractCreationTx(value.BigInt(), gas.BigInt(), price.BigInt(), data) + tx = types.NewContractCreationTx(value.BigInt(), gas, price, data) } else { - tx = types.NewTransactionMessage(to, value.BigInt(), gas.BigInt(), price.BigInt(), data) + tx = types.NewTransactionMessage(to, value.BigInt(), gas, price, data) } state := self.chainManager.TxState() -- cgit From efcc93e7da9f47b99fc9252dc741b20086aeb4b2 Mon Sep 17 00:00:00 2001 From: Taylor Gerring Date: Fri, 20 Mar 2015 14:12:07 +0100 Subject: Move Account register to xeth --- xeth/xeth.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) (limited to 'xeth/xeth.go') diff --git a/xeth/xeth.go b/xeth/xeth.go index e1e25ba09..636ee32c9 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -95,6 +95,9 @@ type XEth struct { messagesMut sync.RWMutex messages map[int]*whisperFilter + + // regmut sync.Mutex + // register map[string][]*interface{} // TODO improve return type } // New creates an XEth that uses the given frontend. @@ -434,6 +437,39 @@ func (self *XEth) MessagesChanged(id int) []WhisperMessage { return nil } +// func (self *XEth) Register(args string) bool { +// self.regmut.Lock() +// defer self.regmut.Unlock() + +// if _, ok := self.register[args]; ok { +// self.register[args] = nil // register with empty +// } +// return true +// } + +// func (self *XEth) Unregister(args string) bool { +// self.regmut.Lock() +// defer self.regmut.Unlock() + +// if _, ok := self.register[args]; ok { +// delete(self.register, args) +// return true +// } + +// return false +// } + +// // TODO improve return type +// func (self *XEth) PullWatchTx(args string) []*interface{} { +// self.regmut.Lock() +// defer self.regmut.Unlock() + +// txs := self.register[args] +// self.register[args] = nil + +// return txs +// } + type KeyVal struct { Key string `json:"key"` Value string `json:"value"` -- cgit