diff options
Diffstat (limited to 'les')
-rw-r--r-- | les/commons.go | 4 | ||||
-rw-r--r-- | les/handler.go | 5 | ||||
-rw-r--r-- | les/helper_test.go | 6 | ||||
-rw-r--r-- | les/peer.go | 4 | ||||
-rw-r--r-- | les/protocol.go | 13 | ||||
-rw-r--r-- | les/serverpool.go | 121 |
6 files changed, 92 insertions, 61 deletions
diff --git a/les/commons.go b/les/commons.go index 0b6cf3711..21fb25714 100644 --- a/les/commons.go +++ b/les/commons.go @@ -26,7 +26,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" ) @@ -63,7 +63,7 @@ func (c *lesCommons) makeProtocols(versions []uint) []p2p.Protocol { Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { return c.protocolManager.runPeer(version, p, rw) }, - PeerInfo: func(id discover.NodeID) interface{} { + PeerInfo: func(id enode.ID) interface{} { if p := c.protocolManager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil { return p.Info() } diff --git a/les/handler.go b/les/handler.go index 243a6dabd..19ccbcd2b 100644 --- a/les/handler.go +++ b/les/handler.go @@ -213,8 +213,7 @@ func (pm *ProtocolManager) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWrit var entry *poolEntry peer := pm.newPeer(int(version), pm.networkId, p, rw) if pm.serverPool != nil { - addr := p.RemoteAddr().(*net.TCPAddr) - entry = pm.serverPool.connect(peer, addr.IP, uint16(addr.Port)) + entry = pm.serverPool.connect(peer, peer.Node()) } peer.poolEntry = entry select { @@ -382,7 +381,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } if p.requestAnnounceType == announceTypeSigned { - if err := req.checkSignature(p.pubKey); err != nil { + if err := req.checkSignature(p.ID()); err != nil { p.Log().Trace("Invalid announcement signature", "err", err) return err } diff --git a/les/helper_test.go b/les/helper_test.go index 29496d6af..b46d41f17 100644 --- a/les/helper_test.go +++ b/les/helper_test.go @@ -38,7 +38,7 @@ import ( "github.com/ethereum/go-ethereum/les/flowcontrol" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" ) @@ -221,7 +221,7 @@ func newTestPeer(t *testing.T, name string, version int, pm *ProtocolManager, sh app, net := p2p.MsgPipe() // Generate a random id and create the peer - var id discover.NodeID + var id enode.ID rand.Read(id[:]) peer := pm.newPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net) @@ -258,7 +258,7 @@ func newTestPeerPair(name string, version int, pm, pm2 *ProtocolManager) (*peer, app, net := p2p.MsgPipe() // Generate a random id and create the peer - var id discover.NodeID + var id enode.ID rand.Read(id[:]) peer := pm.newPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net) diff --git a/les/peer.go b/les/peer.go index 70c863c2f..1f343847e 100644 --- a/les/peer.go +++ b/les/peer.go @@ -18,7 +18,6 @@ package les import ( - "crypto/ecdsa" "errors" "fmt" "math/big" @@ -51,7 +50,6 @@ const ( type peer struct { *p2p.Peer - pubKey *ecdsa.PublicKey rw p2p.MsgReadWriter @@ -80,11 +78,9 @@ type peer struct { func newPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { id := p.ID() - pubKey, _ := id.Pubkey() return &peer{ Peer: p, - pubKey: pubKey, rw: rw, version: version, network: network, diff --git a/les/protocol.go b/les/protocol.go index ee4c22398..0b24f5aed 100644 --- a/les/protocol.go +++ b/les/protocol.go @@ -18,9 +18,7 @@ package les import ( - "bytes" "crypto/ecdsa" - "crypto/elliptic" "errors" "fmt" "io" @@ -30,7 +28,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/crypto/secp256k1" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rlp" ) @@ -148,21 +146,20 @@ func (a *announceData) sign(privKey *ecdsa.PrivateKey) { } // checkSignature verifies if the block announcement has a valid signature by the given pubKey -func (a *announceData) checkSignature(pubKey *ecdsa.PublicKey) error { +func (a *announceData) checkSignature(id enode.ID) error { var sig []byte if err := a.Update.decode().get("sign", &sig); err != nil { return err } rlp, _ := rlp.EncodeToBytes(announceBlock{a.Hash, a.Number, a.Td}) - recPubkey, err := secp256k1.RecoverPubkey(crypto.Keccak256(rlp), sig) + recPubkey, err := crypto.SigToPub(crypto.Keccak256(rlp), sig) if err != nil { return err } - pbytes := elliptic.Marshal(pubKey.Curve, pubKey.X, pubKey.Y) - if bytes.Equal(pbytes, recPubkey) { + if id == enode.PubkeyToIDV4(recPubkey) { return nil } - return errors.New("Wrong signature") + return errors.New("wrong signature") } type blockInfo struct { diff --git a/les/serverpool.go b/les/serverpool.go index 1a4c75229..0fe6e49b6 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -18,6 +18,7 @@ package les import ( + "crypto/ecdsa" "fmt" "io" "math" @@ -28,11 +29,12 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discv5" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rlp" ) @@ -90,8 +92,7 @@ const ( // connReq represents a request for peer connection. type connReq struct { p *peer - ip net.IP - port uint16 + node *enode.Node result chan *poolEntry } @@ -122,10 +123,10 @@ type serverPool struct { topic discv5.Topic discSetPeriod chan time.Duration - discNodes chan *discv5.Node + discNodes chan *enode.Node discLookups chan bool - entries map[discover.NodeID]*poolEntry + entries map[enode.ID]*poolEntry timeout, enableRetry chan *poolEntry adjustStats chan poolStatAdjust @@ -145,7 +146,7 @@ func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *s db: db, quit: quit, wg: wg, - entries: make(map[discover.NodeID]*poolEntry), + entries: make(map[enode.ID]*poolEntry), timeout: make(chan *poolEntry, 1), adjustStats: make(chan poolStatAdjust, 100), enableRetry: make(chan *poolEntry, 1), @@ -170,22 +171,38 @@ func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) { if pool.server.DiscV5 != nil { pool.discSetPeriod = make(chan time.Duration, 1) - pool.discNodes = make(chan *discv5.Node, 100) + pool.discNodes = make(chan *enode.Node, 100) pool.discLookups = make(chan bool, 100) - go pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, pool.discNodes, pool.discLookups) + go pool.discoverNodes() } pool.checkDial() go pool.eventLoop() } +// discoverNodes wraps SearchTopic, converting result nodes to enode.Node. +func (pool *serverPool) discoverNodes() { + ch := make(chan *discv5.Node) + go func() { + pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, ch, pool.discLookups) + close(ch) + }() + for n := range ch { + pubkey, err := decodePubkey64(n.ID[:]) + if err != nil { + continue + } + pool.discNodes <- enode.NewV4(pubkey, n.IP, int(n.TCP), int(n.UDP)) + } +} + // connect should be called upon any incoming connection. If the connection has been // dialed by the server pool recently, the appropriate pool entry is returned. // Otherwise, the connection should be rejected. // Note that whenever a connection has been accepted and a pool entry has been returned, // disconnect should also always be called. -func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry { +func (pool *serverPool) connect(p *peer, node *enode.Node) *poolEntry { log.Debug("Connect new entry", "enode", p.id) - req := &connReq{p: p, ip: ip, port: port, result: make(chan *poolEntry, 1)} + req := &connReq{p: p, node: node, result: make(chan *poolEntry, 1)} select { case pool.connCh <- req: case <-pool.quit: @@ -196,7 +213,7 @@ func (pool *serverPool) connect(p *peer, ip net.IP, port uint16) *poolEntry { // registered should be called after a successful handshake func (pool *serverPool) registered(entry *poolEntry) { - log.Debug("Registered new entry", "enode", entry.id) + log.Debug("Registered new entry", "enode", entry.node.ID()) req := ®isterReq{entry: entry, done: make(chan struct{})} select { case pool.registerCh <- req: @@ -216,7 +233,7 @@ func (pool *serverPool) disconnect(entry *poolEntry) { stopped = true default: } - log.Debug("Disconnected old entry", "enode", entry.id) + log.Debug("Disconnected old entry", "enode", entry.node.ID()) req := &disconnReq{entry: entry, stopped: stopped, done: make(chan struct{})} // Block until disconnection request is served. @@ -320,7 +337,7 @@ func (pool *serverPool) eventLoop() { } case node := <-pool.discNodes: - entry := pool.findOrNewNode(discover.NodeID(node.ID), node.IP, node.TCP) + entry := pool.findOrNewNode(node) pool.updateCheckDial(entry) case conv := <-pool.discLookups: @@ -341,7 +358,7 @@ func (pool *serverPool) eventLoop() { // Handle peer connection requests. entry := pool.entries[req.p.ID()] if entry == nil { - entry = pool.findOrNewNode(req.p.ID(), req.ip, req.port) + entry = pool.findOrNewNode(req.node) } if entry.state == psConnected || entry.state == psRegistered { req.result <- nil @@ -351,8 +368,8 @@ func (pool *serverPool) eventLoop() { entry.peer = req.p entry.state = psConnected addr := &poolEntryAddress{ - ip: req.ip, - port: req.port, + ip: req.node.IP(), + port: uint16(req.node.TCP()), lastSeen: mclock.Now(), } entry.lastConnected = addr @@ -401,18 +418,18 @@ func (pool *serverPool) eventLoop() { } } -func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16) *poolEntry { +func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry { now := mclock.Now() - entry := pool.entries[id] + entry := pool.entries[node.ID()] if entry == nil { - log.Debug("Discovered new entry", "id", id) + log.Debug("Discovered new entry", "id", node.ID()) entry = &poolEntry{ - id: id, + node: node, addr: make(map[string]*poolEntryAddress), addrSelect: *newWeightedRandomSelect(), shortRetry: shortRetryCnt, } - pool.entries[id] = entry + pool.entries[node.ID()] = entry // initialize previously unknown peers with good statistics to give a chance to prove themselves entry.connectStats.add(1, initStatsWeight) entry.delayStats.add(0, initStatsWeight) @@ -420,10 +437,7 @@ func (pool *serverPool) findOrNewNode(id discover.NodeID, ip net.IP, port uint16 entry.timeoutStats.add(0, initStatsWeight) } entry.lastDiscovered = now - addr := &poolEntryAddress{ - ip: ip, - port: port, - } + addr := &poolEntryAddress{ip: node.IP(), port: uint16(node.TCP())} if a, ok := entry.addr[addr.strKey()]; ok { addr = a } else { @@ -450,12 +464,12 @@ func (pool *serverPool) loadNodes() { return } for _, e := range list { - log.Debug("Loaded server stats", "id", e.id, "fails", e.lastConnected.fails, + log.Debug("Loaded server stats", "id", e.node.ID(), "fails", e.lastConnected.fails, "conn", fmt.Sprintf("%v/%v", e.connectStats.avg, e.connectStats.weight), "delay", fmt.Sprintf("%v/%v", time.Duration(e.delayStats.avg), e.delayStats.weight), "response", fmt.Sprintf("%v/%v", time.Duration(e.responseStats.avg), e.responseStats.weight), "timeout", fmt.Sprintf("%v/%v", e.timeoutStats.avg, e.timeoutStats.weight)) - pool.entries[e.id] = e + pool.entries[e.node.ID()] = e pool.knownQueue.setLatest(e) pool.knownSelect.update((*knownEntry)(e)) } @@ -481,7 +495,7 @@ func (pool *serverPool) removeEntry(entry *poolEntry) { pool.newSelect.remove((*discoveredEntry)(entry)) pool.knownSelect.remove((*knownEntry)(entry)) entry.removed = true - delete(pool.entries, entry.id) + delete(pool.entries, entry.node.ID()) } // setRetryDial starts the timer which will enable dialing a certain node again @@ -559,10 +573,10 @@ func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) { pool.newSelected++ } addr := entry.addrSelect.choose().(*poolEntryAddress) - log.Debug("Dialing new peer", "lesaddr", entry.id.String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected) + log.Debug("Dialing new peer", "lesaddr", entry.node.ID().String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected) entry.dialed = addr go func() { - pool.server.AddPeer(discover.NewNode(entry.id, addr.ip, addr.port, addr.port)) + pool.server.AddPeer(entry.node) select { case <-pool.quit: case <-time.After(dialTimeout): @@ -580,7 +594,7 @@ func (pool *serverPool) checkDialTimeout(entry *poolEntry) { if entry.state != psDialed { return } - log.Debug("Dial timeout", "lesaddr", entry.id.String()+"@"+entry.dialed.strKey()) + log.Debug("Dial timeout", "lesaddr", entry.node.ID().String()+"@"+entry.dialed.strKey()) entry.state = psNotConnected if entry.knownSelected { pool.knownSelected-- @@ -602,8 +616,9 @@ const ( // poolEntry represents a server node and stores its current state and statistics. type poolEntry struct { peer *peer - id discover.NodeID + pubkey [64]byte // secp256k1 key of the node addr map[string]*poolEntryAddress + node *enode.Node lastConnected, dialed *poolEntryAddress addrSelect weightedRandomSelect @@ -620,23 +635,39 @@ type poolEntry struct { shortRetry int } +// poolEntryEnc is the RLP encoding of poolEntry. +type poolEntryEnc struct { + Pubkey []byte + IP net.IP + Port uint16 + Fails uint + CStat, DStat, RStat, TStat poolStats +} + func (e *poolEntry) EncodeRLP(w io.Writer) error { - return rlp.Encode(w, []interface{}{e.id, e.lastConnected.ip, e.lastConnected.port, e.lastConnected.fails, &e.connectStats, &e.delayStats, &e.responseStats, &e.timeoutStats}) + return rlp.Encode(w, &poolEntryEnc{ + Pubkey: encodePubkey64(e.node.Pubkey()), + IP: e.lastConnected.ip, + Port: e.lastConnected.port, + Fails: e.lastConnected.fails, + CStat: e.connectStats, + DStat: e.delayStats, + RStat: e.responseStats, + TStat: e.timeoutStats, + }) } func (e *poolEntry) DecodeRLP(s *rlp.Stream) error { - var entry struct { - ID discover.NodeID - IP net.IP - Port uint16 - Fails uint - CStat, DStat, RStat, TStat poolStats - } + var entry poolEntryEnc if err := s.Decode(&entry); err != nil { return err } + pubkey, err := decodePubkey64(entry.Pubkey) + if err != nil { + return err + } addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()} - e.id = entry.ID + e.node = enode.NewV4(pubkey, entry.IP, int(entry.Port), int(entry.Port)) e.addr = make(map[string]*poolEntryAddress) e.addr[addr.strKey()] = addr e.addrSelect = *newWeightedRandomSelect() @@ -651,6 +682,14 @@ func (e *poolEntry) DecodeRLP(s *rlp.Stream) error { return nil } +func encodePubkey64(pub *ecdsa.PublicKey) []byte { + return crypto.FromECDSAPub(pub)[:1] +} + +func decodePubkey64(b []byte) (*ecdsa.PublicKey, error) { + return crypto.UnmarshalPubkey(append([]byte{0x04}, b...)) +} + // discoveredEntry implements wrsItem type discoveredEntry poolEntry |