diff options
author | Felix Lange <fjl@users.noreply.github.com> | 2016-12-13 03:46:15 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-12-13 03:46:15 +0800 |
commit | a98e8c0889d7c4c1bded452c577bd4b9c7fa0f6b (patch) | |
tree | faccd87d6e1634b51f788fa170bc1f03a829ca42 | |
parent | ee445a2ba4013f8b32e4e5386322babf022e5b81 (diff) | |
parent | f12f8a6c14dbaf6e6531cea1b0cf169b851e1894 (diff) | |
download | go-tangerine-a98e8c0889d7c4c1bded452c577bd4b9c7fa0f6b.tar.gz go-tangerine-a98e8c0889d7c4c1bded452c577bd4b9c7fa0f6b.tar.zst go-tangerine-a98e8c0889d7c4c1bded452c577bd4b9c7fa0f6b.zip |
Merge pull request #3413 from zsfelfoldi/light-topic4
les, p2p/discv5: implement server pool, improve peer selection, light fetcher and topic searching
-rw-r--r-- | eth/backend.go | 2 | ||||
-rw-r--r-- | eth/handler.go | 4 | ||||
-rw-r--r-- | eth/sync.go | 9 | ||||
-rw-r--r-- | les/fetcher.go | 766 | ||||
-rw-r--r-- | les/handler.go | 109 | ||||
-rw-r--r-- | les/helper_test.go | 11 | ||||
-rw-r--r-- | les/odr.go | 73 | ||||
-rw-r--r-- | les/odr_peerset.go | 120 | ||||
-rw-r--r-- | les/odr_requests.go | 29 | ||||
-rw-r--r-- | les/odr_test.go | 8 | ||||
-rw-r--r-- | les/peer.go | 84 | ||||
-rw-r--r-- | les/randselect.go | 173 | ||||
-rw-r--r-- | les/randselect_test.go | 67 | ||||
-rw-r--r-- | les/request_test.go | 7 | ||||
-rw-r--r-- | les/server.go | 37 | ||||
-rw-r--r-- | les/serverpool.go | 766 | ||||
-rw-r--r-- | light/lightchain.go | 11 | ||||
-rw-r--r-- | light/odr.go | 15 | ||||
-rw-r--r-- | light/odr_util.go | 5 | ||||
-rw-r--r-- | p2p/discv5/net.go | 119 | ||||
-rw-r--r-- | p2p/discv5/ticket.go | 44 |
21 files changed, 1913 insertions, 546 deletions
diff --git a/eth/backend.go b/eth/backend.go index d5b767b12..f98c9b724 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -104,6 +104,7 @@ type Config struct { type LesServer interface { Start(srvr *p2p.Server) + Synced() Stop() Protocols() []p2p.Protocol } @@ -145,6 +146,7 @@ type Ethereum struct { func (s *Ethereum) AddLesServer(ls LesServer) { s.lesServer = ls + s.protocolManager.lesServer = ls } // New creates a new Ethereum object (including the diff --git a/eth/handler.go b/eth/handler.go index 8f05cc5b1..771e69b8d 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -87,6 +87,8 @@ type ProtocolManager struct { quitSync chan struct{} noMorePeers chan struct{} + lesServer LesServer + // wait group is used for graceful shutdowns during downloading // and processing wg sync.WaitGroup @@ -171,7 +173,7 @@ func NewProtocolManager(config *params.ChainConfig, fastSync bool, networkId int return blockchain.CurrentBlock().NumberU64() } inserter := func(blocks types.Blocks) (int, error) { - atomic.StoreUint32(&manager.synced, 1) // Mark initial sync done on any fetcher import + manager.setSynced() // Mark initial sync done on any fetcher import return manager.insertChain(blocks) } manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) diff --git a/eth/sync.go b/eth/sync.go index 373cc2054..234534b4f 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -181,7 +181,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { return } - atomic.StoreUint32(&pm.synced, 1) // Mark initial sync done + pm.setSynced() // Mark initial sync done // If fast sync was enabled, and we synced up, disable it if atomic.LoadUint32(&pm.fastSync) == 1 { @@ -192,3 +192,10 @@ func (pm *ProtocolManager) synchronise(peer *peer) { } } } + +// setSynced sets the synced flag and notifies the light server if present +func (pm *ProtocolManager) setSynced() { + if atomic.SwapUint32(&pm.synced, 1) == 0 && pm.lesServer != nil { + pm.lesServer.Synced() + } +} diff --git a/les/fetcher.go b/les/fetcher.go index ae9bf8474..c23af8da3 100644 --- a/les/fetcher.go +++ b/les/fetcher.go @@ -23,172 +23,426 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/light" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" ) +const ( + blockDelayTimeout = time.Second * 10 // timeout for a peer to announce a head that has already been confirmed by others + maxNodeCount = 20 // maximum number of fetcherTreeNode entries remembered for each peer +) + +// lightFetcher type lightFetcher struct { pm *ProtocolManager odr *LesOdr - chain BlockChain - - headAnnouncedMu sync.Mutex - headAnnouncedBy map[common.Hash][]*peer - currentTd *big.Int - deliverChn chan fetchResponse - reqMu sync.RWMutex - requested map[uint64]fetchRequest - timeoutChn chan uint64 - notifyChn chan bool // true if initiated from outside - syncing bool - syncDone chan struct{} + chain *light.LightChain + + maxConfirmedTd *big.Int + peers map[*peer]*fetcherPeerInfo + lastUpdateStats *updateStatsEntry + + lock sync.Mutex // qwerqwerqwe + deliverChn chan fetchResponse + reqMu sync.RWMutex + requested map[uint64]fetchRequest + timeoutChn chan uint64 + requestChn chan bool // true if initiated from outside + syncing bool + syncDone chan *peer +} + +// fetcherPeerInfo holds fetcher-specific information about each active peer +type fetcherPeerInfo struct { + root, lastAnnounced *fetcherTreeNode + nodeCnt int + confirmedTd *big.Int + bestConfirmed *fetcherTreeNode + nodeByHash map[common.Hash]*fetcherTreeNode + firstUpdateStats *updateStatsEntry } +// fetcherTreeNode is a node of a tree that holds information about blocks recently +// announced and confirmed by a certain peer. Each new announce message from a peer +// adds nodes to the tree, based on the previous announced head and the reorg depth. +// There are three possible states for a tree node: +// - announced: not downloaded (known) yet, but we know its head, number and td +// - intermediate: not known, hash and td are empty, they are filled out when it becomes known +// - known: both announced by this peer and downloaded (from any peer). +// This structure makes it possible to always know which peer has a certain block, +// which is necessary for selecting a suitable peer for ODR requests and also for +// canonizing new heads. It also helps to always download the minimum necessary +// amount of headers with a single request. +type fetcherTreeNode struct { + hash common.Hash + number uint64 + td *big.Int + known, requested bool + parent *fetcherTreeNode + children []*fetcherTreeNode +} + +// fetchRequest represents a header download request type fetchRequest struct { - hash common.Hash - amount uint64 - peer *peer + hash common.Hash + amount uint64 + peer *peer + sent mclock.AbsTime + timeout bool } +// fetchResponse represents a header download response type fetchResponse struct { reqID uint64 headers []*types.Header + peer *peer } +// newLightFetcher creates a new light fetcher func newLightFetcher(pm *ProtocolManager) *lightFetcher { f := &lightFetcher{ - pm: pm, - chain: pm.blockchain, - odr: pm.odr, - headAnnouncedBy: make(map[common.Hash][]*peer), - deliverChn: make(chan fetchResponse, 100), - requested: make(map[uint64]fetchRequest), - timeoutChn: make(chan uint64), - notifyChn: make(chan bool, 100), - syncDone: make(chan struct{}), - currentTd: big.NewInt(0), + pm: pm, + chain: pm.blockchain.(*light.LightChain), + odr: pm.odr, + peers: make(map[*peer]*fetcherPeerInfo), + deliverChn: make(chan fetchResponse, 100), + requested: make(map[uint64]fetchRequest), + timeoutChn: make(chan uint64), + requestChn: make(chan bool, 100), + syncDone: make(chan *peer), + maxConfirmedTd: big.NewInt(0), } go f.syncLoop() return f } -func (f *lightFetcher) notify(p *peer, head *announceData) { - var headHash common.Hash - if head == nil { - // initial notify - headHash = p.Head() - } else { - if core.GetTd(f.pm.chainDb, head.Hash, head.Number) != nil { - head.haveHeaders = head.Number - } - //fmt.Println("notify", p.id, head.Number, head.ReorgDepth, head.haveHeaders) - if !p.addNotify(head) { - //fmt.Println("addNotify fail") - f.pm.removePeer(p.id) +// syncLoop is the main event loop of the light fetcher +func (f *lightFetcher) syncLoop() { + f.pm.wg.Add(1) + defer f.pm.wg.Done() + + requestStarted := false + for { + select { + case <-f.pm.quitSync: + return + // when a new announce is received, request loop keeps running until + // no further requests are necessary or possible + case newAnnounce := <-f.requestChn: + f.lock.Lock() + s := requestStarted + requestStarted = false + if !f.syncing && !(newAnnounce && s) { + if peer, node, amount := f.nextRequest(); node != nil { + requestStarted = true + reqID, started := f.request(peer, node, amount) + if started { + go func() { + time.Sleep(softRequestTimeout) + f.reqMu.Lock() + req, ok := f.requested[reqID] + if ok { + req.timeout = true + f.requested[reqID] = req + } + f.reqMu.Unlock() + // keep starting new requests while possible + f.requestChn <- false + }() + } + } + } + f.lock.Unlock() + case reqID := <-f.timeoutChn: + f.reqMu.Lock() + req, ok := f.requested[reqID] + if ok { + delete(f.requested, reqID) + } + f.reqMu.Unlock() + if ok { + f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), true) + glog.V(logger.Debug).Infof("hard timeout by peer %v", req.peer.id) + go f.pm.removePeer(req.peer.id) + } + case resp := <-f.deliverChn: + f.reqMu.Lock() + req, ok := f.requested[resp.reqID] + if ok && req.peer != resp.peer { + ok = false + } + if ok { + delete(f.requested, resp.reqID) + } + f.reqMu.Unlock() + if ok { + f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), req.timeout) + } + f.lock.Lock() + if !ok || !(f.syncing || f.processResponse(req, resp)) { + glog.V(logger.Debug).Infof("failed processing response by peer %v", resp.peer.id) + go f.pm.removePeer(resp.peer.id) + } + f.lock.Unlock() + case p := <-f.syncDone: + f.lock.Lock() + glog.V(logger.Debug).Infof("done synchronising with peer %v", p.id) + f.checkSyncedHeaders(p) + f.syncing = false + f.lock.Unlock() } - headHash = head.Hash } - f.headAnnouncedMu.Lock() - f.headAnnouncedBy[headHash] = append(f.headAnnouncedBy[headHash], p) - f.headAnnouncedMu.Unlock() - f.notifyChn <- true } -func (f *lightFetcher) gotHeader(header *types.Header) { - f.headAnnouncedMu.Lock() - defer f.headAnnouncedMu.Unlock() +// addPeer adds a new peer to the fetcher's peer set +func (f *lightFetcher) addPeer(p *peer) { + p.lock.Lock() + p.hasBlock = func(hash common.Hash, number uint64) bool { + return f.peerHasBlock(p, hash, number) + } + p.lock.Unlock() + + f.lock.Lock() + defer f.lock.Unlock() + + f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)} +} + +// removePeer removes a new peer from the fetcher's peer set +func (f *lightFetcher) removePeer(p *peer) { + p.lock.Lock() + p.hasBlock = nil + p.lock.Unlock() + + f.lock.Lock() + defer f.lock.Unlock() + + // check for potential timed out block delay statistics + f.checkUpdateStats(p, nil) + delete(f.peers, p) +} + +// announce processes a new announcement message received from a peer, adding new +// nodes to the peer's block tree and removing old nodes if necessary +func (f *lightFetcher) announce(p *peer, head *announceData) { + f.lock.Lock() + defer f.lock.Unlock() + glog.V(logger.Debug).Infof("received announce from peer %v #%d %016x reorg: %d", p.id, head.Number, head.Hash[:8], head.ReorgDepth) + + fp := f.peers[p] + if fp == nil { + glog.V(logger.Debug).Infof("announce: unknown peer") + return + } - hash := header.Hash() - peerList := f.headAnnouncedBy[hash] - if peerList == nil { + if fp.lastAnnounced != nil && head.Td.Cmp(fp.lastAnnounced.td) <= 0 { + // announced tds should be strictly monotonic + glog.V(logger.Debug).Infof("non-monotonic Td from peer %v", p.id) + go f.pm.removePeer(p.id) return } - number := header.Number.Uint64() - td := core.GetTd(f.pm.chainDb, hash, number) - for _, peer := range peerList { - peer.lock.Lock() - ok := peer.gotHeader(hash, number, td) - peer.lock.Unlock() - if !ok { - //fmt.Println("gotHeader fail") - f.pm.removePeer(peer.id) + + n := fp.lastAnnounced + for i := uint64(0); i < head.ReorgDepth; i++ { + if n == nil { + break } + n = n.parent } - delete(f.headAnnouncedBy, hash) -} + if n != nil { + // n is now the reorg common ancestor, add a new branch of nodes + // check if the node count is too high to add new nodes + locked := false + for uint64(fp.nodeCnt)+head.Number-n.number > maxNodeCount && fp.root != nil { + if !locked { + f.chain.LockChain() + defer f.chain.UnlockChain() + locked = true + } + // if one of root's children is canonical, keep it, delete other branches and root itself + var newRoot *fetcherTreeNode + for i, nn := range fp.root.children { + if core.GetCanonicalHash(f.pm.chainDb, nn.number) == nn.hash { + fp.root.children = append(fp.root.children[:i], fp.root.children[i+1:]...) + nn.parent = nil + newRoot = nn + break + } + } + fp.deleteNode(fp.root) + if n == fp.root { + n = newRoot + } + fp.root = newRoot + if newRoot == nil || !f.checkKnownNode(p, newRoot) { + fp.bestConfirmed = nil + fp.confirmedTd = nil + } -func (f *lightFetcher) nextRequest() (*peer, *announceData) { - var bestPeer *peer - bestTd := f.currentTd - for _, peer := range f.pm.peers.AllPeers() { - peer.lock.RLock() - if !peer.headInfo.requested && (peer.headInfo.Td.Cmp(bestTd) > 0 || - (bestPeer != nil && peer.headInfo.Td.Cmp(bestTd) == 0 && peer.headInfo.haveHeaders > bestPeer.headInfo.haveHeaders)) { - bestPeer = peer - bestTd = peer.headInfo.Td + if n == nil { + break + } } - peer.lock.RUnlock() - } - if bestPeer == nil { - return nil, nil - } - bestPeer.lock.Lock() - res := bestPeer.headInfo - res.requested = true - bestPeer.lock.Unlock() - for _, peer := range f.pm.peers.AllPeers() { - if peer != bestPeer { - peer.lock.Lock() - if peer.headInfo.Hash == bestPeer.headInfo.Hash && peer.headInfo.haveHeaders == bestPeer.headInfo.haveHeaders { - peer.headInfo.requested = true + if n != nil { + for n.number < head.Number { + nn := &fetcherTreeNode{number: n.number + 1, parent: n} + n.children = append(n.children, nn) + n = nn + fp.nodeCnt++ } - peer.lock.Unlock() + n.hash = head.Hash + n.td = head.Td + fp.nodeByHash[n.hash] = n } } - return bestPeer, res -} + if n == nil { + // could not find reorg common ancestor or had to delete entire tree, a new root and a resync is needed + if fp.root != nil { + fp.deleteNode(fp.root) + } + n = &fetcherTreeNode{hash: head.Hash, number: head.Number, td: head.Td} + fp.root = n + fp.nodeCnt++ + fp.nodeByHash[n.hash] = n + fp.bestConfirmed = nil + fp.confirmedTd = nil + } -func (f *lightFetcher) deliverHeaders(reqID uint64, headers []*types.Header) { - f.deliverChn <- fetchResponse{reqID: reqID, headers: headers} + f.checkKnownNode(p, n) + p.lock.Lock() + p.headInfo = head + fp.lastAnnounced = n + p.lock.Unlock() + f.checkUpdateStats(p, nil) + f.requestChn <- true } -func (f *lightFetcher) requestedID(reqID uint64) bool { - f.reqMu.RLock() - _, ok := f.requested[reqID] - f.reqMu.RUnlock() - return ok +// peerHasBlock returns true if we can assume the peer knows the given block +// based on its announcements +func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bool { + f.lock.Lock() + defer f.lock.Unlock() + + fp := f.peers[p] + if fp == nil || fp.root == nil { + return false + } + + if number >= fp.root.number { + // it is recent enough that if it is known, is should be in the peer's block tree + return fp.nodeByHash[hash] != nil + } + f.chain.LockChain() + defer f.chain.UnlockChain() + // if it's older than the peer's block tree root but it's in the same canonical chain + // than the root, we can still be sure the peer knows it + return core.GetCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && core.GetCanonicalHash(f.pm.chainDb, number) == hash } -func (f *lightFetcher) request(p *peer, block *announceData) { - //fmt.Println("request", p.id, block.Number, block.haveHeaders) - amount := block.Number - block.haveHeaders - if amount == 0 { - return +// request initiates a header download request from a certain peer +func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint64, bool) { + fp := f.peers[p] + if fp == nil { + glog.V(logger.Debug).Infof("request: unknown peer") + return 0, false } - if amount > 100 { + if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) { f.syncing = true go func() { - //fmt.Println("f.pm.synchronise(p)") + glog.V(logger.Debug).Infof("synchronising with peer %v", p.id) f.pm.synchronise(p) - //fmt.Println("sync done") - f.syncDone <- struct{}{} + f.syncDone <- p }() - return + return 0, false } - reqID := f.odr.getNextReqID() - f.reqMu.Lock() - f.requested[reqID] = fetchRequest{hash: block.Hash, amount: amount, peer: p} - f.reqMu.Unlock() + reqID := getNextReqID() + n.requested = true cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount)) p.fcServer.SendRequest(reqID, cost) - go p.RequestHeadersByHash(reqID, cost, block.Hash, int(amount), 0, true) + f.reqMu.Lock() + f.requested[reqID] = fetchRequest{hash: n.hash, amount: amount, peer: p, sent: mclock.Now()} + f.reqMu.Unlock() + go p.RequestHeadersByHash(reqID, cost, n.hash, int(amount), 0, true) go func() { time.Sleep(hardRequestTimeout) f.timeoutChn <- reqID }() + return reqID, true +} + +// requestAmount calculates the amount of headers to be downloaded starting +// from a certain head backwards +func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 { + amount := uint64(0) + nn := n + for nn != nil && !f.checkKnownNode(p, nn) { + nn = nn.parent + amount++ + } + if nn == nil { + amount = n.number + } + return amount } +// requestedID tells if a certain reqID has been requested by the fetcher +func (f *lightFetcher) requestedID(reqID uint64) bool { + f.reqMu.RLock() + _, ok := f.requested[reqID] + f.reqMu.RUnlock() + return ok +} + +// nextRequest selects the peer and announced head to be requested next, amount +// to be downloaded starting from the head backwards is also returned +func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) { + var ( + bestHash common.Hash + bestAmount uint64 + ) + bestTd := f.maxConfirmedTd + + for p, fp := range f.peers { + for hash, n := range fp.nodeByHash { + if !f.checkKnownNode(p, n) && !n.requested && (bestTd == nil || n.td.Cmp(bestTd) >= 0) { + amount := f.requestAmount(p, n) + if bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount { + bestHash = hash + bestAmount = amount + bestTd = n.td + } + } + } + } + if bestTd == f.maxConfirmedTd { + return nil, nil, 0 + } + + peer := f.pm.serverPool.selectPeer(func(p *peer) (bool, uint64) { + fp := f.peers[p] + if fp == nil || fp.nodeByHash[bestHash] == nil { + return false, 0 + } + return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))) + }) + var node *fetcherTreeNode + if peer != nil { + node = f.peers[peer].nodeByHash[bestHash] + } + return peer, node, bestAmount +} + +// deliverHeaders delivers header download request responses for processing +func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types.Header) { + f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer} +} + +// processResponse processes header download request responses func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool { if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash { return false @@ -200,96 +454,248 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil { return false } - for _, header := range headers { - td := core.GetTd(f.pm.chainDb, header.Hash(), header.Number.Uint64()) + tds := make([]*big.Int, len(headers)) + for i, header := range headers { + td := f.chain.GetTd(header.Hash(), header.Number.Uint64()) if td == nil { return false } - if td.Cmp(f.currentTd) > 0 { - f.currentTd = td - } - f.gotHeader(header) + tds[i] = td } + f.newHeaders(headers, tds) return true } -func (f *lightFetcher) checkSyncedHeaders() { - //fmt.Println("checkSyncedHeaders()") - for _, peer := range f.pm.peers.AllPeers() { - peer.lock.Lock() - h := peer.firstHeadInfo - remove := false - loop: - for h != nil { - if td := core.GetTd(f.pm.chainDb, h.Hash, h.Number); td != nil { - //fmt.Println(" found", h.Number) - ok := peer.gotHeader(h.Hash, h.Number, td) - if !ok { - remove = true - break loop - } - if td.Cmp(f.currentTd) > 0 { - f.currentTd = td - } - } - h = h.next +// newHeaders updates the block trees of all active peers according to a newly +// downloaded and validated batch or headers +func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) { + var maxTd *big.Int + for p, fp := range f.peers { + if !f.checkAnnouncedHeaders(fp, headers, tds) { + glog.V(logger.Debug).Infof("announce inconsistency by peer %v", p.id) + go f.pm.removePeer(p.id) } - peer.lock.Unlock() - if remove { - //fmt.Println("checkSync fail") - f.pm.removePeer(peer.id) + if fp.confirmedTd != nil && (maxTd == nil || maxTd.Cmp(fp.confirmedTd) > 0) { + maxTd = fp.confirmedTd } } + if maxTd != nil { + f.updateMaxConfirmedTd(maxTd) + } } -func (f *lightFetcher) syncLoop() { - f.pm.wg.Add(1) - defer f.pm.wg.Done() +// checkAnnouncedHeaders updates peer's block tree if necessary after validating +// a batch of headers. It searches for the latest header in the batch that has a +// matching tree node (if any), and if it has not been marked as known already, +// sets it and its parents to known (even those which are older than the currently +// validated ones). Return value shows if all hashes, numbers and Tds matched +// correctly to the announced values (otherwise the peer should be dropped). +func (f *lightFetcher) checkAnnouncedHeaders(fp *fetcherPeerInfo, headers []*types.Header, tds []*big.Int) bool { + var ( + n *fetcherTreeNode + header *types.Header + td *big.Int + ) - srtoNotify := false - for { - select { - case <-f.pm.quitSync: - return - case ext := <-f.notifyChn: - //fmt.Println("<-f.notifyChn", f.syncing, ext, srtoNotify) - s := srtoNotify - srtoNotify = false - if !f.syncing && !(ext && s) { - if p, r := f.nextRequest(); r != nil { - srtoNotify = true - go func() { - time.Sleep(softRequestTimeout) - f.notifyChn <- false - }() - f.request(p, r) + for i := len(headers) - 1; ; i-- { + if i < 0 { + if n == nil { + // no more headers and nothing to match + return true + } + // we ran out of recently delivered headers but have not reached a node known by this peer yet, continue matching + td = f.chain.GetTd(header.ParentHash, header.Number.Uint64()-1) + header = f.chain.GetHeader(header.ParentHash, header.Number.Uint64()-1) + } else { + header = headers[i] + td = tds[i] + } + hash := header.Hash() + number := header.Number.Uint64() + if n == nil { + n = fp.nodeByHash[hash] + } + if n != nil { + if n.td == nil { + // node was unannounced + if nn := fp.nodeByHash[hash]; nn != nil { + // if there was already a node with the same hash, continue there and drop this one + nn.children = append(nn.children, n.children...) + n.children = nil + fp.deleteNode(n) + n = nn + } else { + n.hash = hash + n.td = td + fp.nodeByHash[hash] = n } } - case reqID := <-f.timeoutChn: - f.reqMu.Lock() - req, ok := f.requested[reqID] - if ok { - delete(f.requested, reqID) + // check if it matches the header + if n.hash != hash || n.number != number || n.td.Cmp(td) != 0 { + // peer has previously made an invalid announcement + return false } - f.reqMu.Unlock() - if ok { - //fmt.Println("hard timeout") - f.pm.removePeer(req.peer.id) + if n.known { + // we reached a known node that matched our expectations, return with success + return true } - case resp := <-f.deliverChn: - //fmt.Println("<-f.deliverChn", f.syncing) - f.reqMu.Lock() - req, ok := f.requested[resp.reqID] - delete(f.requested, resp.reqID) - f.reqMu.Unlock() - if !ok || !(f.syncing || f.processResponse(req, resp)) { - //fmt.Println("processResponse fail") - f.pm.removePeer(req.peer.id) + n.known = true + if fp.confirmedTd == nil || td.Cmp(fp.confirmedTd) > 0 { + fp.confirmedTd = td + fp.bestConfirmed = n } - case <-f.syncDone: - //fmt.Println("<-f.syncDone", f.syncing) - f.checkSyncedHeaders() - f.syncing = false + n = n.parent + if n == nil { + return true + } + } + } +} + +// checkSyncedHeaders updates peer's block tree after synchronisation by marking +// downloaded headers as known. If none of the announced headers are found after +// syncing, the peer is dropped. +func (f *lightFetcher) checkSyncedHeaders(p *peer) { + fp := f.peers[p] + if fp == nil { + glog.V(logger.Debug).Infof("checkSyncedHeaders: unknown peer") + return + } + n := fp.lastAnnounced + var td *big.Int + for n != nil { + if td = f.chain.GetTd(n.hash, n.number); td != nil { + break + } + n = n.parent + } + // now n is the latest downloaded header after syncing + if n == nil { + glog.V(logger.Debug).Infof("synchronisation failed with peer %v", p.id) + go f.pm.removePeer(p.id) + } else { + header := f.chain.GetHeader(n.hash, n.number) + f.newHeaders([]*types.Header{header}, []*big.Int{td}) + } +} + +// checkKnownNode checks if a block tree node is known (downloaded and validated) +// If it was not known previously but found in the database, sets its known flag +func (f *lightFetcher) checkKnownNode(p *peer, n *fetcherTreeNode) bool { + if n.known { + return true + } + td := f.chain.GetTd(n.hash, n.number) + if td == nil { + return false + } + + fp := f.peers[p] + if fp == nil { + glog.V(logger.Debug).Infof("checkKnownNode: unknown peer") + return false + } + header := f.chain.GetHeader(n.hash, n.number) + if !f.checkAnnouncedHeaders(fp, []*types.Header{header}, []*big.Int{td}) { + glog.V(logger.Debug).Infof("announce inconsistency by peer %v", p.id) + go f.pm.removePeer(p.id) + } + if fp.confirmedTd != nil { + f.updateMaxConfirmedTd(fp.confirmedTd) + } + return n.known +} + +// deleteNode deletes a node and its child subtrees from a peer's block tree +func (fp *fetcherPeerInfo) deleteNode(n *fetcherTreeNode) { + if n.parent != nil { + for i, nn := range n.parent.children { + if nn == n { + n.parent.children = append(n.parent.children[:i], n.parent.children[i+1:]...) + break + } + } + } + for { + if n.td != nil { + delete(fp.nodeByHash, n.hash) + } + fp.nodeCnt-- + if len(n.children) == 0 { + return + } + for i, nn := range n.children { + if i == 0 { + n = nn + } else { + fp.deleteNode(nn) + } + } + } +} + +// updateStatsEntry items form a linked list that is expanded with a new item every time a new head with a higher Td +// than the previous one has been downloaded and validated. The list contains a series of maximum confirmed Td values +// and the time these values have been confirmed, both increasing monotonically. A maximum confirmed Td is calculated +// both globally for all peers and also for each individual peer (meaning that the given peer has announced the head +// and it has also been downloaded from any peer, either before or after the given announcement). +// The linked list has a global tail where new confirmed Td entries are added and a separate head for each peer, +// pointing to the next Td entry that is higher than the peer's max confirmed Td (nil if it has already confirmed +// the current global head). +type updateStatsEntry struct { + time mclock.AbsTime + td *big.Int + next *updateStatsEntry +} + +// updateMaxConfirmedTd updates the block delay statistics of active peers. Whenever a new highest Td is confirmed, +// adds it to the end of a linked list together with the time it has been confirmed. Then checks which peers have +// already confirmed a head with the same or higher Td (which counts as zero block delay) and updates their statistics. +// Those who have not confirmed such a head by now will be updated by a subsequent checkUpdateStats call with a +// positive block delay value. +func (f *lightFetcher) updateMaxConfirmedTd(td *big.Int) { + if f.maxConfirmedTd == nil || td.Cmp(f.maxConfirmedTd) > 0 { + f.maxConfirmedTd = td + newEntry := &updateStatsEntry{ + time: mclock.Now(), + td: td, + } + if f.lastUpdateStats != nil { + f.lastUpdateStats.next = newEntry + } + f.lastUpdateStats = newEntry + for p, _ := range f.peers { + f.checkUpdateStats(p, newEntry) + } + } +} + +// checkUpdateStats checks those peers who have not confirmed a certain highest Td (or a larger one) by the time it +// has been confirmed by another peer. If they have confirmed such a head by now, their stats are updated with the +// block delay which is (this peer's confirmation time)-(first confirmation time). After blockDelayTimeout has passed, +// the stats are updated with blockDelayTimeout value. In either case, the confirmed or timed out updateStatsEntry +// items are removed from the head of the linked list. +// If a new entry has been added to the global tail, it is passed as a parameter here even though this function +// assumes that it has already been added, so that if the peer's list is empty (all heads confirmed, head is nil), +// it can set the new head to newEntry. +func (f *lightFetcher) checkUpdateStats(p *peer, newEntry *updateStatsEntry) { + now := mclock.Now() + fp := f.peers[p] + if fp == nil { + glog.V(logger.Debug).Infof("checkUpdateStats: unknown peer") + return + } + if newEntry != nil && fp.firstUpdateStats == nil { + fp.firstUpdateStats = newEntry + } + for fp.firstUpdateStats != nil && fp.firstUpdateStats.time <= now-mclock.AbsTime(blockDelayTimeout) { + f.pm.serverPool.adjustBlockDelay(p.poolEntry, blockDelayTimeout) + fp.firstUpdateStats = fp.firstUpdateStats.next + } + if fp.confirmedTd != nil { + for fp.firstUpdateStats != nil && fp.firstUpdateStats.td.Cmp(fp.confirmedTd) <= 0 { + f.pm.serverPool.adjustBlockDelay(p.poolEntry, time.Duration(now-fp.firstUpdateStats.time)) + fp.firstUpdateStats = fp.firstUpdateStats.next } } } diff --git a/les/handler.go b/les/handler.go index fdf4e6e8a..b024841f2 100644 --- a/les/handler.go +++ b/les/handler.go @@ -22,8 +22,8 @@ import ( "errors" "fmt" "math/big" + "net" "sync" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -58,7 +58,7 @@ const ( MaxHeaderProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request MaxTxSend = 64 // Amount of transactions to be send per request - disableClientRemovePeer = true + disableClientRemovePeer = false ) // errIncompatibleConfig is returned if the requested protocols and configs are @@ -101,10 +101,7 @@ type ProtocolManager struct { chainDb ethdb.Database odr *LesOdr server *LesServer - - topicDisc *discv5.Network - lesTopic discv5.Topic - p2pServer *p2p.Server + serverPool *serverPool downloader *downloader.Downloader fetcher *lightFetcher @@ -157,13 +154,29 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network Version: version, Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + var entry *poolEntry peer := manager.newPeer(int(version), networkId, p, rw) + if manager.serverPool != nil { + addr := p.RemoteAddr().(*net.TCPAddr) + entry = manager.serverPool.connect(peer, addr.IP, uint16(addr.Port)) + if entry == nil { + return fmt.Errorf("unwanted connection") + } + } + peer.poolEntry = entry select { case manager.newPeerCh <- peer: manager.wg.Add(1) defer manager.wg.Done() - return manager.handle(peer) + err := manager.handle(peer) + if entry != nil { + manager.serverPool.disconnect(entry) + } + return err case <-manager.quitSync: + if entry != nil { + manager.serverPool.disconnect(entry) + } return p2p.DiscQuitting } }, @@ -192,7 +205,6 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, blockchain.HasHeader, nil, blockchain.GetHeaderByHash, nil, blockchain.CurrentHeader, nil, nil, nil, blockchain.GetTdByHash, blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer) - manager.fetcher = newLightFetcher(manager) } if odr != nil { @@ -222,10 +234,12 @@ func (pm *ProtocolManager) removePeer(id string) { glog.V(logger.Debug).Infof("LES: unregister peer %v", id) if pm.lightSync { pm.downloader.UnregisterPeer(id) - pm.odr.UnregisterPeer(peer) if pm.txrelay != nil { pm.txrelay.removePeer(id) } + if pm.fetcher != nil { + pm.fetcher.removePeer(peer) + } } if err := pm.peers.Unregister(id); err != nil { glog.V(logger.Error).Infoln("Removal failed:", err) @@ -236,54 +250,26 @@ func (pm *ProtocolManager) removePeer(id string) { } } -func (pm *ProtocolManager) findServers() { - if pm.p2pServer == nil || pm.topicDisc == nil { - return - } - glog.V(logger.Debug).Infoln("Looking for topic", string(pm.lesTopic)) - enodes := make(chan string, 100) - stop := make(chan struct{}) - go pm.topicDisc.SearchTopic(pm.lesTopic, stop, enodes) - go func() { - added := make(map[string]bool) - for { - select { - case enode := <-enodes: - if !added[enode] { - glog.V(logger.Info).Infoln("Found LES server:", enode) - added[enode] = true - if node, err := discover.ParseNode(enode); err == nil { - pm.p2pServer.AddPeer(node) - } - } - case <-stop: - return - } - } - }() - select { - case <-time.After(time.Second * 20): - case <-pm.quitSync: - } - close(stop) -} - func (pm *ProtocolManager) Start(srvr *p2p.Server) { - pm.p2pServer = srvr + var topicDisc *discv5.Network if srvr != nil { - pm.topicDisc = srvr.DiscV5 + topicDisc = srvr.DiscV5 } - pm.lesTopic = discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) + lesTopic := discv5.Topic("LES@" + common.Bytes2Hex(pm.blockchain.Genesis().Hash().Bytes()[0:8])) if pm.lightSync { // start sync handler - go pm.findServers() + if srvr != nil { // srvr is nil during testing + pm.serverPool = newServerPool(pm.chainDb, []byte("serverPool/"), srvr, lesTopic, pm.quitSync, &pm.wg) + pm.odr.serverPool = pm.serverPool + pm.fetcher = newLightFetcher(pm) + } go pm.syncer() } else { - if pm.topicDisc != nil { + if topicDisc != nil { go func() { - glog.V(logger.Debug).Infoln("Starting registering topic", string(pm.lesTopic)) - pm.topicDisc.RegisterTopic(pm.lesTopic, pm.quitSync) - glog.V(logger.Debug).Infoln("Stopped registering topic", string(pm.lesTopic)) + glog.V(logger.Info).Infoln("Starting registering topic", string(lesTopic)) + topicDisc.RegisterTopic(lesTopic, pm.quitSync) + glog.V(logger.Info).Infoln("Stopped registering topic", string(lesTopic)) }() } go func() { @@ -352,13 +338,13 @@ func (pm *ProtocolManager) handle(p *peer) error { glog.V(logger.Debug).Infof("LES: register peer %v", p.id) if pm.lightSync { requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { - reqID := pm.odr.getNextReqID() + reqID := getNextReqID() cost := p.GetRequestCost(GetBlockHeadersMsg, amount) p.fcServer.SendRequest(reqID, cost) return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { - reqID := pm.odr.getNextReqID() + reqID := getNextReqID() cost := p.GetRequestCost(GetBlockHeadersMsg, amount) p.fcServer.SendRequest(reqID, cost) return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) @@ -367,12 +353,21 @@ func (pm *ProtocolManager) handle(p *peer) error { requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil { return err } - pm.odr.RegisterPeer(p) if pm.txrelay != nil { pm.txrelay.addPeer(p) } - pm.fetcher.notify(p, nil) + p.lock.Lock() + head := p.headInfo + p.lock.Unlock() + if pm.fetcher != nil { + pm.fetcher.addPeer(p) + pm.fetcher.announce(p, head) + } + + if p.poolEntry != nil { + pm.serverPool.registered(p.poolEntry) + } } stop := make(chan struct{}) @@ -454,7 +449,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "%v: %v", msg, err) } glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth) - pm.fetcher.notify(p, &req) + if pm.fetcher != nil { + go pm.fetcher.announce(p, &req) + } case GetBlockHeadersMsg: glog.V(logger.Debug).Infof("<=== GetBlockHeadersMsg from peer %v", p.id) @@ -552,8 +549,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.fcServer.GotReply(resp.ReqID, resp.BV) - if pm.fetcher.requestedID(resp.ReqID) { - pm.fetcher.deliverHeaders(resp.ReqID, resp.Headers) + if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) { + pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers) } else { err := pm.downloader.DeliverHeaders(p.id, resp.Headers) if err != nil { diff --git a/les/helper_test.go b/les/helper_test.go index a5428e954..2df3faab2 100644 --- a/les/helper_test.go +++ b/les/helper_test.go @@ -25,6 +25,7 @@ import ( "math/big" "sync" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -334,3 +335,13 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu func (p *testPeer) close() { p.app.Close() } + +type testServerPool peer + +func (p *testServerPool) selectPeer(func(*peer) (bool, uint64)) *peer { + return (*peer)(p) +} + +func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) { + +} diff --git a/les/odr.go b/les/odr.go index 444b1da2a..8878508c4 100644 --- a/les/odr.go +++ b/les/odr.go @@ -17,6 +17,8 @@ package les import ( + "crypto/rand" + "encoding/binary" "sync" "time" @@ -37,6 +39,11 @@ var ( // peerDropFn is a callback type for dropping a peer detected as malicious. type peerDropFn func(id string) +type odrPeerSelector interface { + selectPeer(func(*peer) (bool, uint64)) *peer + adjustResponseTime(*poolEntry, time.Duration, bool) +} + type LesOdr struct { light.OdrBackend db ethdb.Database @@ -44,15 +51,13 @@ type LesOdr struct { removePeer peerDropFn mlock, clock sync.Mutex sentReqs map[uint64]*sentReq - peers *odrPeerSet - lastReqID uint64 + serverPool odrPeerSelector } func NewLesOdr(db ethdb.Database) *LesOdr { return &LesOdr{ db: db, stop: make(chan struct{}), - peers: newOdrPeerSet(), sentReqs: make(map[uint64]*sentReq), } } @@ -77,16 +82,6 @@ type sentReq struct { answered chan struct{} // closed and set to nil when any peer answers it } -// RegisterPeer registers a new LES peer to the ODR capable peer set -func (self *LesOdr) RegisterPeer(p *peer) error { - return self.peers.register(p) -} - -// UnregisterPeer removes a peer from the ODR capable peer set -func (self *LesOdr) UnregisterPeer(p *peer) { - self.peers.unregister(p) -} - const ( MsgBlockBodies = iota MsgCode @@ -142,29 +137,26 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha select { case <-delivered: - servTime := uint64(mclock.Now() - stime) - self.peers.updateTimeout(peer, false) - self.peers.updateServTime(peer, servTime) + if self.serverPool != nil { + self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), false) + } return case <-time.After(softRequestTimeout): close(timeout) - if self.peers.updateTimeout(peer, true) { - self.removePeer(peer.id) - } case <-self.stop: return } select { case <-delivered: - servTime := uint64(mclock.Now() - stime) - self.peers.updateServTime(peer, servTime) - return case <-time.After(hardRequestTimeout): - self.removePeer(peer.id) + go self.removePeer(peer.id) case <-self.stop: return } + if self.serverPool != nil { + self.serverPool.adjustResponseTime(peer.poolEntry, time.Duration(mclock.Now()-stime), true) + } } // networkRequest sends a request to known peers until an answer is received @@ -176,7 +168,7 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro sentTo: make(map[*peer]chan struct{}), answered: answered, // reply delivered by any peer } - reqID := self.getNextReqID() + reqID := getNextReqID() self.mlock.Lock() self.sentReqs[reqID] = req self.mlock.Unlock() @@ -193,7 +185,16 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro exclude := make(map[*peer]struct{}) for { - if peer := self.peers.bestPeer(lreq, exclude); peer == nil { + var p *peer + if self.serverPool != nil { + p = self.serverPool.selectPeer(func(p *peer) (bool, uint64) { + if !lreq.CanSend(p) { + return false, 0 + } + return true, p.fcServer.CanSend(lreq.GetCost(p)) + }) + } + if p == nil { select { case <-ctx.Done(): return ctx.Err() @@ -202,17 +203,17 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro case <-time.After(retryPeers): } } else { - exclude[peer] = struct{}{} + exclude[p] = struct{}{} delivered := make(chan struct{}) timeout := make(chan struct{}) req.lock.Lock() - req.sentTo[peer] = delivered + req.sentTo[p] = delivered req.lock.Unlock() reqWg.Add(1) - cost := lreq.GetCost(peer) - peer.fcServer.SendRequest(reqID, cost) - go self.requestPeer(req, peer, delivered, timeout, reqWg) - lreq.Request(reqID, peer) + cost := lreq.GetCost(p) + p.fcServer.SendRequest(reqID, cost) + go self.requestPeer(req, p, delivered, timeout, reqWg) + lreq.Request(reqID, p) select { case <-ctx.Done(): @@ -239,10 +240,8 @@ func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err err return } -func (self *LesOdr) getNextReqID() uint64 { - self.clock.Lock() - defer self.clock.Unlock() - - self.lastReqID++ - return self.lastReqID +func getNextReqID() uint64 { + var rnd [8]byte + rand.Read(rnd[:]) + return binary.BigEndian.Uint64(rnd[:]) } diff --git a/les/odr_peerset.go b/les/odr_peerset.go deleted file mode 100644 index e9b7eec7f..000000000 --- a/les/odr_peerset.go +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package les - -import ( - "sync" -) - -const dropTimeoutRatio = 20 - -type odrPeerInfo struct { - reqTimeSum, reqTimeCnt, reqCnt, timeoutCnt uint64 -} - -// odrPeerSet represents the collection of active peer participating in the block -// download procedure. -type odrPeerSet struct { - peers map[*peer]*odrPeerInfo - lock sync.RWMutex -} - -// newPeerSet creates a new peer set top track the active download sources. -func newOdrPeerSet() *odrPeerSet { - return &odrPeerSet{ - peers: make(map[*peer]*odrPeerInfo), - } -} - -// Register injects a new peer into the working set, or returns an error if the -// peer is already known. -func (ps *odrPeerSet) register(p *peer) error { - ps.lock.Lock() - defer ps.lock.Unlock() - - if _, ok := ps.peers[p]; ok { - return errAlreadyRegistered - } - ps.peers[p] = &odrPeerInfo{} - return nil -} - -// Unregister removes a remote peer from the active set, disabling any further -// actions to/from that particular entity. -func (ps *odrPeerSet) unregister(p *peer) error { - ps.lock.Lock() - defer ps.lock.Unlock() - - if _, ok := ps.peers[p]; !ok { - return errNotRegistered - } - delete(ps.peers, p) - return nil -} - -func (ps *odrPeerSet) peerPriority(p *peer, info *odrPeerInfo, req LesOdrRequest) uint64 { - tm := p.fcServer.CanSend(req.GetCost(p)) - if info.reqTimeCnt > 0 { - tm += info.reqTimeSum / info.reqTimeCnt - } - return tm -} - -func (ps *odrPeerSet) bestPeer(req LesOdrRequest, exclude map[*peer]struct{}) *peer { - var best *peer - var bpv uint64 - ps.lock.Lock() - defer ps.lock.Unlock() - - for p, info := range ps.peers { - if _, ok := exclude[p]; !ok { - pv := ps.peerPriority(p, info, req) - if best == nil || pv < bpv { - best = p - bpv = pv - } - } - } - return best -} - -func (ps *odrPeerSet) updateTimeout(p *peer, timeout bool) (drop bool) { - ps.lock.Lock() - defer ps.lock.Unlock() - - if info, ok := ps.peers[p]; ok { - info.reqCnt++ - if timeout { - // check ratio before increase to allow an extra timeout - if info.timeoutCnt*dropTimeoutRatio >= info.reqCnt { - return true - } - info.timeoutCnt++ - } - } - return false -} - -func (ps *odrPeerSet) updateServTime(p *peer, servTime uint64) { - ps.lock.Lock() - defer ps.lock.Unlock() - - if info, ok := ps.peers[p]; ok { - info.reqTimeSum += servTime - info.reqTimeCnt++ - } -} diff --git a/les/odr_requests.go b/les/odr_requests.go index f4bd51888..a4fbd79f6 100644 --- a/les/odr_requests.go +++ b/les/odr_requests.go @@ -36,6 +36,7 @@ import ( type LesOdrRequest interface { GetCost(*peer) uint64 + CanSend(*peer) bool Request(uint64, *peer) error Valid(ethdb.Database, *Msg) bool // if true, keeps the retrieved object } @@ -66,6 +67,11 @@ func (self *BlockRequest) GetCost(peer *peer) uint64 { return peer.GetRequestCost(GetBlockBodiesMsg, 1) } +// CanSend tells if a certain peer is suitable for serving the given request +func (self *BlockRequest) CanSend(peer *peer) bool { + return peer.HasBlock(self.Hash, self.Number) +} + // Request sends an ODR request to the LES network (implementation of LesOdrRequest) func (self *BlockRequest) Request(reqID uint64, peer *peer) error { glog.V(logger.Debug).Infof("ODR: requesting body of block %08x from peer %v", self.Hash[:4], peer.id) @@ -121,6 +127,11 @@ func (self *ReceiptsRequest) GetCost(peer *peer) uint64 { return peer.GetRequestCost(GetReceiptsMsg, 1) } +// CanSend tells if a certain peer is suitable for serving the given request +func (self *ReceiptsRequest) CanSend(peer *peer) bool { + return peer.HasBlock(self.Hash, self.Number) +} + // Request sends an ODR request to the LES network (implementation of LesOdrRequest) func (self *ReceiptsRequest) Request(reqID uint64, peer *peer) error { glog.V(logger.Debug).Infof("ODR: requesting receipts for block %08x from peer %v", self.Hash[:4], peer.id) @@ -171,6 +182,11 @@ func (self *TrieRequest) GetCost(peer *peer) uint64 { return peer.GetRequestCost(GetProofsMsg, 1) } +// CanSend tells if a certain peer is suitable for serving the given request +func (self *TrieRequest) CanSend(peer *peer) bool { + return peer.HasBlock(self.Id.BlockHash, self.Id.BlockNumber) +} + // Request sends an ODR request to the LES network (implementation of LesOdrRequest) func (self *TrieRequest) Request(reqID uint64, peer *peer) error { glog.V(logger.Debug).Infof("ODR: requesting trie root %08x key %08x from peer %v", self.Id.Root[:4], self.Key[:4], peer.id) @@ -221,6 +237,11 @@ func (self *CodeRequest) GetCost(peer *peer) uint64 { return peer.GetRequestCost(GetCodeMsg, 1) } +// CanSend tells if a certain peer is suitable for serving the given request +func (self *CodeRequest) CanSend(peer *peer) bool { + return peer.HasBlock(self.Id.BlockHash, self.Id.BlockNumber) +} + // Request sends an ODR request to the LES network (implementation of LesOdrRequest) func (self *CodeRequest) Request(reqID uint64, peer *peer) error { glog.V(logger.Debug).Infof("ODR: requesting node data for hash %08x from peer %v", self.Hash[:4], peer.id) @@ -274,6 +295,14 @@ func (self *ChtRequest) GetCost(peer *peer) uint64 { return peer.GetRequestCost(GetHeaderProofsMsg, 1) } +// CanSend tells if a certain peer is suitable for serving the given request +func (self *ChtRequest) CanSend(peer *peer) bool { + peer.lock.RLock() + defer peer.lock.RUnlock() + + return self.ChtNum <= (peer.headInfo.Number-light.ChtConfirmations)/light.ChtFrequency +} + // Request sends an ODR request to the LES network (implementation of LesOdrRequest) func (self *ChtRequest) Request(reqID uint64, peer *peer) error { glog.V(logger.Debug).Infof("ODR: requesting CHT #%d block #%d from peer %v", self.ChtNum, self.BlockNum, peer.id) diff --git a/les/odr_test.go b/les/odr_test.go index 27e3b283d..685435996 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -160,6 +160,8 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { pm, db, odr := newTestProtocolManagerMust(t, false, 4, testChainGen) lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) + pool := (*testServerPool)(lpeer) + odr.serverPool = pool select { case <-time.After(time.Millisecond * 100): case err := <-err1: @@ -188,13 +190,13 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { } // temporarily remove peer to test odr fails - odr.UnregisterPeer(lpeer) + odr.serverPool = nil // expect retrievals to fail (except genesis block) without a les peer test(expFail) - odr.RegisterPeer(lpeer) + odr.serverPool = pool // expect all retrievals to pass test(5) - odr.UnregisterPeer(lpeer) + odr.serverPool = nil // still expect all retrievals to pass, now data should be cached locally test(5) } diff --git a/les/peer.go b/les/peer.go index 5d566d899..0a8db4975 100644 --- a/les/peer.go +++ b/les/peer.go @@ -51,12 +51,14 @@ type peer struct { id string - firstHeadInfo, headInfo *announceData - headInfoLen int - lock sync.RWMutex + headInfo *announceData + lock sync.RWMutex announceChn chan announceData + poolEntry *poolEntry + hasBlock func(common.Hash, uint64) bool + fcClient *flowcontrol.ClientNode // nil if the peer is server only fcServer *flowcontrol.ServerNode // nil if the peer is client only fcServerParams *flowcontrol.ServerParams @@ -109,67 +111,6 @@ func (p *peer) headBlockInfo() blockInfo { return blockInfo{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td} } -func (p *peer) addNotify(announce *announceData) bool { - p.lock.Lock() - defer p.lock.Unlock() - - if announce.Td.Cmp(p.headInfo.Td) < 1 { - return false - } - if p.headInfoLen >= maxHeadInfoLen { - //return false - p.firstHeadInfo = p.firstHeadInfo.next - p.headInfoLen-- - } - if announce.haveHeaders == 0 { - hh := p.headInfo.Number - announce.ReorgDepth - if p.headInfo.haveHeaders < hh { - hh = p.headInfo.haveHeaders - } - announce.haveHeaders = hh - } - p.headInfo.next = announce - p.headInfo = announce - p.headInfoLen++ - return true -} - -func (p *peer) gotHeader(hash common.Hash, number uint64, td *big.Int) bool { - h := p.firstHeadInfo - ptr := 0 - for h != nil { - if h.Hash == hash { - if h.Number != number || h.Td.Cmp(td) != 0 { - return false - } - h.headKnown = true - h.haveHeaders = h.Number - p.firstHeadInfo = h - p.headInfoLen -= ptr - last := h - h = h.next - // propagate haveHeaders through the chain - for h != nil { - hh := last.Number - h.ReorgDepth - if last.haveHeaders < hh { - hh = last.haveHeaders - } - if hh > h.haveHeaders { - h.haveHeaders = hh - } else { - return true - } - last = h - h = h.next - } - return true - } - h = h.next - ptr++ - } - return true -} - // Td retrieves the current total difficulty of a peer. func (p *peer) Td() *big.Int { p.lock.RLock() @@ -195,6 +136,9 @@ func sendResponse(w p2p.MsgWriter, msgcode, reqID, bv uint64, data interface{}) } func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 { + p.lock.RLock() + defer p.lock.RUnlock() + cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount) if cost > p.fcServerParams.BufLimit { cost = p.fcServerParams.BufLimit @@ -202,6 +146,14 @@ func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 { return cost } +// HasBlock checks if the peer has a given block +func (p *peer) HasBlock(hash common.Hash, number uint64) bool { + p.lock.RLock() + hashBlock := p.hasBlock + p.lock.RUnlock() + return hashBlock != nil && hashBlock(hash, number) +} + // SendAnnounce announces the availability of a number of blocks through // a hash notification. func (p *peer) SendAnnounce(request announceData) error { @@ -453,9 +405,7 @@ func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis p.fcCosts = MRC.decode() } - p.firstHeadInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum} - p.headInfo = p.firstHeadInfo - p.headInfoLen = 1 + p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum} return nil } diff --git a/les/randselect.go b/les/randselect.go new file mode 100644 index 000000000..1a9d0695b --- /dev/null +++ b/les/randselect.go @@ -0,0 +1,173 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package les implements the Light Ethereum Subprotocol. +package les + +import ( + "math/rand" +) + +// wrsItem interface should be implemented by any entries that are to be selected from +// a weightedRandomSelect set. Note that recalculating monotonously decreasing item +// weights on-demand (without constantly calling update) is allowed +type wrsItem interface { + Weight() int64 +} + +// weightedRandomSelect is capable of weighted random selection from a set of items +type weightedRandomSelect struct { + root *wrsNode + idx map[wrsItem]int +} + +// newWeightedRandomSelect returns a new weightedRandomSelect structure +func newWeightedRandomSelect() *weightedRandomSelect { + return &weightedRandomSelect{root: &wrsNode{maxItems: wrsBranches}, idx: make(map[wrsItem]int)} +} + +// update updates an item's weight, adds it if it was non-existent or removes it if +// the new weight is zero. Note that explicitly updating decreasing weights is not necessary. +func (w *weightedRandomSelect) update(item wrsItem) { + w.setWeight(item, item.Weight()) +} + +// remove removes an item from the set +func (w *weightedRandomSelect) remove(item wrsItem) { + w.setWeight(item, 0) +} + +// setWeight sets an item's weight to a specific value (removes it if zero) +func (w *weightedRandomSelect) setWeight(item wrsItem, weight int64) { + idx, ok := w.idx[item] + if ok { + w.root.setWeight(idx, weight) + if weight == 0 { + delete(w.idx, item) + } + } else { + if weight != 0 { + if w.root.itemCnt == w.root.maxItems { + // add a new level + newRoot := &wrsNode{sumWeight: w.root.sumWeight, itemCnt: w.root.itemCnt, level: w.root.level + 1, maxItems: w.root.maxItems * wrsBranches} + newRoot.items[0] = w.root + newRoot.weights[0] = w.root.sumWeight + w.root = newRoot + } + w.idx[item] = w.root.insert(item, weight) + } + } +} + +// choose randomly selects an item from the set, with a chance proportional to its +// current weight. If the weight of the chosen element has been decreased since the +// last stored value, returns it with a newWeight/oldWeight chance, otherwise just +// updates its weight and selects another one +func (w *weightedRandomSelect) choose() wrsItem { + for { + if w.root.sumWeight == 0 { + return nil + } + val := rand.Int63n(w.root.sumWeight) + choice, lastWeight := w.root.choose(val) + weight := choice.Weight() + if weight != lastWeight { + w.setWeight(choice, weight) + } + if weight >= lastWeight || rand.Int63n(lastWeight) < weight { + return choice + } + } +} + +const wrsBranches = 8 // max number of branches in the wrsNode tree + +// wrsNode is a node of a tree structure that can store wrsItems or further wrsNodes. +type wrsNode struct { + items [wrsBranches]interface{} + weights [wrsBranches]int64 + sumWeight int64 + level, itemCnt, maxItems int +} + +// insert recursively inserts a new item to the tree and returns the item index +func (n *wrsNode) insert(item wrsItem, weight int64) int { + branch := 0 + for n.items[branch] != nil && (n.level == 0 || n.items[branch].(*wrsNode).itemCnt == n.items[branch].(*wrsNode).maxItems) { + branch++ + if branch == wrsBranches { + panic(nil) + } + } + n.itemCnt++ + n.sumWeight += weight + n.weights[branch] += weight + if n.level == 0 { + n.items[branch] = item + return branch + } else { + var subNode *wrsNode + if n.items[branch] == nil { + subNode = &wrsNode{maxItems: n.maxItems / wrsBranches, level: n.level - 1} + n.items[branch] = subNode + } else { + subNode = n.items[branch].(*wrsNode) + } + subIdx := subNode.insert(item, weight) + return subNode.maxItems*branch + subIdx + } +} + +// setWeight updates the weight of a certain item (which should exist) and returns +// the change of the last weight value stored in the tree +func (n *wrsNode) setWeight(idx int, weight int64) int64 { + if n.level == 0 { + oldWeight := n.weights[idx] + n.weights[idx] = weight + diff := weight - oldWeight + n.sumWeight += diff + if weight == 0 { + n.items[idx] = nil + n.itemCnt-- + } + return diff + } + branchItems := n.maxItems / wrsBranches + branch := idx / branchItems + diff := n.items[branch].(*wrsNode).setWeight(idx-branch*branchItems, weight) + n.weights[branch] += diff + n.sumWeight += diff + if weight == 0 { + n.itemCnt-- + } + return diff +} + +// choose recursively selects an item from the tree and returns it along with its weight +func (n *wrsNode) choose(val int64) (wrsItem, int64) { + for i, w := range n.weights { + if val < w { + if n.level == 0 { + return n.items[i].(wrsItem), n.weights[i] + } else { + return n.items[i].(*wrsNode).choose(val) + } + } else { + val -= w + } + } + panic(nil) +} diff --git a/les/randselect_test.go b/les/randselect_test.go new file mode 100644 index 000000000..f3c34305e --- /dev/null +++ b/les/randselect_test.go @@ -0,0 +1,67 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package les + +import ( + "math/rand" + "testing" +) + +type testWrsItem struct { + idx int + widx *int +} + +func (t *testWrsItem) Weight() int64 { + w := *t.widx + if w == -1 || w == t.idx { + return int64(t.idx + 1) + } + return 0 +} + +func TestWeightedRandomSelect(t *testing.T) { + testFn := func(cnt int) { + s := newWeightedRandomSelect() + w := -1 + list := make([]testWrsItem, cnt) + for i, _ := range list { + list[i] = testWrsItem{idx: i, widx: &w} + s.update(&list[i]) + } + w = rand.Intn(cnt) + c := s.choose() + if c == nil { + t.Errorf("expected item, got nil") + } else { + if c.(*testWrsItem).idx != w { + t.Errorf("expected another item") + } + } + w = -2 + if s.choose() != nil { + t.Errorf("expected nil, got item") + } + } + testFn(1) + testFn(10) + testFn(100) + testFn(1000) + testFn(10000) + testFn(100000) + testFn(1000000) +} diff --git a/les/request_test.go b/les/request_test.go index a6fbb06ce..03b946771 100644 --- a/les/request_test.go +++ b/les/request_test.go @@ -71,6 +71,8 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) { pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen) lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) + pool := (*testServerPool)(lpeer) + odr.serverPool = pool select { case <-time.After(time.Millisecond * 100): case err := <-err1: @@ -100,11 +102,10 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) { } // temporarily remove peer to test odr fails - odr.UnregisterPeer(lpeer) + odr.serverPool = nil // expect retrievals to fail (except genesis block) without a les peer test(0) - odr.RegisterPeer(lpeer) + odr.serverPool = pool // expect all retrievals to pass test(5) - odr.UnregisterPeer(lpeer) } diff --git a/les/server.go b/les/server.go index c763e8c63..e55616a44 100644 --- a/les/server.go +++ b/les/server.go @@ -42,6 +42,9 @@ type LesServer struct { fcManager *flowcontrol.ClientManager // nil if our node is client only fcCostStats *requestCostStats defParams *flowcontrol.ServerParams + srvr *p2p.Server + synced, stopped bool + lock sync.Mutex } func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { @@ -67,12 +70,35 @@ func (s *LesServer) Protocols() []p2p.Protocol { return s.protocolManager.SubProtocols } +// Start only starts the actual service if the ETH protocol has already been synced, +// otherwise it will be started by Synced() func (s *LesServer) Start(srvr *p2p.Server) { - s.protocolManager.Start(srvr) + s.lock.Lock() + defer s.lock.Unlock() + + s.srvr = srvr + if s.synced { + s.protocolManager.Start(s.srvr) + } +} + +// Synced notifies the server that the ETH protocol has been synced and LES service can be started +func (s *LesServer) Synced() { + s.lock.Lock() + defer s.lock.Unlock() + s.synced = true + if s.srvr != nil && !s.stopped { + s.protocolManager.Start(s.srvr) + } } +// Stop stops the LES service func (s *LesServer) Stop() { + s.lock.Lock() + defer s.lock.Unlock() + + s.stopped = true s.fcCostStats.store() s.fcManager.Stop() go func() { @@ -323,9 +349,8 @@ func (pm *ProtocolManager) blockLoop() { } var ( - lastChtKey = []byte("LastChtNumber") // chtNum (uint64 big endian) - chtPrefix = []byte("cht") // chtPrefix + chtNum (uint64 big endian) -> trie root hash - chtConfirmations = light.ChtFrequency / 2 + lastChtKey = []byte("LastChtNumber") // chtNum (uint64 big endian) + chtPrefix = []byte("cht") // chtPrefix + chtNum (uint64 big endian) -> trie root hash ) func getChtRoot(db ethdb.Database, num uint64) common.Hash { @@ -346,8 +371,8 @@ func makeCht(db ethdb.Database) bool { headNum := core.GetBlockNumber(db, headHash) var newChtNum uint64 - if headNum > chtConfirmations { - newChtNum = (headNum - chtConfirmations) / light.ChtFrequency + if headNum > light.ChtConfirmations { + newChtNum = (headNum - light.ChtConfirmations) / light.ChtFrequency } var lastChtNum uint64 diff --git a/les/serverpool.go b/les/serverpool.go new file mode 100644 index 000000000..f5e880460 --- /dev/null +++ b/les/serverpool.go @@ -0,0 +1,766 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package les implements the Light Ethereum Subprotocol. +package les + +import ( + "io" + "math" + "math/rand" + "net" + "strconv" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/discv5" + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + // After a connection has been ended or timed out, there is a waiting period + // before it can be selected for connection again. + // waiting period = base delay * (1 + random(1)) + // base delay = shortRetryDelay for the first shortRetryCnt times after a + // successful connection, after that longRetryDelay is applied + shortRetryCnt = 5 + shortRetryDelay = time.Second * 5 + longRetryDelay = time.Minute * 10 + // maxNewEntries is the maximum number of newly discovered (never connected) nodes. + // If the limit is reached, the least recently discovered one is thrown out. + maxNewEntries = 1000 + // maxKnownEntries is the maximum number of known (already connected) nodes. + // If the limit is reached, the least recently connected one is thrown out. + // (not that unlike new entries, known entries are persistent) + maxKnownEntries = 1000 + // target for simultaneously connected servers + targetServerCount = 5 + // target for servers selected from the known table + // (we leave room for trying new ones if there is any) + targetKnownSelect = 3 + // after dialTimeout, consider the server unavailable and adjust statistics + dialTimeout = time.Second * 30 + // targetConnTime is the minimum expected connection duration before a server + // drops a client without any specific reason + targetConnTime = time.Minute * 10 + // new entry selection weight calculation based on most recent discovery time: + // unity until discoverExpireStart, then exponential decay with discoverExpireConst + discoverExpireStart = time.Minute * 20 + discoverExpireConst = time.Minute * 20 + // known entry selection weight is dropped by a factor of exp(-failDropLn) after + // each unsuccessful connection (restored after a successful one) + failDropLn = 0.1 + // known node connection success and quality statistics have a long term average + // and a short term value which is adjusted exponentially with a factor of + // pstatRecentAdjust with each dial/connection and also returned exponentially + // to the average with the time constant pstatReturnToMeanTC + pstatRecentAdjust = 0.1 + pstatReturnToMeanTC = time.Hour + // node address selection weight is dropped by a factor of exp(-addrFailDropLn) after + // each unsuccessful connection (restored after a successful one) + addrFailDropLn = math.Ln2 + // responseScoreTC and delayScoreTC are exponential decay time constants for + // calculating selection chances from response times and block delay times + responseScoreTC = time.Millisecond * 100 + delayScoreTC = time.Second * 5 + timeoutPow = 10 + // peerSelectMinWeight is added to calculated weights at request peer selection + // to give poorly performing peers a little chance of coming back + peerSelectMinWeight = 0.005 + // initStatsWeight is used to initialize previously unknown peers with good + // statistics to give a chance to prove themselves + initStatsWeight = 1 +) + +// serverPool implements a pool for storing and selecting newly discovered and already +// known light server nodes. It received discovered nodes, stores statistics about +// known nodes and takes care of always having enough good quality servers connected. +type serverPool struct { + db ethdb.Database + dbKey []byte + server *p2p.Server + quit chan struct{} + wg *sync.WaitGroup + connWg sync.WaitGroup + + discSetPeriod chan time.Duration + discNodes chan *discv5.Node + discLookups chan bool + + entries map[discover.NodeID]*poolEntry + lock sync.Mutex + timeout, enableRetry chan *poolEntry + adjustStats chan poolStatAdjust + + knownQueue, newQueue poolEntryQueue + knownSelect, newSelect *weightedRandomSelect + knownSelected, newSelected int + fastDiscover bool +} + +// newServerPool creates a new serverPool instance +func newServerPool(db ethdb.Database, dbPrefix []byte, server *p2p.Server, topic discv5.Topic, quit chan struct{}, wg *sync.WaitGroup) *serverPool { + pool := &serverPool{ + db: db, + dbKey: append(dbPrefix, []byte(topic)...), + server: server, + quit: quit, + wg: wg, + entries: make(map[discover.NodeID]*poolEntry), + timeout: make(chan *poolEntry, 1), + adjustStats: make(chan poolStatAdjust, 100), + enableRetry: make(chan *poolEntry, 1), + knownSelect: newWeightedRandomSelect(), + newSelect: newWeightedRandomSelect(), + fastDiscover: true, + } + pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry) + pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry) + wg.Add(1) + pool.loadNodes() + pool.checkDial() + + if pool.server.DiscV5 != nil { + pool.discSetPeriod = make(chan time.Duration, 1) + pool.discNodes = make(chan *discv5.Node, 100) + pool.discLookups = make(chan bool, 100) + go pool.server.DiscV5.SearchTopic(topic, pool.discSetPeriod, pool.discNodes, pool.discLookups) + } + + go pool.eventLoop() + return pool +} + +// connect should be called upon any incoming connection. If the connection has been +// dialed by the server pool recently, the appropriate pool entry is returned. +// Otherwise, the connection should be rejected. +// Note that whenever a connection has been accepted and a pool entry has been returned, +// disconnect should also always be called. +func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry { + pool.lock.Lock() + defer pool.lock.Unlock() + entry := pool.entries[p.ID()] + if entry == nil { + return nil + } + glog.V(logger.Debug).Infof("connecting to %v, state: %v", p.id, entry.state) + if entry.state != psDialed { + return nil + } + pool.connWg.Add(1) + entry.peer = p + entry.state = psConnected + addr := &poolEntryAddress{ + ip: ip, + port: port, + lastSeen: mclock.Now(), + } + entry.lastConnected = addr + entry.addr = make(map[string]*poolEntryAddress) + entry.addr[addr.strKey()] = addr + entry.addrSelect = *newWeightedRandomSelect() + entry.addrSelect.update(addr) + return entry +} + +// registered should be called after a successful handshake +func (pool *serverPool) registered(entry *poolEntry) { + glog.V(logger.Debug).Infof("registered %v", entry.id.String()) + pool.lock.Lock() + defer pool.lock.Unlock() + + entry.state = psRegistered + entry.regTime = mclock.Now() + if !entry.known { + pool.newQueue.remove(entry) + entry.known = true + } + pool.knownQueue.setLatest(entry) + entry.shortRetry = shortRetryCnt +} + +// disconnect should be called when ending a connection. Service quality statistics +// can be updated optionally (not updated if no registration happened, in this case +// only connection statistics are updated, just like in case of timeout) +func (pool *serverPool) disconnect(entry *poolEntry) { + glog.V(logger.Debug).Infof("disconnected %v", entry.id.String()) + pool.lock.Lock() + defer pool.lock.Unlock() + + if entry.state == psRegistered { + connTime := mclock.Now() - entry.regTime + connAdjust := float64(connTime) / float64(targetConnTime) + if connAdjust > 1 { + connAdjust = 1 + } + stopped := false + select { + case <-pool.quit: + stopped = true + default: + } + if stopped { + entry.connectStats.add(1, connAdjust) + } else { + entry.connectStats.add(connAdjust, 1) + } + } + + entry.state = psNotConnected + if entry.knownSelected { + pool.knownSelected-- + } else { + pool.newSelected-- + } + pool.setRetryDial(entry) + pool.connWg.Done() +} + +const ( + pseBlockDelay = iota + pseResponseTime + pseResponseTimeout +) + +// poolStatAdjust records are sent to adjust peer block delay/response time statistics +type poolStatAdjust struct { + adjustType int + entry *poolEntry + time time.Duration +} + +// adjustBlockDelay adjusts the block announce delay statistics of a node +func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) { + pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time} +} + +// adjustResponseTime adjusts the request response time statistics of a node +func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) { + if timeout { + pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time} + } else { + pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time} + } +} + +type selectPeerItem struct { + peer *peer + weight int64 +} + +func (sp selectPeerItem) Weight() int64 { + return sp.weight +} + +// selectPeer selects a suitable peer for a request +func (pool *serverPool) selectPeer(canSend func(*peer) (bool, uint64)) *peer { + pool.lock.Lock() + defer pool.lock.Unlock() + + sel := newWeightedRandomSelect() + for _, entry := range pool.entries { + if entry.state == psRegistered { + p := entry.peer + ok, cost := canSend(p) + if ok { + w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(entry.responseStats.recentAvg()+float64(cost))/float64(responseScoreTC))*math.Pow((1-entry.timeoutStats.recentAvg()), timeoutPow))) + sel.update(selectPeerItem{peer: p, weight: w}) + } + } + } + choice := sel.choose() + if choice == nil { + return nil + } + return choice.(selectPeerItem).peer +} + +// eventLoop handles pool events and mutex locking for all internal functions +func (pool *serverPool) eventLoop() { + lookupCnt := 0 + var convTime mclock.AbsTime + pool.discSetPeriod <- time.Millisecond * 100 + for { + select { + case entry := <-pool.timeout: + pool.lock.Lock() + if !entry.removed { + pool.checkDialTimeout(entry) + } + pool.lock.Unlock() + + case entry := <-pool.enableRetry: + pool.lock.Lock() + if !entry.removed { + entry.delayedRetry = false + pool.updateCheckDial(entry) + } + pool.lock.Unlock() + + case adj := <-pool.adjustStats: + pool.lock.Lock() + switch adj.adjustType { + case pseBlockDelay: + adj.entry.delayStats.add(float64(adj.time), 1) + case pseResponseTime: + adj.entry.responseStats.add(float64(adj.time), 1) + adj.entry.timeoutStats.add(0, 1) + case pseResponseTimeout: + adj.entry.timeoutStats.add(1, 1) + } + pool.lock.Unlock() + + case node := <-pool.discNodes: + pool.lock.Lock() + now := mclock.Now() + id := discover.NodeID(node.ID) + entry := pool.entries[id] + if entry == nil { + glog.V(logger.Debug).Infof("discovered %v", node.String()) + entry = &poolEntry{ + id: id, + addr: make(map[string]*poolEntryAddress), + addrSelect: *newWeightedRandomSelect(), + shortRetry: shortRetryCnt, + } + pool.entries[id] = entry + // initialize previously unknown peers with good statistics to give a chance to prove themselves + entry.connectStats.add(1, initStatsWeight) + entry.delayStats.add(0, initStatsWeight) + entry.responseStats.add(0, initStatsWeight) + entry.timeoutStats.add(0, initStatsWeight) + } + entry.lastDiscovered = now + addr := &poolEntryAddress{ + ip: node.IP, + port: node.TCP, + } + if a, ok := entry.addr[addr.strKey()]; ok { + addr = a + } else { + entry.addr[addr.strKey()] = addr + } + addr.lastSeen = now + entry.addrSelect.update(addr) + if !entry.known { + pool.newQueue.setLatest(entry) + } + pool.updateCheckDial(entry) + pool.lock.Unlock() + + case conv := <-pool.discLookups: + if conv { + if lookupCnt == 0 { + convTime = mclock.Now() + } + lookupCnt++ + if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) { + pool.fastDiscover = false + pool.discSetPeriod <- time.Minute + } + } + + case <-pool.quit: + close(pool.discSetPeriod) + pool.connWg.Wait() + pool.saveNodes() + pool.wg.Done() + return + + } + } +} + +// loadNodes loads known nodes and their statistics from the database +func (pool *serverPool) loadNodes() { + enc, err := pool.db.Get(pool.dbKey) + if err != nil { + return + } + var list []*poolEntry + err = rlp.DecodeBytes(enc, &list) + if err != nil { + glog.V(logger.Debug).Infof("node list decode error: %v", err) + return + } + for _, e := range list { + glog.V(logger.Debug).Infof("loaded server stats %016x fails: %v connStats: %v / %v delayStats: %v / %v responseStats: %v / %v timeoutStats: %v / %v", e.id[0:8], e.lastConnected.fails, e.connectStats.avg, e.connectStats.weight, time.Duration(e.delayStats.avg), e.delayStats.weight, time.Duration(e.responseStats.avg), e.responseStats.weight, e.timeoutStats.avg, e.timeoutStats.weight) + pool.entries[e.id] = e + pool.knownQueue.setLatest(e) + pool.knownSelect.update((*knownEntry)(e)) + } +} + +// saveNodes saves known nodes and their statistics into the database. Nodes are +// ordered from least to most recently connected. +func (pool *serverPool) saveNodes() { + list := make([]*poolEntry, len(pool.knownQueue.queue)) + for i, _ := range list { + list[i] = pool.knownQueue.fetchOldest() + } + enc, err := rlp.EncodeToBytes(list) + if err == nil { + pool.db.Put(pool.dbKey, enc) + } +} + +// removeEntry removes a pool entry when the entry count limit is reached. +// Note that it is called by the new/known queues from which the entry has already +// been removed so removing it from the queues is not necessary. +func (pool *serverPool) removeEntry(entry *poolEntry) { + pool.newSelect.remove((*discoveredEntry)(entry)) + pool.knownSelect.remove((*knownEntry)(entry)) + entry.removed = true + delete(pool.entries, entry.id) +} + +// setRetryDial starts the timer which will enable dialing a certain node again +func (pool *serverPool) setRetryDial(entry *poolEntry) { + delay := longRetryDelay + if entry.shortRetry > 0 { + entry.shortRetry-- + delay = shortRetryDelay + } + delay += time.Duration(rand.Int63n(int64(delay) + 1)) + entry.delayedRetry = true + go func() { + select { + case <-pool.quit: + case <-time.After(delay): + select { + case <-pool.quit: + case pool.enableRetry <- entry: + } + } + }() +} + +// updateCheckDial is called when an entry can potentially be dialed again. It updates +// its selection weights and checks if new dials can/should be made. +func (pool *serverPool) updateCheckDial(entry *poolEntry) { + pool.newSelect.update((*discoveredEntry)(entry)) + pool.knownSelect.update((*knownEntry)(entry)) + pool.checkDial() +} + +// checkDial checks if new dials can/should be made. It tries to select servers both +// based on good statistics and recent discovery. +func (pool *serverPool) checkDial() { + fillWithKnownSelects := !pool.fastDiscover + for pool.knownSelected < targetKnownSelect { + entry := pool.knownSelect.choose() + if entry == nil { + fillWithKnownSelects = false + break + } + pool.dial((*poolEntry)(entry.(*knownEntry)), true) + } + for pool.knownSelected+pool.newSelected < targetServerCount { + entry := pool.newSelect.choose() + if entry == nil { + break + } + pool.dial((*poolEntry)(entry.(*discoveredEntry)), false) + } + if fillWithKnownSelects { + // no more newly discovered nodes to select and since fast discover period + // is over, we probably won't find more in the near future so select more + // known entries if possible + for pool.knownSelected < targetServerCount { + entry := pool.knownSelect.choose() + if entry == nil { + break + } + pool.dial((*poolEntry)(entry.(*knownEntry)), true) + } + } +} + +// dial initiates a new connection +func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) { + if entry.state != psNotConnected { + return + } + entry.state = psDialed + entry.knownSelected = knownSelected + if knownSelected { + pool.knownSelected++ + } else { + pool.newSelected++ + } + addr := entry.addrSelect.choose().(*poolEntryAddress) + glog.V(logger.Debug).Infof("dialing %v out of %v, known: %v", entry.id.String()+"@"+addr.strKey(), len(entry.addr), knownSelected) + entry.dialed = addr + go func() { + pool.server.AddPeer(discover.NewNode(entry.id, addr.ip, addr.port, addr.port)) + select { + case <-pool.quit: + case <-time.After(dialTimeout): + select { + case <-pool.quit: + case pool.timeout <- entry: + } + } + }() +} + +// checkDialTimeout checks if the node is still in dialed state and if so, resets it +// and adjusts connection statistics accordingly. +func (pool *serverPool) checkDialTimeout(entry *poolEntry) { + if entry.state != psDialed { + return + } + glog.V(logger.Debug).Infof("timeout %v", entry.id.String()+"@"+entry.dialed.strKey()) + entry.state = psNotConnected + if entry.knownSelected { + pool.knownSelected-- + } else { + pool.newSelected-- + } + entry.connectStats.add(0, 1) + entry.dialed.fails++ + pool.setRetryDial(entry) +} + +const ( + psNotConnected = iota + psDialed + psConnected + psRegistered +) + +// poolEntry represents a server node and stores its current state and statistics. +type poolEntry struct { + peer *peer + id discover.NodeID + addr map[string]*poolEntryAddress + lastConnected, dialed *poolEntryAddress + addrSelect weightedRandomSelect + + lastDiscovered mclock.AbsTime + known, knownSelected bool + connectStats, delayStats poolStats + responseStats, timeoutStats poolStats + state int + regTime mclock.AbsTime + queueIdx int + removed bool + + delayedRetry bool + shortRetry int +} + +func (e *poolEntry) EncodeRLP(w io.Writer) error { + return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats}) +} + +func (e *poolEntry) DecodeRLP(s *rlp.Stream) error { + var entry struct { + ID discover.NodeID + IP net.IP + Port uint16 + Fails uint + CStat, DStat, RStat, TStat poolStats + } + if err := s.Decode(&entry); err != nil { + return err + } + addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()} + e.id = entry.ID + e.addr = make(map[string]*poolEntryAddress) + e.addr[addr.strKey()] = addr + e.addrSelect = *newWeightedRandomSelect() + e.addrSelect.update(addr) + e.lastConnected = addr + e.connectStats = entry.CStat + e.delayStats = entry.DStat + e.responseStats = entry.RStat + e.timeoutStats = entry.TStat + e.shortRetry = shortRetryCnt + e.known = true + return nil +} + +// discoveredEntry implements wrsItem +type discoveredEntry poolEntry + +// Weight calculates random selection weight for newly discovered entries +func (e *discoveredEntry) Weight() int64 { + if e.state != psNotConnected || e.delayedRetry { + return 0 + } + t := time.Duration(mclock.Now() - e.lastDiscovered) + if t <= discoverExpireStart { + return 1000000000 + } else { + return int64(1000000000 * math.Exp(-float64(t-discoverExpireStart)/float64(discoverExpireConst))) + } +} + +// knownEntry implements wrsItem +type knownEntry poolEntry + +// Weight calculates random selection weight for known entries +func (e *knownEntry) Weight() int64 { + if e.state != psNotConnected || !e.known || e.delayedRetry { + return 0 + } + return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow((1-e.timeoutStats.recentAvg()), timeoutPow)) +} + +// poolEntryAddress is a separate object because currently it is necessary to remember +// multiple potential network addresses for a pool entry. This will be removed after +// the final implementation of v5 discovery which will retrieve signed and serial +// numbered advertisements, making it clear which IP/port is the latest one. +type poolEntryAddress struct { + ip net.IP + port uint16 + lastSeen mclock.AbsTime // last time it was discovered, connected or loaded from db + fails uint // connection failures since last successful connection (persistent) +} + +func (a *poolEntryAddress) Weight() int64 { + t := time.Duration(mclock.Now() - a.lastSeen) + return int64(1000000*math.Exp(-float64(t)/float64(discoverExpireConst)-float64(a.fails)*addrFailDropLn)) + 1 +} + +func (a *poolEntryAddress) strKey() string { + return a.ip.String() + ":" + strconv.Itoa(int(a.port)) +} + +// poolStats implement statistics for a certain quantity with a long term average +// and a short term value which is adjusted exponentially with a factor of +// pstatRecentAdjust with each update and also returned exponentially to the +// average with the time constant pstatReturnToMeanTC +type poolStats struct { + sum, weight, avg, recent float64 + lastRecalc mclock.AbsTime +} + +// init initializes stats with a long term sum/update count pair retrieved from the database +func (s *poolStats) init(sum, weight float64) { + s.sum = sum + s.weight = weight + var avg float64 + if weight > 0 { + avg = s.sum / weight + } + s.avg = avg + s.recent = avg + s.lastRecalc = mclock.Now() +} + +// recalc recalculates recent value return-to-mean and long term average +func (s *poolStats) recalc() { + now := mclock.Now() + s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC)) + if s.sum == 0 { + s.avg = 0 + } else { + if s.sum > s.weight*1e30 { + s.avg = 1e30 + } else { + s.avg = s.sum / s.weight + } + } + s.lastRecalc = now +} + +// add updates the stats with a new value +func (s *poolStats) add(value, weight float64) { + s.weight += weight + s.sum += value * weight + s.recalc() +} + +// recentAvg returns the short-term adjusted average +func (s *poolStats) recentAvg() float64 { + s.recalc() + return s.recent +} + +func (s *poolStats) EncodeRLP(w io.Writer) error { + return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)}) +} + +func (s *poolStats) DecodeRLP(st *rlp.Stream) error { + var stats struct { + SumUint, WeightUint uint64 + } + if err := st.Decode(&stats); err != nil { + return err + } + s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint)) + return nil +} + +// poolEntryQueue keeps track of its least recently accessed entries and removes +// them when the number of entries reaches the limit +type poolEntryQueue struct { + queue map[int]*poolEntry // known nodes indexed by their latest lastConnCnt value + newPtr, oldPtr, maxCnt int + removeFromPool func(*poolEntry) +} + +// newPoolEntryQueue returns a new poolEntryQueue +func newPoolEntryQueue(maxCnt int, removeFromPool func(*poolEntry)) poolEntryQueue { + return poolEntryQueue{queue: make(map[int]*poolEntry), maxCnt: maxCnt, removeFromPool: removeFromPool} +} + +// fetchOldest returns and removes the least recently accessed entry +func (q *poolEntryQueue) fetchOldest() *poolEntry { + if len(q.queue) == 0 { + return nil + } + for { + if e := q.queue[q.oldPtr]; e != nil { + delete(q.queue, q.oldPtr) + q.oldPtr++ + return e + } + q.oldPtr++ + } +} + +// remove removes an entry from the queue +func (q *poolEntryQueue) remove(entry *poolEntry) { + if q.queue[entry.queueIdx] == entry { + delete(q.queue, entry.queueIdx) + } +} + +// setLatest adds or updates a recently accessed entry. It also checks if an old entry +// needs to be removed and removes it from the parent pool too with a callback function. +func (q *poolEntryQueue) setLatest(entry *poolEntry) { + if q.queue[entry.queueIdx] == entry { + delete(q.queue, entry.queueIdx) + } else { + if len(q.queue) == q.maxCnt { + e := q.fetchOldest() + q.remove(e) + q.removeFromPool(e) + } + } + entry.queueIdx = q.newPtr + q.queue[entry.queueIdx] = entry + q.newPtr++ +} diff --git a/light/lightchain.go b/light/lightchain.go index 1cea7a892..d397f5006 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -505,3 +505,14 @@ func (self *LightChain) SyncCht(ctx context.Context) bool { } return false } + +// LockChain locks the chain mutex for reading so that multiple canonical hashes can be +// retrieved while it is guaranteed that they belong to the same version of the chain +func (self *LightChain) LockChain() { + self.chainmu.RLock() +} + +// UnlockChain unlocks the chain mutex +func (self *LightChain) UnlockChain() { + self.chainmu.RUnlock() +} diff --git a/light/odr.go b/light/odr.go index 679569bf9..4f6ef6b9e 100644 --- a/light/odr.go +++ b/light/odr.go @@ -48,6 +48,7 @@ type OdrRequest interface { // TrieID identifies a state or account storage trie type TrieID struct { BlockHash, Root common.Hash + BlockNumber uint64 AccKey []byte } @@ -55,9 +56,10 @@ type TrieID struct { // header. func StateTrieID(header *types.Header) *TrieID { return &TrieID{ - BlockHash: header.Hash(), - AccKey: nil, - Root: header.Root, + BlockHash: header.Hash(), + BlockNumber: header.Number.Uint64(), + AccKey: nil, + Root: header.Root, } } @@ -66,9 +68,10 @@ func StateTrieID(header *types.Header) *TrieID { // checking Merkle proofs. func StorageTrieID(state *TrieID, addr common.Address, root common.Hash) *TrieID { return &TrieID{ - BlockHash: state.BlockHash, - AccKey: crypto.Keccak256(addr[:]), - Root: root, + BlockHash: state.BlockHash, + BlockNumber: state.BlockNumber, + AccKey: crypto.Keccak256(addr[:]), + Root: root, } } diff --git a/light/odr_util.go b/light/odr_util.go index 5c72f90e9..761711621 100644 --- a/light/odr_util.go +++ b/light/odr_util.go @@ -38,8 +38,9 @@ var ( ErrNoTrustedCht = errors.New("No trusted canonical hash trie") ErrNoHeader = errors.New("Header not found") - ChtFrequency = uint64(4096) - trustedChtKey = []byte("TrustedCHT") + ChtFrequency = uint64(4096) + ChtConfirmations = uint64(2048) + trustedChtKey = []byte("TrustedCHT") ) type ChtNode struct { diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go index d1c48904e..1348c6774 100644 --- a/p2p/discv5/net.go +++ b/p2p/discv5/net.go @@ -126,8 +126,15 @@ type topicRegisterReq struct { } type topicSearchReq struct { - topic Topic - found chan<- string + topic Topic + found chan<- *Node + lookup chan<- bool + delay time.Duration +} + +type topicSearchResult struct { + target lookupInfo + nodes []*Node } type timeoutEvent struct { @@ -263,16 +270,23 @@ func (net *Network) lookup(target common.Hash, stopOnMatch bool) []*Node { break } // Wait for the next reply. - for _, n := range <-reply { - if n != nil && !seen[n.ID] { - seen[n.ID] = true - result.push(n, bucketSize) - if stopOnMatch && n.sha == target { - return result.entries + select { + case nodes := <-reply: + for _, n := range nodes { + if n != nil && !seen[n.ID] { + seen[n.ID] = true + result.push(n, bucketSize) + if stopOnMatch && n.sha == target { + return result.entries + } } } + pendingQueries-- + case <-time.After(respTimeout): + // forget all pending requests, start new ones + pendingQueries = 0 + reply = make(chan []*Node, alpha) } - pendingQueries-- } return result.entries } @@ -293,18 +307,20 @@ func (net *Network) RegisterTopic(topic Topic, stop <-chan struct{}) { } } -func (net *Network) SearchTopic(topic Topic, stop <-chan struct{}, found chan<- string) { - select { - case net.topicSearchReq <- topicSearchReq{topic, found}: - case <-net.closed: - return - } - select { - case <-net.closed: - case <-stop: +func (net *Network) SearchTopic(topic Topic, setPeriod <-chan time.Duration, found chan<- *Node, lookup chan<- bool) { + for { select { - case net.topicSearchReq <- topicSearchReq{topic, nil}: case <-net.closed: + return + case delay, ok := <-setPeriod: + select { + case net.topicSearchReq <- topicSearchReq{topic: topic, found: found, lookup: lookup, delay: delay}: + case <-net.closed: + return + } + if !ok { + return + } } } } @@ -347,6 +363,13 @@ func (net *Network) reqTableOp(f func()) (called bool) { // TODO: external address handling. +type topicSearchInfo struct { + lookupChn chan<- bool + period time.Duration +} + +const maxSearchCount = 5 + func (net *Network) loop() { var ( refreshTimer = time.NewTicker(autoRefreshInterval) @@ -385,10 +408,12 @@ func (net *Network) loop() { topicRegisterLookupTarget lookupInfo topicRegisterLookupDone chan []*Node topicRegisterLookupTick = time.NewTimer(0) - topicSearchLookupTarget lookupInfo searchReqWhenRefreshDone []topicSearchReq + searchInfo = make(map[Topic]topicSearchInfo) + activeSearchCount int ) - topicSearchLookupDone := make(chan []*Node, 1) + topicSearchLookupDone := make(chan topicSearchResult, 100) + topicSearch := make(chan Topic, 100) <-topicRegisterLookupTick.C statsDump := time.NewTicker(10 * time.Second) @@ -504,21 +529,52 @@ loop: case req := <-net.topicSearchReq: if refreshDone == nil { debugLog("<-net.topicSearchReq") - if req.found == nil { - net.ticketStore.removeSearchTopic(req.topic) + info, ok := searchInfo[req.topic] + if ok { + if req.delay == time.Duration(0) { + delete(searchInfo, req.topic) + net.ticketStore.removeSearchTopic(req.topic) + } else { + info.period = req.delay + searchInfo[req.topic] = info + } continue } - net.ticketStore.addSearchTopic(req.topic, req.found) - if (topicSearchLookupTarget.target == common.Hash{}) { - topicSearchLookupDone <- nil + if req.delay != time.Duration(0) { + var info topicSearchInfo + info.period = req.delay + info.lookupChn = req.lookup + searchInfo[req.topic] = info + net.ticketStore.addSearchTopic(req.topic, req.found) + topicSearch <- req.topic } } else { searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req) } - case nodes := <-topicSearchLookupDone: - debugLog("<-topicSearchLookupDone") - net.ticketStore.searchLookupDone(topicSearchLookupTarget, nodes, func(n *Node) []byte { + case topic := <-topicSearch: + if activeSearchCount < maxSearchCount { + activeSearchCount++ + target := net.ticketStore.nextSearchLookup(topic) + go func() { + nodes := net.lookup(target.target, false) + topicSearchLookupDone <- topicSearchResult{target: target, nodes: nodes} + }() + } + period := searchInfo[topic].period + if period != time.Duration(0) { + go func() { + time.Sleep(period) + topicSearch <- topic + }() + } + + case res := <-topicSearchLookupDone: + activeSearchCount-- + if lookupChn := searchInfo[res.target.topic].lookupChn; lookupChn != nil { + lookupChn <- net.ticketStore.radius[res.target.topic].converged + } + net.ticketStore.searchLookupDone(res.target, res.nodes, func(n *Node) []byte { net.ping(n, n.addr()) return n.pingEcho }, func(n *Node, topic Topic) []byte { @@ -531,11 +587,6 @@ loop: return nil } }) - topicSearchLookupTarget = net.ticketStore.nextSearchLookup() - target := topicSearchLookupTarget.target - if (target != common.Hash{}) { - go func() { topicSearchLookupDone <- net.lookup(target, false) }() - } case <-statsDump.C: debugLog("<-statsDump.C") diff --git a/p2p/discv5/ticket.go b/p2p/discv5/ticket.go index 202504314..752fdc9b4 100644 --- a/p2p/discv5/ticket.go +++ b/p2p/discv5/ticket.go @@ -138,16 +138,12 @@ type ticketStore struct { nextTicketReg mclock.AbsTime searchTopicMap map[Topic]searchTopic - searchTopicList []Topic - searchTopicPtr int nextTopicQueryCleanup mclock.AbsTime queriesSent map[*Node]map[common.Hash]sentQuery - radiusLookupCnt int } type searchTopic struct { - foundChn chan<- string - listIdx int + foundChn chan<- *Node } type sentQuery struct { @@ -183,23 +179,15 @@ func (s *ticketStore) addTopic(t Topic, register bool) { } } -func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- string) { +func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- *Node) { s.addTopic(t, false) if s.searchTopicMap[t].foundChn == nil { - s.searchTopicList = append(s.searchTopicList, t) - s.searchTopicMap[t] = searchTopic{foundChn: foundChn, listIdx: len(s.searchTopicList) - 1} + s.searchTopicMap[t] = searchTopic{foundChn: foundChn} } } func (s *ticketStore) removeSearchTopic(t Topic) { if st := s.searchTopicMap[t]; st.foundChn != nil { - lastIdx := len(s.searchTopicList) - 1 - lastTopic := s.searchTopicList[lastIdx] - s.searchTopicList[st.listIdx] = lastTopic - sl := s.searchTopicMap[lastTopic] - sl.listIdx = st.listIdx - s.searchTopicMap[lastTopic] = sl - s.searchTopicList = s.searchTopicList[:lastIdx] delete(s.searchTopicMap, t) } } @@ -247,20 +235,13 @@ func (s *ticketStore) nextRegisterLookup() (lookup lookupInfo, delay time.Durati return lookupInfo{}, 40 * time.Second } -func (s *ticketStore) nextSearchLookup() lookupInfo { - if len(s.searchTopicList) == 0 { - return lookupInfo{} - } - if s.searchTopicPtr >= len(s.searchTopicList) { - s.searchTopicPtr = 0 - } - topic := s.searchTopicList[s.searchTopicPtr] - s.searchTopicPtr++ - target := s.radius[topic].nextTarget(s.radiusLookupCnt >= searchForceQuery) +func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo { + tr := s.radius[topic] + target := tr.nextTarget(tr.radiusLookupCnt >= searchForceQuery) if target.radiusLookup { - s.radiusLookupCnt++ + tr.radiusLookupCnt++ } else { - s.radiusLookupCnt = 0 + tr.radiusLookupCnt = 0 } return target } @@ -662,9 +643,9 @@ func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNod if ip.IsUnspecified() || ip.IsLoopback() { ip = from.IP } - enode := NewNode(node.ID, ip, node.UDP-1, node.TCP-1).String() // subtract one from port while discv5 is running in test mode on UDPport+1 + n := NewNode(node.ID, ip, node.UDP-1, node.TCP-1) // subtract one from port while discv5 is running in test mode on UDPport+1 select { - case chn <- enode: + case chn <- n: default: return false } @@ -677,6 +658,8 @@ type topicRadius struct { topicHashPrefix uint64 radius, minRadius uint64 buckets []topicRadiusBucket + converged bool + radiusLookupCnt int } type topicRadiusEvent int @@ -706,7 +689,7 @@ func (b *topicRadiusBucket) update(now mclock.AbsTime) { b.lastTime = now for target, tm := range b.lookupSent { - if now-tm > mclock.AbsTime(pingTimeout) { + if now-tm > mclock.AbsTime(respTimeout) { b.weights[trNoAdjust] += 1 delete(b.lookupSent, target) } @@ -906,6 +889,7 @@ func (r *topicRadius) recalcRadius() (radius uint64, radiusLookup int) { if radiusLookup == -1 { // no more radius lookups needed at the moment, return a radius + r.converged = true rad := maxBucket if minRadBucket < rad { rad = minRadBucket |