aboutsummaryrefslogtreecommitdiffstats
path: root/les/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'les/handler.go')
-rw-r--r--les/handler.go207
1 files changed, 79 insertions, 128 deletions
diff --git a/les/handler.go b/les/handler.go
index 64023af0f..77bc077a2 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -102,7 +102,9 @@ type ProtocolManager struct {
odr *LesOdr
server *LesServer
serverPool *serverPool
+ lesTopic discv5.Topic
reqDist *requestDistributor
+ retriever *retrieveManager
downloader *downloader.Downloader
fetcher *lightFetcher
@@ -123,12 +125,12 @@ type ProtocolManager struct {
// wait group is used for graceful shutdowns during downloading
// and processing
- wg sync.WaitGroup
+ wg *sync.WaitGroup
}
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
-func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay) (*ProtocolManager, error) {
+func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
lightSync: lightSync,
@@ -136,15 +138,20 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
blockchain: blockchain,
chainConfig: chainConfig,
chainDb: chainDb,
+ odr: odr,
networkId: networkId,
txpool: txpool,
txrelay: txrelay,
- odr: odr,
- peers: newPeerSet(),
+ peers: peers,
newPeerCh: make(chan *peer),
- quitSync: make(chan struct{}),
+ quitSync: quitSync,
+ wg: wg,
noMorePeers: make(chan struct{}),
}
+ if odr != nil {
+ manager.retriever = odr.retriever
+ manager.reqDist = odr.retriever.dist
+ }
// Initiate a sub-protocol for every implemented version we can handle
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
@@ -202,84 +209,22 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash,
nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash,
blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
+ manager.peers.notify((*downloaderPeerNotify)(manager))
+ manager.fetcher = newLightFetcher(manager)
}
- manager.reqDist = newRequestDistributor(func() map[distPeer]struct{} {
- m := make(map[distPeer]struct{})
- peers := manager.peers.AllPeers()
- for _, peer := range peers {
- m[peer] = struct{}{}
- }
- return m
- }, manager.quitSync)
- if odr != nil {
- odr.removePeer = removePeer
- odr.reqDist = manager.reqDist
- }
-
- /*validator := func(block *types.Block, parent *types.Block) error {
- return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
- }
- heighter := func() uint64 {
- return chainman.LastBlockNumberU64()
- }
- manager.fetcher = fetcher.New(chainman.GetBlockNoOdr, validator, nil, heighter, chainman.InsertChain, manager.removePeer)
- */
return manager, nil
}
+// removePeer initiates disconnection from a peer by removing it from the peer set
func (pm *ProtocolManager) removePeer(id string) {
- // Short circuit if the peer was already removed
- peer := pm.peers.Peer(id)
- if peer == nil {
- return
- }
- log.Debug("Removing light Ethereum peer", "peer", id)
- if err := pm.peers.Unregister(id); err != nil {
- if err == errNotRegistered {
- return
- }
- }
- // Unregister the peer from the downloader and Ethereum peer set
- if pm.lightSync {
- pm.downloader.UnregisterPeer(id)
- if pm.txrelay != nil {
- pm.txrelay.removePeer(id)
- }
- if pm.fetcher != nil {
- pm.fetcher.removePeer(peer)
- }
- }
- // Hard disconnect at the networking layer
- if peer != nil {
- peer.Peer.Disconnect(p2p.DiscUselessPeer)
- }
+ pm.peers.Unregister(id)
}
-func (pm *ProtocolManager) Start(srvr *p2p.Server) {
- var topicDisc *discv5.Network
- if srvr != nil {
- topicDisc = srvr.DiscV5
- }
- lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8]))
+func (pm *ProtocolManager) Start() {
if pm.lightSync {
- // start sync handler
- if srvr != nil { // srvr is nil during testing
- pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg)
- pm.odr.serverPool = pm.serverPool
- pm.fetcher = newLightFetcher(pm)
- }
go pm.syncer()
} else {
- if topicDisc != nil {
- go func() {
- logger := log.New("topic", lesTopic)
- logger.Info("Starting topic registration")
- defer logger.Info("Terminated topic registration")
-
- topicDisc.RegisterTopic(lesTopic, pm.quitSync)
- }()
- }
go func() {
for range pm.newPeerCh {
}
@@ -342,65 +287,10 @@ func (pm *ProtocolManager) handle(p *peer) error {
}()
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if pm.lightSync {
- requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
- reqID := getNextReqID()
- rq := &distReq{
- getCost: func(dp distPeer) uint64 {
- peer := dp.(*peer)
- return peer.GetRequestCost(GetBlockHeadersMsg, amount)
- },
- canSend: func(dp distPeer) bool {
- return dp.(*peer) == p
- },
- request: func(dp distPeer) func() {
- peer := dp.(*peer)
- cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
- peer.fcServer.QueueRequest(reqID, cost)
- return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
- },
- }
- _, ok := <-pm.reqDist.queue(rq)
- if !ok {
- return ErrNoPeers
- }
- return nil
- }
- requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
- reqID := getNextReqID()
- rq := &distReq{
- getCost: func(dp distPeer) uint64 {
- peer := dp.(*peer)
- return peer.GetRequestCost(GetBlockHeadersMsg, amount)
- },
- canSend: func(dp distPeer) bool {
- return dp.(*peer) == p
- },
- request: func(dp distPeer) func() {
- peer := dp.(*peer)
- cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
- peer.fcServer.QueueRequest(reqID, cost)
- return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
- },
- }
- _, ok := <-pm.reqDist.queue(rq)
- if !ok {
- return ErrNoPeers
- }
- return nil
- }
- if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd,
- requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
- return err
- }
- if pm.txrelay != nil {
- pm.txrelay.addPeer(p)
- }
-
p.lock.Lock()
head := p.headInfo
p.lock.Unlock()
if pm.fetcher != nil {
- pm.fetcher.addPeer(p)
pm.fetcher.announce(p, head)
}
@@ -926,7 +816,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
if deliverMsg != nil {
- err := pm.odr.Deliver(p, deliverMsg)
+ err := pm.retriever.deliver(p, deliverMsg)
if err != nil {
p.responseErrors++
if p.responseErrors > maxResponseErrors {
@@ -946,3 +836,64 @@ func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo {
Head: self.blockchain.LastBlockHash(),
}
}
+
+// downloaderPeerNotify implements peerSetNotify
+type downloaderPeerNotify ProtocolManager
+
+func (d *downloaderPeerNotify) registerPeer(p *peer) {
+ pm := (*ProtocolManager)(d)
+
+ requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
+ reqID := genReqID()
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ peer := dp.(*peer)
+ return peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ },
+ canSend: func(dp distPeer) bool {
+ return dp.(*peer) == p
+ },
+ request: func(dp distPeer) func() {
+ peer := dp.(*peer)
+ cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ peer.fcServer.QueueRequest(reqID, cost)
+ return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
+ },
+ }
+ _, ok := <-pm.reqDist.queue(rq)
+ if !ok {
+ return ErrNoPeers
+ }
+ return nil
+ }
+ requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
+ reqID := genReqID()
+ rq := &distReq{
+ getCost: func(dp distPeer) uint64 {
+ peer := dp.(*peer)
+ return peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ },
+ canSend: func(dp distPeer) bool {
+ return dp.(*peer) == p
+ },
+ request: func(dp distPeer) func() {
+ peer := dp.(*peer)
+ cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
+ peer.fcServer.QueueRequest(reqID, cost)
+ return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
+ },
+ }
+ _, ok := <-pm.reqDist.queue(rq)
+ if !ok {
+ return ErrNoPeers
+ }
+ return nil
+ }
+
+ pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd, requestHeadersByHash, requestHeadersByNumber, nil, nil, nil)
+}
+
+func (d *downloaderPeerNotify) unregisterPeer(p *peer) {
+ pm := (*ProtocolManager)(d)
+ pm.downloader.UnregisterPeer(p.id)
+}