-rw-r--r--  core/block_validator_test.go                                       |   2
-rw-r--r--  core/blockchain.go                                                 |  12
-rw-r--r--  core/blockchain_test.go                                            |   8
-rw-r--r--  core/database_util.go (renamed from core/chain_util.go)            | 178
-rw-r--r--  core/database_util_test.go (renamed from core/chain_util_test.go)  | 162
-rw-r--r--  core/genesis.go                                                    |   2
-rw-r--r--  core/transaction_util.go                                           | 171
-rw-r--r--  eth/backend_test.go                                                |   4
-rw-r--r--  eth/downloader/downloader.go                                       | 310
-rw-r--r--  eth/downloader/downloader_test.go                                  | 135
-rw-r--r--  eth/downloader/queue.go                                            | 198
-rw-r--r--  eth/filters/filter_test.go                                         |   8
-rw-r--r--  eth/sync.go                                                        |   4
-rw-r--r--  miner/worker.go                                                    |   6
-rw-r--r--  xeth/xeth.go                                                       |  41
15 files changed, 695 insertions, 546 deletions
diff --git a/core/block_validator_test.go b/core/block_validator_test.go
index a0694f067..70953d76d 100644
--- a/core/block_validator_test.go
+++ b/core/block_validator_test.go
@@ -81,7 +81,7 @@ func TestPutReceipt(t *testing.T) {
Index: 0,
}}
- PutReceipts(db, types.Receipts{receipt})
+ WriteReceipts(db, types.Receipts{receipt})
receipt = GetReceipt(db, common.Hash{})
if receipt == nil {
t.Error("expected to get 1 receipt, got none.")
diff --git a/core/blockchain.go b/core/blockchain.go
index b6b00ca04..5e1fc9424 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -972,7 +972,7 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain
glog.Fatal(errs[index])
return
}
- if err := PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
+ if err := WriteBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
errs[index] = fmt.Errorf("failed to write block receipts: %v", err)
atomic.AddInt32(&failed, 1)
glog.Fatal(errs[index])
@@ -1182,7 +1182,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// coalesce logs for later processing
coalescedLogs = append(coalescedLogs, logs...)
- if err := PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
+ if err := WriteBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
return i, err
}
@@ -1201,11 +1201,11 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
events = append(events, ChainEvent{block, block.Hash(), logs})
// This puts transactions in a extra db for rpc
- if err := PutTransactions(self.chainDb, block, block.Transactions()); err != nil {
+ if err := WriteTransactions(self.chainDb, block); err != nil {
return i, err
}
// store the receipts
- if err := PutReceipts(self.chainDb, receipts); err != nil {
+ if err := WriteReceipts(self.chainDb, receipts); err != nil {
return i, err
}
// Write map map bloom filters
@@ -1294,12 +1294,12 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// insert the block in the canonical way, re-writing history
self.insert(block)
// write canonical receipts and transactions
- if err := PutTransactions(self.chainDb, block, block.Transactions()); err != nil {
+ if err := WriteTransactions(self.chainDb, block); err != nil {
return err
}
receipts := GetBlockReceipts(self.chainDb, block.Hash())
// write receipts
- if err := PutReceipts(self.chainDb, receipts); err != nil {
+ if err := WriteReceipts(self.chainDb, receipts); err != nil {
return err
}
// Write map map bloom filters
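Note on the hunks above: the write helpers are the renamed core database API (PutTransactions/PutReceipts/PutBlockReceipts become WriteTransactions/WriteReceipts/WriteBlockReceipts), and WriteTransactions now takes only the block, deriving the transaction list itself. A minimal sketch of the post-change write path as seen from outside the core package; the helper name writeCanonicalData is illustrative and not part of the commit:

    package example

    import (
        "github.com/ethereum/go-ethereum/core"
        "github.com/ethereum/go-ethereum/core/types"
        "github.com/ethereum/go-ethereum/ethdb"
    )

    // writeCanonicalData is a hypothetical helper showing the renamed write API:
    // WriteTransactions derives the transaction list from the block itself, so
    // the old extra txs argument is gone.
    func writeCanonicalData(db ethdb.Database, block *types.Block, receipts types.Receipts) error {
        if err := core.WriteBlockReceipts(db, block.Hash(), receipts); err != nil {
            return err
        }
        if err := core.WriteTransactions(db, block); err != nil {
            return err
        }
        return core.WriteReceipts(db, receipts)
    }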
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index e5ed66377..f18b5d084 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -937,8 +937,8 @@ func TestChainTxReorgs(t *testing.T) {
// removed tx
for i, tx := range (types.Transactions{pastDrop, freshDrop}) {
- if GetTransaction(db, tx.Hash()) != nil {
- t.Errorf("drop %d: tx found while shouldn't have been", i)
+ if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn != nil {
+ t.Errorf("drop %d: tx %v found while shouldn't have been", i, txn)
}
if GetReceipt(db, tx.Hash()) != nil {
t.Errorf("drop %d: receipt found while shouldn't have been", i)
@@ -946,7 +946,7 @@ func TestChainTxReorgs(t *testing.T) {
}
// added tx
for i, tx := range (types.Transactions{pastAdd, freshAdd, futureAdd}) {
- if GetTransaction(db, tx.Hash()) == nil {
+ if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn == nil {
t.Errorf("add %d: expected tx to be found", i)
}
if GetReceipt(db, tx.Hash()) == nil {
@@ -955,7 +955,7 @@ func TestChainTxReorgs(t *testing.T) {
}
// shared tx
for i, tx := range (types.Transactions{postponed, swapped}) {
- if GetTransaction(db, tx.Hash()) == nil {
+ if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn == nil {
t.Errorf("share %d: expected tx to be found", i)
}
if GetReceipt(db, tx.Hash()) == nil {
diff --git a/core/chain_util.go b/core/database_util.go
index ddff381a1..fbcce3e8c 100644
--- a/core/chain_util.go
+++ b/core/database_util.go
@@ -43,11 +43,15 @@ var (
bodySuffix = []byte("-body")
tdSuffix = []byte("-td")
- ExpDiffPeriod = big.NewInt(100000)
- blockHashPre = []byte("block-hash-") // [deprecated by eth/63]
+ txMetaSuffix = []byte{0x01}
+ receiptsPrefix = []byte("receipts-")
+ blockReceiptsPrefix = []byte("receipts-block-")
mipmapPre = []byte("mipmap-log-bloom-")
MIPMapLevels = []uint64{1000000, 500000, 100000, 50000, 1000}
+
+ ExpDiffPeriod = big.NewInt(100000)
+ blockHashPrefix = []byte("block-hash-") // [deprecated by the header/block split, remove eventually]
)
// CalcDifficulty is the difficulty adjustment algorithm. It returns
@@ -234,6 +238,67 @@ func GetBlock(db ethdb.Database, hash common.Hash) *types.Block {
return types.NewBlockWithHeader(header).WithBody(body.Transactions, body.Uncles)
}
+// GetBlockReceipts retrieves the receipts generated by the transactions included
+// in a block given by its hash.
+func GetBlockReceipts(db ethdb.Database, hash common.Hash) types.Receipts {
+ data, _ := db.Get(append(blockReceiptsPrefix, hash[:]...))
+ if len(data) == 0 {
+ return nil
+ }
+ storageReceipts := []*types.ReceiptForStorage{}
+ if err := rlp.DecodeBytes(data, &storageReceipts); err != nil {
+ glog.V(logger.Error).Infof("invalid receipt array RLP for hash %x: %v", hash, err)
+ return nil
+ }
+ receipts := make(types.Receipts, len(storageReceipts))
+ for i, receipt := range storageReceipts {
+ receipts[i] = (*types.Receipt)(receipt)
+ }
+ return receipts
+}
+
+// GetTransaction retrieves a specific transaction from the database, along with
+// its added positional metadata.
+func GetTransaction(db ethdb.Database, hash common.Hash) (*types.Transaction, common.Hash, uint64, uint64) {
+ // Retrieve the transaction itself from the database
+ data, _ := db.Get(hash.Bytes())
+ if len(data) == 0 {
+ return nil, common.Hash{}, 0, 0
+ }
+ var tx types.Transaction
+ if err := rlp.DecodeBytes(data, &tx); err != nil {
+ return nil, common.Hash{}, 0, 0
+ }
+ // Retrieve the blockchain positional metadata
+ data, _ = db.Get(append(hash.Bytes(), txMetaSuffix...))
+ if len(data) == 0 {
+ return nil, common.Hash{}, 0, 0
+ }
+ var meta struct {
+ BlockHash common.Hash
+ BlockIndex uint64
+ Index uint64
+ }
+ if err := rlp.DecodeBytes(data, &meta); err != nil {
+ return nil, common.Hash{}, 0, 0
+ }
+ return &tx, meta.BlockHash, meta.BlockIndex, meta.Index
+}
+
+// GetReceipt returns a receipt by hash
+func GetReceipt(db ethdb.Database, txHash common.Hash) *types.Receipt {
+ data, _ := db.Get(append(receiptsPrefix, txHash[:]...))
+ if len(data) == 0 {
+ return nil
+ }
+ var receipt types.ReceiptForStorage
+ err := rlp.DecodeBytes(data, &receipt)
+ if err != nil {
+ glog.V(logger.Core).Infoln("GetReceipt err:", err)
+ }
+ return (*types.Receipt)(&receipt)
+}
+
// WriteCanonicalHash stores the canonical hash for the given block number.
func WriteCanonicalHash(db ethdb.Database, hash common.Hash, number uint64) error {
key := append(blockNumPrefix, big.NewInt(int64(number)).Bytes()...)
@@ -329,6 +394,94 @@ func WriteBlock(db ethdb.Database, block *types.Block) error {
return nil
}
+// WriteBlockReceipts stores all the transaction receipts belonging to a block
+// as a single receipt slice. This is used during chain reorganisations for
+// rescheduling dropped transactions.
+func WriteBlockReceipts(db ethdb.Database, hash common.Hash, receipts types.Receipts) error {
+ // Convert the receipts into their storage form and serialize them
+ storageReceipts := make([]*types.ReceiptForStorage, len(receipts))
+ for i, receipt := range receipts {
+ storageReceipts[i] = (*types.ReceiptForStorage)(receipt)
+ }
+ bytes, err := rlp.EncodeToBytes(storageReceipts)
+ if err != nil {
+ return err
+ }
+ // Store the flattened receipt slice
+ if err := db.Put(append(blockReceiptsPrefix, hash.Bytes()...), bytes); err != nil {
+ glog.Fatalf("failed to store block receipts into database: %v", err)
+ return err
+ }
+ glog.V(logger.Debug).Infof("stored block receipts [%x…]", hash.Bytes()[:4])
+ return nil
+}
+
+// WriteTransactions stores the transactions associated with a specific block
+// into the given database. Beside writing the transaction, the function also
+// stores a metadata entry along with the transaction, detailing the position
+// of this within the blockchain.
+func WriteTransactions(db ethdb.Database, block *types.Block) error {
+ batch := db.NewBatch()
+
+ // Iterate over each transaction and encode it with its metadata
+ for i, tx := range block.Transactions() {
+ // Encode and queue up the transaction for storage
+ data, err := rlp.EncodeToBytes(tx)
+ if err != nil {
+ return err
+ }
+ if err := batch.Put(tx.Hash().Bytes(), data); err != nil {
+ return err
+ }
+ // Encode and queue up the transaction metadata for storage
+ meta := struct {
+ BlockHash common.Hash
+ BlockIndex uint64
+ Index uint64
+ }{
+ BlockHash: block.Hash(),
+ BlockIndex: block.NumberU64(),
+ Index: uint64(i),
+ }
+ data, err = rlp.EncodeToBytes(meta)
+ if err != nil {
+ return err
+ }
+ if err := batch.Put(append(tx.Hash().Bytes(), txMetaSuffix...), data); err != nil {
+ return err
+ }
+ }
+ // Write the scheduled data into the database
+ if err := batch.Write(); err != nil {
+ glog.Fatalf("failed to store transactions into database: %v", err)
+ return err
+ }
+ return nil
+}
+
+// WriteReceipts stores a batch of transaction receipts into the database.
+func WriteReceipts(db ethdb.Database, receipts types.Receipts) error {
+ batch := db.NewBatch()
+
+ // Iterate over all the receipts and queue them for database injection
+ for _, receipt := range receipts {
+ storageReceipt := (*types.ReceiptForStorage)(receipt)
+ data, err := rlp.EncodeToBytes(storageReceipt)
+ if err != nil {
+ return err
+ }
+ if err := batch.Put(append(receiptsPrefix, receipt.TxHash.Bytes()...), data); err != nil {
+ return err
+ }
+ }
+ // Write the scheduled data into the database
+ if err := batch.Write(); err != nil {
+ glog.Fatalf("failed to store receipts into database: %v", err)
+ return err
+ }
+ return nil
+}
+
// DeleteCanonicalHash removes the number to hash canonical mapping.
func DeleteCanonicalHash(db ethdb.Database, number uint64) {
db.Delete(append(blockNumPrefix, big.NewInt(int64(number)).Bytes()...))
@@ -351,18 +504,35 @@ func DeleteTd(db ethdb.Database, hash common.Hash) {
// DeleteBlock removes all block data associated with a hash.
func DeleteBlock(db ethdb.Database, hash common.Hash) {
+ DeleteBlockReceipts(db, hash)
DeleteHeader(db, hash)
DeleteBody(db, hash)
DeleteTd(db, hash)
}
-// [deprecated by eth/63]
+// DeleteBlockReceipts removes all receipt data associated with a block hash.
+func DeleteBlockReceipts(db ethdb.Database, hash common.Hash) {
+ db.Delete(append(blockReceiptsPrefix, hash.Bytes()...))
+}
+
+// DeleteTransaction removes all transaction data associated with a hash.
+func DeleteTransaction(db ethdb.Database, hash common.Hash) {
+ db.Delete(hash.Bytes())
+ db.Delete(append(hash.Bytes(), txMetaSuffix...))
+}
+
+// DeleteReceipt removes all receipt data associated with a transaction hash.
+func DeleteReceipt(db ethdb.Database, hash common.Hash) {
+ db.Delete(append(receiptsPrefix, hash.Bytes()...))
+}
+
+// [deprecated by the header/block split, remove eventually]
// GetBlockByHashOld returns the old combined block corresponding to the hash
// or nil if not found. This method is only used by the upgrade mechanism to
// access the old combined block representation. It will be dropped after the
// network transitions to eth/63.
func GetBlockByHashOld(db ethdb.Database, hash common.Hash) *types.Block {
- data, _ := db.Get(append(blockHashPre, hash[:]...))
+ data, _ := db.Get(append(blockHashPrefix, hash[:]...))
if len(data) == 0 {
return nil
}
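For orientation, a hedged usage sketch of the retrieval side of the database_util.go API introduced above: GetTransaction now returns the transaction together with its positional metadata (block hash, block number, and index within the block), while GetReceipt and GetBlockReceipts look receipts up by transaction and block hash respectively. The function lookupTransaction below is illustrative only:

    package example

    import (
        "fmt"

        "github.com/ethereum/go-ethereum/common"
        "github.com/ethereum/go-ethereum/core"
        "github.com/ethereum/go-ethereum/ethdb"
    )

    // lookupTransaction shows the new read path: the transaction and its
    // position come back from a single call, and the matching receipt is
    // fetched separately by transaction hash.
    func lookupTransaction(db ethdb.Database, txHash common.Hash) error {
        tx, blockHash, blockNumber, index := core.GetTransaction(db, txHash)
        if tx == nil {
            return fmt.Errorf("transaction %x not found", txHash)
        }
        fmt.Printf("tx %x included in block %x (#%d) at index %d\n", txHash, blockHash, blockNumber, index)

        if receipt := core.GetReceipt(db, txHash); receipt != nil {
            fmt.Printf("cumulative gas used: %v\n", receipt.CumulativeGasUsed)
        }
        return nil
    }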
diff --git a/core/chain_util_test.go b/core/database_util_test.go
index 0bbcbbe53..059f1ae9f 100644
--- a/core/chain_util_test.go
+++ b/core/database_util_test.go
@@ -17,6 +17,7 @@
package core
import (
+ "bytes"
"encoding/json"
"io/ioutil"
"math/big"
@@ -341,6 +342,163 @@ func TestHeadStorage(t *testing.T) {
}
}
+// Tests that transactions and associated metadata can be stored and retrieved.
+func TestTransactionStorage(t *testing.T) {
+ db, _ := ethdb.NewMemDatabase()
+
+ tx1 := types.NewTransaction(1, common.BytesToAddress([]byte{0x11}), big.NewInt(111), big.NewInt(1111), big.NewInt(11111), []byte{0x11, 0x11, 0x11})
+ tx2 := types.NewTransaction(2, common.BytesToAddress([]byte{0x22}), big.NewInt(222), big.NewInt(2222), big.NewInt(22222), []byte{0x22, 0x22, 0x22})
+ tx3 := types.NewTransaction(3, common.BytesToAddress([]byte{0x33}), big.NewInt(333), big.NewInt(3333), big.NewInt(33333), []byte{0x33, 0x33, 0x33})
+ txs := []*types.Transaction{tx1, tx2, tx3}
+
+ block := types.NewBlock(&types.Header{Number: big.NewInt(314)}, txs, nil, nil)
+
+ // Check that no transactions entries are in a pristine database
+ for i, tx := range txs {
+ if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn != nil {
+ t.Fatalf("tx #%d [%x]: non existent transaction returned: %v", i, tx.Hash(), txn)
+ }
+ }
+ // Insert all the transactions into the database, and verify contents
+ if err := WriteTransactions(db, block); err != nil {
+ t.Fatalf("failed to write transactions: %v", err)
+ }
+ for i, tx := range txs {
+ if txn, hash, number, index := GetTransaction(db, tx.Hash()); txn == nil {
+ t.Fatalf("tx #%d [%x]: transaction not found", i, tx.Hash())
+ } else {
+ if hash != block.Hash() || number != block.NumberU64() || index != uint64(i) {
+ t.Fatalf("tx #%d [%x]: positional metadata mismatch: have %x/%d/%d, want %x/%v/%v", i, tx.Hash(), hash, number, index, block.Hash(), block.NumberU64(), i)
+ }
+ if tx.String() != txn.String() {
+ t.Fatalf("tx #%d [%x]: transaction mismatch: have %v, want %v", i, tx.Hash(), txn, tx)
+ }
+ }
+ }
+ // Delete the transactions and check purge
+ for i, tx := range txs {
+ DeleteTransaction(db, tx.Hash())
+ if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn != nil {
+ t.Fatalf("tx #%d [%x]: deleted transaction returned: %v", i, tx.Hash(), txn)
+ }
+ }
+}
+
+// Tests that receipts can be stored and retrieved.
+func TestReceiptStorage(t *testing.T) {
+ db, _ := ethdb.NewMemDatabase()
+
+ receipt1 := &types.Receipt{
+ PostState: []byte{0x01},
+ CumulativeGasUsed: big.NewInt(1),
+ Logs: vm.Logs{
+ &vm.Log{Address: common.BytesToAddress([]byte{0x11})},
+ &vm.Log{Address: common.BytesToAddress([]byte{0x01, 0x11})},
+ },
+ TxHash: common.BytesToHash([]byte{0x11, 0x11}),
+ ContractAddress: common.BytesToAddress([]byte{0x01, 0x11, 0x11}),
+ GasUsed: big.NewInt(111111),
+ }
+ receipt2 := &types.Receipt{
+ PostState: []byte{0x02},
+ CumulativeGasUsed: big.NewInt(2),
+ Logs: vm.Logs{
+ &vm.Log{Address: common.BytesToAddress([]byte{0x22})},
+ &vm.Log{Address: common.BytesToAddress([]byte{0x02, 0x22})},
+ },
+ TxHash: common.BytesToHash([]byte{0x22, 0x22}),
+ ContractAddress: common.BytesToAddress([]byte{0x02, 0x22, 0x22}),
+ GasUsed: big.NewInt(222222),
+ }
+ receipts := []*types.Receipt{receipt1, receipt2}
+
+ // Check that no receipt entries are in a pristine database
+ for i, receipt := range receipts {
+ if r := GetReceipt(db, receipt.TxHash); r != nil {
+ t.Fatalf("receipt #%d [%x]: non existent receipt returned: %v", i, receipt.TxHash, r)
+ }
+ }
+ // Insert all the receipts into the database, and verify contents
+ if err := WriteReceipts(db, receipts); err != nil {
+ t.Fatalf("failed to write receipts: %v", err)
+ }
+ for i, receipt := range receipts {
+ if r := GetReceipt(db, receipt.TxHash); r == nil {
+ t.Fatalf("receipt #%d [%x]: receipt not found", i, receipt.TxHash)
+ } else {
+ rlpHave, _ := rlp.EncodeToBytes(r)
+ rlpWant, _ := rlp.EncodeToBytes(receipt)
+
+ if bytes.Compare(rlpHave, rlpWant) != 0 {
+ t.Fatalf("receipt #%d [%x]: receipt mismatch: have %v, want %v", i, receipt.TxHash, r, receipt)
+ }
+ }
+ }
+ // Delete the receipts and check purge
+ for i, receipt := range receipts {
+ DeleteReceipt(db, receipt.TxHash)
+ if r := GetReceipt(db, receipt.TxHash); r != nil {
+ t.Fatalf("receipt #%d [%x]: deleted receipt returned: %v", i, receipt.TxHash, r)
+ }
+ }
+}
+
+// Tests that receipts associated with a single block can be stored and retrieved.
+func TestBlockReceiptStorage(t *testing.T) {
+ db, _ := ethdb.NewMemDatabase()
+
+ receipt1 := &types.Receipt{
+ PostState: []byte{0x01},
+ CumulativeGasUsed: big.NewInt(1),
+ Logs: vm.Logs{
+ &vm.Log{Address: common.BytesToAddress([]byte{0x11})},
+ &vm.Log{Address: common.BytesToAddress([]byte{0x01, 0x11})},
+ },
+ TxHash: common.BytesToHash([]byte{0x11, 0x11}),
+ ContractAddress: common.BytesToAddress([]byte{0x01, 0x11, 0x11}),
+ GasUsed: big.NewInt(111111),
+ }
+ receipt2 := &types.Receipt{
+ PostState: []byte{0x02},
+ CumulativeGasUsed: big.NewInt(2),
+ Logs: vm.Logs{
+ &vm.Log{Address: common.BytesToAddress([]byte{0x22})},
+ &vm.Log{Address: common.BytesToAddress([]byte{0x02, 0x22})},
+ },
+ TxHash: common.BytesToHash([]byte{0x22, 0x22}),
+ ContractAddress: common.BytesToAddress([]byte{0x02, 0x22, 0x22}),
+ GasUsed: big.NewInt(222222),
+ }
+ receipts := []*types.Receipt{receipt1, receipt2}
+
+ // Check that no receipt entries are in a pristine database
+ hash := common.BytesToHash([]byte{0x03, 0x14})
+ if rs := GetBlockReceipts(db, hash); len(rs) != 0 {
+ t.Fatalf("non existent receipts returned: %v", rs)
+ }
+ // Insert the receipt slice into the database and check presence
+ if err := WriteBlockReceipts(db, hash, receipts); err != nil {
+ t.Fatalf("failed to write block receipts: %v", err)
+ }
+ if rs := GetBlockReceipts(db, hash); len(rs) == 0 {
+ t.Fatalf("no receipts returned")
+ } else {
+ for i := 0; i < len(receipts); i++ {
+ rlpHave, _ := rlp.EncodeToBytes(rs[i])
+ rlpWant, _ := rlp.EncodeToBytes(receipts[i])
+
+ if bytes.Compare(rlpHave, rlpWant) != 0 {
+ t.Fatalf("receipt #%d: receipt mismatch: have %v, want %v", i, rs[i], receipts[i])
+ }
+ }
+ }
+ // Delete the receipt slice and check purge
+ DeleteBlockReceipts(db, hash)
+ if rs := GetBlockReceipts(db, hash); len(rs) != 0 {
+ t.Fatalf("deleted receipts returned: %v", rs)
+ }
+}
+
func TestMipmapBloom(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
@@ -425,7 +583,7 @@ func TestMipmapChain(t *testing.T) {
}
// store the receipts
- err := PutReceipts(db, receipts)
+ err := WriteReceipts(db, receipts)
if err != nil {
t.Fatal(err)
}
@@ -439,7 +597,7 @@ func TestMipmapChain(t *testing.T) {
if err := WriteHeadBlockHash(db, block.Hash()); err != nil {
t.Fatalf("failed to insert block number: %v", err)
}
- if err := PutBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
+ if err := WriteBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
t.Fatal("error writing block receipts:", err)
}
}
diff --git a/core/genesis.go b/core/genesis.go
index dac5de92f..3fd8f42b0 100644
--- a/core/genesis.go
+++ b/core/genesis.go
@@ -103,7 +103,7 @@ func WriteGenesisBlock(chainDb ethdb.Database, reader io.Reader) (*types.Block,
if err := WriteBlock(chainDb, block); err != nil {
return nil, err
}
- if err := PutBlockReceipts(chainDb, block.Hash(), nil); err != nil {
+ if err := WriteBlockReceipts(chainDb, block.Hash(), nil); err != nil {
return nil, err
}
if err := WriteCanonicalHash(chainDb, block.Hash(), block.NumberU64()); err != nil {
diff --git a/core/transaction_util.go b/core/transaction_util.go
deleted file mode 100644
index e2e5b9aee..000000000
--- a/core/transaction_util.go
+++ /dev/null
@@ -1,171 +0,0 @@
-// Copyright 2015 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package core
-
-import (
- "fmt"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
- "github.com/ethereum/go-ethereum/rlp"
- "github.com/syndtr/goleveldb/leveldb"
-)
-
-var (
- receiptsPre = []byte("receipts-")
- blockReceiptsPre = []byte("receipts-block-")
-)
-
-// PutTransactions stores the transactions in the given database
-func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactions) error {
- batch := db.NewBatch()
-
- for i, tx := range block.Transactions() {
- rlpEnc, err := rlp.EncodeToBytes(tx)
- if err != nil {
- return fmt.Errorf("failed encoding tx: %v", err)
- }
-
- batch.Put(tx.Hash().Bytes(), rlpEnc)
-
- var txExtra struct {
- BlockHash common.Hash
- BlockIndex uint64
- Index uint64
- }
- txExtra.BlockHash = block.Hash()
- txExtra.BlockIndex = block.NumberU64()
- txExtra.Index = uint64(i)
- rlpMeta, err := rlp.EncodeToBytes(txExtra)
- if err != nil {
- return fmt.Errorf("failed encoding tx meta data: %v", err)
- }
-
- batch.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta)
- }
-
- if err := batch.Write(); err != nil {
- return fmt.Errorf("failed writing tx to db: %v", err)
- }
- return nil
-}
-
-func DeleteTransaction(db ethdb.Database, txHash common.Hash) {
- db.Delete(txHash[:])
-}
-
-func GetTransaction(db ethdb.Database, txhash common.Hash) *types.Transaction {
- data, _ := db.Get(txhash[:])
- if len(data) != 0 {
- var tx types.Transaction
- if err := rlp.DecodeBytes(data, &tx); err != nil {
- return nil
- }
- return &tx
- }
- return nil
-}
-
-// PutReceipts stores the receipts in the current database
-func PutReceipts(db ethdb.Database, receipts types.Receipts) error {
- batch := new(leveldb.Batch)
- _, batchWrite := db.(*ethdb.LDBDatabase)
-
- for _, receipt := range receipts {
- storageReceipt := (*types.ReceiptForStorage)(receipt)
- bytes, err := rlp.EncodeToBytes(storageReceipt)
- if err != nil {
- return err
- }
-
- if batchWrite {
- batch.Put(append(receiptsPre, receipt.TxHash[:]...), bytes)
- } else {
- err = db.Put(append(receiptsPre, receipt.TxHash[:]...), bytes)
- if err != nil {
- return err
- }
- }
- }
- if db, ok := db.(*ethdb.LDBDatabase); ok {
- if err := db.LDB().Write(batch, nil); err != nil {
- return err
- }
- }
-
- return nil
-}
-
-// Delete a receipts from the database
-func DeleteReceipt(db ethdb.Database, txHash common.Hash) {
- db.Delete(append(receiptsPre, txHash[:]...))
-}
-
-// GetReceipt returns a receipt by hash
-func GetReceipt(db ethdb.Database, txHash common.Hash) *types.Receipt {
- data, _ := db.Get(append(receiptsPre, txHash[:]...))
- if len(data) == 0 {
- return nil
- }
- var receipt types.ReceiptForStorage
- err := rlp.DecodeBytes(data, &receipt)
- if err != nil {
- glog.V(logger.Core).Infoln("GetReceipt err:", err)
- }
- return (*types.Receipt)(&receipt)
-}
-
-// GetBlockReceipts returns the receipts generated by the transactions
-// included in block's given hash.
-func GetBlockReceipts(db ethdb.Database, hash common.Hash) types.Receipts {
- data, _ := db.Get(append(blockReceiptsPre, hash[:]...))
- if len(data) == 0 {
- return nil
- }
- rs := []*types.ReceiptForStorage{}
- if err := rlp.DecodeBytes(data, &rs); err != nil {
- glog.V(logger.Error).Infof("invalid receipt array RLP for hash %x: %v", hash, err)
- return nil
- }
- receipts := make(types.Receipts, len(rs))
- for i, receipt := range rs {
- receipts[i] = (*types.Receipt)(receipt)
- }
- return receipts
-}
-
-// PutBlockReceipts stores the block's transactions associated receipts
-// and stores them by block hash in a single slice. This is required for
-// forks and chain reorgs
-func PutBlockReceipts(db ethdb.Database, hash common.Hash, receipts types.Receipts) error {
- rs := make([]*types.ReceiptForStorage, len(receipts))
- for i, receipt := range receipts {
- rs[i] = (*types.ReceiptForStorage)(receipt)
- }
- bytes, err := rlp.EncodeToBytes(rs)
- if err != nil {
- return err
- }
- err = db.Put(append(blockReceiptsPre, hash[:]...), bytes)
- if err != nil {
- return err
- }
- return nil
-}
diff --git a/eth/backend_test.go b/eth/backend_test.go
index 0379fc843..83219de62 100644
--- a/eth/backend_test.go
+++ b/eth/backend_test.go
@@ -32,7 +32,7 @@ func TestMipmapUpgrade(t *testing.T) {
}
// store the receipts
- err := core.PutReceipts(db, receipts)
+ err := core.WriteReceipts(db, receipts)
if err != nil {
t.Fatal(err)
}
@@ -45,7 +45,7 @@ func TestMipmapUpgrade(t *testing.T) {
if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil {
t.Fatalf("failed to insert block number: %v", err)
}
- if err := core.PutBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
+ if err := core.WriteBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
t.Fatal("error writing block receipts:", err)
}
}
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 153427ee4..5fa18a2e3 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -74,7 +74,6 @@ var (
errBadPeer = errors.New("action from bad peer ignored")
errStallingPeer = errors.New("peer is stalling")
errNoPeers = errors.New("no peers to keep download active")
- errPendingQueue = errors.New("pending items in queue")
errTimeout = errors.New("timeout")
errEmptyHashSet = errors.New("empty hash set by peer")
errEmptyHeaderSet = errors.New("empty header set by peer")
@@ -90,6 +89,7 @@ var (
errCancelBodyFetch = errors.New("block body download canceled (requested)")
errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
errCancelStateFetch = errors.New("state data download canceled (requested)")
+ errCancelProcessing = errors.New("processing canceled (requested)")
errNoSyncActive = errors.New("no sync active")
)
@@ -129,7 +129,6 @@ type Downloader struct {
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32
- processing int32
notified int32
// Channels
@@ -215,7 +214,7 @@ func (d *Downloader) Progress() (uint64, uint64, uint64) {
// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
- return atomic.LoadInt32(&d.synchronising) > 0 || atomic.LoadInt32(&d.processing) > 0
+ return atomic.LoadInt32(&d.synchronising) > 0
}
// RegisterPeer injects a new download peer into the set of block source to be
@@ -263,9 +262,6 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
d.dropPeer(id)
- case errPendingQueue:
- glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
-
default:
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
}
@@ -290,10 +286,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
glog.V(logger.Info).Infoln("Block synchronisation started")
}
- // Abort if the queue still contains some leftover data
- if d.queue.GetHeadResult() != nil {
- return errPendingQueue
- }
// Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset()
d.peers.Reset()
@@ -335,7 +327,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
defer func() {
// reset on error
if err != nil {
- d.cancel()
d.mux.Post(FailedEvent{err})
} else {
d.mux.Post(DoneEvent{})
@@ -365,23 +356,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
d.syncStatsChainHeight = latest
d.syncStatsLock.Unlock()
- // Initiate the sync using a concurrent hash and block retrieval algorithm
+ // Initiate the sync using a concurrent hash and block retrieval algorithm
+ d.queue.Prepare(origin+1, d.mode, 0)
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- d.queue.Prepare(origin+1, d.mode, 0)
-
- errc := make(chan error, 2)
- go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
- go func() { errc <- d.fetchBlocks61(origin + 1) }()
-
- // If any fetcher fails, cancel the other
- if err := <-errc; err != nil {
- d.cancel()
- <-errc
- return err
- }
- return <-errc
+ return d.spawnSync(
+ func() error { return d.fetchHashes61(p, td, origin+1) },
+ func() error { return d.fetchBlocks61(origin + 1) },
+ )
case p.version >= 62:
// Look up the sync boundaries: the common ancestor and the target block
@@ -405,7 +388,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
switch d.mode {
case LightSync:
pivot = latest
-
case FastSync:
// Calculate the new fast/slow sync pivot point
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
@@ -426,34 +408,51 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
}
d.queue.Prepare(origin+1, d.mode, pivot)
-
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- errc := make(chan error, 4)
- go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved
- go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync
- go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync
- go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync
-
- // If any fetcher fails, cancel the others
- var fail error
- for i := 0; i < cap(errc); i++ {
- if err := <-errc; err != nil {
- if fail == nil {
- fail = err
- d.cancel()
- }
- }
- }
- return fail
+ return d.spawnSync(
+ func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved
+ func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
+ func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+ func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
+ )
default:
// Something very wrong, stop right here
glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
return errBadPeer
}
- return nil
+}
+
+// spawnSync runs d.process and all given fetcher functions to completion in
+// separate goroutines, returning the first error that appears.
+func (d *Downloader) spawnSync(fetchers ...func() error) error {
+ var wg sync.WaitGroup
+ errc := make(chan error, len(fetchers)+1)
+ wg.Add(len(fetchers) + 1)
+ go func() { defer wg.Done(); errc <- d.process() }()
+ for _, fn := range fetchers {
+ fn := fn
+ go func() { defer wg.Done(); errc <- fn() }()
+ }
+ // Wait for the first error, then terminate the others.
+ var err error
+ for i := 0; i < len(fetchers)+1; i++ {
+ if i == len(fetchers) {
+ // Close the queue when all fetchers have exited.
+ // This will cause the block processor to end when
+ // it has processed the queue.
+ d.queue.Close()
+ }
+ if err = <-errc; err != nil {
+ break
+ }
+ }
+ d.queue.Close()
+ d.cancel()
+ wg.Wait()
+ return err
}
// cancel cancels all of the operations and resets the queue. It returns true
@@ -470,12 +469,10 @@ func (d *Downloader) cancel() {
}
}
d.cancelLock.Unlock()
-
- // Reset the queue
- d.queue.Reset()
}
// Terminate interrupts the downloader, canceling all pending operations.
+// The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() {
atomic.StoreInt32(&d.interrupt, 1)
d.cancel()
@@ -495,15 +492,6 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
case <-d.cancelCh:
return 0, errCancelBlockFetch
- case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
- case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
- case <-d.hashCh:
- // Out of bounds hashes received, ignore them
-
case packet := <-d.blockCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
@@ -521,6 +509,16 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
case <-timeout:
glog.V(logger.Debug).Infof("%v: head block timeout", p)
return 0, errTimeout
+
+ case <-d.hashCh:
+ // Out of bounds hashes received, ignore them
+
+ case <-d.headerCh:
+ case <-d.bodyCh:
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -571,18 +569,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
}
}
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: head hash timeout", p)
+ return 0, errTimeout
+
case <-d.blockCh:
// Out of bounds blocks received, ignore them
case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: head hash timeout", p)
- return 0, errTimeout
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
// If the head fetch already found an ancestor, return
@@ -631,18 +630,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
}
start = check
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: search hash timeout", p)
+ return 0, errTimeout
+
case <-d.blockCh:
// Out of bounds blocks received, ignore them
case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: search hash timeout", p)
- return 0, errTimeout
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -676,12 +676,6 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
case <-d.cancelCh:
return errCancelHashFetch
- case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
- case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
case packet := <-d.hashCh:
// Make sure the active peer is giving us the hashes
if packet.PeerId() != p.id {
@@ -750,6 +744,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Debug).Infof("%v: hash request timed out", p)
hashTimeoutMeter.Mark(1)
return errTimeout
+
+ case <-d.headerCh:
+ case <-d.bodyCh:
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -774,12 +775,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
case <-d.cancelCh:
return errCancelBlockFetch
- case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
- case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
case packet := <-d.blockCh:
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
@@ -800,7 +795,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
peer.Promote()
peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
- go d.process()
case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort
@@ -826,7 +820,6 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
peer.Demote()
peer.SetBlocksIdle()
glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
- go d.process()
}
}
// Blocks arrived, try to update the progress
@@ -909,6 +902,13 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
if !throttled && !d.queue.InFlightBlocks() && len(idles) == total {
return errPeersUnavailable
}
+
+ case <-d.headerCh:
+ case <-d.bodyCh:
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -941,18 +941,19 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
}
return headers[0].Number.Uint64(), nil
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: head header timeout", p)
+ return 0, errTimeout
+
case <-d.bodyCh:
- // Out of bounds block bodies received, ignore them
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Out of bounds delivery, ignore
case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: head header timeout", p)
- return 0, errTimeout
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -1008,18 +1009,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
}
}
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: head header timeout", p)
+ return 0, errTimeout
+
case <-d.bodyCh:
- // Out of bounds block bodies received, ignore them
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Out of bounds delivery, ignore
case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: head header timeout", p)
- return 0, errTimeout
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
// If the head fetch already found an ancestor, return
@@ -1068,18 +1070,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
}
start = check
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: search header timeout", p)
+ return 0, errTimeout
+
case <-d.bodyCh:
- // Out of bounds block bodies received, ignore them
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Out of bounds delivery, ignore
case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: search header timeout", p)
- return 0, errTimeout
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -1141,12 +1144,6 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
case <-d.cancelCh:
return errCancelHeaderFetch
- case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
- case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
case packet := <-d.headerCh:
// Make sure the active peer is giving us the headers
if packet.PeerId() != p.id {
@@ -1268,6 +1265,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
}
}
return nil
+
+ case <-d.hashCh:
+ case <-d.blockCh:
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -1336,10 +1338,8 @@ func (d *Downloader) fetchNodeData() error {
d.cancel()
return
}
- // Processing succeeded, notify state fetcher and processor of continuation
- if d.queue.PendingNodeData() == 0 {
- go d.process()
- } else {
+ // Processing succeeded, notify state fetcher of continuation
+ if d.queue.PendingNodeData() > 0 {
select {
case d.stateWakeCh <- true:
default:
@@ -1348,7 +1348,6 @@ func (d *Downloader) fetchNodeData() error {
// Log a message to the user and return
d.syncStatsLock.Lock()
defer d.syncStatsLock.Unlock()
-
d.syncStatsStateDone += uint64(delivered)
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
})
@@ -1391,12 +1390,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
case <-d.cancelCh:
return errCancel
- case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
- case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
case packet := <-deliveryCh:
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
@@ -1415,7 +1408,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
peer.Promote()
setIdle(peer)
glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
- go d.process()
case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort
@@ -1441,7 +1433,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
peer.Demote()
setIdle(peer)
glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
- go d.process()
}
}
// Blocks assembled, try to update the progress
@@ -1508,7 +1499,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
}
if progress {
progressed = true
- go d.process()
}
if request == nil {
continue
@@ -1540,51 +1530,23 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
return errPeersUnavailable
}
+
+ case <-d.hashCh:
+ case <-d.blockCh:
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
// process takes fetch results from the queue and tries to import them into the
-// chain. The type of import operation will depend on the result contents:
-// -
-//
-// The algorithmic flow is as follows:
-// - The `processing` flag is swapped to 1 to ensure singleton access
-// - The current `cancel` channel is retrieved to detect sync abortions
-// - Blocks are iteratively taken from the cache and inserted into the chain
-// - When the cache becomes empty, insertion stops
-// - The `processing` flag is swapped back to 0
-// - A post-exit check is made whether new blocks became available
-// - This step is important: it handles a potential race condition between
-// checking for no more work, and releasing the processing "mutex". In
-// between these state changes, a block may have arrived, but a processing
-// attempt denied, so we need to re-enter to ensure the block isn't left
-// to idle in the cache.
-func (d *Downloader) process() {
- // Make sure only one goroutine is ever allowed to process blocks at once
- if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
- return
- }
- // If the processor just exited, but there are freshly pending items, try to
- // reenter. This is needed because the goroutine spinned up for processing
- // the fresh results might have been rejected entry to to this present thread
- // not yet releasing the `processing` state.
- defer func() {
- if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil {
- d.process()
- }
- }()
- // Release the lock upon exit (note, before checking for reentry!)
- // the import statistics to zero.
- defer atomic.StoreInt32(&d.processing, 0)
-
- // Repeat the processing as long as there are results to process
+// chain. The type of import operation will depend on the result contents.
+func (d *Downloader) process() error {
+ pivot := d.queue.FastSyncPivot()
for {
- // Fetch the next batch of results
- pivot := d.queue.FastSyncPivot() // Fetch pivot before results to prevent reset race
- results := d.queue.TakeResults()
+ results := d.queue.WaitResults()
if len(results) == 0 {
- return
+ return nil // queue empty
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
@@ -1597,7 +1559,7 @@ func (d *Downloader) process() {
for len(results) != 0 {
// Check for any termination requests
if atomic.LoadInt32(&d.interrupt) == 1 {
- return
+ return errCancelProcessing
}
// Retrieve the a batch of results to import
var (
@@ -1633,8 +1595,7 @@ func (d *Downloader) process() {
}
if err != nil {
glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
- d.cancel()
- return
+ return err
}
// Shift the results to the next batch
results = results[items:]
@@ -1685,19 +1646,16 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
dropMeter.Mark(int64(packet.Items()))
}
}()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
// Deliver or abort if the sync is canceled while queuing
d.cancelLock.RLock()
cancel := d.cancelCh
d.cancelLock.RUnlock()
-
+ if cancel == nil {
+ return errNoSyncActive
+ }
select {
case destCh <- packet:
return nil
-
case <-cancel:
return errNoSyncActive
}
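The core of the downloader change is spawnSync: instead of the old processing flag and ad-hoc goroutine re-entry, every fetcher and the block processor run as peers in their own goroutines, the first error is kept, and the queue is closed and cancelled so the rest unwind. A standalone sketch of that fan-out/first-error shape, with illustrative names (runUntilFirstError is not part of the commit):

    package example

    import (
        "errors"
        "fmt"
        "sync"
    )

    // runUntilFirstError starts every worker in its own goroutine, waits for
    // the first failure (or for all of them to finish), then tears everything
    // down and returns that first error, mirroring spawnSync's shape.
    func runUntilFirstError(cancel func(), workers ...func() error) error {
        var wg sync.WaitGroup
        errc := make(chan error, len(workers)) // buffered so late workers never block

        wg.Add(len(workers))
        for _, w := range workers {
            w := w // capture loop variable (pre-Go 1.22 semantics)
            go func() { defer wg.Done(); errc <- w() }()
        }
        // Wait for the first failure, or for everything to finish cleanly.
        var err error
        for i := 0; i < len(workers); i++ {
            if err = <-errc; err != nil {
                break
            }
        }
        cancel()  // interrupt whatever is still running
        wg.Wait() // and wait for it to exit
        return err
    }

    func main() {
        err := runUntilFirstError(
            func() { fmt.Println("cancel requested") },
            func() error { return nil },
            func() error { return errors.New("fetcher failed") },
        )
        fmt.Println("first error:", err)
    }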
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index ef6f74a6b..cfcc8a2ef 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -169,17 +169,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
}
}
dl.lock.RUnlock()
-
- err := dl.downloader.synchronise(id, hash, td, mode)
- for {
- // If the queue is empty and processing stopped, break
- if dl.downloader.queue.Idle() && atomic.LoadInt32(&dl.downloader.processing) == 0 {
- break
- }
- // Otherwise sleep a bit and retry
- time.Sleep(time.Millisecond)
- }
- return err
+ return dl.downloader.synchronise(id, hash, td, mode)
}
// hasHeader checks if a header is present in the testers canonical chain.
@@ -701,6 +691,8 @@ func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronis
func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) }
func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -725,6 +717,8 @@ func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
func testThrottling(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a long block chain to download and the tester
targetBlocks := 8 * blockCacheLimit
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -757,8 +751,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
for start := time.Now(); time.Since(start) < time.Second; {
time.Sleep(25 * time.Millisecond)
- tester.lock.RLock()
- tester.downloader.queue.lock.RLock()
+ tester.lock.Lock()
+ tester.downloader.queue.lock.Lock()
cached = len(tester.downloader.queue.blockDonePool)
if mode == FastSync {
if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
@@ -769,8 +763,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
}
frozen = int(atomic.LoadUint32(&blocked))
retrieved = len(tester.ownBlocks)
- tester.downloader.queue.lock.RUnlock()
- tester.lock.RUnlock()
+ tester.downloader.queue.lock.Unlock()
+ tester.lock.Unlock()
if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 {
break
@@ -810,6 +804,8 @@ func TestForkedSynchronisation64Fast(t *testing.T) { testForkedSynchronisation(
func TestForkedSynchronisation64Light(t *testing.T) { testForkedSynchronisation(t, 64, LightSync) }
func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a long enough forked chain
common, fork := MaxHashFetch, 2*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil)
@@ -833,6 +829,7 @@ func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Tests that an inactive downloader will not accept incoming hashes and blocks.
func TestInactiveDownloader61(t *testing.T) {
+ t.Parallel()
tester := newTester()
// Check that neither hashes nor blocks are accepted
@@ -847,6 +844,7 @@ func TestInactiveDownloader61(t *testing.T) {
// Tests that an inactive downloader will not accept incoming block headers and
// bodies.
func TestInactiveDownloader62(t *testing.T) {
+ t.Parallel()
tester := newTester()
// Check that neither block headers nor bodies are accepted
@@ -861,6 +859,7 @@ func TestInactiveDownloader62(t *testing.T) {
// Tests that an inactive downloader will not accept incoming block headers,
// bodies and receipts.
func TestInactiveDownloader63(t *testing.T) {
+ t.Parallel()
tester := newTester()
// Check that neither block headers nor bodies are accepted
@@ -885,6 +884,8 @@ func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) }
func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
func testCancel(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download and the tester
targetBlocks := blockCacheLimit - 15
if targetBlocks >= MaxHashFetch {
@@ -923,6 +924,8 @@ func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t,
func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) }
func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create various peers with various parts of the chain
targetPeers := 8
targetBlocks := targetPeers*blockCacheLimit - 15
@@ -950,6 +953,8 @@ func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t,
func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) }
func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -986,6 +991,8 @@ func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, F
func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a block chain to download
targetBlocks := 2*blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1037,6 +1044,8 @@ func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 6
func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) }
func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1188,6 +1197,8 @@ func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttac
func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) }
func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
tester := newTester()
hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil)
@@ -1209,25 +1220,26 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
result error
drop bool
}{
- {nil, false}, // Sync succeeded, all is well
- {errBusy, false}, // Sync is already in progress, no problem
- {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
- {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
- {errStallingPeer, true}, // Peer was detected to be stalling, drop it
- {errNoPeers, false}, // No peers to download from, soft race, no issue
- {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue
- {errTimeout, true}, // No hashes received in due time, drop the peer
- {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
- {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
- {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
- {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
- {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
- {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
- {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
- {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {nil, false}, // Sync succeeded, all is well
+ {errBusy, false}, // Sync is already in progress, no problem
+ {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
+ {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
+ {errStallingPeer, true}, // Peer was detected to be stalling, drop it
+ {errNoPeers, false}, // No peers to download from, soft race, no issue
+ {errTimeout, true}, // No hashes received in due time, drop the peer
+ {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
+ {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
+ {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
+ {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
+ {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
+ {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
+ {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
+ {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
}
// Run the tests and check disconnection status
tester := newTester()
@@ -1261,6 +1273,8 @@ func TestSyncProgress64Fast(t *testing.T) { testSyncProgress(t, 64, FastSync) }
func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) }
func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1331,6 +1345,8 @@ func TestForkedSyncProgress64Fast(t *testing.T) { testForkedSyncProgress(t, 64,
func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) }
func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a forked chain to simulate origin revertal
common, fork := MaxHashFetch, 2*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil)
@@ -1404,6 +1420,8 @@ func TestFailedSyncProgress64Fast(t *testing.T) { testFailedSyncProgress(t, 64,
func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) }
func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1478,6 +1496,8 @@ func TestFakedSyncProgress64Fast(t *testing.T) { testFakedSyncProgress(t, 64, F
func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) }
func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small block chain
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks+3, 0, genesis, nil)
@@ -1541,3 +1561,50 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", origin, current, latest, targetBlocks, targetBlocks, targetBlocks)
}
}
+
+// This test reproduces an issue where unexpected deliveries would
+// block indefinitely if they arrived at the right time.
+func TestDeliverHeadersHang62(t *testing.T) { testDeliverHeadersHang(t, 62, FullSync) }
+func TestDeliverHeadersHang63Full(t *testing.T) { testDeliverHeadersHang(t, 63, FullSync) }
+func TestDeliverHeadersHang63Fast(t *testing.T) { testDeliverHeadersHang(t, 63, FastSync) }
+func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, FullSync) }
+func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) }
+func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) }
+
+func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+ hashes, headers, blocks, receipts := makeChain(5, 0, genesis, nil)
+ fakeHeads := []*types.Header{{}, {}, {}, {}}
+ for i := 0; i < 200; i++ {
+ tester := newTester()
+ tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
+ // Whenever the downloader requests headers, flood it with
+ // a lot of unrequested header deliveries.
+ tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error {
+ deliveriesDone := make(chan struct{}, 500)
+ for i := 0; i < cap(deliveriesDone); i++ {
+ peer := fmt.Sprintf("fake-peer%d", i)
+ go func() {
+ tester.downloader.DeliverHeaders(peer, fakeHeads)
+ deliveriesDone <- struct{}{}
+ }()
+ }
+ // Deliver the actual requested headers.
+ impl := tester.peerGetAbsHeadersFn("peer", 0)
+ go impl(from, count, skip, reverse)
+ // None of the extra deliveries should block.
+ timeout := time.After(5 * time.Second)
+ for i := 0; i < cap(deliveriesDone); i++ {
+ select {
+ case <-deliveriesDone:
+ case <-timeout:
+ panic("blocked")
+ }
+ }
+ return nil
+ }
+ if err := tester.sync("peer", nil, mode); err != nil {
+ t.Errorf("sync failed: %v", err)
+ }
+ }
+}
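
The hang test above boils down to a reusable pattern: hammer a call that must never block with a burst of goroutines, then require every one of them to report back before a deadline. The stand-alone sketch below shows that pattern in isolation; deliver is a placeholder for the call under test (DeliverHeaders in the test), not code taken from this change.

package main

import (
	"fmt"
	"time"
)

func main() {
	// deliver stands in for the call that must never block, no matter how
	// many unsolicited invocations arrive concurrently.
	deliver := func() {}

	const floods = 500
	done := make(chan struct{}, floods)
	for i := 0; i < floods; i++ {
		go func() {
			deliver()
			done <- struct{}{}
		}()
	}
	// Every flooded call has to return before the deadline.
	timeout := time.After(5 * time.Second)
	for i := 0; i < floods; i++ {
		select {
		case <-done:
		case <-timeout:
			fmt.Println("a delivery blocked") // the real test panics here
			return
		}
	}
	fmt.Println("all deliveries returned")
}
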
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 1fb5b6e12..584797d7b 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -101,11 +101,14 @@ type queue struct {
resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block chain
- lock sync.RWMutex
+ lock *sync.Mutex
+ active *sync.Cond
+ closed bool
}
// newQueue creates a new download queue for scheduling block retrieval.
func newQueue(stateDb ethdb.Database) *queue {
+ lock := new(sync.Mutex)
return &queue{
hashPool: make(map[common.Hash]int),
hashQueue: prque.New(),
@@ -122,6 +125,8 @@ func newQueue(stateDb ethdb.Database) *queue {
statePendPool: make(map[string]*fetchRequest),
stateDatabase: stateDb,
resultCache: make([]*fetchResult, blockCacheLimit),
+ active: sync.NewCond(lock),
+ lock: lock,
}
}
@@ -133,6 +138,7 @@ func (q *queue) Reset() {
q.stateSchedLock.Lock()
defer q.stateSchedLock.Unlock()
+ q.closed = false
q.mode = FullSync
q.fastSyncPivot = 0
@@ -162,18 +168,27 @@ func (q *queue) Reset() {
q.resultOffset = 0
}
+// Close marks the end of the sync, unblocking WaitResults.
+// It may be called even if the queue is already closed.
+func (q *queue) Close() {
+ q.lock.Lock()
+ q.closed = true
+ q.lock.Unlock()
+ q.active.Broadcast()
+}
+
// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
func (q *queue) PendingBlocks() int {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.hashQueue.Size() + q.blockTaskQueue.Size()
}
// PendingReceipts retrieves the number of block receipts pending for retrieval.
func (q *queue) PendingReceipts() int {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.receiptTaskQueue.Size()
}
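
Close and the lock conversions above go together: a sync.Cond must share a single sync.Locker with the code that mutates the guarded state, so the queue's reader/writer lock becomes one plain Mutex, every former RLock call site takes the full lock, and Close flips a closed flag before waking all waiters with Broadcast. A minimal sketch of that shape, with placeholder names rather than the downloader's queue, is:

package main

import (
	"fmt"
	"sync"
)

// workQueue mirrors the pattern only: one Mutex shared with a sync.Cond,
// a closed flag, and a Wait loop that rechecks its condition after waking.
type workQueue struct {
	lock   *sync.Mutex
	active *sync.Cond
	closed bool
	items  []int
}

func newWorkQueue() *workQueue {
	lock := new(sync.Mutex)
	return &workQueue{lock: lock, active: sync.NewCond(lock)}
}

// Push adds an item and wakes one waiter, like q.active.Signal() after a delivery.
func (q *workQueue) Push(v int) {
	q.lock.Lock()
	q.items = append(q.items, v)
	q.lock.Unlock()
	q.active.Signal()
}

// Close marks the queue closed and wakes every waiter, like queue.Close above.
func (q *workQueue) Close() {
	q.lock.Lock()
	q.closed = true
	q.lock.Unlock()
	q.active.Broadcast()
}

// Wait blocks until items are available or the queue is closed; an empty
// result means "closed".
func (q *workQueue) Wait() []int {
	q.lock.Lock()
	defer q.lock.Unlock()
	for len(q.items) == 0 && !q.closed {
		q.active.Wait()
	}
	batch := q.items
	q.items = nil
	return batch
}

func main() {
	q := newWorkQueue()
	go func() {
		q.Push(1)
		q.Push(2)
		q.Close()
	}()
	for {
		batch := q.Wait()
		if len(batch) == 0 {
			break
		}
		fmt.Println("got", batch)
	}
}

Broadcast (rather than Signal) in Close matters: every goroutine parked in Wait has to observe the closed flag and return, which is what lets WaitResults report an empty batch when the sync ends.
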
@@ -192,8 +207,8 @@ func (q *queue) PendingNodeData() int {
// InFlightBlocks retrieves whether there are block fetch requests currently in
// flight.
func (q *queue) InFlightBlocks() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.blockPendPool) > 0
}
@@ -201,8 +216,8 @@ func (q *queue) InFlightBlocks() bool {
// InFlightReceipts retrieves whether there are receipt fetch requests currently
// in flight.
func (q *queue) InFlightReceipts() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.receiptPendPool) > 0
}
@@ -210,8 +225,8 @@ func (q *queue) InFlightReceipts() bool {
// InFlightNodeData retrieves whether there are node data entry fetch requests
// currently in flight.
func (q *queue) InFlightNodeData() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0
}
@@ -219,8 +234,8 @@ func (q *queue) InFlightNodeData() bool {
// Idle returns if the queue is fully idle or has some data still inside. This
// method is used by the tester to detect termination events.
func (q *queue) Idle() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
@@ -237,8 +252,8 @@ func (q *queue) Idle() bool {
// FastSyncPivot retrieves the currently used fast sync pivot point.
func (q *queue) FastSyncPivot() uint64 {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.fastSyncPivot
}
@@ -246,8 +261,8 @@ func (q *queue) FastSyncPivot() uint64 {
// ShouldThrottleBlocks checks if the download should be throttled (active block (body)
// fetches exceed block cache).
func (q *queue) ShouldThrottleBlocks() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
// Calculate the currently in-flight block (body) requests
pending := 0
@@ -261,8 +276,8 @@ func (q *queue) ShouldThrottleBlocks() bool {
// ShouldThrottleReceipts checks if the download should be throttled (active receipt
// fetches exceed block cache).
func (q *queue) ShouldThrottleReceipts() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
// Calculate the currently in-flight receipt requests
pending := 0
@@ -351,91 +366,74 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
return inserts
}
-// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't
-// been downloaded yet (or simply non existent).
-func (q *queue) GetHeadResult() *fetchResult {
- q.lock.RLock()
- defer q.lock.RUnlock()
+// WaitResults retrieves and permanently removes a batch of fetch
+// results from the cache. The result slice will be empty if the queue
+// has been closed.
+func (q *queue) WaitResults() []*fetchResult {
+ q.lock.Lock()
+ defer q.lock.Unlock()
- // If there are no results pending, return nil
- if len(q.resultCache) == 0 || q.resultCache[0] == nil {
- return nil
- }
- // If the next result is still incomplete, return nil
- if q.resultCache[0].Pending > 0 {
- return nil
+ nproc := q.countProcessableItems()
+ for nproc == 0 && !q.closed {
+ q.active.Wait()
+ nproc = q.countProcessableItems()
}
- // If the next result is the fast sync pivot...
- if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot {
- // If the pivot state trie is still being pulled, return nil
- if len(q.stateTaskPool) > 0 {
- return nil
+ results := make([]*fetchResult, nproc)
+ copy(results, q.resultCache[:nproc])
+ if len(results) > 0 {
+ // Mark results as done before dropping them from the cache.
+ for _, result := range results {
+ hash := result.Header.Hash()
+ delete(q.blockDonePool, hash)
+ delete(q.receiptDonePool, hash)
}
- if q.PendingNodeData() > 0 {
- return nil
- }
- // If the state is done, but not enough post-pivot headers were verified, stall...
- for i := 0; i < fsHeaderForceVerify; i++ {
- if i+1 >= len(q.resultCache) || q.resultCache[i+1] == nil {
- return nil
- }
+ // Delete the results from the cache and clear the tail.
+ copy(q.resultCache, q.resultCache[nproc:])
+ for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
+ q.resultCache[i] = nil
}
+ // Advance the expected block number of the first cache entry.
+ q.resultOffset += uint64(nproc)
}
- return q.resultCache[0]
+ return results
}
-// TakeResults retrieves and permanently removes a batch of fetch results from
-// the cache.
-func (q *queue) TakeResults() []*fetchResult {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- // Accumulate all available results
- results := []*fetchResult{}
+// countProcessableItems counts how many results at the head of the cache are ready for processing.
+func (q *queue) countProcessableItems() int {
for i, result := range q.resultCache {
- // Stop if no more results are ready
+ // Don't process incomplete or unavailable items.
if result == nil || result.Pending > 0 {
- break
+ return i
}
- // The fast sync pivot block may only be processed after state fetch completes
- if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot {
- if len(q.stateTaskPool) > 0 {
- break
- }
- if q.PendingNodeData() > 0 {
- break
- }
- // Even is state fetch is done, ensure post-pivot headers passed verifications
- safe := true
- for j := 0; j < fsHeaderForceVerify; j++ {
- if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
- safe = false
+ // Special handling for the fast-sync pivot block:
+ if q.mode == FastSync {
+ bnum := result.Header.Number.Uint64()
+ if bnum == q.fastSyncPivot {
+ // If the state of the pivot block is not
+ // available yet, we cannot proceed and return 0.
+ //
+ // Stop before processing the pivot block to ensure that
+ // resultCache has space for fsHeaderForceVerify items. Not
+ // doing this could leave us unable to download the required
+ // amount of headers.
+ if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 {
+ return i
+ }
+ for j := 0; j < fsHeaderForceVerify; j++ {
+ if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
+ return i
+ }
}
}
- if !safe {
- break
+			// If we're just past the fast sync pivot, stop as well
+ // because the following batch needs different insertion.
+ // This simplifies handling the switchover in d.process.
+ if bnum == q.fastSyncPivot+1 && i > 0 {
+ return i
}
}
- // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion
- if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 {
- break
- }
- results = append(results, result)
-
- hash := result.Header.Hash()
- delete(q.blockDonePool, hash)
- delete(q.receiptDonePool, hash)
}
- // Delete the results from the slice and let them be garbage collected
- // without this slice trick the results would stay in memory until nil
- // would be assigned to them.
- copy(q.resultCache, q.resultCache[len(results):])
- for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ {
- q.resultCache[k] = nil
- }
- q.resultOffset += uint64(len(results))
-
- return results
+ return len(q.resultCache)
}
// ReserveBlocks reserves a set of block hashes for the given peer, skipping any
@@ -584,6 +582,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
// If we're the first to request this task, initialise the result container
index := int(header.Number.Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 {
+ common.Report("index allocation went beyond available resultCache space")
return nil, false, errInvalidChain
}
if q.resultCache[index] == nil {
@@ -617,6 +616,10 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
for _, header := range skip {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
+ if progress {
+ // Wake WaitResults, resultCache was modified
+ q.active.Signal()
+ }
// Assemble and return the block download request
if len(send) == 0 {
return nil, progress, nil
@@ -737,7 +740,7 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string {
// expire is the generic check that move expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks.
//
-// Note, this method expects the queue lock to be already held for writing. The
+// Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string {
@@ -813,17 +816,16 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index))
}
+ // Wake up WaitResults
+ q.active.Signal()
// If none of the blocks were good, it's a stale delivery
switch {
case len(errs) == 0:
return nil
-
case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
return errs[0]
-
case len(errs) == len(blocks):
return errStaleDelivery
-
default:
return fmt.Errorf("multiple failures: %v", errs)
}
@@ -915,14 +917,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
}
+ // Wake up WaitResults
+ q.active.Signal()
// If none of the data was good, it's a stale delivery
switch {
case failure == nil || failure == errInvalidChain:
return failure
-
case useful:
return fmt.Errorf("partial failure: %v", failure)
-
default:
return errStaleDelivery
}
@@ -977,10 +979,8 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
switch {
case len(errs) == 0:
return nil
-
case len(errs) == len(request.Hashes):
return errStaleDelivery
-
default:
return fmt.Errorf("multiple failures: %v", errs)
}
@@ -989,6 +989,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// deliverNodeData is the asynchronous node data processor that injects a batch
// of sync results into the state scheduler.
func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) {
+ // Wake up WaitResults after the state has been written because it
+	// might be waiting for the pivot block state to be completed.
+ defer q.active.Signal()
+
// Process results one by one to permit task fetches in between
for i, result := range results {
q.stateSchedLock.Lock()
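
Taken together, the queue changes replace the poll-style GetHeadResult/TakeResults pair with a blocking WaitResults: it waits until at least one complete result is ready and returns an empty slice once Close has been called, and every delivery path (reserveHeaders, DeliverBlocks, deliver, deliverNodeData) signals the condition variable after touching the result cache. The sketch below shows roughly how a consumer drains such a queue; Result and fakeQueue are simplified placeholders, not the downloader's fetchResult or its processing loop.

package main

import "fmt"

// Result is a stand-in for fetchResult, carrying just a block number.
type Result struct{ Number uint64 }

// fakeQueue hands out pre-built batches; the real WaitResults blocks on a
// sync.Cond until a batch is ready or the queue is closed.
type fakeQueue struct{ batches [][]Result }

func (q *fakeQueue) WaitResults() []Result {
	if len(q.batches) == 0 {
		return nil // plays the role of "queue closed"
	}
	batch := q.batches[0]
	q.batches = q.batches[1:]
	return batch
}

func main() {
	q := &fakeQueue{batches: [][]Result{
		{{Number: 1}, {Number: 2}},
		{{Number: 3}},
	}}
	for {
		results := q.WaitResults()
		if len(results) == 0 {
			break // empty batch: the queue was closed, sync is over
		}
		for _, r := range results {
			fmt.Println("processing block", r.Number)
		}
	}
}
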
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index a5418e2e7..5772114b3 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -64,7 +64,7 @@ func BenchmarkMipmaps(b *testing.B) {
}
// store the receipts
- err := core.PutReceipts(db, receipts)
+ err := core.WriteReceipts(db, receipts)
if err != nil {
b.Fatal(err)
}
@@ -78,7 +78,7 @@ func BenchmarkMipmaps(b *testing.B) {
if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil {
b.Fatalf("failed to insert block number: %v", err)
}
- if err := core.PutBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
+ if err := core.WriteBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
b.Fatal("error writing block receipts:", err)
}
}
@@ -163,7 +163,7 @@ func TestFilters(t *testing.T) {
}
// store the receipts
- err := core.PutReceipts(db, receipts)
+ err := core.WriteReceipts(db, receipts)
if err != nil {
t.Fatal(err)
}
@@ -180,7 +180,7 @@ func TestFilters(t *testing.T) {
if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil {
t.Fatalf("failed to insert block number: %v", err)
}
- if err := core.PutBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
+ if err := core.WriteBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
t.Fatal("error writing block receipts:", err)
}
}
diff --git a/eth/sync.go b/eth/sync.go
index bbf2abc04..dd8aef8e4 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -175,10 +175,6 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
}
// If fast sync was enabled, and we synced up, disable it
if pm.fastSync {
- // Wait until all pending imports finish processing
- for pm.downloader.Synchronising() {
- time.Sleep(100 * time.Millisecond)
- }
// Disable fast sync if we indeed have something in our chain
if pm.blockchain.CurrentBlock().NumberU64() > 0 {
glog.V(logger.Info).Infof("fast sync complete, auto disabling")
diff --git a/miner/worker.go b/miner/worker.go
index 238f1a4bf..aa0fa85cb 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -305,9 +305,9 @@ func (self *worker) wait() {
// check if canon block and write transactions
if stat == core.CanonStatTy {
// This puts transactions in a extra db for rpc
- core.PutTransactions(self.chainDb, block, block.Transactions())
+ core.WriteTransactions(self.chainDb, block)
// store the receipts
- core.PutReceipts(self.chainDb, work.receipts)
+ core.WriteReceipts(self.chainDb, work.receipts)
// Write map map bloom filters
core.WriteMipmapBloom(self.chainDb, block.NumberU64(), work.receipts)
}
@@ -320,7 +320,7 @@ func (self *worker) wait() {
self.mux.Post(core.ChainHeadEvent{block})
self.mux.Post(logs)
}
- if err := core.PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
+ if err := core.WriteBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
glog.V(logger.Warn).Infoln("error writing block receipts:", err)
}
}(block, work.state.Logs(), work.receipts)
diff --git a/xeth/xeth.go b/xeth/xeth.go
index 243bef0b8..ae9f1fe47 100644
--- a/xeth/xeth.go
+++ b/xeth/xeth.go
@@ -322,44 +322,11 @@ func (self *XEth) EthBlockByHash(strHash string) *types.Block {
return block
}
-func (self *XEth) EthTransactionByHash(hash string) (tx *types.Transaction, blhash common.Hash, blnum *big.Int, txi uint64) {
- // Due to increasing return params and need to determine if this is from transaction pool or
- // some chain, this probably needs to be refactored for more expressiveness
- data, _ := self.backend.ChainDb().Get(common.FromHex(hash))
- if len(data) != 0 {
- dtx := new(types.Transaction)
- if err := rlp.DecodeBytes(data, dtx); err != nil {
- glog.V(logger.Error).Infoln(err)
- return
- }
- tx = dtx
- } else { // check pending transactions
- tx = self.backend.TxPool().GetTransaction(common.HexToHash(hash))
- }
-
- // meta
- var txExtra struct {
- BlockHash common.Hash
- BlockIndex uint64
- Index uint64
- }
-
- v, dberr := self.backend.ChainDb().Get(append(common.FromHex(hash), 0x0001))
- // TODO check specifically for ErrNotFound
- if dberr != nil {
- return
+func (self *XEth) EthTransactionByHash(hash string) (*types.Transaction, common.Hash, uint64, uint64) {
+ if tx, hash, number, index := core.GetTransaction(self.backend.ChainDb(), common.HexToHash(hash)); tx != nil {
+ return tx, hash, number, index
}
- r := bytes.NewReader(v)
- err := rlp.Decode(r, &txExtra)
- if err == nil {
- blhash = txExtra.BlockHash
- blnum = big.NewInt(int64(txExtra.BlockIndex))
- txi = txExtra.Index
- } else {
- glog.V(logger.Error).Infoln(err)
- }
-
- return
+ return self.backend.TxPool().GetTransaction(common.HexToHash(hash)), common.Hash{}, 0, 0
}
func (self *XEth) BlockByNumber(num int64) *Block {
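
On the xeth side the hand-rolled RLP lookup is gone: EthTransactionByHash now asks core.GetTransaction for a mined transaction and falls back to the transaction pool, in which case the block hash, block number and index all come back as zero values. The hypothetical helper below (describeTx and the txLookup interface are not part of the xeth API) only illustrates how a caller can interpret those return values:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)

// txLookup is the slice of the XEth surface this sketch relies on; the real
// method is (*xeth.XEth).EthTransactionByHash with the same signature.
type txLookup interface {
	EthTransactionByHash(hash string) (*types.Transaction, common.Hash, uint64, uint64)
}

// describeTx is a hypothetical helper illustrating the new return values.
func describeTx(x txLookup, hashHex string) string {
	tx, blockHash, blockNum, index := x.EthTransactionByHash(hashHex)
	switch {
	case tx == nil:
		return "unknown transaction"
	case blockHash == (common.Hash{}):
		return "pending transaction (from the pool, no block metadata yet)"
	default:
		return fmt.Sprintf("mined in block %d (%s) at index %d", blockNum, blockHash.Hex(), index)
	}
}

// noTx is a trivial txLookup that knows no transactions, used only to make
// this sketch runnable without wiring up a real XEth instance.
type noTx struct{}

func (noTx) EthTransactionByHash(string) (*types.Transaction, common.Hash, uint64, uint64) {
	return nil, common.Hash{}, 0, 0
}

func main() {
	fmt.Println(describeTx(noTx{}, "0x00")) // prints "unknown transaction"
}
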