aboutsummaryrefslogtreecommitdiffstats
path: root/dex/sync.go
diff options
context:
space:
mode:
authorSonic <sonic@cobinhood.com>2018-09-19 13:49:30 +0800
committerWei-Ning Huang <w@byzantine-lab.io>2019-06-12 17:21:31 +0800
commit4dc829780c3bf1dcfc097a0f176d0df3f221f028 (patch)
treee7d36bd5b41b0af2a509bce76ab5ba3b3b92acd6 /dex/sync.go
parent76ad273b6d2ca081d72879f6284ae69b1013f2df (diff)
downloadgo-tangerine-4dc829780c3bf1dcfc097a0f176d0df3f221f028.tar.gz
go-tangerine-4dc829780c3bf1dcfc097a0f176d0df3f221f028.tar.zst
go-tangerine-4dc829780c3bf1dcfc097a0f176d0df3f221f028.zip
Copy codebase from eth
Diffstat (limited to 'dex/sync.go')
-rw-r--r--dex/sync.go217
1 files changed, 217 insertions, 0 deletions
diff --git a/dex/sync.go b/dex/sync.go
new file mode 100644
index 000000000..e49e40087
--- /dev/null
+++ b/dex/sync.go
@@ -0,0 +1,217 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "math/rand"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+const (
+ forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+
+ // This is the target size for the packs of transactions sent by txsyncLoop.
+ // A pack can get larger than this if a single transactions exceeds this size.
+ txsyncPackSize = 100 * 1024
+)
+
+type txsync struct {
+ p *peer
+ txs []*types.Transaction
+}
+
+// syncTransactions starts sending all currently pending transactions to the given peer.
+func (pm *ProtocolManager) syncTransactions(p *peer) {
+ var txs types.Transactions
+ pending, _ := pm.txpool.Pending()
+ for _, batch := range pending {
+ txs = append(txs, batch...)
+ }
+ if len(txs) == 0 {
+ return
+ }
+ select {
+ case pm.txsyncCh <- &txsync{p, txs}:
+ case <-pm.quitSync:
+ }
+}
+
+// txsyncLoop takes care of the initial transaction sync for each new
+// connection. When a new peer appears, we relay all currently pending
+// transactions. In order to minimise egress bandwidth usage, we send
+// the transactions in small packs to one peer at a time.
+func (pm *ProtocolManager) txsyncLoop() {
+ var (
+ pending = make(map[discover.NodeID]*txsync)
+ sending = false // whether a send is active
+ pack = new(txsync) // the pack that is being sent
+ done = make(chan error, 1) // result of the send
+ )
+
+ // send starts a sending a pack of transactions from the sync.
+ send := func(s *txsync) {
+ // Fill pack with transactions up to the target size.
+ size := common.StorageSize(0)
+ pack.p = s.p
+ pack.txs = pack.txs[:0]
+ for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
+ pack.txs = append(pack.txs, s.txs[i])
+ size += s.txs[i].Size()
+ }
+ // Remove the transactions that will be sent.
+ s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
+ if len(s.txs) == 0 {
+ delete(pending, s.p.ID())
+ }
+ // Send the pack in the background.
+ s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
+ sending = true
+ go func() { done <- pack.p.SendTransactions(pack.txs) }()
+ }
+
+ // pick chooses the next pending sync.
+ pick := func() *txsync {
+ if len(pending) == 0 {
+ return nil
+ }
+ n := rand.Intn(len(pending)) + 1
+ for _, s := range pending {
+ if n--; n == 0 {
+ return s
+ }
+ }
+ return nil
+ }
+
+ for {
+ select {
+ case s := <-pm.txsyncCh:
+ pending[s.p.ID()] = s
+ if !sending {
+ send(s)
+ }
+ case err := <-done:
+ sending = false
+ // Stop tracking peers that cause send failures.
+ if err != nil {
+ pack.p.Log().Debug("Transaction send failed", "err", err)
+ delete(pending, pack.p.ID())
+ }
+ // Schedule the next send.
+ if s := pick(); s != nil {
+ send(s)
+ }
+ case <-pm.quitSync:
+ return
+ }
+ }
+}
+
+// syncer is responsible for periodically synchronising with the network, both
+// downloading hashes and blocks as well as handling the announcement handler.
+func (pm *ProtocolManager) syncer() {
+ // Start and ensure cleanup of sync mechanisms
+ pm.fetcher.Start()
+ defer pm.fetcher.Stop()
+ defer pm.downloader.Terminate()
+
+ // Wait for different events to fire synchronisation operations
+ forceSync := time.NewTicker(forceSyncCycle)
+ defer forceSync.Stop()
+
+ for {
+ select {
+ case <-pm.newPeerCh:
+ // Make sure we have peers to select from, then sync
+ if pm.peers.Len() < minDesiredPeerCount {
+ break
+ }
+ go pm.synchronise(pm.peers.BestPeer())
+
+ case <-forceSync.C:
+ // Force a sync even if not enough peers are present
+ go pm.synchronise(pm.peers.BestPeer())
+
+ case <-pm.noMorePeers:
+ return
+ }
+ }
+}
+
+// synchronise tries to sync up our local block chain with a remote peer.
+func (pm *ProtocolManager) synchronise(peer *peer) {
+ // Short circuit if no peers are available
+ if peer == nil {
+ return
+ }
+ // Make sure the peer's TD is higher than our own
+ currentBlock := pm.blockchain.CurrentBlock()
+ td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
+
+ pHead, pTd := peer.Head()
+ if pTd.Cmp(td) <= 0 {
+ return
+ }
+ // Otherwise try to sync with the downloader
+ mode := downloader.FullSync
+ if atomic.LoadUint32(&pm.fastSync) == 1 {
+ // Fast sync was explicitly requested, and explicitly granted
+ mode = downloader.FastSync
+ } else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 {
+ // The database seems empty as the current block is the genesis. Yet the fast
+ // block is ahead, so fast sync was enabled for this node at a certain point.
+ // The only scenario where this can happen is if the user manually (or via a
+ // bad block) rolled back a fast sync node below the sync point. In this case
+ // however it's safe to reenable fast sync.
+ atomic.StoreUint32(&pm.fastSync, 1)
+ mode = downloader.FastSync
+ }
+
+ if mode == downloader.FastSync {
+ // Make sure the peer's total difficulty we are synchronizing is higher.
+ if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 {
+ return
+ }
+ }
+
+ // Run the sync cycle, and disable fast sync if we've went past the pivot block
+ if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
+ return
+ }
+ if atomic.LoadUint32(&pm.fastSync) == 1 {
+ log.Info("Fast sync complete, auto disabling")
+ atomic.StoreUint32(&pm.fastSync, 0)
+ }
+ atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done
+ if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
+ // We've completed a sync cycle, notify all peers of new state. This path is
+ // essential in star-topology networks where a gateway node needs to notify
+ // all its out-of-date peers of the availability of a new block. This failure
+ // scenario will most often crop up in private and hackathon networks with
+ // degenerate connectivity, but it should be healthy for the mainnet too to
+ // more reliably update peers or the local TD state.
+ go pm.BroadcastBlock(head, false)
+ }
+}