diff options
Diffstat (limited to 'dex/peer.go')
-rw-r--r-- | dex/peer.go | 296 |
1 files changed, 230 insertions, 66 deletions
diff --git a/dex/peer.go b/dex/peer.go index f1a4335d1..8c218f1d9 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -27,6 +27,7 @@ import ( "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/p2p" + "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/rlp" ) @@ -34,19 +35,20 @@ var ( errClosed = errors.New("peer set is closed") errAlreadyRegistered = errors.New("peer is already registered") errNotRegistered = errors.New("peer is not registered") - errInvalidRound = errors.New("invalid round") ) const ( maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) + maxKnownMetas = 32768 // Maximum metas hashes to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) - maxKnownInfos = 1024 // maxQueuedTxs is the maximum number of transaction lists to queue up before // dropping broadcasts. This is a sensitive number as a transaction list might // contain a single transaction, or thousands. maxQueuedTxs = 128 + maxQueuedMetas = 512 + // maxQueuedProps is the maximum number of block propagations to queue up before // dropping broadcasts. There's not much point in queueing stale blocks, so a few // that might cover uncles should be enough. @@ -57,9 +59,9 @@ const ( // above some healthy uncle limit, so use that. maxQueuedAnns = 4 - maxQueuedInfos = 1024 - handshakeTimeout = 5 * time.Second + + groupNodeNum = 3 ) // PeerInfo represents a short summary of the Ethereum sub-protocol metadata known @@ -76,13 +78,27 @@ type propEvent struct { td *big.Int } +type setType uint32 + +const ( + dkgset = iota + notaryset +) + +type peerLabel struct { + set setType + chainID uint32 + round uint64 +} + type peer struct { id string *p2p.Peer rw p2p.MsgReadWriter - version int // Protocol version negotiated + version int // Protocol version negotiated + labels mapset.Set forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time head common.Hash @@ -90,12 +106,12 @@ type peer struct { lock sync.RWMutex knownTxs mapset.Set // Set of transaction hashes known to be known by this peer + knownMetas mapset.Set // Set of node metas known to be known by this peer knownBlocks mapset.Set // Set of block hashes known to be known by this peer - knownInfos mapset.Set // Set of infos known to be known by this peer queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer + queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer queuedProps chan *propEvent // Queue of blocks to broadcast to the peer queuedAnns chan *types.Block // Queue of blocks to announce to the peer - queuedInfos chan *notaryNodeInfo // Queue of infos to broadcast to the peer term chan struct{} // Termination channel to stop the broadcaster } @@ -104,20 +120,21 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { Peer: p, rw: rw, version: version, - id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), + labels: mapset.NewSet(), + id: p.ID().String(), knownTxs: mapset.NewSet(), + knownMetas: mapset.NewSet(), knownBlocks: mapset.NewSet(), - knownInfos: mapset.NewSet(), queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), + queuedMetas: make(chan []*NodeMeta, maxQueuedMetas), queuedProps: make(chan *propEvent, maxQueuedProps), queuedAnns: make(chan *types.Block, maxQueuedAnns), - queuedInfos: make(chan *notaryNodeInfo, maxQueuedInfos), term: make(chan struct{}), } } // broadcast is a write loop that multiplexes block propagations, announcements, -// transaction and notary node infos broadcasts into the remote peer. +// transaction and notary node metas broadcasts into the remote peer. // The goal is to have an async writer that does not lock up node internals. func (p *peer) broadcast() { for { @@ -128,6 +145,12 @@ func (p *peer) broadcast() { } p.Log().Trace("Broadcast transactions", "count", len(txs)) + case metas := <-p.queuedMetas: + if err := p.SendNodeMetas(metas); err != nil { + return + } + p.Log().Trace("Broadcast node metas", "count", len(metas)) + case prop := <-p.queuedProps: if err := p.SendNewBlock(prop.block, prop.td); err != nil { return @@ -140,12 +163,6 @@ func (p *peer) broadcast() { } p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) - case info := <-p.queuedInfos: - if err := p.SendNotaryNodeInfo(info); err != nil { - return - } - p.Log().Trace("Broadcast notary node info") - case <-p.term: return } @@ -157,6 +174,14 @@ func (p *peer) close() { close(p.term) } +func (p *peer) addLabel(label peerLabel) { + p.labels.Add(label) +} + +func (p *peer) removeLabel(label peerLabel) { + p.labels.Remove(label) +} + // Info gathers and returns a collection of metadata known about a peer. func (p *peer) Info() *PeerInfo { hash, td := p.Head() @@ -207,11 +232,11 @@ func (p *peer) MarkTransaction(hash common.Hash) { p.knownTxs.Add(hash) } -func (p *peer) MarkNotaryNodeInfo(hash common.Hash) { - for p.knownInfos.Cardinality() >= maxKnownInfos { - p.knownInfos.Pop() +func (p *peer) MarkNodeMeta(hash common.Hash) { + for p.knownMetas.Cardinality() >= maxKnownMetas { + p.knownMetas.Pop() } - p.knownInfos.Add(hash) + p.knownMetas.Add(hash) } // SendTransactions sends transactions to the peer and includes the hashes @@ -236,21 +261,26 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { } } -// SendNotaryNodeInfo sends the info to the peer and includes the hashes -// in its info hash set for future reference. -func (p *peer) SendNotaryNodeInfo(info *notaryNodeInfo) error { - return p2p.Send(p.rw, NotaryNodeInfoMsg, info) +// SendNodeMetas sends the metas to the peer and includes the hashes +// in its metas hash set for future reference. +func (p *peer) SendNodeMetas(metas []*NodeMeta) error { + for _, meta := range metas { + p.knownMetas.Add(meta.Hash()) + } + return p2p.Send(p.rw, MetaMsg, metas) } -// AsyncSendNotaryNodeInfo queues list of notary node info propagation to a +// AsyncSendNodeMeta queues list of notary node meta propagation to a // remote peer. If the peer's broadcast queue is full, the event is silently // dropped. -func (p *peer) AsyncSendNotaryNodeInfo(info *notaryNodeInfo) { +func (p *peer) AsyncSendNodeMetas(metas []*NodeMeta) { select { - case p.queuedInfos <- info: - p.knownInfos.Add(info.Hash()) + case p.queuedMetas <- metas: + for _, meta := range metas { + p.knownMetas.Add(meta.Hash()) + } default: - p.Log().Debug("Dropping notary node info propagation") + p.Log().Debug("Dropping node meta propagation", "count", len(metas)) } } @@ -431,7 +461,7 @@ func (p *peer) readStatus(network uint64, status *statusData, genesis common.Has // String implements fmt.Stringer. func (p *peer) String() string { return fmt.Sprintf("Peer %s [%s]", p.id, - fmt.Sprintf("eth/%2d", p.version), + fmt.Sprintf("dex/%2d", p.version), ) } @@ -441,18 +471,25 @@ type peerSet struct { peers map[string]*peer lock sync.RWMutex closed bool + tab *nodeTable - // TODO(sonic): revist this map after dexon core SDK is finalized. - // use types.ValidatorID? or implement Stringer for types.ValidatorID - notaryPeers map[uint64]map[string]*peer - round uint64 + srvr p2pServer + gov governance + peerLabels map[string]map[peerLabel]struct{} + notaryHistory map[uint64]struct{} + dkgHistory map[uint64]struct{} } // newPeerSet creates a new peer set to track the active participants. -func newPeerSet() *peerSet { +func newPeerSet(gov governance, srvr p2pServer, tab *nodeTable) *peerSet { return &peerSet{ - peers: make(map[string]*peer), - notaryPeers: make(map[uint64]map[string]*peer), + peers: make(map[string]*peer), + gov: gov, + srvr: srvr, + tab: tab, + peerLabels: make(map[string]map[peerLabel]struct{}), + notaryHistory: make(map[uint64]struct{}), + dkgHistory: make(map[uint64]struct{}), } } @@ -537,14 +574,14 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { return list } -// PeersWithoutNotaryNodeInfo retrieves a list of peers that do not have a -// given info in their set of known hashes. -func (ps *peerSet) PeersWithoutNotaryNodeInfo(hash common.Hash) []*peer { +// PeersWithoutNodeMeta retrieves a list of peers that do not have a +// given meta in their set of known hashes. +func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if p.knownInfos.Contains(hash) { + if !p.knownMetas.Contains(hash) { list = append(list, p) } } @@ -580,44 +617,171 @@ func (ps *peerSet) Close() { ps.closed = true } -// AddNotaryPeer adds a peer into notary peer of the given round. -// Caller of this function need to make sure that the peer is acutally in -// notary set. -func (ps *peerSet) AddNotaryPeer(round uint64, p *peer) error { +func (ps *peerSet) BuildNotaryConn(round uint64) { ps.lock.Lock() defer ps.lock.Unlock() - // TODO(sonic): revisit this round check after dexon core SDK is finalized. - if round < ps.round || round > ps.round+2 { - return errInvalidRound + if _, ok := ps.notaryHistory[round]; ok { + return } - if _, ok := ps.peers[p.id]; !ok { - return errNotRegistered + ps.notaryHistory[round] = struct{}{} + + selfID := ps.srvr.Self().ID.String() + for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ { + s := ps.gov.GetNotarySet(chainID, round) + + // not in notary set, add group + if _, ok := s[selfID]; !ok { + var nodes []*discover.Node + for id := range s { + nodes = append(nodes, ps.newNode(id)) + } + ps.srvr.AddGroup(notarySetName(chainID, round), nodes, groupNodeNum) + continue + } + + label := peerLabel{ + set: notaryset, + chainID: chainID, + round: round, + } + delete(s, selfID) + for id := range s { + ps.addDirectPeer(id, label) + } } +} - ps.notaryPeers[round][p.id] = p - return nil +func (ps *peerSet) ForgetNotaryConn(round uint64) { + ps.lock.Lock() + defer ps.lock.Unlock() + + // forget all the rounds before the given round + for r := range ps.notaryHistory { + if r <= round { + ps.forgetNotaryConn(r) + delete(ps.notaryHistory, r) + } + } } -// NotaryPeers return peers in notary set of the given round. -func (ps *peerSet) NotaryPeers(round uint64) []*peer { - ps.lock.RLock() - defer ps.lock.RUnlock() +func (ps *peerSet) forgetNotaryConn(round uint64) { + selfID := ps.srvr.Self().ID.String() + for chainID := uint32(0); chainID < ps.gov.GetChainNum(round); chainID++ { + s := ps.gov.GetNotarySet(chainID, round) + if _, ok := s[selfID]; !ok { + ps.srvr.RemoveGroup(notarySetName(chainID, round)) + continue + } - list := make([]*peer, 0, len(ps.notaryPeers[round])) - for _, p := range ps.notaryPeers[round] { - if _, ok := ps.peers[p.id]; ok { - list = append(list, p) + label := peerLabel{ + set: notaryset, + chainID: chainID, + round: round, + } + delete(s, selfID) + for id := range s { + ps.removeDirectPeer(id, label) } } - return list } -// NextRound moves notary peer set to next round. -func (ps *peerSet) NextRound() { +func notarySetName(chainID uint32, round uint64) string { + return fmt.Sprintf("%d-%d-notaryset", chainID, round) +} + +func (ps *peerSet) BuildDKGConn(round uint64) { ps.lock.Lock() defer ps.lock.Unlock() - delete(ps.notaryPeers, ps.round) - ps.round = ps.round + 1 + selfID := ps.srvr.Self().ID.String() + s := ps.gov.GetDKGSet(round) + if _, ok := s[selfID]; !ok { + return + } + ps.dkgHistory[round] = struct{}{} + + delete(s, selfID) + for id := range s { + ps.addDirectPeer(id, peerLabel{ + set: dkgset, + round: round, + }) + } +} + +func (ps *peerSet) ForgetDKGConn(round uint64) { + ps.lock.Lock() + defer ps.lock.Unlock() + + // forget all the rounds before the given round + for r := range ps.dkgHistory { + if r <= round { + ps.forgetDKGConn(r) + delete(ps.dkgHistory, r) + } + } +} + +func (ps *peerSet) forgetDKGConn(round uint64) { + selfID := ps.srvr.Self().ID.String() + s := ps.gov.GetDKGSet(round) + if _, ok := s[selfID]; !ok { + return + } + + delete(s, selfID) + label := peerLabel{ + set: dkgset, + round: round, + } + for id := range s { + ps.removeDirectPeer(id, label) + } +} + +// make sure the ps.lock is hold +func (ps *peerSet) addDirectPeer(id string, label peerLabel) { + // if the peer exists add the label + if p, ok := ps.peers[id]; ok { + p.addLabel(label) + } + + if _, ok := ps.peerLabels[id]; !ok { + ps.peerLabels[id] = make(map[peerLabel]struct{}) + } + + ps.peerLabels[id][label] = struct{}{} + ps.srvr.AddDirectPeer(ps.newNode(id)) +} + +// make sure the ps.lock is hold +func (ps *peerSet) removeDirectPeer(id string, label peerLabel) { + if p, ok := ps.peers[id]; ok { + p.removeLabel(label) + } + + delete(ps.peerLabels[id], label) + + if len(ps.peerLabels[id]) == 0 { + ps.srvr.RemoveDirectPeer(ps.newNode(id)) + delete(ps.peerLabels, id) + } +} + +func (ps *peerSet) newNode(id string) *enode.Node { + nodeID := enode.HexID(id) + meta := ps.tab.Get(enode.HexID(id)) + + var r enr.Record + r.Set(enr.ID(nodeID.String())) + r.Set(enr.IP(meta.IP)) + r.Set(enr.TCP(meta.TCP)) + r.Set(enr.UDP(meta.UDP)) + + n, err := enode.New(enode.ValidSchemes, &r) + if err != nil { + panic(err) + } + return n } |