aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/messenger.go
diff options
context:
space:
mode:
authorzelig <viktor.tron@gmail.com>2014-10-23 23:57:54 +0800
committerzelig <viktor.tron@gmail.com>2014-10-23 23:57:54 +0800
commit771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb (patch)
tree15a966dbe15e2f8388f69b396e613c7759b06f6d /p2p/messenger.go
parent119c5b40a7ed1aea1c871c0cb56956b8ef9303d9 (diff)
downloadgo-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.tar.gz
go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.tar.zst
go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.zip
initial commit of p2p package
Diffstat (limited to 'p2p/messenger.go')
-rw-r--r--p2p/messenger.go220
1 files changed, 220 insertions, 0 deletions
diff --git a/p2p/messenger.go b/p2p/messenger.go
new file mode 100644
index 000000000..d42ba1720
--- /dev/null
+++ b/p2p/messenger.go
@@ -0,0 +1,220 @@
+package p2p
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+const (
+ handlerTimeout = 1000
+)
+
+type Handlers map[string](func(p *Peer) Protocol)
+
+type Messenger struct {
+ conn *Connection
+ peer *Peer
+ handlers Handlers
+ protocolLock sync.RWMutex
+ protocols []Protocol
+ offsets []MsgCode // offsets for adaptive message idss
+ protocolTable map[string]int
+ quit chan chan bool
+ err chan *PeerError
+ pulse chan bool
+}
+
+func NewMessenger(peer *Peer, conn *Connection, errchan chan *PeerError, handlers Handlers) *Messenger {
+ baseProtocol := NewBaseProtocol(peer)
+ return &Messenger{
+ conn: conn,
+ peer: peer,
+ offsets: []MsgCode{baseProtocol.Offset()},
+ handlers: handlers,
+ protocols: []Protocol{baseProtocol},
+ protocolTable: make(map[string]int),
+ err: errchan,
+ pulse: make(chan bool, 1),
+ quit: make(chan chan bool, 1),
+ }
+}
+
+func (self *Messenger) Start() {
+ self.conn.Open()
+ go self.messenger()
+ self.protocolLock.RLock()
+ defer self.protocolLock.RUnlock()
+ self.protocols[0].Start()
+}
+
+func (self *Messenger) Stop() {
+ // close pulse to stop ping pong monitoring
+ close(self.pulse)
+ self.protocolLock.RLock()
+ defer self.protocolLock.RUnlock()
+ for _, protocol := range self.protocols {
+ protocol.Stop() // could be parallel
+ }
+ q := make(chan bool)
+ self.quit <- q
+ <-q
+ self.conn.Close()
+}
+
+func (self *Messenger) messenger() {
+ in := self.conn.Read()
+ for {
+ select {
+ case payload, ok := <-in:
+ //dispatches message to the protocol asynchronously
+ if ok {
+ go self.handle(payload)
+ } else {
+ return
+ }
+ case q := <-self.quit:
+ q <- true
+ return
+ }
+ }
+}
+
+// handles each message by dispatching to the appropriate protocol
+// using adaptive message codes
+// this function is started as a separate go routine for each message
+// it waits for the protocol response
+// then encodes and sends outgoing messages to the connection's write channel
+func (self *Messenger) handle(payload []byte) {
+ // send ping to heartbeat channel signalling time of last message
+ // select {
+ // case self.pulse <- true:
+ // default:
+ // }
+ self.pulse <- true
+ // initialise message from payload
+ msg, err := NewMsgFromBytes(payload)
+ if err != nil {
+ self.err <- NewPeerError(MiscError, " %v", err)
+ return
+ }
+ // retrieves protocol based on message Code
+ protocol, offset, peerErr := self.getProtocol(msg.Code())
+ if err != nil {
+ self.err <- peerErr
+ return
+ }
+ // reset message code based on adaptive offset
+ msg.Decode(offset)
+ // dispatches
+ response := make(chan *Msg)
+ go protocol.HandleIn(msg, response)
+ // protocol reponse timeout to prevent leaks
+ timer := time.After(handlerTimeout * time.Millisecond)
+ for {
+ select {
+ case outgoing, ok := <-response:
+ // we check if response channel is not closed
+ if ok {
+ self.conn.Write() <- outgoing.Encode(offset)
+ } else {
+ return
+ }
+ case <-timer:
+ return
+ }
+ }
+}
+
+// negotiated protocols
+// stores offsets needed for adaptive message id scheme
+
+// based on offsets set at handshake
+// get the right protocol to handle the message
+func (self *Messenger) getProtocol(code MsgCode) (Protocol, MsgCode, *PeerError) {
+ self.protocolLock.RLock()
+ defer self.protocolLock.RUnlock()
+ base := MsgCode(0)
+ for index, offset := range self.offsets {
+ if code < offset {
+ return self.protocols[index], base, nil
+ }
+ base = offset
+ }
+ return nil, MsgCode(0), NewPeerError(InvalidMsgCode, " %v", code)
+}
+
+func (self *Messenger) PingPong(timeout time.Duration, gracePeriod time.Duration, pingCallback func(), timeoutCallback func()) {
+ fmt.Printf("pingpong keepalive started at %v", time.Now())
+
+ timer := time.After(timeout)
+ pinged := false
+ for {
+ select {
+ case _, ok := <-self.pulse:
+ if ok {
+ pinged = false
+ timer = time.After(timeout)
+ } else {
+ // pulse is closed, stop monitoring
+ return
+ }
+ case <-timer:
+ if pinged {
+ fmt.Printf("timeout at %v", time.Now())
+ timeoutCallback()
+ return
+ } else {
+ fmt.Printf("pinged at %v", time.Now())
+ pingCallback()
+ timer = time.After(gracePeriod)
+ pinged = true
+ }
+ }
+ }
+}
+
+func (self *Messenger) AddProtocols(protocols []string) {
+ self.protocolLock.Lock()
+ defer self.protocolLock.Unlock()
+ i := len(self.offsets)
+ offset := self.offsets[i-1]
+ for _, name := range protocols {
+ protocolFunc, ok := self.handlers[name]
+ if ok {
+ protocol := protocolFunc(self.peer)
+ self.protocolTable[name] = i
+ i++
+ offset += protocol.Offset()
+ fmt.Println("offset ", name, offset)
+
+ self.offsets = append(self.offsets, offset)
+ self.protocols = append(self.protocols, protocol)
+ protocol.Start()
+ } else {
+ fmt.Println("no ", name)
+ // protocol not handled
+ }
+ }
+}
+
+func (self *Messenger) Write(protocol string, msg *Msg) error {
+ self.protocolLock.RLock()
+ defer self.protocolLock.RUnlock()
+ i := 0
+ offset := MsgCode(0)
+ if len(protocol) > 0 {
+ var ok bool
+ i, ok = self.protocolTable[protocol]
+ if !ok {
+ return fmt.Errorf("protocol %v not handled by peer", protocol)
+ }
+ offset = self.offsets[i-1]
+ }
+ handler := self.protocols[i]
+ // checking if protocol status/caps allows the message to be sent out
+ if handler.HandleOut(msg) {
+ self.conn.Write() <- msg.Encode(offset)
+ }
+ return nil
+}