aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/peer.go
blob: df54eecbd464df8a2aa0cdc2e6505075476e479f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
// Contains the active peer-set of the downloader, maintaining both failures
// as well as reputation metrics to prioritize the block retrievals.

package downloader

import (
    "errors"
    "math"
    "sync"
    "sync/atomic"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/logger"
    "github.com/ethereum/go-ethereum/logger/glog"
    "gopkg.in/fatih/set.v0"
)

// hashFetcherFn is the signature of the callback a peer uses to retrieve a
// batch of hashes; it is handed a single hash and reports an error.
// NOTE(review): presumably the argument is the hash to start from - confirm
// against the callers that supply the callback.
type hashFetcherFn func(common.Hash) error

// blockFetcherFn is the signature of the callback a peer uses to retrieve a
// batch of blocks; it is handed the hashes to request and reports an error.
type blockFetcherFn func([]common.Hash) error

// Sentinel errors reported by the peer and peer set operations in this file.
var (
    // errAlreadyFetching is returned by peer.Fetch when the peer is still
    // marked as active from a previous request.
    errAlreadyFetching   = errors.New("already fetching blocks from peer")
    // errAlreadyRegistered is returned by peerSet.Register when a peer with
    // the same id is already part of the set.
    errAlreadyRegistered = errors.New("peer is already registered")
    // errNotRegistered is returned by peerSet.Unregister when no peer with
    // the given id is part of the set.
    errNotRegistered     = errors.New("peer is not registered")
)

// peer represents an active peer from which hashes and blocks are retrieved.
//
// The idle, rep and capacity fields are only read and written through the
// sync/atomic helpers by the methods in this file.
type peer struct {
    id   string      // Unique identifier of the peer
    head common.Hash // Hash of the peer's latest known block

    idle int32 // Current activity state of the peer (idle = 0, active = 1), accessed atomically
    rep  int32 // Simple peer reputation, accessed atomically (incremented by Promote, halved by Demote)

    capacity int32     // Number of blocks allowed to fetch per request, accessed atomically
    started  time.Time // Time instance when the last fetch was started (set by Fetch, read by SetIdle)

    ignored *set.Set // Set of hashes not to request (didn't have previously)

    getHashes hashFetcherFn  // Method to retrieve a batch of hashes (mockable for testing)
    getBlocks blockFetcherFn // Method to retrieve a batch of blocks (mockable for testing)
}

// newPeer creates a new downloader peer with the given identifier and head
// hash, wired to the supplied hash and block retrieval callbacks. The peer
// starts out idle, with a block allowance of 1, a zero reputation and an
// empty set of ignored hashes.
func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
    p := new(peer)

    // Identity of the remote side
    p.id, p.head = id, head

    // Start with the smallest possible allowance and nothing ignored
    p.capacity = 1
    p.ignored = set.New()

    // Retrieval callbacks
    p.getHashes, p.getBlocks = getHashes, getBlocks

    return p
}

// Reset clears the internal state of a peer entity: the peer is marked idle,
// its block allowance drops back to 1 and the set of ignored hashes is
// emptied. The reputation and the last fetch start time are left untouched.
func (p *peer) Reset() {
    atomic.StoreInt32(&p.idle, 0)     // idle = 0: ready to accept a new fetch
    atomic.StoreInt32(&p.capacity, 1) // same starting allowance newPeer assigns
    p.ignored.Clear()
}

// Fetch sends a block retrieval request to the remote peer, asking for every
// hash contained in the given fetch request. If the peer is still marked as
// active from an earlier fetch, nothing is sent and errAlreadyFetching is
// returned; otherwise the peer is flagged active and the result is nil.
func (p *peer) Fetch(request *fetchRequest) error {
    // Atomically flip the peer from idle to active, bailing out if it was busy
    if claimed := atomic.CompareAndSwapInt32(&p.idle, 0, 1); !claimed {
        return errAlreadyFetching
    }
    // Remember when the fetch began so SetIdle can judge how long it took
    p.started = time.Now()

    // Flatten the requested hash set into a slice for the retrieval callback
    batch := make([]common.Hash, len(request.Hashes))
    filled := 0
    for hash := range request.Hashes {
        batch[filled] = hash
        filled++
    }
    // NOTE(review): the error returned by getBlocks is discarded, so a failed
    // send still leaves the peer marked active - confirm this is intentional.
    p.getBlocks(batch)

    return nil
}

// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its block retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not: if more
// than blockSoftTTL elapsed since the fetch was started the allowance is
// halved, otherwise it is doubled. The result is always kept within the range
// [1, MaxBlockFetch].
func (p *peer) SetIdle() {
    // Update the peer's download allowance based on previous performance
    scale := 2.0
    if time.Since(p.started) > blockSoftTTL {
        scale = 0.5
    }
    for {
        // Calculate the new download bandwidth allowance
        prev := atomic.LoadInt32(&p.capacity)
        next := int32(math.Max(1, math.Min(MaxBlockFetch, float64(prev)*scale)))

        // Try to update the old value, recalculating if it changed meanwhile
        if !atomic.CompareAndSwapInt32(&p.capacity, prev, next) {
            continue
        }
        // Report the change only once it has actually been applied, and only
        // if the allowance really shrank (an allowance already at 1 cannot be
        // reduced any further, so there is nothing to report in that case)
        if next < prev {
            glog.V(logger.Detail).Infof("%s: reducing block allowance from %d to %d", p.id, prev, next)
        }
        break
    }
    // Set the peer to idle to allow further block requests
    atomic.StoreInt32(&p.idle, 0)
}

// Capacity retrieves the peer's current block download allowance, i.e. the
// number of blocks it is allowed to fetch per request. The value is read
// atomically.
func (p *peer) Capacity() int {
    allowance := atomic.LoadInt32(&p.capacity)
    return int(allowance)
}

// Promote increases the peer's reputation by one, using an atomic increment.
func (p *peer) Promote() {
    atomic.AddInt32(&p.rep, 1)
}

// Demote halves the peer's reputation using integer division, so a reputation
// of 1 drops to 0 and a reputation of 0 stays at 0. The update is retried
// until it is applied on top of an unchanged value.
func (p *peer) Demote() {
    for applied := false; !applied; {
        // Snapshot the current reputation and try to swap in half of it
        current := atomic.LoadInt32(&p.rep)
        applied = atomic.CompareAndSwapInt32(&p.rep, current, current/2)
    }
}

// peerSet represents the collection of active peers participating in the
// block download procedure.
type peerSet struct {
    peers map[string]*peer // Registered peers, keyed by their id
    lock  sync.RWMutex     // Guards the peers map
}

// newPeerSet creates a new, empty peer set to track the active download
// sources.
func newPeerSet() *peerSet {
    ps := new(peerSet)
    ps.peers = make(map[string]*peer)
    return ps
}

// Reset walks over the current peer set and resets every known peer, to
// prepare for a next batch of block retrieval. Only the read lock is taken,
// since the set's membership itself is not modified.
func (ps *peerSet) Reset() {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    for id := range ps.peers {
        ps.peers[id].Reset()
    }
}

// Register adds a new peer to the working set under its id. If a peer with
// the same id is already present the set is left unchanged and
// errAlreadyRegistered is returned.
func (ps *peerSet) Register(p *peer) error {
    ps.lock.Lock()
    defer ps.lock.Unlock()

    // Refuse to overwrite an existing entry
    _, known := ps.peers[p.id]
    if known {
        return errAlreadyRegistered
    }
    ps.peers[p.id] = p

    return nil
}

// Unregister removes the peer with the given id from the active set, so it is
// no longer returned by the set's lookup and listing methods. If no such peer
// is registered, errNotRegistered is returned.
func (ps *peerSet) Unregister(id string) error {
    ps.lock.Lock()
    defer ps.lock.Unlock()

    // Drop the entry if it is there, otherwise report the unknown id
    if _, known := ps.peers[id]; known {
        delete(ps.peers, id)
        return nil
    }
    return errNotRegistered
}

// Peer looks up the registered peer with the given id. The result is nil if
// no peer is registered under that id.
func (ps *peerSet) Peer(id string) *peer {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    found := ps.peers[id]
    return found
}

// Len returns the current number of peers in the set.
func (ps *peerSet) Len() int {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    count := len(ps.peers)
    return count
}

// AllPeers returns a freshly allocated flat list holding every peer within
// the set. The order of the list follows map iteration and is therefore
// unspecified.
func (ps *peerSet) AllPeers() []*peer {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    // Size the list up front and fill it slot by slot
    all := make([]*peer, len(ps.peers))
    slot := 0
    for _, member := range ps.peers {
        all[slot] = member
        slot++
    }
    return all
}

// IdlePeers returns a flat list of all the peers within the set that are
// currently idle, ordered by their reputation with the highest first. The
// relative order of peers with equal reputation is unspecified.
func (ps *peerSet) IdlePeers() []*peer {
    ps.lock.RLock()
    defer ps.lock.RUnlock()

    // Collect every peer that is not busy with a fetch
    idle := make([]*peer, 0, len(ps.peers))
    for _, candidate := range ps.peers {
        if atomic.LoadInt32(&candidate.idle) != 0 {
            continue
        }
        idle = append(idle, candidate)
    }
    // Order the idle peers by descending reputation: each position is
    // compared against all later ones, pulling forward any better peer
    for i := range idle {
        for j := i + 1; j < len(idle); j++ {
            front := atomic.LoadInt32(&idle[i].rep)
            back := atomic.LoadInt32(&idle[j].rep)
            if front < back {
                idle[i], idle[j] = idle[j], idle[i]
            }
        }
    }
    return idle
}