diff options
author | zelig <viktor.tron@gmail.com> | 2014-12-15 02:04:50 +0800 |
---|---|---|
committer | zelig <viktor.tron@gmail.com> | 2014-12-15 04:40:08 +0800 |
commit | 4dfce4624dcc89302ec0b1f22cf853a8382fb7c7 (patch) | |
tree | eb7d2136dc810edf043c02ee6ad516798fc834ce | |
parent | 5e4d77b2b8020a106a13762c49ef40acac619a9c (diff) | |
download | dexon-4dfce4624dcc89302ec0b1f22cf853a8382fb7c7.tar.gz dexon-4dfce4624dcc89302ec0b1f22cf853a8382fb7c7.tar.zst dexon-4dfce4624dcc89302ec0b1f22cf853a8382fb7c7.zip |
protocol
- new interface explicit backend components txPool, chainManager, blockPool
- added protoErrorDisconnect for blockpool callback (FIXME: handling peer disconnects)
-rw-r--r-- | eth/protocol.go | 90 |
1 files changed, 83 insertions, 7 deletions
diff --git a/eth/protocol.go b/eth/protocol.go index fbc4610ec..37e642fd0 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -20,7 +20,7 @@ type ethProtocol struct { peer *p2p.Peer id string rw p2p.MsgReadWriter - } +} // backend is the interface the ethereum protocol backend should implement // used as an argument to EthProtocol @@ -69,6 +69,7 @@ type newBlockMsgData struct { type getBlockHashesMsgData struct { Hash []byte <<<<<<< HEAD +<<<<<<< HEAD Amount uint64 } @@ -79,23 +80,35 @@ func EthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool) return p2p.Protocol{ ======= Amount uint32 +======= + Amount uint64 +>>>>>>> protocol } // main entrypoint, wrappers starting a server running the eth protocol // use this constructor to attach the protocol ("class") to server caps // the Dev p2p layer then runs the protocol instance on each peer +<<<<<<< HEAD func EthProtocol(eth backend) *p2p.Protocol { return &p2p.Protocol{ >>>>>>> initial commit for eth-p2p integration +======= +func EthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol { + return p2p.Protocol{ +>>>>>>> protocol Name: "eth", Version: ProtocolVersion, Length: ProtocolLength, Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error { <<<<<<< HEAD +<<<<<<< HEAD return runEthProtocol(txPool, chainManager, blockPool, peer, rw) ======= return runEthProtocol(eth, peer, rw) >>>>>>> initial commit for eth-p2p integration +======= + return runEthProtocol(txPool, chainManager, blockPool, peer, rw) +>>>>>>> protocol }, } } @@ -105,6 +118,7 @@ func EthProtocol(eth backend) *p2p.Protocol { // the main loop that handles incoming messages // note RemovePeer in the post-disconnect hook func runEthProtocol(txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { +<<<<<<< HEAD self := ðProtocol{ txPool: txPool, chainManager: chainManager, @@ -127,6 +141,15 @@ func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err erro ======= id: (string)(peer.Identity().Pubkey()), >>>>>>> eth protocol changes +======= + self := ðProtocol{ + txPool: txPool, + chainManager: chainManager, + blockPool: blockPool, + rw: rw, + peer: peer, + id: (string)(peer.Identity().Pubkey()), +>>>>>>> protocol } err = self.handleStatus() if err == nil { @@ -136,12 +159,16 @@ func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err erro if err != nil { <<<<<<< HEAD <<<<<<< HEAD +<<<<<<< HEAD self.blockPool.RemovePeer(self.id) ======= >>>>>>> initial commit for eth-p2p integration ======= self.eth.RemovePeer(self.id) >>>>>>> eth protocol changes +======= + self.blockPool.RemovePeer(self.id) +>>>>>>> protocol break } } @@ -167,6 +194,7 @@ func (self *ethProtocol) handle() error { return ProtocolError(ErrExtraStatusMsg, "") <<<<<<< HEAD +<<<<<<< HEAD case TxMsg: // TODO: rework using lazy RLP stream ======= @@ -179,6 +207,8 @@ func (self *ethProtocol) handle() error { } return self.rw.EncodeMsg(TxMsg, txsInterface...) +======= +>>>>>>> protocol case TxMsg: <<<<<<< HEAD >>>>>>> initial commit for eth-p2p integration @@ -190,10 +220,14 @@ func (self *ethProtocol) handle() error { return ProtocolError(ErrDecode, "%v", err) } <<<<<<< HEAD +<<<<<<< HEAD self.txPool.AddTransactions(txs) ======= self.eth.AddTransactions(txs) >>>>>>> initial commit for eth-p2p integration +======= + self.txPool.AddTransactions(txs) +>>>>>>> protocol case GetBlockHashesMsg: var request getBlockHashesMsgData @@ -201,10 +235,14 @@ func (self *ethProtocol) handle() error { return ProtocolError(ErrDecode, "%v", err) } <<<<<<< HEAD +<<<<<<< HEAD hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount) ======= hashes := self.eth.GetBlockHashes(request.Hash, request.Amount) >>>>>>> initial commit for eth-p2p integration +======= + hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount) +>>>>>>> protocol return self.rw.EncodeMsg(BlockHashesMsg, ethutil.ByteSliceToInterface(hashes)...) case BlockHashesMsg: @@ -245,7 +283,7 @@ func (self *ethProtocol) handle() error { } return } - self.eth.AddBlockHashes(iter, self.id) + self.blockPool.AddBlockHashes(iter, self.id) if err != nil && err != rlp.EOL { return ProtocolError(ErrDecode, "%v", err) } @@ -275,10 +313,14 @@ func (self *ethProtocol) handle() error { break } <<<<<<< HEAD +<<<<<<< HEAD block := self.chainManager.GetBlock(hash) ======= block := self.eth.GetBlock(hash) >>>>>>> initial commit for eth-p2p integration +======= + block := self.chainManager.GetBlock(hash) +>>>>>>> protocol if block != nil { blocks = append(blocks, block.Value().Raw()) } @@ -321,10 +363,14 @@ func (self *ethProtocol) handle() error { ======= } } +<<<<<<< HEAD if err := self.eth.AddBlock(block, self.id); err != nil { return ProtocolError(ErrInvalidBlock, "%v", err) } >>>>>>> eth protocol changes +======= + self.blockPool.AddBlock(block, self.id) +>>>>>>> protocol } case NewBlockMsg: @@ -341,10 +387,14 @@ func (self *ethProtocol) handle() error { // uses AddPeer followed by AddHashes, AddBlock only if peer is the best peer // (or selected as new best peer) <<<<<<< HEAD +<<<<<<< HEAD if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) { ======= if self.eth.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.invalidBlock) { >>>>>>> eth protocol changes +======= + if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) { +>>>>>>> protocol called := true iter := func() (hash []byte, ok bool) { if called { @@ -355,6 +405,7 @@ func (self *ethProtocol) handle() error { } } <<<<<<< HEAD +<<<<<<< HEAD self.blockPool.AddBlockHashes(iter, self.id) self.blockPool.AddBlock(request.Block, self.id) ======= @@ -372,6 +423,10 @@ func (self *ethProtocol) handle() error { return ProtocolError(ErrInvalidBlock, "%v", err) } >>>>>>> eth protocol changes +======= + self.blockPool.AddBlockHashes(iter, self.id) + self.blockPool.AddBlock(request.Block, self.id) +>>>>>>> protocol } default: @@ -390,10 +445,14 @@ type statusMsgData struct { func (self *ethProtocol) statusMsg() p2p.Msg { <<<<<<< HEAD +<<<<<<< HEAD td, currentBlock, genesisBlock := self.chainManager.Status() ======= td, currentBlock, genesisBlock := self.eth.Status() >>>>>>> initial commit for eth-p2p integration +======= + td, currentBlock, genesisBlock := self.chainManager.Status() +>>>>>>> protocol return p2p.NewMsg(StatusMsg, uint32(ProtocolVersion), @@ -430,10 +489,14 @@ func (self *ethProtocol) handleStatus() error { } <<<<<<< HEAD +<<<<<<< HEAD _, _, genesisBlock := self.chainManager.Status() ======= _, _, genesisBlock := self.eth.Status() >>>>>>> initial commit for eth-p2p integration +======= + _, _, genesisBlock := self.chainManager.Status() +>>>>>>> protocol if bytes.Compare(status.GenesisBlock, genesisBlock) != 0 { return ProtocolError(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesisBlock) @@ -462,8 +525,12 @@ func (self *ethProtocol) handleStatus() error { ======= self.peer.Infof("Peer is [eth] capable (%d/%d). TD = %v ~ %x", status.ProtocolVersion, status.NetworkId, status.CurrentBlock) +<<<<<<< HEAD self.eth.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.invalidBlock) >>>>>>> eth protocol changes +======= + self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) +>>>>>>> protocol return nil } @@ -517,11 +584,6 @@ func (self *ethProtocol) requestBlocks(hashes [][]byte) error { return self.rw.EncodeMsg(GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)) } -func (self *ethProtocol) invalidBlock(err error) { - ProtocolError(ErrInvalidBlock, "%v", err) - self.peer.Disconnect(p2p.DiscSubprotocolError) -} - func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *protocolError) { err = ProtocolError(code, format, params...) if err.Fatal() { @@ -531,4 +593,18 @@ func (self *ethProtocol) protoError(code int, format string, params ...interface } return } +<<<<<<< HEAD >>>>>>> eth protocol changes +======= + +func (self *ethProtocol) protoErrorDisconnect(code int, format string, params ...interface{}) { + err := ProtocolError(code, format, params...) + if err.Fatal() { + self.peer.Errorln(err) + // disconnect + } else { + self.peer.Debugln(err) + } + +} +>>>>>>> protocol |