diff options
-rw-r--r-- | p2p/discv5/net.go | 119 | ||||
-rw-r--r-- | p2p/discv5/ticket.go | 44 |
2 files changed, 99 insertions, 64 deletions
diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go index d1c48904e..1348c6774 100644 --- a/p2p/discv5/net.go +++ b/p2p/discv5/net.go @@ -126,8 +126,15 @@ type topicRegisterReq struct { } type topicSearchReq struct { - topic Topic - found chan<- string + topic Topic + found chan<- *Node + lookup chan<- bool + delay time.Duration +} + +type topicSearchResult struct { + target lookupInfo + nodes []*Node } type timeoutEvent struct { @@ -263,16 +270,23 @@ func (net *Network) lookup(target common.Hash, stopOnMatch bool) []*Node { break } // Wait for the next reply. - for _, n := range <-reply { - if n != nil && !seen[n.ID] { - seen[n.ID] = true - result.push(n, bucketSize) - if stopOnMatch && n.sha == target { - return result.entries + select { + case nodes := <-reply: + for _, n := range nodes { + if n != nil && !seen[n.ID] { + seen[n.ID] = true + result.push(n, bucketSize) + if stopOnMatch && n.sha == target { + return result.entries + } } } + pendingQueries-- + case <-time.After(respTimeout): + // forget all pending requests, start new ones + pendingQueries = 0 + reply = make(chan []*Node, alpha) } - pendingQueries-- } return result.entries } @@ -293,18 +307,20 @@ func (net *Network) RegisterTopic(topic Topic, stop <-chan struct{}) { } } -func (net *Network) SearchTopic(topic Topic, stop <-chan struct{}, found chan<- string) { - select { - case net.topicSearchReq <- topicSearchReq{topic, found}: - case <-net.closed: - return - } - select { - case <-net.closed: - case <-stop: +func (net *Network) SearchTopic(topic Topic, setPeriod <-chan time.Duration, found chan<- *Node, lookup chan<- bool) { + for { select { - case net.topicSearchReq <- topicSearchReq{topic, nil}: case <-net.closed: + return + case delay, ok := <-setPeriod: + select { + case net.topicSearchReq <- topicSearchReq{topic: topic, found: found, lookup: lookup, delay: delay}: + case <-net.closed: + return + } + if !ok { + return + } } } } @@ -347,6 +363,13 @@ func (net *Network) reqTableOp(f func()) (called bool) { // TODO: external address handling. +type topicSearchInfo struct { + lookupChn chan<- bool + period time.Duration +} + +const maxSearchCount = 5 + func (net *Network) loop() { var ( refreshTimer = time.NewTicker(autoRefreshInterval) @@ -385,10 +408,12 @@ func (net *Network) loop() { topicRegisterLookupTarget lookupInfo topicRegisterLookupDone chan []*Node topicRegisterLookupTick = time.NewTimer(0) - topicSearchLookupTarget lookupInfo searchReqWhenRefreshDone []topicSearchReq + searchInfo = make(map[Topic]topicSearchInfo) + activeSearchCount int ) - topicSearchLookupDone := make(chan []*Node, 1) + topicSearchLookupDone := make(chan topicSearchResult, 100) + topicSearch := make(chan Topic, 100) <-topicRegisterLookupTick.C statsDump := time.NewTicker(10 * time.Second) @@ -504,21 +529,52 @@ loop: case req := <-net.topicSearchReq: if refreshDone == nil { debugLog("<-net.topicSearchReq") - if req.found == nil { - net.ticketStore.removeSearchTopic(req.topic) + info, ok := searchInfo[req.topic] + if ok { + if req.delay == time.Duration(0) { + delete(searchInfo, req.topic) + net.ticketStore.removeSearchTopic(req.topic) + } else { + info.period = req.delay + searchInfo[req.topic] = info + } continue } - net.ticketStore.addSearchTopic(req.topic, req.found) - if (topicSearchLookupTarget.target == common.Hash{}) { - topicSearchLookupDone <- nil + if req.delay != time.Duration(0) { + var info topicSearchInfo + info.period = req.delay + info.lookupChn = req.lookup + searchInfo[req.topic] = info + net.ticketStore.addSearchTopic(req.topic, req.found) + topicSearch <- req.topic } } else { searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req) } - case nodes := <-topicSearchLookupDone: - debugLog("<-topicSearchLookupDone") - net.ticketStore.searchLookupDone(topicSearchLookupTarget, nodes, func(n *Node) []byte { + case topic := <-topicSearch: + if activeSearchCount < maxSearchCount { + activeSearchCount++ + target := net.ticketStore.nextSearchLookup(topic) + go func() { + nodes := net.lookup(target.target, false) + topicSearchLookupDone <- topicSearchResult{target: target, nodes: nodes} + }() + } + period := searchInfo[topic].period + if period != time.Duration(0) { + go func() { + time.Sleep(period) + topicSearch <- topic + }() + } + + case res := <-topicSearchLookupDone: + activeSearchCount-- + if lookupChn := searchInfo[res.target.topic].lookupChn; lookupChn != nil { + lookupChn <- net.ticketStore.radius[res.target.topic].converged + } + net.ticketStore.searchLookupDone(res.target, res.nodes, func(n *Node) []byte { net.ping(n, n.addr()) return n.pingEcho }, func(n *Node, topic Topic) []byte { @@ -531,11 +587,6 @@ loop: return nil } }) - topicSearchLookupTarget = net.ticketStore.nextSearchLookup() - target := topicSearchLookupTarget.target - if (target != common.Hash{}) { - go func() { topicSearchLookupDone <- net.lookup(target, false) }() - } case <-statsDump.C: debugLog("<-statsDump.C") diff --git a/p2p/discv5/ticket.go b/p2p/discv5/ticket.go index 202504314..752fdc9b4 100644 --- a/p2p/discv5/ticket.go +++ b/p2p/discv5/ticket.go @@ -138,16 +138,12 @@ type ticketStore struct { nextTicketReg mclock.AbsTime searchTopicMap map[Topic]searchTopic - searchTopicList []Topic - searchTopicPtr int nextTopicQueryCleanup mclock.AbsTime queriesSent map[*Node]map[common.Hash]sentQuery - radiusLookupCnt int } type searchTopic struct { - foundChn chan<- string - listIdx int + foundChn chan<- *Node } type sentQuery struct { @@ -183,23 +179,15 @@ func (s *ticketStore) addTopic(t Topic, register bool) { } } -func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- string) { +func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- *Node) { s.addTopic(t, false) if s.searchTopicMap[t].foundChn == nil { - s.searchTopicList = append(s.searchTopicList, t) - s.searchTopicMap[t] = searchTopic{foundChn: foundChn, listIdx: len(s.searchTopicList) - 1} + s.searchTopicMap[t] = searchTopic{foundChn: foundChn} } } func (s *ticketStore) removeSearchTopic(t Topic) { if st := s.searchTopicMap[t]; st.foundChn != nil { - lastIdx := len(s.searchTopicList) - 1 - lastTopic := s.searchTopicList[lastIdx] - s.searchTopicList[st.listIdx] = lastTopic - sl := s.searchTopicMap[lastTopic] - sl.listIdx = st.listIdx - s.searchTopicMap[lastTopic] = sl - s.searchTopicList = s.searchTopicList[:lastIdx] delete(s.searchTopicMap, t) } } @@ -247,20 +235,13 @@ func (s *ticketStore) nextRegisterLookup() (lookup lookupInfo, delay time.Durati return lookupInfo{}, 40 * time.Second } -func (s *ticketStore) nextSearchLookup() lookupInfo { - if len(s.searchTopicList) == 0 { - return lookupInfo{} - } - if s.searchTopicPtr >= len(s.searchTopicList) { - s.searchTopicPtr = 0 - } - topic := s.searchTopicList[s.searchTopicPtr] - s.searchTopicPtr++ - target := s.radius[topic].nextTarget(s.radiusLookupCnt >= searchForceQuery) +func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo { + tr := s.radius[topic] + target := tr.nextTarget(tr.radiusLookupCnt >= searchForceQuery) if target.radiusLookup { - s.radiusLookupCnt++ + tr.radiusLookupCnt++ } else { - s.radiusLookupCnt = 0 + tr.radiusLookupCnt = 0 } return target } @@ -662,9 +643,9 @@ func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNod if ip.IsUnspecified() || ip.IsLoopback() { ip = from.IP } - enode := NewNode(node.ID, ip, node.UDP-1, node.TCP-1).String() // subtract one from port while discv5 is running in test mode on UDPport+1 + n := NewNode(node.ID, ip, node.UDP-1, node.TCP-1) // subtract one from port while discv5 is running in test mode on UDPport+1 select { - case chn <- enode: + case chn <- n: default: return false } @@ -677,6 +658,8 @@ type topicRadius struct { topicHashPrefix uint64 radius, minRadius uint64 buckets []topicRadiusBucket + converged bool + radiusLookupCnt int } type topicRadiusEvent int @@ -706,7 +689,7 @@ func (b *topicRadiusBucket) update(now mclock.AbsTime) { b.lastTime = now for target, tm := range b.lookupSent { - if now-tm > mclock.AbsTime(pingTimeout) { + if now-tm > mclock.AbsTime(respTimeout) { b.weights[trNoAdjust] += 1 delete(b.lookupSent, target) } @@ -906,6 +889,7 @@ func (r *topicRadius) recalcRadius() (radius uint64, radiusLookup int) { if radiusLookup == -1 { // no more radius lookups needed at the moment, return a radius + r.converged = true rad := maxBucket if minRadBucket < rad { rad = minRadBucket |