aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSonic <sonic@dexon.org>2019-03-21 15:57:28 +0800
committerWei-Ning Huang <w@byzantine-lab.io>2019-06-13 18:11:44 +0800
commit3155f92f8f6c8018877c715c9b58bfbe19bfb8ea (patch)
tree4a4faac86c66940bc446827943b618962a9b825c
parentd82cacf5ac51ef695b921a9c2683c38c779d1050 (diff)
downloadgo-tangerine-3155f92f8f6c8018877c715c9b58bfbe19bfb8ea.tar.gz
go-tangerine-3155f92f8f6c8018877c715c9b58bfbe19bfb8ea.tar.zst
go-tangerine-3155f92f8f6c8018877c715c9b58bfbe19bfb8ea.zip
dex: reduce msg when broadcast tx (#292)
Our network topology is different from ethereum, the nodes in notary set will connect each other directly. So there is a waste for flooding tx msgs in notary set. And nodes in notary set are more likely to propose block successfully, it is not useful to broadcast tx msgs to non notary set nodes. This PR will increase some tx confirm latency, but can reduce waste tx msgs a lot.
-rw-r--r--dex/handler.go55
-rw-r--r--dex/peer.go15
2 files changed, 51 insertions, 19 deletions
diff --git a/dex/handler.go b/dex/handler.go
index 3a5a81f50..245b31807 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -77,6 +77,8 @@ const (
// The number is referenced from the size of tx pool.
txChanSize = 4096
+ minTxReceiver = 3
+
finalizedBlockChanSize = 128
recordChanSize = 10240
@@ -1004,17 +1006,62 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
// BroadcastTxs will propagate a batch of transactions to all peers which are not known to
// already have the given transaction.
func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
- var txset = make(map[*peer]types.Transactions)
+ round := pm.blockchain.CurrentBlock().Round()
+ label := peerLabel{
+ set: notaryset,
+ round: round,
+ }
+
+ // Send to at most `notaryReceiverNum`.
+ // If we don't have many notary peers,
+ // send to at least `minTxReceiver` notary peers. (set notaryReceiverNum = minTxReceiver)
+ notaryPeers := pm.peers.PeersWithLabel(label)
+ notaryReceiverNum := int(math.Sqrt(float64(len(notaryPeers))))
+ if notaryReceiverNum < minTxReceiver {
+ notaryReceiverNum = minTxReceiver
+ }
+
+ // Send to at most `maxReceiver` peers (including notary peers).
+ // If we don't have many peers,
+ // send to a least `minTxReceiver` peers. (set maxReceiver = minTxReceiver)
+ peers := pm.peers.Peers()
+ maxReceiver := int(math.Sqrt(float64(len(peers))))
+ if maxReceiver < minTxReceiver {
+ maxReceiver = minTxReceiver
+ }
+ var txset = make(map[*peer]types.Transactions)
// Broadcast transactions to a batch of peers not knowing about it
for _, tx := range txs {
- peers := pm.peers.PeersWithoutTx(tx.Hash())
+ receivers := make(map[*peer]struct{})
+
+ // notary peers first
+ for _, peer := range notaryPeers {
+ if !peer.knownTxs.Contains(tx.Hash()) {
+ receivers[peer] = struct{}{}
+ }
+ if len(receivers) >= notaryReceiverNum {
+ break
+ }
+ }
+
for _, peer := range peers {
+ if len(receivers) >= maxReceiver {
+ break
+ }
+
+ // not add to receivers yet and not known the tx
+ if _, ok := receivers[peer]; !ok && !peer.knownTxs.Contains(tx.Hash()) {
+ receivers[peer] = struct{}{}
+ }
+ }
+
+ for peer := range receivers {
txset[peer] = append(txset[peer], tx)
}
- log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
+ log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(receivers))
}
- // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
+
for peer, txs := range txset {
peer.AsyncSendTransactions(txs)
}
diff --git a/dex/peer.go b/dex/peer.go
index f92fce130..562cbfaca 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -854,21 +854,6 @@ func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer {
return list
}
-// PeersWithoutTx retrieves a list of peers that do not have a given transaction
-// in their set of known hashes.
-func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
-
- list := make([]*peer, 0, len(ps.peers))
- for _, p := range ps.peers {
- if !p.knownTxs.Contains(hash) {
- list = append(list, p)
- }
- }
- return list
-}
-
func (ps *peerSet) PeersWithLabel(label peerLabel) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()