aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-10-01 00:23:31 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-10-19 15:03:09 +0800
commit832b37c8221e330896c36eb419d92af6b1fdc9dd (patch)
treefdeb701ed7a17ef2db176b7cbf1b1159b5afb417
parent42c8afd44006b170c20159abaadc31cc7545bec2 (diff)
downloadgo-tangerine-832b37c8221e330896c36eb419d92af6b1fdc9dd.tar.gz
go-tangerine-832b37c8221e330896c36eb419d92af6b1fdc9dd.tar.zst
go-tangerine-832b37c8221e330896c36eb419d92af6b1fdc9dd.zip
core, eth: receipt chain reconstruction
-rw-r--r--core/bench_test.go2
-rw-r--r--core/block_processor_test.go16
-rw-r--r--core/blockchain.go210
-rw-r--r--core/blockchain_test.go162
-rw-r--r--core/chain_makers.go16
-rw-r--r--core/chain_makers_test.go2
-rw-r--r--core/chain_pow_test.go6
-rw-r--r--core/chain_util.go24
-rw-r--r--core/chain_util_test.go25
-rw-r--r--core/genesis.go2
-rw-r--r--core/transaction_util.go5
-rw-r--r--core/types/block.go5
-rw-r--r--core/types/receipt.go4
-rw-r--r--core/vm/log.go14
-rw-r--r--eth/downloader/downloader.go55
-rw-r--r--eth/downloader/downloader_test.go225
-rw-r--r--eth/fetcher/fetcher_test.go2
-rw-r--r--eth/handler.go60
-rw-r--r--eth/helper_test.go2
-rw-r--r--eth/protocol.go2
-rw-r--r--miner/worker.go2
-rw-r--r--rpc/api/eth_args.go2
22 files changed, 613 insertions, 230 deletions
diff --git a/core/bench_test.go b/core/bench_test.go
index 27f3e3158..b5eb51803 100644
--- a/core/bench_test.go
+++ b/core/bench_test.go
@@ -163,7 +163,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) {
// Generate a chain of b.N blocks using the supplied block
// generator function.
genesis := WriteGenesisBlockForTesting(db, GenesisAccount{benchRootAddr, benchRootFunds})
- chain := GenerateChain(genesis, db, b.N, gen)
+ chain, _ := GenerateChain(genesis, db, b.N, gen)
// Time the insertion of the new chain.
// State and blocks are stored in the same DB.
diff --git a/core/block_processor_test.go b/core/block_processor_test.go
index c2c85ebfa..3050456b4 100644
--- a/core/block_processor_test.go
+++ b/core/block_processor_test.go
@@ -71,14 +71,14 @@ func TestPutReceipt(t *testing.T) {
receipt := new(types.Receipt)
receipt.Logs = vm.Logs{&vm.Log{
- Address: addr,
- Topics: []common.Hash{hash},
- Data: []byte("hi"),
- Number: 42,
- TxHash: hash,
- TxIndex: 0,
- BlockHash: hash,
- Index: 0,
+ Address: addr,
+ Topics: []common.Hash{hash},
+ Data: []byte("hi"),
+ BlockNumber: 42,
+ TxHash: hash,
+ TxIndex: 0,
+ BlockHash: hash,
+ Index: 0,
}}
PutReceipts(db, types.Receipts{receipt})
diff --git a/core/blockchain.go b/core/blockchain.go
index 6b42ea97e..b68e7d3ae 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
@@ -67,9 +68,10 @@ type BlockChain struct {
chainmu sync.RWMutex
tsmu sync.RWMutex
- checkpoint int // checkpoint counts towards the new checkpoint
- currentHeader *types.Header // Current head of the header chain (may be above the block chain!)
- currentBlock *types.Block // Current head of the block chain
+ checkpoint int // checkpoint counts towards the new checkpoint
+ currentHeader *types.Header // Current head of the header chain (may be above the block chain!)
+ currentBlock *types.Block // Current head of the block chain
+ currentFastBlock *types.Block // Current head of the fast-sync chain (may be above the block chain!)
headerCache *lru.Cache // Cache for the most recent block headers
bodyCache *lru.Cache // Cache for the most recent block bodies
@@ -160,12 +162,21 @@ func (self *BlockChain) loadLastState() error {
self.currentHeader = header
}
}
+ // Restore the last known head fast block
+ self.currentFastBlock = self.currentBlock
+ if head := GetHeadFastBlockHash(self.chainDb); head != (common.Hash{}) {
+ if block := self.GetBlock(head); block != nil {
+ self.currentFastBlock = block
+ }
+ }
// Issue a status log and return
headerTd := self.GetTd(self.currentHeader.Hash())
blockTd := self.GetTd(self.currentBlock.Hash())
+ fastTd := self.GetTd(self.currentFastBlock.Hash())
- glog.V(logger.Info).Infof("Last header: #%d [%x…] TD=%v", self.currentHeader.Number, self.currentHeader.Hash(), headerTd)
- glog.V(logger.Info).Infof("Last block: #%d [%x…] TD=%v", self.currentBlock.Number(), self.currentBlock.Hash(), blockTd)
+ glog.V(logger.Info).Infof("Last header: #%d [%x…] TD=%v", self.currentHeader.Number, self.currentHeader.Hash().Bytes()[:4], headerTd)
+ glog.V(logger.Info).Infof("Fast block: #%d [%x…] TD=%v", self.currentFastBlock.Number(), self.currentFastBlock.Hash().Bytes()[:4], fastTd)
+ glog.V(logger.Info).Infof("Last block: #%d [%x…] TD=%v", self.currentBlock.Number(), self.currentBlock.Hash().Bytes()[:4], blockTd)
return nil
}
@@ -178,23 +189,48 @@ func (bc *BlockChain) SetHead(head uint64) {
bc.mu.Lock()
defer bc.mu.Unlock()
- // Delete everything from the current header head (is above block head)
- for i := bc.currentHeader.Number.Uint64(); i > head; i-- {
- if hash := GetCanonicalHash(bc.chainDb, i); hash != (common.Hash{}) {
- DeleteCanonicalHash(bc.chainDb, i)
- DeleteHeader(bc.chainDb, hash)
- DeleteBody(bc.chainDb, hash)
- DeleteTd(bc.chainDb, hash)
+ // Figure out the highest known canonical assignment
+ height := uint64(0)
+ if bc.currentHeader != nil {
+ if hh := bc.currentHeader.Number.Uint64(); hh > height {
+ height = hh
}
}
- bc.currentHeader = GetHeader(bc.chainDb, GetCanonicalHash(bc.chainDb, head))
-
- // Rewind the block chain until a whole block is found
- for bc.GetBlockByNumber(head) == nil {
- head--
+ if bc.currentBlock != nil {
+ if bh := bc.currentBlock.NumberU64(); bh > height {
+ height = bh
+ }
}
- bc.currentBlock = bc.GetBlockByNumber(head)
+ if bc.currentFastBlock != nil {
+ if fbh := bc.currentFastBlock.NumberU64(); fbh > height {
+ height = fbh
+ }
+ }
+ // Gather all the hashes that need deletion
+ drop := make(map[common.Hash]struct{})
+ for bc.currentHeader != nil && bc.currentHeader.Number.Uint64() > head {
+ drop[bc.currentHeader.Hash()] = struct{}{}
+ bc.currentHeader = bc.GetHeader(bc.currentHeader.ParentHash)
+ }
+ for bc.currentBlock != nil && bc.currentBlock.NumberU64() > head {
+ drop[bc.currentBlock.Hash()] = struct{}{}
+ bc.currentBlock = bc.GetBlock(bc.currentBlock.ParentHash())
+ }
+ for bc.currentFastBlock != nil && bc.currentFastBlock.NumberU64() > head {
+ drop[bc.currentFastBlock.Hash()] = struct{}{}
+ bc.currentFastBlock = bc.GetBlock(bc.currentFastBlock.ParentHash())
+ }
+ // Roll back the canonical chain numbering
+ for i := height; i > head; i-- {
+ DeleteCanonicalHash(bc.chainDb, i)
+ }
+ // Delete everything found by the above rewind
+ for hash, _ := range drop {
+ DeleteHeader(bc.chainDb, hash)
+ DeleteBody(bc.chainDb, hash)
+ DeleteTd(bc.chainDb, hash)
+ }
// Clear out any stale content from the caches
bc.headerCache.Purge()
bc.bodyCache.Purge()
@@ -203,6 +239,9 @@ func (bc *BlockChain) SetHead(head uint64) {
bc.futureBlocks.Purge()
// Update all computed fields to the new head
+ if bc.currentBlock == nil {
+ bc.currentBlock = bc.genesisBlock
+ }
bc.insert(bc.currentBlock)
bc.loadLastState()
}
@@ -222,8 +261,7 @@ func (self *BlockChain) LastBlockHash() common.Hash {
}
// CurrentHeader retrieves the current head header of the canonical chain. The
-// header is retrieved from the chain manager's internal cache, involving no
-// database operations.
+// header is retrieved from the chain manager's internal cache.
func (self *BlockChain) CurrentHeader() *types.Header {
self.mu.RLock()
defer self.mu.RUnlock()
@@ -232,8 +270,7 @@ func (self *BlockChain) CurrentHeader() *types.Header {
}
// CurrentBlock retrieves the current head block of the canonical chain. The
-// block is retrieved from the chain manager's internal cache, involving no
-// database operations.
+// block is retrieved from the chain manager's internal cache.
func (self *BlockChain) CurrentBlock() *types.Block {
self.mu.RLock()
defer self.mu.RUnlock()
@@ -241,6 +278,15 @@ func (self *BlockChain) CurrentBlock() *types.Block {
return self.currentBlock
}
+// CurrentFastBlock retrieves the current fast-sync head block of the canonical
+// chain. The block is retrieved from the chain manager's internal cache.
+func (self *BlockChain) CurrentFastBlock() *types.Block {
+ self.mu.RLock()
+ defer self.mu.RUnlock()
+
+ return self.currentFastBlock
+}
+
func (self *BlockChain) Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) {
self.mu.RLock()
defer self.mu.RUnlock()
@@ -264,22 +310,12 @@ func (bc *BlockChain) Reset() {
// ResetWithGenesisBlock purges the entire blockchain, restoring it to the
// specified genesis state.
func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) {
+ // Dump the entire block chain and purge the caches
+ bc.SetHead(0)
+
bc.mu.Lock()
defer bc.mu.Unlock()
- // Dump the entire block chain and purge the caches
- for block := bc.currentBlock; block != nil; block = bc.GetBlock(block.ParentHash()) {
- DeleteBlock(bc.chainDb, block.Hash())
- }
- for header := bc.currentHeader; header != nil; header = bc.GetHeader(header.ParentHash) {
- DeleteBlock(bc.chainDb, header.Hash())
- }
- bc.headerCache.Purge()
- bc.bodyCache.Purge()
- bc.bodyRLPCache.Purge()
- bc.blockCache.Purge()
- bc.futureBlocks.Purge()
-
// Prepare the genesis block and reinitialize the chain
if err := WriteTd(bc.chainDb, genesis.Hash(), genesis.Difficulty()); err != nil {
glog.Fatalf("failed to write genesis block TD: %v", err)
@@ -291,6 +327,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) {
bc.insert(bc.genesisBlock)
bc.currentBlock = bc.genesisBlock
bc.currentHeader = bc.genesisBlock.Header()
+ bc.currentFastBlock = bc.genesisBlock
}
// Export writes the active chain to the given writer.
@@ -328,8 +365,8 @@ func (self *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
// insert injects a new head block into the current block chain. This method
// assumes that the block is indeed a true head. It will also reset the head
-// header to this very same block to prevent the headers from diverging on a
-// different header chain.
+// header and the head fast sync block to this very same block to prevent them
+// from diverging on a different header chain.
//
// Note, this function assumes that the `mu` mutex is held!
func (bc *BlockChain) insert(block *types.Block) {
@@ -343,9 +380,13 @@ func (bc *BlockChain) insert(block *types.Block) {
if err := WriteHeadHeaderHash(bc.chainDb, block.Hash()); err != nil {
glog.Fatalf("failed to insert head header hash: %v", err)
}
+ if err := WriteHeadFastBlockHash(bc.chainDb, block.Hash()); err != nil {
+ glog.Fatalf("failed to insert head fast block hash: %v", err)
+ }
// Update the internal state with the head block
bc.currentBlock = block
bc.currentHeader = block.Header()
+ bc.currentFastBlock = block
}
// Accessors
@@ -634,7 +675,7 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, verify bool) (i
for i, header := range chain {
// Short circuit insertion if shutting down
if atomic.LoadInt32(&self.procInterrupt) == 1 {
- glog.V(logger.Debug).Infoln("Premature abort during header chain processing")
+ glog.V(logger.Debug).Infoln("premature abort during header chain processing")
break
}
hash := header.Hash()
@@ -653,7 +694,7 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, verify bool) (i
}
}
if BadHashes[hash] {
- glog.V(logger.Error).Infof("Bad header %d [%x…], known bad hash", header.Number, hash)
+ glog.V(logger.Error).Infof("bad header %d [%x…], known bad hash", header.Number, hash)
return i, BadHashError(hash)
}
// Write the header to the chain and get the status
@@ -674,6 +715,95 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, verify bool) (i
return 0, nil
}
+// InsertReceiptChain attempts to complete an already existing header chain with
+// transaction and receipt data.
+func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
+ self.wg.Add(1)
+ defer self.wg.Done()
+
+ // Make sure only one thread manipulates the chain at once
+ self.chainmu.Lock()
+ defer self.chainmu.Unlock()
+
+ // Collect some import statistics to report on
+ stats := struct{ processed, ignored int }{}
+ start := time.Now()
+
+ // Iterate over the blocks and receipts, inserting any new ones
+ for i := 0; i < len(blockChain) && i < len(receiptChain); i++ {
+ block, receipts := blockChain[i], receiptChain[i]
+
+ // Short circuit insertion if shutting down
+ if atomic.LoadInt32(&self.procInterrupt) == 1 {
+ glog.V(logger.Debug).Infoln("premature abort during receipt chain processing")
+ break
+ }
+ // Short circuit if the owner header is unknown
+ if !self.HasHeader(block.Hash()) {
+ glog.V(logger.Debug).Infof("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
+ return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
+ }
+ // Skip if the entire data is already known
+ if self.HasBlock(block.Hash()) {
+ stats.ignored++
+ continue
+ }
+ // Compute all the non-consensus fields of the receipts
+ transactions, logIndex := block.Transactions(), uint(0)
+ for j := 0; j < len(receipts); j++ {
+ // The transaction hash can be retrieved from the transaction itself
+ receipts[j].TxHash = transactions[j].Hash()
+
+ // The contract address can be derived from the transaction itself
+ if MessageCreatesContract(transactions[j]) {
+ from, _ := transactions[j].From()
+ receipts[j].ContractAddress = crypto.CreateAddress(from, transactions[j].Nonce())
+ }
+ // The used gas can be calculated based on previous receipts
+ if j == 0 {
+ receipts[j].GasUsed = new(big.Int).Set(receipts[j].CumulativeGasUsed)
+ } else {
+ receipts[j].GasUsed = new(big.Int).Sub(receipts[j].CumulativeGasUsed, receipts[j-1].CumulativeGasUsed)
+ }
+ // The derived log fields can simply be set from the block and transaction
+ for k := 0; k < len(receipts[j].Logs); k++ {
+ receipts[j].Logs[k].BlockNumber = block.NumberU64()
+ receipts[j].Logs[k].BlockHash = block.Hash()
+ receipts[j].Logs[k].TxHash = receipts[j].TxHash
+ receipts[j].Logs[k].TxIndex = uint(j)
+ receipts[j].Logs[k].Index = logIndex
+ logIndex++
+ }
+ }
+ // Write all the data out into the database
+ if err := WriteBody(self.chainDb, block.Hash(), &types.Body{block.Transactions(), block.Uncles()}); err != nil {
+ glog.Fatalf("failed to write block body: %v", err)
+ return i, err
+ }
+ if err := PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
+ glog.Fatalf("failed to write block receipts: %v", err)
+ return i, err
+ }
+ // Update the head fast sync block if better
+ self.mu.Lock()
+ if self.GetTd(self.currentFastBlock.Hash()).Cmp(self.GetTd(block.Hash())) < 0 {
+ if err := WriteHeadFastBlockHash(self.chainDb, block.Hash()); err != nil {
+ glog.Fatalf("failed to update head fast block hash: %v", err)
+ }
+ self.currentFastBlock = block
+ }
+ self.mu.Unlock()
+
+ stats.processed++
+ }
+ // Report some public statistics so the user has a clue what's going on
+ first, last := blockChain[0], blockChain[len(blockChain)-1]
+ glog.V(logger.Info).Infof("imported %d receipt(s) (%d ignored) in %v. #%d [%x… / %x…]", stats.processed, stats.ignored,
+ time.Since(start), last.Number(), first.Hash().Bytes()[:4], last.Hash().Bytes()[:4])
+
+ return 0, nil
+}
+
// WriteBlock writes the block to the chain.
func (self *BlockChain) WriteBlock(block *types.Block) (status writeStatus, err error) {
self.wg.Add(1)
@@ -799,7 +929,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
return i, err
}
- if err := PutBlockReceipts(self.chainDb, block, receipts); err != nil {
+ if err := PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
glog.V(logger.Warn).Infoln("error writing block receipts:", err)
}
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 4d0f13ef1..93c2128bc 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -430,9 +430,12 @@ func makeBlockChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.B
var chain []*types.Block
for i, difficulty := range d {
header := &types.Header{
- Coinbase: common.Address{seed},
- Number: big.NewInt(int64(i + 1)),
- Difficulty: big.NewInt(int64(difficulty)),
+ Coinbase: common.Address{seed},
+ Number: big.NewInt(int64(i + 1)),
+ Difficulty: big.NewInt(int64(difficulty)),
+ UncleHash: types.EmptyUncleHash,
+ TxHash: types.EmptyRootHash,
+ ReceiptHash: types.EmptyRootHash,
}
if i == 0 {
header.ParentHash = genesis.Hash()
@@ -668,6 +671,155 @@ func testInsertNonceError(t *testing.T, full bool) {
}
}
+// Tests that fast importing a block chain produces the same chain data as the
+// classical full block processing.
+func TestFastVsFullChains(t *testing.T) {
+ // Configure and generate a sample block chain
+ var (
+ gendb, _ = ethdb.NewMemDatabase()
+ key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ address = crypto.PubkeyToAddress(key.PublicKey)
+ funds = big.NewInt(1000000000)
+ genesis = GenesisBlockForTesting(gendb, address, funds)
+ )
+ blocks, receipts := GenerateChain(genesis, gendb, 1024, func(i int, block *BlockGen) {
+ block.SetCoinbase(common.Address{0x00})
+
+ // If the block number is multiple of 3, send a few bonus transactions to the miner
+ if i%3 == 2 {
+ for j := 0; j < i%4+1; j++ {
+ tx, err := types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key)
+ if err != nil {
+ panic(err)
+ }
+ block.AddTx(tx)
+ }
+ }
+ // If the block number is a multiple of 5, add a few bonus uncles to the block
+ if i%5 == 5 {
+ block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))})
+ }
+ })
+ // Import the chain as an archive node for the comparison baseline
+ archiveDb, _ := ethdb.NewMemDatabase()
+ WriteGenesisBlockForTesting(archiveDb, GenesisAccount{address, funds})
+
+ archive, _ := NewBlockChain(archiveDb, FakePow{}, new(event.TypeMux))
+ archive.SetProcessor(NewBlockProcessor(archiveDb, FakePow{}, archive, new(event.TypeMux)))
+
+ if n, err := archive.InsertChain(blocks); err != nil {
+ t.Fatalf("failed to process block %d: %v", n, err)
+ }
+ // Fast import the chain as a non-archive node to test
+ fastDb, _ := ethdb.NewMemDatabase()
+ WriteGenesisBlockForTesting(fastDb, GenesisAccount{address, funds})
+ fast, _ := NewBlockChain(fastDb, FakePow{}, new(event.TypeMux))
+
+ headers := make([]*types.Header, len(blocks))
+ for i, block := range blocks {
+ headers[i] = block.Header()
+ }
+ if n, err := fast.InsertHeaderChain(headers, true); err != nil {
+ t.Fatalf("failed to insert header %d: %v", n, err)
+ }
+ if n, err := fast.InsertReceiptChain(blocks, receipts); err != nil {
+ t.Fatalf("failed to insert receipt %d: %v", n, err)
+ }
+ // Iterate over all chain data components, and cross reference
+ for i := 0; i < len(blocks); i++ {
+ num, hash := blocks[i].NumberU64(), blocks[i].Hash()
+
+ if ftd, atd := fast.GetTd(hash), archive.GetTd(hash); ftd.Cmp(atd) != 0 {
+ t.Errorf("block #%d [%x]: td mismatch: have %v, want %v", num, hash, ftd, atd)
+ }
+ if fheader, aheader := fast.GetHeader(hash), archive.GetHeader(hash); fheader.Hash() != aheader.Hash() {
+ t.Errorf("block #%d [%x]: header mismatch: have %v, want %v", num, hash, fheader, aheader)
+ }
+ if fblock, ablock := fast.GetBlock(hash), archive.GetBlock(hash); fblock.Hash() != ablock.Hash() {
+ t.Errorf("block #%d [%x]: block mismatch: have %v, want %v", num, hash, fblock, ablock)
+ } else if types.DeriveSha(fblock.Transactions()) != types.DeriveSha(ablock.Transactions()) {
+ t.Errorf("block #%d [%x]: transactions mismatch: have %v, want %v", num, hash, fblock.Transactions(), ablock.Transactions())
+ } else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(ablock.Uncles()) {
+ t.Errorf("block #%d [%x]: uncles mismatch: have %v, want %v", num, hash, fblock.Uncles(), ablock.Uncles())
+ }
+ if freceipts, areceipts := GetBlockReceipts(fastDb, hash), GetBlockReceipts(archiveDb, hash); types.DeriveSha(freceipts) != types.DeriveSha(areceipts) {
+ t.Errorf("block #%d [%x]: receipts mismatch: have %v, want %v", num, hash, freceipts, areceipts)
+ }
+ }
+ // Check that the canonical chains are the same between the databases
+ for i := 0; i < len(blocks)+1; i++ {
+ if fhash, ahash := GetCanonicalHash(fastDb, uint64(i)), GetCanonicalHash(archiveDb, uint64(i)); fhash != ahash {
+ t.Errorf("block #%d: canonical hash mismatch: have %v, want %v", i, fhash, ahash)
+ }
+ }
+}
+
+// Tests that various import methods move the chain head pointers to the correct
+// positions.
+func TestLightVsFastVsFullChainHeads(t *testing.T) {
+ // Configure and generate a sample block chain
+ var (
+ gendb, _ = ethdb.NewMemDatabase()
+ key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ address = crypto.PubkeyToAddress(key.PublicKey)
+ funds = big.NewInt(1000000000)
+ genesis = GenesisBlockForTesting(gendb, address, funds)
+ )
+ height := uint64(1024)
+ blocks, receipts := GenerateChain(genesis, gendb, int(height), nil)
+
+ // Create a small assertion method to check the three heads
+ assert := func(t *testing.T, kind string, chain *BlockChain, header uint64, fast uint64, block uint64) {
+ if num := chain.CurrentBlock().NumberU64(); num != block {
+ t.Errorf("%s head block mismatch: have #%v, want #%v", kind, num, block)
+ }
+ if num := chain.CurrentFastBlock().NumberU64(); num != fast {
+ t.Errorf("%s head fast-block mismatch: have #%v, want #%v", kind, num, fast)
+ }
+ if num := chain.CurrentHeader().Number.Uint64(); num != header {
+ t.Errorf("%s head header mismatch: have #%v, want #%v", kind, num, header)
+ }
+ }
+ // Import the chain as an archive node and ensure all pointers are updated
+ archiveDb, _ := ethdb.NewMemDatabase()
+ WriteGenesisBlockForTesting(archiveDb, GenesisAccount{address, funds})
+
+ archive, _ := NewBlockChain(archiveDb, FakePow{}, new(event.TypeMux))
+ archive.SetProcessor(NewBlockProcessor(archiveDb, FakePow{}, archive, new(event.TypeMux)))
+
+ if n, err := archive.InsertChain(blocks); err != nil {
+ t.Fatalf("failed to process block %d: %v", n, err)
+ }
+ assert(t, "archive", archive, height, height, height)
+
+ // Import the chain as a non-archive node and ensure all pointers are updated
+ fastDb, _ := ethdb.NewMemDatabase()
+ WriteGenesisBlockForTesting(fastDb, GenesisAccount{address, funds})
+ fast, _ := NewBlockChain(fastDb, FakePow{}, new(event.TypeMux))
+
+ headers := make([]*types.Header, len(blocks))
+ for i, block := range blocks {
+ headers[i] = block.Header()
+ }
+ if n, err := fast.InsertHeaderChain(headers, true); err != nil {
+ t.Fatalf("failed to insert header %d: %v", n, err)
+ }
+ if n, err := fast.InsertReceiptChain(blocks, receipts); err != nil {
+ t.Fatalf("failed to insert receipt %d: %v", n, err)
+ }
+ assert(t, "fast", fast, height, height, 0)
+
+ // Import the chain as a light node and ensure all pointers are updated
+ lightDb, _ := ethdb.NewMemDatabase()
+ WriteGenesisBlockForTesting(lightDb, GenesisAccount{address, funds})
+ light, _ := NewBlockChain(lightDb, FakePow{}, new(event.TypeMux))
+
+ if n, err := light.InsertHeaderChain(headers, true); err != nil {
+ t.Fatalf("failed to insert header %d: %v", n, err)
+ }
+ assert(t, "light", light, height, 0, 0)
+}
+
// Tests that chain reorganizations handle transaction removals and reinsertions.
func TestChainTxReorgs(t *testing.T) {
params.MinGasLimit = big.NewInt(125000) // Minimum the gas limit may ever be.
@@ -704,7 +856,7 @@ func TestChainTxReorgs(t *testing.T) {
// - futureAdd: transaction added after the reorg has already finished
var pastAdd, freshAdd, futureAdd *types.Transaction
- chain := GenerateChain(genesis, db, 3, func(i int, gen *BlockGen) {
+ chain, _ := GenerateChain(genesis, db, 3, func(i int, gen *BlockGen) {
switch i {
case 0:
pastDrop, _ = types.NewTransaction(gen.TxNonce(addr2), addr2, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key2)
@@ -730,7 +882,7 @@ func TestChainTxReorgs(t *testing.T) {
}
// overwrite the old chain
- chain = GenerateChain(genesis, db, 5, func(i int, gen *BlockGen) {
+ chain, _ = GenerateChain(genesis, db, 5, func(i int, gen *BlockGen) {
switch i {
case 0:
pastAdd, _ = types.NewTransaction(gen.TxNonce(addr3), addr3, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key3)
diff --git a/core/chain_makers.go b/core/chain_makers.go
index be6ba04e4..31b2627af 100644
--- a/core/chain_makers.go
+++ b/core/chain_makers.go
@@ -164,13 +164,13 @@ func (b *BlockGen) OffsetTime(seconds int64) {
// Blocks created by GenerateChain do not contain valid proof of work
// values. Inserting them into BlockChain requires use of FakePow or
// a similar non-validating proof of work implementation.
-func GenerateChain(parent *types.Block, db ethdb.Database, n int, gen func(int, *BlockGen)) []*types.Block {
+func GenerateChain(parent *types.Block, db ethdb.Database, n int, gen func(int, *BlockGen)) ([]*types.Block, []types.Receipts) {
statedb, err := state.New(parent.Root(), db)
if err != nil {
panic(err)
}
- blocks := make(types.Blocks, n)
- genblock := func(i int, h *types.Header) *types.Block {
+ blocks, receipts := make(types.Blocks, n), make([]types.Receipts, n)
+ genblock := func(i int, h *types.Header) (*types.Block, types.Receipts) {
b := &BlockGen{parent: parent, i: i, chain: blocks, header: h, statedb: statedb}
if gen != nil {
gen(i, b)
@@ -181,15 +181,16 @@ func GenerateChain(parent *types.Block, db ethdb.Database, n int, gen func(int,
panic(fmt.Sprintf("state write error: %v", err))
}
h.Root = root
- return types.NewBlock(h, b.txs, b.uncles, b.receipts)
+ return types.NewBlock(h, b.txs, b.uncles, b.receipts), b.receipts
}
for i := 0; i < n; i++ {
header := makeHeader(parent, statedb)
- block := genblock(i, header)
+ block, receipt := genblock(i, header)
blocks[i] = block
+ receipts[i] = receipt
parent = block
}
- return blocks
+ return blocks, receipts
}
func makeHeader(parent *types.Block, state *state.StateDB) *types.Header {
@@ -254,7 +255,8 @@ func makeHeaderChain(parent *types.Header, n int, db ethdb.Database, seed int) [
// makeBlockChain creates a deterministic chain of blocks rooted at parent.
func makeBlockChain(parent *types.Block, n int, db ethdb.Database, seed int) []*types.Block {
- return GenerateChain(parent, db, n, func(i int, b *BlockGen) {
+ blocks, _ := GenerateChain(parent, db, n, func(i int, b *BlockGen) {
b.SetCoinbase(common.Address{0: byte(seed), 19: byte(i)})
})
+ return blocks
}
diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go
index 63825c261..7f47cf288 100644
--- a/core/chain_makers_test.go
+++ b/core/chain_makers_test.go
@@ -47,7 +47,7 @@ func ExampleGenerateChain() {
// This call generates a chain of 5 blocks. The function runs for
// each block and adds different features to gen based on the
// block index.
- chain := GenerateChain(genesis, db, 5, func(i int, gen *BlockGen) {
+ chain, _ := GenerateChain(genesis, db, 5, func(i int, gen *BlockGen) {
switch i {
case 0:
// In block 1, addr1 sends addr2 some ether.
diff --git a/core/chain_pow_test.go b/core/chain_pow_test.go
index 5aa8ed8a0..d2b0bd144 100644
--- a/core/chain_pow_test.go
+++ b/core/chain_pow_test.go
@@ -60,7 +60,7 @@ func TestPowVerification(t *testing.T) {
var (
testdb, _ = ethdb.NewMemDatabase()
genesis = GenesisBlockForTesting(testdb, common.Address{}, new(big.Int))
- blocks = GenerateChain(genesis, testdb, 8, nil)
+ blocks, _ = GenerateChain(genesis, testdb, 8, nil)
)
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
@@ -115,7 +115,7 @@ func testPowConcurrentVerification(t *testing.T, threads int) {
var (
testdb, _ = ethdb.NewMemDatabase()
genesis = GenesisBlockForTesting(testdb, common.Address{}, new(big.Int))
- blocks = GenerateChain(genesis, testdb, 8, nil)
+ blocks, _ = GenerateChain(genesis, testdb, 8, nil)
)
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
@@ -186,7 +186,7 @@ func testPowConcurrentAbortion(t *testing.T, threads int) {
var (
testdb, _ = ethdb.NewMemDatabase()
genesis = GenesisBlockForTesting(testdb, common.Address{}, new(big.Int))
- blocks = GenerateChain(genesis, testdb, 1024, nil)
+ blocks, _ = GenerateChain(genesis, testdb, 1024, nil)
)
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
diff --git a/core/chain_util.go b/core/chain_util.go
index 42b6a5be2..907e6668c 100644
--- a/core/chain_util.go
+++ b/core/chain_util.go
@@ -34,6 +34,7 @@ import (
var (
headHeaderKey = []byte("LastHeader")
headBlockKey = []byte("LastBlock")
+ headFastKey = []byte("LastFast")
blockPrefix = []byte("block-")
blockNumPrefix = []byte("block-num-")
@@ -129,7 +130,7 @@ func GetCanonicalHash(db ethdb.Database, number uint64) common.Hash {
// header. The difference between this and GetHeadBlockHash is that whereas the
// last block hash is only updated upon a full block import, the last header
// hash is updated already at header import, allowing head tracking for the
-// fast synchronization mechanism.
+// light synchronization mechanism.
func GetHeadHeaderHash(db ethdb.Database) common.Hash {
data, _ := db.Get(headHeaderKey)
if len(data) == 0 {
@@ -147,6 +148,18 @@ func GetHeadBlockHash(db ethdb.Database) common.Hash {
return common.BytesToHash(data)
}
+// GetHeadFastBlockHash retrieves the hash of the current canonical head block during
+// fast synchronization. The difference between this and GetHeadBlockHash is that
+// whereas the last block hash is only updated upon a full block import, the last
+// fast hash is updated when importing pre-processed blocks.
+func GetHeadFastBlockHash(db ethdb.Database) common.Hash {
+ data, _ := db.Get(headFastKey)
+ if len(data) == 0 {
+ return common.Hash{}
+ }
+ return common.BytesToHash(data)
+}
+
// GetHeaderRLP retrieves a block header in its raw RLP database encoding, or nil
// if the header's not found.
func GetHeaderRLP(db ethdb.Database, hash common.Hash) rlp.RawValue {
@@ -249,6 +262,15 @@ func WriteHeadBlockHash(db ethdb.Database, hash common.Hash) error {
return nil
}
+// WriteHeadFastBlockHash stores the fast head block's hash.
+func WriteHeadFastBlockHash(db ethdb.Database, hash common.Hash) error {
+ if err := db.Put(headFastKey, hash.Bytes()); err != nil {
+ glog.Fatalf("failed to store last fast block's hash into database: %v", err)
+ return err
+ }
+ return nil
+}
+
// WriteHeader serializes a block header into the database.
func WriteHeader(db ethdb.Database, header *types.Header) error {
data, err := rlp.EncodeToBytes(header)
diff --git a/core/chain_util_test.go b/core/chain_util_test.go
index 62b73a064..bc5aa9776 100644
--- a/core/chain_util_test.go
+++ b/core/chain_util_test.go
@@ -163,7 +163,12 @@ func TestBlockStorage(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
// Create a test block to move around the database and make sure it's really new
- block := types.NewBlockWithHeader(&types.Header{Extra: []byte("test block")})
+ block := types.NewBlockWithHeader(&types.Header{
+ Extra: []byte("test block"),
+ UncleHash: types.EmptyUncleHash,
+ TxHash: types.EmptyRootHash,
+ ReceiptHash: types.EmptyRootHash,
+ })
if entry := GetBlock(db, block.Hash()); entry != nil {
t.Fatalf("Non existent block returned: %v", entry)
}
@@ -208,8 +213,12 @@ func TestBlockStorage(t *testing.T) {
// Tests that partial block contents don't get reassembled into full blocks.
func TestPartialBlockStorage(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
- block := types.NewBlockWithHeader(&types.Header{Extra: []byte("test block")})
-
+ block := types.NewBlockWithHeader(&types.Header{
+ Extra: []byte("test block"),
+ UncleHash: types.EmptyUncleHash,
+ TxHash: types.EmptyRootHash,
+ ReceiptHash: types.EmptyRootHash,
+ })
// Store a header and check that it's not recognized as a block
if err := WriteHeader(db, block.Header()); err != nil {
t.Fatalf("Failed to write header into database: %v", err)
@@ -298,6 +307,7 @@ func TestHeadStorage(t *testing.T) {
blockHead := types.NewBlockWithHeader(&types.Header{Extra: []byte("test block header")})
blockFull := types.NewBlockWithHeader(&types.Header{Extra: []byte("test block full")})
+ blockFast := types.NewBlockWithHeader(&types.Header{Extra: []byte("test block fast")})
// Check that no head entries are in a pristine database
if entry := GetHeadHeaderHash(db); entry != (common.Hash{}) {
@@ -306,6 +316,9 @@ func TestHeadStorage(t *testing.T) {
if entry := GetHeadBlockHash(db); entry != (common.Hash{}) {
t.Fatalf("Non head block entry returned: %v", entry)
}
+ if entry := GetHeadFastBlockHash(db); entry != (common.Hash{}) {
+ t.Fatalf("Non fast head block entry returned: %v", entry)
+ }
// Assign separate entries for the head header and block
if err := WriteHeadHeaderHash(db, blockHead.Hash()); err != nil {
t.Fatalf("Failed to write head header hash: %v", err)
@@ -313,6 +326,9 @@ func TestHeadStorage(t *testing.T) {
if err := WriteHeadBlockHash(db, blockFull.Hash()); err != nil {
t.Fatalf("Failed to write head block hash: %v", err)
}
+ if err := WriteHeadFastBlockHash(db, blockFast.Hash()); err != nil {
+ t.Fatalf("Failed to write fast head block hash: %v", err)
+ }
// Check that both heads are present, and different (i.e. two heads maintained)
if entry := GetHeadHeaderHash(db); entry != blockHead.Hash() {
t.Fatalf("Head header hash mismatch: have %v, want %v", entry, blockHead.Hash())
@@ -320,6 +336,9 @@ func TestHeadStorage(t *testing.T) {
if entry := GetHeadBlockHash(db); entry != blockFull.Hash() {
t.Fatalf("Head block hash mismatch: have %v, want %v", entry, blockFull.Hash())
}
+ if entry := GetHeadFastBlockHash(db); entry != blockFast.Hash() {
+ t.Fatalf("Fast head block hash mismatch: have %v, want %v", entry, blockFast.Hash())
+ }
}
func TestMipmapBloom(t *testing.T) {
diff --git a/core/genesis.go b/core/genesis.go
index 16c1598c2..dac5de92f 100644
--- a/core/genesis.go
+++ b/core/genesis.go
@@ -103,7 +103,7 @@ func WriteGenesisBlock(chainDb ethdb.Database, reader io.Reader) (*types.Block,
if err := WriteBlock(chainDb, block); err != nil {
return nil, err
}
- if err := PutBlockReceipts(chainDb, block, nil); err != nil {
+ if err := PutBlockReceipts(chainDb, block.Hash(), nil); err != nil {
return nil, err
}
if err := WriteCanonicalHash(chainDb, block.Hash(), block.NumberU64()); err != nil {
diff --git a/core/transaction_util.go b/core/transaction_util.go
index dbda4cfe7..1a3681341 100644
--- a/core/transaction_util.go
+++ b/core/transaction_util.go
@@ -155,7 +155,7 @@ func GetBlockReceipts(db ethdb.Database, hash common.Hash) types.Receipts {
// PutBlockReceipts stores the block's transactions associated receipts
// and stores them by block hash in a single slice. This is required for
// forks and chain reorgs
-func PutBlockReceipts(db ethdb.Database, block *types.Block, receipts types.Receipts) error {
+func PutBlockReceipts(db ethdb.Database, hash common.Hash, receipts types.Receipts) error {
rs := make([]*types.ReceiptForStorage, len(receipts))
for i, receipt := range receipts {
rs[i] = (*types.ReceiptForStorage)(receipt)
@@ -164,12 +164,9 @@ func PutBlockReceipts(db ethdb.Database, block *types.Block, receipts types.Rece
if err != nil {
return err
}
-
- hash := block.Hash()
err = db.Put(append(blockReceiptsPre, hash[:]...), bytes)
if err != nil {
return err
}
-
return nil
}
diff --git a/core/types/block.go b/core/types/block.go
index c4377ffa1..1d1cfa515 100644
--- a/core/types/block.go
+++ b/core/types/block.go
@@ -128,7 +128,6 @@ type Block struct {
header *Header
uncles []*Header
transactions Transactions
- receipts Receipts
// caches
hash atomic.Value
@@ -200,8 +199,6 @@ func NewBlock(header *Header, txs []*Transaction, uncles []*Header, receipts []*
} else {
b.header.ReceiptHash = DeriveSha(Receipts(receipts))
b.header.Bloom = CreateBloom(receipts)
- b.receipts = make([]*Receipt, len(receipts))
- copy(b.receipts, receipts)
}
if len(uncles) == 0 {
@@ -299,7 +296,6 @@ func (b *StorageBlock) DecodeRLP(s *rlp.Stream) error {
// TODO: copies
func (b *Block) Uncles() []*Header { return b.uncles }
func (b *Block) Transactions() Transactions { return b.transactions }
-func (b *Block) Receipts() Receipts { return b.receipts }
func (b *Block) Transaction(hash common.Hash) *Transaction {
for _, transaction := range b.transactions {
@@ -364,7 +360,6 @@ func (b *Block) WithMiningResult(nonce uint64, mixDigest common.Hash) *Block {
return &Block{
header: &cpy,
transactions: b.transactions,
- receipts: b.receipts,
uncles: b.uncles,
}
}
diff --git a/core/types/receipt.go b/core/types/receipt.go
index d85fe16cf..aea5b3e91 100644
--- a/core/types/receipt.go
+++ b/core/types/receipt.go
@@ -41,8 +41,8 @@ type Receipt struct {
}
// NewReceipt creates a barebone transaction receipt, copying the init fields.
-func NewReceipt(root []byte, cumalativeGasUsed *big.Int) *Receipt {
- return &Receipt{PostState: common.CopyBytes(root), CumulativeGasUsed: new(big.Int).Set(cumalativeGasUsed)}
+func NewReceipt(root []byte, cumulativeGasUsed *big.Int) *Receipt {
+ return &Receipt{PostState: common.CopyBytes(root), CumulativeGasUsed: new(big.Int).Set(cumulativeGasUsed)}
}
// EncodeRLP implements rlp.Encoder, and flattens the consensus fields of a receipt
diff --git a/core/vm/log.go b/core/vm/log.go
index 822476f85..526221e43 100644
--- a/core/vm/log.go
+++ b/core/vm/log.go
@@ -25,19 +25,21 @@ import (
)
type Log struct {
+ // Consensus fields
Address common.Address
Topics []common.Hash
Data []byte
- Number uint64
- TxHash common.Hash
- TxIndex uint
- BlockHash common.Hash
- Index uint
+ // Derived fields (don't reorder!)
+ BlockNumber uint64
+ TxHash common.Hash
+ TxIndex uint
+ BlockHash common.Hash
+ Index uint
}
func NewLog(address common.Address, topics []common.Hash, data []byte, number uint64) *Log {
- return &Log{Address: address, Topics: topics, Data: data, Number: number}
+ return &Log{Address: address, Topics: topics, Data: data, BlockNumber: number}
}
func (l *Log) EncodeRLP(w io.Writer) error {
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 7ae7aa221..24ba3da17 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -102,6 +102,9 @@ type headHeaderRetrievalFn func() *types.Header
// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain.
type headBlockRetrievalFn func() *types.Block
+// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain.
+type headFastBlockRetrievalFn func() *types.Block
+
// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block.
type tdRetrievalFn func(common.Hash) *big.Int
@@ -188,17 +191,18 @@ type Downloader struct {
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
// Callbacks
- hasHeader headerCheckFn // Checks if a header is present in the chain
- hasBlock blockCheckFn // Checks if a block is present in the chain
- getHeader headerRetrievalFn // Retrieves a header from the chain
- getBlock blockRetrievalFn // Retrieves a block from the chain
- headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
- headBlock headBlockRetrievalFn // Retrieves the head block from the chain
- getTd tdRetrievalFn // Retrieves the TD of a block from the chain
- insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
- insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
- insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
- dropPeer peerDropFn // Drops a peer for misbehaving
+ hasHeader headerCheckFn // Checks if a header is present in the chain
+ hasBlock blockCheckFn // Checks if a block is present in the chain
+ getHeader headerRetrievalFn // Retrieves a header from the chain
+ getBlock blockRetrievalFn // Retrieves a block from the chain
+ headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
+ headBlock headBlockRetrievalFn // Retrieves the head block from the chain
+ headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
+ getTd tdRetrievalFn // Retrieves the TD of a block from the chain
+ insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
+ insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
+ insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
+ dropPeer peerDropFn // Drops a peer for misbehaving
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
@@ -229,8 +233,8 @@ type Downloader struct {
// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, getBlock blockRetrievalFn,
- headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn,
- insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader {
+ headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn, getTd tdRetrievalFn,
+ insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader {
return &Downloader{
mode: mode,
@@ -243,6 +247,7 @@ func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock bl
getBlock: getBlock,
headHeader: headHeader,
headBlock: headBlock,
+ headFastBlock: headFastBlock,
getTd: getTd,
insertHeaders: insertHeaders,
insertBlocks: insertBlocks,
@@ -393,7 +398,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
}()
glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version)
- defer glog.V(logger.Debug).Infof("Synchronisation terminated")
+ defer func(start time.Time) {
+ glog.V(logger.Debug).Infof("Synchronisation terminated after %v", time.Since(start))
+ }(time.Now())
switch {
case p.version == 61:
@@ -989,6 +996,8 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
head := d.headHeader().Number.Uint64()
if d.mode == FullSync {
head = d.headBlock().NumberU64()
+ } else if d.mode == FastSync {
+ head = d.headFastBlock().NumberU64()
}
from := int64(head) - int64(MaxHeaderFetch) + 1
if from < 0 {
@@ -1020,7 +1029,7 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
// Check if a common ancestor was found
finished = true
for i := len(headers) - 1; i >= 0; i-- {
- if (d.mode == FullSync && d.hasBlock(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
+ if (d.mode != LightSync && d.hasBlock(headers[i].Hash())) || (d.mode == LightSync && d.hasHeader(headers[i].Hash())) {
number, hash = headers[i].Number.Uint64(), headers[i].Hash()
break
}
@@ -1182,17 +1191,18 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
// Otherwise insert all the new headers, aborting in case of junk
glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headerPack.headers), from)
+ if d.mode == FastSync || d.mode == LightSync {
+ if n, err := d.insertHeaders(headerPack.headers, false); err != nil {
+ glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err)
+ return errInvalidChain
+ }
+ }
if d.mode == FullSync || d.mode == FastSync {
inserts := d.queue.Schedule(headerPack.headers, from, d.mode == FastSync)
if len(inserts) != len(headerPack.headers) {
glog.V(logger.Debug).Infof("%v: stale headers", p)
return errBadPeer
}
- } else {
- if n, err := d.insertHeaders(headerPack.headers, true); err != nil {
- glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err)
- return errInvalidChain
- }
}
// Notify the content fetchers of new headers, but stop if queue is full
cont := d.queue.PendingBlocks() < maxQueuedHeaders || d.queue.PendingReceipts() < maxQueuedHeaders
@@ -1394,6 +1404,7 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da
for _, pid := range expire() {
if peer := d.peers.Peer(pid); peer != nil {
peer.Demote()
+ setIdle(peer)
glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
}
}
@@ -1497,7 +1508,7 @@ func (d *Downloader) process() {
// Actually import the blocks
if glog.V(logger.Debug) {
first, last := results[0].Header, results[len(results)-1].Header
- glog.V(logger.Debug).Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4])
+ glog.Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4])
}
for len(results) != 0 {
// Check for any termination requests
@@ -1536,7 +1547,7 @@ func (d *Downloader) process() {
index, err = d.insertHeaders(headers, true)
}
if err != nil {
- glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash(), err)
+ glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
d.cancel()
return
}
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 18bdb56dd..68c4ca26e 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -45,9 +45,9 @@ var (
// the returned hash chain is ordered head->parent. In addition, every 3rd block
// contains a transaction and every 5th an uncle to allow testing correct block
// reassembly.
-func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Block) {
+func makeChain(n int, seed byte, parent *types.Block, parentReceipts types.Receipts) ([]common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]types.Receipts) {
// Generate the block chain
- blocks := core.GenerateChain(parent, testdb, n, func(i int, block *core.BlockGen) {
+ blocks, receipts := core.GenerateChain(parent, testdb, n, func(i int, block *core.BlockGen) {
block.SetCoinbase(common.Address{seed})
// If the block number is multiple of 3, send a bonus transaction to the miner
@@ -73,25 +73,29 @@ func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common
blockm := make(map[common.Hash]*types.Block, n+1)
blockm[parent.Hash()] = parent
+ receiptm := make(map[common.Hash]types.Receipts, n+1)
+ receiptm[parent.Hash()] = parentReceipts
+
for i, b := range blocks {
hashes[len(hashes)-i-2] = b.Hash()
headerm[b.Hash()] = b.Header()
blockm[b.Hash()] = b
+ receiptm[b.Hash()] = receipts[i]
}
- return hashes, headerm, blockm
+ return hashes, headerm, blockm, receiptm
}
// makeChainFork creates two chains of length n, such that h1[:f] and
// h2[:f] are different but have a common suffix of length n-f.
-func makeChainFork(n, f int, parent *types.Block) ([]common.Hash, []common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]*types.Block) {
+func makeChainFork(n, f int, parent *types.Block, parentReceipts types.Receipts) ([]common.Hash, []common.Hash, map[common.Hash]*types.Header, map[common.Hash]*types.Header, map[common.Hash]*types.Block, map[common.Hash]*types.Block, map[common.Hash]types.Receipts, map[common.Hash]types.Receipts) {
// Create the common suffix
- hashes, headers, blocks := makeChain(n-f, 0, parent)
+ hashes, headers, blocks, receipts := makeChain(n-f, 0, parent, parentReceipts)
// Create the forks
- hashes1, headers1, blocks1 := makeChain(f, 1, blocks[hashes[0]])
+ hashes1, headers1, blocks1, receipts1 := makeChain(f, 1, blocks[hashes[0]], receipts[hashes[0]])
hashes1 = append(hashes1, hashes[1:]...)
- hashes2, headers2, blocks2 := makeChain(f, 2, blocks[hashes[0]])
+ hashes2, headers2, blocks2, receipts2 := makeChain(f, 2, blocks[hashes[0]], receipts[hashes[0]])
hashes2 = append(hashes2, hashes[1:]...)
for hash, header := range headers {
@@ -102,22 +106,28 @@ func makeChainFork(n, f int, parent *types.Block) ([]common.Hash, []common.Hash,
blocks1[hash] = block
blocks2[hash] = block
}
- return hashes1, hashes2, headers1, headers2, blocks1, blocks2
+ for hash, receipt := range receipts {
+ receipts1[hash] = receipt
+ receipts2[hash] = receipt
+ }
+ return hashes1, hashes2, headers1, headers2, blocks1, blocks2, receipts1, receipts2
}
// downloadTester is a test simulator for mocking out local block chain.
type downloadTester struct {
downloader *Downloader
- ownHashes []common.Hash // Hash chain belonging to the tester
- ownHeaders map[common.Hash]*types.Header // Headers belonging to the tester
- ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester
- ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester
- ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain
- peerHashes map[string][]common.Hash // Hash chain belonging to different test peers
- peerHeaders map[string]map[common.Hash]*types.Header // Headers belonging to different test peers
- peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers
- peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains
+ ownHashes []common.Hash // Hash chain belonging to the tester
+ ownHeaders map[common.Hash]*types.Header // Headers belonging to the tester
+ ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester
+ ownReceipts map[common.Hash]types.Receipts // Receipts belonging to the tester
+ ownChainTd map[common.Hash]*big.Int // Total difficulties of the blocks in the local chain
+
+ peerHashes map[string][]common.Hash // Hash chain belonging to different test peers
+ peerHeaders map[string]map[common.Hash]*types.Header // Headers belonging to different test peers
+ peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers
+ peerReceipts map[string]map[common.Hash]types.Receipts // Receipts belonging to different test peers
+ peerChainTds map[string]map[common.Hash]*big.Int // Total difficulties of the blocks in the peer chains
lock sync.RWMutex
}
@@ -128,15 +138,16 @@ func newTester(mode SyncMode) *downloadTester {
ownHashes: []common.Hash{genesis.Hash()},
ownHeaders: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
ownBlocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
- ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): genesis.Receipts()},
+ ownReceipts: map[common.Hash]types.Receipts{genesis.Hash(): nil},
ownChainTd: map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
peerHashes: make(map[string][]common.Hash),
peerHeaders: make(map[string]map[common.Hash]*types.Header),
peerBlocks: make(map[string]map[common.Hash]*types.Block),
+ peerReceipts: make(map[string]map[common.Hash]types.Receipts),
peerChainTds: make(map[string]map[common.Hash]*big.Int),
}
tester.downloader = New(mode, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, tester.getBlock,
- tester.headHeader, tester.headBlock, tester.getTd, tester.insertHeaders, tester.insertBlocks, tester.insertConfirmedBlocks, tester.dropPeer)
+ tester.headHeader, tester.headBlock, tester.headFastBlock, tester.getTd, tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.dropPeer)
return tester
}
@@ -197,7 +208,12 @@ func (dl *downloadTester) headHeader() *types.Header {
dl.lock.RLock()
defer dl.lock.RUnlock()
- return dl.getHeader(dl.ownHashes[len(dl.ownHashes)-1])
+ for i := len(dl.ownHashes) - 1; i >= 0; i-- {
+ if header := dl.getHeader(dl.ownHashes[i]); header != nil {
+ return header
+ }
+ }
+ return nil
}
// headBlock retrieves the current head block from the canonical chain.
@@ -213,6 +229,21 @@ func (dl *downloadTester) headBlock() *types.Block {
return nil
}
+// headFastBlock retrieves the current head fast-sync block from the canonical chain.
+func (dl *downloadTester) headFastBlock() *types.Block {
+ dl.lock.RLock()
+ defer dl.lock.RUnlock()
+
+ for i := len(dl.ownHashes) - 1; i >= 0; i-- {
+ if block := dl.getBlock(dl.ownHashes[i]); block != nil {
+ if _, ok := dl.ownReceipts[block.Hash()]; ok {
+ return block
+ }
+ }
+ }
+ return nil
+}
+
// getTd retrieves the block's total difficulty from the canonical chain.
func (dl *downloadTester) getTd(hash common.Hash) *big.Int {
dl.lock.RLock()
@@ -227,6 +258,9 @@ func (dl *downloadTester) insertHeaders(headers []*types.Header, verify bool) (i
defer dl.lock.Unlock()
for i, header := range headers {
+ if _, ok := dl.ownHeaders[header.Hash()]; ok {
+ continue
+ }
if _, ok := dl.ownHeaders[header.ParentHash]; !ok {
return i, errors.New("unknown parent")
}
@@ -254,33 +288,33 @@ func (dl *downloadTester) insertBlocks(blocks types.Blocks) (int, error) {
return len(blocks), nil
}
-// insertBlocks injects a new batch of blocks into the simulated chain.
-func (dl *downloadTester) insertConfirmedBlocks(blocks types.Blocks, receipts []types.Receipts) (int, error) {
+// insertReceipts injects a new batch of blocks into the simulated chain.
+func (dl *downloadTester) insertReceipts(blocks types.Blocks, receipts []types.Receipts) (int, error) {
dl.lock.Lock()
defer dl.lock.Unlock()
for i := 0; i < len(blocks) && i < len(receipts); i++ {
+ if _, ok := dl.ownHeaders[blocks[i].Hash()]; !ok {
+ return i, errors.New("unknown owner")
+ }
if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
return i, errors.New("unknown parent")
}
- dl.ownHashes = append(dl.ownHashes, blocks[i].Hash())
- dl.ownHeaders[blocks[i].Hash()] = blocks[i].Header()
dl.ownBlocks[blocks[i].Hash()] = blocks[i]
- dl.ownReceipts[blocks[i].Hash()] = blocks[i].Receipts()
- dl.ownChainTd[blocks[i].Hash()] = dl.ownChainTd[blocks[i].ParentHash()]
+ dl.ownReceipts[blocks[i].Hash()] = receipts[i]
}
return len(blocks), nil
}
// newPeer registers a new block download source into the downloader.
-func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, headers map[common.Hash]*types.Header, blocks map[common.Hash]*types.Block) error {
- return dl.newSlowPeer(id, version, hashes, headers, blocks, 0)
+func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, headers map[common.Hash]*types.Header, blocks map[common.Hash]*types.Block, receipts map[common.Hash]types.Receipts) error {
+ return dl.newSlowPeer(id, version, hashes, headers, blocks, receipts, 0)
}
// newSlowPeer registers a new block download source into the downloader, with a
// specific delay time on processing the network packets sent to it, simulating
// potentially slow network IO.
-func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, headers map[common.Hash]*types.Header, blocks map[common.Hash]*types.Block, delay time.Duration) error {
+func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, headers map[common.Hash]*types.Header, blocks map[common.Hash]*types.Block, receipts map[common.Hash]types.Receipts, delay time.Duration) error {
dl.lock.Lock()
defer dl.lock.Unlock()
@@ -302,6 +336,7 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
dl.peerHeaders[id] = make(map[common.Hash]*types.Header)
dl.peerBlocks[id] = make(map[common.Hash]*types.Block)
+ dl.peerReceipts[id] = make(map[common.Hash]types.Receipts)
dl.peerChainTds[id] = make(map[common.Hash]*big.Int)
for _, hash := range hashes {
@@ -317,6 +352,9 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
dl.peerChainTds[id][hash] = new(big.Int).Add(block.Difficulty(), dl.peerChainTds[id][block.ParentHash()])
}
}
+ if receipt, ok := receipts[hash]; ok {
+ dl.peerReceipts[id][hash] = receipt
+ }
}
}
return err
@@ -501,15 +539,15 @@ func (dl *downloadTester) peerGetReceiptsFn(id string, delay time.Duration) func
dl.lock.RLock()
defer dl.lock.RUnlock()
- blocks := dl.peerBlocks[id]
+ receipts := dl.peerReceipts[id]
- receipts := make([][]*types.Receipt, 0, len(hashes))
+ results := make([][]*types.Receipt, 0, len(hashes))
for _, hash := range hashes {
- if block, ok := blocks[hash]; ok {
- receipts = append(receipts, block.Receipts())
+ if receipt, ok := receipts[hash]; ok {
+ results = append(results, receipt)
}
}
- go dl.downloader.DeliverReceipts(id, receipts)
+ go dl.downloader.DeliverReceipts(id, results)
return nil
}
@@ -551,10 +589,10 @@ func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronis
func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
tester := newTester(mode)
- tester.newPeer("peer", protocol, hashes, headers, blocks)
+ tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Synchronise with the peer and make sure all relevant data was retrieved
if err := tester.sync("peer", nil); err != nil {
@@ -575,10 +613,10 @@ func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
func testThrottling(t *testing.T, protocol int, mode SyncMode) {
// Create a long block chain to download and the tester
targetBlocks := 8 * blockCacheLimit
- hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
tester := newTester(mode)
- tester.newPeer("peer", protocol, hashes, headers, blocks)
+ tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Wrap the importer to allow stepping
blocked, proceed := uint32(0), make(chan struct{})
@@ -650,11 +688,11 @@ func TestForkedSynchronisation64Light(t *testing.T) { testForkedSynchronisation(
func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Create a long enough forked chain
common, fork := MaxHashFetch, 2*MaxHashFetch
- hashesA, hashesB, headersA, headersB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis)
+ hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil)
tester := newTester(mode)
- tester.newPeer("fork A", protocol, hashesA, headersA, blocksA)
- tester.newPeer("fork B", protocol, hashesB, headersB, blocksB)
+ tester.newPeer("fork A", protocol, hashesA, headersA, blocksA, receiptsA)
+ tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB)
// Synchronise with the peer and make sure all blocks were retrieved
if err := tester.sync("fork A", nil); err != nil {
@@ -731,10 +769,10 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) {
if targetBlocks >= MaxHeaderFetch {
targetBlocks = MaxHeaderFetch - 15
}
- hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
tester := newTester(mode)
- tester.newPeer("peer", protocol, hashes, headers, blocks)
+ tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Make sure canceling works with a pristine downloader
tester.downloader.cancel()
@@ -764,12 +802,12 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Create various peers with various parts of the chain
targetPeers := 8
targetBlocks := targetPeers*blockCacheLimit - 15
- hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
tester := newTester(mode)
for i := 0; i < targetPeers; i++ {
id := fmt.Sprintf("peer #%d", i)
- tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks)
+ tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks, receipts)
}
// Synchronise with the middle peer and make sure half of the blocks were retrieved
id := fmt.Sprintf("peer #%d", targetPeers/2)
@@ -798,22 +836,21 @@ func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t,
func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
// Create peers of every type
tester := newTester(mode)
- tester.newPeer("peer 61", 61, hashes, headers, blocks)
- tester.newPeer("peer 62", 62, hashes, headers, blocks)
- tester.newPeer("peer 63", 63, hashes, headers, blocks)
- tester.newPeer("peer 64", 64, hashes, headers, blocks)
+ tester.newPeer("peer 61", 61, hashes, headers, blocks, receipts)
+ tester.newPeer("peer 62", 62, hashes, headers, blocks, receipts)
+ tester.newPeer("peer 63", 63, hashes, headers, blocks, receipts)
+ tester.newPeer("peer 64", 64, hashes, headers, blocks, receipts)
// Synchronise with the requestd peer and make sure all blocks were retrieved
if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
- t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
- }
+ assertOwnChain(t, tester, targetBlocks+1)
+
// Check that no peers have been dropped off
for _, version := range []int{61, 62, 63, 64} {
peer := fmt.Sprintf("peer %d", version)
@@ -835,18 +872,18 @@ func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, L
func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
tester := newTester(mode)
- tester.newPeer("peer", protocol, hashes, headers, blocks)
+ tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Instrument the downloader to signal body requests
- bodies, receipts := int32(0), int32(0)
+ bodiesHave, receiptsHave := int32(0), int32(0)
tester.downloader.bodyFetchHook = func(headers []*types.Header) {
- atomic.AddInt32(&bodies, int32(len(headers)))
+ atomic.AddInt32(&bodiesHave, int32(len(headers)))
}
tester.downloader.receiptFetchHook = func(headers []*types.Header) {
- atomic.AddInt32(&receipts, int32(len(headers)))
+ atomic.AddInt32(&receiptsHave, int32(len(headers)))
}
// Synchronise with the peer and make sure all blocks were retrieved
if err := tester.sync("peer", nil); err != nil {
@@ -860,15 +897,17 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
if mode != LightSync && block != genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) {
bodiesNeeded++
}
- if mode == FastSync && block != genesis && len(block.Receipts()) > 0 {
+ }
+ for _, receipt := range receipts {
+ if mode == FastSync && len(receipt) > 0 {
receiptsNeeded++
}
}
- if int(bodies) != bodiesNeeded {
- t.Errorf("body retrieval count mismatch: have %v, want %v", bodies, bodiesNeeded)
+ if int(bodiesHave) != bodiesNeeded {
+ t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave, bodiesNeeded)
}
- if int(receipts) != receiptsNeeded {
- t.Errorf("receipt retrieval count mismatch: have %v, want %v", receipts, receiptsNeeded)
+ if int(receiptsHave) != receiptsNeeded {
+ t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave, receiptsNeeded)
}
}
@@ -884,21 +923,20 @@ func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 6
func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
tester := newTester(mode)
// Attempt a full sync with an attacker feeding gapped headers
- tester.newPeer("attack", protocol, hashes, headers, blocks)
+ tester.newPeer("attack", protocol, hashes, headers, blocks, receipts)
missing := targetBlocks / 2
delete(tester.peerHeaders["attack"], hashes[missing])
- delete(tester.peerBlocks["attack"], hashes[missing])
if err := tester.sync("attack", nil); err == nil {
t.Fatalf("succeeded attacker synchronisation")
}
// Synchronise with the valid peer and make sure sync succeeds
- tester.newPeer("valid", protocol, hashes, headers, blocks)
+ tester.newPeer("valid", protocol, hashes, headers, blocks, receipts)
if err := tester.sync("valid", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
@@ -917,20 +955,21 @@ func TestShiftedHeaderAttack64Light(t *testing.T) { testShiftedHeaderAttack(t, 6
func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
tester := newTester(mode)
// Attempt a full sync with an attacker feeding shifted headers
- tester.newPeer("attack", protocol, hashes, headers, blocks)
+ tester.newPeer("attack", protocol, hashes, headers, blocks, receipts)
delete(tester.peerHeaders["attack"], hashes[len(hashes)-2])
delete(tester.peerBlocks["attack"], hashes[len(hashes)-2])
+ delete(tester.peerReceipts["attack"], hashes[len(hashes)-2])
if err := tester.sync("attack", nil); err == nil {
t.Fatalf("succeeded attacker synchronisation")
}
// Synchronise with the valid peer and make sure sync succeeds
- tester.newPeer("valid", protocol, hashes, headers, blocks)
+ tester.newPeer("valid", protocol, hashes, headers, blocks, receipts)
if err := tester.sync("valid", nil); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
@@ -949,24 +988,24 @@ func TestInvalidContentAttack64Light(t *testing.T) { testInvalidContentAttack(t,
func testInvalidContentAttack(t *testing.T, protocol int, mode SyncMode) {
// Create two peers, one feeding invalid block bodies
targetBlocks := 4*blockCacheLimit - 15
- hashes, headers, validBlocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, validBlocks, validReceipts := makeChain(targetBlocks, 0, genesis, nil)
invalidBlocks := make(map[common.Hash]*types.Block)
for hash, block := range validBlocks {
invalidBlocks[hash] = types.NewBlockWithHeader(block.Header())
}
- invalidReceipts := make(map[common.Hash]*types.Block)
- for hash, block := range validBlocks {
- invalidReceipts[hash] = types.NewBlockWithHeader(block.Header()).WithBody(block.Transactions(), block.Uncles())
+ invalidReceipts := make(map[common.Hash]types.Receipts)
+ for hash, _ := range validReceipts {
+ invalidReceipts[hash] = types.Receipts{&types.Receipt{}}
}
tester := newTester(mode)
- tester.newPeer("valid", protocol, hashes, headers, validBlocks)
+ tester.newPeer("valid", protocol, hashes, headers, validBlocks, validReceipts)
if mode != LightSync {
- tester.newPeer("body attack", protocol, hashes, headers, invalidBlocks)
+ tester.newPeer("body attack", protocol, hashes, headers, invalidBlocks, validReceipts)
}
if mode == FastSync {
- tester.newPeer("receipt attack", protocol, hashes, headers, invalidReceipts)
+ tester.newPeer("receipt attack", protocol, hashes, headers, validBlocks, invalidReceipts)
}
// Synchronise with the valid peer (will pull contents from the attacker too)
if err := tester.sync("valid", nil); err != nil {
@@ -995,9 +1034,9 @@ func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttac
func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
tester := newTester(mode)
- hashes, headers, blocks := makeChain(0, 0, genesis)
+ hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil)
- tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, headers, blocks)
+ tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, headers, blocks, receipts)
if err := tester.sync("attack", big.NewInt(1000000)); err != errStallingPeer {
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
}
@@ -1040,7 +1079,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
for i, tt := range tests {
// Register a new peer and ensure it's presence
id := fmt.Sprintf("test %d", i)
- if err := tester.newPeer(id, protocol, []common.Hash{genesis.Hash()}, nil, nil); err != nil {
+ if err := tester.newPeer(id, protocol, []common.Hash{genesis.Hash()}, nil, nil, nil); err != nil {
t.Fatalf("test %d: failed to register new peer: %v", i, err)
}
if _, ok := tester.peerHashes[id]; !ok {
@@ -1069,7 +1108,7 @@ func TestSyncBoundaries64Light(t *testing.T) { testSyncBoundaries(t, 64, LightSy
func testSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
// Set a sync init hook to catch boundary changes
starting := make(chan struct{})
@@ -1085,7 +1124,7 @@ func testSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0)
}
// Synchronise half the blocks and check initial boundaries
- tester.newPeer("peer-half", protocol, hashes[targetBlocks/2:], headers, blocks)
+ tester.newPeer("peer-half", protocol, hashes[targetBlocks/2:], headers, blocks, receipts)
pending := new(sync.WaitGroup)
pending.Add(1)
@@ -1103,7 +1142,7 @@ func testSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
pending.Wait()
// Synchronise all the blocks and check continuation boundaries
- tester.newPeer("peer-full", protocol, hashes, headers, blocks)
+ tester.newPeer("peer-full", protocol, hashes, headers, blocks, receipts)
pending.Add(1)
go func() {
@@ -1134,7 +1173,7 @@ func TestForkedSyncBoundaries64Light(t *testing.T) { testForkedSyncBoundaries(t,
func testForkedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
// Create a forked chain to simulate origin revertal
common, fork := MaxHashFetch, 2*MaxHashFetch
- hashesA, hashesB, headersA, headersB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis)
+ hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil)
// Set a sync init hook to catch boundary changes
starting := make(chan struct{})
@@ -1150,7 +1189,7 @@ func testForkedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0)
}
// Synchronise with one of the forks and check boundaries
- tester.newPeer("fork A", protocol, hashesA, headersA, blocksA)
+ tester.newPeer("fork A", protocol, hashesA, headersA, blocksA, receiptsA)
pending := new(sync.WaitGroup)
pending.Add(1)
@@ -1171,7 +1210,7 @@ func testForkedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
tester.downloader.syncStatsOrigin = tester.downloader.syncStatsHeight
// Synchronise with the second fork and check boundary resets
- tester.newPeer("fork B", protocol, hashesB, headersB, blocksB)
+ tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB)
pending.Add(1)
go func() {
@@ -1202,7 +1241,7 @@ func TestFailedSyncBoundaries64Light(t *testing.T) { testFailedSyncBoundaries(t,
func testFailedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks := makeChain(targetBlocks, 0, genesis)
+ hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
// Set a sync init hook to catch boundary changes
starting := make(chan struct{})
@@ -1218,10 +1257,11 @@ func testFailedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0)
}
// Attempt a full sync with a faulty peer
- tester.newPeer("faulty", protocol, hashes, headers, blocks)
+ tester.newPeer("faulty", protocol, hashes, headers, blocks, receipts)
missing := targetBlocks / 2
delete(tester.peerHeaders["faulty"], hashes[missing])
delete(tester.peerBlocks["faulty"], hashes[missing])
+ delete(tester.peerReceipts["faulty"], hashes[missing])
pending := new(sync.WaitGroup)
pending.Add(1)
@@ -1240,7 +1280,7 @@ func testFailedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
pending.Wait()
// Synchronise with a good peer and check that the boundary origin remind the same after a failure
- tester.newPeer("valid", protocol, hashes, headers, blocks)
+ tester.newPeer("valid", protocol, hashes, headers, blocks, receipts)
pending.Add(1)
go func() {
@@ -1270,7 +1310,7 @@ func TestFakedSyncBoundaries64Light(t *testing.T) { testFakedSyncBoundaries(t, 6
func testFakedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
// Create a small block chain
targetBlocks := blockCacheLimit - 15
- hashes, headers, blocks := makeChain(targetBlocks+3, 0, genesis)
+ hashes, headers, blocks, receipts := makeChain(targetBlocks+3, 0, genesis, nil)
// Set a sync init hook to catch boundary changes
starting := make(chan struct{})
@@ -1286,10 +1326,11 @@ func testFakedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
t.Fatalf("Pristine boundary mismatch: have %v/%v, want %v/%v", origin, latest, 0, 0)
}
// Create and sync with an attacker that promises a higher chain than available
- tester.newPeer("attack", protocol, hashes, headers, blocks)
+ tester.newPeer("attack", protocol, hashes, headers, blocks, receipts)
for i := 1; i < 3; i++ {
delete(tester.peerHeaders["attack"], hashes[i])
delete(tester.peerBlocks["attack"], hashes[i])
+ delete(tester.peerReceipts["attack"], hashes[i])
}
pending := new(sync.WaitGroup)
@@ -1309,7 +1350,7 @@ func testFakedSyncBoundaries(t *testing.T, protocol int, mode SyncMode) {
pending.Wait()
// Synchronise with a good peer and check that the boundary height has been reduced to the true value
- tester.newPeer("valid", protocol, hashes[3:], headers, blocks)
+ tester.newPeer("valid", protocol, hashes[3:], headers, blocks, receipts)
pending.Add(1)
go func() {
diff --git a/eth/fetcher/fetcher_test.go b/eth/fetcher/fetcher_test.go
index 707d8d758..170a80aba 100644
--- a/eth/fetcher/fetcher_test.go
+++ b/eth/fetcher/fetcher_test.go
@@ -45,7 +45,7 @@ var (
// contains a transaction and every 5th an uncle to allow testing correct block
// reassembly.
func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) {
- blocks := core.GenerateChain(parent, testdb, n, func(i int, block *core.BlockGen) {
+ blocks, _ := core.GenerateChain(parent, testdb, n, func(i int, block *core.BlockGen) {
block.SetCoinbase(common.Address{seed})
// If the block number is multiple of 3, send a bonus transaction to the miner
diff --git a/eth/handler.go b/eth/handler.go
index daa285730..1117cb1b7 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -129,8 +129,9 @@ func NewProtocolManager(mode Mode, networkId int, mux *event.TypeMux, txpool txP
case LightMode:
syncMode = downloader.LightSync
}
- manager.downloader = downloader.New(syncMode, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader, blockchain.GetBlock,
- blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.GetTd, blockchain.InsertHeaderChain, blockchain.InsertChain, nil, manager.removePeer)
+ manager.downloader = downloader.New(syncMode, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader,
+ blockchain.GetBlock, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.GetTd,
+ blockchain.InsertHeaderChain, blockchain.InsertChain, blockchain.InsertReceiptChain, manager.removePeer)
validator := func(block *types.Block, parent *types.Block) error {
return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
@@ -438,28 +439,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
- case p.version >= eth62 && msg.Code == BlockBodiesMsg:
- // A batch of block bodies arrived to one of our previous requests
- var request blockBodiesData
- if err := msg.Decode(&request); err != nil {
- return errResp(ErrDecode, "msg %v: %v", msg, err)
- }
- // Deliver them all to the downloader for queuing
- trasactions := make([][]*types.Transaction, len(request))
- uncles := make([][]*types.Header, len(request))
-
- for i, body := range request {
- trasactions[i] = body.Transactions
- uncles[i] = body.Uncles
- }
- // Filter out any explicitly requested bodies, deliver the rest to the downloader
- if trasactions, uncles := pm.fetcher.FilterBodies(trasactions, uncles, time.Now()); len(trasactions) > 0 || len(uncles) > 0 {
- err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
- if err != nil {
- glog.V(logger.Debug).Infoln(err)
- }
- }
-
case p.version >= eth62 && msg.Code == GetBlockBodiesMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
@@ -487,6 +466,28 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendBlockBodiesRLP(bodies)
+ case p.version >= eth62 && msg.Code == BlockBodiesMsg:
+ // A batch of block bodies arrived to one of our previous requests
+ var request blockBodiesData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver them all to the downloader for queuing
+ trasactions := make([][]*types.Transaction, len(request))
+ uncles := make([][]*types.Header, len(request))
+
+ for i, body := range request {
+ trasactions[i] = body.Transactions
+ uncles[i] = body.Uncles
+ }
+ // Filter out any explicitly requested bodies, deliver the rest to the downloader
+ if trasactions, uncles := pm.fetcher.FilterBodies(trasactions, uncles, time.Now()); len(trasactions) > 0 || len(uncles) > 0 {
+ err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
+ if err != nil {
+ glog.V(logger.Debug).Infoln(err)
+ }
+ }
+
case p.version >= eth63 && msg.Code == GetNodeDataMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
@@ -550,6 +551,17 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
return p.SendReceiptsRLP(receipts)
+ case p.version >= eth63 && msg.Code == ReceiptsMsg:
+ // A batch of receipts arrived to one of our previous requests
+ var receipts [][]*types.Receipt
+ if err := msg.Decode(&receipts); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver all to the downloader
+ if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil {
+ glog.V(logger.Debug).Infof("failed to deliver receipts: %v", err)
+ }
+
case msg.Code == NewBlockHashesMsg:
// Retrieve and deseralize the remote new block hashes notification
type announce struct {
diff --git a/eth/helper_test.go b/eth/helper_test.go
index bd65b49f8..ede0e3f15 100644
--- a/eth/helper_test.go
+++ b/eth/helper_test.go
@@ -38,7 +38,7 @@ func newTestProtocolManager(mode Mode, blocks int, generator func(int, *core.Blo
blockproc = core.NewBlockProcessor(db, pow, blockchain, evmux)
)
blockchain.SetProcessor(blockproc)
- chain := core.GenerateChain(genesis, db, blocks, generator)
+ chain, _ := core.GenerateChain(genesis, db, blocks, generator)
if _, err := blockchain.InsertChain(chain); err != nil {
panic(err)
}
diff --git a/eth/protocol.go b/eth/protocol.go
index 0d2b5128d..f2b98a8b1 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -55,7 +55,7 @@ var minimumProtocolVersion = map[Mode]uint{
var ProtocolVersions = []uint{eth64, eth63, eth62, eth61}
// Number of implemented message corresponding to different protocol versions.
-var ProtocolLengths = []uint64{15, 12, 8, 9}
+var ProtocolLengths = []uint64{19, 17, 8, 9}
const (
NetworkId = 1
diff --git a/miner/worker.go b/miner/worker.go
index 5bce32f21..efe99a2f9 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -313,7 +313,7 @@ func (self *worker) wait() {
self.mux.Post(core.ChainHeadEvent{block})
self.mux.Post(logs)
}
- if err := core.PutBlockReceipts(self.chainDb, block, receipts); err != nil {
+ if err := core.PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
glog.V(logger.Warn).Infoln("error writing block receipts:", err)
}
}(block, work.state.Logs(), work.receipts)
diff --git a/rpc/api/eth_args.go b/rpc/api/eth_args.go
index 66c190a51..6aca6a663 100644
--- a/rpc/api/eth_args.go
+++ b/rpc/api/eth_args.go
@@ -838,7 +838,7 @@ func NewLogRes(log *vm.Log) LogRes {
}
l.Address = newHexData(log.Address)
l.Data = newHexData(log.Data)
- l.BlockNumber = newHexNum(log.Number)
+ l.BlockNumber = newHexNum(log.BlockNumber)
l.LogIndex = newHexNum(log.Index)
l.TransactionHash = newHexData(log.TxHash)
l.TransactionIndex = newHexNum(log.TxIndex)