diff options
Diffstat (limited to 'les/handler.go')
-rw-r--r-- | les/handler.go | 109 |
1 files changed, 53 insertions, 56 deletions
diff --git a/les/handler.go b/les/handler.go index fdf4e6e8a..b024841f2 100644 --- a/les/handler.go +++ b/les/handler.go @@ -22,8 +22,8 @@ import ( "errors" "fmt" "math/big" + "net" "sync" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -58,7 +58,7 @@ const ( MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request MaxTxSend = 64 // Amount of transactions to be send per request - disableClientRemovePeer = true + disableClientRemovePeer = false ) // errIncompatibleConfig is returned if the requested protocols and configs are @@ -101,10 +101,7 @@ type ProtocolManager struct { chainDb ethdb.Database odr *LesOdr server *LesServer - - topicDisc *discv5.Network - lesTopic discv5.Topic - p2pServer *p2p.Server + serverPool *serverPool downloader *downloader.Downloader fetcher *lightFetcher @@ -157,13 +154,29 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network Version: version, Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + var entry *poolEntry peer := manager.newPeer(int(version), networkId, p, rw) + if manager.serverPool != nil { + addr := p.RemoteAddr().(*net.TCPAddr) + entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port)) + if entry == nil { + return fmt.Errorf("unwanted connection") + } + } + peer.poolEntry = entry select { case manager.newPeerCh <- peer: manager.wg.Add(1) defer manager.wg.Done() - return manager.handle(peer) + err := manager.handle(peer) + if entry != nil { + manager.serverPool.disconnect(entry) + } + return err case <-manager.quitSync: + if entry != nil { + manager.serverPool.disconnect(entry) + } return p2p.DiscQuitting } }, @@ -192,7 +205,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash, nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash, blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer) - manager.fetcher = newLightFetcher(manager) } if odr != nil { @@ -222,10 +234,12 @@ func (pm *ProtocolManager) removePeer(id string) { glog.V(logger.Debug).Infof("LES: unregister peer %v", id) if pm.lightSync { pm.downloader.UnregisterPeer(id) - pm.odr.UnregisterPeer(peer) if pm.txrelay != nil { pm.txrelay.removePeer(id) } + if pm.fetcher != nil { + pm.fetcher.removePeer(peer) + } } if err := pm.peers.Unregister(id); err != nil { glog.V(logger.Error).Infoln("Removal failed:", err) @@ -236,54 +250,26 @@ func (pm *ProtocolManager) removePeer(id string) { } } -func (pm *ProtocolManager) findServers() { - if pm.p2pServer == nil || pm.topicDisc == nil { - return - } - glog.V(logger.Debug).Infoln("Looking for topic", string(pm.lesTopic)) - enodes := make(chan string, 100) - stop := make(chan struct{}) - go pm.topicDisc.SearchTopic(pm.lesTopic, stop, enodes) - go func() { - added := make(map[string]bool) - for { - select { - case enode := <-enodes: - if !added[enode] { - glog.V(logger.Info).Infoln("Found LES server:", enode) - added[enode] = true - if node, err := discover.ParseNode(enode); err == nil { - pm.p2pServer.AddPeer(node) - } - } - case <-stop: - return - } - } - }() - select { - case <-time.After(time.Second * 20): - case <-pm.quitSync: - } - close(stop) -} - func (pm *ProtocolManager) Start(srvr *p2p.Server) { - pm.p2pServer = srvr + var topicDisc *discv5.Network if srvr != nil { - pm.topicDisc = srvr.DiscV5 + topicDisc = srvr.DiscV5 } - pm.lesTopic = discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) + lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) if pm.lightSync { // start sync handler - go pm.findServers() + if srvr != nil { // srvr is nil during testing + pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg) + pm.odr.serverPool = pm.serverPool + pm.fetcher = newLightFetcher(pm) + } go pm.syncer() } else { - if pm.topicDisc != nil { + if topicDisc != nil { go func() { - glog.V(logger.Debug).Infoln("Starting registering topic", string(pm.lesTopic)) - pm.topicDisc.RegisterTopic(pm.lesTopic, pm.quitSync) - glog.V(logger.Debug).Infoln("Stopped registering topic", string(pm.lesTopic)) + glog.V(logger.Info).Infoln("Starting registering topic", string(lesTopic)) + topicDisc.RegisterTopic(lesTopic, pm.quitSync) + glog.V(logger.Info).Infoln("Stopped registering topic", string(lesTopic)) }() } go func() { @@ -352,13 +338,13 @@ func (pm *ProtocolManager) handle(p *peer) error { glog.V(logger.Debug).Infof("LES: register peer %v", p.id) if pm.lightSync { requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { - reqID := pm.odr.getNextReqID() + reqID := getNextReqID() cost := p.GetRequestCost(GetBlockHeadersMsg, amount) p.fcServer.SendRequest(reqID, cost) return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { - reqID := pm.odr.getNextReqID() + reqID := getNextReqID() cost := p.GetRequestCost(GetBlockHeadersMsg, amount) p.fcServer.SendRequest(reqID, cost) return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) @@ -367,12 +353,21 @@ func (pm *ProtocolManager) handle(p *peer) error { requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil { return err } - pm.odr.RegisterPeer(p) if pm.txrelay != nil { pm.txrelay.addPeer(p) } - pm.fetcher.notify(p, nil) + p.lock.Lock() + head := p.headInfo + p.lock.Unlock() + if pm.fetcher != nil { + pm.fetcher.addPeer(p) + pm.fetcher.announce(p, head) + } + + if p.poolEntry != nil { + pm.serverPool.registered(p.poolEntry) + } } stop := make(chan struct{}) @@ -454,7 +449,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "%v: %v", msg, err) } glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth) - pm.fetcher.notify(p, &req) + if pm.fetcher != nil { + go pm.fetcher.announce(p, &req) + } case GetBlockHeadersMsg: glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id) @@ -552,8 +549,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.fcServer.GotReply(resp.ReqID, resp.BV) - if pm.fetcher.requestedID(resp.ReqID) { - pm.fetcher.deliverHeaders(resp.ReqID, resp.Headers) + if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) { + pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers) } else { err := pm.downloader.DeliverHeaders(p.id, resp.Headers) if err != nil { |