diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-08-15 02:25:41 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-08-25 22:48:47 +0800 |
commit | 47a7fe5d22fe2a6be783f6576070814fe951eaaf (patch) | |
tree | 61f2f691c6775fa5ae3547b8d769a709b7b3f04c /eth/handler.go | |
parent | ca88e18f59af84f34ad67da21fd27a6407eea87c (diff) | |
download | go-tangerine-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.gz go-tangerine-47a7fe5d22fe2a6be783f6576070814fe951eaaf.tar.zst go-tangerine-47a7fe5d22fe2a6be783f6576070814fe951eaaf.zip |
eth: port the synchronisation algo to eth/62
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 62 |
1 files changed, 56 insertions, 6 deletions
diff --git a/eth/handler.go b/eth/handler.go index 25ff0eef0..e7404e36a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -201,7 +201,9 @@ func (pm *ProtocolManager) handle(p *peer) error { defer pm.removePeer(p.id) // Register the peer in the downloader. If the downloader considers it banned, we disconnect - if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks); err != nil { + if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), + p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, + p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies); err != nil { return err } // Propagate existing transactions. new transactions appearing @@ -287,7 +289,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { break } // Deliver them all to the downloader for queuing - err := pm.downloader.DeliverHashes(p.id, hashes) + err := pm.downloader.DeliverHashes61(p.id, hashes) if err != nil { glog.V(logger.Debug).Infoln(err) } @@ -332,8 +334,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { block.ReceivedAt = msg.ReceivedAt } // Filter out any explicitly requested blocks, deliver the rest to the downloader - if blocks := pm.fetcher.Filter(blocks); len(blocks) > 0 { - pm.downloader.DeliverBlocks(p.id, blocks) + if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 { + pm.downloader.DeliverBlocks61(p.id, blocks) } // Block header query, collect the requested headers and reply @@ -401,6 +403,46 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } return p.SendBlockHeaders(headers) + case p.version >= eth62 && msg.Code == BlockHeadersMsg: + // A batch of headers arrived to one of our previous requests + var headers []*types.Header + if err := msg.Decode(&headers); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Filter out any explicitly requested headers, deliver the rest to the downloader + filter := len(headers) == 1 + if filter { + headers = pm.fetcher.FilterHeaders(headers, time.Now()) + } + if len(headers) > 0 || !filter { + err := pm.downloader.DeliverHeaders(p.id, headers) + if err != nil { + glog.V(logger.Debug).Infoln(err) + } + } + + case p.version >= eth62 && msg.Code == BlockBodiesMsg: + // A batch of block bodies arrived to one of our previous requests + var request blockBodiesData + if err := msg.Decode(&request); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver them all to the downloader for queuing + trasactions := make([][]*types.Transaction, len(request)) + uncles := make([][]*types.Header, len(request)) + + for i, body := range request { + trasactions[i] = body.Transactions + uncles[i] = body.Uncles + } + // Filter out any explicitly requested bodies, deliver the rest to the downloader + if trasactions, uncles := pm.fetcher.FilterBodies(trasactions, uncles, time.Now()); len(trasactions) > 0 || len(uncles) > 0 { + err := pm.downloader.DeliverBodies(p.id, trasactions, uncles) + if err != nil { + glog.V(logger.Debug).Infoln(err) + } + } + case p.version >= eth62 && msg.Code == GetBlockBodiesMsg: // Decode the retrieval message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) @@ -522,7 +564,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } for _, block := range unknown { - pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks) + if p.version < eth62 { + pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks, nil, nil) + } else { + pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), nil, p.RequestOneHeader, p.RequestBodies) + } } case msg.Code == NewBlockMsg: @@ -612,7 +658,11 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { // Otherwise if the block is indeed in out own chain, announce it if pm.chainman.HasBlock(hash) { for _, peer := range peers { - peer.SendNewBlockHashes([]common.Hash{hash}) + if peer.version < eth62 { + peer.SendNewBlockHashes61([]common.Hash{hash}) + } else { + peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()}) + } } glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt)) } |