From f38052c499c1fee61423efeddb1f52677f1442e9 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 4 Nov 2014 13:21:44 +0100 Subject: p2p: rework protocol API --- p2p/server.go | 150 ++++++++++++++++++++++++++++++---------------------------- 1 file changed, 77 insertions(+), 73 deletions(-) (limited to 'p2p/server.go') diff --git a/p2p/server.go b/p2p/server.go index 91bc4af5c..54d2cde30 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -80,12 +80,12 @@ type Server struct { quit chan chan bool peersLock sync.RWMutex - maxPeers int - peers []*Peer - peerSlots chan int - peersTable map[string]int - peersMsg *Msg - peerCount int + maxPeers int + peers []*Peer + peerSlots chan int + peersTable map[string]int + peerCount int + cachedEncodedPeers []byte peerConnect chan net.Addr peerDisconnect chan DisconnectRequest @@ -147,27 +147,6 @@ func (self *Server) ClientIdentity() ClientIdentity { return self.identity } -func (self *Server) PeersMessage() (msg *Msg, err error) { - // TODO: memoize and reset when peers change - self.peersLock.RLock() - defer self.peersLock.RUnlock() - msg = self.peersMsg - if msg == nil { - var peerData []interface{} - for _, i := range self.peersTable { - peer := self.peers[i] - peerData = append(peerData, peer.Encode()) - } - if len(peerData) == 0 { - err = fmt.Errorf("no peers") - } else { - msg, err = NewMsg(PeersMsg, peerData...) - self.peersMsg = msg //memoize - } - } - return -} - func (self *Server) Peers() (peers []*Peer) { self.peersLock.RLock() defer self.peersLock.RUnlock() @@ -185,8 +164,6 @@ func (self *Server) PeerCount() int { return self.peerCount } -var getPeersMsg, _ = NewMsg(GetPeersMsg) - func (self *Server) PeerConnect(addr net.Addr) { // TODO: should buffer, filter and uniq // send GetPeersMsg if not blocking @@ -209,12 +186,21 @@ func (self *Server) Handlers() Handlers { return self.handlers } -func (self *Server) Broadcast(protocol string, msg *Msg) { +func (self *Server) Broadcast(protocol string, code MsgCode, data ...interface{}) { + var payload []byte + if data != nil { + payload = encodePayload(data...) + } self.peersLock.RLock() defer self.peersLock.RUnlock() for _, peer := range self.peers { if peer != nil { - peer.Write(protocol, msg) + var msg = Msg{Code: code} + if data != nil { + msg.Payload = bytes.NewReader(payload) + msg.Size = uint32(len(payload)) + } + peer.messenger.writeProtoMsg(protocol, msg) } } } @@ -296,7 +282,7 @@ FOR: select { case slot := <-self.peerSlots: i++ - fmt.Printf("%v: found slot %v", i, slot) + fmt.Printf("%v: found slot %v\n", i, slot) if i == self.maxPeers { break FOR } @@ -358,70 +344,68 @@ func (self *Server) outboundPeerHandler(dialer Dialer) { } // check if peer address already connected -func (self *Server) connected(address net.Addr) (err error) { +func (self *Server) isConnected(address net.Addr) bool { self.peersLock.RLock() defer self.peersLock.RUnlock() - // fmt.Printf("address: %v\n", address) - slot, found := self.peersTable[address.String()] - if found { - err = fmt.Errorf("already connected as peer %v (%v)", slot, address) - } - return + _, found := self.peersTable[address.String()] + return found } // connect to peer via listener.Accept() func (self *Server) connectInboundPeer(listener net.Listener, slot int) { var address net.Addr conn, err := listener.Accept() - if err == nil { - address = conn.RemoteAddr() - err = self.connected(address) - if err != nil { - conn.Close() - } - } if err != nil { logger.Debugln(err) self.peerSlots <- slot - } else { - fmt.Printf("adding %v\n", address) - go self.addPeer(conn, address, true, slot) + return + } + address = conn.RemoteAddr() + // XXX: this won't work because the remote socket + // address does not identify the peer. we should + // probably get rid of this check and rely on public + // key detection in the base protocol. + if self.isConnected(address) { + conn.Close() + self.peerSlots <- slot + return } + fmt.Printf("adding %v\n", address) + go self.addPeer(conn, address, true, slot) } // connect to peer via dial out func (self *Server) connectOutboundPeer(dialer Dialer, address net.Addr, slot int) { - var conn net.Conn - err := self.connected(address) - if err == nil { - conn, err = dialer.Dial(address.Network(), address.String()) + if self.isConnected(address) { + return } + conn, err := dialer.Dial(address.Network(), address.String()) if err != nil { - logger.Debugln(err) self.peerSlots <- slot - } else { - go self.addPeer(conn, address, false, slot) + return } + go self.addPeer(conn, address, false, slot) } // creates the new peer object and inserts it into its slot -func (self *Server) addPeer(conn net.Conn, address net.Addr, inbound bool, slot int) { +func (self *Server) addPeer(conn net.Conn, address net.Addr, inbound bool, slot int) *Peer { self.peersLock.Lock() defer self.peersLock.Unlock() if self.closed { fmt.Println("oopsy, not no longer need peer") conn.Close() //oopsy our bad self.peerSlots <- slot // release slot - } else { - peer := NewPeer(conn, address, inbound, self) - self.peers[slot] = peer - self.peersTable[address.String()] = slot - self.peerCount++ - // reset peersmsg - self.peersMsg = nil - fmt.Printf("added peer %v %v (slot %v)\n", address, peer, slot) - peer.Start() + return nil } + logger.Infoln("adding new peer", address) + peer := NewPeer(conn, address, inbound, self) + self.peers[slot] = peer + self.peersTable[address.String()] = slot + self.peerCount++ + self.cachedEncodedPeers = nil + fmt.Printf("added peer %v %v (slot %v)\n", address, peer, slot) + peer.Start() + return peer } // removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot @@ -441,13 +425,12 @@ func (self *Server) removePeer(request DisconnectRequest) { self.peerCount-- self.peers[slot] = nil delete(self.peersTable, address.String()) - // reset peersmsg - self.peersMsg = nil + self.cachedEncodedPeers = nil fmt.Printf("removed peer %v (slot %v)\n", peer, slot) self.peersLock.Unlock() // sending disconnect message - disconnectMsg, _ := NewMsg(DiscMsg, request.reason) + disconnectMsg := NewMsg(discMsg, request.reason) peer.Write("", disconnectMsg) // be nice and wait time.Sleep(disconnectGracePeriod * time.Second) @@ -459,11 +442,32 @@ func (self *Server) removePeer(request DisconnectRequest) { self.peerSlots <- slot } +// encodedPeerList returns an RLP-encoded list of peers. +// the returned slice will be nil if there are no peers. +func (self *Server) encodedPeerList() []byte { + // TODO: memoize and reset when peers change + self.peersLock.RLock() + defer self.peersLock.RUnlock() + if self.cachedEncodedPeers == nil && self.peerCount > 0 { + var peerData []interface{} + for _, i := range self.peersTable { + peer := self.peers[i] + peerData = append(peerData, peer.Encode()) + } + self.cachedEncodedPeers = encodePayload(peerData) + } + return self.cachedEncodedPeers +} + // fix handshake message to push to peers -func (self *Server) Handshake() *Msg { - fmt.Println(self.identity.Pubkey()[1:]) - msg, _ := NewMsg(HandshakeMsg, P2PVersion, []byte(self.identity.String()), []interface{}{self.protocols}, self.port, self.identity.Pubkey()[1:]) - return msg +func (self *Server) handshakeMsg() Msg { + return NewMsg(handshakeMsg, + p2pVersion, + []byte(self.identity.String()), + []interface{}{self.protocols}, + self.port, + self.identity.Pubkey()[1:], + ) } func (self *Server) RegisterPubkey(candidate *Peer, pubkey []byte) error { -- cgit