aboutsummaryrefslogtreecommitdiffstats
path: root/les
diff options
context:
space:
mode:
Diffstat (limited to 'les')
-rw-r--r--les/commons.go4
-rw-r--r--les/handler.go5
-rw-r--r--les/helper_test.go6
-rw-r--r--les/peer.go4
-rw-r--r--les/protocol.go13
-rw-r--r--les/serverpool.go121
6 files changed, 92 insertions, 61 deletions
diff --git a/les/commons.go b/les/commons.go
index 0b6cf3711..21fb25714 100644
--- a/les/commons.go
+++ b/les/commons.go
@@ -26,7 +26,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
)
@@ -63,7 +63,7 @@ func (c *lesCommons) makeProtocols(versions []uint) []p2p.Protocol {
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
return c.protocolManager.runPeer(version, p, rw)
},
- PeerInfo: func(id discover.NodeID) interface{} {
+ PeerInfo: func(id enode.ID) interface{} {
if p := c.protocolManager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
diff --git a/les/handler.go b/les/handler.go
index 243a6dabd..19ccbcd2b 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -213,8 +213,7 @@ func (pm *ProtocolManager) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWrit
var entry *poolEntry
peer := pm.newPeer(int(version), pm.networkId, p, rw)
if pm.serverPool != nil {
- addr := p.RemoteAddr().(*net.TCPAddr)
- entry = pm.serverPool.connect(peer, addr.IP, uint16(addr.Port))
+ entry = pm.serverPool.connect(peer, peer.Node())
}
peer.poolEntry = entry
select {
@@ -382,7 +381,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
if p.requestAnnounceType == announceTypeSigned {
- if err := req.checkSignature(p.pubKey); err != nil {
+ if err := req.checkSignature(p.ID()); err != nil {
p.Log().Trace("Invalid announcement signature", "err", err)
return err
}
diff --git a/les/helper_test.go b/les/helper_test.go
index 29496d6af..b46d41f17 100644
--- a/les/helper_test.go
+++ b/les/helper_test.go
@@ -38,7 +38,7 @@ import (
"github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
)
@@ -221,7 +221,7 @@ func newTestPeer(t *testing.T, name string, version int, pm *ProtocolManager, sh
app, net := p2p.MsgPipe()
// Generate a random id and create the peer
- var id discover.NodeID
+ var id enode.ID
rand.Read(id[:])
peer := pm.newPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net)
@@ -258,7 +258,7 @@ func newTestPeerPair(name string, version int, pm, pm2 *ProtocolManager) (*peer,
app, net := p2p.MsgPipe()
// Generate a random id and create the peer
- var id discover.NodeID
+ var id enode.ID
rand.Read(id[:])
peer := pm.newPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net)
diff --git a/les/peer.go b/les/peer.go
index 70c863c2f..1f343847e 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -18,7 +18,6 @@
package les
import (
- "crypto/ecdsa"
"errors"
"fmt"
"math/big"
@@ -51,7 +50,6 @@ const (
type peer struct {
*p2p.Peer
- pubKey *ecdsa.PublicKey
rw p2p.MsgReadWriter
@@ -80,11 +78,9 @@ type peer struct {
func newPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
id := p.ID()
- pubKey, _ := id.Pubkey()
return &peer{
Peer: p,
- pubKey: pubKey,
rw: rw,
version: version,
network: network,
diff --git a/les/protocol.go b/les/protocol.go
index ee4c22398..0b24f5aed 100644
--- a/les/protocol.go
+++ b/les/protocol.go
@@ -18,9 +18,7 @@
package les
import (
- "bytes"
"crypto/ecdsa"
- "crypto/elliptic"
"errors"
"fmt"
"io"
@@ -30,7 +28,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/crypto/secp256k1"
+ "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -148,21 +146,20 @@ func (a *announceData) sign(privKey *ecdsa.PrivateKey) {
}
// checkSignature verifies if the block announcement has a valid signature by the given pubKey
-func (a *announceData) checkSignature(pubKey *ecdsa.PublicKey) error {
+func (a *announceData) checkSignature(id enode.ID) error {
var sig []byte
if err := a.Update.decode().get("sign", &sig); err != nil {
return err
}
rlp, _ := rlp.EncodeToBytes(announceBlock{a.Hash, a.Number, a.Td})
- recPubkey, err := secp256k1.RecoverPubkey(crypto.Keccak256(rlp), sig)
+ recPubkey, err := crypto.SigToPub(crypto.Keccak256(rlp), sig)
if err != nil {
return err
}
- pbytes := elliptic.Marshal(pubKey.Curve, pubKey.X, pubKey.Y)
- if bytes.Equal(pbytes, recPubkey) {
+ if id == enode.PubkeyToIDV4(recPubkey) {
return nil
}
- return errors.New("Wrong signature")
+ return errors.New("wrong signature")
}
type blockInfo struct {
diff --git a/les/serverpool.go b/les/serverpool.go
index 1a4c75229..0fe6e49b6 100644
--- a/les/serverpool.go
+++ b/les/serverpool.go
@@ -18,6 +18,7 @@
package les
import (
+ "crypto/ecdsa"
"fmt"
"io"
"math"
@@ -28,11 +29,12 @@ import (
"time"
"github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
+ "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -90,8 +92,7 @@ const (
// connReq represents a request for peer connection.
type connReq struct {
p *peer
- ip net.IP
- port uint16
+ node *enode.Node
result chan *poolEntry
}
@@ -122,10 +123,10 @@ type serverPool struct {
topic discv5.Topic
discSetPeriod chan time.Duration
- discNodes chan *discv5.Node
+ discNodes chan *enode.Node
discLookups chan bool
- entries map[discover.NodeID]*poolEntry
+ entries map[enode.ID]*poolEntry
timeout, enableRetry chan *poolEntry
adjustStats chan poolStatAdjust
@@ -145,7 +146,7 @@ func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *s
db: db,
quit: quit,
wg: wg,
- entries: make(map[discover.NodeID]*poolEntry),
+ entries: make(map[enode.ID]*poolEntry),
timeout: make(chan *poolEntry, 1),
adjustStats: make(chan poolStatAdjust, 100),
enableRetry: make(chan *poolEntry, 1),
@@ -170,22 +171,38 @@ func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) {
if pool.server.DiscV5 != nil {
pool.discSetPeriod = make(chan time.Duration, 1)
- pool.discNodes = make(chan *discv5.Node, 100)
+ pool.discNodes = make(chan *enode.Node, 100)
pool.discLookups = make(chan bool, 100)
- go pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, pool.discNodes, pool.discLookups)
+ go pool.discoverNodes()
}
pool.checkDial()
go pool.eventLoop()
}
+// discoverNodes wraps SearchTopic, converting result nodes to enode.Node.
+func (pool *serverPool) discoverNodes() {
+ ch := make(chan *discv5.Node)
+ go func() {
+ pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, ch, pool.discLookups)
+ close(ch)
+ }()
+ for n := range ch {
+ pubkey, err := decodePubkey64(n.ID[:])
+ if err != nil {
+ continue
+ }
+ pool.discNodes <- enode.NewV4(pubkey, n.IP, int(n.TCP), int(n.UDP))
+ }
+}
+
// connect should be called upon any incoming connection. If the connection has been
// dialed by the server pool recently, the appropriate pool entry is returned.
// Otherwise, the connection should be rejected.
// Note that whenever a connection has been accepted and a pool entry has been returned,
// disconnect should also always be called.
-func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
+func (pool *serverPool) connect(p *peer, node *enode.Node) *poolEntry {
log.Debug("Connect new entry", "enode", p.id)
- req := &connReq{p: p, ip: ip, port: port, result: make(chan *poolEntry, 1)}
+ req := &connReq{p: p, node: node, result: make(chan *poolEntry, 1)}
select {
case pool.connCh <- req:
case <-pool.quit:
@@ -196,7 +213,7 @@ func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry {
// registered should be called after a successful handshake
func (pool *serverPool) registered(entry *poolEntry) {
- log.Debug("Registered new entry", "enode", entry.id)
+ log.Debug("Registered new entry", "enode", entry.node.ID())
req := &registerReq{entry: entry, done: make(chan struct{})}
select {
case pool.registerCh <- req:
@@ -216,7 +233,7 @@ func (pool *serverPool) disconnect(entry *poolEntry) {
stopped = true
default:
}
- log.Debug("Disconnected old entry", "enode", entry.id)
+ log.Debug("Disconnected old entry", "enode", entry.node.ID())
req := &disconnReq{entry: entry, stopped: stopped, done: make(chan struct{})}
// Block until disconnection request is served.
@@ -320,7 +337,7 @@ func (pool *serverPool) eventLoop() {
}
case node := <-pool.discNodes:
- entry := pool.findOrNewNode(discover.NodeID(node.ID), node.IP, node.TCP)
+ entry := pool.findOrNewNode(node)
pool.updateCheckDial(entry)
case conv := <-pool.discLookups:
@@ -341,7 +358,7 @@ func (pool *serverPool) eventLoop() {
// Handle peer connection requests.
entry := pool.entries[req.p.ID()]
if entry == nil {
- entry = pool.findOrNewNode(req.p.ID(), req.ip, req.port)
+ entry = pool.findOrNewNode(req.node)
}
if entry.state == psConnected || entry.state == psRegistered {
req.result <- nil
@@ -351,8 +368,8 @@ func (pool *serverPool) eventLoop() {
entry.peer = req.p
entry.state = psConnected
addr := &poolEntryAddress{
- ip: req.ip,
- port: req.port,
+ ip: req.node.IP(),
+ port: uint16(req.node.TCP()),
lastSeen: mclock.Now(),
}
entry.lastConnected = addr
@@ -401,18 +418,18 @@ func (pool *serverPool) eventLoop() {
}
}
-func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16) *poolEntry {
+func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry {
now := mclock.Now()
- entry := pool.entries[id]
+ entry := pool.entries[node.ID()]
if entry == nil {
- log.Debug("Discovered new entry", "id", id)
+ log.Debug("Discovered new entry", "id", node.ID())
entry = &poolEntry{
- id: id,
+ node: node,
addr: make(map[string]*poolEntryAddress),
addrSelect: *newWeightedRandomSelect(),
shortRetry: shortRetryCnt,
}
- pool.entries[id] = entry
+ pool.entries[node.ID()] = entry
// initialize previously unknown peers with good statistics to give a chance to prove themselves
entry.connectStats.add(1, initStatsWeight)
entry.delayStats.add(0, initStatsWeight)
@@ -420,10 +437,7 @@ func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16
entry.timeoutStats.add(0, initStatsWeight)
}
entry.lastDiscovered = now
- addr := &poolEntryAddress{
- ip: ip,
- port: port,
- }
+ addr := &poolEntryAddress{ip: node.IP(), port: uint16(node.TCP())}
if a, ok := entry.addr[addr.strKey()]; ok {
addr = a
} else {
@@ -450,12 +464,12 @@ func (pool *serverPool) loadNodes() {
return
}
for _, e := range list {
- log.Debug("Loaded server stats", "id", e.id, "fails", e.lastConnected.fails,
+ log.Debug("Loaded server stats", "id", e.node.ID(), "fails", e.lastConnected.fails,
"conn", fmt.Sprintf("%v/%v", e.connectStats.avg, e.connectStats.weight),
"delay", fmt.Sprintf("%v/%v", time.Duration(e.delayStats.avg), e.delayStats.weight),
"response", fmt.Sprintf("%v/%v", time.Duration(e.responseStats.avg), e.responseStats.weight),
"timeout", fmt.Sprintf("%v/%v", e.timeoutStats.avg, e.timeoutStats.weight))
- pool.entries[e.id] = e
+ pool.entries[e.node.ID()] = e
pool.knownQueue.setLatest(e)
pool.knownSelect.update((*knownEntry)(e))
}
@@ -481,7 +495,7 @@ func (pool *serverPool) removeEntry(entry *poolEntry) {
pool.newSelect.remove((*discoveredEntry)(entry))
pool.knownSelect.remove((*knownEntry)(entry))
entry.removed = true
- delete(pool.entries, entry.id)
+ delete(pool.entries, entry.node.ID())
}
// setRetryDial starts the timer which will enable dialing a certain node again
@@ -559,10 +573,10 @@ func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) {
pool.newSelected++
}
addr := entry.addrSelect.choose().(*poolEntryAddress)
- log.Debug("Dialing new peer", "lesaddr", entry.id.String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected)
+ log.Debug("Dialing new peer", "lesaddr", entry.node.ID().String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected)
entry.dialed = addr
go func() {
- pool.server.AddPeer(discover.NewNode(entry.id, addr.ip, addr.port, addr.port))
+ pool.server.AddPeer(entry.node)
select {
case <-pool.quit:
case <-time.After(dialTimeout):
@@ -580,7 +594,7 @@ func (pool *serverPool) checkDialTimeout(entry *poolEntry) {
if entry.state != psDialed {
return
}
- log.Debug("Dial timeout", "lesaddr", entry.id.String()+"@"+entry.dialed.strKey())
+ log.Debug("Dial timeout", "lesaddr", entry.node.ID().String()+"@"+entry.dialed.strKey())
entry.state = psNotConnected
if entry.knownSelected {
pool.knownSelected--
@@ -602,8 +616,9 @@ const (
// poolEntry represents a server node and stores its current state and statistics.
type poolEntry struct {
peer *peer
- id discover.NodeID
+ pubkey [64]byte // secp256k1 key of the node
addr map[string]*poolEntryAddress
+ node *enode.Node
lastConnected, dialed *poolEntryAddress
addrSelect weightedRandomSelect
@@ -620,23 +635,39 @@ type poolEntry struct {
shortRetry int
}
+// poolEntryEnc is the RLP encoding of poolEntry.
+type poolEntryEnc struct {
+ Pubkey []byte
+ IP net.IP
+ Port uint16
+ Fails uint
+ CStat, DStat, RStat, TStat poolStats
+}
+
func (e *poolEntry) EncodeRLP(w io.Writer) error {
- return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats})
+ return rlp.Encode(w, &poolEntryEnc{
+ Pubkey: encodePubkey64(e.node.Pubkey()),
+ IP: e.lastConnected.ip,
+ Port: e.lastConnected.port,
+ Fails: e.lastConnected.fails,
+ CStat: e.connectStats,
+ DStat: e.delayStats,
+ RStat: e.responseStats,
+ TStat: e.timeoutStats,
+ })
}
func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
- var entry struct {
- ID discover.NodeID
- IP net.IP
- Port uint16
- Fails uint
- CStat, DStat, RStat, TStat poolStats
- }
+ var entry poolEntryEnc
if err := s.Decode(&entry); err != nil {
return err
}
+ pubkey, err := decodePubkey64(entry.Pubkey)
+ if err != nil {
+ return err
+ }
addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()}
- e.id = entry.ID
+ e.node = enode.NewV4(pubkey, entry.IP, int(entry.Port), int(entry.Port))
e.addr = make(map[string]*poolEntryAddress)
e.addr[addr.strKey()] = addr
e.addrSelect = *newWeightedRandomSelect()
@@ -651,6 +682,14 @@ func (e *poolEntry) DecodeRLP(s *rlp.Stream) error {
return nil
}
+func encodePubkey64(pub *ecdsa.PublicKey) []byte {
+ return crypto.FromECDSAPub(pub)[:1]
+}
+
+func decodePubkey64(b []byte) (*ecdsa.PublicKey, error) {
+ return crypto.UnmarshalPubkey(append([]byte{0x04}, b...))
+}
+
// discoveredEntry implements wrsItem
type discoveredEntry poolEntry