diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-06-29 17:44:00 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-07-01 00:00:01 +0800 |
commit | 6fc85f1ec221f976399af071a75ad7264b0ee905 (patch) | |
tree | 6ef805673f49ae9662f5c724f917f88bbba9a023 /eth/handler.go | |
parent | 2c8ed76e01161d9fe4e69064404cd888b4e327f3 (diff) | |
download | dexon-6fc85f1ec221f976399af071a75ad7264b0ee905.tar.gz dexon-6fc85f1ec221f976399af071a75ad7264b0ee905.tar.zst dexon-6fc85f1ec221f976399af071a75ad7264b0ee905.zip |
eth: clean up peer struct a bit, fix double txn bcast
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 29 |
1 files changed, 16 insertions, 13 deletions
diff --git a/eth/handler.go b/eth/handler.go index d25118337..d0456446d 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -158,9 +158,7 @@ func (pm *ProtocolManager) Stop() { } func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { - td, current, genesis := pm.chainman.Status() - - return newPeer(pv, nv, genesis, current, td, p, rw) + return newPeer(pv, nv, p, rw) } // handle is the callback invoked to manage the life cycle of an eth peer. When @@ -169,7 +167,8 @@ func (pm *ProtocolManager) handle(p *peer) error { glog.V(logger.Debug).Infof("%v: peer connected", p) // Execute the Ethereum handshake - if err := p.handleStatus(); err != nil { + td, head, genesis := pm.chainman.Status() + if err := p.Handshake(td, head, genesis); err != nil { glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err) return err } @@ -182,7 +181,7 @@ func (pm *ProtocolManager) handle(p *peer) error { defer pm.removePeer(p.id) // Register the peer in the downloader. If the downloader considers it banned, we disconnect - if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil { + if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.RequestHashes, p.RequestBlocks); err != nil { return err } // Propagate existing transactions. new transactions appearing @@ -225,9 +224,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } propTxnInPacketsMeter.Mark(1) for i, tx := range txs { + // Validate and mark the remote transaction if tx == nil { return errResp(ErrDecode, "transaction %d is nil", i) } + p.MarkTransaction(tx.Hash()) + + // Log it's arrival for later analysis propTxnInTrafficMeter.Mark(tx.Size().Int64()) jsonlogger.LogJson(&logger.EthTxReceived{ TxHash: tx.Hash().Hex(), @@ -255,7 +258,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } // returns either requested hashes or nothing (i.e. not found) - return p.sendBlockHashes(hashes) + return p.SendBlockHashes(hashes) case BlockHashesMsg: // A batch of hashes arrived to one of our previous requests @@ -314,7 +317,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { glog.Infof("%v: no blocks found for requested hashes %s", p, list) } - return p.sendBlocks(blocks) + return p.SendBlocks(blocks) case BlocksMsg: // Decode the arrived block message @@ -349,7 +352,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Mark the hashes as present at the remote node for _, hash := range hashes { - p.blockHashes.Add(hash) + p.MarkBlock(hash) p.SetHead(hash) } // Schedule all the unknown hashes for retrieval @@ -360,7 +363,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } for _, hash := range unknown { - pm.fetcher.Notify(p.id, hash, time.Now(), p.requestBlocks) + pm.fetcher.Notify(p.id, hash, time.Now(), p.RequestBlocks) } case NewBlockMsg: @@ -387,7 +390,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { RemoteId: p.ID().String(), }) // Mark the peer as owning the block and schedule it for import - p.blockHashes.Add(request.Block.Hash()) + p.MarkBlock(request.Block.Hash()) p.SetHead(request.Block.Hash()) pm.fetcher.Enqueue(p.id, request.Block) @@ -412,14 +415,14 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { if propagate { transfer := peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range transfer { - peer.sendNewBlock(block) + peer.SendNewBlock(block) } glog.V(logger.Detail).Infof("propagated block %x to %d peers in %v", hash[:4], len(transfer), time.Since(block.ReceivedAt)) } // Otherwise if the block is indeed in out own chain, announce it if pm.chainman.HasBlock(hash) { for _, peer := range peers { - peer.sendNewBlockHashes([]common.Hash{hash}) + peer.SendNewBlockHashes([]common.Hash{hash}) } glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt)) } @@ -432,7 +435,7 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) peers := pm.peers.PeersWithoutTx(hash) //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { - peer.sendTransactions(types.Transactions{tx}) + peer.SendTransactions(types.Transactions{tx}) } glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers") } |