aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSonic <sonic@dexon.org>2019-01-07 15:38:59 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:56 +0800
commit3e405c7fdc936d3cdb516273105efc233144965c (patch)
treee6da7f943ea073a66b85b48478c08ac10230f233
parent7ee984823b496de502fcf5ec334bf292cf86eea9 (diff)
downloaddexon-3e405c7fdc936d3cdb516273105efc233144965c.tar.gz
dexon-3e405c7fdc936d3cdb516273105efc233144965c.tar.zst
dexon-3e405c7fdc936d3cdb516273105efc233144965c.zip
dex: replace NodeMeta with ENR (#132)
-rw-r--r--dex/backend.go2
-rw-r--r--dex/handler.go108
-rw-r--r--dex/nodetable.go71
-rw-r--r--dex/nodetable_test.go97
-rw-r--r--dex/peer.go80
-rw-r--r--dex/peer_test.go4
-rw-r--r--dex/protocol.go2
-rw-r--r--dex/protocol_test.go67
-rw-r--r--dex/sync.go60
9 files changed, 236 insertions, 255 deletions
diff --git a/dex/backend.go b/dex/backend.go
index 7b1f4905d..c7e0773ad 100644
--- a/dex/backend.go
+++ b/dex/backend.go
@@ -245,7 +245,7 @@ func (s *Dexon) Start(srvr *p2p.Server) error {
}
// Start the networking layer and the light server if requested
s.protocolManager.Start(srvr, maxPeers)
- s.protocolManager.addSelfMeta()
+ s.protocolManager.addSelfRecord()
return nil
}
diff --git a/dex/handler.go b/dex/handler.go
index 8a60c2a56..66a0fd7fd 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -61,6 +61,7 @@ import (
"github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/p2p"
"github.com/dexon-foundation/dexon/p2p/enode"
+ "github.com/dexon-foundation/dexon/p2p/enr"
"github.com/dexon-foundation/dexon/params"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -75,7 +76,7 @@ const (
finalizedBlockChanSize = 128
- metaChanSize = 10240
+ recordChanSize = 10240
maxPullPeers = 3
)
@@ -108,18 +109,18 @@ type ProtocolManager struct {
SubProtocols []p2p.Protocol
- eventMux *event.TypeMux
- txsCh chan core.NewTxsEvent
- txsSub event.Subscription
- metasCh chan newMetasEvent
- metasSub event.Subscription
+ eventMux *event.TypeMux
+ txsCh chan core.NewTxsEvent
+ txsSub event.Subscription
+ recordsCh chan newRecordsEvent
+ recordsSub event.Subscription
// channels for fetcher, syncer, txsyncLoop
- newPeerCh chan *peer
- txsyncCh chan *txsync
- metasyncCh chan *metasync
- quitSync chan struct{}
- noMorePeers chan struct{}
+ newPeerCh chan *peer
+ txsyncCh chan *txsync
+ recordsyncCh chan *recordsync
+ quitSync chan struct{}
+ noMorePeers chan struct{}
// channels for peerSetLoop
chainHeadCh chan core.ChainHeadEvent
@@ -163,7 +164,7 @@ func NewProtocolManager(
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
- metasyncCh: make(chan *metasync),
+ recordsyncCh: make(chan *recordsync),
quitSync: make(chan struct{}),
receiveCh: make(chan interface{}, 1024),
isBlockProposer: isBlockProposer,
@@ -272,10 +273,10 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
go pm.finalizedBlockBroadcastLoop()
}
- // broadcast node metas
- pm.metasCh = make(chan newMetasEvent, metaChanSize)
- pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh)
- go pm.metaBroadcastLoop()
+ // broadcast node records
+ pm.recordsCh = make(chan newRecordsEvent, recordChanSize)
+ pm.recordsSub = pm.nodeTable.SubscribeNewRecordsEvent(pm.recordsCh)
+ go pm.recordBroadcastLoop()
// run the peer set loop
pm.chainHeadCh = make(chan core.ChainHeadEvent)
@@ -285,37 +286,12 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
- go pm.metasyncLoop()
+ go pm.recordsyncLoop()
}
-func (pm *ProtocolManager) addSelfMeta() {
- pm.nodeTable.Add([]*NodeMeta{pm.makeSelfNodeMeta()})
-}
-
-func (pm *ProtocolManager) makeSelfNodeMeta() *NodeMeta {
- self := pm.srvr.Self()
- meta := &NodeMeta{
- ID: self.ID(),
- IP: self.IP(),
- UDP: uint(self.UDP()),
- TCP: uint(self.TCP()),
- Timestamp: uint64(time.Now().Unix()),
- }
-
- h := rlpHash([]interface{}{
- meta.ID,
- meta.IP,
- meta.UDP,
- meta.TCP,
- meta.Timestamp,
- })
- sig, err := crypto.Sign(h[:], pm.srvr.GetPrivateKey())
- if err != nil {
- panic(err)
- }
- meta.Sig = sig
- return meta
+func (pm *ProtocolManager) addSelfRecord() {
+ pm.nodeTable.AddRecords([]*enr.Record{pm.srvr.Self().Record()})
}
func (pm *ProtocolManager) Stop() {
@@ -388,7 +364,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
- pm.syncNodeMetas(p)
+ pm.syncNodeRecords(p)
// main loop. handle incoming messages.
for {
@@ -784,18 +760,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
types.GlobalSigCache.Add(types.NewEIP155Signer(pm.blockchain.Config().ChainID), txs)
pm.txpool.AddRemotes(txs)
- case msg.Code == MetaMsg:
- var metas []*NodeMeta
- if err := msg.Decode(&metas); err != nil {
+ case msg.Code == RecordMsg:
+ var records []*enr.Record
+ if err := msg.Decode(&records); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- for i, meta := range metas {
- if meta == nil {
- return errResp(ErrDecode, "node meta %d is nil", i)
+ for i, record := range records {
+ if record == nil {
+ return errResp(ErrDecode, "node record %d is nil", i)
}
- p.MarkNodeMeta(meta.Hash())
+ p.MarkNodeRecord(rlpHash(record))
}
- pm.nodeTable.Add(metas)
+ pm.nodeTable.AddRecords(records)
// Block proposer-only messages.
@@ -979,20 +955,20 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
}
}
-// BroadcastMetas will propagate node metas to its peers.
-func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) {
- var metaset = make(map[*peer][]*NodeMeta)
+// BroadcastRecords will propagate node records to its peers.
+func (pm *ProtocolManager) BroadcastRecords(records []*enr.Record) {
+ var recordset = make(map[*peer][]*enr.Record)
- for _, meta := range metas {
- peers := pm.peers.PeersWithoutNodeMeta(meta.Hash())
+ for _, record := range records {
+ peers := pm.peers.PeersWithoutNodeRecord(rlpHash(record))
for _, peer := range peers {
- metaset[peer] = append(metaset[peer], meta)
+ recordset[peer] = append(recordset[peer], record)
}
- log.Trace("Broadcast meta", "ID", meta.ID, "recipients", len(peers))
+ log.Trace("Broadcast record", "recipients", len(peers))
}
- for peer, metas := range metaset {
- peer.AsyncSendNodeMetas(metas)
+ for peer, records := range recordset {
+ peer.AsyncSendNodeRecords(records)
}
}
@@ -1178,14 +1154,14 @@ func (pm *ProtocolManager) finalizedBlockBroadcastLoop() {
}
}
-func (pm *ProtocolManager) metaBroadcastLoop() {
+func (pm *ProtocolManager) recordBroadcastLoop() {
for {
select {
- case event := <-pm.metasCh:
- pm.BroadcastMetas(event.Metas)
+ case event := <-pm.recordsCh:
+ pm.BroadcastRecords(event.Records)
// Err() channel will be closed when unsubscribing.
- case <-pm.metasSub.Err():
+ case <-pm.recordsSub.Err():
return
}
}
diff --git a/dex/nodetable.go b/dex/nodetable.go
index 9fe1d6b71..12cc9ba46 100644
--- a/dex/nodetable.go
+++ b/dex/nodetable.go
@@ -1,82 +1,71 @@
package dex
import (
- "net"
"sync"
- "github.com/dexon-foundation/dexon/common"
- "github.com/dexon-foundation/dexon/crypto/sha3"
"github.com/dexon-foundation/dexon/event"
"github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/p2p/enode"
- "github.com/dexon-foundation/dexon/rlp"
+ "github.com/dexon-foundation/dexon/p2p/enr"
)
-type NodeMeta struct {
- ID enode.ID
- IP net.IP
- UDP uint
- TCP uint
- Timestamp uint64
- Sig []byte
-}
-
-func (n *NodeMeta) Hash() (h common.Hash) {
- hw := sha3.NewKeccak256()
- rlp.Encode(hw, n)
- hw.Sum(h[:0])
- return h
-}
-
-type newMetasEvent struct{ Metas []*NodeMeta }
+type newRecordsEvent struct{ Records []*enr.Record }
type nodeTable struct {
mu sync.RWMutex
- entry map[enode.ID]*NodeMeta
+ entry map[enode.ID]*enode.Node
feed event.Feed
}
func newNodeTable() *nodeTable {
return &nodeTable{
- entry: make(map[enode.ID]*NodeMeta),
+ entry: make(map[enode.ID]*enode.Node),
}
}
-func (t *nodeTable) Get(id enode.ID) *NodeMeta {
+func (t *nodeTable) GetNode(id enode.ID) *enode.Node {
t.mu.RLock()
defer t.mu.RUnlock()
return t.entry[id]
}
-func (t *nodeTable) Add(metas []*NodeMeta) {
+func (t *nodeTable) AddRecords(records []*enr.Record) {
t.mu.Lock()
defer t.mu.Unlock()
- var newMetas []*NodeMeta
- for _, meta := range metas {
- // TODO: validate the meta
- if e, ok := t.entry[meta.ID]; ok && e.Timestamp > meta.Timestamp {
+ var newRecords []*enr.Record
+ for _, record := range records {
+ node, err := enode.New(enode.ValidSchemes, record)
+ if err != nil {
+ log.Error("invalid node record", "err", err)
+ return
+ }
+
+ if n, ok := t.entry[node.ID()]; ok && n.Seq() >= node.Seq() {
+ log.Debug("Ignore new record, already exists", "id", node.ID().String(),
+ "ip", node.IP().String(), "udp", node.UDP(), "tcp", node.TCP())
continue
}
- t.entry[meta.ID] = meta
- newMetas = append(newMetas, meta)
- log.Trace("add new node meta", "id", meta.ID[:8],
- "ip", meta.IP, "udp", meta.UDP, "tcp", meta.TCP)
+
+ t.entry[node.ID()] = node
+ newRecords = append(newRecords, record)
+ log.Debug("Add new record to node table", "id", node.ID().String(),
+ "ip", node.IP().String(), "udp", node.UDP(), "tcp", node.TCP())
}
- t.feed.Send(newMetasEvent{newMetas})
+ t.feed.Send(newRecordsEvent{newRecords})
}
-func (t *nodeTable) Metas() []*NodeMeta {
+func (t *nodeTable) Records() []*enr.Record {
t.mu.RLock()
defer t.mu.RUnlock()
- metas := make([]*NodeMeta, 0, len(t.entry))
- for _, meta := range t.entry {
- metas = append(metas, meta)
+ records := make([]*enr.Record, 0, len(t.entry))
+ for _, node := range t.entry {
+ records = append(records, node.Record())
}
- return metas
+ return records
}
-func (t *nodeTable) SubscribeNewMetasEvent(
- ch chan<- newMetasEvent) event.Subscription {
+func (t *nodeTable) SubscribeNewRecordsEvent(
+ ch chan<- newRecordsEvent) event.Subscription {
return t.feed.Subscribe(ch)
}
diff --git a/dex/nodetable_test.go b/dex/nodetable_test.go
index da00098a9..06078a0d8 100644
--- a/dex/nodetable_test.go
+++ b/dex/nodetable_test.go
@@ -9,85 +9,90 @@ import (
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/crypto"
"github.com/dexon-foundation/dexon/p2p/enode"
+ "github.com/dexon-foundation/dexon/p2p/enr"
)
func TestNodeTable(t *testing.T) {
table := newNodeTable()
- ch := make(chan newMetasEvent)
- table.SubscribeNewMetasEvent(ch)
+ ch := make(chan newRecordsEvent)
+ table.SubscribeNewRecordsEvent(ch)
- metas1 := []*NodeMeta{
- {ID: randomID()},
- {ID: randomID()},
+ records1 := []*enr.Record{
+ randomNode().Record(),
+ randomNode().Record(),
}
- metas2 := []*NodeMeta{
- {ID: randomID()},
- {ID: randomID()},
+ records2 := []*enr.Record{
+ randomNode().Record(),
+ randomNode().Record(),
}
- go table.Add(metas1)
+ go table.AddRecords(records1)
select {
- case newMetas := <-ch:
+ case newRecords := <-ch:
m := map[common.Hash]struct{}{}
- for _, meta := range newMetas.Metas {
- m[meta.Hash()] = struct{}{}
+ for _, record := range newRecords.Records {
+ m[rlpHash(record)] = struct{}{}
}
- if len(m) != len(metas1) {
+ if len(m) != len(records1) {
t.Errorf("len mismatch: got %d, want: %d",
- len(m), len(metas1))
+ len(m), len(records1))
}
- for _, meta := range metas1 {
- if _, ok := m[meta.Hash()]; !ok {
- t.Errorf("expected meta (%s) not exists", meta.Hash())
+ for _, record := range records1 {
+ if _, ok := m[rlpHash(record)]; !ok {
+ t.Errorf("expected record (%s) not exists", rlpHash(record))
}
}
case <-time.After(1 * time.Second):
- t.Error("did not receive new metas event within one second")
+ t.Error("did not receive new records event within one second")
}
- go table.Add(metas2)
+ go table.AddRecords(records2)
select {
- case newMetas := <-ch:
+ case newRecords := <-ch:
m := map[common.Hash]struct{}{}
- for _, meta := range newMetas.Metas {
- m[meta.Hash()] = struct{}{}
+ for _, record := range newRecords.Records {
+ m[rlpHash(record)] = struct{}{}
}
- if len(m) != len(metas1) {
+ if len(m) != len(records2) {
t.Errorf("len mismatch: got %d, want: %d",
- len(m), len(metas2))
+ len(m), len(records2))
}
- for _, meta := range metas2 {
- if _, ok := m[meta.Hash()]; !ok {
- t.Errorf("expected meta (%s) not exists", meta.Hash())
+ for _, record := range records2 {
+ if _, ok := m[rlpHash(record)]; !ok {
+ t.Errorf("expected record (%s) not exists", rlpHash(record))
}
}
case <-time.After(1 * time.Second):
- t.Error("did not receive new metas event within one second")
+ t.Error("did not receive new records event within one second")
}
- var metas []*NodeMeta
- metas = append(metas, metas1...)
- metas = append(metas, metas2...)
- allMetas := table.Metas()
- if len(allMetas) != len(metas) {
+ var records []*enr.Record
+ records = append(records, records1...)
+ records = append(records, records2...)
+ allRecords := table.Records()
+ if len(allRecords) != len(records) {
t.Errorf("all metas num mismatch: got %d, want %d",
- len(metas), len(allMetas))
+ len(records), len(allRecords))
}
- for _, m := range metas {
- if m.Hash() != table.Get(m.ID).Hash() {
- t.Errorf("meta (%s) mismatch", m.ID.String())
+ for _, r := range records {
+ n, err := enode.New(enode.V4ID{}, r)
+ if err != nil {
+ t.Errorf(err.Error())
+ }
+ if rlpHash(r) != rlpHash(table.GetNode(n.ID()).Record()) {
+ t.Errorf("record (%s) mismatch", n.ID().String())
}
}
}
-func randomEnode() *enode.Node {
+func randomNode() *enode.Node {
var err error
var privkey *ecdsa.PrivateKey
for {
@@ -96,9 +101,21 @@ func randomEnode() *enode.Node {
break
}
}
- return enode.NewV4(&privkey.PublicKey, net.IP{}, 0, 0)
+ var r enr.Record
+ r.Set(enr.IP(net.IP{}))
+ r.Set(enr.UDP(0))
+ r.Set(enr.TCP(0))
+ if err := enode.SignV4(&r, privkey); err != nil {
+ panic(err)
+ }
+ node, err := enode.New(enode.V4ID{}, &r)
+ if err != nil {
+ panic(err)
+
+ }
+ return node
}
func randomID() enode.ID {
- return randomEnode().ID()
+ return randomNode().ID()
}
diff --git a/dex/peer.go b/dex/peer.go
index aecf9dc7c..295c5c400 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -52,6 +52,7 @@ import (
"github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/p2p"
"github.com/dexon-foundation/dexon/p2p/enode"
+ "github.com/dexon-foundation/dexon/p2p/enr"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -62,9 +63,9 @@ var (
)
const (
- maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
- maxKnownMetas = 32768 // Maximum metas hashes to keep in the known list (prevent DOS)
- maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
+ maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+ maxKnownRecords = 32768 // Maximum records hashes to keep in the known list (prevent DOS)
+ maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
/*
maxKnownLatticeBLocks = 2048
@@ -80,7 +81,7 @@ const (
// contain a single transaction, or thousands.
maxQueuedTxs = 1024
- maxQueuedMetas = 512
+ maxQueuedRecords = 512
// maxQueuedProps is the maximum number of block propagations to queue up before
// dropping broadcasts. There's not much point in queueing stale blocks, so a few
@@ -141,7 +142,7 @@ type peer struct {
lock sync.RWMutex
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
- knownMetas mapset.Set // Set of node metas known to be known by this peer
+ knownRecords mapset.Set // Set of node record known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
knownLatticeBlocks mapset.Set
knownVotes mapset.Set
@@ -150,7 +151,7 @@ type peer struct {
knownDKGPrivateShares mapset.Set
knownDKGPartialSignatures mapset.Set
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
- queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer
+ queuedRecords chan []*enr.Record // Queue of node records to broadcast to the peer
queuedProps chan *types.Block // Queue of blocks to broadcast to the peer
queuedAnns chan *types.Block // Queue of blocks to announce to the peer
queuedLatticeBlocks chan *coreTypes.Block
@@ -172,7 +173,7 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
version: version,
id: p.ID().String(),
knownTxs: mapset.NewSet(),
- knownMetas: mapset.NewSet(),
+ knownRecords: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
knownLatticeBlocks: mapset.NewSet(),
knownVotes: mapset.NewSet(),
@@ -181,7 +182,7 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
knownDKGPrivateShares: mapset.NewSet(),
knownDKGPartialSignatures: mapset.NewSet(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
- queuedMetas: make(chan []*NodeMeta, maxQueuedMetas),
+ queuedRecords: make(chan []*enr.Record, maxQueuedRecords),
queuedProps: make(chan *types.Block, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
queuedLatticeBlocks: make(chan *coreTypes.Block, maxQueuedLatticeBlocks),
@@ -198,16 +199,16 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
}
// broadcast is a write loop that multiplexes block propagations, announcements,
-// transaction and notary node metas broadcasts into the remote peer.
+// transaction and notary node records broadcasts into the remote peer.
// The goal is to have an async writer that does not lock up node internals.
func (p *peer) broadcast() {
for {
select {
- case metas := <-p.queuedMetas:
- if err := p.SendNodeMetas(metas); err != nil {
+ case records := <-p.queuedRecords:
+ if err := p.SendNodeRecords(records); err != nil {
return
}
- p.Log().Trace("Broadcast node metas", "count", len(metas))
+ p.Log().Trace("Broadcast node records", "count", len(records))
case block := <-p.queuedProps:
if err := p.SendNewBlock(block); err != nil {
@@ -335,11 +336,11 @@ func (p *peer) MarkTransaction(hash common.Hash) {
p.knownTxs.Add(hash)
}
-func (p *peer) MarkNodeMeta(hash common.Hash) {
- for p.knownMetas.Cardinality() >= maxKnownMetas {
- p.knownMetas.Pop()
+func (p *peer) MarkNodeRecord(hash common.Hash) {
+ for p.knownRecords.Cardinality() >= maxKnownRecords {
+ p.knownRecords.Pop()
}
- p.knownMetas.Add(hash)
+ p.knownRecords.Add(hash)
}
// SendTransactions sends transactions to the peer and includes the hashes
@@ -364,26 +365,26 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
}
}
-// SendNodeMetas sends the metas to the peer and includes the hashes
-// in its metas hash set for future reference.
-func (p *peer) SendNodeMetas(metas []*NodeMeta) error {
- for _, meta := range metas {
- p.knownMetas.Add(meta.Hash())
+// SendNodeRecords sends the records to the peer and includes the hashes
+// in its records hash set for future reference.
+func (p *peer) SendNodeRecords(records []*enr.Record) error {
+ for _, record := range records {
+ p.knownRecords.Add(rlpHash(record))
}
- return p2p.Send(p.rw, MetaMsg, metas)
+ return p2p.Send(p.rw, RecordMsg, records)
}
-// AsyncSendNodeMeta queues list of notary node meta propagation to a
+// AsyncSendNodeRecord queues list of notary node records propagation to a
// remote peer. If the peer's broadcast queue is full, the event is silently
// dropped.
-func (p *peer) AsyncSendNodeMetas(metas []*NodeMeta) {
+func (p *peer) AsyncSendNodeRecords(records []*enr.Record) {
select {
- case p.queuedMetas <- metas:
- for _, meta := range metas {
- p.knownMetas.Add(meta.Hash())
+ case p.queuedRecords <- records:
+ for _, record := range records {
+ p.knownRecords.Add(rlpHash(record))
}
default:
- p.Log().Debug("Dropping node meta propagation", "count", len(metas))
+ p.Log().Debug("Dropping node record propagation", "count", len(records))
}
}
@@ -851,14 +852,14 @@ func (ps *peerSet) PeersWithoutVote(hash common.Hash, label peerLabel) []*peer {
return list
}
-// PeersWithoutNodeMeta retrieves a list of peers that do not have a
-// given meta in their set of known hashes.
-func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer {
+// PeersWithoutNodeRecord retrieves a list of peers that do not have a
+// given record in their set of known hashes.
+func (ps *peerSet) PeersWithoutNodeRecord(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- if !p.knownMetas.Contains(hash) {
+ if !p.knownRecords.Contains(hash) {
list = append(list, p)
}
}
@@ -1118,11 +1119,11 @@ func (ps *peerSet) BuildNotaryConn(round uint64) {
}
func (ps *peerSet) dumpPeerLabel(s string) {
- log.Trace(s, "peer num", len(ps.peers))
+ log.Debug(s, "peer num", len(ps.peers))
for id, labels := range ps.peer2Labels {
_, ok := ps.peers[id]
for label := range labels {
- log.Trace(s, "connected", ok, "id", id[:16],
+ log.Debug(s, "connected", ok, "id", id[:16],
"round", label.round, "cid", label.chainID, "set", label.set)
}
}
@@ -1277,6 +1278,7 @@ func (ps *peerSet) removeLabel(node *enode.Node, label peerLabel) {
}
}
+// TODO: improve this by not using pk.
func (ps *peerSet) newNode(pk string) *enode.Node {
var ip net.IP
var tcp, udp int
@@ -1290,14 +1292,10 @@ func (ps *peerSet) newNode(pk string) *enode.Node {
if err != nil {
panic(err)
}
- n := enode.NewV4(pubkey, net.IP{}, 0, 0)
- meta := ps.tab.Get(n.ID())
- if meta != nil {
- ip = meta.IP
- tcp = int(meta.TCP)
- udp = int(meta.UDP)
+ node := ps.tab.GetNode(enode.PubkeyToIDV4(pubkey))
+ if node != nil {
+ return node
}
-
return enode.NewV4(pubkey, ip, tcp, udp)
}
diff --git a/dex/peer_test.go b/dex/peer_test.go
index 548a61052..9caa62d1e 100644
--- a/dex/peer_test.go
+++ b/dex/peer_test.go
@@ -26,7 +26,7 @@ func TestPeerSetBuildAndForgetNotaryConn(t *testing.T) {
var nodes []*enode.Node
for i := 0; i < 9; i++ {
- nodes = append(nodes, randomEnode())
+ nodes = append(nodes, randomNode())
}
round10 := [][]*enode.Node{
@@ -270,7 +270,7 @@ func TestPeerSetBuildDKGConn(t *testing.T) {
var nodes []*enode.Node
for i := 0; i < 6; i++ {
- nodes = append(nodes, randomEnode())
+ nodes = append(nodes, randomNode())
}
gov := &testGovernance{}
diff --git a/dex/protocol.go b/dex/protocol.go
index b417c91b6..c6b4c37c4 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -82,7 +82,7 @@ const (
ReceiptsMsg = 0x10
// Protocol messages belonging to dex/64
- MetaMsg = 0x11
+ RecordMsg = 0x11
LatticeBlockMsg = 0x20
VoteMsg = 0x21
diff --git a/dex/protocol_test.go b/dex/protocol_test.go
index 686ba372f..e9bd4e110 100644
--- a/dex/protocol_test.go
+++ b/dex/protocol_test.go
@@ -36,6 +36,7 @@ import (
"github.com/dexon-foundation/dexon/crypto"
"github.com/dexon-foundation/dexon/dex/downloader"
"github.com/dexon-foundation/dexon/p2p"
+ "github.com/dexon-foundation/dexon/p2p/enr"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -231,73 +232,71 @@ func TestGetBlockHeadersDataEncodeDecode(t *testing.T) {
}
}
-func TestRecvNodeMetas(t *testing.T) {
+func TestRecvNodeRecords(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
p, _ := newTestPeer("peer", dex64, pm, true)
defer pm.Stop()
defer p.close()
- meta := NodeMeta{
- ID: randomID(),
- }
+ record := randomNode().Record()
- ch := make(chan newMetasEvent)
- pm.nodeTable.SubscribeNewMetasEvent(ch)
+ ch := make(chan newRecordsEvent)
+ pm.nodeTable.SubscribeNewRecordsEvent(ch)
- if err := p2p.Send(p.app, MetaMsg, []interface{}{meta}); err != nil {
+ if err := p2p.Send(p.app, RecordMsg, []interface{}{record}); err != nil {
t.Fatalf("send error: %v", err)
}
select {
case event := <-ch:
- metas := event.Metas
- if len(metas) != 1 {
- t.Errorf("wrong number of new metas: got %d, want 1", len(metas))
- } else if metas[0].Hash() != meta.Hash() {
- t.Errorf("added wrong meta hash: got %v, want %v", metas[0].Hash(), meta.Hash())
+ records := event.Records
+ if len(records) != 1 {
+ t.Errorf("wrong number of new records: got %d, want 1", len(records))
+ } else if rlpHash(records[0]) != rlpHash(record) {
+ t.Errorf("added wrong records hash: got %v, want %v", rlpHash(records[0]), rlpHash(record))
}
case <-time.After(3 * time.Second):
- t.Errorf("no newMetasEvent received within 3 seconds")
+ t.Errorf("no newRecordsEvent received within 3 seconds")
}
}
-func TestSendNodeMetas(t *testing.T) {
+func TestSendNodeRecords(t *testing.T) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
defer pm.Stop()
- allmetas := make([]*NodeMeta, 100)
- for i := 0; i < len(allmetas); i++ {
- allmetas[i] = &NodeMeta{ID: randomID()}
+ allrecords := make([]*enr.Record, 100)
+ for i := 0; i < len(allrecords); i++ {
+ allrecords[i] = randomNode().Record()
}
// Connect several peers. They should all receive the pending transactions.
var wg sync.WaitGroup
- checkmetas := func(p *testPeer) {
+ checkrecords := func(p *testPeer) {
defer wg.Done()
defer p.close()
seen := make(map[common.Hash]bool)
- for _, meta := range allmetas {
- seen[meta.Hash()] = false
+ for _, record := range allrecords {
+ seen[rlpHash(record)] = false
}
- for n := 0; n < len(allmetas) && !t.Failed(); {
- var metas []*NodeMeta
+ for n := 0; n < len(allrecords) && !t.Failed(); {
+ var records []*enr.Record
msg, err := p.app.ReadMsg()
if err != nil {
t.Errorf("%v: read error: %v", p.Peer, err)
- } else if msg.Code != MetaMsg {
- t.Errorf("%v: got code %d, want MetaMsg", p.Peer, msg.Code)
+ } else if msg.Code != RecordMsg {
+ t.Errorf("%v: got code %d, want RecordMsg", p.Peer, msg.Code)
}
- if err := msg.Decode(&metas); err != nil {
+ if err := msg.Decode(&records); err != nil {
t.Errorf("%v: %v", p.Peer, err)
}
- for _, meta := range metas {
- hash := meta.Hash()
- seenmeta, want := seen[hash]
- if seenmeta {
- t.Errorf("%v: got meta more than once: %x", p.Peer, hash)
+ for _, record := range records {
+ hash := rlpHash(record)
+ seenrecord, want := seen[hash]
+ if seenrecord {
+ t.Errorf("%v: got record more than once: %x", p.Peer, hash)
}
if !want {
- t.Errorf("%v: got unexpected meta: %x", p.Peer, hash)
+ t.Errorf("%v: got unexpected record: %x", p.Peer, hash)
}
seen[hash] = true
n++
@@ -307,9 +306,9 @@ func TestSendNodeMetas(t *testing.T) {
for i := 0; i < 3; i++ {
p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), dex64, pm, true)
wg.Add(1)
- go checkmetas(p)
+ go checkrecords(p)
}
- pm.nodeTable.Add(allmetas)
+ pm.nodeTable.AddRecords(allrecords)
wg.Wait()
}
@@ -364,7 +363,7 @@ func TestRecvLatticeBlock(t *testing.T) {
t.Errorf("block mismatch")
}
case <-time.After(3 * time.Second):
- t.Errorf("no newMetasEvent received within 3 seconds")
+ t.Errorf("no newRecordsEvent received within 3 seconds")
}
}
diff --git a/dex/sync.go b/dex/sync.go
index 1e35faf21..927c04bc3 100644
--- a/dex/sync.go
+++ b/dex/sync.go
@@ -26,6 +26,7 @@ import (
"github.com/dexon-foundation/dexon/dex/downloader"
"github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/p2p/enode"
+ "github.com/dexon-foundation/dexon/p2p/enr"
)
const (
@@ -40,8 +41,8 @@ const (
// A pack can get larger than this if a single transactions exceeds this size.
txsyncPackSize = 100 * 1024
- // This is the target number for the packs of metas sent by metasyncLoop.
- metasyncPackNum = 1024
+ // This is the target number for the packs of records sent by recordsyncLoop.
+ recordsyncPackNum = 1024
)
type txsync struct {
@@ -136,58 +137,59 @@ func (pm *ProtocolManager) txsyncLoop() {
}
}
-type metasync struct {
- p *peer
- metas []*NodeMeta
+type recordsync struct {
+ p *peer
+ records []*enr.Record
}
-// syncNodeMetas starts sending all node metas to the given peer.
-func (pm *ProtocolManager) syncNodeMetas(p *peer) {
- metas := pm.nodeTable.Metas()
- if len(metas) == 0 {
+// syncNodeRecords starts sending all node records to the given peer.
+func (pm *ProtocolManager) syncNodeRecords(p *peer) {
+ records := pm.nodeTable.Records()
+ p.Log().Debug("Sync node records", "num", len(records))
+ if len(records) == 0 {
return
}
select {
- case pm.metasyncCh <- &metasync{p, metas}:
+ case pm.recordsyncCh <- &recordsync{p, records}:
case <-pm.quitSync:
}
}
-// metasyncLoop takes care of the initial node meta sync for each new
-// connection. When a new peer appears, we relay all currently node metas.
+// recordsyncLoop takes care of the initial node record sync for each new
+// connection. When a new peer appears, we relay all currently node records.
// In order to minimise egress bandwidth usage, we send
-// the metas in small packs to one peer at a time.
-func (pm *ProtocolManager) metasyncLoop() {
+// the records in small packs to one peer at a time.
+func (pm *ProtocolManager) recordsyncLoop() {
var (
- pending = make(map[enode.ID]*metasync)
+ pending = make(map[enode.ID]*recordsync)
sending = false // whether a send is active
- pack = new(metasync) // the pack that is being sent
+ pack = new(recordsync) // the pack that is being sent
done = make(chan error, 1) // result of the send
)
// send starts a sending a pack of transactions from the sync.
- send := func(s *metasync) {
- // Fill pack with node metas up to the target num.
+ send := func(s *recordsync) {
+ // Fill pack with node records up to the target num.
var num int
pack.p = s.p
- pack.metas = pack.metas[:0]
- for i := 0; i < len(s.metas) && num < metasyncPackNum; i++ {
- pack.metas = append(pack.metas, s.metas[i])
+ pack.records = pack.records[:0]
+ for i := 0; i < len(s.records) && num < recordsyncPackNum; i++ {
+ pack.records = append(pack.records, s.records[i])
num += 1
}
- // Remove the metas that will be sent.
- s.metas = s.metas[:copy(s.metas, s.metas[len(pack.metas):])]
- if len(s.metas) == 0 {
+ // Remove the records that will be sent.
+ s.records = s.records[:copy(s.records, s.records[len(pack.records):])]
+ if len(s.records) == 0 {
delete(pending, s.p.ID())
}
// Send the pack in the background.
- s.p.Log().Trace("Sending batch of transactions", "count", len(pack.metas), "bytes", num)
+ s.p.Log().Trace("Sending batch of records", "count", len(pack.records), "bytes", num)
sending = true
- go func() { done <- pack.p.SendNodeMetas(pack.metas) }()
+ go func() { done <- pack.p.SendNodeRecords(pack.records) }()
}
// pick chooses the next pending sync.
- pick := func() *metasync {
+ pick := func() *recordsync {
if len(pending) == 0 {
return nil
}
@@ -202,7 +204,7 @@ func (pm *ProtocolManager) metasyncLoop() {
for {
select {
- case s := <-pm.metasyncCh:
+ case s := <-pm.recordsyncCh:
pending[s.p.ID()] = s
if !sending {
send(s)
@@ -211,7 +213,7 @@ func (pm *ProtocolManager) metasyncLoop() {
sending = false
// Stop tracking peers that cause send failures.
if err != nil {
- pack.p.Log().Debug("NodeMeta send failed", "err", err)
+ pack.p.Log().Debug("Record send failed", "err", err)
delete(pending, pack.p.ID())
}
// Schedule the next send.