aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/blockchain.go25
-rw-r--r--core/state/sync.go98
-rw-r--r--core/state/sync_test.go238
-rw-r--r--eth/downloader/downloader.go568
-rw-r--r--eth/downloader/downloader_test.go123
-rw-r--r--eth/downloader/metrics.go5
-rw-r--r--eth/downloader/peer.go107
-rw-r--r--eth/downloader/queue.go271
-rw-r--r--eth/downloader/types.go137
-rw-r--r--eth/handler.go25
-rw-r--r--eth/peer.go2
-rw-r--r--trie/sync.go233
-rw-r--r--trie/sync_test.go257
13 files changed, 1626 insertions, 463 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index b68e7d3ae..6c8a24751 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -37,6 +37,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/trie"
"github.com/hashicorp/golang-lru"
)
@@ -246,6 +247,26 @@ func (bc *BlockChain) SetHead(head uint64) {
bc.loadLastState()
}
+// FastSyncCommitHead forces the block identified by hash to become the current
+// head block, regardless of what the chain contained beforehand. An error is
+// returned if the block is not known locally, or if no secure trie can be
+// opened at the block's state root from the chain database.
+func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error {
+	// The block itself has to be available before it can become the head
+	head := self.GetBlock(hash)
+	if head == nil {
+		return fmt.Errorf("non existent block [%x…]", hash[:4])
+	}
+	// Its state trie has to exist too, which opening it at the root checks
+	_, err := trie.NewSecure(head.Root(), self.chainDb)
+	if err != nil {
+		return err
+	}
+	// Both checks passed, swap in the new head while holding the chain mutex
+	self.mu.Lock()
+	self.currentBlock = head
+	self.mu.Unlock()
+
+	glog.V(logger.Info).Infof("committed block #%d [%x…] as new head", head.Number(), hash[:4])
+	return nil
+}
+
func (self *BlockChain) GasLimit() *big.Int {
self.mu.RLock()
defer self.mu.RUnlock()
@@ -721,10 +742,6 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain
self.wg.Add(1)
defer self.wg.Done()
- // Make sure only one thread manipulates the chain at once
- self.chainmu.Lock()
- defer self.chainmu.Unlock()
-
// Collect some import statistics to report on
stats := struct{ processed, ignored int }{}
start := time.Now()
diff --git a/core/state/sync.go b/core/state/sync.go
new file mode 100644
index 000000000..e9bebe8ee
--- /dev/null
+++ b/core/state/sync.go
@@ -0,0 +1,98 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package state
+
+import (
+ "bytes"
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto/sha3"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/trie"
+)
+
+// StateSync schedules the download of a state trie together with the contract
+// codes referenced by the accounts found in its leaves. Trie nodes are tracked
+// by the wrapped trie scheduler, contract codes by the two code request fields.
+type StateSync struct {
+ db ethdb.Database // database probed for already present codes and receiving delivered ones
+ sync *trie.TrieSync // scheduler tracking the missing trie nodes
+ codeReqs map[common.Hash]struct{} // requested but not yet written to database
+ codeReqList []common.Hash // requested since last Missing
+}
+
+// sha3_nil is the Keccak256 digest obtained without feeding the hasher any
+// data. NewStateSync registers it up front so that it is never requested.
+var sha3_nil = common.BytesToHash(sha3.NewKeccak256().Sum(nil))
+
+// NewStateSync creates a state download scheduler for the state trie rooted at
+// root, using db both to look up data already present and to store retrieved
+// contract codes. Every leaf discovered by the underlying trie scheduler is
+// routed through leafFound.
+func NewStateSync(root common.Hash, db ethdb.Database) *StateSync {
+	// Seed the request set with the nil hash so that it is never requested
+	requested := make(map[common.Hash]struct{})
+	requested[sha3_nil] = struct{}{}
+
+	ss := new(StateSync)
+	ss.db = db
+	ss.codeReqs = requested
+	ss.sync = trie.NewTrieSync(root, db, ss.leafFound)
+	return ss
+}
+
+// leafFound is the callback handed to the trie scheduler for every leaf it
+// encounters. The leaf is decoded as an RLP encoded account, after which the
+// account's storage trie is scheduled for retrieval and its contract code is
+// queued up, unless that code was already requested or is present in the
+// database. Only a decoding failure is reported as an error.
+func (self *StateSync) leafFound(leaf []byte, parent common.Hash) error {
+	// Field order matters here, RLP decodes the account positionally
+	var account struct {
+		Nonce    uint64
+		Balance  *big.Int
+		Root     common.Hash
+		CodeHash []byte
+	}
+	if err := rlp.Decode(bytes.NewReader(leaf), &account); err != nil {
+		return err
+	}
+	// Schedule the storage trie of the account as a child of this leaf
+	// NOTE(review): 64 is presumably the depth of an account leaf - confirm against trie.TrieSync.AddSubTrie
+	self.sync.AddSubTrie(account.Root, 64, parent, nil)
+
+	// Codes already requested (this includes the nil hash) need no further work
+	hash := common.BytesToHash(account.CodeHash)
+	if _, requested := self.codeReqs[hash]; requested {
+		return nil
+	}
+	// Neither do codes already sitting in the database; a failed lookup is
+	// deliberately treated the same as a missing entry
+	if code, _ := self.db.Get(account.CodeHash); code != nil {
+		return nil
+	}
+	self.codeReqs[hash] = struct{}{}
+	self.codeReqList = append(self.codeReqList, hash)
+	return nil
+}
+
+// Missing retrieves the hashes of at most max items that still need to be
+// fetched: missing state trie nodes first, followed by the contract codes
+// queued up since the previous call. A max of zero (or less) lifts the limit
+// and returns everything currently known to be missing.
+//
+// Contract codes are served from the allowance first and only the remainder is
+// handed to the trie scheduler. If the codes alone use up the whole allowance
+// the trie scheduler is not consulted at all: this method itself passes it a
+// zero to mean "no limit", so asking it for zero items would overshoot max.
+// Returned codes are removed from the pending list and not handed out again.
+func (self *StateSync) Missing(max int) []common.Hash {
+	codes := len(self.codeReqList)
+
+	var list []common.Hash
+	if max <= 0 {
+		// No limit requested, drain the trie scheduler completely
+		list = self.sync.Missing(0)
+	} else {
+		if codes > max {
+			codes = max
+		}
+		// Only ask for trie nodes if the codes left any room for them
+		if nodes := max - codes; nodes > 0 {
+			list = self.sync.Missing(nodes)
+		}
+	}
+	list = append(list, self.codeReqList[:codes]...)
+	self.codeReqList = self.codeReqList[codes:]
+	return list
+}
+
+// Process injects a batch of retrieved items into the scheduler. Entries whose
+// hash matches an outstanding contract code request are written straight into
+// the database and dropped from the batch; whatever remains is forwarded to the
+// trie scheduler as node data.
+//
+// The list is filtered in place, so the caller's slice gets reordered. If
+// storing a contract code fails, the error is returned immediately: the code
+// stays registered as outstanding and the trie nodes of the batch are not
+// processed.
+func (self *StateSync) Process(list []trie.SyncResult) error {
+	for i := 0; i < len(list); {
+		res := list[i]
+		if _, ok := self.codeReqs[res.Hash]; !ok {
+			// Not a requested code, leave it for the trie scheduler
+			i++
+			continue
+		}
+		// Code data, not a node: persist it before marking it delivered
+		if err := self.db.Put(res.Hash[:], res.Data); err != nil {
+			return err
+		}
+		delete(self.codeReqs, res.Hash)
+
+		// Overwrite the entry with the last one and re-examine the same index
+		last := len(list) - 1
+		list[i] = list[last]
+		list = list[:last]
+	}
+	_, err := self.sync.Process(list)
+	return err
+}
diff --git a/core/state/sync_test.go b/core/state/sync_test.go
new file mode 100644
index 000000000..f6afe8bd8
--- /dev/null
+++ b/core/state/sync_test.go
@@ -0,0 +1,238 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package state
+
+import (
+ "bytes"
+ "math/big"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/trie"
+)
+
+// testAccount is the data associated with an account used by the state tests.
+// It records the values a test wrote into the state so that they can later be
+// compared against a reconstructed copy.
+type testAccount struct {
+ address common.Address // address the account lives at
+ balance *big.Int // expected balance of the account
+ nonce uint64 // expected nonce of the account
+ code []byte // expected contract code, left nil for accounts without any
+}
+
+// makeTestState creates a sample test state to test node-wise reconstruction.
+// It returns the database holding the committed state, the root hash of that
+// state and the list of accounts that were written into it.
+func makeTestState() (ethdb.Database, common.Hash, []*testAccount) {
+	// Start out from an empty state backed by an in-memory database
+	db, _ := ethdb.NewMemDatabase()
+	statedb := New(common.Hash{}, db)
+
+	// Populate it with 255 accounts derived from a single byte each. Note that
+	// the balance and nonce products are computed in byte arithmetic and thus
+	// wrap around modulo 256.
+	accounts := make([]*testAccount, 0, 255)
+	for i := byte(0); i < 255; i++ {
+		addr := common.BytesToAddress([]byte{i})
+		balance := int64(11 * i)
+		nonce := uint64(42 * i)
+
+		obj := statedb.GetOrNewStateObject(addr)
+		obj.AddBalance(big.NewInt(balance))
+		obj.SetNonce(nonce)
+
+		acc := &testAccount{
+			address: addr,
+			balance: big.NewInt(balance),
+			nonce:   nonce,
+		}
+		// Every third account carries some code too
+		if i%3 == 0 {
+			obj.SetCode([]byte{i, i, i, i, i})
+			acc.code = []byte{i, i, i, i, i}
+		}
+		statedb.UpdateStateObject(obj)
+		accounts = append(accounts, acc)
+	}
+	root, _ := statedb.Commit()
+
+	// Hand back the generated state
+	return db, root, accounts
+}
+
+// checkStateAccounts opens the state rooted at root in db and verifies that
+// the balance, nonce and code of every expected account match what the state
+// reports. Each mismatch is reported as a separate test error.
+func checkStateAccounts(t *testing.T, db ethdb.Database, root common.Hash, accounts []*testAccount) {
+	statedb := New(root, db)
+	for i, acc := range accounts {
+		balance := statedb.GetBalance(acc.address)
+		if balance.Cmp(acc.balance) != 0 {
+			t.Errorf("account %d: balance mismatch: have %v, want %v", i, balance, acc.balance)
+		}
+		nonce := statedb.GetNonce(acc.address)
+		if nonce != acc.nonce {
+			t.Errorf("account %d: nonce mismatch: have %v, want %v", i, nonce, acc.nonce)
+		}
+		code := statedb.GetCode(acc.address)
+		if !bytes.Equal(code, acc.code) {
+			t.Errorf("account %d: code mismatch: have %x, want %x", i, code, acc.code)
+		}
+	}
+}
+
+// Tests that a scheduler created for the empty state root has nothing to
+// request from the network.
+func TestEmptyStateSync(t *testing.T) {
+	db, _ := ethdb.NewMemDatabase()
+	empty := common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
+
+	sched := NewStateSync(empty, db)
+	req := sched.Missing(1)
+	if len(req) != 0 {
+		t.Errorf("content requested for empty state: %v", req)
+	}
+}
+
+// Tests that a state can be synced iteratively on a single thread from its
+// root hash alone, delivering every requested item in one go. The two variants
+// request a single item and up to a hundred items per round respectively.
+func TestIterativeStateSyncIndividual(t *testing.T) {
+	testIterativeStateSync(t, 1)
+}
+
+func TestIterativeStateSyncBatched(t *testing.T) {
+	testIterativeStateSync(t, 100)
+}
+
+// testIterativeStateSync copies a freshly generated test state into an empty
+// database, requesting at most batch items per round and delivering all of
+// them before asking for more, then cross checks the copy against the source.
+func testIterativeStateSync(t *testing.T, batch int) {
+	// Generate the source state to replicate
+	srcDb, srcRoot, srcAccounts := makeTestState()
+
+	// Set up an empty destination and a scheduler filling it
+	dstDb, _ := ethdb.NewMemDatabase()
+	sched := NewStateSync(srcRoot, dstDb)
+
+	for queue := sched.Missing(batch); len(queue) > 0; queue = sched.Missing(batch) {
+		// Look up every requested item in the source database
+		results := make([]trie.SyncResult, 0, len(queue))
+		for _, hash := range queue {
+			data, err := srcDb.Get(hash.Bytes())
+			if err != nil {
+				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
+			}
+			results = append(results, trie.SyncResult{Hash: hash, Data: data})
+		}
+		// Deliver the whole round at once
+		if err := sched.Process(results); err != nil {
+			t.Fatalf("failed to process results: %v", err)
+		}
+	}
+	// The destination must now hold the very same accounts
+	checkStateAccounts(t, dstDb, srcRoot, srcAccounts)
+}
+
+// Tests that the state can still be reconstructed correctly when each round
+// delivers only part of the requested items, the rest following in later
+// rounds.
+func TestIterativeDelayedStateSync(t *testing.T) {
+	// Generate the source state to replicate
+	srcDb, srcRoot, srcAccounts := makeTestState()
+
+	// Set up an empty destination and a scheduler filling it
+	dstDb, _ := ethdb.NewMemDatabase()
+	sched := NewStateSync(srcRoot, dstDb)
+
+	pending := append([]common.Hash{}, sched.Missing(0)...)
+	for len(pending) > 0 {
+		// Deliver only the first half (rounded up) of the pending items
+		count := len(pending)/2 + 1
+
+		results := make([]trie.SyncResult, 0, count)
+		for _, hash := range pending[:count] {
+			data, err := srcDb.Get(hash.Bytes())
+			if err != nil {
+				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
+			}
+			results = append(results, trie.SyncResult{Hash: hash, Data: data})
+		}
+		if err := sched.Process(results); err != nil {
+			t.Fatalf("failed to process results: %v", err)
+		}
+		// Keep the undelivered items around and add the newly scheduled ones
+		pending = append(pending[count:], sched.Missing(0)...)
+	}
+	// The destination must now hold the very same accounts
+	checkStateAccounts(t, dstDb, srcRoot, srcAccounts)
+}
+
+// Tests that a state can be synced iteratively on a single thread from its
+// root hash alone, delivering every requested item in one go, but in a random
+// order. The two variants request a single item and up to a hundred items per
+// round respectively.
+func TestIterativeRandomStateSyncIndividual(t *testing.T) {
+	testIterativeRandomStateSync(t, 1)
+}
+
+func TestIterativeRandomStateSyncBatched(t *testing.T) {
+	testIterativeRandomStateSync(t, 100)
+}
+
+// testIterativeRandomStateSync copies a freshly generated test state into an
+// empty database, requesting at most batch items per round and delivering all
+// of them in map iteration order, then cross checks the copy against the
+// source.
+func testIterativeRandomStateSync(t *testing.T, batch int) {
+	// Generate the source state to replicate
+	srcDb, srcRoot, srcAccounts := makeTestState()
+
+	// Set up an empty destination and a scheduler filling it
+	dstDb, _ := ethdb.NewMemDatabase()
+	sched := NewStateSync(srcRoot, dstDb)
+
+	for {
+		// Collect the next round of requests into a set to shuffle them
+		queue := make(map[common.Hash]struct{})
+		for _, hash := range sched.Missing(batch) {
+			queue[hash] = struct{}{}
+		}
+		if len(queue) == 0 {
+			break
+		}
+		// Look the items up in whatever order the map yields them
+		results := make([]trie.SyncResult, 0, len(queue))
+		for hash := range queue {
+			data, err := srcDb.Get(hash.Bytes())
+			if err != nil {
+				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
+			}
+			results = append(results, trie.SyncResult{Hash: hash, Data: data})
+		}
+		// Deliver the whole round at once
+		if err := sched.Process(results); err != nil {
+			t.Fatalf("failed to process results: %v", err)
+		}
+	}
+	// The destination must now hold the very same accounts
+	checkStateAccounts(t, dstDb, srcRoot, srcAccounts)
+}
+
+// Tests that the state can still be reconstructed correctly when each round
+// delivers only part of the requested items, picked in a random order, the
+// rest following in later rounds.
+func TestIterativeRandomDelayedStateSync(t *testing.T) {
+	// Generate the source state to replicate
+	srcDb, srcRoot, srcAccounts := makeTestState()
+
+	// Set up an empty destination and a scheduler filling it
+	dstDb, _ := ethdb.NewMemDatabase()
+	sched := NewStateSync(srcRoot, dstDb)
+
+	// enqueue adds freshly scheduled requests to the set of pending ones
+	queue := make(map[common.Hash]struct{})
+	enqueue := func(hashes []common.Hash) {
+		for _, hash := range hashes {
+			queue[hash] = struct{}{}
+		}
+	}
+	enqueue(sched.Missing(0))
+
+	for len(queue) > 0 {
+		// Deliver half (rounded up) of the pending items, in map iteration order
+		limit := len(queue)/2 + 1
+
+		results := make([]trie.SyncResult, 0, limit)
+		for hash := range queue {
+			delete(queue, hash)
+
+			data, err := srcDb.Get(hash.Bytes())
+			if err != nil {
+				t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
+			}
+			results = append(results, trie.SyncResult{Hash: hash, Data: data})
+
+			if len(results) >= limit {
+				break
+			}
+		}
+		// Feed the partial round back and pick up the newly scheduled items
+		if err := sched.Process(results); err != nil {
+			t.Fatalf("failed to process results: %v", err)
+		}
+		enqueue(sched.Missing(0))
+	}
+	// The destination must now hold the very same accounts
+	checkStateAccounts(t, dstDb, srcRoot, srcAccounts)
+}
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 24ba3da17..96177ae8a 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -19,7 +19,6 @@ package downloader
import (
"errors"
- "fmt"
"math"
"math/big"
"strings"
@@ -29,9 +28,11 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/rcrowley/go-metrics"
)
var (
@@ -39,8 +40,8 @@ var (
MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request
MaxBodyFetch = 128 // Amount of block bodies to be fetched per retrieval request
- MaxStateFetch = 384 // Amount of node state values to allow fetching per request
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
+ MaxStateFetch = 384 // Amount of node state values to allow fetching per request
hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out
blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth
@@ -49,10 +50,13 @@ var (
bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth
bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired
receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
- receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a block body request is considered expired
+ receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired
+ stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
+ stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired
maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
+ maxQueuedStates = 256 * 1024 // [eth/63] Maximum number of state requests to queue (DOS protection)
maxResultsProcess = 256 // Number of download results to import at once into the chain
headerCheckFrequency = 64 // Verification frequency of the downloaded headers during fast sync
@@ -84,98 +88,6 @@ var (
errNoSyncActive = errors.New("no sync active")
)
-// headerCheckFn is a callback type for verifying a header's presence in the local chain.
-type headerCheckFn func(common.Hash) bool
-
-// blockCheckFn is a callback type for verifying a block's presence in the local chain.
-type blockCheckFn func(common.Hash) bool
-
-// headerRetrievalFn is a callback type for retrieving a header from the local chain.
-type headerRetrievalFn func(common.Hash) *types.Header
-
-// blockRetrievalFn is a callback type for retrieving a block from the local chain.
-type blockRetrievalFn func(common.Hash) *types.Block
-
-// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain.
-type headHeaderRetrievalFn func() *types.Header
-
-// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain.
-type headBlockRetrievalFn func() *types.Block
-
-// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain.
-type headFastBlockRetrievalFn func() *types.Block
-
-// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block.
-type tdRetrievalFn func(common.Hash) *big.Int
-
-// headerChainInsertFn is a callback type to insert a batch of headers into the local chain.
-type headerChainInsertFn func([]*types.Header, bool) (int, error)
-
-// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain.
-type blockChainInsertFn func(types.Blocks) (int, error)
-
-// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain.
-type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error)
-
-// peerDropFn is a callback type for dropping a peer detected as malicious.
-type peerDropFn func(id string)
-
-// dataPack is a data message returned by a peer for some query.
-type dataPack interface {
- PeerId() string
- Empty() bool
- Stats() string
-}
-
-// hashPack is a batch of block hashes returned by a peer (eth/61).
-type hashPack struct {
- peerId string
- hashes []common.Hash
-}
-
-// blockPack is a batch of blocks returned by a peer (eth/61).
-type blockPack struct {
- peerId string
- blocks []*types.Block
-}
-
-// headerPack is a batch of block headers returned by a peer.
-type headerPack struct {
- peerId string
- headers []*types.Header
-}
-
-// bodyPack is a batch of block bodies returned by a peer.
-type bodyPack struct {
- peerId string
- transactions [][]*types.Transaction
- uncles [][]*types.Header
-}
-
-// PeerId retrieves the origin peer who sent this block body packet.
-func (p *bodyPack) PeerId() string { return p.peerId }
-
-// Empty returns whether the no block bodies were delivered.
-func (p *bodyPack) Empty() bool { return len(p.transactions) == 0 || len(p.uncles) == 0 }
-
-// Stats creates a textual stats report for logging purposes.
-func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) }
-
-// receiptPack is a batch of receipts returned by a peer.
-type receiptPack struct {
- peerId string
- receipts [][]*types.Receipt
-}
-
-// PeerId retrieves the origin peer who sent this receipt packet.
-func (p *receiptPack) PeerId() string { return p.peerId }
-
-// Empty returns whether the no receipts were delivered.
-func (p *receiptPack) Empty() bool { return len(p.receipts) == 0 }
-
-// Stats creates a textual stats report for logging purposes.
-func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) }
-
type Downloader struct {
mode SyncMode // Synchronisation mode defining the strategies used
mux *event.TypeMux // Event multiplexer to announce sync operation events
@@ -186,23 +98,26 @@ type Downloader struct {
interrupt int32 // Atomic boolean to signal termination
// Statistics
- syncStatsOrigin uint64 // Origin block number where syncing started at
- syncStatsHeight uint64 // Highest block number known when syncing started
- syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
+ syncStatsChainOrigin uint64 // Origin block number where syncing started at
+ syncStatsChainHeight uint64 // Highest block number known when syncing started
+ syncStatsStateTotal uint64 // Total number of node state entries known so far
+ syncStatsStateDone uint64 // Number of state trie entries already pulled
+ syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
// Callbacks
- hasHeader headerCheckFn // Checks if a header is present in the chain
- hasBlock blockCheckFn // Checks if a block is present in the chain
- getHeader headerRetrievalFn // Retrieves a header from the chain
- getBlock blockRetrievalFn // Retrieves a block from the chain
- headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
- headBlock headBlockRetrievalFn // Retrieves the head block from the chain
- headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
- getTd tdRetrievalFn // Retrieves the TD of a block from the chain
- insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
- insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
- insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
- dropPeer peerDropFn // Drops a peer for misbehaving
+ hasHeader headerCheckFn // Checks if a header is present in the chain
+ hasBlock blockCheckFn // Checks if a block is present in the chain
+ getHeader headerRetrievalFn // Retrieves a header from the chain
+ getBlock blockRetrievalFn // Retrieves a block from the chain
+ headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
+ headBlock headBlockRetrievalFn // Retrieves the head block from the chain
+ headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
+ commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head
+ getTd tdRetrievalFn // Retrieves the TD of a block from the chain
+ insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
+ insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
+ insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
+ dropPeer peerDropFn // Drops a peer for misbehaving
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
@@ -212,14 +127,16 @@ type Downloader struct {
// Channels
newPeerCh chan *peer
- hashCh chan hashPack // [eth/61] Channel receiving inbound hashes
- blockCh chan blockPack // [eth/61] Channel receiving inbound blocks
- headerCh chan headerPack // [eth/62] Channel receiving inbound block headers
- bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
- receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
- blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks
- bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
- receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
+ hashCh chan dataPack // [eth/61] Channel receiving inbound hashes
+ blockCh chan dataPack // [eth/61] Channel receiving inbound blocks
+ headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
+ bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
+ receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
+ stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
+ blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks
+ bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
+ receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
+ stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
@@ -232,36 +149,40 @@ type Downloader struct {
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
-func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, getBlock blockRetrievalFn,
- headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn, getTd tdRetrievalFn,
- insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader {
+func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn,
+ getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn,
+ commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn,
+ insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader {
return &Downloader{
- mode: mode,
- mux: mux,
- queue: newQueue(),
- peers: newPeerSet(),
- hasHeader: hasHeader,
- hasBlock: hasBlock,
- getHeader: getHeader,
- getBlock: getBlock,
- headHeader: headHeader,
- headBlock: headBlock,
- headFastBlock: headFastBlock,
- getTd: getTd,
- insertHeaders: insertHeaders,
- insertBlocks: insertBlocks,
- insertReceipts: insertReceipts,
- dropPeer: dropPeer,
- newPeerCh: make(chan *peer, 1),
- hashCh: make(chan hashPack, 1),
- blockCh: make(chan blockPack, 1),
- headerCh: make(chan headerPack, 1),
- bodyCh: make(chan dataPack, 1),
- receiptCh: make(chan dataPack, 1),
- blockWakeCh: make(chan bool, 1),
- bodyWakeCh: make(chan bool, 1),
- receiptWakeCh: make(chan bool, 1),
+ mode: mode,
+ mux: mux,
+ queue: newQueue(stateDb),
+ peers: newPeerSet(),
+ hasHeader: hasHeader,
+ hasBlock: hasBlock,
+ getHeader: getHeader,
+ getBlock: getBlock,
+ headHeader: headHeader,
+ headBlock: headBlock,
+ headFastBlock: headFastBlock,
+ commitHeadBlock: commitHeadBlock,
+ getTd: getTd,
+ insertHeaders: insertHeaders,
+ insertBlocks: insertBlocks,
+ insertReceipts: insertReceipts,
+ dropPeer: dropPeer,
+ newPeerCh: make(chan *peer, 1),
+ hashCh: make(chan dataPack, 1),
+ blockCh: make(chan dataPack, 1),
+ headerCh: make(chan dataPack, 1),
+ bodyCh: make(chan dataPack, 1),
+ receiptCh: make(chan dataPack, 1),
+ stateCh: make(chan dataPack, 1),
+ blockWakeCh: make(chan bool, 1),
+ bodyWakeCh: make(chan bool, 1),
+ receiptWakeCh: make(chan bool, 1),
+ stateWakeCh: make(chan bool, 1),
}
}
@@ -272,7 +193,7 @@ func (d *Downloader) Boundaries() (uint64, uint64) {
d.syncStatsLock.RLock()
defer d.syncStatsLock.RUnlock()
- return d.syncStatsOrigin, d.syncStatsHeight
+ return d.syncStatsChainOrigin, d.syncStatsChainHeight
}
// Synchronising returns whether the downloader is currently retrieving blocks.
@@ -284,10 +205,11 @@ func (d *Downloader) Synchronising() bool {
// used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
- getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn) error {
+ getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
+ getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
glog.V(logger.Detail).Infoln("Registering peer", id)
- if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts)); err != nil {
+ if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
glog.V(logger.Error).Infoln("Register failed:", err)
return err
}
@@ -357,12 +279,18 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error
d.queue.Reset()
d.peers.Reset()
- for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh} {
+ for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
select {
case <-ch:
default:
}
}
+ // Reset any ephemeral sync statistics
+ d.syncStatsLock.Lock()
+ d.syncStatsStateTotal = 0
+ d.syncStatsStateDone = 0
+ d.syncStatsLock.Unlock()
+
// Create cancel channel for aborting mid-flight
d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
@@ -414,17 +342,17 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
return err
}
d.syncStatsLock.Lock()
- if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin {
- d.syncStatsOrigin = origin
+ if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
+ d.syncStatsChainOrigin = origin
}
- d.syncStatsHeight = latest
+ d.syncStatsChainHeight = latest
d.syncStatsLock.Unlock()
// Initiate the sync using a concurrent hash and block retrieval algorithm
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- d.queue.Prepare(origin+1, 1)
+ d.queue.Prepare(origin+1, d.mode, 0)
errc := make(chan error, 2)
go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
@@ -449,26 +377,27 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
return err
}
d.syncStatsLock.Lock()
- if d.syncStatsHeight <= origin || d.syncStatsOrigin > origin {
- d.syncStatsOrigin = origin
+ if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
+ d.syncStatsChainOrigin = origin
}
- d.syncStatsHeight = latest
+ d.syncStatsChainHeight = latest
d.syncStatsLock.Unlock()
// Initiate the sync using a concurrent header and content retrieval algorithm
- parts := 1
- if d.mode == FastSync {
- parts = 2 // receipts are fetched too
+ pivot := uint64(0)
+ if latest > uint64(minFullBlocks) {
+ pivot = latest - uint64(minFullBlocks)
}
- d.queue.Prepare(origin+1, parts)
+ d.queue.Prepare(origin+1, d.mode, pivot)
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- errc := make(chan error, 3)
+ errc := make(chan error, 4)
go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved
+ go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync
go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync
- go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal sync
+ go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync
// If any fetcher fails, cancel the others
var fail error
@@ -538,14 +467,14 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
case <-d.hashCh:
// Out of bounds hashes received, ignore them
- case blockPack := <-d.blockCh:
+ case packet := <-d.blockCh:
// Discard anything not from the origin peer
- if blockPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", blockPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- blocks := blockPack.blocks
+ blocks := packet.(*blockPack).blocks
if len(blocks) != 1 {
glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks))
return 0, errBadPeer
@@ -584,14 +513,14 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelHashFetch
- case hashPack := <-d.hashCh:
+ case packet := <-d.hashCh:
// Discard anything not from the origin peer
- if hashPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- hashes := hashPack.hashes
+ hashes := packet.(*hashPack).hashes
if len(hashes) == 0 {
glog.V(logger.Debug).Infof("%v: empty head hash set", p)
return 0, errEmptyHashSet
@@ -639,14 +568,14 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelHashFetch
- case hashPack := <-d.hashCh:
+ case packet := <-d.hashCh:
// Discard anything not from the origin peer
- if hashPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- hashes := hashPack.hashes
+ hashes := packet.(*hashPack).hashes
if len(hashes) != 1 {
glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes))
return 0, errBadPeer
@@ -716,17 +645,17 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them
- case hashPack := <-d.hashCh:
+ case packet := <-d.hashCh:
// Make sure the active peer is giving us the hashes
- if hashPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
break
}
hashReqTimer.UpdateSince(request)
timeout.Stop()
// If no more hashes are inbound, notify the block fetcher and return
- if len(hashPack.hashes) == 0 {
+ if packet.Items() == 0 {
glog.V(logger.Debug).Infof("%v: no available hashes", p)
select {
@@ -751,12 +680,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
return nil
}
gotHashes = true
+ hashes := packet.(*hashPack).hashes
// Otherwise insert all the new hashes, aborting in case of junk
- glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashPack.hashes), from)
+ glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashes), from)
- inserts := d.queue.Schedule61(hashPack.hashes, true)
- if len(inserts) != len(hashPack.hashes) {
+ inserts := d.queue.Schedule61(hashes, true)
+ if len(inserts) != len(hashes) {
glog.V(logger.Debug).Infof("%v: stale hashes", p)
return errBadPeer
}
@@ -776,7 +706,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
return nil
}
// Queue not yet full, fetch the next batch
- from += uint64(len(hashPack.hashes))
+ from += uint64(len(hashes))
getHashes(from)
case <-timeout.C:
@@ -813,16 +743,17 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
case <-d.bodyCh:
// Out of bounds eth/62 block bodies received, ignore them
- case blockPack := <-d.blockCh:
+ case packet := <-d.blockCh:
// If the peer was previously banned and failed to deliver its pack
// in a reasonable time frame, ignore its message.
- if peer := d.peers.Peer(blockPack.peerId); peer != nil {
+ if peer := d.peers.Peer(packet.PeerId()); peer != nil {
// Deliver the received chunk of blocks, and demote in case of errors
- err := d.queue.Deliver61(blockPack.peerId, blockPack.blocks)
+ blocks := packet.(*blockPack).blocks
+ err := d.queue.DeliverBlocks(peer.id, blocks)
switch err {
case nil:
// If no blocks were delivered, demote the peer (need the delivery above)
- if len(blockPack.blocks) == 0 {
+ if len(blocks) == 0 {
peer.Demote()
peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
@@ -831,7 +762,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// All was successful, promote the peer and potentially start processing
peer.Promote()
peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
+ glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
go d.process()
case errInvalidChain:
@@ -891,7 +822,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
return errNoPeers
}
// Check for block request timeouts and demote the responsible peers
- for _, pid := range d.queue.Expire61(blockHardTTL) {
+ for _, pid := range d.queue.ExpireBlocks(blockHardTTL) {
if peer := d.peers.Peer(pid); peer != nil {
peer.Demote()
glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
@@ -907,7 +838,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
}
// Send a download request to all idle peers, until throttled
throttled := false
- idles, total := d.peers.BlockIdlePeers(61)
+ idles, total := d.peers.BlockIdlePeers()
for _, peer := range idles {
// Short circuit if throttling activated
@@ -918,7 +849,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// Reserve a chunk of hashes for a peer. A nil can mean either that
// no more hashes are available, or that the peer is known not to
// have them.
- request := d.queue.Reserve61(peer, peer.BlockCapacity())
+ request := d.queue.ReserveBlocks(peer, peer.BlockCapacity())
if request == nil {
continue
}
@@ -928,7 +859,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
// Fetch the chunk and make sure any errors return the hashes to the queue
if err := peer.Fetch61(request); err != nil {
glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer)
- d.queue.Cancel61(request)
+ d.queue.CancelBlocks(request)
}
}
// Make sure that we have peers available for fetching. If all peers have been tried
@@ -954,14 +885,14 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelBlockFetch
- case headerPack := <-d.headerCh:
+ case packet := <-d.headerCh:
// Discard anything not from the origin peer
- if headerPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- headers := headerPack.headers
+ headers := packet.(*headerPack).headers
if len(headers) != 1 {
glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
return 0, errBadPeer
@@ -1014,14 +945,14 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelHashFetch
- case headerPack := <-d.headerCh:
+ case packet := <-d.headerCh:
// Discard anything not from the origin peer
- if headerPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
- headers := headerPack.headers
+ headers := packet.(*headerPack).headers
if len(headers) == 0 {
glog.V(logger.Debug).Infof("%v: empty head header set", p)
return 0, errEmptyHeaderSet
@@ -1069,14 +1000,14 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelHashFetch
- case headerPack := <-d.headerCh:
+ case packer := <-d.headerCh:
// Discard anything not from the origin peer
- if headerPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId)
+ if packer.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packer.PeerId())
break
}
// Make sure the peer actually gave something valid
- headers := headerPack.headers
+ headers := packer.(*headerPack).headers
if len(headers) != 1 {
glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers))
return 0, errBadPeer
@@ -1150,20 +1081,20 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
case <-d.blockCh:
// Out of bounds eth/61 blocks received, ignore them
- case headerPack := <-d.headerCh:
+ case packet := <-d.headerCh:
// Make sure the active peer is giving us the headers
- if headerPack.peerId != p.id {
- glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", headerPack.peerId)
+ if packet.PeerId() != p.id {
+ glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId())
break
}
headerReqTimer.UpdateSince(request)
timeout.Stop()
// If no more headers are inbound, notify the content fetchers and return
- if len(headerPack.headers) == 0 {
+ if packet.Items() == 0 {
glog.V(logger.Debug).Infof("%v: no available headers", p)
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -1187,26 +1118,27 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
return nil
}
gotHeaders = true
+ headers := packet.(*headerPack).headers
// Otherwise insert all the new headers, aborting in case of junk
- glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headerPack.headers), from)
+ glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
if d.mode == FastSync || d.mode == LightSync {
- if n, err := d.insertHeaders(headerPack.headers, false); err != nil {
- glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err)
+ if n, err := d.insertHeaders(headers, false); err != nil {
+ glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err)
return errInvalidChain
}
}
if d.mode == FullSync || d.mode == FastSync {
- inserts := d.queue.Schedule(headerPack.headers, from, d.mode == FastSync)
- if len(inserts) != len(headerPack.headers) {
+ inserts := d.queue.Schedule(headers, from)
+ if len(inserts) != len(headers) {
glog.V(logger.Debug).Infof("%v: stale headers", p)
return errBadPeer
}
}
// Notify the content fetchers of new headers, but stop if queue is full
cont := d.queue.PendingBlocks() < maxQueuedHeaders || d.queue.PendingReceipts() < maxQueuedHeaders
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
if cont {
// We still have headers to fetch, send continuation wake signal (potential)
select {
@@ -1223,7 +1155,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
}
}
// Queue not yet full, fetch the next batch
- from += uint64(len(headerPack.headers))
+ from += uint64(len(headers))
getHeaders(from)
case <-timeout.C:
@@ -1233,7 +1165,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
d.dropPeer(p.id)
// Finish the sync gracefully instead of dumping the gathered data though
- for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
+ for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@@ -1251,19 +1183,19 @@ func (d *Downloader) fetchBodies(from uint64) error {
glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
var (
- deliver = func(packet interface{}) error {
+ deliver = func(packet dataPack) error {
pack := packet.(*bodyPack)
- return d.queue.DeliverBlocks(pack.peerId, pack.transactions, pack.uncles)
+ return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
}
- expire = func() []string { return d.queue.ExpireBlocks(bodyHardTTL) }
+ expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peer) int { return p.BlockCapacity() }
- getIdles = func() ([]*peer, int) { return d.peers.BlockIdlePeers(62) }
+ getIdles = func() ([]*peer, int) { return d.peers.BodyIdlePeers() }
setIdle = func(p *peer) { p.SetBlocksIdle() }
)
- err := d.fetchParts(from, errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
- d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBlocks, d.bodyFetchHook,
- fetch, d.queue.CancelBlocks, capacity, getIdles, setIdle, "Body")
+ err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
+ d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBodies, d.bodyFetchHook,
+ fetch, d.queue.CancelBodies, capacity, getIdles, setIdle, "Body")
glog.V(logger.Debug).Infof("Block body download terminated: %v", err)
return err
@@ -1276,7 +1208,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
var (
- deliver = func(packet interface{}) error {
+ deliver = func(packet dataPack) error {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
}
@@ -1285,7 +1217,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
capacity = func(p *peer) int { return p.ReceiptCapacity() }
setIdle = func(p *peer) { p.SetReceiptsIdle() }
)
- err := d.fetchParts(from, errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
+ err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.ThrottleReceipts, d.queue.ReserveReceipts, d.receiptFetchHook,
fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")
@@ -1293,10 +1225,46 @@ func (d *Downloader) fetchReceipts(from uint64) error {
return err
}
+// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
+// available peers, reserving a chunk of nodes for each, waiting for delivery and
+// also periodically checking for timeouts.
+func (d *Downloader) fetchNodeData() error {
+ glog.V(logger.Debug).Infof("Downloading node state data")
+
+ var (
+ deliver = func(packet dataPack) error {
+ start := time.Now()
+ done, found, err := d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states)
+
+ d.syncStatsLock.Lock()
+ totalDone, totalKnown := d.syncStatsStateDone+uint64(done), d.syncStatsStateTotal+uint64(found)
+ d.syncStatsStateDone, d.syncStatsStateTotal = totalDone, totalKnown
+ d.syncStatsLock.Unlock()
+
+ glog.V(logger.Info).Infof("imported %d [%d / %d] state entries in %v.", done, totalDone, totalKnown, time.Since(start))
+ return err
+ }
+ expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) }
+ throttle = func() bool { return false }
+ reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
+ return d.queue.ReserveNodeData(p, count), false, nil
+ }
+ fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
+ capacity = func(p *peer) int { return p.NodeDataCapacity() }
+ setIdle = func(p *peer) { p.SetNodeDataIdle() }
+ )
+ err := d.fetchParts(errCancelReceiptFetch, d.stateCh, deliver, d.stateWakeCh, expire,
+ d.queue.PendingNodeData, throttle, reserve, nil, fetch, d.queue.CancelNodeData,
+ capacity, d.peers.ReceiptIdlePeers, setIdle, "State")
+
+ glog.V(logger.Debug).Infof("Node state data download terminated: %v", err)
+ return err
+}
+
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
-func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan dataPack, deliver func(packet interface{}) error, wakeCh chan bool,
+func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool,
expire func() []string, pending func() int, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header),
fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, idle func() ([]*peer, int), setIdle func(*peer), kind string) error {
@@ -1327,7 +1295,7 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da
switch err := deliver(packet); err {
case nil:
// If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!)
- if packet.Empty() {
+ if packet.Items() == 0 {
peer.Demote()
setIdle(peer)
glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
@@ -1441,7 +1409,11 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da
continue
}
if glog.V(logger.Detail) {
- glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
+ if len(request.Headers) > 0 {
+ glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
+ } else {
+ glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
+ }
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if fetchHook != nil {
@@ -1528,7 +1500,9 @@ func (d *Downloader) process() {
blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
case d.mode == FastSync:
blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
- receipts = append(receipts, result.Receipts)
+ if result.Header.Number.Uint64() <= d.queue.fastSyncPivot {
+ receipts = append(receipts, result.Receipts)
+ }
case d.mode == LightSync:
headers = append(headers, result.Header)
}
@@ -1539,12 +1513,16 @@ func (d *Downloader) process() {
index int
)
switch {
- case d.mode == FullSync:
- index, err = d.insertBlocks(blocks)
- case d.mode == FastSync:
- index, err = d.insertReceipts(blocks, receipts)
- case d.mode == LightSync:
+ case len(headers) > 0:
index, err = d.insertHeaders(headers, true)
+
+ case len(receipts) > 0:
+ index, err = d.insertReceipts(blocks, receipts)
+ if err == nil && blocks[len(blocks)-1].NumberU64() == d.queue.fastSyncPivot {
+ err = d.commitHeadBlock(blocks[len(blocks)-1].Hash())
+ }
+ default:
+ index, err = d.insertBlocks(blocks)
}
if err != nil {
glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
@@ -1557,125 +1535,47 @@ func (d *Downloader) process() {
}
}
-// DeliverHashes61 injects a new batch of hashes received from a remote node into
+// DeliverHashes injects a new batch of hashes received from a remote node into
// the download schedule. This is usually invoked through the BlockHashesMsg by
// the protocol handler.
-func (d *Downloader) DeliverHashes61(id string, hashes []common.Hash) (err error) {
- // Update the delivery metrics for both good and failed deliveries
- hashInMeter.Mark(int64(len(hashes)))
- defer func() {
- if err != nil {
- hashDropMeter.Mark(int64(len(hashes)))
- }
- }()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
- // Deliver or abort if the sync is canceled while queuing
- d.cancelLock.RLock()
- cancel := d.cancelCh
- d.cancelLock.RUnlock()
-
- select {
- case d.hashCh <- hashPack{id, hashes}:
- return nil
-
- case <-cancel:
- return errNoSyncActive
- }
+func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) (err error) {
+ return d.deliver(id, d.hashCh, &hashPack{id, hashes}, hashInMeter, hashDropMeter)
}
-// DeliverBlocks61 injects a new batch of blocks received from a remote node.
+// DeliverBlocks injects a new batch of blocks received from a remote node.
// This is usually invoked through the BlocksMsg by the protocol handler.
-func (d *Downloader) DeliverBlocks61(id string, blocks []*types.Block) (err error) {
- // Update the delivery metrics for both good and failed deliveries
- blockInMeter.Mark(int64(len(blocks)))
- defer func() {
- if err != nil {
- blockDropMeter.Mark(int64(len(blocks)))
- }
- }()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
- // Deliver or abort if the sync is canceled while queuing
- d.cancelLock.RLock()
- cancel := d.cancelCh
- d.cancelLock.RUnlock()
-
- select {
- case d.blockCh <- blockPack{id, blocks}:
- return nil
-
- case <-cancel:
- return errNoSyncActive
- }
+func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) (err error) {
+ return d.deliver(id, d.blockCh, &blockPack{id, blocks}, blockInMeter, blockDropMeter)
}
// DeliverHeaders injects a new batch of block headers received from a remote
// node into the download schedule.
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
- // Update the delivery metrics for both good and failed deliveries
- headerInMeter.Mark(int64(len(headers)))
- defer func() {
- if err != nil {
- headerDropMeter.Mark(int64(len(headers)))
- }
- }()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
- // Deliver or abort if the sync is canceled while queuing
- d.cancelLock.RLock()
- cancel := d.cancelCh
- d.cancelLock.RUnlock()
-
- select {
- case d.headerCh <- headerPack{id, headers}:
- return nil
-
- case <-cancel:
- return errNoSyncActive
- }
+ return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
}
// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
- // Update the delivery metrics for both good and failed deliveries
- bodyInMeter.Mark(int64(len(transactions)))
- defer func() {
- if err != nil {
- bodyDropMeter.Mark(int64(len(transactions)))
- }
- }()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
- // Deliver or abort if the sync is canceled while queuing
- d.cancelLock.RLock()
- cancel := d.cancelCh
- d.cancelLock.RUnlock()
-
- select {
- case d.bodyCh <- &bodyPack{id, transactions, uncles}:
- return nil
-
- case <-cancel:
- return errNoSyncActive
- }
+ return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
}
// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
+ return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
+}
+
+// DeliverNodeData injects a new batch of node state data received from a remote node.
+func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
+ return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
+}
+
+// deliver injects a new batch of data received from a remote node.
+func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
// Update the delivery metrics for both good and failed deliveries
- receiptInMeter.Mark(int64(len(receipts)))
+ inMeter.Mark(int64(packet.Items()))
defer func() {
if err != nil {
- receiptDropMeter.Mark(int64(len(receipts)))
+ dropMeter.Mark(int64(packet.Items()))
}
}()
// Make sure the downloader is active
@@ -1688,7 +1588,7 @@ func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (er
d.cancelLock.RUnlock()
select {
- case d.receiptCh <- &receiptPack{id, receipts}:
+ case destCh <- packet:
return nil
case <-cancel:
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 68c4ca26e..8944ae4b0 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -27,11 +27,13 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/trie"
)
var (
@@ -115,6 +117,7 @@ func makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts)
// downloadTester is a test simulator for mocking out local block chain.
type downloadTester struct {
+ stateDb ethdb.Database
downloader *Downloader
ownHashes []common.Hash // Hash chain belonging to the tester
@@ -146,8 +149,10 @@ func newTester(mode SyncMode) *downloadTester {
peerReceipts: make(map[string]map[common.Hash]types.Receipts),
peerChainTds: make(map[string]map[common.Hash]*big.Int),
}
- tester.downloader = New(mode, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, tester.getBlock,
- tester.headHeader, tester.headBlock, tester.headFastBlock, tester.getTd, tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.dropPeer)
+ tester.stateDb, _ = ethdb.NewMemDatabase()
+ tester.downloader = New(mode, tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader,
+ tester.getBlock, tester.headHeader, tester.headBlock, tester.headFastBlock, tester.commitHeadBlock, tester.getTd,
+ tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.dropPeer)
return tester
}
@@ -213,7 +218,7 @@ func (dl *downloadTester) headHeader() *types.Header {
return header
}
}
- return nil
+ return genesis.Header()
}
// headBlock retrieves the current head block from the canonical chain.
@@ -223,10 +228,12 @@ func (dl *downloadTester) headBlock() *types.Block {
for i := len(dl.ownHashes) - 1; i >= 0; i-- {
if block := dl.getBlock(dl.ownHashes[i]); block != nil {
- return block
+ if _, err := dl.stateDb.Get(block.Root().Bytes()); err == nil {
+ return block
+ }
}
}
- return nil
+ return genesis
}
// headFastBlock retrieves the current head fast-sync block from the canonical chain.
@@ -236,12 +243,20 @@ func (dl *downloadTester) headFastBlock() *types.Block {
for i := len(dl.ownHashes) - 1; i >= 0; i-- {
if block := dl.getBlock(dl.ownHashes[i]); block != nil {
- if _, ok := dl.ownReceipts[block.Hash()]; ok {
- return block
- }
+ return block
}
}
- return nil
+ return genesis
+}
+
+// commitHeadBlock manually sets the head block to a given hash.
+func (dl *downloadTester) commitHeadBlock(hash common.Hash) error {
+ // For now only check that the state trie is correct
+ if block := dl.getBlock(hash); block != nil {
+ _, err := trie.NewSecure(block.Root(), dl.stateDb)
+ return err
+ }
+ return fmt.Errorf("non existent block: %x", hash[:4])
}
// getTd retrieves the block's total difficulty from the canonical chain.
@@ -283,6 +298,7 @@ func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) {
dl.ownHashes = append(dl.ownHashes, block.Hash())
dl.ownHeaders[block.Hash()] = block.Header()
dl.ownBlocks[block.Hash()] = block
+ dl.stateDb.Put(block.Root().Bytes(), []byte{})
dl.ownChainTd[block.Hash()] = dl.ownChainTd[block.ParentHash()]
}
return len(blocks), nil
@@ -321,13 +337,13 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
var err error
switch version {
case 61:
- err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil, nil)
+ err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil, nil, nil)
case 62:
- err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil)
+ err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil)
case 63:
- err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay))
+ err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
case 64:
- err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay))
+ err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
}
if err == nil {
// Assign the owned hashes, headers and blocks to the peer (deep copy)
@@ -399,7 +415,7 @@ func (dl *downloadTester) peerGetRelHashesFn(id string, delay time.Duration) fun
// Delay delivery a bit to allow attacks to unfold
go func() {
time.Sleep(time.Millisecond)
- dl.downloader.DeliverHashes61(id, result)
+ dl.downloader.DeliverHashes(id, result)
}()
return nil
}
@@ -424,7 +440,7 @@ func (dl *downloadTester) peerGetAbsHashesFn(id string, delay time.Duration) fun
// Delay delivery a bit to allow attacks to unfold
go func() {
time.Sleep(time.Millisecond)
- dl.downloader.DeliverHashes61(id, result)
+ dl.downloader.DeliverHashes(id, result)
}()
return nil
}
@@ -447,7 +463,7 @@ func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([
result = append(result, block)
}
}
- go dl.downloader.DeliverBlocks61(id, result)
+ go dl.downloader.DeliverBlocks(id, result)
return nil
}
@@ -553,17 +569,54 @@ func (dl *downloadTester) peerGetReceiptsFn(id string, delay time.Duration) func
}
}
+// peerGetNodeDataFn constructs a getNodeData method associated with a particular
+// peer in the download tester. The returned function can be used to retrieve
+// batches of node state data from the particularly requested peer.
+func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func([]common.Hash) error {
+ return func(hashes []common.Hash) error {
+ time.Sleep(delay)
+
+ dl.lock.RLock()
+ defer dl.lock.RUnlock()
+
+ results := make([][]byte, 0, len(hashes))
+ for _, hash := range hashes {
+ if data, err := testdb.Get(hash.Bytes()); err == nil {
+ results = append(results, data)
+ }
+ }
+ go dl.downloader.DeliverNodeData(id, results)
+
+ return nil
+ }
+}
+
// assertOwnChain checks if the local chain contains the correct number of items
// of the various chain components.
func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
- headers, blocks, receipts := length, length, length
+ assertOwnForkedChain(t, tester, 1, []int{length})
+}
+
+// assertOwnForkedChain checks if the local forked chain contains the correct
+// number of items of the various chain components.
+func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, lengths []int) {
+ // Initialize the counters for the first fork
+ headers, blocks, receipts := lengths[0], lengths[0], lengths[0]-minFullBlocks
+ if receipts < 0 {
+ receipts = 1
+ }
+ // Update the counters for each subsequent fork
+ for _, length := range lengths[1:] {
+ headers += length - common
+ blocks += length - common
+ receipts += length - common - minFullBlocks
+ }
switch tester.downloader.mode {
case FullSync:
receipts = 1
case LightSync:
blocks, receipts = 1, 1
}
-
if hs := len(tester.ownHeaders); hs != headers {
t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers)
}
@@ -573,6 +626,14 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
if rs := len(tester.ownReceipts); rs != receipts {
t.Fatalf("synchronised receipts mismatch: have %v, want %v", rs, receipts)
}
+ // Verify the state trie too for fast syncs
+ if tester.downloader.mode == FastSync {
+ if index := lengths[len(lengths)-1] - minFullBlocks - 1; index > 0 {
+ if statedb := state.New(tester.ownHeaders[tester.ownHashes[index]].Root, tester.stateDb); statedb == nil {
+ t.Fatalf("state reconstruction failed")
+ }
+ }
+ }
}
// Tests that simple synchronization against a canonical chain works correctly.
@@ -647,7 +708,9 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
cached = len(tester.downloader.queue.blockDonePool)
if mode == FastSync {
if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
- cached = receipts
+ if tester.downloader.queue.resultCache[receipts].Header.Number.Uint64() < tester.downloader.queue.fastSyncPivot {
+ cached = receipts
+ }
}
}
tester.downloader.queue.lock.RUnlock()
@@ -704,7 +767,7 @@ func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
if err := tester.sync("fork B", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- assertOwnChain(t, tester, common+2*fork+1)
+ assertOwnForkedChain(t, tester, common+1, []int{common + fork + 1, common + fork + 1})
}
// Tests that an inactive downloader will not accept incoming hashes and blocks.
@@ -712,10 +775,10 @@ func TestInactiveDownloader61(t *testing.T) {
tester := newTester(FullSync)
// Check that neither hashes nor blocks are accepted
- if err := tester.downloader.DeliverHashes61("bad peer", []common.Hash{}); err != errNoSyncActive {
+ if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive {
t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
}
- if err := tester.downloader.DeliverBlocks61("bad peer", []*types.Block{}); err != errNoSyncActive {
+ if err := tester.downloader.DeliverBlocks("bad peer", []*types.Block{}); err != errNoSyncActive {
t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
}
}
@@ -809,14 +872,6 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
id := fmt.Sprintf("peer #%d", i)
tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks, receipts)
}
- // Synchronise with the middle peer and make sure half of the blocks were retrieved
- id := fmt.Sprintf("peer #%d", targetPeers/2)
- if err := tester.sync(id, nil); err != nil {
- t.Fatalf("failed to synchronise blocks: %v", err)
- }
- assertOwnChain(t, tester, len(tester.peerHashes[id]))
-
- // Synchronise with the best peer and make sure everything is retrieved
if err := tester.sync("peer #0", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
@@ -870,8 +925,8 @@ func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, F
func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
- // Create a small enough block chain to download
- targetBlocks := blockCacheLimit - 15
+ // Create a block chain to download
+ targetBlocks := 2*blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
tester := newTester(mode)
@@ -898,8 +953,8 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
bodiesNeeded++
}
}
- for _, receipt := range receipts {
- if mode == FastSync && len(receipt) > 0 {
+ for hash, receipt := range receipts {
+ if mode == FastSync && len(receipt) > 0 && headers[hash].Number.Uint64() <= uint64(targetBlocks-minFullBlocks) {
receiptsNeeded++
}
}
diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go
index 92acb6ba8..d6fcfa25c 100644
--- a/eth/downloader/metrics.go
+++ b/eth/downloader/metrics.go
@@ -47,4 +47,9 @@ var (
receiptReqTimer = metrics.NewTimer("eth/downloader/receipts/req")
receiptDropMeter = metrics.NewMeter("eth/downloader/receipts/drop")
receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout")
+
+ stateInMeter = metrics.NewMeter("eth/downloader/states/in")
+ stateReqTimer = metrics.NewTimer("eth/downloader/states/req")
+ stateDropMeter = metrics.NewMeter("eth/downloader/states/drop")
+ stateTimeoutMeter = metrics.NewMeter("eth/downloader/states/timeout")
)
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 5fc0db587..5011d5d46 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -41,6 +41,7 @@ type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
type blockBodyFetcherFn func([]common.Hash) error
type receiptFetcherFn func([]common.Hash) error
+type stateFetcherFn func([]common.Hash) error
var (
errAlreadyFetching = errors.New("already fetching blocks from peer")
@@ -55,12 +56,16 @@ type peer struct {
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
+ stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
rep int32 // Simple peer reputation
- blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request
- receiptCapacity int32 // Number of receipts allowed to fetch per request
- blockStarted time.Time // Time instance when the last block (body)fetch was started
- receiptStarted time.Time // Time instance when the last receipt fetch was started
+ blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request
+ receiptCapacity int32 // Number of receipts allowed to fetch per request
+ stateCapacity int32 // Number of node data pieces allowed to fetch per request
+
+ blockStarted time.Time // Time instance when the last block (body)fetch was started
+ receiptStarted time.Time // Time instance when the last receipt fetch was started
+ stateStarted time.Time // Time instance when the last node data fetch was started
ignored *set.Set // Set of hashes not to request (didn't have previously)
@@ -73,6 +78,7 @@ type peer struct {
getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts
+ getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data
version int // Eth protocol version number to switch strategies
}
@@ -82,12 +88,13 @@ type peer struct {
func newPeer(id string, version int, head common.Hash,
getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
- getReceipts receiptFetcherFn) *peer {
+ getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer {
return &peer{
id: id,
head: head,
blockCapacity: 1,
receiptCapacity: 1,
+ stateCapacity: 1,
ignored: set.New(),
getRelHashes: getRelHashes,
@@ -99,6 +106,7 @@ func newPeer(id string, version int, head common.Hash,
getBlockBodies: getBlockBodies,
getReceipts: getReceipts,
+ getNodeData: getNodeData,
version: version,
}
@@ -110,6 +118,7 @@ func (p *peer) Reset() {
atomic.StoreInt32(&p.receiptIdle, 0)
atomic.StoreInt32(&p.blockCapacity, 1)
atomic.StoreInt32(&p.receiptCapacity, 1)
+ atomic.StoreInt32(&p.stateCapacity, 1)
p.ignored.Clear()
}
@@ -167,6 +176,24 @@ func (p *peer) FetchReceipts(request *fetchRequest) error {
return nil
}
+// FetchNodeData issues a node state data retrieval request to the remote peer.
+// It returns errAlreadyFetching if the peer is still busy with a previous node
+// data request; otherwise the request is fired off in the background.
+func (p *peer) FetchNodeData(request *fetchRequest) error {
+	// Claim the peer for this fetch, bailing out if somebody else already did
+	if claimed := atomic.CompareAndSwapInt32(&p.stateIdle, 0, 1); !claimed {
+		return errAlreadyFetching
+	}
+	p.stateStarted = time.Now()
+
+	// Flatten the requested hash set into a slice the fetcher can consume
+	batch := make([]common.Hash, len(request.Hashes))
+	next := 0
+	for hash := range request.Hashes {
+		batch[next] = hash
+		next++
+	}
+	go p.getNodeData(batch)
+
+	return nil
+}
+
// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its block retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
@@ -188,6 +215,13 @@ func (p *peer) SetReceiptsIdle() {
p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle)
}
+// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval
+// requests. Its node data retrieval allowance will also be updated either up- or
+// downwards, depending on whether the previous fetch completed in time or not.
+func (p *peer) SetNodeDataIdle() {
+	// Pass distinct soft and hard TTLs, the same way SetReceiptsIdle passes
+	// receiptSoftTTL and receiptHardTTL; handing the soft TTL in twice made
+	// the two thresholds coincide.
+	// NOTE(review): assumes stateHardTTL is declared next to stateSoftTTL - confirm.
+	p.setIdle(p.stateStarted, stateSoftTTL, stateHardTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle)
+}
+
// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its data retrieval allowance will also be updated either up- or downwards,
// depending on whether the previous fetch completed in time or not.
@@ -230,6 +264,12 @@ func (p *peer) ReceiptCapacity() int {
return int(atomic.LoadInt32(&p.receiptCapacity))
}
+// NodeDataCapacity retrieves the peer's node data download allowance based on
+// its previously discovered bandwidth capacity.
+func (p *peer) NodeDataCapacity() int {
+	capacity := atomic.LoadInt32(&p.stateCapacity)
+	return int(capacity)
+}
+
// Promote increases the peer's reputation.
func (p *peer) Promote() {
atomic.AddInt32(&p.rep, 1)
@@ -340,39 +380,50 @@ func (ps *peerSet) AllPeers() []*peer {
// BlockIdlePeers retrieves a flat list of all the currently idle peers within the
// active peer set, ordered by their reputation.
-func (ps *peerSet) BlockIdlePeers(version int) ([]*peer, int) {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
-
- idle, total := make([]*peer, 0, len(ps.peers)), 0
- for _, p := range ps.peers {
- if (version == 61 && p.version == 61) || (version >= 62 && p.version >= 62) {
- if atomic.LoadInt32(&p.blockIdle) == 0 {
- idle = append(idle, p)
- }
- total++
- }
+func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
+ idle := func(p *peer) bool {
+ return atomic.LoadInt32(&p.blockIdle) == 0
}
- for i := 0; i < len(idle); i++ {
- for j := i + 1; j < len(idle); j++ {
- if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
- idle[i], idle[j] = idle[j], idle[i]
- }
- }
+ return ps.idlePeers(61, 61, idle)
+}
+
+// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
+// the active peer set, ordered by their reputation.
+func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
+ idle := func(p *peer) bool {
+ return atomic.LoadInt32(&p.blockIdle) == 0
}
- return idle, total
+ return ps.idlePeers(62, 64, idle)
}
-// ReceiptIdlePeers retrieves a flat list of all the currently idle peers within the
-// active peer set, ordered by their reputation.
+// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
+// within the active peer set, ordered by their reputation.
func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
+ idle := func(p *peer) bool {
+ return atomic.LoadInt32(&p.receiptIdle) == 0
+ }
+ return ps.idlePeers(63, 64, idle)
+}
+
+// NodeDataIdlePeers returns the peers within the active peer set that are not
+// currently serving a node data request, ordered by their reputation, together
+// with the number of peers speaking a suitable protocol version (eth/63-64).
+func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) {
+	return ps.idlePeers(63, 64, func(p *peer) bool {
+		// A zero state flag marks the peer as idle
+		return atomic.LoadInt32(&p.stateIdle) == 0
+	})
+}
+
+// idlePeers retrieves a flat list of all currently idle peers satisfying the
+// protocol version constraints, using the provided function to check idleness.
+func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) {
ps.lock.RLock()
defer ps.lock.RUnlock()
idle, total := make([]*peer, 0, len(ps.peers)), 0
for _, p := range ps.peers {
- if p.version >= 63 {
- if atomic.LoadInt32(&p.receiptIdle) == 0 {
+ if p.version >= minProtocol && p.version <= maxProtocol {
+ if idleCheck(p) {
idle = append(idle, p)
}
total++
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index c53ad939e..942ed0d63 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -26,9 +26,13 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/trie"
"github.com/rcrowley/go-metrics"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
@@ -39,13 +43,14 @@ var (
var (
errNoFetchesPending = errors.New("no fetches pending")
+ errStateSyncPending = errors.New("state trie sync already scheduled")
errStaleDelivery = errors.New("stale delivery")
)
// fetchRequest is a currently running data retrieval operation.
type fetchRequest struct {
Peer *peer // Peer to which the request was sent
- Hashes map[common.Hash]int // [eth/61] Requested block with their insertion index (priority)
+ Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
Headers []*types.Header // [eth/62] Requested headers, sorted by request order
Time time.Time // Time when the request was made
}
@@ -64,6 +69,9 @@ type fetchResult struct {
// queue represents hashes that either need fetching or are being fetched
type queue struct {
+ mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
+ fastSyncPivot uint64 // Block number where the fast sync pivots into archive synchronisation mode
+
hashPool map[common.Hash]int // [eth/61] Pending hashes, mapping to their insertion index (priority)
hashQueue *prque.Prque // [eth/61] Priority queue of the block hashes to fetch
hashCounter int // [eth/61] Counter indexing the added hashes to ensure retrieval order
@@ -80,15 +88,22 @@ type queue struct {
receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations
receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches
+ stateTaskIndex int // [eth/63] Counter indexing the added hashes to ensure prioritized retrieval order
+ stateTaskPool map[common.Hash]int // [eth/63] Pending node data retrieval tasks, mapping to their priority
+ stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for
+ statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations
+
+ stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly
+ stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator
+
resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block-chain
- resultParts int // Number of fetch components required to complete an item
lock sync.RWMutex
}
// newQueue creates a new download queue for scheduling block retrieval.
-func newQueue() *queue {
+func newQueue(stateDb ethdb.Database) *queue {
return &queue{
hashPool: make(map[common.Hash]int),
hashQueue: prque.New(),
@@ -100,6 +115,10 @@ func newQueue() *queue {
receiptTaskQueue: prque.New(),
receiptPendPool: make(map[string]*fetchRequest),
receiptDonePool: make(map[common.Hash]struct{}),
+ stateTaskPool: make(map[common.Hash]int),
+ stateTaskQueue: prque.New(),
+ statePendPool: make(map[string]*fetchRequest),
+ stateDatabase: stateDb,
resultCache: make([]*fetchResult, blockCacheLimit),
}
}
@@ -109,6 +128,9 @@ func (q *queue) Reset() {
q.lock.Lock()
defer q.lock.Unlock()
+ q.mode = FullSync
+ q.fastSyncPivot = 0
+
q.hashPool = make(map[common.Hash]int)
q.hashQueue.Reset()
q.hashCounter = 0
@@ -125,9 +147,14 @@ func (q *queue) Reset() {
q.receiptPendPool = make(map[string]*fetchRequest)
q.receiptDonePool = make(map[common.Hash]struct{})
+ q.stateTaskIndex = 0
+ q.stateTaskPool = make(map[common.Hash]int)
+ q.stateTaskQueue.Reset()
+ q.statePendPool = make(map[string]*fetchRequest)
+ q.stateScheduler = nil
+
q.resultCache = make([]*fetchResult, blockCacheLimit)
q.resultOffset = 0
- q.resultParts = 0
}
// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
@@ -146,12 +173,20 @@ func (q *queue) PendingReceipts() int {
return q.receiptTaskQueue.Size()
}
+// PendingNodeData reports how many node data entries are still queued up
+// waiting to be retrieved.
+func (q *queue) PendingNodeData() int {
+	q.lock.RLock()
+	defer q.lock.RUnlock()
+
+	pending := q.stateTaskQueue.Size()
+	return pending
+}
+
// InFlight retrieves the number of fetch requests currently in flight.
func (q *queue) InFlight() int {
q.lock.RLock()
defer q.lock.RUnlock()
- return len(q.blockPendPool) + len(q.receiptPendPool)
+ return len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
}
// Idle returns if the queue is fully idle or has some data still inside. This
@@ -160,8 +195,8 @@ func (q *queue) Idle() bool {
q.lock.RLock()
defer q.lock.RUnlock()
- queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
- pending := len(q.blockPendPool) + len(q.receiptPendPool)
+ queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
+ pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
cached := len(q.blockDonePool) + len(q.receiptDonePool)
return (queued + pending + cached) == 0
@@ -227,7 +262,7 @@ func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash {
// Schedule adds a set of headers for the download queue for scheduling, returning
// the new headers encountered.
-func (q *queue) Schedule(headers []*types.Header, from uint64, receipts bool) []*types.Header {
+func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
q.lock.Lock()
defer q.lock.Unlock()
@@ -256,10 +291,21 @@ func (q *queue) Schedule(headers []*types.Header, from uint64, receipts bool) []
// Queue the header for content retrieval
q.blockTaskPool[hash] = header
q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
- if receipts {
+
+ if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot {
+ // Fast phase of the fast sync, retrieve receipts too
q.receiptTaskPool[hash] = header
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
}
+ if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot {
+ // Pivoting point of the fast sync, retrieve the state tries
+ q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase)
+ for _, hash := range q.stateScheduler.Missing(0) {
+ q.stateTaskPool[hash] = q.stateTaskIndex
+ q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex))
+ q.stateTaskIndex++
+ }
+ }
inserts = append(inserts, header)
q.headerHead = hash
from++
@@ -279,6 +325,9 @@ func (q *queue) GetHeadResult() *fetchResult {
if q.resultCache[0].Pending > 0 {
return nil
}
+ if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot && len(q.stateTaskPool) > 0 {
+ return nil
+ }
return q.resultCache[0]
}
@@ -291,9 +340,18 @@ func (q *queue) TakeResults() []*fetchResult {
// Accumulate all available results
results := []*fetchResult{}
for _, result := range q.resultCache {
+ // Stop if no more results are ready
if result == nil || result.Pending > 0 {
break
}
+ // The fast sync pivot block may only be processed after state fetch completes
+ if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot && len(q.stateTaskPool) > 0 {
+ break
+ }
+ // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion
+ if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 {
+ break
+ }
results = append(results, result)
hash := result.Header.Hash()
@@ -312,31 +370,45 @@ func (q *queue) TakeResults() []*fetchResult {
return results
}
-// Reserve61 reserves a set of hashes for the given peer, skipping any previously
-// failed download.
-func (q *queue) Reserve61(p *peer, count int) *fetchRequest {
+// ReserveBlocks reserves a set of block hashes for the given peer, skipping any
+// previously failed download. It returns nil if the result cache has no free
+// slot left or if nothing could be reserved for the peer.
+func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest {
+	// Measure the free result cache space with the lock held, as the queue's
+	// other methods only touch these fields under q.lock. The lock has to be
+	// released again before delegating, since reserveHashes acquires it itself.
+	q.lock.RLock()
+	space := len(q.resultCache) - len(q.blockDonePool)
+	q.lock.RUnlock()
+
+	// reserveHashes interprets a zero limit as "no throttling", so a full cache
+	// must be rejected here instead of being passed through as 0.
+	if space <= 0 {
+		return nil
+	}
+	return q.reserveHashes(p, count, q.hashQueue, q.blockPendPool, space)
+}
+
+// ReserveNodeData reserves a set of node data hashes for the given peer, skipping
+// any previously failed download.
+func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest {
+ return q.reserveHashes(p, count, q.stateTaskQueue, q.statePendPool, 0)
+}
+
+// reserveHashes reserves a set of hashes for the given peer, skipping previously
+// failed ones.
+func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, maxPending int) *fetchRequest {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the pool has been depleted, or if the peer's already
// downloading something (sanity check not to corrupt state)
- if q.hashQueue.Empty() {
+ if taskQueue.Empty() {
return nil
}
- if _, ok := q.blockPendPool[p.id]; ok {
+ if _, ok := pendPool[p.id]; ok {
return nil
}
// Calculate an upper limit on the hashes we might fetch (i.e. throttling)
- space := len(q.resultCache) - len(q.blockDonePool)
- for _, request := range q.blockPendPool {
- space -= len(request.Hashes)
+ allowance := maxPending
+ if allowance > 0 {
+ for _, request := range pendPool {
+ allowance -= len(request.Hashes)
+ }
}
// Retrieve a batch of hashes, skipping previously failed ones
send := make(map[common.Hash]int)
skip := make(map[common.Hash]int)
- for proc := 0; proc < space && len(send) < count && !q.hashQueue.Empty(); proc++ {
- hash, priority := q.hashQueue.Pop()
+ for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ {
+ hash, priority := taskQueue.Pop()
if p.ignored.Has(hash) {
skip[hash.(common.Hash)] = int(priority)
} else {
@@ -345,7 +417,7 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest {
}
// Merge all the skipped hashes back
for hash, index := range skip {
- q.hashQueue.Push(hash, float32(index))
+ taskQueue.Push(hash, float32(index))
}
// Assemble and return the block download request
if len(send) == 0 {
@@ -356,19 +428,19 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest {
Hashes: send,
Time: time.Now(),
}
- q.blockPendPool[p.id] = request
+ pendPool[p.id] = request
return request
}
-// ReserveBlocks reserves a set of body fetches for the given peer, skipping any
+// ReserveBodies reserves a set of body fetches for the given peer, skipping any
// previously failed downloads. Beside the next batch of needed fetches, it also
// returns a flag whether empty blocks were queued requiring processing.
-func (q *queue) ReserveBlocks(p *peer, count int) (*fetchRequest, bool, error) {
+func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) {
noop := func(header *types.Header) bool {
return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
}
- return q.reserveFetch(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, noop)
+ return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, noop)
}
// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
@@ -378,13 +450,13 @@ func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error)
noop := func(header *types.Header) bool {
return header.ReceiptHash == types.EmptyRootHash
}
- return q.reserveFetch(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, noop)
+ return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, noop)
}
-// reserveFetch reserves a set of data download operations for a given peer,
+// reserveHeaders reserves a set of data download operations for a given peer,
// skipping any previously failed ones. This method is a generic version used
// by the individual special reservation functions.
-func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, noop func(*types.Header) bool) (*fetchRequest, bool, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -416,8 +488,12 @@ func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types
return nil, false, errInvalidChain
}
if q.resultCache[index] == nil {
+ components := 1
+ if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot {
+ components = 2
+ }
q.resultCache[index] = &fetchResult{
- Pending: q.resultParts,
+ Pending: components,
Header: header,
}
}
@@ -456,30 +532,36 @@ func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types
return request, progress, nil
}
-// Cancel61 aborts a fetch request, returning all pending hashes to the queue.
-func (q *queue) Cancel61(request *fetchRequest) {
- q.cancel(request, nil, q.blockPendPool)
+// CancelBlocks aborts a fetch request, returning all pending hashes to the queue.
+func (q *queue) CancelBlocks(request *fetchRequest) {
+ q.cancel(request, q.hashQueue, q.blockPendPool)
}
-// CancelBlocks aborts a body fetch request, returning all pending hashes to the
+// CancelBodies aborts a body fetch request, returning all pending headers to the
// task queue.
-func (q *queue) CancelBlocks(request *fetchRequest) {
+func (q *queue) CancelBodies(request *fetchRequest) {
q.cancel(request, q.blockTaskQueue, q.blockPendPool)
}
-// CancelReceipts aborts a body fetch request, returning all pending hashes to
+// CancelReceipts aborts a receipt fetch request, returning all pending headers to
// the task queue.
func (q *queue) CancelReceipts(request *fetchRequest) {
q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
}
+// CancelNodeData abandons a node state data fetch request, handing all of its
+// still pending hashes back to the state task queue.
+func (q *queue) CancelNodeData(request *fetchRequest) {
+	tasks, pending := q.stateTaskQueue, q.statePendPool
+	q.cancel(request, tasks, pending)
+}
+
// cancel aborts a fetch request, returning all pending hashes and headers to the task queue.
func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
q.lock.Lock()
defer q.lock.Unlock()
for hash, index := range request.Hashes {
- q.hashQueue.Push(hash, float32(index))
+ taskQueue.Push(hash, float32(index))
}
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
@@ -509,29 +591,41 @@ func (q *queue) Revoke(peerId string) {
}
delete(q.receiptPendPool, peerId)
}
+ if request, ok := q.statePendPool[peerId]; ok {
+ for hash, index := range request.Hashes {
+ q.stateTaskQueue.Push(hash, float32(index))
+ }
+ delete(q.statePendPool, peerId)
+ }
}
-// Expire61 checks for in flight requests that exceeded a timeout allowance,
+// ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalization.
-func (q *queue) Expire61(timeout time.Duration) []string {
- return q.expire(timeout, q.blockPendPool, nil)
+func (q *queue) ExpireBlocks(timeout time.Duration) []string {
+ return q.expire(timeout, q.blockPendPool, q.hashQueue, blockTimeoutMeter)
}
-// ExpireBlocks checks for in flight block body requests that exceeded a timeout
+// ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalization.
-func (q *queue) ExpireBlocks(timeout time.Duration) []string {
- return q.expire(timeout, q.blockPendPool, q.blockTaskQueue)
+func (q *queue) ExpireBodies(timeout time.Duration) []string {
+ return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter)
}
// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalization.
func (q *queue) ExpireReceipts(timeout time.Duration) []string {
- return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue)
+ return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
+}
+
+// ExpireNodeData looks for in flight node data requests that have exceeded the
+// given timeout allowance, cancels them and returns the responsible peers for
+// penalization.
+func (q *queue) ExpireNodeData(timeout time.Duration) []string {
+	expired := q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter)
+	return expired
+}
// expire is the generic check that moves expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks.
-func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque) []string {
+func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string {
q.lock.Lock()
defer q.lock.Unlock()
@@ -540,14 +634,11 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
for id, request := range pendPool {
if time.Since(request.Time) > timeout {
// Update the metrics with the timeout
- if len(request.Hashes) > 0 {
- blockTimeoutMeter.Mark(1)
- } else {
- bodyTimeoutMeter.Mark(1)
- }
+ timeoutMeter.Mark(1)
+
// Return any non satisfied requests to the pool
for hash, index := range request.Hashes {
- q.hashQueue.Push(hash, float32(index))
+ taskQueue.Push(hash, float32(index))
}
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
@@ -562,8 +653,8 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
return peers
}
-// Deliver61 injects a block retrieval response into the download queue.
-func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) {
+// DeliverBlocks injects a block retrieval response into the download queue.
+func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
q.lock.Lock()
defer q.lock.Unlock()
@@ -626,8 +717,8 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) {
}
}
-// DeliverBlocks injects a block (body) retrieval response into the results queue.
-func (q *queue) DeliverBlocks(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
+// DeliverBodies injects a block body retrieval response into the results queue.
+func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
reconstruct := func(header *types.Header, index int, result *fetchResult) error {
if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
return errInvalidBody
@@ -717,14 +808,84 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
}
}
+// DeliverNodeData injects a node state data retrieval response into the queue.
+func (q *queue) DeliverNodeData(id string, data [][]byte) (int, int, error) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // Short circuit if the data was never requested
+ request := q.statePendPool[id]
+ if request == nil {
+ return 0, 0, errNoFetchesPending
+ }
+ stateReqTimer.UpdateSince(request.Time)
+ delete(q.statePendPool, id)
+
+ // If no data was retrieved, mark the requested hashes as unavailable for the origin peer
+ if len(data) == 0 {
+ for hash, _ := range request.Hashes {
+ request.Peer.ignored.Add(hash)
+ }
+ }
+ // Iterate over the downloaded data and verify each of them
+ errs := make([]error, 0)
+ processed := 0
+ for _, blob := range data {
+ // Skip any state data that was not requested
+ hash := common.BytesToHash(crypto.Sha3(blob))
+ if _, ok := request.Hashes[hash]; !ok {
+ errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
+ continue
+ }
+ // Inject the next state trie item into the database
+ if err := q.stateScheduler.Process([]trie.SyncResult{{hash, blob}}); err != nil {
+ errs = []error{err}
+ break
+ }
+ processed++
+
+ delete(request.Hashes, hash)
+ delete(q.stateTaskPool, hash)
+ }
+ // Return all failed or missing fetches to the queue
+ for hash, index := range request.Hashes {
+ q.stateTaskQueue.Push(hash, float32(index))
+ }
+ // Also enqueue any newly required state trie nodes
+ discovered := 0
+ if len(q.stateTaskPool) < maxQueuedStates {
+ for _, hash := range q.stateScheduler.Missing(4 * MaxStateFetch) {
+ q.stateTaskPool[hash] = q.stateTaskIndex
+ q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex))
+ q.stateTaskIndex++
+ discovered++
+ }
+ }
+ // If none of the data items were good, it's a stale delivery
+ switch {
+ case len(errs) == 0:
+ return processed, discovered, nil
+
+ case len(errs) == len(request.Hashes):
+ return processed, discovered, errStaleDelivery
+
+ default:
+ return processed, discovered, fmt.Errorf("multiple failures: %v", errs)
+ }
+}
+
// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
-func (q *queue) Prepare(offset uint64, parts int) {
+func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64) {
q.lock.Lock()
defer q.lock.Unlock()
if q.resultOffset < offset {
q.resultOffset = offset
}
- q.resultParts = parts
+ q.fastSyncPivot = 0
+ if mode == FastSync {
+ q.fastSyncPivot = pivot
+ }
+ q.mode = mode
}
diff --git a/eth/downloader/types.go b/eth/downloader/types.go
new file mode 100644
index 000000000..221ef38f6
--- /dev/null
+++ b/eth/downloader/types.go
@@ -0,0 +1,137 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+import (
+ "fmt"
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+)
+
+// headerCheckFn is a callback type for verifying a header's presence in the local chain.
+type headerCheckFn func(common.Hash) bool
+
+// blockCheckFn is a callback type for verifying a block's presence in the local chain.
+type blockCheckFn func(common.Hash) bool
+
+// headerRetrievalFn is a callback type for retrieving a header from the local chain.
+type headerRetrievalFn func(common.Hash) *types.Header
+
+// blockRetrievalFn is a callback type for retrieving a block from the local chain.
+type blockRetrievalFn func(common.Hash) *types.Block
+
+// headHeaderRetrievalFn is a callback type for retrieving the head header from the local chain.
+type headHeaderRetrievalFn func() *types.Header
+
+// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain.
+type headBlockRetrievalFn func() *types.Block
+
+// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain.
+type headFastBlockRetrievalFn func() *types.Block
+
+// headBlockCommitterFn is a callback for directly committing the head block to a certain entity.
+type headBlockCommitterFn func(common.Hash) error
+
+// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block.
+type tdRetrievalFn func(common.Hash) *big.Int
+
+// headerChainInsertFn is a callback type to insert a batch of headers into the local chain.
+type headerChainInsertFn func([]*types.Header, bool) (int, error)
+
+// blockChainInsertFn is a callback type to insert a batch of blocks into the local chain.
+type blockChainInsertFn func(types.Blocks) (int, error)
+
+// receiptChainInsertFn is a callback type to insert a batch of receipts into the local chain.
+type receiptChainInsertFn func(types.Blocks, []types.Receipts) (int, error)
+
+// peerDropFn is a callback type for dropping a peer detected as malicious.
+type peerDropFn func(id string)
+
+// dataPack is a data message returned by a peer for some query.
+type dataPack interface {
+ PeerId() string
+ Items() int
+ Stats() string
+}
+
+// hashPack is a batch of block hashes returned by a peer (eth/61).
+type hashPack struct {
+ peerId string
+ hashes []common.Hash
+}
+
+func (p *hashPack) PeerId() string { return p.peerId }
+func (p *hashPack) Items() int { return len(p.hashes) }
+func (p *hashPack) Stats() string { return fmt.Sprintf("%d", len(p.hashes)) }
+
+// blockPack is a batch of blocks returned by a peer (eth/61).
+type blockPack struct {
+ peerId string
+ blocks []*types.Block
+}
+
+func (p *blockPack) PeerId() string { return p.peerId }
+func (p *blockPack) Items() int { return len(p.blocks) }
+func (p *blockPack) Stats() string { return fmt.Sprintf("%d", len(p.blocks)) }
+
+// headerPack is a batch of block headers returned by a peer.
+type headerPack struct {
+ peerId string
+ headers []*types.Header
+}
+
+func (p *headerPack) PeerId() string { return p.peerId }
+func (p *headerPack) Items() int { return len(p.headers) }
+func (p *headerPack) Stats() string { return fmt.Sprintf("%d", len(p.headers)) }
+
+// bodyPack is a batch of block bodies returned by a peer.
+type bodyPack struct {
+ peerId string
+ transactions [][]*types.Transaction
+ uncles [][]*types.Header
+}
+
+func (p *bodyPack) PeerId() string { return p.peerId }
+func (p *bodyPack) Items() int {
+ if len(p.transactions) <= len(p.uncles) {
+ return len(p.transactions)
+ }
+ return len(p.uncles)
+}
+func (p *bodyPack) Stats() string { return fmt.Sprintf("%d:%d", len(p.transactions), len(p.uncles)) }
+
+// receiptPack is a batch of receipts returned by a peer.
+type receiptPack struct {
+ peerId string
+ receipts [][]*types.Receipt
+}
+
+func (p *receiptPack) PeerId() string { return p.peerId }
+func (p *receiptPack) Items() int { return len(p.receipts) }
+func (p *receiptPack) Stats() string { return fmt.Sprintf("%d", len(p.receipts)) }
+
+// statePack is a batch of states returned by a peer.
+type statePack struct {
+ peerId string
+ states [][]byte
+}
+
+func (p *statePack) PeerId() string { return p.peerId }
+func (p *statePack) Items() int { return len(p.states) }
+func (p *statePack) Stats() string { return fmt.Sprintf("%d", len(p.states)) }
diff --git a/eth/handler.go b/eth/handler.go
index 1117cb1b7..b0916d50b 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -129,9 +129,9 @@ func NewProtocolManager(mode Mode, networkId int, mux *event.TypeMux, txpool txP
case LightMode:
syncMode = downloader.LightSync
}
- manager.downloader = downloader.New(syncMode, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader,
- blockchain.GetBlock, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.GetTd,
- blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, manager.removePeer)
+ manager.downloader = downloader.New(syncMode, chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader,
+ blockchain.GetBlock, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead,
+ blockchain.GetTd, blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, manager.removePeer)
validator := func(block *types.Block, parent *types.Block) error {
return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
@@ -220,8 +220,8 @@ func (pm *ProtocolManager) handle(p *peer) error {
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
- p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks,
- p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts); err != nil {
+ p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, p.RequestHeadersByHash,
+ p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
@@ -307,7 +307,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
break
}
// Deliver them all to the downloader for queuing
- err := pm.downloader.DeliverHashes61(p.id, hashes)
+ err := pm.downloader.DeliverHashes(p.id, hashes)
if err != nil {
glog.V(logger.Debug).Infoln(err)
}
@@ -353,7 +353,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
// Filter out any explicitly requested blocks, deliver the rest to the downloader
if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 {
- pm.downloader.DeliverBlocks61(p.id, blocks)
+ pm.downloader.DeliverBlocks(p.id, blocks)
}
// Block header query, collect the requested headers and reply
@@ -515,6 +515,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendNodeData(data)
+ case p.version >= eth63 && msg.Code == NodeDataMsg:
+ // A batch of node state data arrived to one of our previous requests
+ var data [][]byte
+ if err := msg.Decode(&data); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver all to the downloader
+ if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
+ glog.V(logger.Debug).Infof("failed to deliver node state data: %v", err)
+ }
+
case p.version >= eth63 && msg.Code == GetReceiptsMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
diff --git a/eth/peer.go b/eth/peer.go
index e24be97f1..68ce903a6 100644
--- a/eth/peer.go
+++ b/eth/peer.go
@@ -191,7 +191,7 @@ func (p *peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error {
return p2p.Send(p.rw, BlockBodiesMsg, bodies)
}
-// SendNodeData sends a batch of arbitrary internal data, corresponding to the
+// SendNodeData sends a batch of arbitrary internal data, corresponding to the
// hashes requested.
func (p *peer) SendNodeData(data [][]byte) error {
return p2p.Send(p.rw, NodeDataMsg, data)
diff --git a/trie/sync.go b/trie/sync.go
new file mode 100644
index 000000000..65cfd6ed8
--- /dev/null
+++ b/trie/sync.go
@@ -0,0 +1,233 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package trie
+
+import (
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/common"
+ "gopkg.in/karalabe/cookiejar.v2/collections/prque"
+)
+
+// request represents a scheduled or already in-flight state retrieval request.
+type request struct {
+ hash common.Hash // Hash of the node data content to retrieve
+ data []byte // Data content of the node, cached until all subtrees complete
+ object *node // Target node to populate with retrieved data (hashnode originally)
+
+ parents []*request // Parent state nodes referencing this entry (notify all upon completion)
+ depth int // Depth level within the trie the node is located to prioritize DFS
+ deps int // Number of dependencies before allowed to commit this node
+
+ callback TrieSyncLeafCallback // Callback to invoke if a leaf node is reached on this branch
+}
+
+// SyncResult pairs the data of a retrieved trie node with the hash it was
+// requested by.
+type SyncResult struct {
+ Hash common.Hash // Hash of the originally unknown trie node
+ Data []byte // Data content of the retrieved node
+}
+
+// TrieSyncLeafCallback is a callback type invoked when a trie sync reaches a
+// leaf node. It's used by state syncing to check if the leaf node requires some
+// further data syncing.
+type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error
+
+// TrieSync is the main state trie synchronisation scheduler, which provides yet
+// unknown trie hashes to retrieve, accepts node data associated with said hashes
+// and reconstructs the trie step by step until all is done.
+type TrieSync struct {
+ database Database // State database for storing all the assembled node data
+ requests map[common.Hash]*request // Pending requests pertaining to a key hash
+ queue *prque.Prque // Priority queue with the pending requests
+}
+
+// NewTrieSync creates a new trie data download scheduler.
+func NewTrieSync(root common.Hash, database Database, callback TrieSyncLeafCallback) *TrieSync {
+ ts := &TrieSync{
+ database: database,
+ requests: make(map[common.Hash]*request),
+ queue: prque.New(),
+ }
+ ts.AddSubTrie(root, 0, common.Hash{}, callback)
+ return ts
+}
+
+// AddSubTrie registers a new trie to the sync code, rooted at the designated parent.
+func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback TrieSyncLeafCallback) {
+ // Short circuit if the trie is empty
+ if root == emptyRoot {
+ return
+ }
+ // Assemble the new sub-trie sync request
+ node := node(hashNode(root.Bytes()))
+ req := &request{
+ object: &node,
+ hash: root,
+ depth: depth,
+ callback: callback,
+ }
+ // If this sub-trie has a designated parent, link them together
+ if parent != (common.Hash{}) {
+ ancestor := s.requests[parent]
+ if ancestor == nil {
+ panic(fmt.Sprintf("sub-trie ancestor not found: %x", parent))
+ }
+ ancestor.deps++
+ req.parents = append(req.parents, ancestor)
+ }
+ s.schedule(req)
+}
+
+// Missing retrieves the known missing nodes from the trie for retrieval.
+func (s *TrieSync) Missing(max int) []common.Hash {
+ requests := []common.Hash{}
+ for !s.queue.Empty() && (max == 0 || len(requests) < max) {
+ requests = append(requests, s.queue.PopItem().(common.Hash))
+ }
+ return requests
+}
+
+// Process injects a batch of retrieved trie nodes data.
+func (s *TrieSync) Process(results []SyncResult) (int, error) {
+ for i, item := range results {
+ // If the item was not requested, bail out
+ request := s.requests[item.Hash]
+ if request == nil {
+ return i, fmt.Errorf("not requested: %x", item.Hash)
+ }
+ // Decode the node data content and update the request
+ node, err := decodeNode(item.Data)
+ if err != nil {
+ return i, err
+ }
+ *request.object = node
+ request.data = item.Data
+
+ // Create and schedule a request for all the children nodes
+ requests, err := s.children(request)
+ if err != nil {
+ return i, err
+ }
+ if len(requests) == 0 && request.deps == 0 {
+ s.commit(request)
+ continue
+ }
+ request.deps += len(requests)
+ for _, child := range requests {
+ s.schedule(child)
+ }
+ }
+ return 0, nil
+}
+
+// schedule inserts a new state retrieval request into the fetch queue. If there
+// is already a pending request for this node, the new request will be discarded
+// and only a parent reference added to the old one.
+func (s *TrieSync) schedule(req *request) {
+ // If we're already requesting this node, add a new reference and stop
+ if old, ok := s.requests[req.hash]; ok {
+ old.parents = append(old.parents, req.parents...)
+ return
+ }
+ // Schedule the request for future retrieval
+ s.queue.Push(req.hash, float32(req.depth))
+ s.requests[req.hash] = req
+}
+
+// children retrieves all the missing children of a state trie entry for future
+// retrieval scheduling.
+func (s *TrieSync) children(req *request) ([]*request, error) {
+ // Gather all the children of the node, irrelevant whether known or not
+ type child struct {
+ node *node
+ depth int
+ }
+ children := []child{}
+
+ switch node := (*req.object).(type) {
+ case shortNode:
+ children = []child{{
+ node: &node.Val,
+ depth: req.depth + len(node.Key),
+ }}
+ case fullNode:
+ for i := 0; i < 17; i++ {
+ if node[i] != nil {
+ children = append(children, child{
+ node: &node[i],
+ depth: req.depth + 1,
+ })
+ }
+ }
+ default:
+ panic(fmt.Sprintf("unknown node: %+v", node))
+ }
+ // Iterate over the children, and request all unknown ones
+ requests := make([]*request, 0, len(children))
+ for _, child := range children {
+ // Notify any external watcher of a new key/value node
+ if req.callback != nil {
+ if node, ok := (*child.node).(valueNode); ok {
+ if err := req.callback(node, req.hash); err != nil {
+ return nil, err
+ }
+ }
+ }
+ // If the child references another node, resolve or schedule
+ if node, ok := (*child.node).(hashNode); ok {
+ // Try to resolve the node from the local database
+ blob, _ := s.database.Get(node)
+ if local, err := decodeNode(blob); local != nil && err == nil {
+ *child.node = local
+ continue
+ }
+ // Locally unknown node, schedule for retrieval
+ requests = append(requests, &request{
+ object: child.node,
+ hash: common.BytesToHash(node),
+ parents: []*request{req},
+ depth: child.depth,
+ callback: req.callback,
+ })
+ }
+ }
+ return requests, nil
+}
+
+// commit finalizes a retrieval request and stores it into the database. If any
+// of the referencing parent requests complete due to this commit, they are also
+// committed themselves.
+func (s *TrieSync) commit(req *request) error {
+ // Write the node content to disk
+ if err := s.database.Put(req.hash[:], req.data); err != nil {
+ return err
+ }
+ delete(s.requests, req.hash)
+
+ // Check all parents for completion
+ for _, parent := range req.parents {
+ parent.deps--
+ if parent.deps == 0 {
+ if err := s.commit(parent); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
diff --git a/trie/sync_test.go b/trie/sync_test.go
new file mode 100644
index 000000000..9c036a3a9
--- /dev/null
+++ b/trie/sync_test.go
@@ -0,0 +1,257 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package trie
+
+import (
+ "bytes"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/ethdb"
+)
+
+// makeTestTrie creates a sample test trie to test node-wise reconstruction.
+func makeTestTrie() (ethdb.Database, *Trie, map[string][]byte) {
+ // Create an empty trie
+ db, _ := ethdb.NewMemDatabase()
+ trie, _ := New(common.Hash{}, db)
+
+ // Fill it with some arbitrary data
+ content := make(map[string][]byte)
+ for i := byte(0); i < 255; i++ {
+ key, val := common.LeftPadBytes([]byte{1, i}, 32), []byte{i}
+ content[string(key)] = val
+ trie.Update(key, val)
+
+ key, val = common.LeftPadBytes([]byte{2, i}, 32), []byte{i}
+ content[string(key)] = val
+ trie.Update(key, val)
+ }
+ trie.Commit()
+
+ // Return the generated trie
+ return db, trie, content
+}
+
+// checkTrieContents cross references a reconstructed trie with an expected data
+// content map.
+func checkTrieContents(t *testing.T, db Database, root []byte, content map[string][]byte) {
+ trie, err := New(common.BytesToHash(root), db)
+ if err != nil {
+ t.Fatalf("failed to create trie at %x: %v", root, err)
+ }
+ for key, val := range content {
+ if have := trie.Get([]byte(key)); bytes.Compare(have, val) != 0 {
+ t.Errorf("entry %x: content mismatch: have %x, want %x", key, have, val)
+ }
+ }
+}
+
+// Tests that an empty trie is not scheduled for syncing.
+func TestEmptyTrieSync(t *testing.T) {
+ emptyA, _ := New(common.Hash{}, nil)
+ emptyB, _ := New(emptyRoot, nil)
+
+ for i, trie := range []*Trie{emptyA, emptyB} {
+ db, _ := ethdb.NewMemDatabase()
+ if req := NewTrieSync(common.BytesToHash(trie.Root()), db, nil).Missing(1); len(req) != 0 {
+ t.Errorf("test %d: content requested for empty trie: %v", i, req)
+ }
+ }
+}
+
+// Tests that given a root hash, a trie can sync iteratively on a single thread,
+// requesting retrieval tasks and returning all of them in one go.
+func TestIterativeTrieSyncIndividual(t *testing.T) { testIterativeTrieSync(t, 1) }
+func TestIterativeTrieSyncBatched(t *testing.T) { testIterativeTrieSync(t, 100) }
+
+func testIterativeTrieSync(t *testing.T, batch int) {
+ // Create a random trie to copy
+ srcDb, srcTrie, srcData := makeTestTrie()
+
+ // Create a destination trie and sync with the scheduler
+ dstDb, _ := ethdb.NewMemDatabase()
+ sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil)
+
+ queue := append([]common.Hash{}, sched.Missing(batch)...)
+ for len(queue) > 0 {
+ results := make([]SyncResult, len(queue))
+ for i, hash := range queue {
+ data, err := srcDb.Get(hash.Bytes())
+ if err != nil {
+ t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
+ }
+ results[i] = SyncResult{hash, data}
+ }
+ if index, err := sched.Process(results); err != nil {
+ t.Fatalf("failed to process result #%d: %v", index, err)
+ }
+ queue = append(queue[:0], sched.Missing(batch)...)
+ }
+ // Cross check that the two tries are in sync
+ checkTrieContents(t, dstDb, srcTrie.Root(), srcData)
+}
+
+// Tests that the trie scheduler can correctly reconstruct the state even if only
+// partial results are returned, and the others sent only later.
+func TestIterativeDelayedTrieSync(t *testing.T) {
+ // Create a random trie to copy
+ srcDb, srcTrie, srcData := makeTestTrie()
+
+ // Create a destination trie and sync with the scheduler
+ dstDb, _ := ethdb.NewMemDatabase()
+ sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil)
+
+ queue := append([]common.Hash{}, sched.Missing(10000)...)
+ for len(queue) > 0 {
+ // Sync only half of the scheduled nodes
+ results := make([]SyncResult, len(queue)/2+1)
+ for i, hash := range queue[:len(results)] {
+ data, err := srcDb.Get(hash.Bytes())
+ if err != nil {
+ t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
+ }
+ results[i] = SyncResult{hash, data}
+ }
+ if index, err := sched.Process(results); err != nil {
+ t.Fatalf("failed to process result #%d: %v", index, err)
+ }
+ queue = append(queue[len(results):], sched.Missing(10000)...)
+ }
+ // Cross check that the two tries are in sync
+ checkTrieContents(t, dstDb, srcTrie.Root(), srcData)
+}
+
+// Tests that given a root hash, a trie can sync iteratively on a single thread,
+// requesting retrieval tasks and returning all of them in one go, however in a
+// random order.
+func TestIterativeRandomTrieSyncIndividual(t *testing.T) { testIterativeRandomTrieSync(t, 1) }
+func TestIterativeRandomTrieSyncBatched(t *testing.T) { testIterativeRandomTrieSync(t, 100) }
+
+func testIterativeRandomTrieSync(t *testing.T, batch int) {
+ // Create a random trie to copy
+ srcDb, srcTrie, srcData := makeTestTrie()
+
+ // Create a destination trie and sync with the scheduler
+ dstDb, _ := ethdb.NewMemDatabase()
+ sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil)
+
+ queue := make(map[common.Hash]struct{})
+ for _, hash := range sched.Missing(batch) {
+ queue[hash] = struct{}{}
+ }
+ for len(queue) > 0 {
+ // Fetch all the queued nodes in a random order
+ results := make([]SyncResult, 0, len(queue))
+ for hash, _ := range queue {
+ data, err := srcDb.Get(hash.Bytes())
+ if err != nil {
+ t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
+ }
+ results = append(results, SyncResult{hash, data})
+ }
+ // Feed the retrieved results back and queue new tasks
+ if index, err := sched.Process(results); err != nil {
+ t.Fatalf("failed to process result #%d: %v", index, err)
+ }
+ queue = make(map[common.Hash]struct{})
+ for _, hash := range sched.Missing(batch) {
+ queue[hash] = struct{}{}
+ }
+ }
+ // Cross check that the two tries are in sync
+ checkTrieContents(t, dstDb, srcTrie.Root(), srcData)
+}
+
+// Tests that the trie scheduler can correctly reconstruct the state even if only
+// partial results are returned (even those in random order), with the rest sent only later.
+func TestIterativeRandomDelayedTrieSync(t *testing.T) {
+ // Create a random trie to copy
+ srcDb, srcTrie, srcData := makeTestTrie()
+
+ // Create a destination trie and sync with the scheduler
+ dstDb, _ := ethdb.NewMemDatabase()
+ sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil)
+
+ queue := make(map[common.Hash]struct{})
+ for _, hash := range sched.Missing(10000) {
+ queue[hash] = struct{}{}
+ }
+ for len(queue) > 0 {
+ // Sync only half of the scheduled nodes, even those in random order
+ results := make([]SyncResult, 0, len(queue)/2+1)
+ for hash, _ := range queue {
+ data, err := srcDb.Get(hash.Bytes())
+ if err != nil {
+ t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
+ }
+ results = append(results, SyncResult{hash, data})
+
+ if len(results) >= cap(results) {
+ break
+ }
+ }
+ // Feed the retrieved results back and queue new tasks
+ if index, err := sched.Process(results); err != nil {
+ t.Fatalf("failed to process result #%d: %v", index, err)
+ }
+ for _, result := range results {
+ delete(queue, result.Hash)
+ }
+ for _, hash := range sched.Missing(10000) {
+ queue[hash] = struct{}{}
+ }
+ }
+ // Cross check that the two tries are in sync
+ checkTrieContents(t, dstDb, srcTrie.Root(), srcData)
+}
+
+// Tests that a trie sync will not request nodes multiple times, even if they
+// have such references.
+func TestDuplicateAvoidanceTrieSync(t *testing.T) {
+ // Create a random trie to copy
+ srcDb, srcTrie, srcData := makeTestTrie()
+
+ // Create a destination trie and sync with the scheduler
+ dstDb, _ := ethdb.NewMemDatabase()
+ sched := NewTrieSync(common.BytesToHash(srcTrie.Root()), dstDb, nil)
+
+ queue := append([]common.Hash{}, sched.Missing(0)...)
+ requested := make(map[common.Hash]struct{})
+
+ for len(queue) > 0 {
+ results := make([]SyncResult, len(queue))
+ for i, hash := range queue {
+ data, err := srcDb.Get(hash.Bytes())
+ if err != nil {
+ t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
+ }
+ if _, ok := requested[hash]; ok {
+ t.Errorf("hash %x already requested once", hash)
+ }
+ requested[hash] = struct{}{}
+
+ results[i] = SyncResult{hash, data}
+ }
+ if index, err := sched.Process(results); err != nil {
+ t.Fatalf("failed to process result #%d: %v", index, err)
+ }
+ queue = append(queue[:0], sched.Missing(0)...)
+ }
+ // Cross check that the two tries are in sync
+ checkTrieContents(t, dstDb, srcTrie.Root(), srcData)
+}