diff options
author | Sonic <sonic@dexon.org> | 2019-01-07 15:38:59 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-04-09 21:32:56 +0800 |
commit | 3e405c7fdc936d3cdb516273105efc233144965c (patch) | |
tree | e6da7f943ea073a66b85b48478c08ac10230f233 | |
parent | 7ee984823b496de502fcf5ec334bf292cf86eea9 (diff) | |
download | dexon-3e405c7fdc936d3cdb516273105efc233144965c.tar.gz dexon-3e405c7fdc936d3cdb516273105efc233144965c.tar.zst dexon-3e405c7fdc936d3cdb516273105efc233144965c.zip |
dex: replace NodeMeta with ENR (#132)
-rw-r--r-- | dex/backend.go | 2 | ||||
-rw-r--r-- | dex/handler.go | 108 | ||||
-rw-r--r-- | dex/nodetable.go | 71 | ||||
-rw-r--r-- | dex/nodetable_test.go | 97 | ||||
-rw-r--r-- | dex/peer.go | 80 | ||||
-rw-r--r-- | dex/peer_test.go | 4 | ||||
-rw-r--r-- | dex/protocol.go | 2 | ||||
-rw-r--r-- | dex/protocol_test.go | 67 | ||||
-rw-r--r-- | dex/sync.go | 60 |
9 files changed, 236 insertions, 255 deletions
diff --git a/dex/backend.go b/dex/backend.go index 7b1f4905d..c7e0773ad 100644 --- a/dex/backend.go +++ b/dex/backend.go @@ -245,7 +245,7 @@ func (s *Dexon) Start(srvr *p2p.Server) error { } // Start the networking layer and the light server if requested s.protocolManager.Start(srvr, maxPeers) - s.protocolManager.addSelfMeta() + s.protocolManager.addSelfRecord() return nil } diff --git a/dex/handler.go b/dex/handler.go index 8a60c2a56..66a0fd7fd 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -61,6 +61,7 @@ import ( "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/p2p" "github.com/dexon-foundation/dexon/p2p/enode" + "github.com/dexon-foundation/dexon/p2p/enr" "github.com/dexon-foundation/dexon/params" "github.com/dexon-foundation/dexon/rlp" ) @@ -75,7 +76,7 @@ const ( finalizedBlockChanSize = 128 - metaChanSize = 10240 + recordChanSize = 10240 maxPullPeers = 3 ) @@ -108,18 +109,18 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol - eventMux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription - metasCh chan newMetasEvent - metasSub event.Subscription + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription + recordsCh chan newRecordsEvent + recordsSub event.Subscription // channels for fetcher, syncer, txsyncLoop - newPeerCh chan *peer - txsyncCh chan *txsync - metasyncCh chan *metasync - quitSync chan struct{} - noMorePeers chan struct{} + newPeerCh chan *peer + txsyncCh chan *txsync + recordsyncCh chan *recordsync + quitSync chan struct{} + noMorePeers chan struct{} // channels for peerSetLoop chainHeadCh chan core.ChainHeadEvent @@ -163,7 +164,7 @@ func NewProtocolManager( newPeerCh: make(chan *peer), noMorePeers: make(chan struct{}), txsyncCh: make(chan *txsync), - metasyncCh: make(chan *metasync), + recordsyncCh: make(chan *recordsync), quitSync: make(chan struct{}), receiveCh: make(chan interface{}, 1024), isBlockProposer: isBlockProposer, @@ -272,10 +273,10 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { go pm.finalizedBlockBroadcastLoop() } - // broadcast node metas - pm.metasCh = make(chan newMetasEvent, metaChanSize) - pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh) - go pm.metaBroadcastLoop() + // broadcast node records + pm.recordsCh = make(chan newRecordsEvent, recordChanSize) + pm.recordsSub = pm.nodeTable.SubscribeNewRecordsEvent(pm.recordsCh) + go pm.recordBroadcastLoop() // run the peer set loop pm.chainHeadCh = make(chan core.ChainHeadEvent) @@ -285,37 +286,12 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { // start sync handlers go pm.syncer() go pm.txsyncLoop() - go pm.metasyncLoop() + go pm.recordsyncLoop() } -func (pm *ProtocolManager) addSelfMeta() { - pm.nodeTable.Add([]*NodeMeta{pm.makeSelfNodeMeta()}) -} - -func (pm *ProtocolManager) makeSelfNodeMeta() *NodeMeta { - self := pm.srvr.Self() - meta := &NodeMeta{ - ID: self.ID(), - IP: self.IP(), - UDP: uint(self.UDP()), - TCP: uint(self.TCP()), - Timestamp: uint64(time.Now().Unix()), - } - - h := rlpHash([]interface{}{ - meta.ID, - meta.IP, - meta.UDP, - meta.TCP, - meta.Timestamp, - }) - sig, err := crypto.Sign(h[:], pm.srvr.GetPrivateKey()) - if err != nil { - panic(err) - } - meta.Sig = sig - return meta +func (pm *ProtocolManager) addSelfRecord() { + pm.nodeTable.AddRecords([]*enr.Record{pm.srvr.Self().Record()}) } func (pm *ProtocolManager) Stop() { @@ -388,7 +364,7 @@ func (pm *ProtocolManager) handle(p *peer) error { // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) - pm.syncNodeMetas(p) + pm.syncNodeRecords(p) // main loop. handle incoming messages. for { @@ -784,18 +760,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { types.GlobalSigCache.Add(types.NewEIP155Signer(pm.blockchain.Config().ChainID), txs) pm.txpool.AddRemotes(txs) - case msg.Code == MetaMsg: - var metas []*NodeMeta - if err := msg.Decode(&metas); err != nil { + case msg.Code == RecordMsg: + var records []*enr.Record + if err := msg.Decode(&records); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - for i, meta := range metas { - if meta == nil { - return errResp(ErrDecode, "node meta %d is nil", i) + for i, record := range records { + if record == nil { + return errResp(ErrDecode, "node record %d is nil", i) } - p.MarkNodeMeta(meta.Hash()) + p.MarkNodeRecord(rlpHash(record)) } - pm.nodeTable.Add(metas) + pm.nodeTable.AddRecords(records) // Block proposer-only messages. @@ -979,20 +955,20 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { } } -// BroadcastMetas will propagate node metas to its peers. -func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) { - var metaset = make(map[*peer][]*NodeMeta) +// BroadcastRecords will propagate node records to its peers. +func (pm *ProtocolManager) BroadcastRecords(records []*enr.Record) { + var recordset = make(map[*peer][]*enr.Record) - for _, meta := range metas { - peers := pm.peers.PeersWithoutNodeMeta(meta.Hash()) + for _, record := range records { + peers := pm.peers.PeersWithoutNodeRecord(rlpHash(record)) for _, peer := range peers { - metaset[peer] = append(metaset[peer], meta) + recordset[peer] = append(recordset[peer], record) } - log.Trace("Broadcast meta", "ID", meta.ID, "recipients", len(peers)) + log.Trace("Broadcast record", "recipients", len(peers)) } - for peer, metas := range metaset { - peer.AsyncSendNodeMetas(metas) + for peer, records := range recordset { + peer.AsyncSendNodeRecords(records) } } @@ -1178,14 +1154,14 @@ func (pm *ProtocolManager) finalizedBlockBroadcastLoop() { } } -func (pm *ProtocolManager) metaBroadcastLoop() { +func (pm *ProtocolManager) recordBroadcastLoop() { for { select { - case event := <-pm.metasCh: - pm.BroadcastMetas(event.Metas) + case event := <-pm.recordsCh: + pm.BroadcastRecords(event.Records) // Err() channel will be closed when unsubscribing. - case <-pm.metasSub.Err(): + case <-pm.recordsSub.Err(): return } } diff --git a/dex/nodetable.go b/dex/nodetable.go index 9fe1d6b71..12cc9ba46 100644 --- a/dex/nodetable.go +++ b/dex/nodetable.go @@ -1,82 +1,71 @@ package dex import ( - "net" "sync" - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/crypto/sha3" "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/p2p/enode" - "github.com/dexon-foundation/dexon/rlp" + "github.com/dexon-foundation/dexon/p2p/enr" ) -type NodeMeta struct { - ID enode.ID - IP net.IP - UDP uint - TCP uint - Timestamp uint64 - Sig []byte -} - -func (n *NodeMeta) Hash() (h common.Hash) { - hw := sha3.NewKeccak256() - rlp.Encode(hw, n) - hw.Sum(h[:0]) - return h -} - -type newMetasEvent struct{ Metas []*NodeMeta } +type newRecordsEvent struct{ Records []*enr.Record } type nodeTable struct { mu sync.RWMutex - entry map[enode.ID]*NodeMeta + entry map[enode.ID]*enode.Node feed event.Feed } func newNodeTable() *nodeTable { return &nodeTable{ - entry: make(map[enode.ID]*NodeMeta), + entry: make(map[enode.ID]*enode.Node), } } -func (t *nodeTable) Get(id enode.ID) *NodeMeta { +func (t *nodeTable) GetNode(id enode.ID) *enode.Node { t.mu.RLock() defer t.mu.RUnlock() return t.entry[id] } -func (t *nodeTable) Add(metas []*NodeMeta) { +func (t *nodeTable) AddRecords(records []*enr.Record) { t.mu.Lock() defer t.mu.Unlock() - var newMetas []*NodeMeta - for _, meta := range metas { - // TODO: validate the meta - if e, ok := t.entry[meta.ID]; ok && e.Timestamp > meta.Timestamp { + var newRecords []*enr.Record + for _, record := range records { + node, err := enode.New(enode.ValidSchemes, record) + if err != nil { + log.Error("invalid node record", "err", err) + return + } + + if n, ok := t.entry[node.ID()]; ok && n.Seq() >= node.Seq() { + log.Debug("Ignore new record, already exists", "id", node.ID().String(), + "ip", node.IP().String(), "udp", node.UDP(), "tcp", node.TCP()) continue } - t.entry[meta.ID] = meta - newMetas = append(newMetas, meta) - log.Trace("add new node meta", "id", meta.ID[:8], - "ip", meta.IP, "udp", meta.UDP, "tcp", meta.TCP) + + t.entry[node.ID()] = node + newRecords = append(newRecords, record) + log.Debug("Add new record to node table", "id", node.ID().String(), + "ip", node.IP().String(), "udp", node.UDP(), "tcp", node.TCP()) } - t.feed.Send(newMetasEvent{newMetas}) + t.feed.Send(newRecordsEvent{newRecords}) } -func (t *nodeTable) Metas() []*NodeMeta { +func (t *nodeTable) Records() []*enr.Record { t.mu.RLock() defer t.mu.RUnlock() - metas := make([]*NodeMeta, 0, len(t.entry)) - for _, meta := range t.entry { - metas = append(metas, meta) + records := make([]*enr.Record, 0, len(t.entry)) + for _, node := range t.entry { + records = append(records, node.Record()) } - return metas + return records } -func (t *nodeTable) SubscribeNewMetasEvent( - ch chan<- newMetasEvent) event.Subscription { +func (t *nodeTable) SubscribeNewRecordsEvent( + ch chan<- newRecordsEvent) event.Subscription { return t.feed.Subscribe(ch) } diff --git a/dex/nodetable_test.go b/dex/nodetable_test.go index da00098a9..06078a0d8 100644 --- a/dex/nodetable_test.go +++ b/dex/nodetable_test.go @@ -9,85 +9,90 @@ import ( "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/p2p/enode" + "github.com/dexon-foundation/dexon/p2p/enr" ) func TestNodeTable(t *testing.T) { table := newNodeTable() - ch := make(chan newMetasEvent) - table.SubscribeNewMetasEvent(ch) + ch := make(chan newRecordsEvent) + table.SubscribeNewRecordsEvent(ch) - metas1 := []*NodeMeta{ - {ID: randomID()}, - {ID: randomID()}, + records1 := []*enr.Record{ + randomNode().Record(), + randomNode().Record(), } - metas2 := []*NodeMeta{ - {ID: randomID()}, - {ID: randomID()}, + records2 := []*enr.Record{ + randomNode().Record(), + randomNode().Record(), } - go table.Add(metas1) + go table.AddRecords(records1) select { - case newMetas := <-ch: + case newRecords := <-ch: m := map[common.Hash]struct{}{} - for _, meta := range newMetas.Metas { - m[meta.Hash()] = struct{}{} + for _, record := range newRecords.Records { + m[rlpHash(record)] = struct{}{} } - if len(m) != len(metas1) { + if len(m) != len(records1) { t.Errorf("len mismatch: got %d, want: %d", - len(m), len(metas1)) + len(m), len(records1)) } - for _, meta := range metas1 { - if _, ok := m[meta.Hash()]; !ok { - t.Errorf("expected meta (%s) not exists", meta.Hash()) + for _, record := range records1 { + if _, ok := m[rlpHash(record)]; !ok { + t.Errorf("expected record (%s) not exists", rlpHash(record)) } } case <-time.After(1 * time.Second): - t.Error("did not receive new metas event within one second") + t.Error("did not receive new records event within one second") } - go table.Add(metas2) + go table.AddRecords(records2) select { - case newMetas := <-ch: + case newRecords := <-ch: m := map[common.Hash]struct{}{} - for _, meta := range newMetas.Metas { - m[meta.Hash()] = struct{}{} + for _, record := range newRecords.Records { + m[rlpHash(record)] = struct{}{} } - if len(m) != len(metas1) { + if len(m) != len(records2) { t.Errorf("len mismatch: got %d, want: %d", - len(m), len(metas2)) + len(m), len(records2)) } - for _, meta := range metas2 { - if _, ok := m[meta.Hash()]; !ok { - t.Errorf("expected meta (%s) not exists", meta.Hash()) + for _, record := range records2 { + if _, ok := m[rlpHash(record)]; !ok { + t.Errorf("expected record (%s) not exists", rlpHash(record)) } } case <-time.After(1 * time.Second): - t.Error("did not receive new metas event within one second") + t.Error("did not receive new records event within one second") } - var metas []*NodeMeta - metas = append(metas, metas1...) - metas = append(metas, metas2...) - allMetas := table.Metas() - if len(allMetas) != len(metas) { + var records []*enr.Record + records = append(records, records1...) + records = append(records, records2...) + allRecords := table.Records() + if len(allRecords) != len(records) { t.Errorf("all metas num mismatch: got %d, want %d", - len(metas), len(allMetas)) + len(records), len(allRecords)) } - for _, m := range metas { - if m.Hash() != table.Get(m.ID).Hash() { - t.Errorf("meta (%s) mismatch", m.ID.String()) + for _, r := range records { + n, err := enode.New(enode.V4ID{}, r) + if err != nil { + t.Errorf(err.Error()) + } + if rlpHash(r) != rlpHash(table.GetNode(n.ID()).Record()) { + t.Errorf("record (%s) mismatch", n.ID().String()) } } } -func randomEnode() *enode.Node { +func randomNode() *enode.Node { var err error var privkey *ecdsa.PrivateKey for { @@ -96,9 +101,21 @@ func randomEnode() *enode.Node { break } } - return enode.NewV4(&privkey.PublicKey, net.IP{}, 0, 0) + var r enr.Record + r.Set(enr.IP(net.IP{})) + r.Set(enr.UDP(0)) + r.Set(enr.TCP(0)) + if err := enode.SignV4(&r, privkey); err != nil { + panic(err) + } + node, err := enode.New(enode.V4ID{}, &r) + if err != nil { + panic(err) + + } + return node } func randomID() enode.ID { - return randomEnode().ID() + return randomNode().ID() } diff --git a/dex/peer.go b/dex/peer.go index aecf9dc7c..295c5c400 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -52,6 +52,7 @@ import ( "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/p2p" "github.com/dexon-foundation/dexon/p2p/enode" + "github.com/dexon-foundation/dexon/p2p/enr" "github.com/dexon-foundation/dexon/rlp" ) @@ -62,9 +63,9 @@ var ( ) const ( - maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) - maxKnownMetas = 32768 // Maximum metas hashes to keep in the known list (prevent DOS) - maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) + maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) + maxKnownRecords = 32768 // Maximum records hashes to keep in the known list (prevent DOS) + maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) /* maxKnownLatticeBLocks = 2048 @@ -80,7 +81,7 @@ const ( // contain a single transaction, or thousands. maxQueuedTxs = 1024 - maxQueuedMetas = 512 + maxQueuedRecords = 512 // maxQueuedProps is the maximum number of block propagations to queue up before // dropping broadcasts. There's not much point in queueing stale blocks, so a few @@ -141,7 +142,7 @@ type peer struct { lock sync.RWMutex knownTxs mapset.Set // Set of transaction hashes known to be known by this peer - knownMetas mapset.Set // Set of node metas known to be known by this peer + knownRecords mapset.Set // Set of node record known to be known by this peer knownBlocks mapset.Set // Set of block hashes known to be known by this peer knownLatticeBlocks mapset.Set knownVotes mapset.Set @@ -150,7 +151,7 @@ type peer struct { knownDKGPrivateShares mapset.Set knownDKGPartialSignatures mapset.Set queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer - queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer + queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer queuedProps chan *types.Block // Queue of blocks to broadcast to the peer queuedAnns chan *types.Block // Queue of blocks to announce to the peer queuedLatticeBlocks chan *coreTypes.Block @@ -172,7 +173,7 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { version: version, id: p.ID().String(), knownTxs: mapset.NewSet(), - knownMetas: mapset.NewSet(), + knownRecords: mapset.NewSet(), knownBlocks: mapset.NewSet(), knownLatticeBlocks: mapset.NewSet(), knownVotes: mapset.NewSet(), @@ -181,7 +182,7 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { knownDKGPrivateShares: mapset.NewSet(), knownDKGPartialSignatures: mapset.NewSet(), queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), - queuedMetas: make(chan []*NodeMeta, maxQueuedMetas), + queuedRecords: make(chan []*enr.Record, maxQueuedRecords), queuedProps: make(chan *types.Block, maxQueuedProps), queuedAnns: make(chan *types.Block, maxQueuedAnns), queuedLatticeBlocks: make(chan *coreTypes.Block, maxQueuedLatticeBlocks), @@ -198,16 +199,16 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { } // broadcast is a write loop that multiplexes block propagations, announcements, -// transaction and notary node metas broadcasts into the remote peer. +// transaction and notary node records broadcasts into the remote peer. // The goal is to have an async writer that does not lock up node internals. func (p *peer) broadcast() { for { select { - case metas := <-p.queuedMetas: - if err := p.SendNodeMetas(metas); err != nil { + case records := <-p.queuedRecords: + if err := p.SendNodeRecords(records); err != nil { return } - p.Log().Trace("Broadcast node metas", "count", len(metas)) + p.Log().Trace("Broadcast node records", "count", len(records)) case block := <-p.queuedProps: if err := p.SendNewBlock(block); err != nil { @@ -335,11 +336,11 @@ func (p *peer) MarkTransaction(hash common.Hash) { p.knownTxs.Add(hash) } -func (p *peer) MarkNodeMeta(hash common.Hash) { - for p.knownMetas.Cardinality() >= maxKnownMetas { - p.knownMetas.Pop() +func (p *peer) MarkNodeRecord(hash common.Hash) { + for p.knownRecords.Cardinality() >= maxKnownRecords { + p.knownRecords.Pop() } - p.knownMetas.Add(hash) + p.knownRecords.Add(hash) } // SendTransactions sends transactions to the peer and includes the hashes @@ -364,26 +365,26 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { } } -// SendNodeMetas sends the metas to the peer and includes the hashes -// in its metas hash set for future reference. -func (p *peer) SendNodeMetas(metas []*NodeMeta) error { - for _, meta := range metas { - p.knownMetas.Add(meta.Hash()) +// SendNodeRecords sends the records to the peer and includes the hashes +// in its records hash set for future reference. +func (p *peer) SendNodeRecords(records []*enr.Record) error { + for _, record := range records { + p.knownRecords.Add(rlpHash(record)) } - return p2p.Send(p.rw, MetaMsg, metas) + return p2p.Send(p.rw, RecordMsg, records) } -// AsyncSendNodeMeta queues list of notary node meta propagation to a +// AsyncSendNodeRecord queues list of notary node records propagation to a // remote peer. If the peer's broadcast queue is full, the event is silently // dropped. -func (p *peer) AsyncSendNodeMetas(metas []*NodeMeta) { +func (p *peer) AsyncSendNodeRecords(records []*enr.Record) { select { - case p.queuedMetas <- metas: - for _, meta := range metas { - p.knownMetas.Add(meta.Hash()) + case p.queuedRecords <- records: + for _, record := range records { + p.knownRecords.Add(rlpHash(record)) } default: - p.Log().Debug("Dropping node meta propagation", "count", len(metas)) + p.Log().Debug("Dropping node record propagation", "count", len(records)) } } @@ -851,14 +852,14 @@ func (ps *peerSet) PeersWithoutVote(hash common.Hash, label peerLabel) []*peer { return list } -// PeersWithoutNodeMeta retrieves a list of peers that do not have a -// given meta in their set of known hashes. -func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer { +// PeersWithoutNodeRecord retrieves a list of peers that do not have a +// given record in their set of known hashes. +func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if !p.knownMetas.Contains(hash) { + if !p.knownRecords.Contains(hash) { list = append(list, p) } } @@ -1118,11 +1119,11 @@ func (ps *peerSet) BuildNotaryConn(round uint64) { } func (ps *peerSet) dumpPeerLabel(s string) { - log.Trace(s, "peer num", len(ps.peers)) + log.Debug(s, "peer num", len(ps.peers)) for id, labels := range ps.peer2Labels { _, ok := ps.peers[id] for label := range labels { - log.Trace(s, "connected", ok, "id", id[:16], + log.Debug(s, "connected", ok, "id", id[:16], "round", label.round, "cid", label.chainID, "set", label.set) } } @@ -1277,6 +1278,7 @@ func (ps *peerSet) removeLabel(node *enode.Node, label peerLabel) { } } +// TODO: improve this by not using pk. func (ps *peerSet) newNode(pk string) *enode.Node { var ip net.IP var tcp, udp int @@ -1290,14 +1292,10 @@ func (ps *peerSet) newNode(pk string) *enode.Node { if err != nil { panic(err) } - n := enode.NewV4(pubkey, net.IP{}, 0, 0) - meta := ps.tab.Get(n.ID()) - if meta != nil { - ip = meta.IP - tcp = int(meta.TCP) - udp = int(meta.UDP) + node := ps.tab.GetNode(enode.PubkeyToIDV4(pubkey)) + if node != nil { + return node } - return enode.NewV4(pubkey, ip, tcp, udp) } diff --git a/dex/peer_test.go b/dex/peer_test.go index 548a61052..9caa62d1e 100644 --- a/dex/peer_test.go +++ b/dex/peer_test.go @@ -26,7 +26,7 @@ func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) { var nodes []*enode.Node for i := 0; i < 9; i++ { - nodes = append(nodes, randomEnode()) + nodes = append(nodes, randomNode()) } round10 := [][]*enode.Node{ @@ -270,7 +270,7 @@ func TestPeerSetBuildDKGConn(t *testing.T) { var nodes []*enode.Node for i := 0; i < 6; i++ { - nodes = append(nodes, randomEnode()) + nodes = append(nodes, randomNode()) } gov := &testGovernance{} diff --git a/dex/protocol.go b/dex/protocol.go index b417c91b6..c6b4c37c4 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -82,7 +82,7 @@ const ( ReceiptsMsg = 0x10 // Protocol messages belonging to dex/64 - MetaMsg = 0x11 + RecordMsg = 0x11 LatticeBlockMsg = 0x20 VoteMsg = 0x21 diff --git a/dex/protocol_test.go b/dex/protocol_test.go index 686ba372f..e9bd4e110 100644 --- a/dex/protocol_test.go +++ b/dex/protocol_test.go @@ -36,6 +36,7 @@ import ( "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/dex/downloader" "github.com/dexon-foundation/dexon/p2p" + "github.com/dexon-foundation/dexon/p2p/enr" "github.com/dexon-foundation/dexon/rlp" ) @@ -231,73 +232,71 @@ func TestGetBlockHeadersDataEncodeDecode(t *testing.T) { } } -func TestRecvNodeMetas(t *testing.T) { +func TestRecvNodeRecords(t *testing.T) { pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) p, _ := newTestPeer("peer", dex64, pm, true) defer pm.Stop() defer p.close() - meta := NodeMeta{ - ID: randomID(), - } + record := randomNode().Record() - ch := make(chan newMetasEvent) - pm.nodeTable.SubscribeNewMetasEvent(ch) + ch := make(chan newRecordsEvent) + pm.nodeTable.SubscribeNewRecordsEvent(ch) - if err := p2p.Send(p.app, MetaMsg, []interface{}{meta}); err != nil { + if err := p2p.Send(p.app, RecordMsg, []interface{}{record}); err != nil { t.Fatalf("send error: %v", err) } select { case event := <-ch: - metas := event.Metas - if len(metas) != 1 { - t.Errorf("wrong number of new metas: got %d, want 1", len(metas)) - } else if metas[0].Hash() != meta.Hash() { - t.Errorf("added wrong meta hash: got %v, want %v", metas[0].Hash(), meta.Hash()) + records := event.Records + if len(records) != 1 { + t.Errorf("wrong number of new records: got %d, want 1", len(records)) + } else if rlpHash(records[0]) != rlpHash(record) { + t.Errorf("added wrong records hash: got %v, want %v", rlpHash(records[0]), rlpHash(record)) } case <-time.After(3 * time.Second): - t.Errorf("no newMetasEvent received within 3 seconds") + t.Errorf("no newRecordsEvent received within 3 seconds") } } -func TestSendNodeMetas(t *testing.T) { +func TestSendNodeRecords(t *testing.T) { pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) defer pm.Stop() - allmetas := make([]*NodeMeta, 100) - for i := 0; i < len(allmetas); i++ { - allmetas[i] = &NodeMeta{ID: randomID()} + allrecords := make([]*enr.Record, 100) + for i := 0; i < len(allrecords); i++ { + allrecords[i] = randomNode().Record() } // Connect several peers. They should all receive the pending transactions. var wg sync.WaitGroup - checkmetas := func(p *testPeer) { + checkrecords := func(p *testPeer) { defer wg.Done() defer p.close() seen := make(map[common.Hash]bool) - for _, meta := range allmetas { - seen[meta.Hash()] = false + for _, record := range allrecords { + seen[rlpHash(record)] = false } - for n := 0; n < len(allmetas) && !t.Failed(); { - var metas []*NodeMeta + for n := 0; n < len(allrecords) && !t.Failed(); { + var records []*enr.Record msg, err := p.app.ReadMsg() if err != nil { t.Errorf("%v: read error: %v", p.Peer, err) - } else if msg.Code != MetaMsg { - t.Errorf("%v: got code %d, want MetaMsg", p.Peer, msg.Code) + } else if msg.Code != RecordMsg { + t.Errorf("%v: got code %d, want RecordMsg", p.Peer, msg.Code) } - if err := msg.Decode(&metas); err != nil { + if err := msg.Decode(&records); err != nil { t.Errorf("%v: %v", p.Peer, err) } - for _, meta := range metas { - hash := meta.Hash() - seenmeta, want := seen[hash] - if seenmeta { - t.Errorf("%v: got meta more than once: %x", p.Peer, hash) + for _, record := range records { + hash := rlpHash(record) + seenrecord, want := seen[hash] + if seenrecord { + t.Errorf("%v: got record more than once: %x", p.Peer, hash) } if !want { - t.Errorf("%v: got unexpected meta: %x", p.Peer, hash) + t.Errorf("%v: got unexpected record: %x", p.Peer, hash) } seen[hash] = true n++ @@ -307,9 +306,9 @@ func TestSendNodeMetas(t *testing.T) { for i := 0; i < 3; i++ { p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), dex64, pm, true) wg.Add(1) - go checkmetas(p) + go checkrecords(p) } - pm.nodeTable.Add(allmetas) + pm.nodeTable.AddRecords(allrecords) wg.Wait() } @@ -364,7 +363,7 @@ func TestRecvLatticeBlock(t *testing.T) { t.Errorf("block mismatch") } case <-time.After(3 * time.Second): - t.Errorf("no newMetasEvent received within 3 seconds") + t.Errorf("no newRecordsEvent received within 3 seconds") } } diff --git a/dex/sync.go b/dex/sync.go index 1e35faf21..927c04bc3 100644 --- a/dex/sync.go +++ b/dex/sync.go @@ -26,6 +26,7 @@ import ( "github.com/dexon-foundation/dexon/dex/downloader" "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/p2p/enode" + "github.com/dexon-foundation/dexon/p2p/enr" ) const ( @@ -40,8 +41,8 @@ const ( // A pack can get larger than this if a single transactions exceeds this size. txsyncPackSize = 100 * 1024 - // This is the target number for the packs of metas sent by metasyncLoop. - metasyncPackNum = 1024 + // This is the target number for the packs of records sent by recordsyncLoop. + recordsyncPackNum = 1024 ) type txsync struct { @@ -136,58 +137,59 @@ func (pm *ProtocolManager) txsyncLoop() { } } -type metasync struct { - p *peer - metas []*NodeMeta +type recordsync struct { + p *peer + records []*enr.Record } -// syncNodeMetas starts sending all node metas to the given peer. -func (pm *ProtocolManager) syncNodeMetas(p *peer) { - metas := pm.nodeTable.Metas() - if len(metas) == 0 { +// syncNodeRecords starts sending all node records to the given peer. +func (pm *ProtocolManager) syncNodeRecords(p *peer) { + records := pm.nodeTable.Records() + p.Log().Debug("Sync node records", "num", len(records)) + if len(records) == 0 { return } select { - case pm.metasyncCh <- &metasync{p, metas}: + case pm.recordsyncCh <- &recordsync{p, records}: case <-pm.quitSync: } } -// metasyncLoop takes care of the initial node meta sync for each new -// connection. When a new peer appears, we relay all currently node metas. +// recordsyncLoop takes care of the initial node record sync for each new +// connection. When a new peer appears, we relay all currently node records. // In order to minimise egress bandwidth usage, we send -// the metas in small packs to one peer at a time. -func (pm *ProtocolManager) metasyncLoop() { +// the records in small packs to one peer at a time. +func (pm *ProtocolManager) recordsyncLoop() { var ( - pending = make(map[enode.ID]*metasync) + pending = make(map[enode.ID]*recordsync) sending = false // whether a send is active - pack = new(metasync) // the pack that is being sent + pack = new(recordsync) // the pack that is being sent done = make(chan error, 1) // result of the send ) // send starts a sending a pack of transactions from the sync. - send := func(s *metasync) { - // Fill pack with node metas up to the target num. + send := func(s *recordsync) { + // Fill pack with node records up to the target num. var num int pack.p = s.p - pack.metas = pack.metas[:0] - for i := 0; i < len(s.metas) && num < metasyncPackNum; i++ { - pack.metas = append(pack.metas, s.metas[i]) + pack.records = pack.records[:0] + for i := 0; i < len(s.records) && num < recordsyncPackNum; i++ { + pack.records = append(pack.records, s.records[i]) num += 1 } - // Remove the metas that will be sent. - s.metas = s.metas[:copy(s.metas, s.metas[len(pack.metas):])] - if len(s.metas) == 0 { + // Remove the records that will be sent. + s.records = s.records[:copy(s.records, s.records[len(pack.records):])] + if len(s.records) == 0 { delete(pending, s.p.ID()) } // Send the pack in the background. - s.p.Log().Trace("Sending batch of transactions", "count", len(pack.metas), "bytes", num) + s.p.Log().Trace("Sending batch of records", "count", len(pack.records), "bytes", num) sending = true - go func() { done <- pack.p.SendNodeMetas(pack.metas) }() + go func() { done <- pack.p.SendNodeRecords(pack.records) }() } // pick chooses the next pending sync. - pick := func() *metasync { + pick := func() *recordsync { if len(pending) == 0 { return nil } @@ -202,7 +204,7 @@ func (pm *ProtocolManager) metasyncLoop() { for { select { - case s := <-pm.metasyncCh: + case s := <-pm.recordsyncCh: pending[s.p.ID()] = s if !sending { send(s) @@ -211,7 +213,7 @@ func (pm *ProtocolManager) metasyncLoop() { sending = false // Stop tracking peers that cause send failures. if err != nil { - pack.p.Log().Debug("NodeMeta send failed", "err", err) + pack.p.Log().Debug("Record send failed", "err", err) delete(pending, pack.p.ID()) } // Schedule the next send. |