aboutsummaryrefslogtreecommitdiffstats
path: root/dex/peer.go
diff options
context:
space:
mode:
authorSonic <sonic@cobinhood.com>2018-09-25 20:37:11 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:49 +0800
commit8335eac4a488716c396f114cbe7522919b97e224 (patch)
treec680248b9a7346e63ddde403689c2a484a43c4b4 /dex/peer.go
parent6f442cd7793daad014aa0d55b3b7320392c22f02 (diff)
downloaddexon-8335eac4a488716c396f114cbe7522919b97e224.tar.gz
dexon-8335eac4a488716c396f114cbe7522919b97e224.tar.zst
dexon-8335eac4a488716c396f114cbe7522919b97e224.zip
dex: redesign p2p network topology
- Let p2p server support direct connection and group connection. - Introduce node meta table to maintain IP of all nodes in node set, in memory and let nodes in the network can sync this table. - Let peerSet able to manage direct connections to notary set and dkg set. The mechanism to refresh the network topology when configuration round change is not done yet.
Diffstat (limited to 'dex/peer.go')
-rw-r--r--dex/peer.go296
1 files changed, 230 insertions, 66 deletions
diff --git a/dex/peer.go b/dex/peer.go
index f1a4335d1..8c218f1d9 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -27,6 +27,7 @@ import (
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/core/types"
"github.com/dexon-foundation/dexon/p2p"
+ "github.com/dexon-foundation/dexon/p2p/discover"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -34,19 +35,20 @@ var (
errClosed = errors.New("peer set is closed")
errAlreadyRegistered = errors.New("peer is already registered")
errNotRegistered = errors.New("peer is not registered")
- errInvalidRound = errors.New("invalid round")
)
const (
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+ maxKnownMetas = 32768 // Maximum metas hashes to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
- maxKnownInfos = 1024
// maxQueuedTxs is the maximum number of transaction lists to queue up before
// dropping broadcasts. This is a sensitive number as a transaction list might
// contain a single transaction, or thousands.
maxQueuedTxs = 128
+ maxQueuedMetas = 512
+
// maxQueuedProps is the maximum number of block propagations to queue up before
// dropping broadcasts. There's not much point in queueing stale blocks, so a few
// that might cover uncles should be enough.
@@ -57,9 +59,9 @@ const (
// above some healthy uncle limit, so use that.
maxQueuedAnns = 4
- maxQueuedInfos = 1024
-
handshakeTimeout = 5 * time.Second
+
+ groupNodeNum = 3
)
// PeerInfo represents a short summary of the Ethereum sub-protocol metadata known
@@ -76,13 +78,27 @@ type propEvent struct {
td *big.Int
}
+type setType uint32
+
+const (
+ dkgset = iota
+ notaryset
+)
+
+type peerLabel struct {
+ set setType
+ chainID uint32
+ round uint64
+}
+
type peer struct {
id string
*p2p.Peer
rw p2p.MsgReadWriter
- version int // Protocol version negotiated
+ version int // Protocol version negotiated
+ labels mapset.Set
forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time
head common.Hash
@@ -90,12 +106,12 @@ type peer struct {
lock sync.RWMutex
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
+ knownMetas mapset.Set // Set of node metas known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
- knownInfos mapset.Set // Set of infos known to be known by this peer
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
+ queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer
queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
queuedAnns chan *types.Block // Queue of blocks to announce to the peer
- queuedInfos chan *notaryNodeInfo // Queue of infos to broadcast to the peer
term chan struct{} // Termination channel to stop the broadcaster
}
@@ -104,20 +120,21 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
Peer: p,
rw: rw,
version: version,
- id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
+ labels: mapset.NewSet(),
+ id: p.ID().String(),
knownTxs: mapset.NewSet(),
+ knownMetas: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
- knownInfos: mapset.NewSet(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
+ queuedMetas: make(chan []*NodeMeta, maxQueuedMetas),
queuedProps: make(chan *propEvent, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
- queuedInfos: make(chan *notaryNodeInfo, maxQueuedInfos),
term: make(chan struct{}),
}
}
// broadcast is a write loop that multiplexes block propagations, announcements,
-// transaction and notary node infos broadcasts into the remote peer.
+// transaction and notary node metas broadcasts into the remote peer.
// The goal is to have an async writer that does not lock up node internals.
func (p *peer) broadcast() {
for {
@@ -128,6 +145,12 @@ func (p *peer) broadcast() {
}
p.Log().Trace("Broadcast transactions", "count", len(txs))
+ case metas := <-p.queuedMetas:
+ if err := p.SendNodeMetas(metas); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast node metas", "count", len(metas))
+
case prop := <-p.queuedProps:
if err := p.SendNewBlock(prop.block, prop.td); err != nil {
return
@@ -140,12 +163,6 @@ func (p *peer) broadcast() {
}
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
- case info := <-p.queuedInfos:
- if err := p.SendNotaryNodeInfo(info); err != nil {
- return
- }
- p.Log().Trace("Broadcast notary node info")
-
case <-p.term:
return
}
@@ -157,6 +174,14 @@ func (p *peer) close() {
close(p.term)
}
+func (p *peer) addLabel(label peerLabel) {
+ p.labels.Add(label)
+}
+
+func (p *peer) removeLabel(label peerLabel) {
+ p.labels.Remove(label)
+}
+
// Info gathers and returns a collection of metadata known about a peer.
func (p *peer) Info() *PeerInfo {
hash, td := p.Head()
@@ -207,11 +232,11 @@ func (p *peer) MarkTransaction(hash common.Hash) {
p.knownTxs.Add(hash)
}
-func (p *peer) MarkNotaryNodeInfo(hash common.Hash) {
- for p.knownInfos.Cardinality() >= maxKnownInfos {
- p.knownInfos.Pop()
+func (p *peer) MarkNodeMeta(hash common.Hash) {
+ for p.knownMetas.Cardinality() >= maxKnownMetas {
+ p.knownMetas.Pop()
}
- p.knownInfos.Add(hash)
+ p.knownMetas.Add(hash)
}
// SendTransactions sends transactions to the peer and includes the hashes
@@ -236,21 +261,26 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
}
}
-// SendNotaryNodeInfo sends the info to the peer and includes the hashes
-// in its info hash set for future reference.
-func (p *peer) SendNotaryNodeInfo(info *notaryNodeInfo) error {
- return p2p.Send(p.rw, NotaryNodeInfoMsg, info)
+// SendNodeMetas sends the metas to the peer and includes the hashes
+// in its metas hash set for future reference.
+func (p *peer) SendNodeMetas(metas []*NodeMeta) error {
+ for _, meta := range metas {
+ p.knownMetas.Add(meta.Hash())
+ }
+ return p2p.Send(p.rw, MetaMsg, metas)
}
-// AsyncSendNotaryNodeInfo queues list of notary node info propagation to a
+// AsyncSendNodeMeta queues list of notary node meta propagation to a
// remote peer. If the peer's broadcast queue is full, the event is silently
// dropped.
-func (p *peer) AsyncSendNotaryNodeInfo(info *notaryNodeInfo) {
+func (p *peer) AsyncSendNodeMetas(metas []*NodeMeta) {
select {
- case p.queuedInfos <- info:
- p.knownInfos.Add(info.Hash())
+ case p.queuedMetas <- metas:
+ for _, meta := range metas {
+ p.knownMetas.Add(meta.Hash())
+ }
default:
- p.Log().Debug("Dropping notary node info propagation")
+ p.Log().Debug("Dropping node meta propagation", "count", len(metas))
}
}
@@ -431,7 +461,7 @@ func (p *peer) readStatus(network uint64, status *statusData, genesis common.Has
// String implements fmt.Stringer.
func (p *peer) String() string {
return fmt.Sprintf("Peer %s [%s]", p.id,
- fmt.Sprintf("eth/%2d", p.version),
+ fmt.Sprintf("dex/%2d", p.version),
)
}
@@ -441,18 +471,25 @@ type peerSet struct {
peers map[string]*peer
lock sync.RWMutex
closed bool
+ tab *nodeTable
- // TODO(sonic): revist this map after dexon core SDK is finalized.
- // use types.ValidatorID? or implement Stringer for types.ValidatorID
- notaryPeers map[uint64]map[string]*peer
- round uint64
+ srvr p2pServer
+ gov governance
+ peerLabels map[string]map[peerLabel]struct{}
+ notaryHistory map[uint64]struct{}
+ dkgHistory map[uint64]struct{}
}
// newPeerSet creates a new peer set to track the active participants.
-func newPeerSet() *peerSet {
+func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet {
return &peerSet{
- peers: make(map[string]*peer),
- notaryPeers: make(map[uint64]map[string]*peer),
+ peers: make(map[string]*peer),
+ gov: gov,
+ srvr: srvr,
+ tab: tab,
+ peerLabels: make(map[string]map[peerLabel]struct{}),
+ notaryHistory: make(map[uint64]struct{}),
+ dkgHistory: make(map[uint64]struct{}),
}
}
@@ -537,14 +574,14 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
return list
}
-// PeersWithoutNotaryNodeInfo retrieves a list of peers that do not have a
-// given info in their set of known hashes.
-func (ps *peerSet) PeersWithoutNotaryNodeInfo(hash common.Hash) []*peer {
+// PeersWithoutNodeMeta retrieves a list of peers that do not have a
+// given meta in their set of known hashes.
+func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- if p.knownInfos.Contains(hash) {
+ if !p.knownMetas.Contains(hash) {
list = append(list, p)
}
}
@@ -580,44 +617,171 @@ func (ps *peerSet) Close() {
ps.closed = true
}
-// AddNotaryPeer adds a peer into notary peer of the given round.
-// Caller of this function need to make sure that the peer is acutally in
-// notary set.
-func (ps *peerSet) AddNotaryPeer(round uint64, p *peer) error {
+func (ps *peerSet) BuildNotaryConn(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
- // TODO(sonic): revisit this round check after dexon core SDK is finalized.
- if round < ps.round || round > ps.round+2 {
- return errInvalidRound
+ if _, ok := ps.notaryHistory[round]; ok {
+ return
}
- if _, ok := ps.peers[p.id]; !ok {
- return errNotRegistered
+ ps.notaryHistory[round] = struct{}{}
+
+ selfID := ps.srvr.Self().ID.String()
+ for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ {
+ s := ps.gov.GetNotarySet(chainID, round)
+
+ // not in notary set, add group
+ if _, ok := s[selfID]; !ok {
+ var nodes []*discover.Node
+ for id := range s {
+ nodes = append(nodes, ps.newNode(id))
+ }
+ ps.srvr.AddGroup(notarySetName(chainID, round), nodes, groupNodeNum)
+ continue
+ }
+
+ label := peerLabel{
+ set: notaryset,
+ chainID: chainID,
+ round: round,
+ }
+ delete(s, selfID)
+ for id := range s {
+ ps.addDirectPeer(id, label)
+ }
}
+}
- ps.notaryPeers[round][p.id] = p
- return nil
+func (ps *peerSet) ForgetNotaryConn(round uint64) {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ // forget all the rounds before the given round
+ for r := range ps.notaryHistory {
+ if r <= round {
+ ps.forgetNotaryConn(r)
+ delete(ps.notaryHistory, r)
+ }
+ }
}
-// NotaryPeers return peers in notary set of the given round.
-func (ps *peerSet) NotaryPeers(round uint64) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
+func (ps *peerSet) forgetNotaryConn(round uint64) {
+ selfID := ps.srvr.Self().ID.String()
+ for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ {
+ s := ps.gov.GetNotarySet(chainID, round)
+ if _, ok := s[selfID]; !ok {
+ ps.srvr.RemoveGroup(notarySetName(chainID, round))
+ continue
+ }
- list := make([]*peer, 0, len(ps.notaryPeers[round]))
- for _, p := range ps.notaryPeers[round] {
- if _, ok := ps.peers[p.id]; ok {
- list = append(list, p)
+ label := peerLabel{
+ set: notaryset,
+ chainID: chainID,
+ round: round,
+ }
+ delete(s, selfID)
+ for id := range s {
+ ps.removeDirectPeer(id, label)
}
}
- return list
}
-// NextRound moves notary peer set to next round.
-func (ps *peerSet) NextRound() {
+func notarySetName(chainID uint32, round uint64) string {
+ return fmt.Sprintf("%d-%d-notaryset", chainID, round)
+}
+
+func (ps *peerSet) BuildDKGConn(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
- delete(ps.notaryPeers, ps.round)
- ps.round = ps.round + 1
+ selfID := ps.srvr.Self().ID.String()
+ s := ps.gov.GetDKGSet(round)
+ if _, ok := s[selfID]; !ok {
+ return
+ }
+ ps.dkgHistory[round] = struct{}{}
+
+ delete(s, selfID)
+ for id := range s {
+ ps.addDirectPeer(id, peerLabel{
+ set: dkgset,
+ round: round,
+ })
+ }
+}
+
+func (ps *peerSet) ForgetDKGConn(round uint64) {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ // forget all the rounds before the given round
+ for r := range ps.dkgHistory {
+ if r <= round {
+ ps.forgetDKGConn(r)
+ delete(ps.dkgHistory, r)
+ }
+ }
+}
+
+func (ps *peerSet) forgetDKGConn(round uint64) {
+ selfID := ps.srvr.Self().ID.String()
+ s := ps.gov.GetDKGSet(round)
+ if _, ok := s[selfID]; !ok {
+ return
+ }
+
+ delete(s, selfID)
+ label := peerLabel{
+ set: dkgset,
+ round: round,
+ }
+ for id := range s {
+ ps.removeDirectPeer(id, label)
+ }
+}
+
+// make sure the ps.lock is hold
+func (ps *peerSet) addDirectPeer(id string, label peerLabel) {
+ // if the peer exists add the label
+ if p, ok := ps.peers[id]; ok {
+ p.addLabel(label)
+ }
+
+ if _, ok := ps.peerLabels[id]; !ok {
+ ps.peerLabels[id] = make(map[peerLabel]struct{})
+ }
+
+ ps.peerLabels[id][label] = struct{}{}
+ ps.srvr.AddDirectPeer(ps.newNode(id))
+}
+
+// make sure the ps.lock is hold
+func (ps *peerSet) removeDirectPeer(id string, label peerLabel) {
+ if p, ok := ps.peers[id]; ok {
+ p.removeLabel(label)
+ }
+
+ delete(ps.peerLabels[id], label)
+
+ if len(ps.peerLabels[id]) == 0 {
+ ps.srvr.RemoveDirectPeer(ps.newNode(id))
+ delete(ps.peerLabels, id)
+ }
+}
+
+func (ps *peerSet) newNode(id string) *enode.Node {
+ nodeID := enode.HexID(id)
+ meta := ps.tab.Get(enode.HexID(id))
+
+ var r enr.Record
+ r.Set(enr.ID(nodeID.String()))
+ r.Set(enr.IP(meta.IP))
+ r.Set(enr.TCP(meta.TCP))
+ r.Set(enr.UDP(meta.UDP))
+
+ n, err := enode.New(enode.ValidSchemes, &r)
+ if err != nil {
+ panic(err)
+ }
+ return n
}