aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSonic <sonic@dexon.org>2019-03-21 15:57:28 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:58 +0800
commita18f103408de2552bb5f47b1d76f4c6f18f0677e (patch)
tree2725210ada2691958e8f82e5366db249dd98325e
parent9ad0e05b15f8b36f38f19cc1b24dc8b93e845844 (diff)
downloaddexon-a18f103408de2552bb5f47b1d76f4c6f18f0677e.tar.gz
dexon-a18f103408de2552bb5f47b1d76f4c6f18f0677e.tar.zst
dexon-a18f103408de2552bb5f47b1d76f4c6f18f0677e.zip
dex: reduce msg when broadcast tx (#292)
Our network topology is different from ethereum, the nodes in notary set will connect each other directly. So there is a waste for flooding tx msgs in notary set. And nodes in notary set are more likely to propose block successfully, it is not useful to broadcast tx msgs to non notary set nodes. This PR will increase some tx confirm latency, but can reduce waste tx msgs a lot.
-rw-r--r--dex/handler.go55
-rw-r--r--dex/peer.go15
2 files changed, 51 insertions, 19 deletions
diff --git a/dex/handler.go b/dex/handler.go
index 3a5a81f50..245b31807 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -77,6 +77,8 @@ const (
// The number is referenced from the size of tx pool.
txChanSize = 4096
+ minTxReceiver = 3
+
finalizedBlockChanSize = 128
recordChanSize = 10240
@@ -1004,17 +1006,62 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
// BroadcastTxs will propagate a batch of transactions to all peers which are not known to
// already have the given transaction.
func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
- var txset = make(map[*peer]types.Transactions)
+ round := pm.blockchain.CurrentBlock().Round()
+ label := peerLabel{
+ set: notaryset,
+ round: round,
+ }
+
+ // Send to at most `notaryReceiverNum`.
+ // If we don't have many notary peers,
+ // send to at least `minTxReceiver` notary peers. (set notaryReceiverNum = minTxReceiver)
+ notaryPeers := pm.peers.PeersWithLabel(label)
+ notaryReceiverNum := int(math.Sqrt(float64(len(notaryPeers))))
+ if notaryReceiverNum < minTxReceiver {
+ notaryReceiverNum = minTxReceiver
+ }
+
+ // Send to at most `maxReceiver` peers (including notary peers).
+ // If we don't have many peers,
+ // send to a least `minTxReceiver` peers. (set maxReceiver = minTxReceiver)
+ peers := pm.peers.Peers()
+ maxReceiver := int(math.Sqrt(float64(len(peers))))
+ if maxReceiver < minTxReceiver {
+ maxReceiver = minTxReceiver
+ }
+ var txset = make(map[*peer]types.Transactions)
// Broadcast transactions to a batch of peers not knowing about it
for _, tx := range txs {
- peers := pm.peers.PeersWithoutTx(tx.Hash())
+ receivers := make(map[*peer]struct{})
+
+ // notary peers first
+ for _, peer := range notaryPeers {
+ if !peer.knownTxs.Contains(tx.Hash()) {
+ receivers[peer] = struct{}{}
+ }
+ if len(receivers) >= notaryReceiverNum {
+ break
+ }
+ }
+
for _, peer := range peers {
+ if len(receivers) >= maxReceiver {
+ break
+ }
+
+ // not add to receivers yet and not known the tx
+ if _, ok := receivers[peer]; !ok && !peer.knownTxs.Contains(tx.Hash()) {
+ receivers[peer] = struct{}{}
+ }
+ }
+
+ for peer := range receivers {
txset[peer] = append(txset[peer], tx)
}
- log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
+ log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(receivers))
}
- // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
+
for peer, txs := range txset {
peer.AsyncSendTransactions(txs)
}
diff --git a/dex/peer.go b/dex/peer.go
index f92fce130..562cbfaca 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -854,21 +854,6 @@ func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer {
return list
}
-// PeersWithoutTx retrieves a list of peers that do not have a given transaction
-// in their set of known hashes.
-func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
-
- list := make([]*peer, 0, len(ps.peers))
- for _, p := range ps.peers {
- if !p.knownTxs.Contains(hash) {
- list = append(list, p)
- }
- }
- return list
-}
-
func (ps *peerSet) PeersWithLabel(label peerLabel) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()