aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/protocols/protocol.go
blob: 615f74b569746f955192fff4b69c601cc58138c5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

/*
Package protocols is an extension to p2p. It offers a user friendly simple way to define
devp2p subprotocols by abstracting away code standardly shared by protocols.

* automate assigments of code indexes to messages
* automate RLP decoding/encoding based on reflecting
* provide the forever loop to read incoming messages
* standardise error handling related to communication
* standardised  handshake negotiation
* TODO: automatic generation of wire protocol specification for peers

*/
package protocols

import (
    "bufio"
    "bytes"
    "context"
    "fmt"
    "io"
    "reflect"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/metrics"
    "github.com/ethereum/go-ethereum/p2p"
    "github.com/ethereum/go-ethereum/rlp"
    "github.com/ethereum/go-ethereum/swarm/spancontext"
    "github.com/ethereum/go-ethereum/swarm/tracing"
    opentracing "github.com/opentracing/opentracing-go"
)

// error codes used by this  protocol scheme
const (
    ErrMsgTooLong = iota
    ErrDecode
    ErrWrite
    ErrInvalidMsgCode
    ErrInvalidMsgType
    ErrHandshake
    ErrNoHandler
    ErrHandler
)

// error description strings associated with the codes
var errorToString = map[int]string{
    ErrMsgTooLong:     "Message too long",
    ErrDecode:         "Invalid message (RLP error)",
    ErrWrite:          "Error sending message",
    ErrInvalidMsgCode: "Invalid message code",
    ErrInvalidMsgType: "Invalid message type",
    ErrHandshake:      "Handshake error",
    ErrNoHandler:      "No handler registered error",
    ErrHandler:        "Message handler error",
}

/*
Error implements the standard go error interface.
Use:

  errorf(code, format, params ...interface{})

Prints as:

 <description>: <details>

where description is given by code in errorToString
and details is fmt.Sprintf(format, params...)

exported field Code can be checked
*/
type Error struct {
    Code    int
    message string
    format  string
    params  []interface{}
}

func (e Error) Error() (message string) {
    if len(e.message) == 0 {
        name, ok := errorToString[e.Code]
        if !ok {
            panic("invalid message code")
        }
        e.message = name
        if e.format != "" {
            e.message += ": " + fmt.Sprintf(e.format, e.params...)
        }
    }
    return e.message
}

func errorf(code int, format string, params ...interface{}) *Error {
    return &Error{
        Code:   code,
        format: format,
        params: params,
    }
}

// WrappedMsg is used to propagate marshalled context alongside message payloads
type WrappedMsg struct {
    Context []byte
    Size    uint32
    Payload []byte
}

// Spec is a protocol specification including its name and version as well as
// the types of messages which are exchanged
type Spec struct {
    // Name is the name of the protocol, often a three-letter word
    Name string

    // Version is the version number of the protocol
    Version uint

    // MaxMsgSize is the maximum accepted length of the message payload
    MaxMsgSize uint32

    // Messages is a list of message data types which this protocol uses, with
    // each message type being sent with its array index as the code (so
    // [&foo{}, &bar{}, &baz{}] would send foo, bar and baz with codes
    // 0, 1 and 2 respectively)
    // each message must have a single unique data type
    Messages []interface{}

    initOnce sync.Once
    codes    map[reflect.Type]uint64
    types    map[uint64]reflect.Type
}

func (s *Spec) init() {
    s.initOnce.Do(func() {
        s.codes = make(map[reflect.Type]uint64, len(s.Messages))
        s.types = make(map[uint64]reflect.Type, len(s.Messages))
        for i, msg := range s.Messages {
            code := uint64(i)
            typ := reflect.TypeOf(msg)
            if typ.Kind() == reflect.Ptr {
                typ = typ.Elem()
            }
            s.codes[typ] = code
            s.types[code] = typ
        }
    })
}

// Length returns the number of message types in the protocol
func (s *Spec) Length() uint64 {
    return uint64(len(s.Messages))
}

// GetCode returns the message code of a type, and boolean second argument is
// false if the message type is not found
func (s *Spec) GetCode(msg interface{}) (uint64, bool) {
    s.init()
    typ := reflect.TypeOf(msg)
    if typ.Kind() == reflect.Ptr {
        typ = typ.Elem()
    }
    code, ok := s.codes[typ]
    return code, ok
}

// NewMsg construct a new message type given the code
func (s *Spec) NewMsg(code uint64) (interface{}, bool) {
    s.init()
    typ, ok := s.types[code]
    if !ok {
        return nil, false
    }
    return reflect.New(typ).Interface(), true
}

// Peer represents a remote peer or protocol instance that is running on a peer connection with
// a remote peer
type Peer struct {
    *p2p.Peer                   // the p2p.Peer object representing the remote
    rw        p2p.MsgReadWriter // p2p.MsgReadWriter to send messages to and read messages from
    spec      *Spec
}

// NewPeer constructs a new peer
// this constructor is called by the p2p.Protocol#Run function
// the first two arguments are the arguments passed to p2p.Protocol.Run function
// the third argument is the Spec describing the protocol
func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, spec *Spec) *Peer {
    return &Peer{
        Peer: p,
        rw:   rw,
        spec: spec,
    }
}

