diff options
Diffstat (limited to 'p2p/dial.go')
-rw-r--r-- | p2p/dial.go | 110 |
1 files changed, 88 insertions, 22 deletions
diff --git a/p2p/dial.go b/p2p/dial.go index bdc9f852c..c0e703d7d 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -36,6 +36,10 @@ const ( // Discovery lookups are throttled and can only run // once every few seconds. lookupInterval = 4 * time.Second + + // Endpoint resolution is throttled with bounded backoff. + initialResolveDelay = 60 * time.Second + maxResolveDelay = time.Hour ) // dialstate schedules dials and discovery lookups. @@ -46,17 +50,17 @@ type dialstate struct { ntab discoverTable lookupRunning bool - - dialing map[discover.NodeID]connFlag - lookupBuf []*discover.Node // current discovery lookup results - randomNodes []*discover.Node // filled from Table - static map[discover.NodeID]*discover.Node - hist *dialHistory + dialing map[discover.NodeID]connFlag + lookupBuf []*discover.Node // current discovery lookup results + randomNodes []*discover.Node // filled from Table + static map[discover.NodeID]*dialTask + hist *dialHistory } type discoverTable interface { Self() *discover.Node Close() + Resolve(target discover.NodeID) *discover.Node Lookup(target discover.NodeID) []*discover.Node ReadRandomNodes([]*discover.Node) int } @@ -74,10 +78,13 @@ type task interface { Do(*Server) } -// A dialTask is generated for each node that is dialed. +// A dialTask is generated for each node that is dialed. Its +// fields cannot be accessed while the task is running. type dialTask struct { - flags connFlag - dest *discover.Node + flags connFlag + dest *discover.Node + lastResolved time.Time + resolveDelay time.Duration } // discoverTask runs discovery table operations. @@ -97,26 +104,31 @@ func newDialState(static []*discover.Node, ntab discoverTable, maxdyn int) *dial s := &dialstate{ maxDynDials: maxdyn, ntab: ntab, - static: make(map[discover.NodeID]*discover.Node), + static: make(map[discover.NodeID]*dialTask), dialing: make(map[discover.NodeID]connFlag), randomNodes: make([]*discover.Node, maxdyn/2), hist: new(dialHistory), } for _, n := range static { - s.static[n.ID] = n + s.addStatic(n) } return s } func (s *dialstate) addStatic(n *discover.Node) { - s.static[n.ID] = n + // This overwites the task instead of updating an existing + // entry, giving users the opportunity to force a resolve operation. + s.static[n.ID] = &dialTask{flags: staticDialedConn, dest: n} } func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task { var newtasks []task + isDialing := func(id discover.NodeID) bool { + _, found := s.dialing[id] + return found || peers[id] != nil || s.hist.contains(id) + } addDial := func(flag connFlag, n *discover.Node) bool { - _, dialing := s.dialing[n.ID] - if dialing || peers[n.ID] != nil || s.hist.contains(n.ID) { + if isDialing(n.ID) { return false } s.dialing[n.ID] = flag @@ -141,8 +153,11 @@ func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now s.hist.expire(now) // Create dials for static nodes if they are not connected. - for _, n := range s.static { - addDial(staticDialedConn, n) + for id, t := range s.static { + if !isDialing(id) { + s.dialing[id] = t.flags + newtasks = append(newtasks, t) + } } // Use random nodes from the table for half of the necessary @@ -194,17 +209,68 @@ func (s *dialstate) taskDone(t task, now time.Time) { } func (t *dialTask) Do(srv *Server) { - addr := &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)} - glog.V(logger.Debug).Infof("dialing %v\n", t.dest) + if t.dest.Incomplete() { + if !t.resolve(srv) { + return + } + } + success := t.dial(srv, t.dest) + // Try resolving the ID of static nodes if dialing failed. + if !success && t.flags&staticDialedConn != 0 { + if t.resolve(srv) { + t.dial(srv, t.dest) + } + } +} + +// resolve attempts to find the current endpoint for the destination +// using discovery. +// +// Resolve operations are throttled with backoff to avoid flooding the +// discovery network with useless queries for nodes that don't exist. +// The backoff delay resets when the node is found. +func (t *dialTask) resolve(srv *Server) bool { + if srv.ntab == nil { + glog.V(logger.Debug).Infof("can't resolve node %x: discovery is disabled", t.dest.ID[:6]) + return false + } + if t.resolveDelay == 0 { + t.resolveDelay = initialResolveDelay + } + if time.Since(t.lastResolved) < t.resolveDelay { + return false + } + resolved := srv.ntab.Resolve(t.dest.ID) + t.lastResolved = time.Now() + if resolved == nil { + t.resolveDelay *= 2 + if t.resolveDelay > maxResolveDelay { + t.resolveDelay = maxResolveDelay + } + glog.V(logger.Debug).Infof("resolving node %x failed (new delay: %v)", t.dest.ID[:6], t.resolveDelay) + return false + } + // The node was found. + t.resolveDelay = initialResolveDelay + t.dest = resolved + glog.V(logger.Debug).Infof("resolved node %x: %v:%d", t.dest.ID[:6], t.dest.IP, t.dest.TCP) + return true +} + +// dial performs the actual connection attempt. +func (t *dialTask) dial(srv *Server, dest *discover.Node) bool { + addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)} + glog.V(logger.Debug).Infof("dial tcp %v (%x)\n", addr, dest.ID[:6]) fd, err := srv.Dialer.Dial("tcp", addr.String()) if err != nil { - glog.V(logger.Detail).Infof("dial error: %v", err) - return + glog.V(logger.Detail).Infof("%v", err) + return false } mfd := newMeteredConn(fd, false) - - srv.setupConn(mfd, t.flags, t.dest) + srv.setupConn(mfd, t.flags, dest) + return true } + func (t *dialTask) String() string { return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP) } |