1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
|
package p2p
import (
"fmt"
"sync"
"time"
)
const (
handlerTimeout = 1000
)
type Handlers map[string](func(p *Peer) Protocol)
type Messenger struct {
conn *Connection
peer *Peer
handlers Handlers
protocolLock sync.RWMutex
protocols []Protocol
offsets []MsgCode // offsets for adaptive message idss
protocolTable map[string]int
quit chan chan bool
err chan *PeerError
pulse chan bool
}
func NewMessenger(peer *Peer, conn *Connection, errchan chan *PeerError, handlers Handlers) *Messenger {
baseProtocol := NewBaseProtocol(peer)
return &Messenger{
conn: conn,
peer: peer,
offsets: []MsgCode{baseProtocol.Offset()},
handlers: handlers,
protocols: []Protocol{baseProtocol},
protocolTable: make(map[string]int),
err: errchan,
pulse: make(chan bool, 1),
quit: make(chan chan bool, 1),
}
}
func (self *Messenger) Start() {
self.conn.Open()
go self.messenger()
self.protocolLock.RLock()
defer self.protocolLock.RUnlock()
self.protocols[0].Start()
}
func (self *Messenger) Stop() {
// close pulse to stop ping pong monitoring
close(self.pulse)
self.protocolLock.RLock()
defer self.protocolLock.RUnlock()
for _, protocol := range self.protocols {
protocol.Stop() // could be parallel
}
q := make(chan bool)
self.quit <- q
<-q
self.conn.Close()
}
func (self *Messenger) messenger() {
in := self.conn.Read()
for {
select {
case payload, ok := <-in:
//dispatches message to the protocol asynchronously
if ok {
go self.handle(payload)
} else {
return
}
case q := <-self.quit:
q <- true
return
}
}
}
// handles each message by dispatching to the appropriate protocol
// using adaptive message codes
// this function is started as a separate go routine for each message
// it waits for the protocol response
// then encodes and sends outgoing messages to the connection's write channel
func (self *Messenger) handle(payload []byte) {
// send ping to heartbeat channel signalling time of last message
// select {
// case self.pulse <- true:
// default:
// }
self.pulse <- true
// initialise message from payload
msg, err := NewMsgFromBytes(payload)
if err != nil {
self.err <- NewPeerError(MiscError, " %v", err)
return
}
// retrieves protocol based on message Code
protocol, offset, peerErr := self.getProtocol(msg.Code())
if err != nil {
self.err <- peerErr
return
}
// reset message code based on adaptive offset
msg.Decode(offset)
// dispatches
response := make(chan *Msg)
go protocol.HandleIn(msg, response)
// protocol reponse timeout to prevent leaks
timer := time.After(handlerTimeout * time.Millisecond)
for {
select {
case outgoing, ok := <-response:
// we check if response channel is not closed
if ok {
self.conn.Write() <- outgoing.Encode(offset)
} else {
return
}
case <-timer:
return
}
}
}
// negotiated protocols
// stores offsets needed for adaptive message id scheme
// based on offsets set at handshake
// get the right protocol to handle the message
func (self *Messenger) getProtocol(code MsgCode) (Protocol, MsgCode, *PeerError) {
self.protocolLock.RLock()
defer self.protocolLock.RUnlock()
base := MsgCode(0)
for index, offset := range self.offsets {
if code < offset {
return self.protocols[index], base, nil
}
base = offset
}
return nil, MsgCode(0), NewPeerError(InvalidMsgCode, " %v", code)
}
func (self *Messenger) PingPong(timeout time.Duration, gracePeriod time.Duration, pingCallback func(), timeoutCallback func()) {
fmt.Printf("pingpong keepalive started at %v", time.Now())
timer := time.After(timeout)
pinged := false
for {
select {
case _, ok := <-self.pulse:
if ok {
pinged = false
timer = time.After(timeout)
} else {
// pulse is closed, stop monitoring
return
}
case <-timer:
if pinged {
fmt.Printf("timeout at %v", time.Now())
timeoutCallback()
return
} else {
fmt.Printf("pinged at %v", time.Now())
pingCallback()
timer = time.After(gracePeriod)
pinged = true
}
}
}
}
func (self *Messenger) AddProtocols(protocols []string) {
self.protocolLock.Lock()
defer self.protocolLock.Unlock()
i := len(self.offsets)
offset := self.offsets[i-1]
for _, name := range protocols {
protocolFunc, ok := self.handlers[name]
if ok {
protocol := protocolFunc(self.peer)
self.protocolTable[name] = i
i++
offset += protocol.Offset()
fmt.Println("offset ", name, offset)
self.offsets = append(self.offsets, offset)
self.protocols = append(self.protocols, protocol)
protocol.Start()
} else {
fmt.Println("no ", name)
// protocol not handled
}
}
}
func (self *Messenger) Write(protocol string, msg *Msg) error {
self.protocolLock.RLock()
defer self.protocolLock.RUnlock()
i := 0
offset := MsgCode(0)
if len(protocol) > 0 {
var ok bool
i, ok = self.protocolTable[protocol]
if !ok {
return fmt.Errorf("protocol %v not handled by peer", protocol)
}
offset = self.offsets[i-1]
}
handler := self.protocols[i]
// checking if protocol status/caps allows the message to be sent out
if handler.HandleOut(msg) {
self.conn.Write() <- msg.Encode(offset)
}
return nil
}
|