diff options
author | obscuren <geffobscura@gmail.com> | 2014-09-29 18:57:51 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2014-09-29 18:57:51 +0800 |
commit | ab6ede51d7fedb9270cab08ee732a834be34dab2 (patch) | |
tree | d8252f27d51c456e637140a312cadfe2ced71528 | |
parent | ea0357bf02b61db94bd0ad8806ba7337a55a4f79 (diff) | |
download | dexon-ab6ede51d7fedb9270cab08ee732a834be34dab2.tar.gz dexon-ab6ede51d7fedb9270cab08ee732a834be34dab2.tar.zst dexon-ab6ede51d7fedb9270cab08ee732a834be34dab2.zip |
Working on new (blocking) event machine.
The new event machine will be used for loose coupling and handle the
communications between the services:
1) Block pool finds blocks which "links" with our current canonical
chain
2) Posts the blocks on to the event machine
3) State manager receives blocks & processes them
4) Broadcasts new post block event
-rw-r--r-- | block_pool.go | 38 | ||||
-rw-r--r-- | ethchain/block_chain_test.go | 144 | ||||
-rw-r--r-- | ethchain/helper_test.go | 88 | ||||
-rw-r--r-- | ethchain/state_manager.go | 71 | ||||
-rw-r--r-- | ethereum.go | 11 | ||||
-rw-r--r-- | eventer/eventer.go | 79 | ||||
-rw-r--r-- | eventer/eventer_test.go | 66 | ||||
-rw-r--r-- | peer.go | 4 | ||||
-rw-r--r-- | types/ethereum.go | 1 |
9 files changed, 322 insertions, 180 deletions
diff --git a/block_pool.go b/block_pool.go index 4ac096bda..957b7601b 100644 --- a/block_pool.go +++ b/block_pool.go @@ -1,6 +1,7 @@ package eth import ( + "bytes" "container/list" "math" "math/big" @@ -236,22 +237,31 @@ out: case <-self.quit: break out case <-procTimer.C: - // XXX We can optimize this lifting this on to a new goroutine. - // We'd need to make sure that the pools are properly protected by a mutex - // XXX This should moved in The Great Refactor(TM) - amount := self.ProcessCanonical(func(block *ethchain.Block) { - err := self.eth.StateManager().Process(block, false) - if err != nil { - poollogger.Infoln(err) - poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) - poollogger.Debugln(block) + blocks := self.Blocks() + ethchain.BlockBy(ethchain.Number).Sort(blocks) + + if len(blocks) > 0 { + if self.eth.BlockChain().HasBlock(blocks[0].PrevHash) { + for i, block := range blocks[1:] { + // NOTE: The Ith element in this loop refers to the previous block in + // outer "blocks" + if bytes.Compare(block.PrevHash, blocks[i].Hash()) != 0 { + blocks = blocks[:i] + + break + } + } + } else { + blocks = nil } - }) + } + + // Handle in batches of 4k + max := int(math.Min(4000, float64(len(blocks)))) + for _, block := range blocks[:max] { + self.eth.Eventer().Post("block", block) - // Do not propagate to the network on catchups - if amount == 1 { - block := self.eth.BlockChain().CurrentBlock - self.eth.Broadcast(ethwire.MsgBlockTy, []interface{}{block.Value().Val}) + self.Remove(block.Hash()) } } } diff --git a/ethchain/block_chain_test.go b/ethchain/block_chain_test.go index 1edcf9c7b..3603fd8a7 100644 --- a/ethchain/block_chain_test.go +++ b/ethchain/block_chain_test.go @@ -1,145 +1 @@ package ethchain - -import ( - "container/list" - "fmt" - "testing" - - "github.com/ethereum/eth-go/ethcrypto" - "github.com/ethereum/eth-go/ethdb" - "github.com/ethereum/eth-go/ethreact" - "github.com/ethereum/eth-go/ethutil" - "github.com/ethereum/eth-go/ethwire" -) - -// Implement our EthTest Manager -type TestManager struct { - stateManager *StateManager - reactor *ethreact.ReactorEngine - - txPool *TxPool - blockChain *BlockChain - Blocks []*Block -} - -func (s *TestManager) IsListening() bool { - return false -} - -func (s *TestManager) IsMining() bool { - return false -} - -func (s *TestManager) PeerCount() int { - return 0 -} - -func (s *TestManager) Peers() *list.List { - return list.New() -} - -func (s *TestManager) BlockChain() *BlockChain { - return s.blockChain -} - -func (tm *TestManager) TxPool() *TxPool { - return tm.txPool -} - -func (tm *TestManager) StateManager() *StateManager { - return tm.stateManager -} - -func (tm *TestManager) Reactor() *ethreact.ReactorEngine { - return tm.reactor -} -func (tm *TestManager) Broadcast(msgType ethwire.MsgType, data []interface{}) { - fmt.Println("Broadcast not implemented") -} - -func (tm *TestManager) ClientIdentity() ethwire.ClientIdentity { - return nil -} -func (tm *TestManager) KeyManager() *ethcrypto.KeyManager { - return nil -} - -func (tm *TestManager) Db() ethutil.Database { return nil } - -func NewTestManager() *TestManager { - ethutil.ReadConfig(".ethtest", "/tmp/ethtest", "ETH") - - db, err := ethdb.NewMemDatabase() - if err != nil { - fmt.Println("Could not create mem-db, failing") - return nil - } - ethutil.Config.Db = db - - testManager := &TestManager{} - testManager.reactor = ethreact.New() - - testManager.txPool = NewTxPool(testManager) - testManager.blockChain = NewBlockChain(testManager) - testManager.stateManager = NewStateManager(testManager) - - // Start the tx pool - testManager.txPool.Start() - - return testManager -} - -func (tm *TestManager) AddFakeBlock(blk []byte) error { - block := NewBlockFromBytes(blk) - tm.Blocks = append(tm.Blocks, block) - err := tm.StateManager().Process(block, false) - return err -} - -func (tm *TestManager) CreateChain1() error { - err := tm.AddFakeBlock([]byte{248, 246, 248, 242, 160, 58, 253, 98, 206, 198, 181, 152, 223, 201, 116, 197, 154, 111, 104, 54, 113, 249, 184, 246, 15, 226, 142, 187, 47, 138, 60, 201, 66, 226, 237, 29, 7, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 184, 65, 4, 103, 109, 19, 120, 219, 91, 248, 48, 204, 17, 28, 7, 146, 72, 203, 15, 207, 251, 31, 216, 138, 26, 59, 34, 238, 40, 114, 233, 1, 13, 207, 90, 71, 136, 124, 86, 196, 127, 10, 176, 193, 154, 165, 76, 155, 154, 59, 45, 34, 96, 183, 212, 99, 41, 27, 40, 119, 171, 231, 160, 114, 56, 218, 173, 160, 80, 218, 177, 253, 147, 35, 101, 59, 37, 87, 97, 193, 119, 21, 132, 111, 93, 53, 152, 203, 38, 134, 25, 104, 138, 236, 92, 27, 176, 89, 229, 176, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 131, 63, 240, 0, 132, 83, 48, 32, 251, 128, 160, 4, 10, 11, 225, 132, 86, 146, 227, 229, 137, 164, 245, 16, 139, 219, 12, 251, 178, 154, 168, 210, 18, 84, 40, 250, 41, 124, 92, 169, 242, 246, 180, 192, 192}) - err = tm.AddFakeBlock([]byte{248, 246, 248, 242, 160, 222, 229, 152, 228, 200, 163, 244, 144, 120, 18, 203, 253, 195, 185, 105, 131, 163, 226, 116, 40, 140, 68, 249, 198, 221, 152, 121, 0, 124, 11, 180, 125, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 184, 65, 4, 103, 109, 19, 120, 219, 91, 248, 48, 204, 17, 28, 7, 146, 72, 203, 15, 207, 251, 31, 216, 138, 26, 59, 34, 238, 40, 114, 233, 1, 13, 207, 90, 71, 136, 124, 86, 196, 127, 10, 176, 193, 154, 165, 76, 155, 154, 59, 45, 34, 96, 183, 212, 99, 41, 27, 40, 119, 171, 231, 160, 114, 56, 218, 173, 160, 80, 218, 177, 253, 147, 35, 101, 59, 37, 87, 97, 193, 119, 21, 132, 111, 93, 53, 152, 203, 38, 134, 25, 104, 138, 236, 92, 27, 176, 89, 229, 176, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 131, 63, 224, 4, 132, 83, 48, 36, 250, 128, 160, 79, 58, 51, 246, 238, 249, 210, 253, 136, 83, 71, 134, 49, 114, 190, 189, 242, 78, 100, 238, 101, 84, 204, 176, 198, 25, 139, 151, 60, 84, 51, 126, 192, 192}) - err = tm.AddFakeBlock([]byte{248, 246, 248, 242, 160, 68, 52, 33, 210, 160, 189, 217, 255, 78, 37, 196, 217, 94, 247, 166, 169, 224, 199, 102, 110, 85, 213, 45, 13, 173, 106, 4, 103, 151, 195, 38, 86, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 184, 65, 4, 103, 109, 19, 120, 219, 91, 248, 48, 204, 17, 28, 7, 146, 72, 203, 15, 207, 251, 31, 216, 138, 26, 59, 34, 238, 40, 114, 233, 1, 13, 207, 90, 71, 136, 124, 86, 196, 127, 10, 176, 193, 154, 165, 76, 155, 154, 59, 45, 34, 96, 183, 212, 99, 41, 27, 40, 119, 171, 231, 160, 114, 56, 218, 173, 160, 80, 218, 177, 253, 147, 35, 101, 59, 37, 87, 97, 193, 119, 21, 132, 111, 93, 53, 152, 203, 38, 134, 25, 104, 138, 236, 92, 27, 176, 89, 229, 176, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 131, 63, 208, 12, 132, 83, 48, 38, 206, 128, 160, 65, 147, 32, 128, 177, 198, 131, 57, 57, 68, 135, 65, 198, 178, 138, 43, 25, 135, 92, 174, 208, 119, 103, 225, 26, 207, 243, 31, 225, 29, 173, 119, 192, 192}) - return err -} -func (tm *TestManager) CreateChain2() error { - err := tm.AddFakeBlock([]byte{248, 246, 248, 242, 160, 58, 253, 98, 206, 198, 181, 152, 223, 201, 116, 197, 154, 111, 104, 54, 113, 249, 184, 246, 15, 226, 142, 187, 47, 138, 60, 201, 66, 226, 237, 29, 7, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 184, 65, 4, 72, 201, 77, 81, 160, 103, 70, 18, 102, 204, 82, 192, 86, 157, 40, 30, 117, 218, 224, 202, 1, 36, 249, 88, 82, 210, 19, 156, 112, 31, 13, 117, 227, 0, 125, 221, 190, 165, 16, 193, 163, 161, 175, 33, 37, 184, 235, 62, 201, 93, 102, 185, 143, 54, 146, 114, 30, 253, 178, 245, 87, 38, 191, 214, 160, 80, 218, 177, 253, 147, 35, 101, 59, 37, 87, 97, 193, 119, 21, 132, 111, 93, 53, 152, 203, 38, 134, 25, 104, 138, 236, 92, 27, 176, 89, 229, 176, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 131, 63, 240, 0, 132, 83, 48, 40, 35, 128, 160, 162, 214, 119, 207, 212, 186, 64, 47, 14, 186, 98, 118, 203, 79, 172, 205, 33, 206, 225, 177, 225, 194, 98, 188, 63, 219, 13, 151, 47, 32, 204, 27, 192, 192}) - err = tm.AddFakeBlock([]byte{248, 246, 248, 242, 160, 0, 210, 76, 6, 13, 18, 219, 190, 18, 250, 23, 178, 198, 117, 254, 85, 14, 74, 104, 116, 56, 144, 116, 172, 14, 3, 236, 99, 248, 228, 142, 91, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 184, 65, 4, 72, 201, 77, 81, 160, 103, 70, 18, 102, 204, 82, 192, 86, 157, 40, 30, 117, 218, 224, 202, 1, 36, 249, 88, 82, 210, 19, 156, 112, 31, 13, 117, 227, 0, 125, 221, 190, 165, 16, 193, 163, 161, 175, 33, 37, 184, 235, 62, 201, 93, 102, 185, 143, 54, 146, 114, 30, 253, 178, 245, 87, 38, 191, 214, 160, 80, 218, 177, 253, 147, 35, 101, 59, 37, 87, 97, 193, 119, 21, 132, 111, 93, 53, 152, 203, 38, 134, 25, 104, 138, 236, 92, 27, 176, 89, 229, 176, 160, 29, 204, 77, 232, 222, 199, 93, 122, 171, 133, 181, 103, 182, 204, 212, 26, 211, 18, 69, 27, 148, 138, 116, 19, 240, 161, 66, 253, 64, 212, 147, 71, 131, 63, 255, 252, 132, 83, 48, 40, 74, 128, 160, 185, 20, 138, 2, 210, 15, 71, 144, 89, 167, 94, 155, 148, 118, 170, 157, 122, 70, 70, 114, 50, 221, 231, 8, 132, 167, 115, 239, 44, 245, 41, 226, 192, 192}) - return err -} - -func TestNegativeBlockChainReorg(t *testing.T) { - // We are resetting the database between creation so we need to cache our information - testManager2 := NewTestManager() - testManager2.CreateChain2() - tm2Blocks := testManager2.Blocks - - testManager := NewTestManager() - testManager.CreateChain1() - oldState := testManager.BlockChain().CurrentBlock.State() - - if testManager.BlockChain().FindCanonicalChain(tm2Blocks, testManager.BlockChain().GenesisBlock().Hash()) != true { - t.Error("I expected TestManager to have the longest chain, but it was TestManager2 instead.") - } - if testManager.BlockChain().CurrentBlock.State() != oldState { - t.Error("I expected the top state to be the same as it was as before the reorg") - } - -} - -func TestPositiveBlockChainReorg(t *testing.T) { - testManager := NewTestManager() - testManager.CreateChain1() - tm1Blocks := testManager.Blocks - - testManager2 := NewTestManager() - testManager2.CreateChain2() - oldState := testManager2.BlockChain().CurrentBlock.State() - - if testManager2.BlockChain().FindCanonicalChain(tm1Blocks, testManager.BlockChain().GenesisBlock().Hash()) == true { - t.Error("I expected TestManager to have the longest chain, but it was TestManager2 instead.") - } - if testManager2.BlockChain().CurrentBlock.State() == oldState { - t.Error("I expected the top state to have been modified but it was not") - } -} diff --git a/ethchain/helper_test.go b/ethchain/helper_test.go new file mode 100644 index 000000000..75d7771fc --- /dev/null +++ b/ethchain/helper_test.go @@ -0,0 +1,88 @@ +package ethchain + +import ( + "container/list" + "fmt" + + "github.com/ethereum/eth-go/ethcrypto" + "github.com/ethereum/eth-go/ethdb" + "github.com/ethereum/eth-go/ethreact" + "github.com/ethereum/eth-go/ethutil" + "github.com/ethereum/eth-go/ethwire" +) + +// Implement our EthTest Manager +type TestManager struct { + stateManager *StateManager + reactor *ethreact.ReactorEngine + + txPool *TxPool + blockChain *BlockChain + Blocks []*Block +} + +func (s *TestManager) IsListening() bool { + return false +} + +func (s *TestManager) IsMining() bool { + return false +} + +func (s *TestManager) PeerCount() int { + return 0 +} + +func (s *TestManager) Peers() *list.List { + return list.New() +} + +func (s *TestManager) BlockChain() *BlockChain { + return s.blockChain +} + +func (tm *TestManager) TxPool() *TxPool { + return tm.txPool +} + +func (tm *TestManager) StateManager() *StateManager { + return tm.stateManager +} + +func (tm *TestManager) Reactor() *ethreact.ReactorEngine { + return tm.reactor +} +func (tm *TestManager) Broadcast(msgType ethwire.MsgType, data []interface{}) { + fmt.Println("Broadcast not implemented") +} + +func (tm *TestManager) ClientIdentity() ethwire.ClientIdentity { + return nil +} +func (tm *TestManager) KeyManager() *ethcrypto.KeyManager { + return nil +} + +func (tm *TestManager) Db() ethutil.Database { return nil } +func NewTestManager() *TestManager { + ethutil.ReadConfig(".ethtest", "/tmp/ethtest", "ETH") + + db, err := ethdb.NewMemDatabase() + if err != nil { + fmt.Println("Could not create mem-db, failing") + return nil + } + ethutil.Config.Db = db + + testManager := &TestManager{} + testManager.reactor = ethreact.New() + + testManager.txPool = NewTxPool(testManager) + testManager.blockChain = NewBlockChain(testManager) + testManager.stateManager = NewStateManager(testManager) + + // Start the tx pool + testManager.txPool.Start() + + return testManager +} diff --git a/ethchain/state_manager.go b/ethchain/state_manager.go index cd2d57af9..f38666572 100644 --- a/ethchain/state_manager.go +++ b/ethchain/state_manager.go @@ -15,14 +15,11 @@ import ( "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" + "github.com/ethereum/eth-go/eventer" ) var statelogger = ethlog.NewLogger("STATE") -type BlockProcessor interface { - ProcessBlock(block *Block) -} - type Peer interface { Inbound() bool LastSend() time.Time @@ -48,6 +45,7 @@ type EthManager interface { KeyManager() *ethcrypto.KeyManager ClientIdentity() ethwire.ClientIdentity Db() ethutil.Database + Eventer() *eventer.EventMachine } type StateManager struct { @@ -60,7 +58,7 @@ type StateManager struct { // Proof of work used for validating Pow PoW // The ethereum manager interface - Ethereum EthManager + eth EthManager // The managed states // Transiently state. The trans state isn't ever saved, validated and // it could be used for setting account nonces without effecting @@ -74,14 +72,18 @@ type StateManager struct { // This does not have to be a valid block and will be set during // 'Process' & canonical validation. lastAttemptedBlock *Block + + // Quit chan + quit chan bool } func NewStateManager(ethereum EthManager) *StateManager { sm := &StateManager{ - mem: make(map[string]*big.Int), - Pow: &EasyPow{}, - Ethereum: ethereum, - bc: ethereum.BlockChain(), + mem: make(map[string]*big.Int), + Pow: &EasyPow{}, + eth: ethereum, + bc: ethereum.BlockChain(), + quit: make(chan bool), } sm.transState = ethereum.BlockChain().CurrentBlock.State().Copy() sm.miningState = ethereum.BlockChain().CurrentBlock.State().Copy() @@ -89,8 +91,41 @@ func NewStateManager(ethereum EthManager) *StateManager { return sm } +func (self *StateManager) Start() { + statelogger.Debugln("Starting state manager") + + go self.updateThread() +} + +func (self *StateManager) Stop() { + statelogger.Debugln("Stopping state manager") + + close(self.quit) +} + +func (self *StateManager) updateThread() { + blockChan := self.eth.Eventer().Register("block") + +out: + for { + select { + case event := <-blockChan: + block := event.Data.(*Block) + err := self.Process(block, false) + if err != nil { + statelogger.Infoln(err) + statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) + statelogger.Debugln(block) + } + + case <-self.quit: + break out + } + } +} + func (sm *StateManager) CurrentState() *ethstate.State { - return sm.Ethereum.BlockChain().CurrentBlock.State() + return sm.eth.BlockChain().CurrentBlock.State() } func (sm *StateManager) TransState() *ethstate.State { @@ -102,7 +137,7 @@ func (sm *StateManager) MiningState() *ethstate.State { } func (sm *StateManager) NewMiningState() *ethstate.State { - sm.miningState = sm.Ethereum.BlockChain().CurrentBlock.State().Copy() + sm.miningState = sm.eth.BlockChain().CurrentBlock.State().Copy() return sm.miningState } @@ -164,7 +199,7 @@ done: } // Notify all subscribers - self.Ethereum.Reactor().Post("newTx:post", tx) + self.eth.Reactor().Post("newTx:post", tx) receipts = append(receipts, receipt) handled = append(handled, tx) @@ -251,16 +286,16 @@ func (sm *StateManager) Process(block *Block, dontReact bool) (err error) { filter := sm.createBloomFilter(state) // Persist the data fk := append([]byte("bloom"), block.Hash()...) - sm.Ethereum.Db().Put(fk, filter.Bin()) + sm.eth.Db().Put(fk, filter.Bin()) statelogger.Infof("Imported block #%d (%x...)\n", block.Number, block.Hash()[0:4]) if dontReact == false { - sm.Ethereum.Reactor().Post("newBlock", block) + sm.eth.Reactor().Post("newBlock", block) state.Manifest().Reset() } - sm.Ethereum.TxPool().RemoveInvalid(state) + sm.eth.TxPool().RemoveInvalid(state) } else { statelogger.Errorln("total diff failed") } @@ -385,10 +420,6 @@ func (sm *StateManager) AccumelateRewards(state *ethstate.State, block, parent * return nil } -func (sm *StateManager) Stop() { - sm.bc.Stop() -} - // Manifest will handle both creating notifications and generating bloom bin data func (sm *StateManager) createBloomFilter(state *ethstate.State) *BloomFilter { bloomf := NewBloomFilter(nil) @@ -398,7 +429,7 @@ func (sm *StateManager) createBloomFilter(state *ethstate.State) *BloomFilter { bloomf.Set(msg.From) } - sm.Ethereum.Reactor().Post("messages", state.Manifest().Messages) + sm.eth.Reactor().Post("messages", state.Manifest().Messages) return bloomf } diff --git a/ethereum.go b/ethereum.go index d04b08960..5fb3f2909 100644 --- a/ethereum.go +++ b/ethereum.go @@ -22,6 +22,7 @@ import ( "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" + "github.com/ethereum/eth-go/eventer" ) const ( @@ -58,7 +59,9 @@ type Ethereum struct { blockChain *ethchain.BlockChain // The block pool blockPool *BlockPool - // Peers (NYI) + // Eventer + eventer *eventer.EventMachine + // Peers peers *list.List // Nonce Nonce uint64 @@ -123,6 +126,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager filters: make(map[int]*ethchain.Filter), } ethereum.reactor = ethreact.New() + ethereum.eventer = eventer.New() ethereum.blockPool = NewBlockPool(ethereum) ethereum.txPool = ethchain.NewTxPool(ethereum) @@ -161,6 +165,9 @@ func (s *Ethereum) TxPool() *ethchain.TxPool { func (s *Ethereum) BlockPool() *BlockPool { return s.blockPool } +func (s *Ethereum) Eventer() *eventer.EventMachine { + return s.eventer +} func (self *Ethereum) Db() ethutil.Database { return self.db } @@ -387,6 +394,8 @@ func (s *Ethereum) ReapDeadPeerHandler() { func (s *Ethereum) Start(seed bool) { s.reactor.Start() s.blockPool.Start() + s.stateManager.Start() + // Bind to addr and port ln, err := net.Listen("tcp", ":"+s.Port) if err != nil { diff --git a/eventer/eventer.go b/eventer/eventer.go new file mode 100644 index 000000000..fb2f299a3 --- /dev/null +++ b/eventer/eventer.go @@ -0,0 +1,79 @@ +package eventer + +// Basic receiver interface. +type Receiver interface { + Send(Event) +} + +// Receiver as channel +type Channel chan Event + +func (self Channel) Send(ev Event) { + self <- ev +} + +// Receiver as function +type Function func(ev Event) + +func (self Function) Send(ev Event) { + self(ev) +} + +type Event struct { + Type string + Data interface{} +} + +type Channels map[string][]Receiver + +type EventMachine struct { + channels Channels +} + +func New() *EventMachine { + return &EventMachine{ + channels: make(Channels), + } +} + +func (self *EventMachine) add(typ string, r Receiver) { + self.channels[typ] = append(self.channels[typ], r) +} + +// Generalised methods for the known receiver types +// * Channel +// * Function +func (self *EventMachine) On(typ string, r interface{}) { + if eventFunc, ok := r.(func(Event)); ok { + self.RegisterFunc(typ, eventFunc) + } else if eventChan, ok := r.(Channel); ok { + self.RegisterChannel(typ, eventChan) + } else { + panic("Invalid type for EventMachine::On") + } +} + +func (self *EventMachine) RegisterChannel(typ string, c Channel) { + self.add(typ, c) +} + +func (self *EventMachine) RegisterFunc(typ string, f Function) { + self.add(typ, f) +} + +func (self *EventMachine) Register(typ string) Channel { + c := make(Channel, 1) + self.add(typ, c) + + return c +} + +func (self *EventMachine) Post(typ string, data interface{}) { + if self.channels[typ] != nil { + ev := Event{typ, data} + for _, receiver := range self.channels[typ] { + // Blocking is OK. These are internals and need to be handled + receiver.Send(ev) + } + } +} diff --git a/eventer/eventer_test.go b/eventer/eventer_test.go new file mode 100644 index 000000000..b35267af6 --- /dev/null +++ b/eventer/eventer_test.go @@ -0,0 +1,66 @@ +package eventer + +import "testing" + +func TestChannel(t *testing.T) { + eventer := New(nil) + + c := make(Channel, 1) + eventer.RegisterChannel("test", c) + eventer.Post("test", "hello world") + + res := <-c + + if res.Data.(string) != "hello world" { + t.Error("Expected event with data 'hello world'. Got", res.Data) + } +} + +func TestFunction(t *testing.T) { + eventer := New(nil) + + var data string + eventer.RegisterFunc("test", func(ev Event) { + data = ev.Data.(string) + }) + eventer.Post("test", "hello world") + + if data != "hello world" { + t.Error("Expected event with data 'hello world'. Got", data) + } +} + +func TestRegister(t *testing.T) { + eventer := New(nil) + + c := eventer.Register("test") + eventer.Post("test", "hello world") + + res := <-c + + if res.Data.(string) != "hello world" { + t.Error("Expected event with data 'hello world'. Got", res.Data) + } +} + +func TestOn(t *testing.T) { + eventer := New(nil) + + c := make(Channel, 1) + eventer.On("test", c) + + var data string + eventer.On("test", func(ev Event) { + data = ev.Data.(string) + }) + eventer.Post("test", "hello world") + + res := <-c + if res.Data.(string) != "hello world" { + t.Error("Expected channel event with data 'hello world'. Got", res.Data) + } + + if data != "hello world" { + t.Error("Expected function event with data 'hello world'. Got", data) + } +} @@ -554,7 +554,9 @@ func (self *Peer) FetchHashes() { blockPool.td = self.td if !blockPool.HasLatestHash() { - self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(256)})) + const amount = 256 + peerlogger.Debugf("Fetching hashes (%d)\n", amount) + self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(amount)})) } } } diff --git a/types/ethereum.go b/types/ethereum.go new file mode 100644 index 000000000..ab1254f4c --- /dev/null +++ b/types/ethereum.go @@ -0,0 +1 @@ +package types |