From 36cdab206849c7e363e0b9911553098c3e8ca644 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 14 Oct 2014 01:58:31 +0200 Subject: all: use (blocking) event package instead of ethreact --- ethchain/dagger.go | 7 +++--- ethchain/events.go | 10 ++++++++ ethchain/state_manager.go | 55 ++++++++++++++++---------------------------- ethchain/transaction_pool.go | 3 ++- 4 files changed, 35 insertions(+), 40 deletions(-) create mode 100644 ethchain/events.go (limited to 'ethchain') diff --git a/ethchain/dagger.go b/ethchain/dagger.go index 916d7e9c8..2d2b5720f 100644 --- a/ethchain/dagger.go +++ b/ethchain/dagger.go @@ -8,7 +8,6 @@ import ( "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethutil" "github.com/obscuren/sha3" ) @@ -16,7 +15,7 @@ import ( var powlogger = ethlog.NewLogger("POW") type PoW interface { - Search(block *Block, reactChan chan ethreact.Event) []byte + Search(block *Block, stop <-chan struct{}) []byte Verify(hash []byte, diff *big.Int, nonce []byte) bool GetHashrate() int64 Turbo(bool) @@ -36,7 +35,7 @@ func (pow *EasyPow) Turbo(on bool) { pow.turbo = on } -func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte { +func (pow *EasyPow) Search(block *Block, stop <-chan struct{}) []byte { r := rand.New(rand.NewSource(time.Now().UnixNano())) hash := block.HashNoNonce() diff := block.Difficulty @@ -46,7 +45,7 @@ func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte { for { select { - case <-reactChan: + case <-stop: powlogger.Infoln("Breaking from mining") return nil default: diff --git a/ethchain/events.go b/ethchain/events.go new file mode 100644 index 000000000..05c21edfe --- /dev/null +++ b/ethchain/events.go @@ -0,0 +1,10 @@ +package ethchain + +type TxEvent struct { + Type int // TxPre || TxPost + Tx *Transaction +} + +type NewBlockEvent struct { + Block *Block +} diff --git a/ethchain/state_manager.go b/ethchain/state_manager.go index 589b99ac2..b71cbe8a1 100644 --- a/ethchain/state_manager.go +++ b/ethchain/state_manager.go @@ -11,11 +11,10 @@ import ( "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" - "github.com/ethereum/eth-go/eventer" + "github.com/ethereum/eth-go/event" ) var statelogger = ethlog.NewLogger("STATE") @@ -37,7 +36,6 @@ type EthManager interface { BlockChain() *BlockChain TxPool() *TxPool Broadcast(msgType ethwire.MsgType, data []interface{}) - Reactor() *ethreact.ReactorEngine PeerCount() int IsMining() bool IsListening() bool @@ -45,7 +43,7 @@ type EthManager interface { KeyManager() *ethcrypto.KeyManager ClientIdentity() ethwire.ClientIdentity Db() ethutil.Database - Eventer() *eventer.EventMachine + EventMux() *event.TypeMux } type StateManager struct { @@ -73,17 +71,15 @@ type StateManager struct { // 'Process' & canonical validation. lastAttemptedBlock *Block - // Quit chan - quit chan bool + events event.Subscription } func NewStateManager(ethereum EthManager) *StateManager { sm := &StateManager{ - mem: make(map[string]*big.Int), - Pow: &EasyPow{}, - eth: ethereum, - bc: ethereum.BlockChain(), - quit: make(chan bool), + mem: make(map[string]*big.Int), + Pow: &EasyPow{}, + eth: ethereum, + bc: ethereum.BlockChain(), } sm.transState = ethereum.BlockChain().CurrentBlock.State().Copy() sm.miningState = ethereum.BlockChain().CurrentBlock.State().Copy() @@ -93,36 +89,25 @@ func NewStateManager(ethereum EthManager) *StateManager { func (self *StateManager) Start() { statelogger.Debugln("Starting state manager") - + self.events = self.eth.EventMux().Subscribe(Blocks(nil)) go self.updateThread() } func (self *StateManager) Stop() { statelogger.Debugln("Stopping state manager") - - close(self.quit) + self.events.Unsubscribe() } func (self *StateManager) updateThread() { - blockChan := self.eth.Eventer().Register("blocks") - -out: - for { - select { - case event := <-blockChan: - blocks := event.Data.(Blocks) - for _, block := range blocks { - err := self.Process(block, false) - if err != nil { - statelogger.Infoln(err) - statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) - statelogger.Debugln(block) - break - } + for ev := range self.events.Chan() { + for _, block := range ev.(Blocks) { + err := self.Process(block, false) + if err != nil { + statelogger.Infoln(err) + statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) + statelogger.Debugln(block) + break } - - case <-self.quit: - break out } } } @@ -202,7 +187,7 @@ done: } // Notify all subscribers - self.eth.Reactor().Post("newTx:post", tx) + self.eth.EventMux().Post(TxEvent{TxPost, tx}) receipts = append(receipts, receipt) handled = append(handled, tx) @@ -293,7 +278,7 @@ func (sm *StateManager) Process(block *Block, dontReact bool) (err error) { statelogger.Infof("Imported block #%d (%x...)\n", block.Number, block.Hash()[0:4]) if dontReact == false { - sm.eth.Reactor().Post("newBlock", block) + sm.eth.EventMux().Post(NewBlockEvent{block}) state.Manifest().Reset() } @@ -434,7 +419,7 @@ func (sm *StateManager) createBloomFilter(state *ethstate.State) *BloomFilter { bloomf.Set(msg.From) } - sm.eth.Reactor().Post("messages", state.Manifest().Messages) + sm.eth.EventMux().Post(state.Manifest().Messages) return bloomf } diff --git a/ethchain/transaction_pool.go b/ethchain/transaction_pool.go index da6c3d6ba..0676af3a3 100644 --- a/ethchain/transaction_pool.go +++ b/ethchain/transaction_pool.go @@ -24,6 +24,7 @@ type TxMsgTy byte const ( TxPre = iota TxPost + minGasPrice = 1000000 ) @@ -160,7 +161,7 @@ out: txplogger.Debugf("(t) %x => %x (%v) %x\n", tx.Sender()[:4], tmp, tx.Value, tx.Hash()) // Notify the subscribers - pool.Ethereum.Reactor().Post("newTx:pre", tx) + pool.Ethereum.EventMux().Post(TxEvent{TxPre, tx}) } case <-pool.quit: break out -- cgit