diff options
author | zelig <viktor.tron@gmail.com> | 2014-10-23 23:57:54 +0800 |
---|---|---|
committer | zelig <viktor.tron@gmail.com> | 2014-10-23 23:57:54 +0800 |
commit | 771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb (patch) | |
tree | 15a966dbe15e2f8388f69b396e613c7759b06f6d /p2p/server.go | |
parent | 119c5b40a7ed1aea1c871c0cb56956b8ef9303d9 (diff) | |
download | go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.tar.gz go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.tar.zst go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.zip |
initial commit of p2p package
Diffstat (limited to 'p2p/server.go')
-rw-r--r-- | p2p/server.go | 484 |
1 files changed, 484 insertions, 0 deletions
diff --git a/p2p/server.go b/p2p/server.go new file mode 100644 index 000000000..a6bbd9260 --- /dev/null +++ b/p2p/server.go @@ -0,0 +1,484 @@ +package p2p + +import ( + "bytes" + "fmt" + "net" + "sort" + "strconv" + "sync" + "time" + + "github.com/ethereum/eth-go/ethlog" +) + +const ( + outboundAddressPoolSize = 10 + disconnectGracePeriod = 2 +) + +type Blacklist interface { + Get([]byte) (bool, error) + Put([]byte) error + Delete([]byte) error + Exists(pubkey []byte) (ok bool) +} + +type BlacklistMap struct { + blacklist map[string]bool + lock sync.RWMutex +} + +func NewBlacklist() *BlacklistMap { + return &BlacklistMap{ + blacklist: make(map[string]bool), + } +} + +func (self *BlacklistMap) Get(pubkey []byte) (bool, error) { + self.lock.RLock() + defer self.lock.RUnlock() + v, ok := self.blacklist[string(pubkey)] + var err error + if !ok { + err = fmt.Errorf("not found") + } + return v, err +} + +func (self *BlacklistMap) Exists(pubkey []byte) (ok bool) { + self.lock.RLock() + defer self.lock.RUnlock() + _, ok = self.blacklist[string(pubkey)] + return +} + +func (self *BlacklistMap) Put(pubkey []byte) error { + self.lock.RLock() + defer self.lock.RUnlock() + self.blacklist[string(pubkey)] = true + return nil +} + +func (self *BlacklistMap) Delete(pubkey []byte) error { + self.lock.RLock() + defer self.lock.RUnlock() + delete(self.blacklist, string(pubkey)) + return nil +} + +type Server struct { + network Network + listening bool //needed? + dialing bool //needed? + closed bool + identity ClientIdentity + addr net.Addr + port uint16 + protocols []string + + quit chan chan bool + peersLock sync.RWMutex + + maxPeers int + peers []*Peer + peerSlots chan int + peersTable map[string]int + peersMsg *Msg + peerCount int + + peerConnect chan net.Addr + peerDisconnect chan DisconnectRequest + blacklist Blacklist + handlers Handlers +} + +var logger = ethlog.NewLogger("P2P") + +func New(network Network, addr net.Addr, identity ClientIdentity, handlers Handlers, maxPeers int, blacklist Blacklist) *Server { + // get alphabetical list of protocol names from handlers map + protocols := []string{} + for protocol := range handlers { + protocols = append(protocols, protocol) + } + sort.Strings(protocols) + + _, port, _ := net.SplitHostPort(addr.String()) + intport, _ := strconv.Atoi(port) + + self := &Server{ + // NewSimpleClientIdentity(clientIdentifier, version, customIdentifier) + network: network, + identity: identity, + addr: addr, + port: uint16(intport), + protocols: protocols, + + quit: make(chan chan bool), + + maxPeers: maxPeers, + peers: make([]*Peer, maxPeers), + peerSlots: make(chan int, maxPeers), + peersTable: make(map[string]int), + + peerConnect: make(chan net.Addr, outboundAddressPoolSize), + peerDisconnect: make(chan DisconnectRequest), + blacklist: blacklist, + + handlers: handlers, + } + for i := 0; i < maxPeers; i++ { + self.peerSlots <- i // fill up with indexes + } + return self +} + +func (self *Server) NewAddr(host string, port int) (addr net.Addr, err error) { + addr, err = self.network.NewAddr(host, port) + return +} + +func (self *Server) ParseAddr(address string) (addr net.Addr, err error) { + addr, err = self.network.ParseAddr(address) + return +} + +func (self *Server) ClientIdentity() ClientIdentity { + return self.identity +} + +func (self *Server) PeersMessage() (msg *Msg, err error) { + // TODO: memoize and reset when peers change + self.peersLock.RLock() + defer self.peersLock.RUnlock() + msg = self.peersMsg + if msg == nil { + var peerData []interface{} + for _, i := range self.peersTable { + peer := self.peers[i] + peerData = append(peerData, peer.Encode()) + } + if len(peerData) == 0 { + err = fmt.Errorf("no peers") + } else { + msg, err = NewMsg(PeersMsg, peerData...) + self.peersMsg = msg //memoize + } + } + return +} + +func (self *Server) Peers() (peers []*Peer) { + self.peersLock.RLock() + defer self.peersLock.RUnlock() + for _, peer := range self.peers { + if peer != nil { + peers = append(peers, peer) + } + } + return +} + +func (self *Server) PeerCount() int { + self.peersLock.RLock() + defer self.peersLock.RUnlock() + return self.peerCount +} + +var getPeersMsg, _ = NewMsg(GetPeersMsg) + +func (self *Server) PeerConnect(addr net.Addr) { + // TODO: should buffer, filter and uniq + // send GetPeersMsg if not blocking + select { + case self.peerConnect <- addr: // not enough peers + self.Broadcast("", getPeersMsg) + default: // we dont care + } +} + +func (self *Server) PeerDisconnect() chan DisconnectRequest { + return self.peerDisconnect +} + +func (self *Server) Blacklist() Blacklist { + return self.blacklist +} + +func (self *Server) Handlers() Handlers { + return self.handlers +} + +func (self *Server) Broadcast(protocol string, msg *Msg) { + self.peersLock.RLock() + defer self.peersLock.RUnlock() + for _, peer := range self.peers { + if peer != nil { + peer.Write(protocol, msg) + } + } +} + +// Start the server +func (self *Server) Start(listen bool, dial bool) { + self.network.Start() + if listen { + listener, err := self.network.Listener(self.addr) + if err != nil { + logger.Warnf("Error initializing listener: %v", err) + logger.Warnf("Connection listening disabled") + self.listening = false + } else { + self.listening = true + logger.Infoln("Listen on %v: ready and accepting connections", listener.Addr()) + go self.inboundPeerHandler(listener) + } + } + if dial { + dialer, err := self.network.Dialer(self.addr) + if err != nil { + logger.Warnf("Error initializing dialer: %v", err) + logger.Warnf("Connection dialout disabled") + self.dialing = false + } else { + self.dialing = true + logger.Infoln("Dial peers watching outbound address pool") + go self.outboundPeerHandler(dialer) + } + } + logger.Infoln("server started") +} + +func (self *Server) Stop() { + logger.Infoln("server stopping...") + // // quit one loop if dialing + if self.dialing { + logger.Infoln("stop dialout...") + dialq := make(chan bool) + self.quit <- dialq + <-dialq + fmt.Println("quit another") + } + // quit the other loop if listening + if self.listening { + logger.Infoln("stop listening...") + listenq := make(chan bool) + self.quit <- listenq + <-listenq + fmt.Println("quit one") + } + + fmt.Println("quit waited") + + logger.Infoln("stopping peers...") + peers := []net.Addr{} + self.peersLock.RLock() + self.closed = true + for _, peer := range self.peers { + if peer != nil { + peers = append(peers, peer.Address) + } + } + self.peersLock.RUnlock() + for _, address := range peers { + go self.removePeer(DisconnectRequest{ + addr: address, + reason: DiscQuitting, + }) + } + // wait till they actually disconnect + // this is checked by draining the peerSlots (slots are released back if a peer is removed) + i := 0 + fmt.Println("draining peers") + +FOR: + for { + select { + case slot := <-self.peerSlots: + i++ + fmt.Printf("%v: found slot %v", i, slot) + if i == self.maxPeers { + break FOR + } + } + } + logger.Infoln("server stopped") +} + +// main loop for adding connections via listening +func (self *Server) inboundPeerHandler(listener net.Listener) { + for { + select { + case slot := <-self.peerSlots: + go self.connectInboundPeer(listener, slot) + case errc := <-self.quit: + listener.Close() + fmt.Println("quit listenloop") + errc <- true + return + } + } +} + +// main loop for adding outbound peers based on peerConnect address pool +// this same loop handles peer disconnect requests as well +func (self *Server) outboundPeerHandler(dialer Dialer) { + // addressChan initially set to nil (only watches peerConnect if we need more peers) + var addressChan chan net.Addr + slots := self.peerSlots + var slot *int + for { + select { + case i := <-slots: + // we need a peer in slot i, slot reserved + slot = &i + // now we can watch for candidate peers in the next loop + addressChan = self.peerConnect + // do not consume more until candidate peer is found + slots = nil + case address := <-addressChan: + // candidate peer found, will dial out asyncronously + // if connection fails slot will be released + go self.connectOutboundPeer(dialer, address, *slot) + // we can watch if more peers needed in the next loop + slots = self.peerSlots + // until then we dont care about candidate peers + addressChan = nil + case request := <-self.peerDisconnect: + go self.removePeer(request) + case errc := <-self.quit: + if addressChan != nil && slot != nil { + self.peerSlots <- *slot + } + fmt.Println("quit dialloop") + errc <- true + return + } + } +} + +// check if peer address already connected +func (self *Server) connected(address net.Addr) (err error) { + self.peersLock.RLock() + defer self.peersLock.RUnlock() + // fmt.Printf("address: %v\n", address) + slot, found := self.peersTable[address.String()] + if found { + err = fmt.Errorf("already connected as peer %v (%v)", slot, address) + } + return +} + +// connect to peer via listener.Accept() +func (self *Server) connectInboundPeer(listener net.Listener, slot int) { + var address net.Addr + conn, err := listener.Accept() + if err == nil { + address = conn.RemoteAddr() + err = self.connected(address) + if err != nil { + conn.Close() + } + } + if err != nil { + logger.Debugln(err) + self.peerSlots <- slot + } else { + fmt.Printf("adding %v\n", address) + go self.addPeer(conn, address, true, slot) + } +} + +// connect to peer via dial out +func (self *Server) connectOutboundPeer(dialer Dialer, address net.Addr, slot int) { + var conn net.Conn + err := self.connected(address) + if err == nil { + conn, err = dialer.Dial(address.Network(), address.String()) + } + if err != nil { + logger.Debugln(err) + self.peerSlots <- slot + } else { + go self.addPeer(conn, address, false, slot) + } +} + +// creates the new peer object and inserts it into its slot +func (self *Server) addPeer(conn net.Conn, address net.Addr, inbound bool, slot int) { + self.peersLock.Lock() + defer self.peersLock.Unlock() + if self.closed { + fmt.Println("oopsy, not no longer need peer") + conn.Close() //oopsy our bad + self.peerSlots <- slot // release slot + } else { + peer := NewPeer(conn, address, inbound, self) + self.peers[slot] = peer + self.peersTable[address.String()] = slot + self.peerCount++ + // reset peersmsg + self.peersMsg = nil + fmt.Printf("added peer %v %v (slot %v)\n", address, peer, slot) + peer.Start() + } +} + +// removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot +func (self *Server) removePeer(request DisconnectRequest) { + self.peersLock.Lock() + + address := request.addr + slot := self.peersTable[address.String()] + peer := self.peers[slot] + fmt.Printf("removing peer %v %v (slot %v)\n", address, peer, slot) + if peer == nil { + logger.Debugf("already removed peer on %v", address) + self.peersLock.Unlock() + return + } + // remove from list and index + self.peerCount-- + self.peers[slot] = nil + delete(self.peersTable, address.String()) + // reset peersmsg + self.peersMsg = nil + fmt.Printf("removed peer %v (slot %v)\n", peer, slot) + self.peersLock.Unlock() + + // sending disconnect message + disconnectMsg, _ := NewMsg(DiscMsg, request.reason) + peer.Write("", disconnectMsg) + // be nice and wait + time.Sleep(disconnectGracePeriod * time.Second) + // switch off peer and close connections etc. + fmt.Println("stopping peer") + peer.Stop() + fmt.Println("stopped peer") + // release slot to signal need for a new peer, last! + self.peerSlots <- slot +} + +// fix handshake message to push to peers +func (self *Server) Handshake() *Msg { + fmt.Println(self.identity.Pubkey()[1:]) + msg, _ := NewMsg(HandshakeMsg, P2PVersion, []byte(self.identity.String()), []interface{}{self.protocols}, self.port, self.identity.Pubkey()[1:]) + return msg +} + +func (self *Server) RegisterPubkey(candidate *Peer, pubkey []byte) error { + // Check for blacklisting + if self.blacklist.Exists(pubkey) { + return fmt.Errorf("blacklisted") + } + + self.peersLock.RLock() + defer self.peersLock.RUnlock() + for _, peer := range self.peers { + if peer != nil && peer != candidate && bytes.Compare(peer.Pubkey, pubkey) == 0 { + return fmt.Errorf("already connected") + } + } + candidate.Pubkey = pubkey + return nil +} |