aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go336
1 files changed, 261 insertions, 75 deletions
diff --git a/eth/handler.go b/eth/handler.go
index 4f3d1f34c..f22afecb7 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -36,10 +36,8 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)
-// This is the target maximum size of returned blocks for the
-// getBlocks message. The reply message may exceed it
-// if a single block is larger than the limit.
-const maxBlockRespSize = 2 * 1024 * 1024
+// This is the target maximum size of returned blocks, headers or node data.
+const softResponseLimit = 2 * 1024 * 1024
func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
@@ -59,12 +57,13 @@ func (ep extProt) GetHashes(hash common.Hash) error { return ep.getHashes(has
func (ep extProt) GetBlock(hashes []common.Hash) error { return ep.getBlocks(hashes) }
type ProtocolManager struct {
- protVer, netId int
- txpool txPool
- chainman *core.ChainManager
- downloader *downloader.Downloader
- fetcher *fetcher.Fetcher
- peers *peerSet
+ txpool txPool
+ chainman *core.ChainManager
+ chaindb common.Database
+
+ downloader *downloader.Downloader
+ fetcher *fetcher.Fetcher
+ peers *peerSet
SubProtocols []p2p.Protocol
@@ -85,17 +84,17 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
-func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager) *ProtocolManager {
+func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager, chaindb common.Database) *ProtocolManager {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
eventMux: mux,
txpool: txpool,
chainman: chainman,
+ chaindb: chaindb,
peers: newPeerSet(),
newPeerCh: make(chan *peer, 1),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
- netId: networkId,
}
// Initiate a sub-protocol for every implemented version we can handle
manager.SubProtocols = make([]p2p.Protocol, len(ProtocolVersions))
@@ -176,7 +175,7 @@ func (pm *ProtocolManager) Stop() {
}
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
- return newPeer(pv, nv, p, rw)
+ return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
}
// handle is the callback invoked to manage the life cycle of an eth peer. When
@@ -190,6 +189,9 @@ func (pm *ProtocolManager) handle(p *peer) error {
glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err)
return err
}
+ if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
+ rw.Init(p.version)
+ }
// Register the peer locally
glog.V(logger.Detail).Infof("%v: adding peer", p)
if err := pm.peers.Register(p); err != nil {
@@ -199,7 +201,9 @@ func (pm *ProtocolManager) handle(p *peer) error {
defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
- if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks); err != nil {
+ if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
+ p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks,
+ p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
@@ -230,12 +234,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
defer msg.Discard()
// Handle the message depending on its contents
- switch msg.Code {
- case StatusMsg:
+ switch {
+ case msg.Code == StatusMsg:
// Status messages should never arrive after the handshake
return errResp(ErrExtraStatusMsg, "uncontrolled status message")
- case GetBlockHashesMsg:
+ case p.version < eth62 && msg.Code == GetBlockHashesMsg:
// Retrieve the number of hashes to return and from which origin hash
var request getBlockHashesData
if err := msg.Decode(&request); err != nil {
@@ -251,7 +255,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendBlockHashes(hashes)
- case GetBlockHashesFromNumberMsg:
+ case p.version < eth62 && msg.Code == GetBlockHashesFromNumberMsg:
// Retrieve and decode the number of hashes to return and from which origin number
var request getBlockHashesFromNumberData
if err := msg.Decode(&request); err != nil {
@@ -278,24 +282,19 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendBlockHashes(hashes)
- case BlockHashesMsg:
+ case p.version < eth62 && msg.Code == BlockHashesMsg:
// A batch of hashes arrived to one of our previous requests
- msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
- reqHashInPacketsMeter.Mark(1)
-
var hashes []common.Hash
- if err := msgStream.Decode(&hashes); err != nil {
+ if err := msg.Decode(&hashes); err != nil {
break
}
- reqHashInTrafficMeter.Mark(int64(32 * len(hashes)))
-
// Deliver them all to the downloader for queuing
- err := pm.downloader.DeliverHashes(p.id, hashes)
+ err := pm.downloader.DeliverHashes61(p.id, hashes)
if err != nil {
glog.V(logger.Debug).Infoln(err)
}
- case GetBlocksMsg:
+ case p.version < eth62 && msg.Code == GetBlocksMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
@@ -305,94 +304,279 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
var (
hash common.Hash
bytes common.StorageSize
- hashes []common.Hash
blocks []*types.Block
)
- for {
+ for len(blocks) < downloader.MaxBlockFetch && bytes < softResponseLimit {
+ //Retrieve the hash of the next block
err := msgStream.Decode(&hash)
if err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- hashes = append(hashes, hash)
-
// Retrieve the requested block, stopping if enough was found
if block := pm.chainman.GetBlock(hash); block != nil {
blocks = append(blocks, block)
bytes += block.Size()
- if len(blocks) >= downloader.MaxBlockFetch || bytes > maxBlockRespSize {
- break
- }
}
}
- if glog.V(logger.Detail) && len(blocks) == 0 && len(hashes) > 0 {
- list := "["
- for _, hash := range hashes {
- list += fmt.Sprintf("%x, ", hash[:4])
- }
- list = list[:len(list)-2] + "]"
-
- glog.Infof("%v: no blocks found for requested hashes %s", p, list)
- }
return p.SendBlocks(blocks)
- case BlocksMsg:
+ case p.version < eth62 && msg.Code == BlocksMsg:
// Decode the arrived block message
- msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
- reqBlockInPacketsMeter.Mark(1)
-
var blocks []*types.Block
- if err := msgStream.Decode(&blocks); err != nil {
+ if err := msg.Decode(&blocks); err != nil {
glog.V(logger.Detail).Infoln("Decode error", err)
blocks = nil
}
// Update the receive timestamp of each block
for _, block := range blocks {
- reqBlockInTrafficMeter.Mark(block.Size().Int64())
block.ReceivedAt = msg.ReceivedAt
}
// Filter out any explicitly requested blocks, deliver the rest to the downloader
- if blocks := pm.fetcher.Filter(blocks); len(blocks) > 0 {
- pm.downloader.DeliverBlocks(p.id, blocks)
+ if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 {
+ pm.downloader.DeliverBlocks61(p.id, blocks)
}
- case NewBlockHashesMsg:
- // Retrieve and deseralize the remote new block hashes notification
+ // Block header query, collect the requested headers and reply
+ case p.version >= eth62 && msg.Code == GetBlockHeadersMsg:
+ // Decode the complex header query
+ var query getBlockHeadersData
+ if err := msg.Decode(&query); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ // Gather blocks until the fetch or network limits is reached
+ var (
+ bytes common.StorageSize
+ headers []*types.Header
+ unknown bool
+ )
+ for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
+ // Retrieve the next block satisfying the query
+ var origin *types.Block
+ if query.Origin.Hash != (common.Hash{}) {
+ origin = pm.chainman.GetBlock(query.Origin.Hash)
+ } else {
+ origin = pm.chainman.GetBlockByNumber(query.Origin.Number)
+ }
+ if origin == nil {
+ break
+ }
+ headers = append(headers, origin.Header())
+ bytes += origin.Size()
+
+ // Advance to the next block of the query
+ switch {
+ case query.Origin.Hash != (common.Hash{}) && query.Reverse:
+ // Hash based traversal towards the genesis block
+ for i := 0; i < int(query.Skip)+1; i++ {
+ if block := pm.chainman.GetBlock(query.Origin.Hash); block != nil {
+ query.Origin.Hash = block.ParentHash()
+ } else {
+ unknown = true
+ break
+ }
+ }
+ case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
+ // Hash based traversal towards the leaf block
+ if block := pm.chainman.GetBlockByNumber(origin.NumberU64() + query.Skip + 1); block != nil {
+ if pm.chainman.GetBlockHashesFromHash(block.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
+ query.Origin.Hash = block.Hash()
+ } else {
+ unknown = true
+ }
+ } else {
+ unknown = true
+ }
+ case query.Reverse:
+ // Number based traversal towards the genesis block
+ if query.Origin.Number >= query.Skip+1 {
+ query.Origin.Number -= (query.Skip + 1)
+ } else {
+ unknown = true
+ }
+
+ case !query.Reverse:
+ // Number based traversal towards the leaf block
+ query.Origin.Number += (query.Skip + 1)
+ }
+ }
+ return p.SendBlockHeaders(headers)
+
+ case p.version >= eth62 && msg.Code == BlockHeadersMsg:
+ // A batch of headers arrived to one of our previous requests
+ var headers []*types.Header
+ if err := msg.Decode(&headers); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Filter out any explicitly requested headers, deliver the rest to the downloader
+ filter := len(headers) == 1
+ if filter {
+ headers = pm.fetcher.FilterHeaders(headers, time.Now())
+ }
+ if len(headers) > 0 || !filter {
+ err := pm.downloader.DeliverHeaders(p.id, headers)
+ if err != nil {
+ glog.V(logger.Debug).Infoln(err)
+ }
+ }
+
+ case p.version >= eth62 && msg.Code == BlockBodiesMsg:
+ // A batch of block bodies arrived to one of our previous requests
+ var request blockBodiesData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver them all to the downloader for queuing
+ trasactions := make([][]*types.Transaction, len(request))
+ uncles := make([][]*types.Header, len(request))
+
+ for i, body := range request {
+ trasactions[i] = body.Transactions
+ uncles[i] = body.Uncles
+ }
+ // Filter out any explicitly requested bodies, deliver the rest to the downloader
+ if trasactions, uncles := pm.fetcher.FilterBodies(trasactions, uncles, time.Now()); len(trasactions) > 0 || len(uncles) > 0 {
+ err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
+ if err != nil {
+ glog.V(logger.Debug).Infoln(err)
+ }
+ }
+
+ case p.version >= eth62 && msg.Code == GetBlockBodiesMsg:
+ // Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ // Gather blocks until the fetch or network limits is reached
+ var (
+ hash common.Hash
+ bytes common.StorageSize
+ bodies []*blockBody
+ )
+ for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
+ //Retrieve the hash of the next block
+ if err := msgStream.Decode(&hash); err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Retrieve the requested block, stopping if enough was found
+ if block := pm.chainman.GetBlock(hash); block != nil {
+ bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()})
+ bytes += block.Size()
+ }
+ }
+ return p.SendBlockBodies(bodies)
- var hashes []common.Hash
- if err := msgStream.Decode(&hashes); err != nil {
- break
+ case p.version >= eth63 && msg.Code == GetNodeDataMsg:
+ // Decode the retrieval message
+ msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if _, err := msgStream.List(); err != nil {
+ return err
}
- propHashInPacketsMeter.Mark(1)
- propHashInTrafficMeter.Mark(int64(32 * len(hashes)))
+ // Gather state data until the fetch or network limits is reached
+ var (
+ hash common.Hash
+ bytes int
+ data [][]byte
+ )
+ for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch {
+ // Retrieve the hash of the next state entry
+ if err := msgStream.Decode(&hash); err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Retrieve the requested state entry, stopping if enough was found
+ if entry, err := pm.chaindb.Get(hash.Bytes()); err == nil {
+ data = append(data, entry)
+ bytes += len(entry)
+ }
+ }
+ return p.SendNodeData(data)
+ case p.version >= eth63 && msg.Code == GetReceiptsMsg:
+ // Decode the retrieval message
+ msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ // Gather state data until the fetch or network limits is reached
+ var (
+ hash common.Hash
+ bytes int
+ receipts []*types.Receipt
+ )
+ for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptsFetch {
+ // Retrieve the hash of the next transaction receipt
+ if err := msgStream.Decode(&hash); err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Retrieve the requested receipt, stopping if enough was found
+ if receipt := core.GetReceipt(pm.chaindb, hash); receipt != nil {
+ receipts = append(receipts, receipt)
+ bytes += len(receipt.RlpEncode())
+ }
+ }
+ return p.SendReceipts(receipts)
+
+ case msg.Code == NewBlockHashesMsg:
+ // Retrieve and deseralize the remote new block hashes notification
+ type announce struct {
+ Hash common.Hash
+ Number uint64
+ }
+ var announces = []announce{}
+
+ if p.version < eth62 {
+ // We're running the old protocol, make block number unknown (0)
+ var hashes []common.Hash
+ if err := msg.Decode(&hashes); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ for _, hash := range hashes {
+ announces = append(announces, announce{hash, 0})
+ }
+ } else {
+ // Otherwise extract both block hash and number
+ var request newBlockHashesData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ for _, block := range request {
+ announces = append(announces, announce{block.Hash, block.Number})
+ }
+ }
// Mark the hashes as present at the remote node
- for _, hash := range hashes {
- p.MarkBlock(hash)
- p.SetHead(hash)
+ for _, block := range announces {
+ p.MarkBlock(block.Hash)
+ p.SetHead(block.Hash)
}
// Schedule all the unknown hashes for retrieval
- unknown := make([]common.Hash, 0, len(hashes))
- for _, hash := range hashes {
- if !pm.chainman.HasBlock(hash) {
- unknown = append(unknown, hash)
+ unknown := make([]announce, 0, len(announces))
+ for _, block := range announces {
+ if !pm.chainman.HasBlock(block.Hash) {
+ unknown = append(unknown, block)
}
}
- for _, hash := range unknown {
- pm.fetcher.Notify(p.id, hash, time.Now(), p.RequestBlocks)
+ for _, block := range unknown {
+ if p.version < eth62 {
+ pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks, nil, nil)
+ } else {
+ pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), nil, p.RequestOneHeader, p.RequestBodies)
+ }
}
- case NewBlockMsg:
+ case msg.Code == NewBlockMsg:
// Retrieve and decode the propagated block
var request newBlockData
if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
- propBlockInPacketsMeter.Mark(1)
- propBlockInTrafficMeter.Mark(request.Block.Size().Int64())
-
if err := request.Block.ValidateFields(); err != nil {
return errResp(ErrDecode, "block validation %v: %v", msg, err)
}
@@ -421,13 +605,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
- case TxMsg:
+ case msg.Code == TxMsg:
// Transactions arrived, parse all of them and deliver to the pool
var txs []*types.Transaction
if err := msg.Decode(&txs); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- propTxnInPacketsMeter.Mark(1)
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
@@ -436,7 +619,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkTransaction(tx.Hash())
// Log it's arrival for later analysis
- propTxnInTrafficMeter.Mark(tx.Size().Int64())
jsonlogger.LogJson(&logger.EthTxReceived{
TxHash: tx.Hash().Hex(),
RemoteId: p.ID().String(),
@@ -476,7 +658,11 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
// Otherwise if the block is indeed in out own chain, announce it
if pm.chainman.HasBlock(hash) {
for _, peer := range peers {
- peer.SendNewBlockHashes([]common.Hash{hash})
+ if peer.version < eth62 {
+ peer.SendNewBlockHashes61([]common.Hash{hash})
+ } else {
+ peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
+ }
}
glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt))
}