diff options
author | zelig <viktor.tron@gmail.com> | 2014-12-10 07:55:50 +0800 |
---|---|---|
committer | zelig <viktor.tron@gmail.com> | 2014-12-15 04:27:06 +0800 |
commit | d957dd2c9f5adefdedf702223de39634c1f4a32b (patch) | |
tree | ead5ecce2824785794cc153f9e5c5697835747be /eth/protocol.go | |
parent | eb5cb04aa991d59a6597479fbddf5dec51093ed6 (diff) | |
download | go-tangerine-d957dd2c9f5adefdedf702223de39634c1f4a32b.tar.gz go-tangerine-d957dd2c9f5adefdedf702223de39634c1f4a32b.tar.zst go-tangerine-d957dd2c9f5adefdedf702223de39634c1f4a32b.zip |
eth protocol changes
- changed backend interface
- using callbacks for blockPool
- use rlp stream for lazy decoding
- use peer as logger
- add id (peer pubkey) to ethProtocol fields
- add testPeer to protocol test (temporary)
Diffstat (limited to 'eth/protocol.go')
-rw-r--r-- | eth/protocol.go | 136 |
1 files changed, 79 insertions, 57 deletions
diff --git a/eth/protocol.go b/eth/protocol.go index 992fc7550..380bcc8d2 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -7,18 +7,16 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethutil" - ethlogger "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" ) -var logger = ethlogger.NewLogger("SERV") - // ethProtocol represents the ethereum wire protocol // instance is running on each peer type ethProtocol struct { eth backend - td *big.Int peer *p2p.Peer + id string rw p2p.MsgReadWriter } @@ -26,28 +24,21 @@ type ethProtocol struct { // used as an argument to EthProtocol type backend interface { GetTransactions() (txs []*types.Transaction) - AddTransactions(txs []*types.Transaction) + AddTransactions([]*types.Transaction) GetBlockHashes(hash []byte, amount uint32) (hashes [][]byte) - AddHash(hash []byte, peer *p2p.Peer) (more bool) + AddBlockHashes(next func() ([]byte, bool), peerId string) GetBlock(hash []byte) (block *types.Block) - AddBlock(td *big.Int, block *types.Block, peer *p2p.Peer) (fetchHashes bool, err error) - AddPeer(td *big.Int, currentBlock []byte, peer *p2p.Peer) (fetchHashes bool) + AddBlock(block *types.Block, peerId string) (err error) + AddPeer(td *big.Int, currentBlock []byte, peerId string, requestHashes func([]byte) error, requestBlocks func([][]byte) error, invalidBlock func(error)) (best bool) + RemovePeer(peerId string) Status() (td *big.Int, currentBlock []byte, genesisBlock []byte) } const ( - ProtocolVersion = 43 - // 0x00 // PoC-1 - // 0x01 // PoC-2 - // 0x07 // PoC-3 - // 0x09 // PoC-4 - // 0x17 // PoC-5 - // 0x1c // PoC-6 + ProtocolVersion = 43 NetworkId = 0 ProtocolLength = uint64(8) ProtocolMaxMsgSize = 10 * 1024 * 1024 - - blockHashesBatchSize = 256 ) // eth protocol message codes @@ -74,7 +65,8 @@ type getBlockHashesMsgData struct { } // main entrypoint, wrappers starting a server running the eth protocol -// use this constructor to attach the protocol (class) to server caps +// use this constructor to attach the protocol ("class") to server caps +// the Dev p2p layer then runs the protocol instance on each peer func EthProtocol(eth backend) *p2p.Protocol { return &p2p.Protocol{ Name: "eth", @@ -86,11 +78,14 @@ func EthProtocol(eth backend) *p2p.Protocol { } } +// the main loop that handles incoming messages +// note RemovePeer in the post-disconnect hook func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) { self := ðProtocol{ eth: eth, rw: rw, peer: peer, + id: (string)(peer.Identity().Pubkey()), } err = self.handleStatus() if err == nil { @@ -98,6 +93,7 @@ func runEthProtocol(eth backend, peer *p2p.Peer, rw p2p.MsgReadWriter) (err erro for { err = self.handle() if err != nil { + self.eth.RemovePeer(self.id) break } } @@ -132,6 +128,7 @@ func (self *ethProtocol) handle() error { return self.rw.EncodeMsg(TxMsg, txsInterface...) case TxMsg: + // TODO: rework using lazy RLP stream var txs []*types.Transaction if err := msg.Decode(&txs); err != nil { return ProtocolError(ErrDecode, "%v", err) @@ -148,29 +145,26 @@ func (self *ethProtocol) handle() error { case BlockHashesMsg: // TODO: redo using lazy decode , this way very inefficient on known chains - // s := rlp.NewListStream(msg.Payload, uint64(msg.Size)) - var blockHashes [][]byte - if err := msg.Decode(&blockHashes); err != nil { - return ProtocolError(ErrDecode, "%v", err) - } - fetchMore := true - for _, hash := range blockHashes { - fetchMore = self.eth.AddHash(hash, self.peer) - if !fetchMore { - break + msgStream := rlp.NewListStream(msg.Payload, uint64(msg.Size)) + var err error + iter := func() (hash []byte, ok bool) { + hash, err = msgStream.Bytes() + if err == nil { + ok = true } + return } - if fetchMore { - return self.FetchHashes(blockHashes[len(blockHashes)-1]) + self.eth.AddBlockHashes(iter, self.id) + if err != nil && err != rlp.EOL { + return ProtocolError(ErrDecode, "%v", err) } case GetBlocksMsg: - // Limit to max 300 blocks var blockHashes [][]byte if err := msg.Decode(&blockHashes); err != nil { return ProtocolError(ErrDecode, "%v", err) } - max := int(math.Min(float64(len(blockHashes)), 300.0)) + max := int(math.Min(float64(len(blockHashes)), blockHashesBatchSize)) var blocks []interface{} for i, hash := range blockHashes { if i >= max { @@ -184,20 +178,19 @@ func (self *ethProtocol) handle() error { return self.rw.EncodeMsg(BlocksMsg, blocks...) case BlocksMsg: - var blocks []*types.Block - if err := msg.Decode(&blocks); err != nil { - return ProtocolError(ErrDecode, "%v", err) - } - for _, block := range blocks { - fetchHashes, err := self.eth.AddBlock(nil, block, self.peer) - if err != nil { - return ProtocolError(ErrInvalidBlock, "%v", err) - } - if fetchHashes { - if err := self.FetchHashes(block.Hash()); err != nil { - return err + msgStream := rlp.NewListStream(msg.Payload, uint64(msg.Size)) + for { + var block *types.Block + if err := msgStream.Decode(&block); err != nil { + if err == rlp.EOL { + break + } else { + return ProtocolError(ErrDecode, "%v", err) } } + if err := self.eth.AddBlock(block, self.id); err != nil { + return ProtocolError(ErrInvalidBlock, "%v", err) + } } case NewBlockMsg: @@ -205,13 +198,24 @@ func (self *ethProtocol) handle() error { if err := msg.Decode(&request); err != nil { return ProtocolError(ErrDecode, "%v", err) } - var fetchHashes bool - // this should reset td and offer blockpool as candidate new peer? - if fetchHashes, err = self.eth.AddBlock(request.TD, request.Block, self.peer); err != nil { - return ProtocolError(ErrInvalidBlock, "%v", err) - } - if fetchHashes { - return self.FetchHashes(request.Block.Hash()) + hash := request.Block.Hash() + // to simplify backend interface adding a new block + // uses AddPeer followed by AddHashes, AddBlock only if peer is the best peer + // (or selected as new best peer) + if self.eth.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.invalidBlock) { + called := true + iter := func() (hash []byte, ok bool) { + if called { + called = false + return hash, true + } else { + return + } + } + self.eth.AddBlockHashes(iter, self.id) + if err := self.eth.AddBlock(request.Block, self.id); err != nil { + return ProtocolError(ErrInvalidBlock, "%v", err) + } } default: @@ -279,16 +283,34 @@ func (self *ethProtocol) handleStatus() error { return ProtocolError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, ProtocolVersion) } - logger.Infof("Peer is [eth] capable (%d/%d). TD = %v ~ %x", status.ProtocolVersion, status.NetworkId, status.CurrentBlock) + self.peer.Infof("Peer is [eth] capable (%d/%d). TD = %v ~ %x", status.ProtocolVersion, status.NetworkId, status.CurrentBlock) - if self.eth.AddPeer(status.TD, status.CurrentBlock, self.peer) { - return self.FetchHashes(status.CurrentBlock) - } + self.eth.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.invalidBlock) return nil } -func (self *ethProtocol) FetchHashes(from []byte) error { - logger.Debugf("Fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4]) +func (self *ethProtocol) requestBlockHashes(from []byte) error { + self.peer.Debugf("fetching hashes (%d) %x...\n", blockHashesBatchSize, from[0:4]) return self.rw.EncodeMsg(GetBlockHashesMsg, from, blockHashesBatchSize) } + +func (self *ethProtocol) requestBlocks(hashes [][]byte) error { + self.peer.Debugf("fetching %v blocks", len(hashes)) + return self.rw.EncodeMsg(GetBlocksMsg, ethutil.ByteSliceToInterface(hashes)) +} + +func (self *ethProtocol) invalidBlock(err error) { + ProtocolError(ErrInvalidBlock, "%v", err) + self.peer.Disconnect(p2p.DiscSubprotocolError) +} + +func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *protocolError) { + err = ProtocolError(code, format, params...) + if err.Fatal() { + self.peer.Errorln(err) + } else { + self.peer.Debugln(err) + } + return +} |