aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/peer.go
blob: 4abae8d5e659391dfe8d5ce2e1152107585965b6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// Contains the active peer-set of the downloader, maintaining both failures
// as well as reputation metrics to prioritize the block retrievals.

package downloader

import (
    "errors"
    "sync"
    "sync/atomic"

    "github.com/ethereum/go-ethereum/common"
    "gopkg.in/fatih/set.v0"
)

type hashFetcherFn func(common.Hash) error
type blockFetcherFn func([]common.Hash) error

var (
    errAlreadyFetching   = errors.New("already fetching blocks from peer")
    errAlreadyRegistered = errors.New("peer is already registered")
    errNotRegistered     = errors.New("peer is not registered")
)

// peer represents an active peer from which hashes and blocks are retrieved.
type peer struct {
    id   string      // Unique identifier of the peer
    head common.Hash // Hash of the peers latest known block

    idle int32 // Current activity state of the peer (idle = 0, active = 1)
    rep  int32 // Simple peer reputation (not used currently)

    mu sync.RWMutex

    ignored *set.Set

    getHashes hashFetcherFn
    getBlocks blockFetcherFn
}

// newPeer create a new downloader peer, with specific hash and block retrieval
// mechanisms.
func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
    return &peer{
        id:        id,
        head:      head,
        getHashes: getHashes,
        getBlocks: getBlocks,
        ignored:   set.New(),
    }
}

// Reset clears the internal state of a peer entity.
func (p *peer) Reset() {
    atomic.StoreInt32(&p.idle, 0)
    p.ignored.Clear()
}

// Fetch sends a block retrieval request to the remote peer.
func (p *peer) Fetch(request *fetchRequest) error {
    // Short circuit if the peer is already fetching
    if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
        return errAlreadyFetching
    }
    // Convert the hash set to a retrievable slice
    hashes := make([]common.Hash, 0, len(request.Hashes))
    for hash, _ := range request.Hashes {
        hashes = append(hashes, hash)
    }
    p.getBlocks(hashes)

    return nil
}

// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
func (p *peer) SetIdle() {
    atomic.StoreInt32(&p.idle, 0)
}

// Promote increases the peer's reputation.
func (p *peer) Promote() {
    atomic.AddInt32(&p.rep, 1)
}

// Demote decreases the peer's reputation or leaves it at 0.
func (p *peer) Demote() {
    for {
        // Calculate the new reputation value
        prev := atomic.LoadInt32(&p.rep)
        next := prev / 2

        // Try to update the old value
        if atomic.CompareAndSwapInt32(&p.rep, prev, next) {
            return
        }
    }
}

// peerSet represents the collection of active peer participating in the block
// download procedure.
type peerSet struct {
    peers map[string]*peer
    lock  sync.RWMutex
}

// newPeerSet creates a new peer set top track the active download sources.
func newPeerSet() *peerSet {
    return &peerSet{
        peers: make(map[string]*peer),
    }
}

// Reset iterates over the current peer set, and resets each of the known peers
// to prepare for a next batch of block retrieval.
func (ps *peerSet) Reset() {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    for _, peer := range ps.peers {
        peer.Reset()
    }
}

// Register injects a new peer into the working set, or returns an error if the
// peer is already known.
func (ps *peerSet) Register(p *peer) error {
    ps.lock.Lock()
    defer ps.lock.Unlock()

    if _, ok := ps.peers[p.id]; ok {
        return errAlreadyRegistered
    }
    ps.peers[p.id] = p
    return nil
}

// Unregister removes a remote peer from the active set, disabling any further
// actions to/from that particular entity.
func (ps *peerSet) Unregister(id string) error {
    ps.lock.Lock()
    defer ps.lock.Unlock()

    if _, ok := ps.peers[id]; !ok {
        return errNotRegistered
    }
    delete(ps.peers, id)
    return nil
}

// Peer retrieves the registered peer with the given id.
func (ps *peerSet) Peer(id string) *peer {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    return ps.peers[id]
}

// Len returns if the current number of peers in the set.
func (ps *peerSet) Len() int {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    return len(ps.peers)
}

// AllPeers retrieves a flat list of all the peers within the set.
func (ps *peerSet) AllPeers() []*peer {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    list := make([]*peer, 0, len(ps.peers))
    for _, p := range ps.peers {
        list = append(list, p)
    }
    return list
}

// IdlePeers retrieves a flat list of all the currently idle peers within the
// active peer set, ordered by their reputation.
func (ps *peerSet) IdlePeers() []*peer {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    list := make([]*peer, 0, len(ps.peers))
    for _, p := range ps.peers {
        if atomic.LoadInt32(&p.idle) == 0 {
            list = append(list, p)
        }
    }
    for i := 0; i < len(list); i++ {
        for j := i + 1; j < len(list); j++ {
            if atomic.LoadInt32(&list[i].rep) < atomic.LoadInt32(&list[j].rep) {
                list[i], list[j] = list[j], list[i]
            }
        }
    }
    return list
}