aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/protocol.go
diff options
context:
space:
mode:
authorzelig <viktor.tron@gmail.com>2014-10-23 23:57:54 +0800
committerzelig <viktor.tron@gmail.com>2014-10-23 23:57:54 +0800
commit771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb (patch)
tree15a966dbe15e2f8388f69b396e613c7759b06f6d /p2p/protocol.go
parent119c5b40a7ed1aea1c871c0cb56956b8ef9303d9 (diff)
downloadgo-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.tar.gz
go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.tar.zst
go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.zip
initial commit of p2p package
Diffstat (limited to 'p2p/protocol.go')
-rw-r--r--p2p/protocol.go278
1 files changed, 278 insertions, 0 deletions
diff --git a/p2p/protocol.go b/p2p/protocol.go
new file mode 100644
index 000000000..5d05ced7d
--- /dev/null
+++ b/p2p/protocol.go
@@ -0,0 +1,278 @@
+package p2p
+
+import (
+ "bytes"
+ "fmt"
+ "net"
+ "sort"
+ "sync"
+ "time"
+)
+
+type Protocol interface {
+ Start()
+ Stop()
+ HandleIn(*Msg, chan *Msg)
+ HandleOut(*Msg) bool
+ Offset() MsgCode
+ Name() string
+}
+
+const (
+ P2PVersion = 0
+ pingTimeout = 2
+ pingGracePeriod = 2
+)
+
+const (
+ HandshakeMsg = iota
+ DiscMsg
+ PingMsg
+ PongMsg
+ GetPeersMsg
+ PeersMsg
+ offset = 16
+)
+
+type ProtocolState uint8
+
+const (
+ nullState = iota
+ handshakeReceived
+)
+
+type DiscReason byte
+
+const (
+ // Values are given explicitly instead of by iota because these values are
+ // defined by the wire protocol spec; it is easier for humans to ensure
+ // correctness when values are explicit.
+ DiscRequested = 0x00
+ DiscNetworkError = 0x01
+ DiscProtocolError = 0x02
+ DiscUselessPeer = 0x03
+ DiscTooManyPeers = 0x04
+ DiscAlreadyConnected = 0x05
+ DiscIncompatibleVersion = 0x06
+ DiscInvalidIdentity = 0x07
+ DiscQuitting = 0x08
+ DiscUnexpectedIdentity = 0x09
+ DiscSelf = 0x0a
+ DiscReadTimeout = 0x0b
+ DiscSubprotocolError = 0x10
+)
+
+var discReasonToString = map[DiscReason]string{
+ DiscRequested: "Disconnect requested",
+ DiscNetworkError: "Network error",
+ DiscProtocolError: "Breach of protocol",
+ DiscUselessPeer: "Useless peer",
+ DiscTooManyPeers: "Too many peers",
+ DiscAlreadyConnected: "Already connected",
+ DiscIncompatibleVersion: "Incompatible P2P protocol version",
+ DiscInvalidIdentity: "Invalid node identity",
+ DiscQuitting: "Client quitting",
+ DiscUnexpectedIdentity: "Unexpected identity",
+ DiscSelf: "Connected to self",
+ DiscReadTimeout: "Read timeout",
+ DiscSubprotocolError: "Subprotocol error",
+}
+
+func (d DiscReason) String() string {
+ if len(discReasonToString) < int(d) {
+ return "Unknown"
+ }
+
+ return discReasonToString[d]
+}
+
+type BaseProtocol struct {
+ peer *Peer
+ state ProtocolState
+ stateLock sync.RWMutex
+}
+
+func NewBaseProtocol(peer *Peer) *BaseProtocol {
+ self := &BaseProtocol{
+ peer: peer,
+ }
+
+ return self
+}
+
+func (self *BaseProtocol) Start() {
+ if self.peer != nil {
+ self.peer.Write("", self.peer.Server().Handshake())
+ go self.peer.Messenger().PingPong(
+ pingTimeout*time.Second,
+ pingGracePeriod*time.Second,
+ self.Ping,
+ self.Timeout,
+ )
+ }
+}
+
+func (self *BaseProtocol) Stop() {
+}
+
+func (self *BaseProtocol) Ping() {
+ msg, _ := NewMsg(PingMsg)
+ self.peer.Write("", msg)
+}
+
+func (self *BaseProtocol) Timeout() {
+ self.peerError(PingTimeout, "")
+}
+
+func (self *BaseProtocol) Name() string {
+ return ""
+}
+
+func (self *BaseProtocol) Offset() MsgCode {
+ return offset
+}
+
+func (self *BaseProtocol) CheckState(state ProtocolState) bool {
+ self.stateLock.RLock()
+ self.stateLock.RUnlock()
+ if self.state != state {
+ return false
+ } else {
+ return true
+ }
+}
+
+func (self *BaseProtocol) HandleIn(msg *Msg, response chan *Msg) {
+ if msg.Code() == HandshakeMsg {
+ self.handleHandshake(msg)
+ } else {
+ if !self.CheckState(handshakeReceived) {
+ self.peerError(ProtocolBreach, "message code %v not allowed", msg.Code())
+ close(response)
+ return
+ }
+ switch msg.Code() {
+ case DiscMsg:
+ logger.Infof("Disconnect requested from peer %v, reason", DiscReason(msg.Data().Get(0).Uint()))
+ self.peer.Server().PeerDisconnect() <- DisconnectRequest{
+ addr: self.peer.Address,
+ reason: DiscRequested,
+ }
+ case PingMsg:
+ out, _ := NewMsg(PongMsg)
+ response <- out
+ case PongMsg:
+ case GetPeersMsg:
+ // Peer asked for list of connected peers
+ if out, err := self.peer.Server().PeersMessage(); err != nil {
+ response <- out
+ }
+ case PeersMsg:
+ self.handlePeers(msg)
+ default:
+ self.peerError(InvalidMsgCode, "unknown message code %v", msg.Code())
+ }
+ }
+ close(response)
+}
+
+func (self *BaseProtocol) HandleOut(msg *Msg) (allowed bool) {
+ // somewhat overly paranoid
+ allowed = msg.Code() == HandshakeMsg || msg.Code() == DiscMsg || msg.Code() < self.Offset() && self.CheckState(handshakeReceived)
+ return
+}
+
+func (self *BaseProtocol) peerError(errorCode ErrorCode, format string, v ...interface{}) {
+ err := NewPeerError(errorCode, format, v...)
+ logger.Warnln(err)
+ fmt.Println(self.peer, err)
+ if self.peer != nil {
+ self.peer.PeerErrorChan() <- err
+ }
+}
+
+func (self *BaseProtocol) handlePeers(msg *Msg) {
+ it := msg.Data().NewIterator()
+ for it.Next() {
+ ip := net.IP(it.Value().Get(0).Bytes())
+ port := it.Value().Get(1).Uint()
+ address := &net.TCPAddr{IP: ip, Port: int(port)}
+ go self.peer.Server().PeerConnect(address)
+ }
+}
+
+func (self *BaseProtocol) handleHandshake(msg *Msg) {
+ self.stateLock.Lock()
+ defer self.stateLock.Unlock()
+ if self.state != nullState {
+ self.peerError(ProtocolBreach, "extra handshake")
+ return
+ }
+
+ c := msg.Data()
+
+ var (
+ p2pVersion = c.Get(0).Uint()
+ id = c.Get(1).Str()
+ caps = c.Get(2)
+ port = c.Get(3).Uint()
+ pubkey = c.Get(4).Bytes()
+ )
+ fmt.Printf("handshake received %v, %v, %v, %v, %v ", p2pVersion, id, caps, port, pubkey)
+
+ // Check correctness of p2p protocol version
+ if p2pVersion != P2PVersion {
+ self.peerError(P2PVersionMismatch, "Require protocol %d, received %d\n", P2PVersion, p2pVersion)
+ return
+ }
+
+ // Handle the pub key (validation, uniqueness)
+ if len(pubkey) == 0 {
+ self.peerError(PubkeyMissing, "not supplied in handshake.")
+ return
+ }
+
+ if len(pubkey) != 64 {
+ self.peerError(PubkeyInvalid, "require 512 bit, got %v", len(pubkey)*8)
+ return
+ }
+
+ // Self connect detection
+ if bytes.Compare(self.peer.Server().ClientIdentity().Pubkey()[1:], pubkey) == 0 {
+ self.peerError(PubkeyForbidden, "not allowed to connect to self")
+ return
+ }
+
+ // register pubkey on server. this also sets the pubkey on the peer (need lock)
+ if err := self.peer.Server().RegisterPubkey(self.peer, pubkey); err != nil {
+ self.peerError(PubkeyForbidden, err.Error())
+ return
+ }
+
+ // check port
+ if self.peer.Inbound {
+ uint16port := uint16(port)
+ if self.peer.Port > 0 && self.peer.Port != uint16port {
+ self.peerError(PortMismatch, "port mismatch: %v != %v", self.peer.Port, port)
+ return
+ } else {
+ self.peer.Port = uint16port
+ }
+ }
+
+ capsIt := caps.NewIterator()
+ for capsIt.Next() {
+ cap := capsIt.Value().Str()
+ self.peer.Caps = append(self.peer.Caps, cap)
+ }
+ sort.Strings(self.peer.Caps)
+ self.peer.Messenger().AddProtocols(self.peer.Caps)
+
+ self.peer.Id = id
+
+ self.state = handshakeReceived
+
+ //p.ethereum.PushPeer(p)
+ // p.ethereum.reactor.Post("peerList", p.ethereum.Peers())
+ return
+}