diff options
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 336 |
1 files changed, 261 insertions, 75 deletions
diff --git a/eth/handler.go b/eth/handler.go index 4f3d1f34c..f22afecb7 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -36,10 +36,8 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) -// This is the target maximum size of returned blocks for the -// getBlocks message. The reply message may exceed it -// if a single block is larger than the limit. -const maxBlockRespSize = 2 * 1024 * 1024 +// This is the target maximum size of returned blocks, headers or node data. +const softResponseLimit = 2 * 1024 * 1024 func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) @@ -59,12 +57,13 @@ func (ep extProt) GetHashes(hash common.Hash) error { return ep.getHashes(has func (ep extProt) GetBlock(hashes []common.Hash) error { return ep.getBlocks(hashes) } type ProtocolManager struct { - protVer, netId int - txpool txPool - chainman *core.ChainManager - downloader *downloader.Downloader - fetcher *fetcher.Fetcher - peers *peerSet + txpool txPool + chainman *core.ChainManager + chaindb common.Database + + downloader *downloader.Downloader + fetcher *fetcher.Fetcher + peers *peerSet SubProtocols []p2p.Protocol @@ -85,17 +84,17 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager) *ProtocolManager { +func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager, chaindb common.Database) *ProtocolManager { // Create the protocol manager with the base fields manager := &ProtocolManager{ eventMux: mux, txpool: txpool, chainman: chainman, + chaindb: chaindb, peers: newPeerSet(), newPeerCh: make(chan *peer, 1), txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), - netId: networkId, } // Initiate a sub-protocol for every implemented version we can handle manager.SubProtocols = make([]p2p.Protocol, len(ProtocolVersions)) @@ -176,7 +175,7 @@ func (pm *ProtocolManager) Stop() { } func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { - return newPeer(pv, nv, p, rw) + return newPeer(pv, nv, p, newMeteredMsgWriter(rw)) } // handle is the callback invoked to manage the life cycle of an eth peer. When @@ -190,6 +189,9 @@ func (pm *ProtocolManager) handle(p *peer) error { glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err) return err } + if rw, ok := p.rw.(*meteredMsgReadWriter); ok { + rw.Init(p.version) + } // Register the peer locally glog.V(logger.Detail).Infof("%v: adding peer", p) if err := pm.peers.Register(p); err != nil { @@ -199,7 +201,9 @@ func (pm *ProtocolManager) handle(p *peer) error { defer pm.removePeer(p.id) // Register the peer in the downloader. If the downloader considers it banned, we disconnect - if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks); err != nil { + if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), + p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, + p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies); err != nil { return err } // Propagate existing transactions. new transactions appearing @@ -230,12 +234,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { defer msg.Discard() // Handle the message depending on its contents - switch msg.Code { - case StatusMsg: + switch { + case msg.Code == StatusMsg: // Status messages should never arrive after the handshake return errResp(ErrExtraStatusMsg, "uncontrolled status message") - case GetBlockHashesMsg: + case p.version < eth62 && msg.Code == GetBlockHashesMsg: // Retrieve the number of hashes to return and from which origin hash var request getBlockHashesData if err := msg.Decode(&request); err != nil { @@ -251,7 +255,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } return p.SendBlockHashes(hashes) - case GetBlockHashesFromNumberMsg: + case p.version < eth62 && msg.Code == GetBlockHashesFromNumberMsg: // Retrieve and decode the number of hashes to return and from which origin number var request getBlockHashesFromNumberData if err := msg.Decode(&request); err != nil { @@ -278,24 +282,19 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } return p.SendBlockHashes(hashes) - case BlockHashesMsg: + case p.version < eth62 && msg.Code == BlockHashesMsg: // A batch of hashes arrived to one of our previous requests - msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) - reqHashInPacketsMeter.Mark(1) - var hashes []common.Hash - if err := msgStream.Decode(&hashes); err != nil { + if err := msg.Decode(&hashes); err != nil { break } - reqHashInTrafficMeter.Mark(int64(32 * len(hashes))) - // Deliver them all to the downloader for queuing - err := pm.downloader.DeliverHashes(p.id, hashes) + err := pm.downloader.DeliverHashes61(p.id, hashes) if err != nil { glog.V(logger.Debug).Infoln(err) } - case GetBlocksMsg: + case p.version < eth62 && msg.Code == GetBlocksMsg: // Decode the retrieval message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) if _, err := msgStream.List(); err != nil { @@ -305,94 +304,279 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { var ( hash common.Hash bytes common.StorageSize - hashes []common.Hash blocks []*types.Block ) - for { + for len(blocks) < downloader.MaxBlockFetch && bytes < softResponseLimit { + //Retrieve the hash of the next block err := msgStream.Decode(&hash) if err == rlp.EOL { break } else if err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - hashes = append(hashes, hash) - // Retrieve the requested block, stopping if enough was found if block := pm.chainman.GetBlock(hash); block != nil { blocks = append(blocks, block) bytes += block.Size() - if len(blocks) >= downloader.MaxBlockFetch || bytes > maxBlockRespSize { - break - } } } - if glog.V(logger.Detail) && len(blocks) == 0 && len(hashes) > 0 { - list := "[" - for _, hash := range hashes { - list += fmt.Sprintf("%x, ", hash[:4]) - } - list = list[:len(list)-2] + "]" - - glog.Infof("%v: no blocks found for requested hashes %s", p, list) - } return p.SendBlocks(blocks) - case BlocksMsg: + case p.version < eth62 && msg.Code == BlocksMsg: // Decode the arrived block message - msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) - reqBlockInPacketsMeter.Mark(1) - var blocks []*types.Block - if err := msgStream.Decode(&blocks); err != nil { + if err := msg.Decode(&blocks); err != nil { glog.V(logger.Detail).Infoln("Decode error", err) blocks = nil } // Update the receive timestamp of each block for _, block := range blocks { - reqBlockInTrafficMeter.Mark(block.Size().Int64()) block.ReceivedAt = msg.ReceivedAt } // Filter out any explicitly requested blocks, deliver the rest to the downloader - if blocks := pm.fetcher.Filter(blocks); len(blocks) > 0 { - pm.downloader.DeliverBlocks(p.id, blocks) + if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 { + pm.downloader.DeliverBlocks61(p.id, blocks) } - case NewBlockHashesMsg: - // Retrieve and deseralize the remote new block hashes notification + // Block header query, collect the requested headers and reply + case p.version >= eth62 && msg.Code == GetBlockHeadersMsg: + // Decode the complex header query + var query getBlockHeadersData + if err := msg.Decode(&query); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + // Gather blocks until the fetch or network limits is reached + var ( + bytes common.StorageSize + headers []*types.Header + unknown bool + ) + for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch { + // Retrieve the next block satisfying the query + var origin *types.Block + if query.Origin.Hash != (common.Hash{}) { + origin = pm.chainman.GetBlock(query.Origin.Hash) + } else { + origin = pm.chainman.GetBlockByNumber(query.Origin.Number) + } + if origin == nil { + break + } + headers = append(headers, origin.Header()) + bytes += origin.Size() + + // Advance to the next block of the query + switch { + case query.Origin.Hash != (common.Hash{}) && query.Reverse: + // Hash based traversal towards the genesis block + for i := 0; i < int(query.Skip)+1; i++ { + if block := pm.chainman.GetBlock(query.Origin.Hash); block != nil { + query.Origin.Hash = block.ParentHash() + } else { + unknown = true + break + } + } + case query.Origin.Hash != (common.Hash{}) && !query.Reverse: + // Hash based traversal towards the leaf block + if block := pm.chainman.GetBlockByNumber(origin.NumberU64() + query.Skip + 1); block != nil { + if pm.chainman.GetBlockHashesFromHash(block.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash { + query.Origin.Hash = block.Hash() + } else { + unknown = true + } + } else { + unknown = true + } + case query.Reverse: + // Number based traversal towards the genesis block + if query.Origin.Number >= query.Skip+1 { + query.Origin.Number -= (query.Skip + 1) + } else { + unknown = true + } + + case !query.Reverse: + // Number based traversal towards the leaf block + query.Origin.Number += (query.Skip + 1) + } + } + return p.SendBlockHeaders(headers) + + case p.version >= eth62 && msg.Code == BlockHeadersMsg: + // A batch of headers arrived to one of our previous requests + var headers []*types.Header + if err := msg.Decode(&headers); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Filter out any explicitly requested headers, deliver the rest to the downloader + filter := len(headers) == 1 + if filter { + headers = pm.fetcher.FilterHeaders(headers, time.Now()) + } + if len(headers) > 0 || !filter { + err := pm.downloader.DeliverHeaders(p.id, headers) + if err != nil { + glog.V(logger.Debug).Infoln(err) + } + } + + case p.version >= eth62 && msg.Code == BlockBodiesMsg: + // A batch of block bodies arrived to one of our previous requests + var request blockBodiesData + if err := msg.Decode(&request); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver them all to the downloader for queuing + trasactions := make([][]*types.Transaction, len(request)) + uncles := make([][]*types.Header, len(request)) + + for i, body := range request { + trasactions[i] = body.Transactions + uncles[i] = body.Uncles + } + // Filter out any explicitly requested bodies, deliver the rest to the downloader + if trasactions, uncles := pm.fetcher.FilterBodies(trasactions, uncles, time.Now()); len(trasactions) > 0 || len(uncles) > 0 { + err := pm.downloader.DeliverBodies(p.id, trasactions, uncles) + if err != nil { + glog.V(logger.Debug).Infoln(err) + } + } + + case p.version >= eth62 && msg.Code == GetBlockBodiesMsg: + // Decode the retrieval message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather blocks until the fetch or network limits is reached + var ( + hash common.Hash + bytes common.StorageSize + bodies []*blockBody + ) + for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch { + //Retrieve the hash of the next block + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested block, stopping if enough was found + if block := pm.chainman.GetBlock(hash); block != nil { + bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()}) + bytes += block.Size() + } + } + return p.SendBlockBodies(bodies) - var hashes []common.Hash - if err := msgStream.Decode(&hashes); err != nil { - break + case p.version >= eth63 && msg.Code == GetNodeDataMsg: + // Decode the retrieval message + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err } - propHashInPacketsMeter.Mark(1) - propHashInTrafficMeter.Mark(int64(32 * len(hashes))) + // Gather state data until the fetch or network limits is reached + var ( + hash common.Hash + bytes int + data [][]byte + ) + for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch { + // Retrieve the hash of the next state entry + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested state entry, stopping if enough was found + if entry, err := pm.chaindb.Get(hash.Bytes()); err == nil { + data = append(data, entry) + bytes += len(entry) + } + } + return p.SendNodeData(data) + case p.version >= eth63 && msg.Code == GetReceiptsMsg: + // Decode the retrieval message + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather state data until the fetch or network limits is reached + var ( + hash common.Hash + bytes int + receipts []*types.Receipt + ) + for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptsFetch { + // Retrieve the hash of the next transaction receipt + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested receipt, stopping if enough was found + if receipt := core.GetReceipt(pm.chaindb, hash); receipt != nil { + receipts = append(receipts, receipt) + bytes += len(receipt.RlpEncode()) + } + } + return p.SendReceipts(receipts) + + case msg.Code == NewBlockHashesMsg: + // Retrieve and deseralize the remote new block hashes notification + type announce struct { + Hash common.Hash + Number uint64 + } + var announces = []announce{} + + if p.version < eth62 { + // We're running the old protocol, make block number unknown (0) + var hashes []common.Hash + if err := msg.Decode(&hashes); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + for _, hash := range hashes { + announces = append(announces, announce{hash, 0}) + } + } else { + // Otherwise extract both block hash and number + var request newBlockHashesData + if err := msg.Decode(&request); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + for _, block := range request { + announces = append(announces, announce{block.Hash, block.Number}) + } + } // Mark the hashes as present at the remote node - for _, hash := range hashes { - p.MarkBlock(hash) - p.SetHead(hash) + for _, block := range announces { + p.MarkBlock(block.Hash) + p.SetHead(block.Hash) } // Schedule all the unknown hashes for retrieval - unknown := make([]common.Hash, 0, len(hashes)) - for _, hash := range hashes { - if !pm.chainman.HasBlock(hash) { - unknown = append(unknown, hash) + unknown := make([]announce, 0, len(announces)) + for _, block := range announces { + if !pm.chainman.HasBlock(block.Hash) { + unknown = append(unknown, block) } } - for _, hash := range unknown { - pm.fetcher.Notify(p.id, hash, time.Now(), p.RequestBlocks) + for _, block := range unknown { + if p.version < eth62 { + pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks, nil, nil) + } else { + pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), nil, p.RequestOneHeader, p.RequestBodies) + } } - case NewBlockMsg: + case msg.Code == NewBlockMsg: // Retrieve and decode the propagated block var request newBlockData if err := msg.Decode(&request); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } - propBlockInPacketsMeter.Mark(1) - propBlockInTrafficMeter.Mark(request.Block.Size().Int64()) - if err := request.Block.ValidateFields(); err != nil { return errResp(ErrDecode, "block validation %v: %v", msg, err) } @@ -421,13 +605,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } - case TxMsg: + case msg.Code == TxMsg: // Transactions arrived, parse all of them and deliver to the pool var txs []*types.Transaction if err := msg.Decode(&txs); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - propTxnInPacketsMeter.Mark(1) for i, tx := range txs { // Validate and mark the remote transaction if tx == nil { @@ -436,7 +619,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.MarkTransaction(tx.Hash()) // Log it's arrival for later analysis - propTxnInTrafficMeter.Mark(tx.Size().Int64()) jsonlogger.LogJson(&logger.EthTxReceived{ TxHash: tx.Hash().Hex(), RemoteId: p.ID().String(), @@ -476,7 +658,11 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { // Otherwise if the block is indeed in out own chain, announce it if pm.chainman.HasBlock(hash) { for _, peer := range peers { - peer.SendNewBlockHashes([]common.Hash{hash}) + if peer.version < eth62 { + peer.SendNewBlockHashes61([]common.Hash{hash}) + } else { + peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()}) + } } glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt)) } |