diff options
author | zelig <viktor.tron@gmail.com> | 2014-10-23 23:57:54 +0800 |
---|---|---|
committer | zelig <viktor.tron@gmail.com> | 2014-10-23 23:57:54 +0800 |
commit | 771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb (patch) | |
tree | 15a966dbe15e2f8388f69b396e613c7759b06f6d /p2p/messenger.go | |
parent | 119c5b40a7ed1aea1c871c0cb56956b8ef9303d9 (diff) | |
download | go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.tar.gz go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.tar.zst go-tangerine-771fbcc02e6d10cdf4cda2e8ec8ea23f11066feb.zip |
initial commit of p2p package
Diffstat (limited to 'p2p/messenger.go')
-rw-r--r-- | p2p/messenger.go | 220 |
1 files changed, 220 insertions, 0 deletions
diff --git a/p2p/messenger.go b/p2p/messenger.go new file mode 100644 index 000000000..d42ba1720 --- /dev/null +++ b/p2p/messenger.go @@ -0,0 +1,220 @@ +package p2p + +import ( + "fmt" + "sync" + "time" +) + +const ( + handlerTimeout = 1000 +) + +type Handlers map[string](func(p *Peer) Protocol) + +type Messenger struct { + conn *Connection + peer *Peer + handlers Handlers + protocolLock sync.RWMutex + protocols []Protocol + offsets []MsgCode // offsets for adaptive message idss + protocolTable map[string]int + quit chan chan bool + err chan *PeerError + pulse chan bool +} + +func NewMessenger(peer *Peer, conn *Connection, errchan chan *PeerError, handlers Handlers) *Messenger { + baseProtocol := NewBaseProtocol(peer) + return &Messenger{ + conn: conn, + peer: peer, + offsets: []MsgCode{baseProtocol.Offset()}, + handlers: handlers, + protocols: []Protocol{baseProtocol}, + protocolTable: make(map[string]int), + err: errchan, + pulse: make(chan bool, 1), + quit: make(chan chan bool, 1), + } +} + +func (self *Messenger) Start() { + self.conn.Open() + go self.messenger() + self.protocolLock.RLock() + defer self.protocolLock.RUnlock() + self.protocols[0].Start() +} + +func (self *Messenger) Stop() { + // close pulse to stop ping pong monitoring + close(self.pulse) + self.protocolLock.RLock() + defer self.protocolLock.RUnlock() + for _, protocol := range self.protocols { + protocol.Stop() // could be parallel + } + q := make(chan bool) + self.quit <- q + <-q + self.conn.Close() +} + +func (self *Messenger) messenger() { + in := self.conn.Read() + for { + select { + case payload, ok := <-in: + //dispatches message to the protocol asynchronously + if ok { + go self.handle(payload) + } else { + return + } + case q := <-self.quit: + q <- true + return + } + } +} + +// handles each message by dispatching to the appropriate protocol +// using adaptive message codes +// this function is started as a separate go routine for each message +// it waits for the protocol response +// then encodes and sends outgoing messages to the connection's write channel +func (self *Messenger) handle(payload []byte) { + // send ping to heartbeat channel signalling time of last message + // select { + // case self.pulse <- true: + // default: + // } + self.pulse <- true + // initialise message from payload + msg, err := NewMsgFromBytes(payload) + if err != nil { + self.err <- NewPeerError(MiscError, " %v", err) + return + } + // retrieves protocol based on message Code + protocol, offset, peerErr := self.getProtocol(msg.Code()) + if err != nil { + self.err <- peerErr + return + } + // reset message code based on adaptive offset + msg.Decode(offset) + // dispatches + response := make(chan *Msg) + go protocol.HandleIn(msg, response) + // protocol reponse timeout to prevent leaks + timer := time.After(handlerTimeout * time.Millisecond) + for { + select { + case outgoing, ok := <-response: + // we check if response channel is not closed + if ok { + self.conn.Write() <- outgoing.Encode(offset) + } else { + return + } + case <-timer: + return + } + } +} + +// negotiated protocols +// stores offsets needed for adaptive message id scheme + +// based on offsets set at handshake +// get the right protocol to handle the message +func (self *Messenger) getProtocol(code MsgCode) (Protocol, MsgCode, *PeerError) { + self.protocolLock.RLock() + defer self.protocolLock.RUnlock() + base := MsgCode(0) + for index, offset := range self.offsets { + if code < offset { + return self.protocols[index], base, nil + } + base = offset + } + return nil, MsgCode(0), NewPeerError(InvalidMsgCode, " %v", code) +} + +func (self *Messenger) PingPong(timeout time.Duration, gracePeriod time.Duration, pingCallback func(), timeoutCallback func()) { + fmt.Printf("pingpong keepalive started at %v", time.Now()) + + timer := time.After(timeout) + pinged := false + for { + select { + case _, ok := <-self.pulse: + if ok { + pinged = false + timer = time.After(timeout) + } else { + // pulse is closed, stop monitoring + return + } + case <-timer: + if pinged { + fmt.Printf("timeout at %v", time.Now()) + timeoutCallback() + return + } else { + fmt.Printf("pinged at %v", time.Now()) + pingCallback() + timer = time.After(gracePeriod) + pinged = true + } + } + } +} + +func (self *Messenger) AddProtocols(protocols []string) { + self.protocolLock.Lock() + defer self.protocolLock.Unlock() + i := len(self.offsets) + offset := self.offsets[i-1] + for _, name := range protocols { + protocolFunc, ok := self.handlers[name] + if ok { + protocol := protocolFunc(self.peer) + self.protocolTable[name] = i + i++ + offset += protocol.Offset() + fmt.Println("offset ", name, offset) + + self.offsets = append(self.offsets, offset) + self.protocols = append(self.protocols, protocol) + protocol.Start() + } else { + fmt.Println("no ", name) + // protocol not handled + } + } +} + +func (self *Messenger) Write(protocol string, msg *Msg) error { + self.protocolLock.RLock() + defer self.protocolLock.RUnlock() + i := 0 + offset := MsgCode(0) + if len(protocol) > 0 { + var ok bool + i, ok = self.protocolTable[protocol] + if !ok { + return fmt.Errorf("protocol %v not handled by peer", protocol) + } + offset = self.offsets[i-1] + } + handler := self.protocols[i] + // checking if protocol status/caps allows the message to be sent out + if handler.HandleOut(msg) { + self.conn.Write() <- msg.Encode(offset) + } + return nil +} |