// Run starts the forever loop that handles incoming messages
// called within the p2p.Protocol#Run function
// the handler argument is a function which is called for each message received
// from the remote peer, a returned error causes the loop to exit
// resulting in disconnection
func (p *Peer) Run(handler func(ctx context.Context, msg interface{}) error) error {
    for {
        if err := p.handleIncoming(handler); err != nil {
            if err != io.EOF {
                metrics.GetOrRegisterCounter("peer.handleincoming.error", nil).Inc(1)
                log.Error("peer.handleIncoming", "err", err)
            }

            return err
        }
    }
}

// Drop disconnects a peer.
// TODO: may need to implement protocol drop only? don't want to kick off the peer
// if they are useful for other protocols
func (p *Peer) Drop(err error) {
    p.Disconnect(p2p.DiscSubprotocolError)
}

// Send takes a message, encodes it in RLP, finds the right message code and sends the
// message off to the peer
// this low level call will be wrapped by libraries providing routed or broadcast sends
// but often just used to forward and push messages to directly connected peers
func (p *Peer) Send(ctx context.Context, msg interface{}) error {
    defer metrics.GetOrRegisterResettingTimer("peer.send_t", nil).UpdateSince(time.Now())
    metrics.GetOrRegisterCounter("peer.send", nil).Inc(1)

    var b bytes.Buffer
    if tracing.Enabled {
        writer := bufio.NewWriter(&b)

        tracer := opentracing.GlobalTracer()

        sctx := spancontext.FromContext(ctx)

        if sctx != nil {
            err := tracer.Inject(
                sctx,
                opentracing.Binary,
                writer)
            if err != nil {
                return err
            }
        }

        writer.Flush()
    }

    r, err := rlp.EncodeToBytes(msg)
    if err != nil {
        return err
    }

    wmsg := WrappedMsg{
        Context: b.Bytes(),
        Size:    uint32(len(r)),
        Payload: r,
    }

    code, found := p.spec.GetCode(msg)
    if !found {
        return errorf(ErrInvalidMsgType, "%v", code)
    }
    return p2p.Send(p.rw, code, wmsg)
}

// handleIncoming(code)
// is called each cycle of the main forever loop that dispatches incoming messages
// if this returns an error the loop returns and the peer is disconnected with the error
// this generic handler
// * checks message size,
// * checks for out-of-range message codes,
// * handles decoding with reflection,
// * call handlers as callbacks
func (p *Peer) handleIncoming(handle func(ctx context.Context, msg interface{}) error) error {
    msg, err := p.rw.ReadMsg()
    if err != nil {
        return err
    }
    // make sure that the payload has been fully consumed
    defer msg.Discard()

    if msg.Size > p.spec.MaxMsgSize {
        return errorf(ErrMsgTooLong, "%v > %v", msg.Size, p.spec.MaxMsgSize)
    }

    // unmarshal wrapped msg, which might contain context
    var wmsg WrappedMsg
    err = msg.Decode(&wmsg)
    if err != nil {
        log.Error(err.Error())
        return err
    }

    ctx := context.Background()

    // if tracing is enabled and the context coming within the request is
    // not empty, try to unmarshal it
    if tracing.Enabled && len(wmsg.Context) > 0 {
        var sctx opentracing.SpanContext

        tracer := opentracing.GlobalTracer()
        sctx, err = tracer.Extract(
            opentracing.Binary,
            bytes.NewReader(wmsg.Context))
        if err != nil {
            log.Error(err.Error())
            return err
        }

        ctx = spancontext.WithContext(ctx, sctx)
    }

    val, ok := p.spec.NewMsg(msg.Code)
    if !ok {
        return errorf(ErrInvalidMsgCode, "%v", msg.Code)
    }
    if err := rlp.DecodeBytes(wmsg.Payload, val); err != nil {
        return errorf(ErrDecode, "<= %v: %v", msg, err)
    }

    // call the registered handler callbacks
    // a registered callback take the decoded message as argument as an interface
    // which the handler is supposed to cast to the appropriate type
    // it is entirely safe not to check the cast in the handler since the handler is
    // chosen based on the proper type in the first place
    if err := handle(ctx, val); err != nil {
        return errorf(ErrHandler, "(msg code %v): %v", msg.Code, err)
    }
    return nil
}

// Handshake negotiates a handshake on the peer connection
// * arguments
//   * context
//   * the local handshake to be sent to the remote peer
//   * funcion to be called on the remote handshake (can be nil)
// * expects a remote handshake back of the same type
// * the dialing peer needs to send the handshake first and then waits for remote
// * the listening peer waits for the remote handshake and then sends it
// returns the remote handshake and an error
func (p *Peer) Handshake(ctx context.Context, hs interface{}, verify func(interface{}) error) (rhs interface{}, err error) {
    if _, ok := p.spec.GetCode(hs); !ok {
        return nil, errorf(ErrHandshake, "unknown handshake message type: %T", hs)
    }
    errc := make(chan error, 2)
    handle := func(ctx context.Context, msg interface{}) error {
        rhs = msg
        if verify != nil {
            return verify(rhs)
        }
        return nil
    }
    send := func() { errc <- p.Send(ctx, hs) }
    receive := func() { errc <- p.handleIncoming(handle) }

    go func() {
        if p.Inbound() {
            receive()
            send()
        } else {
            send()
            receive()
        }
    }()

    for i := 0; i < 2; i++ {
        select {
        case err = <-errc:
        case <-ctx.Done():
            err = ctx.Err()
        }
        if err != nil {
            return nil, errorf(ErrHandshake, err.Error())
        }
    }
    return rhs, nil
}