aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Makefile67
-rw-r--r--cmd/geth/main.go26
-rw-r--r--cmd/utils/cmd.go10
-rw-r--r--cmd/utils/flags.go2
-rw-r--r--cmd/utils/legalese.go41
-rw-r--r--core/bench_test.go1
-rw-r--r--core/block_processor.go460
-rw-r--r--core/block_validator.go243
-rw-r--r--core/block_validator_test.go (renamed from core/block_processor_test.go)6
-rw-r--r--core/blockchain.go146
-rw-r--r--core/blockchain_test.go147
-rw-r--r--core/chain_makers.go11
-rw-r--r--core/chain_makers_test.go9
-rw-r--r--core/database_util.go (renamed from core/chain_util.go)178
-rw-r--r--core/database_util_test.go (renamed from core/chain_util_test.go)162
-rw-r--r--core/gaspool.go46
-rw-r--r--core/genesis.go2
-rw-r--r--core/state_processor.go107
-rw-r--r--core/transaction_util.go171
-rw-r--r--core/types.go70
-rw-r--r--core/vm/runtime/doc.go (renamed from core/types/common.go)11
-rw-r--r--core/vm/runtime/env.go106
-rw-r--r--core/vm/runtime/runtime.go121
-rw-r--r--core/vm/runtime/runtime_example_test.go (renamed from core/manager.go)26
-rw-r--r--core/vm/runtime/runtime_test.go120
-rw-r--r--crypto/secp256k1/panic_cb.go33
-rw-r--r--crypto/secp256k1/secp256.go122
-rw-r--r--crypto/secp256k1/secp256_test.go15
-rw-r--r--eth/backend.go41
-rw-r--r--eth/backend_test.go4
-rw-r--r--eth/downloader/downloader.go488
-rw-r--r--eth/downloader/downloader_test.go135
-rw-r--r--eth/downloader/peer.go231
-rw-r--r--eth/downloader/queue.go289
-rw-r--r--eth/filters/filter_test.go8
-rw-r--r--eth/gasprice.go2
-rw-r--r--eth/helper_test.go2
-rw-r--r--eth/sync.go4
-rw-r--r--miner/worker.go27
-rw-r--r--rpc/api/debug.go28
-rw-r--r--rpc/api/utils.go2
-rw-r--r--xeth/xeth.go43
42 files changed, 2181 insertions, 1582 deletions
diff --git a/Makefile b/Makefile
index d2b57e13f..41cbc1ce6 100644
--- a/Makefile
+++ b/Makefile
@@ -2,9 +2,17 @@
# with Go source code. If you know what GOPATH is then you probably
# don't need to bother with make.
-.PHONY: geth geth-cross geth-linux geth-darwin geth-windows geth-android evm all test travis-test-with-coverage xgo clean
+.PHONY: geth geth-cross evm all test travis-test-with-coverage xgo clean
+.PHONY: geth-linux geth-linux-arm geth-linux-386 geth-linux-amd64
+.PHONY: geth-darwin geth-darwin-386 geth-darwin-amd64
+.PHONY: geth-windows geth-windows-386 geth-windows-amd64
+.PHONY: geth-android geth-android-16 geth-android-21
+
GOBIN = build/bin
+CROSSDEPS = https://gmplib.org/download/gmp/gmp-6.0.0a.tar.bz2
+GO ?= latest
+
geth:
build/env.sh go install -v $(shell build/flags.sh) ./cmd/geth
@echo "Done building."
@@ -14,26 +22,67 @@ geth-cross: geth-linux geth-darwin geth-windows geth-android
@echo "Full cross compilation done:"
@ls -l $(GOBIN)/geth-*
-geth-linux: xgo
- build/env.sh $(GOBIN)/xgo --dest=$(GOBIN) --deps=https://gmplib.org/download/gmp/gmp-6.0.0a.tar.bz2 --targets=linux/* -v $(shell build/flags.sh) ./cmd/geth
+geth-linux: xgo geth-linux-arm geth-linux-386 geth-linux-amd64
@echo "Linux cross compilation done:"
@ls -l $(GOBIN)/geth-linux-*
-geth-darwin: xgo
- build/env.sh $(GOBIN)/xgo --dest=$(GOBIN) --deps=https://gmplib.org/download/gmp/gmp-6.0.0a.tar.bz2 --targets=darwin/* -v $(shell build/flags.sh) ./cmd/geth
+geth-linux-arm: xgo
+ build/env.sh $(GOBIN)/xgo --go=$(GO) --dest=$(GOBIN) --deps=$(CROSSDEPS) --targets=linux/arm -v $(shell build/flags.sh) ./cmd/geth
+ @echo "Linux ARM cross compilation done:"
+ @ls -l $(GOBIN)/geth-linux-* | grep arm
+
+geth-linux-386: xgo
+ build/env.sh $(GOBIN)/xgo --go=$(GO) --dest=$(GOBIN) --deps=$(CROSSDEPS) --targets=linux/386 -v $(shell build/flags.sh) ./cmd/geth
+ @echo "Linux 386 cross compilation done:"
+ @ls -l $(GOBIN)/geth-linux-* | grep 386
+
+geth-linux-amd64: xgo
+ build/env.sh $(GOBIN)/xgo --go=$(GO) --dest=$(GOBIN) --deps=$(CROSSDEPS) --targets=linux/amd64 -v $(shell build/flags.sh) ./cmd/geth
+ @echo "Linux amd64 cross compilation done:"
+ @ls -l $(GOBIN)/geth-linux-* | grep amd64
+
+geth-darwin: xgo geth-darwin-386 geth-darwin-amd64
@echo "Darwin cross compilation done:"
@ls -l $(GOBIN)/geth-darwin-*
-geth-windows: xgo
- build/env.sh $(GOBIN)/xgo --dest=$(GOBIN) --deps=https://gmplib.org/download/gmp/gmp-6.0.0a.tar.bz2 --targets=windows/* -v $(shell build/flags.sh) ./cmd/geth
+geth-darwin-386: xgo
+ build/env.sh $(GOBIN)/xgo --go=$(GO) --dest=$(GOBIN) --deps=$(CROSSDEPS) --targets=darwin/386 -v $(shell build/flags.sh) ./cmd/geth
+ @echo "Darwin 386 cross compilation done:"
+ @ls -l $(GOBIN)/geth-darwin-* | grep 386
+
+geth-darwin-amd64: xgo
+ build/env.sh $(GOBIN)/xgo --go=$(GO) --dest=$(GOBIN) --deps=$(CROSSDEPS) --targets=darwin/amd64 -v $(shell build/flags.sh) ./cmd/geth
+ @echo "Darwin amd64 cross compilation done:"
+ @ls -l $(GOBIN)/geth-darwin-* | grep amd64
+
+geth-windows: xgo geth-windows-386 geth-windows-amd64
@echo "Windows cross compilation done:"
@ls -l $(GOBIN)/geth-windows-*
-geth-android: xgo
- build/env.sh $(GOBIN)/xgo --dest=$(GOBIN) --deps=https://gmplib.org/download/gmp/gmp-6.0.0a.tar.bz2 --targets=android-16/*,android-21/* -v $(shell build/flags.sh) ./cmd/geth
+geth-windows-386: xgo
+ build/env.sh $(GOBIN)/xgo --go=$(GO) --dest=$(GOBIN) --deps=$(CROSSDEPS) --targets=windows/386 -v $(shell build/flags.sh) ./cmd/geth
+ @echo "Windows 386 cross compilation done:"
+ @ls -l $(GOBIN)/geth-windows-* | grep 386
+
+geth-windows-amd64: xgo
+ build/env.sh $(GOBIN)/xgo --go=$(GO) --dest=$(GOBIN) --deps=$(CROSSDEPS) --targets=windows/amd64 -v $(shell build/flags.sh) ./cmd/geth
+ @echo "Windows amd64 cross compilation done:"
+ @ls -l $(GOBIN)/geth-windows-* | grep amd64
+
+geth-android: xgo geth-android-16 geth-android-21
@echo "Android cross compilation done:"
@ls -l $(GOBIN)/geth-android-*
+geth-android-16: xgo
+ build/env.sh $(GOBIN)/xgo --go=$(GO) --dest=$(GOBIN) --deps=$(CROSSDEPS) --targets=android-16/* -v $(shell build/flags.sh) ./cmd/geth
+ @echo "Android 16 cross compilation done:"
+ @ls -l $(GOBIN)/geth-android-16-*
+
+geth-android-21: xgo
+ build/env.sh $(GOBIN)/xgo --go=$(GO) --dest=$(GOBIN) --deps=$(CROSSDEPS) --targets=android-21/* -v $(shell build/flags.sh) ./cmd/geth
+ @echo "Android 21 cross compilation done:"
+ @ls -l $(GOBIN)/geth-android-21-*
+
evm:
build/env.sh $(GOROOT)/bin/go install -v $(shell build/flags.sh) ./cmd/evm
@echo "Done building."
diff --git a/cmd/geth/main.go b/cmd/geth/main.go
index d63d20580..82bc21ab0 100644
--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -405,8 +405,6 @@ func makeDefaultExtra() []byte {
}
func run(ctx *cli.Context) {
- utils.CheckLegalese(utils.MustDataDir(ctx))
-
cfg := utils.MakeEthConfig(ClientIdentifier, nodeNameVersion, ctx)
cfg.ExtraData = makeExtra(ctx)
@@ -421,8 +419,6 @@ func run(ctx *cli.Context) {
}
func attach(ctx *cli.Context) {
- utils.CheckLegalese(utils.MustDataDir(ctx))
-
var client comms.EthereumClient
var err error
if ctx.Args().Present() {
@@ -454,8 +450,6 @@ func attach(ctx *cli.Context) {
}
func console(ctx *cli.Context) {
- utils.CheckLegalese(utils.MustDataDir(ctx))
-
cfg := utils.MakeEthConfig(ClientIdentifier, nodeNameVersion, ctx)
cfg.ExtraData = makeExtra(ctx)
@@ -488,8 +482,6 @@ func console(ctx *cli.Context) {
}
func execJSFiles(ctx *cli.Context) {
- utils.CheckLegalese(utils.MustDataDir(ctx))
-
cfg := utils.MakeEthConfig(ClientIdentifier, nodeNameVersion, ctx)
ethereum, err := eth.New(cfg)
if err != nil {
@@ -515,8 +507,6 @@ func execJSFiles(ctx *cli.Context) {
}
func unlockAccount(ctx *cli.Context, am *accounts.Manager, addr string, i int, inputpassphrases []string) (addrHex, auth string, passphrases []string) {
- utils.CheckLegalese(ctx.GlobalString(utils.DataDirFlag.Name))
-
var err error
passphrases = inputpassphrases
addrHex, err = utils.ParamToAddress(addr, am)
@@ -541,16 +531,12 @@ func unlockAccount(ctx *cli.Context, am *accounts.Manager, addr string, i int, i
}
func blockRecovery(ctx *cli.Context) {
- utils.CheckLegalese(utils.MustDataDir(ctx))
-
if len(ctx.Args()) < 1 {
glog.Fatal("recover requires block number or hash")
}
arg := ctx.Args().First()
cfg := utils.MakeEthConfig(ClientIdentifier, nodeNameVersion, ctx)
- utils.CheckLegalese(cfg.DataDir)
-
blockDb, err := ethdb.NewLDBDatabase(filepath.Join(cfg.DataDir, "blockchain"), cfg.DatabaseCache)
if err != nil {
glog.Fatalln("could not open db:", err)
@@ -611,8 +597,6 @@ func startEth(ctx *cli.Context, eth *eth.Ethereum) {
}
func accountList(ctx *cli.Context) {
- utils.CheckLegalese(utils.MustDataDir(ctx))
-
am := utils.MakeAccountManager(ctx)
accts, err := am.Accounts()
if err != nil {
@@ -664,8 +648,6 @@ func getPassPhrase(ctx *cli.Context, desc string, confirmation bool, i int, inpu
}
func accountCreate(ctx *cli.Context) {
- utils.CheckLegalese(utils.MustDataDir(ctx))
-
am := utils.MakeAccountManager(ctx)
passphrase, _ := getPassPhrase(ctx, "Your new account is locked with a password. Please give a password. Do not forget this password.", true, 0, nil)
acct, err := am.NewAccount(passphrase)
@@ -676,8 +658,6 @@ func accountCreate(ctx *cli.Context) {
}
func accountUpdate(ctx *cli.Context) {
- utils.CheckLegalese(utils.MustDataDir(ctx))
-
am := utils.MakeAccountManager(ctx)
arg := ctx.Args().First()
if len(arg) == 0 {
@@ -693,8 +673,6 @@ func accountUpdate(ctx *cli.Context) {
}
func importWallet(ctx *cli.Context) {
- utils.CheckLegalese(utils.MustDataDir(ctx))
-
keyfile := ctx.Args().First()
if len(keyfile) == 0 {
utils.Fatalf("keyfile must be given as argument")
@@ -715,8 +693,6 @@ func importWallet(ctx *cli.Context) {
}
func accountImport(ctx *cli.Context) {
- utils.CheckLegalese(utils.MustDataDir(ctx))
-
keyfile := ctx.Args().First()
if len(keyfile) == 0 {
utils.Fatalf("keyfile must be given as argument")
@@ -731,8 +707,6 @@ func accountImport(ctx *cli.Context) {
}
func makedag(ctx *cli.Context) {
- utils.CheckLegalese(utils.MustDataDir(ctx))
-
args := ctx.Args()
wrongArgs := func() {
utils.Fatalf(`Usage: geth makedag <block number> <outputdir>`)
diff --git a/cmd/utils/cmd.go b/cmd/utils/cmd.go
index 9b75ccab4..5cbb58124 100644
--- a/cmd/utils/cmd.go
+++ b/cmd/utils/cmd.go
@@ -95,16 +95,6 @@ func PromptPassword(prompt string, warnTerm bool) (string, error) {
return input, err
}
-func CheckLegalese(datadir string) {
- // check "first run"
- if !common.FileExist(datadir) {
- r, _ := PromptConfirm(legalese)
- if !r {
- Fatalf("Must accept to continue. Shutting down...\n")
- }
- }
-}
-
// Fatalf formats a message to standard error and exits the program.
// The message is also printed to standard output if standard error
// is redirected to a different file.
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 8335517df..3792dc1e0 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -557,8 +557,6 @@ func MakeChain(ctx *cli.Context) (chain *core.BlockChain, chainDb ethdb.Database
Fatalf("Could not start chainmanager: %v", err)
}
- proc := core.NewBlockProcessor(chainDb, pow, chain, eventMux)
- chain.SetProcessor(proc)
return chain, chainDb
}
diff --git a/cmd/utils/legalese.go b/cmd/utils/legalese.go
deleted file mode 100644
index 09e687c17..000000000
--- a/cmd/utils/legalese.go
+++ /dev/null
@@ -1,41 +0,0 @@
-// Copyright 2015 The go-ethereum Authors
-// This file is part of go-ethereum.
-//
-// go-ethereum is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// go-ethereum is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
-
-package utils
-
-const (
- legalese = `
-=======================================
-Disclaimer of Liabilites and Warranties
-=======================================
-
-THE USER EXPRESSLY KNOWS AND AGREES THAT THE USER IS USING THE ETHEREUM PLATFORM AT THE USER’S SOLE
-RISK. THE USER REPRESENTS THAT THE USER HAS AN ADEQUATE UNDERSTANDING OF THE RISKS, USAGE AND
-INTRICACIES OF CRYPTOGRAPHIC TOKENS AND BLOCKCHAIN-BASED OPEN SOURCE SOFTWARE, ETH PLATFORM AND ETH.
-THE USER ACKNOWLEDGES AND AGREES THAT, TO THE FULLEST EXTENT PERMITTED BY ANY APPLICABLE LAW, THE
-DISCLAIMERS OF LIABILITY CONTAINED HEREIN APPLY TO ANY AND ALL DAMAGES OR INJURY WHATSOEVER CAUSED
-BY OR RELATED TO RISKS OF, USE OF, OR INABILITY TO USE, ETH OR THE ETHEREUM PLATFORM UNDER ANY CAUSE
-OR ACTION WHATSOEVER OF ANY KIND IN ANY JURISDICTION, INCLUDING, WITHOUT LIMITATION, ACTIONS FOR
-BREACH OF WARRANTY, BREACH OF CONTRACT OR TORT (INCLUDING NEGLIGENCE) AND THAT NEITHER STIFTUNG
-ETHEREUM NOR ETHEREUM TEAM SHALL BE LIABLE FOR ANY INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY OR
-CONSEQUENTIAL DAMAGES, INCLUDING FOR LOSS OF PROFITS, GOODWILL OR DATA. SOME JURISDICTIONS DO NOT
-ALLOW THE EXCLUSION OF CERTAIN WARRANTIES OR THE LIMITATION OR EXCLUSION OF LIABILITY FOR CERTAIN
-TYPES OF DAMAGES. THEREFORE, SOME OF THE ABOVE LIMITATIONS IN THIS SECTION MAY NOT APPLY TO A USER.
-IN PARTICULAR, NOTHING IN THESE TERMS SHALL AFFECT THE STATUTORY RIGHTS OF ANY USER OR EXCLUDE
-INJURY ARISING FROM ANY WILLFUL MISCONDUCT OR FRAUD OF STIFTUNG ETHEREUM.
-
-Do you accept this agreement?`
-)
diff --git a/core/bench_test.go b/core/bench_test.go
index b5eb51803..6fa7659b9 100644
--- a/core/bench_test.go
+++ b/core/bench_test.go
@@ -169,7 +169,6 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) {
// State and blocks are stored in the same DB.
evmux := new(event.TypeMux)
chainman, _ := NewBlockChain(db, FakePow{}, evmux)
- chainman.SetProcessor(NewBlockProcessor(db, FakePow{}, chainman, evmux))
defer chainman.Stop()
b.ReportAllocs()
b.ResetTimer()
diff --git a/core/block_processor.go b/core/block_processor.go
deleted file mode 100644
index e7b2f63e5..000000000
--- a/core/block_processor.go
+++ /dev/null
@@ -1,460 +0,0 @@
-// Copyright 2014 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package core
-
-import (
- "fmt"
- "math/big"
- "sync"
- "time"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/state"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/core/vm"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
- "github.com/ethereum/go-ethereum/params"
- "github.com/ethereum/go-ethereum/pow"
- "gopkg.in/fatih/set.v0"
-)
-
-const (
- // must be bumped when consensus algorithm is changed, this forces the upgradedb
- // command to be run (forces the blocks to be imported again using the new algorithm)
- BlockChainVersion = 3
-)
-
-type BlockProcessor struct {
- chainDb ethdb.Database
- // Mutex for locking the block processor. Blocks can only be handled one at a time
- mutex sync.Mutex
- // Canonical block chain
- bc *BlockChain
- // non-persistent key/value memory storage
- mem map[string]*big.Int
- // Proof of work used for validating
- Pow pow.PoW
-
- events event.Subscription
-
- eventMux *event.TypeMux
-}
-
-// GasPool tracks the amount of gas available during
-// execution of the transactions in a block.
-// The zero value is a pool with zero gas available.
-type GasPool big.Int
-
-// AddGas makes gas available for execution.
-func (gp *GasPool) AddGas(amount *big.Int) *GasPool {
- i := (*big.Int)(gp)
- i.Add(i, amount)
- return gp
-}
-
-// SubGas deducts the given amount from the pool if enough gas is
-// available and returns an error otherwise.
-func (gp *GasPool) SubGas(amount *big.Int) error {
- i := (*big.Int)(gp)
- if i.Cmp(amount) < 0 {
- return &GasLimitErr{Have: new(big.Int).Set(i), Want: amount}
- }
- i.Sub(i, amount)
- return nil
-}
-
-func (gp *GasPool) String() string {
- return (*big.Int)(gp).String()
-}
-
-func NewBlockProcessor(db ethdb.Database, pow pow.PoW, blockchain *BlockChain, eventMux *event.TypeMux) *BlockProcessor {
- sm := &BlockProcessor{
- chainDb: db,
- mem: make(map[string]*big.Int),
- Pow: pow,
- bc: blockchain,
- eventMux: eventMux,
- }
- return sm
-}
-
-func (sm *BlockProcessor) TransitionState(statedb *state.StateDB, parent, block *types.Block, transientProcess bool) (receipts types.Receipts, err error) {
- gp := new(GasPool).AddGas(block.GasLimit())
- if glog.V(logger.Core) {
- glog.Infof("%x: gas (+ %v)", block.Coinbase(), gp)
- }
-
- // Process the transactions on to parent state
- receipts, err = sm.ApplyTransactions(gp, statedb, block, block.Transactions(), transientProcess)
- if err != nil {
- return nil, err
- }
-
- return receipts, nil
-}
-
-func (self *BlockProcessor) ApplyTransaction(gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *big.Int, transientProcess bool) (*types.Receipt, *big.Int, error) {
- _, gas, err := ApplyMessage(NewEnv(statedb, self.bc, tx, header), tx, gp)
- if err != nil {
- return nil, nil, err
- }
-
- // Update the state with pending changes
- usedGas.Add(usedGas, gas)
- receipt := types.NewReceipt(statedb.IntermediateRoot().Bytes(), usedGas)
- receipt.TxHash = tx.Hash()
- receipt.GasUsed = new(big.Int).Set(gas)
- if MessageCreatesContract(tx) {
- from, _ := tx.From()
- receipt.ContractAddress = crypto.CreateAddress(from, tx.Nonce())
- }
-
- logs := statedb.GetLogs(tx.Hash())
- receipt.Logs = logs
- receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
-
- glog.V(logger.Debug).Infoln(receipt)
-
- // Notify all subscribers
- if !transientProcess {
- go self.eventMux.Post(TxPostEvent{tx})
- go self.eventMux.Post(logs)
- }
-
- return receipt, gas, err
-}
-func (self *BlockProcessor) BlockChain() *BlockChain {
- return self.bc
-}
-
-func (self *BlockProcessor) ApplyTransactions(gp *GasPool, statedb *state.StateDB, block *types.Block, txs types.Transactions, transientProcess bool) (types.Receipts, error) {
- var (
- receipts types.Receipts
- totalUsedGas = big.NewInt(0)
- err error
- cumulativeSum = new(big.Int)
- header = block.Header()
- )
-
- for i, tx := range txs {
- statedb.StartRecord(tx.Hash(), block.Hash(), i)
-
- receipt, txGas, err := self.ApplyTransaction(gp, statedb, header, tx, totalUsedGas, transientProcess)
- if err != nil {
- return nil, err
- }
-
- if err != nil {
- glog.V(logger.Core).Infoln("TX err:", err)
- }
- receipts = append(receipts, receipt)
-
- cumulativeSum.Add(cumulativeSum, new(big.Int).Mul(txGas, tx.GasPrice()))
- }
-
- if block.GasUsed().Cmp(totalUsedGas) != 0 {
- return nil, ValidationError(fmt.Sprintf("gas used error (%v / %v)", block.GasUsed(), totalUsedGas))
- }
-
- if transientProcess {
- go self.eventMux.Post(PendingBlockEvent{block, statedb.Logs()})
- }
-
- return receipts, err
-}
-
-func (sm *BlockProcessor) RetryProcess(block *types.Block) (logs vm.Logs, err error) {
- // Processing a blocks may never happen simultaneously
- sm.mutex.Lock()
- defer sm.mutex.Unlock()
-
- if !sm.bc.HasBlock(block.ParentHash()) {
- return nil, ParentError(block.ParentHash())
- }
- parent := sm.bc.GetBlock(block.ParentHash())
-
- // FIXME Change to full header validation. See #1225
- errch := make(chan bool)
- go func() { errch <- sm.Pow.Verify(block) }()
-
- logs, _, err = sm.processWithParent(block, parent)
- if !<-errch {
- return nil, ValidationError("Block's nonce is invalid (= %x)", block.Nonce)
- }
-
- return logs, err
-}
-
-// Process block will attempt to process the given block's transactions and applies them
-// on top of the block's parent state (given it exists) and will return wether it was
-// successful or not.
-func (sm *BlockProcessor) Process(block *types.Block) (logs vm.Logs, receipts types.Receipts, err error) {
- // Processing a blocks may never happen simultaneously
- sm.mutex.Lock()
- defer sm.mutex.Unlock()
-
- if sm.bc.HasBlock(block.Hash()) {
- if _, err := state.New(block.Root(), sm.chainDb); err == nil {
- return nil, nil, &KnownBlockError{block.Number(), block.Hash()}
- }
- }
- if parent := sm.bc.GetBlock(block.ParentHash()); parent != nil {
- if _, err := state.New(parent.Root(), sm.chainDb); err == nil {
- return sm.processWithParent(block, parent)
- }
- }
- return nil, nil, ParentError(block.ParentHash())
-}
-
-func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs vm.Logs, receipts types.Receipts, err error) {
- // Create a new state based on the parent's root (e.g., create copy)
- state, err := state.New(parent.Root(), sm.chainDb)
- if err != nil {
- return nil, nil, err
- }
- header := block.Header()
- uncles := block.Uncles()
- txs := block.Transactions()
-
- // Block validation
- if err = ValidateHeader(sm.Pow, header, parent.Header(), false, false); err != nil {
- return
- }
-
- // There can be at most two uncles
- if len(uncles) > 2 {
- return nil, nil, ValidationError("Block can only contain maximum 2 uncles (contained %v)", len(uncles))
- }
-
- receipts, err = sm.TransitionState(state, parent, block, false)
- if err != nil {
- return
- }
-
- // Validate the received block's bloom with the one derived from the generated receipts.
- // For valid blocks this should always validate to true.
- rbloom := types.CreateBloom(receipts)
- if rbloom != header.Bloom {
- err = fmt.Errorf("unable to replicate block's bloom=%x", rbloom)
- return
- }
-
- // The transactions Trie's root (R = (Tr [[i, RLP(T1)], [i, RLP(T2)], ... [n, RLP(Tn)]]))
- // can be used by light clients to make sure they've received the correct Txs
- txSha := types.DeriveSha(txs)
- if txSha != header.TxHash {
- err = fmt.Errorf("invalid transaction root hash. received=%x calculated=%x", header.TxHash, txSha)
- return
- }
-
- // Tre receipt Trie's root (R = (Tr [[H1, R1], ... [Hn, R1]]))
- receiptSha := types.DeriveSha(receipts)
- if receiptSha != header.ReceiptHash {
- err = fmt.Errorf("invalid receipt root hash. received=%x calculated=%x", header.ReceiptHash, receiptSha)
- return
- }
-
- // Verify UncleHash before running other uncle validations
- unclesSha := types.CalcUncleHash(uncles)
- if unclesSha != header.UncleHash {
- err = fmt.Errorf("invalid uncles root hash. received=%x calculated=%x", header.UncleHash, unclesSha)
- return
- }
-
- // Verify uncles
- if err = sm.VerifyUncles(state, block, parent); err != nil {
- return
- }
- // Accumulate static rewards; block reward, uncle's and uncle inclusion.
- AccumulateRewards(state, header, uncles)
-
- // Commit state objects/accounts to a database batch and calculate
- // the state root. The database is not modified if the root
- // doesn't match.
- root, batch := state.CommitBatch()
- if header.Root != root {
- return nil, nil, fmt.Errorf("invalid merkle root: header=%x computed=%x", header.Root, root)
- }
-
- // Execute the database writes.
- batch.Write()
-
- return state.Logs(), receipts, nil
-}
-
-var (
- big8 = big.NewInt(8)
- big32 = big.NewInt(32)
-)
-
-// AccumulateRewards credits the coinbase of the given block with the
-// mining reward. The total reward consists of the static block reward
-// and rewards for included uncles. The coinbase of each uncle block is
-// also rewarded.
-func AccumulateRewards(statedb *state.StateDB, header *types.Header, uncles []*types.Header) {
- reward := new(big.Int).Set(BlockReward)
- r := new(big.Int)
- for _, uncle := range uncles {
- r.Add(uncle.Number, big8)
- r.Sub(r, header.Number)
- r.Mul(r, BlockReward)
- r.Div(r, big8)
- statedb.AddBalance(uncle.Coinbase, r)
-
- r.Div(BlockReward, big32)
- reward.Add(reward, r)
- }
- statedb.AddBalance(header.Coinbase, reward)
-}
-
-func (sm *BlockProcessor) VerifyUncles(statedb *state.StateDB, block, parent *types.Block) error {
- uncles := set.New()
- ancestors := make(map[common.Hash]*types.Block)
- for _, ancestor := range sm.bc.GetBlocksFromHash(block.ParentHash(), 7) {
- ancestors[ancestor.Hash()] = ancestor
- // Include ancestors uncles in the uncle set. Uncles must be unique.
- for _, uncle := range ancestor.Uncles() {
- uncles.Add(uncle.Hash())
- }
- }
- ancestors[block.Hash()] = block
- uncles.Add(block.Hash())
-
- for i, uncle := range block.Uncles() {
- hash := uncle.Hash()
- if uncles.Has(hash) {
- // Error not unique
- return UncleError("uncle[%d](%x) not unique", i, hash[:4])
- }
- uncles.Add(hash)
-
- if ancestors[hash] != nil {
- branch := fmt.Sprintf(" O - %x\n |\n", block.Hash())
- for h := range ancestors {
- branch += fmt.Sprintf(" O - %x\n |\n", h)
- }
- glog.Infoln(branch)
- return UncleError("uncle[%d](%x) is ancestor", i, hash[:4])
- }
-
- if ancestors[uncle.ParentHash] == nil || uncle.ParentHash == parent.Hash() {
- return UncleError("uncle[%d](%x)'s parent is not ancestor (%x)", i, hash[:4], uncle.ParentHash[0:4])
- }
-
- if err := ValidateHeader(sm.Pow, uncle, ancestors[uncle.ParentHash].Header(), true, true); err != nil {
- return ValidationError(fmt.Sprintf("uncle[%d](%x) header invalid: %v", i, hash[:4], err))
- }
- }
-
- return nil
-}
-
-// GetBlockReceipts returns the receipts beloniging to the block hash
-func (sm *BlockProcessor) GetBlockReceipts(bhash common.Hash) types.Receipts {
- if block := sm.BlockChain().GetBlock(bhash); block != nil {
- return GetBlockReceipts(sm.chainDb, block.Hash())
- }
-
- return nil
-}
-
-// GetLogs returns the logs of the given block. This method is using a two step approach
-// where it tries to get it from the (updated) method which gets them from the receipts or
-// the depricated way by re-processing the block.
-func (sm *BlockProcessor) GetLogs(block *types.Block) (logs vm.Logs, err error) {
- receipts := GetBlockReceipts(sm.chainDb, block.Hash())
- // coalesce logs
- for _, receipt := range receipts {
- logs = append(logs, receipt.Logs...)
- }
- return logs, nil
-}
-
-// ValidateHeader verifies the validity of a header, relying on the database and
-// POW behind the block processor.
-func (sm *BlockProcessor) ValidateHeader(header *types.Header, checkPow, uncle bool) error {
- // Short circuit if the header's already known or its parent missing
- if sm.bc.HasHeader(header.Hash()) {
- return nil
- }
- if parent := sm.bc.GetHeader(header.ParentHash); parent == nil {
- return ParentError(header.ParentHash)
- } else {
- return ValidateHeader(sm.Pow, header, parent, checkPow, uncle)
- }
-}
-
-// ValidateHeaderWithParent verifies the validity of a header, relying on the database and
-// POW behind the block processor.
-func (sm *BlockProcessor) ValidateHeaderWithParent(header, parent *types.Header, checkPow, uncle bool) error {
- if sm.bc.HasHeader(header.Hash()) {
- return nil
- }
- return ValidateHeader(sm.Pow, header, parent, checkPow, uncle)
-}
-
-// See YP section 4.3.4. "Block Header Validity"
-// Validates a header. Returns an error if the header is invalid.
-func ValidateHeader(pow pow.PoW, header *types.Header, parent *types.Header, checkPow, uncle bool) error {
- if big.NewInt(int64(len(header.Extra))).Cmp(params.MaximumExtraDataSize) == 1 {
- return fmt.Errorf("Header extra data too long (%d)", len(header.Extra))
- }
- if uncle {
- if header.Time.Cmp(common.MaxBig) == 1 {
- return BlockTSTooBigErr
- }
- } else {
- if header.Time.Cmp(big.NewInt(time.Now().Unix())) == 1 {
- return BlockFutureErr
- }
- }
- if header.Time.Cmp(parent.Time) != 1 {
- return BlockEqualTSErr
- }
-
- expd := CalcDifficulty(header.Time.Uint64(), parent.Time.Uint64(), parent.Number, parent.Difficulty)
- if expd.Cmp(header.Difficulty) != 0 {
- return fmt.Errorf("Difficulty check failed for header %v, %v", header.Difficulty, expd)
- }
-
- a := new(big.Int).Set(parent.GasLimit)
- a = a.Sub(a, header.GasLimit)
- a.Abs(a)
- b := new(big.Int).Set(parent.GasLimit)
- b = b.Div(b, params.GasLimitBoundDivisor)
- if !(a.Cmp(b) < 0) || (header.GasLimit.Cmp(params.MinGasLimit) == -1) {
- return fmt.Errorf("GasLimit check failed for header %v (%v > %v)", header.GasLimit, a, b)
- }
-
- num := new(big.Int).Set(parent.Number)
- num.Sub(header.Number, num)
- if num.Cmp(big.NewInt(1)) != 0 {
- return BlockNumberErr
- }
-
- if checkPow {
- // Verify the nonce of the header. Return an error if it's not valid
- if !pow.Verify(types.NewBlockWithHeader(header)) {
- return &BlockNonceErr{Hash: header.Hash(), Number: header.Number, Nonce: header.Nonce.Uint64()}
- }
- }
- return nil
-}
diff --git a/core/block_validator.go b/core/block_validator.go
new file mode 100644
index 000000000..62d096d02
--- /dev/null
+++ b/core/block_validator.go
@@ -0,0 +1,243 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "fmt"
+ "math/big"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/state"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/pow"
+ "gopkg.in/fatih/set.v0"
+)
+
+// BlockValidator is responsible for validating block headers, uncles and
+// processed state.
+//
+// BlockValidator implements Validator.
+type BlockValidator struct {
+ bc *BlockChain // Canonical block chain
+ Pow pow.PoW // Proof of work used for validating
+}
+
+// NewBlockValidator returns a new block validator which is safe for re-use
+func NewBlockValidator(blockchain *BlockChain, pow pow.PoW) *BlockValidator {
+ validator := &BlockValidator{
+ Pow: pow,
+ bc: blockchain,
+ }
+ return validator
+}
+
+// ValidateBlock validates the given block's header and uncles and verifies the
+// the block header's transaction and uncle roots.
+//
+// ValidateBlock does not validate the header's pow. The pow work validated
+// seperately so we can process them in paralel.
+//
+// ValidateBlock also validates and makes sure that any previous state (or present)
+// state that might or might not be present is checked to make sure that fast
+// sync has done it's job proper. This prevents the block validator form accepting
+// false positives where a header is present but the state is not.
+func (v *BlockValidator) ValidateBlock(block *types.Block) error {
+ if v.bc.HasBlock(block.Hash()) {
+ if _, err := state.New(block.Root(), v.bc.chainDb); err == nil {
+ return &KnownBlockError{block.Number(), block.Hash()}
+ }
+ }
+ parent := v.bc.GetBlock(block.ParentHash())
+ if parent == nil {
+ return ParentError(block.ParentHash())
+ }
+ if _, err := state.New(parent.Root(), v.bc.chainDb); err != nil {
+ return ParentError(block.ParentHash())
+ }
+
+ header := block.Header()
+ // validate the block header
+ if err := ValidateHeader(v.Pow, header, parent.Header(), false, false); err != nil {
+ return err
+ }
+ // verify the uncles are correctly rewarded
+ if err := v.VerifyUncles(block, parent); err != nil {
+ return err
+ }
+
+ // Verify UncleHash before running other uncle validations
+ unclesSha := types.CalcUncleHash(block.Uncles())
+ if unclesSha != header.UncleHash {
+ return fmt.Errorf("invalid uncles root hash. received=%x calculated=%x", header.UncleHash, unclesSha)
+ }
+
+ // The transactions Trie's root (R = (Tr [[i, RLP(T1)], [i, RLP(T2)], ... [n, RLP(Tn)]]))
+ // can be used by light clients to make sure they've received the correct Txs
+ txSha := types.DeriveSha(block.Transactions())
+ if txSha != header.TxHash {
+ return fmt.Errorf("invalid transaction root hash. received=%x calculated=%x", header.TxHash, txSha)
+ }
+
+ return nil
+}
+
+// ValidateState validates the various changes that happen after a state
+// transition, such as amount of used gas, the receipt roots and the state root
+// itself. ValidateState returns a database batch if the validation was a succes
+// otherwise nil and an error is returned.
+func (v *BlockValidator) ValidateState(block, parent *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas *big.Int) (err error) {
+ header := block.Header()
+ if block.GasUsed().Cmp(usedGas) != 0 {
+ return ValidationError(fmt.Sprintf("gas used error (%v / %v)", block.GasUsed(), usedGas))
+ }
+ // Validate the received block's bloom with the one derived from the generated receipts.
+ // For valid blocks this should always validate to true.
+ rbloom := types.CreateBloom(receipts)
+ if rbloom != header.Bloom {
+ return fmt.Errorf("unable to replicate block's bloom=%x", rbloom)
+ }
+ // Tre receipt Trie's root (R = (Tr [[H1, R1], ... [Hn, R1]]))
+ receiptSha := types.DeriveSha(receipts)
+ if receiptSha != header.ReceiptHash {
+ return fmt.Errorf("invalid receipt root hash. received=%x calculated=%x", header.ReceiptHash, receiptSha)
+ }
+ // Validate the state root against the received state root and throw
+ // an error if they don't match.
+ if root := statedb.IntermediateRoot(); header.Root != root {
+ return fmt.Errorf("invalid merkle root: header=%x computed=%x", header.Root, root)
+ }
+ return nil
+}
+
+// VerifyUncles verifies the given block's uncles and applies the Ethereum
+// consensus rules to the various block headers included; it will return an
+// error if any of the included uncle headers were invalid. It returns an error
+// if the validation failed.
+func (v *BlockValidator) VerifyUncles(block, parent *types.Block) error {
+ // validate that there at most 2 uncles included in this block
+ if len(block.Uncles()) > 2 {
+ return ValidationError("Block can only contain maximum 2 uncles (contained %v)", len(block.Uncles()))
+ }
+
+ uncles := set.New()
+ ancestors := make(map[common.Hash]*types.Block)
+ for _, ancestor := range v.bc.GetBlocksFromHash(block.ParentHash(), 7) {
+ ancestors[ancestor.Hash()] = ancestor
+ // Include ancestors uncles in the uncle set. Uncles must be unique.
+ for _, uncle := range ancestor.Uncles() {
+ uncles.Add(uncle.Hash())
+ }
+ }
+ ancestors[block.Hash()] = block
+ uncles.Add(block.Hash())
+
+ for i, uncle := range block.Uncles() {
+ hash := uncle.Hash()
+ if uncles.Has(hash) {
+ // Error not unique
+ return UncleError("uncle[%d](%x) not unique", i, hash[:4])
+ }
+ uncles.Add(hash)
+
+ if ancestors[hash] != nil {
+ branch := fmt.Sprintf(" O - %x\n |\n", block.Hash())
+ for h := range ancestors {
+ branch += fmt.Sprintf(" O - %x\n |\n", h)
+ }
+ glog.Infoln(branch)
+ return UncleError("uncle[%d](%x) is ancestor", i, hash[:4])
+ }
+
+ if ancestors[uncle.ParentHash] == nil || uncle.ParentHash == parent.Hash() {
+ return UncleError("uncle[%d](%x)'s parent is not ancestor (%x)", i, hash[:4], uncle.ParentHash[0:4])
+ }
+
+ if err := ValidateHeader(v.Pow, uncle, ancestors[uncle.ParentHash].Header(), true, true); err != nil {
+ return ValidationError(fmt.Sprintf("uncle[%d](%x) header invalid: %v", i, hash[:4], err))
+ }
+ }
+
+ return nil
+}
+
+// ValidateHeader validates the given header and, depending on the pow arg,
+// checks the proof of work of the given header. Returns an error if the
+// validation failed.
+func (v *BlockValidator) ValidateHeader(header, parent *types.Header, checkPow bool) error {
+ // Short circuit if the parent is missing.
+ if parent == nil {
+ return ParentError(header.ParentHash)
+ }
+ // Short circuit if the header's already known or its parent missing
+ if v.bc.HasHeader(header.Hash()) {
+ return nil
+ }
+ return ValidateHeader(v.Pow, header, parent, checkPow, false)
+}
+
+// Validates a header. Returns an error if the header is invalid.
+//
+// See YP section 4.3.4. "Block Header Validity"
+func ValidateHeader(pow pow.PoW, header *types.Header, parent *types.Header, checkPow, uncle bool) error {
+ if big.NewInt(int64(len(header.Extra))).Cmp(params.MaximumExtraDataSize) == 1 {
+ return fmt.Errorf("Header extra data too long (%d)", len(header.Extra))
+ }
+
+ if uncle {
+ if header.Time.Cmp(common.MaxBig) == 1 {
+ return BlockTSTooBigErr
+ }
+ } else {
+ if header.Time.Cmp(big.NewInt(time.Now().Unix())) == 1 {
+ return BlockFutureErr
+ }
+ }
+ if header.Time.Cmp(parent.Time) != 1 {
+ return BlockEqualTSErr
+ }
+
+ expd := CalcDifficulty(header.Time.Uint64(), parent.Time.Uint64(), parent.Number, parent.Difficulty)
+ if expd.Cmp(header.Difficulty) != 0 {
+ return fmt.Errorf("Difficulty check failed for header %v, %v", header.Difficulty, expd)
+ }
+
+ a := new(big.Int).Set(parent.GasLimit)
+ a = a.Sub(a, header.GasLimit)
+ a.Abs(a)
+ b := new(big.Int).Set(parent.GasLimit)
+ b = b.Div(b, params.GasLimitBoundDivisor)
+ if !(a.Cmp(b) < 0) || (header.GasLimit.Cmp(params.MinGasLimit) == -1) {
+ return fmt.Errorf("GasLimit check failed for header %v (%v > %v)", header.GasLimit, a, b)
+ }
+
+ num := new(big.Int).Set(parent.Number)
+ num.Sub(header.Number, num)
+ if num.Cmp(big.NewInt(1)) != 0 {
+ return BlockNumberErr
+ }
+
+ if checkPow {
+ // Verify the nonce of the header. Return an error if it's not valid
+ if !pow.Verify(types.NewBlockWithHeader(header)) {
+ return &BlockNonceErr{header.Number, header.Hash(), header.Nonce.Uint64()}
+ }
+ }
+ return nil
+}
diff --git a/core/block_processor_test.go b/core/block_validator_test.go
index 3050456b4..70953d76d 100644
--- a/core/block_processor_test.go
+++ b/core/block_validator_test.go
@@ -30,7 +30,7 @@ import (
"github.com/ethereum/go-ethereum/pow/ezp"
)
-func proc() (*BlockProcessor, *BlockChain) {
+func proc() (Validator, *BlockChain) {
db, _ := ethdb.NewMemDatabase()
var mux event.TypeMux
@@ -39,7 +39,7 @@ func proc() (*BlockProcessor, *BlockChain) {
if err != nil {
fmt.Println(err)
}
- return NewBlockProcessor(db, ezp.New(), blockchain, &mux), blockchain
+ return blockchain.validator, blockchain
}
func TestNumber(t *testing.T) {
@@ -81,7 +81,7 @@ func TestPutReceipt(t *testing.T) {
Index: 0,
}}
- PutReceipts(db, types.Receipts{receipt})
+ WriteReceipts(db, types.Receipts{receipt})
receipt = GetReceipt(db, common.Hash{})
if receipt == nil {
t.Error("expected to get 1 receipt, got none.")
diff --git a/core/blockchain.go b/core/blockchain.go
index cea346e38..5e1fc9424 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -61,17 +62,34 @@ const (
blockCacheLimit = 256
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
+ // must be bumped when consensus algorithm is changed, this forces the upgradedb
+ // command to be run (forces the blocks to be imported again using the new algorithm)
+ BlockChainVersion = 3
)
+// BlockChain represents the canonical chain given a database with a genesis
+// block. The Blockchain manages chain imports, reverts, chain reorganisations.
+//
+// Importing blocks in to the block chain happens according to the set of rules
+// defined by the two stage Validator. Processing of blocks is done using the
+// Processor which processes the included transaction. The validation of the state
+// is done in the second part of the Validator. Failing results in aborting of
+// the import.
+//
+// The BlockChain also helps in returning blocks from **any** chain included
+// in the database as well as blocks that represents the canonical chain. It's
+// important to note that GetBlock can return any block and does not need to be
+// included in the canonical one where as GetBlockByNumber always represents the
+// canonical chain.
type BlockChain struct {
chainDb ethdb.Database
- processor types.BlockProcessor
eventMux *event.TypeMux
genesisBlock *types.Block
// Last known total difficulty
mu sync.RWMutex
chainmu sync.RWMutex
tsmu sync.RWMutex
+ procmu sync.RWMutex
checkpoint int // checkpoint counts towards the new checkpoint
currentHeader *types.Header // Current head of the header chain (may be above the block chain!)
@@ -91,10 +109,15 @@ type BlockChain struct {
procInterrupt int32 // interrupt signaler for block processing
wg sync.WaitGroup
- pow pow.PoW
- rand *mrand.Rand
+ pow pow.PoW
+ rand *mrand.Rand
+ processor Processor
+ validator Validator
}
+// NewBlockChain returns a fully initialised block chain using information
+// available in the database. It initialiser the default Ethereum Validator and
+// Processor.
func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*BlockChain, error) {
headerCache, _ := lru.New(headerCacheLimit)
bodyCache, _ := lru.New(bodyCacheLimit)
@@ -121,6 +144,8 @@ func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*Bl
return nil, err
}
bc.rand = mrand.New(mrand.NewSource(seed.Int64()))
+ bc.SetValidator(NewBlockValidator(bc, pow))
+ bc.SetProcessor(NewStateProcessor(bc))
bc.genesisBlock = bc.GetBlockByNumber(0)
if bc.genesisBlock == nil {
@@ -292,6 +317,7 @@ func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error {
return nil
}
+// GasLimit returns the gas limit of the current HEAD block.
func (self *BlockChain) GasLimit() *big.Int {
self.mu.RLock()
defer self.mu.RUnlock()
@@ -299,6 +325,7 @@ func (self *BlockChain) GasLimit() *big.Int {
return self.currentBlock.GasLimit()
}
+// LastBlockHash return the hash of the HEAD block.
func (self *BlockChain) LastBlockHash() common.Hash {
self.mu.RLock()
defer self.mu.RUnlock()
@@ -333,6 +360,8 @@ func (self *BlockChain) CurrentFastBlock() *types.Block {
return self.currentFastBlock
}
+// Status returns status information about the current chain such as the HEAD Td,
+// the HEAD hash and the hash of the genesis block.
func (self *BlockChain) Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) {
self.mu.RLock()
defer self.mu.RUnlock()
@@ -340,10 +369,38 @@ func (self *BlockChain) Status() (td *big.Int, currentBlock common.Hash, genesis
return self.GetTd(self.currentBlock.Hash()), self.currentBlock.Hash(), self.genesisBlock.Hash()
}
-func (self *BlockChain) SetProcessor(proc types.BlockProcessor) {
- self.processor = proc
+// SetProcessor sets the processor required for making state modifications.
+func (self *BlockChain) SetProcessor(processor Processor) {
+ self.procmu.Lock()
+ defer self.procmu.Unlock()
+ self.processor = processor
+}
+
+// SetValidator sets the validator which is used to validate incoming blocks.
+func (self *BlockChain) SetValidator(validator Validator) {
+ self.procmu.Lock()
+ defer self.procmu.Unlock()
+ self.validator = validator
}
+// Validator returns the current validator.
+func (self *BlockChain) Validator() Validator {
+ self.procmu.RLock()
+ defer self.procmu.RUnlock()
+ return self.validator
+}
+
+// Processor returns the current processor.
+func (self *BlockChain) Processor() Processor {
+ self.procmu.RLock()
+ defer self.procmu.RUnlock()
+ return self.processor
+}
+
+// AuxValidator returns the auxiliary validator (Proof of work atm)
+func (self *BlockChain) AuxValidator() pow.PoW { return self.pow }
+
+// State returns a new mutable state based on the current HEAD block.
func (self *BlockChain) State() (*state.StateDB, error) {
return state.New(self.CurrentBlock().Root(), self.chainDb)
}
@@ -606,6 +663,8 @@ func (self *BlockChain) GetUnclesInChain(block *types.Block, length int) []*type
return uncles
}
+// Stop stops the blockchain service. If any imports are currently in progress
+// it will abort them using the procInterrupt.
func (bc *BlockChain) Stop() {
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
return
@@ -758,9 +817,9 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
var err error
if index == 0 {
- err = self.processor.ValidateHeader(header, checkPow, false)
+ err = self.Validator().ValidateHeader(header, self.GetHeader(header.ParentHash), checkPow)
} else {
- err = self.processor.ValidateHeaderWithParent(header, chain[index-1], checkPow, false)
+ err = self.Validator().ValidateHeader(header, chain[index-1], checkPow)
}
if err != nil {
errs[index] = err
@@ -913,7 +972,7 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain
glog.Fatal(errs[index])
return
}
- if err := PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
+ if err := WriteBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
errs[index] = fmt.Errorf("failed to write block receipts: %v", err)
atomic.AddInt32(&failed, 1)
glog.Fatal(errs[index])
@@ -1025,9 +1084,10 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// faster than direct delivery and requires much less mutex
// acquiring.
var (
- stats struct{ queued, processed, ignored int }
- events = make([]interface{}, 0, len(chain))
- tstart = time.Now()
+ stats struct{ queued, processed, ignored int }
+ events = make([]interface{}, 0, len(chain))
+ coalescedLogs vm.Logs
+ tstart = time.Now()
nonceChecked = make([]bool, len(chain))
)
@@ -1057,12 +1117,12 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
if BadHashes[block.Hash()] {
err := BadHashError(block.Hash())
- blockErr(block, err)
+ reportBlock(block, err)
return i, err
}
- // Call in to the block processor and check for errors. It's likely that if one block fails
- // all others will fail too (unless a known block is returned).
- logs, receipts, err := self.processor.Process(block)
+ // Stage 1 validation of the block using the chain's validator
+ // interface.
+ err := self.Validator().ValidateBlock(block)
if err != nil {
if IsKnownBlockErr(err) {
stats.ignored++
@@ -1089,14 +1149,41 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
continue
}
- blockErr(block, err)
+ reportBlock(block, err)
- go ReportBlock(block, err)
+ return i, err
+ }
+ // Create a new statedb using the parent block and report an
+ // error if it fails.
+ statedb, err := state.New(self.GetBlock(block.ParentHash()).Root(), self.chainDb)
+ if err != nil {
+ reportBlock(block, err)
return i, err
}
- if err := PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
- glog.V(logger.Warn).Infoln("error writing block receipts:", err)
+ // Process block using the parent state as reference point.
+ receipts, logs, usedGas, err := self.processor.Process(block, statedb)
+ if err != nil {
+ reportBlock(block, err)
+ return i, err
+ }
+ // Validate the state using the default validator
+ err = self.Validator().ValidateState(block, self.GetBlock(block.ParentHash()), statedb, receipts, usedGas)
+ if err != nil {
+ reportBlock(block, err)
+ return i, err
+ }
+ // Write state changes to database
+ _, err = statedb.Commit()
+ if err != nil {
+ return i, err
+ }
+
+ // coalesce logs for later processing
+ coalescedLogs = append(coalescedLogs, logs...)
+
+ if err := WriteBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
+ return i, err
}
txcount += len(block.Transactions())
@@ -1105,6 +1192,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
if err != nil {
return i, err
}
+
switch status {
case CanonStatTy:
if glog.V(logger.Debug) {
@@ -1113,11 +1201,11 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
events = append(events, ChainEvent{block, block.Hash(), logs})
// This puts transactions in a extra db for rpc
- if err := PutTransactions(self.chainDb, block, block.Transactions()); err != nil {
+ if err := WriteTransactions(self.chainDb, block); err != nil {
return i, err
}
// store the receipts
- if err := PutReceipts(self.chainDb, receipts); err != nil {
+ if err := WriteReceipts(self.chainDb, receipts); err != nil {
return i, err
}
// Write map map bloom filters
@@ -1141,7 +1229,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
start, end := chain[0], chain[len(chain)-1]
glog.Infof("imported %d block(s) (%d queued %d ignored) including %d txs in %v. #%v [%x / %x]\n", stats.processed, stats.queued, stats.ignored, txcount, tend, end.Number(), start.Hash().Bytes()[:4], end.Hash().Bytes()[:4])
}
- go self.postChainEvents(events)
+ go self.postChainEvents(events, coalescedLogs)
return 0, nil
}
@@ -1206,12 +1294,12 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// insert the block in the canonical way, re-writing history
self.insert(block)
// write canonical receipts and transactions
- if err := PutTransactions(self.chainDb, block, block.Transactions()); err != nil {
+ if err := WriteTransactions(self.chainDb, block); err != nil {
return err
}
receipts := GetBlockReceipts(self.chainDb, block.Hash())
// write receipts
- if err := PutReceipts(self.chainDb, receipts); err != nil {
+ if err := WriteReceipts(self.chainDb, receipts); err != nil {
return err
}
// Write map map bloom filters
@@ -1239,7 +1327,9 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// postChainEvents iterates over the events generated by a chain insertion and
// posts them into the event mux.
-func (self *BlockChain) postChainEvents(events []interface{}) {
+func (self *BlockChain) postChainEvents(events []interface{}, logs vm.Logs) {
+ // post event logs for further processing
+ self.eventMux.Post(logs)
for _, event := range events {
if event, ok := event.(ChainEvent); ok {
// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
@@ -1265,9 +1355,13 @@ func (self *BlockChain) update() {
}
}
-func blockErr(block *types.Block, err error) {
+// reportBlock reports the given block and error using the canonical block
+// reporting tool. Reporting the block to the service is handled in a separate
+// goroutine.
+func reportBlock(block *types.Block, err error) {
if glog.V(logger.Error) {
glog.Errorf("Bad block #%v (%s)\n", block.Number(), block.Hash().Hex())
glog.Errorf(" %v", err)
}
+ go ReportBlock(block, err)
}
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 8ddc5032b..f18b5d084 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -28,6 +28,7 @@ import (
"github.com/ethereum/ethash"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
@@ -53,31 +54,29 @@ func theBlockChain(db ethdb.Database, t *testing.T) *BlockChain {
WriteTestNetGenesisBlock(db, 0)
blockchain, err := NewBlockChain(db, thePow(), &eventMux)
if err != nil {
- t.Error("failed creating chainmanager:", err)
+ t.Error("failed creating blockchain:", err)
t.FailNow()
return nil
}
- blockMan := NewBlockProcessor(db, nil, blockchain, &eventMux)
- blockchain.SetProcessor(blockMan)
return blockchain
}
// Test fork of length N starting from block i
-func testFork(t *testing.T, processor *BlockProcessor, i, n int, full bool, comparator func(td1, td2 *big.Int)) {
+func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, comparator func(td1, td2 *big.Int)) {
// Copy old chain up to #i into a new db
- db, processor2, err := newCanonical(i, full)
+ db, blockchain2, err := newCanonical(i, full)
if err != nil {
t.Fatal("could not make new canonical in testFork", err)
}
// Assert the chains have the same header/block at #i
var hash1, hash2 common.Hash
if full {
- hash1 = processor.bc.GetBlockByNumber(uint64(i)).Hash()
- hash2 = processor2.bc.GetBlockByNumber(uint64(i)).Hash()
+ hash1 = blockchain.GetBlockByNumber(uint64(i)).Hash()
+ hash2 = blockchain2.GetBlockByNumber(uint64(i)).Hash()
} else {
- hash1 = processor.bc.GetHeaderByNumber(uint64(i)).Hash()
- hash2 = processor2.bc.GetHeaderByNumber(uint64(i)).Hash()
+ hash1 = blockchain.GetHeaderByNumber(uint64(i)).Hash()
+ hash2 = blockchain2.GetHeaderByNumber(uint64(i)).Hash()
}
if hash1 != hash2 {
t.Errorf("chain content mismatch at %d: have hash %v, want hash %v", i, hash2, hash1)
@@ -88,13 +87,13 @@ func testFork(t *testing.T, processor *BlockProcessor, i, n int, full bool, comp
headerChainB []*types.Header
)
if full {
- blockChainB = makeBlockChain(processor2.bc.CurrentBlock(), n, db, forkSeed)
- if _, err := processor2.bc.InsertChain(blockChainB); err != nil {
+ blockChainB = makeBlockChain(blockchain2.CurrentBlock(), n, db, forkSeed)
+ if _, err := blockchain2.InsertChain(blockChainB); err != nil {
t.Fatalf("failed to insert forking chain: %v", err)
}
} else {
- headerChainB = makeHeaderChain(processor2.bc.CurrentHeader(), n, db, forkSeed)
- if _, err := processor2.bc.InsertHeaderChain(headerChainB, 1); err != nil {
+ headerChainB = makeHeaderChain(blockchain2.CurrentHeader(), n, db, forkSeed)
+ if _, err := blockchain2.InsertHeaderChain(headerChainB, 1); err != nil {
t.Fatalf("failed to insert forking chain: %v", err)
}
}
@@ -102,17 +101,17 @@ func testFork(t *testing.T, processor *BlockProcessor, i, n int, full bool, comp
var tdPre, tdPost *big.Int
if full {
- tdPre = processor.bc.GetTd(processor.bc.CurrentBlock().Hash())
- if err := testBlockChainImport(blockChainB, processor); err != nil {
+ tdPre = blockchain.GetTd(blockchain.CurrentBlock().Hash())
+ if err := testBlockChainImport(blockChainB, blockchain); err != nil {
t.Fatalf("failed to import forked block chain: %v", err)
}
- tdPost = processor.bc.GetTd(blockChainB[len(blockChainB)-1].Hash())
+ tdPost = blockchain.GetTd(blockChainB[len(blockChainB)-1].Hash())
} else {
- tdPre = processor.bc.GetTd(processor.bc.CurrentHeader().Hash())
- if err := testHeaderChainImport(headerChainB, processor); err != nil {
+ tdPre = blockchain.GetTd(blockchain.CurrentHeader().Hash())
+ if err := testHeaderChainImport(headerChainB, blockchain); err != nil {
t.Fatalf("failed to import forked header chain: %v", err)
}
- tdPost = processor.bc.GetTd(headerChainB[len(headerChainB)-1].Hash())
+ tdPost = blockchain.GetTd(headerChainB[len(headerChainB)-1].Hash())
}
// Compare the total difficulties of the chains
comparator(tdPre, tdPost)
@@ -127,37 +126,52 @@ func printChain(bc *BlockChain) {
// testBlockChainImport tries to process a chain of blocks, writing them into
// the database if successful.
-func testBlockChainImport(chain []*types.Block, processor *BlockProcessor) error {
+func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
for _, block := range chain {
// Try and process the block
- if _, _, err := processor.Process(block); err != nil {
+ err := blockchain.Validator().ValidateBlock(block)
+ if err != nil {
if IsKnownBlockErr(err) {
continue
}
return err
}
- // Manually insert the block into the database, but don't reorganize (allows subsequent testing)
- processor.bc.mu.Lock()
- WriteTd(processor.chainDb, block.Hash(), new(big.Int).Add(block.Difficulty(), processor.bc.GetTd(block.ParentHash())))
- WriteBlock(processor.chainDb, block)
- processor.bc.mu.Unlock()
+ statedb, err := state.New(blockchain.GetBlock(block.ParentHash()).Root(), blockchain.chainDb)
+ if err != nil {
+ return err
+ }
+ receipts, _, usedGas, err := blockchain.Processor().Process(block, statedb)
+ if err != nil {
+ reportBlock(block, err)
+ return err
+ }
+ err = blockchain.Validator().ValidateState(block, blockchain.GetBlock(block.ParentHash()), statedb, receipts, usedGas)
+ if err != nil {
+ reportBlock(block, err)
+ return err
+ }
+ blockchain.mu.Lock()
+ WriteTd(blockchain.chainDb, block.Hash(), new(big.Int).Add(block.Difficulty(), blockchain.GetTd(block.ParentHash())))
+ WriteBlock(blockchain.chainDb, block)
+ statedb.Commit()
+ blockchain.mu.Unlock()
}
return nil
}
// testHeaderChainImport tries to process a chain of header, writing them into
// the database if successful.
-func testHeaderChainImport(chain []*types.Header, processor *BlockProcessor) error {
+func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error {
for _, header := range chain {
// Try and validate the header
- if err := processor.ValidateHeader(header, false, false); err != nil {
+ if err := blockchain.Validator().ValidateHeader(header, blockchain.GetHeader(header.ParentHash), false); err != nil {
return err
}
// Manually insert the header into the database, but don't reorganize (allows subsequent testing)
- processor.bc.mu.Lock()
- WriteTd(processor.chainDb, header.Hash(), new(big.Int).Add(header.Difficulty, processor.bc.GetTd(header.ParentHash)))
- WriteHeader(processor.chainDb, header)
- processor.bc.mu.Unlock()
+ blockchain.mu.Lock()
+ WriteTd(blockchain.chainDb, header.Hash(), new(big.Int).Add(header.Difficulty, blockchain.GetTd(header.ParentHash)))
+ WriteHeader(blockchain.chainDb, header)
+ blockchain.mu.Unlock()
}
return nil
}
@@ -313,19 +327,19 @@ func TestBrokenBlockChain(t *testing.T) { testBrokenChain(t, true) }
func testBrokenChain(t *testing.T, full bool) {
// Make chain starting from genesis
- db, processor, err := newCanonical(10, full)
+ db, blockchain, err := newCanonical(10, full)
if err != nil {
t.Fatalf("failed to make new canonical chain: %v", err)
}
// Create a forked chain, and try to insert with a missing link
if full {
- chain := makeBlockChain(processor.bc.CurrentBlock(), 5, db, forkSeed)[1:]
- if err := testBlockChainImport(chain, processor); err == nil {
+ chain := makeBlockChain(blockchain.CurrentBlock(), 5, db, forkSeed)[1:]
+ if err := testBlockChainImport(chain, blockchain); err == nil {
t.Errorf("broken block chain not reported")
}
} else {
- chain := makeHeaderChain(processor.bc.CurrentHeader(), 5, db, forkSeed)[1:]
- if err := testHeaderChainImport(chain, processor); err == nil {
+ chain := makeHeaderChain(blockchain.CurrentHeader(), 5, db, forkSeed)[1:]
+ if err := testHeaderChainImport(chain, blockchain); err == nil {
t.Errorf("broken header chain not reported")
}
}
@@ -415,9 +429,14 @@ func TestChainMultipleInsertions(t *testing.T) {
type bproc struct{}
-func (bproc) Process(*types.Block) (vm.Logs, types.Receipts, error) { return nil, nil, nil }
-func (bproc) ValidateHeader(*types.Header, bool, bool) error { return nil }
-func (bproc) ValidateHeaderWithParent(*types.Header, *types.Header, bool, bool) error { return nil }
+func (bproc) ValidateBlock(*types.Block) error { return nil }
+func (bproc) ValidateHeader(*types.Header, *types.Header, bool) error { return nil }
+func (bproc) ValidateState(block, parent *types.Block, state *state.StateDB, receipts types.Receipts, usedGas *big.Int) error {
+ return nil
+}
+func (bproc) Process(block *types.Block, statedb *state.StateDB) (types.Receipts, vm.Logs, *big.Int, error) {
+ return nil, nil, nil, nil
+}
func makeHeaderChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Header {
blocks := makeBlockChainWithDiff(genesis, d, seed)
@@ -459,7 +478,8 @@ func chm(genesis *types.Block, db ethdb.Database) *BlockChain {
bc.tdCache, _ = lru.New(100)
bc.blockCache, _ = lru.New(100)
bc.futureBlocks, _ = lru.New(100)
- bc.processor = bproc{}
+ bc.SetValidator(bproc{})
+ bc.SetProcessor(bproc{})
bc.ResetWithGenesisBlock(genesis)
return bc
@@ -612,12 +632,10 @@ func TestBlocksInsertNonceError(t *testing.T) { testInsertNonceError(t, true) }
func testInsertNonceError(t *testing.T, full bool) {
for i := 1; i < 25 && !t.Failed(); i++ {
// Create a pristine chain and database
- db, processor, err := newCanonical(0, full)
+ db, blockchain, err := newCanonical(0, full)
if err != nil {
t.Fatalf("failed to create pristine chain: %v", err)
}
- bc := processor.bc
-
// Create and insert a chain with a failing nonce
var (
failAt int
@@ -626,34 +644,33 @@ func testInsertNonceError(t *testing.T, full bool) {
failHash common.Hash
)
if full {
- blocks := makeBlockChain(processor.bc.CurrentBlock(), i, db, 0)
+ blocks := makeBlockChain(blockchain.CurrentBlock(), i, db, 0)
failAt = rand.Int() % len(blocks)
failNum = blocks[failAt].NumberU64()
failHash = blocks[failAt].Hash()
- processor.bc.pow = failPow{failNum}
- processor.Pow = failPow{failNum}
+ blockchain.pow = failPow{failNum}
- failRes, err = processor.bc.InsertChain(blocks)
+ failRes, err = blockchain.InsertChain(blocks)
} else {
- headers := makeHeaderChain(processor.bc.CurrentHeader(), i, db, 0)
+ headers := makeHeaderChain(blockchain.CurrentHeader(), i, db, 0)
failAt = rand.Int() % len(headers)
failNum = headers[failAt].Number.Uint64()
failHash = headers[failAt].Hash()
- processor.bc.pow = failPow{failNum}
- processor.Pow = failPow{failNum}
+ blockchain.pow = failPow{failNum}
+ blockchain.validator = NewBlockValidator(blockchain, failPow{failNum})
- failRes, err = processor.bc.InsertHeaderChain(headers, 1)
+ failRes, err = blockchain.InsertHeaderChain(headers, 1)
}
// Check that the returned error indicates the nonce failure.
if failRes != failAt {
t.Errorf("test %d: failure index mismatch: have %d, want %d", i, failRes, failAt)
}
if !IsBlockNonceErr(err) {
- t.Fatalf("test %d: error mismatch: have %v, want nonce error", i, err)
+ t.Fatalf("test %d: error mismatch: have %v, want nonce error %T", i, err, err)
}
nerr := err.(*BlockNonceErr)
if nerr.Number.Uint64() != failNum {
@@ -665,11 +682,11 @@ func testInsertNonceError(t *testing.T, full bool) {
// Check that all no blocks after the failing block have been inserted.
for j := 0; j < i-failAt; j++ {
if full {
- if block := bc.GetBlockByNumber(failNum + uint64(j)); block != nil {
+ if block := blockchain.GetBlockByNumber(failNum + uint64(j)); block != nil {
t.Errorf("test %d: invalid block in chain: %v", i, block)
}
} else {
- if header := bc.GetHeaderByNumber(failNum + uint64(j)); header != nil {
+ if header := blockchain.GetHeaderByNumber(failNum + uint64(j)); header != nil {
t.Errorf("test %d: invalid header in chain: %v", i, header)
}
}
@@ -711,7 +728,6 @@ func TestFastVsFullChains(t *testing.T) {
WriteGenesisBlockForTesting(archiveDb, GenesisAccount{address, funds})
archive, _ := NewBlockChain(archiveDb, FakePow{}, new(event.TypeMux))
- archive.SetProcessor(NewBlockProcessor(archiveDb, FakePow{}, archive, new(event.TypeMux)))
if n, err := archive.InsertChain(blocks); err != nil {
t.Fatalf("failed to process block %d: %v", n, err)
@@ -720,7 +736,6 @@ func TestFastVsFullChains(t *testing.T) {
fastDb, _ := ethdb.NewMemDatabase()
WriteGenesisBlockForTesting(fastDb, GenesisAccount{address, funds})
fast, _ := NewBlockChain(fastDb, FakePow{}, new(event.TypeMux))
- fast.SetProcessor(NewBlockProcessor(fastDb, FakePow{}, fast, new(event.TypeMux)))
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
@@ -797,7 +812,6 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
WriteGenesisBlockForTesting(archiveDb, GenesisAccount{address, funds})
archive, _ := NewBlockChain(archiveDb, FakePow{}, new(event.TypeMux))
- archive.SetProcessor(NewBlockProcessor(archiveDb, FakePow{}, archive, new(event.TypeMux)))
if n, err := archive.InsertChain(blocks); err != nil {
t.Fatalf("failed to process block %d: %v", n, err)
@@ -810,7 +824,6 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
fastDb, _ := ethdb.NewMemDatabase()
WriteGenesisBlockForTesting(fastDb, GenesisAccount{address, funds})
fast, _ := NewBlockChain(fastDb, FakePow{}, new(event.TypeMux))
- fast.SetProcessor(NewBlockProcessor(fastDb, FakePow{}, fast, new(event.TypeMux)))
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
@@ -830,7 +843,6 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
lightDb, _ := ethdb.NewMemDatabase()
WriteGenesisBlockForTesting(lightDb, GenesisAccount{address, funds})
light, _ := NewBlockChain(lightDb, FakePow{}, new(event.TypeMux))
- light.SetProcessor(NewBlockProcessor(lightDb, FakePow{}, light, new(event.TypeMux)))
if n, err := light.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
@@ -895,9 +907,8 @@ func TestChainTxReorgs(t *testing.T) {
})
// Import the chain. This runs all block validation rules.
evmux := &event.TypeMux{}
- chainman, _ := NewBlockChain(db, FakePow{}, evmux)
- chainman.SetProcessor(NewBlockProcessor(db, FakePow{}, chainman, evmux))
- if i, err := chainman.InsertChain(chain); err != nil {
+ blockchain, _ := NewBlockChain(db, FakePow{}, evmux)
+ if i, err := blockchain.InsertChain(chain); err != nil {
t.Fatalf("failed to insert original chain[%d]: %v", i, err)
}
@@ -920,14 +931,14 @@ func TestChainTxReorgs(t *testing.T) {
gen.AddTx(futureAdd) // This transaction will be added after a full reorg
}
})
- if _, err := chainman.InsertChain(chain); err != nil {
+ if _, err := blockchain.InsertChain(chain); err != nil {
t.Fatalf("failed to insert forked chain: %v", err)
}
// removed tx
for i, tx := range (types.Transactions{pastDrop, freshDrop}) {
- if GetTransaction(db, tx.Hash()) != nil {
- t.Errorf("drop %d: tx found while shouldn't have been", i)
+ if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn != nil {
+ t.Errorf("drop %d: tx %v found while shouldn't have been", i, txn)
}
if GetReceipt(db, tx.Hash()) != nil {
t.Errorf("drop %d: receipt found while shouldn't have been", i)
@@ -935,7 +946,7 @@ func TestChainTxReorgs(t *testing.T) {
}
// added tx
for i, tx := range (types.Transactions{pastAdd, freshAdd, futureAdd}) {
- if GetTransaction(db, tx.Hash()) == nil {
+ if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn == nil {
t.Errorf("add %d: expected tx to be found", i)
}
if GetReceipt(db, tx.Hash()) == nil {
@@ -944,7 +955,7 @@ func TestChainTxReorgs(t *testing.T) {
}
// shared tx
for i, tx := range (types.Transactions{postponed, swapped}) {
- if GetTransaction(db, tx.Hash()) == nil {
+ if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn == nil {
t.Errorf("share %d: expected tx to be found", i)
}
if GetReceipt(db, tx.Hash()) == nil {
diff --git a/core/chain_makers.go b/core/chain_makers.go
index 56e37a0fc..f1ada487f 100644
--- a/core/chain_makers.go
+++ b/core/chain_makers.go
@@ -214,7 +214,7 @@ func makeHeader(parent *types.Block, state *state.StateDB) *types.Header {
// newCanonical creates a chain database, and injects a deterministic canonical
// chain. Depending on the full flag, if creates either a full block chain or a
// header only chain.
-func newCanonical(n int, full bool) (ethdb.Database, *BlockProcessor, error) {
+func newCanonical(n int, full bool) (ethdb.Database, *BlockChain, error) {
// Create te new chain database
db, _ := ethdb.NewMemDatabase()
evmux := &event.TypeMux{}
@@ -223,23 +223,20 @@ func newCanonical(n int, full bool) (ethdb.Database, *BlockProcessor, error) {
genesis, _ := WriteTestNetGenesisBlock(db, 0)
blockchain, _ := NewBlockChain(db, FakePow{}, evmux)
- processor := NewBlockProcessor(db, FakePow{}, blockchain, evmux)
- processor.bc.SetProcessor(processor)
-
// Create and inject the requested chain
if n == 0 {
- return db, processor, nil
+ return db, blockchain, nil
}
if full {
// Full block-chain requested
blocks := makeBlockChain(genesis, n, db, canonicalSeed)
_, err := blockchain.InsertChain(blocks)
- return db, processor, err
+ return db, blockchain, err
}
// Header-only chain requested
headers := makeHeaderChain(genesis.Header(), n, db, canonicalSeed)
_, err := blockchain.InsertHeaderChain(headers, 1)
- return db, processor, err
+ return db, blockchain, err
}
// makeHeaderChain creates a deterministic chain of headers rooted at parent.
diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go
index 7f47cf288..b9c1d89b7 100644
--- a/core/chain_makers_test.go
+++ b/core/chain_makers_test.go
@@ -77,15 +77,14 @@ func ExampleGenerateChain() {
// Import the chain. This runs all block validation rules.
evmux := &event.TypeMux{}
- chainman, _ := NewBlockChain(db, FakePow{}, evmux)
- chainman.SetProcessor(NewBlockProcessor(db, FakePow{}, chainman, evmux))
- if i, err := chainman.InsertChain(chain); err != nil {
+ blockchain, _ := NewBlockChain(db, FakePow{}, evmux)
+ if i, err := blockchain.InsertChain(chain); err != nil {
fmt.Printf("insert error (block %d): %v\n", i, err)
return
}
- state, _ := chainman.State()
- fmt.Printf("last block: #%d\n", chainman.CurrentBlock().Number())
+ state, _ := blockchain.State()
+ fmt.Printf("last block: #%d\n", blockchain.CurrentBlock().Number())
fmt.Println("balance of addr1:", state.GetBalance(addr1))
fmt.Println("balance of addr2:", state.GetBalance(addr2))
fmt.Println("balance of addr3:", state.GetBalance(addr3))
diff --git a/core/chain_util.go b/core/database_util.go
index ddff381a1..fbcce3e8c 100644
--- a/core/chain_util.go
+++ b/core/database_util.go
@@ -43,11 +43,15 @@ var (
bodySuffix = []byte("-body")
tdSuffix = []byte("-td")
- ExpDiffPeriod = big.NewInt(100000)
- blockHashPre = []byte("block-hash-") // [deprecated by eth/63]
+ txMetaSuffix = []byte{0x01}
+ receiptsPrefix = []byte("receipts-")
+ blockReceiptsPrefix = []byte("receipts-block-")
mipmapPre = []byte("mipmap-log-bloom-")
MIPMapLevels = []uint64{1000000, 500000, 100000, 50000, 1000}
+
+ ExpDiffPeriod = big.NewInt(100000)
+ blockHashPrefix = []byte("block-hash-") // [deprecated by the header/block split, remove eventually]
)
// CalcDifficulty is the difficulty adjustment algorithm. It returns
@@ -234,6 +238,67 @@ func GetBlock(db ethdb.Database, hash common.Hash) *types.Block {
return types.NewBlockWithHeader(header).WithBody(body.Transactions, body.Uncles)
}
+// GetBlockReceipts retrieves the receipts generated by the transactions included
+// in a block given by its hash.
+func GetBlockReceipts(db ethdb.Database, hash common.Hash) types.Receipts {
+ data, _ := db.Get(append(blockReceiptsPrefix, hash[:]...))
+ if len(data) == 0 {
+ return nil
+ }
+ storageReceipts := []*types.ReceiptForStorage{}
+ if err := rlp.DecodeBytes(data, &storageReceipts); err != nil {
+ glog.V(logger.Error).Infof("invalid receipt array RLP for hash %x: %v", hash, err)
+ return nil
+ }
+ receipts := make(types.Receipts, len(storageReceipts))
+ for i, receipt := range storageReceipts {
+ receipts[i] = (*types.Receipt)(receipt)
+ }
+ return receipts
+}
+
+// GetTransaction retrieves a specific transaction from the database, along with
+// its added positional metadata.
+func GetTransaction(db ethdb.Database, hash common.Hash) (*types.Transaction, common.Hash, uint64, uint64) {
+ // Retrieve the transaction itself from the database
+ data, _ := db.Get(hash.Bytes())
+ if len(data) == 0 {
+ return nil, common.Hash{}, 0, 0
+ }
+ var tx types.Transaction
+ if err := rlp.DecodeBytes(data, &tx); err != nil {
+ return nil, common.Hash{}, 0, 0
+ }
+ // Retrieve the blockchain positional metadata
+ data, _ = db.Get(append(hash.Bytes(), txMetaSuffix...))
+ if len(data) == 0 {
+ return nil, common.Hash{}, 0, 0
+ }
+ var meta struct {
+ BlockHash common.Hash
+ BlockIndex uint64
+ Index uint64
+ }
+ if err := rlp.DecodeBytes(data, &meta); err != nil {
+ return nil, common.Hash{}, 0, 0
+ }
+ return &tx, meta.BlockHash, meta.BlockIndex, meta.Index
+}
+
+// GetReceipt returns a receipt by hash
+func GetReceipt(db ethdb.Database, txHash common.Hash) *types.Receipt {
+ data, _ := db.Get(append(receiptsPrefix, txHash[:]...))
+ if len(data) == 0 {
+ return nil
+ }
+ var receipt types.ReceiptForStorage
+ err := rlp.DecodeBytes(data, &receipt)
+ if err != nil {
+ glog.V(logger.Core).Infoln("GetReceipt err:", err)
+ }
+ return (*types.Receipt)(&receipt)
+}
+
// WriteCanonicalHash stores the canonical hash for the given block number.
func WriteCanonicalHash(db ethdb.Database, hash common.Hash, number uint64) error {
key := append(blockNumPrefix, big.NewInt(int64(number)).Bytes()...)
@@ -329,6 +394,94 @@ func WriteBlock(db ethdb.Database, block *types.Block) error {
return nil
}
+// WriteBlockReceipts stores all the transaction receipts belonging to a block
+// as a single receipt slice. This is used during chain reorganisations for
+// rescheduling dropped transactions.
+func WriteBlockReceipts(db ethdb.Database, hash common.Hash, receipts types.Receipts) error {
+ // Convert the receipts into their storage form and serialize them
+ storageReceipts := make([]*types.ReceiptForStorage, len(receipts))
+ for i, receipt := range receipts {
+ storageReceipts[i] = (*types.ReceiptForStorage)(receipt)
+ }
+ bytes, err := rlp.EncodeToBytes(storageReceipts)
+ if err != nil {
+ return err
+ }
+ // Store the flattened receipt slice
+ if err := db.Put(append(blockReceiptsPrefix, hash.Bytes()...), bytes); err != nil {
+ glog.Fatalf("failed to store block receipts into database: %v", err)
+ return err
+ }
+ glog.V(logger.Debug).Infof("stored block receipts [%x…]", hash.Bytes()[:4])
+ return nil
+}
+
+// WriteTransactions stores the transactions associated with a specific block
+// into the given database. Beside writing the transaction, the function also
+// stores a metadata entry along with the transaction, detailing the position
+// of this within the blockchain.
+func WriteTransactions(db ethdb.Database, block *types.Block) error {
+ batch := db.NewBatch()
+
+ // Iterate over each transaction and encode it with its metadata
+ for i, tx := range block.Transactions() {
+ // Encode and queue up the transaction for storage
+ data, err := rlp.EncodeToBytes(tx)
+ if err != nil {
+ return err
+ }
+ if err := batch.Put(tx.Hash().Bytes(), data); err != nil {
+ return err
+ }
+ // Encode and queue up the transaction metadata for storage
+ meta := struct {
+ BlockHash common.Hash
+ BlockIndex uint64
+ Index uint64
+ }{
+ BlockHash: block.Hash(),
+ BlockIndex: block.NumberU64(),
+ Index: uint64(i),
+ }
+ data, err = rlp.EncodeToBytes(meta)
+ if err != nil {
+ return err
+ }
+ if err := batch.Put(append(tx.Hash().Bytes(), txMetaSuffix...), data); err != nil {
+ return err
+ }
+ }
+ // Write the scheduled data into the database
+ if err := batch.Write(); err != nil {
+ glog.Fatalf("failed to store transactions into database: %v", err)
+ return err
+ }
+ return nil
+}
+
+// WriteReceipts stores a batch of transaction receipts into the database.
+func WriteReceipts(db ethdb.Database, receipts types.Receipts) error {
+ batch := db.NewBatch()
+
+ // Iterate over all the receipts and queue them for database injection
+ for _, receipt := range receipts {
+ storageReceipt := (*types.ReceiptForStorage)(receipt)
+ data, err := rlp.EncodeToBytes(storageReceipt)
+ if err != nil {
+ return err
+ }
+ if err := batch.Put(append(receiptsPrefix, receipt.TxHash.Bytes()...), data); err != nil {
+ return err
+ }
+ }
+ // Write the scheduled data into the database
+ if err := batch.Write(); err != nil {
+ glog.Fatalf("failed to store receipts into database: %v", err)
+ return err
+ }
+ return nil
+}
+
// DeleteCanonicalHash removes the number to hash canonical mapping.
func DeleteCanonicalHash(db ethdb.Database, number uint64) {
db.Delete(append(blockNumPrefix, big.NewInt(int64(number)).Bytes()...))
@@ -351,18 +504,35 @@ func DeleteTd(db ethdb.Database, hash common.Hash) {
// DeleteBlock removes all block data associated with a hash.
func DeleteBlock(db ethdb.Database, hash common.Hash) {
+ DeleteBlockReceipts(db, hash)
DeleteHeader(db, hash)
DeleteBody(db, hash)
DeleteTd(db, hash)
}
-// [deprecated by eth/63]
+// DeleteBlockReceipts removes all receipt data associated with a block hash.
+func DeleteBlockReceipts(db ethdb.Database, hash common.Hash) {
+ db.Delete(append(blockReceiptsPrefix, hash.Bytes()...))
+}
+
+// DeleteTransaction removes all transaction data associated with a hash.
+func DeleteTransaction(db ethdb.Database, hash common.Hash) {
+ db.Delete(hash.Bytes())
+ db.Delete(append(hash.Bytes(), txMetaSuffix...))
+}
+
+// DeleteReceipt removes all receipt data associated with a transaction hash.
+func DeleteReceipt(db ethdb.Database, hash common.Hash) {
+ db.Delete(append(receiptsPrefix, hash.Bytes()...))
+}
+
+// [deprecated by the header/block split, remove eventually]
// GetBlockByHashOld returns the old combined block corresponding to the hash
// or nil if not found. This method is only used by the upgrade mechanism to
// access the old combined block representation. It will be dropped after the
// network transitions to eth/63.
func GetBlockByHashOld(db ethdb.Database, hash common.Hash) *types.Block {
- data, _ := db.Get(append(blockHashPre, hash[:]...))
+ data, _ := db.Get(append(blockHashPrefix, hash[:]...))
if len(data) == 0 {
return nil
}
diff --git a/core/chain_util_test.go b/core/database_util_test.go
index 0bbcbbe53..059f1ae9f 100644
--- a/core/chain_util_test.go
+++ b/core/database_util_test.go
@@ -17,6 +17,7 @@
package core
import (
+ "bytes"
"encoding/json"
"io/ioutil"
"math/big"
@@ -341,6 +342,163 @@ func TestHeadStorage(t *testing.T) {
}
}
+// Tests that transactions and associated metadata can be stored and retrieved.
+func TestTransactionStorage(t *testing.T) {
+ db, _ := ethdb.NewMemDatabase()
+
+ tx1 := types.NewTransaction(1, common.BytesToAddress([]byte{0x11}), big.NewInt(111), big.NewInt(1111), big.NewInt(11111), []byte{0x11, 0x11, 0x11})
+ tx2 := types.NewTransaction(2, common.BytesToAddress([]byte{0x22}), big.NewInt(222), big.NewInt(2222), big.NewInt(22222), []byte{0x22, 0x22, 0x22})
+ tx3 := types.NewTransaction(3, common.BytesToAddress([]byte{0x33}), big.NewInt(333), big.NewInt(3333), big.NewInt(33333), []byte{0x33, 0x33, 0x33})
+ txs := []*types.Transaction{tx1, tx2, tx3}
+
+ block := types.NewBlock(&types.Header{Number: big.NewInt(314)}, txs, nil, nil)
+
+ // Check that no transactions entries are in a pristine database
+ for i, tx := range txs {
+ if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn != nil {
+ t.Fatalf("tx #%d [%x]: non existent transaction returned: %v", i, tx.Hash(), txn)
+ }
+ }
+ // Insert all the transactions into the database, and verify contents
+ if err := WriteTransactions(db, block); err != nil {
+ t.Fatalf("failed to write transactions: %v", err)
+ }
+ for i, tx := range txs {
+ if txn, hash, number, index := GetTransaction(db, tx.Hash()); txn == nil {
+ t.Fatalf("tx #%d [%x]: transaction not found", i, tx.Hash())
+ } else {
+ if hash != block.Hash() || number != block.NumberU64() || index != uint64(i) {
+ t.Fatalf("tx #%d [%x]: positional metadata mismatch: have %x/%d/%d, want %x/%v/%v", i, tx.Hash(), hash, number, index, block.Hash(), block.NumberU64(), i)
+ }
+ if tx.String() != txn.String() {
+ t.Fatalf("tx #%d [%x]: transaction mismatch: have %v, want %v", i, tx.Hash(), txn, tx)
+ }
+ }
+ }
+ // Delete the transactions and check purge
+ for i, tx := range txs {
+ DeleteTransaction(db, tx.Hash())
+ if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn != nil {
+ t.Fatalf("tx #%d [%x]: deleted transaction returned: %v", i, tx.Hash(), txn)
+ }
+ }
+}
+
+// Tests that receipts can be stored and retrieved.
+func TestReceiptStorage(t *testing.T) {
+ db, _ := ethdb.NewMemDatabase()
+
+ receipt1 := &types.Receipt{
+ PostState: []byte{0x01},
+ CumulativeGasUsed: big.NewInt(1),
+ Logs: vm.Logs{
+ &vm.Log{Address: common.BytesToAddress([]byte{0x11})},
+ &vm.Log{Address: common.BytesToAddress([]byte{0x01, 0x11})},
+ },
+ TxHash: common.BytesToHash([]byte{0x11, 0x11}),
+ ContractAddress: common.BytesToAddress([]byte{0x01, 0x11, 0x11}),
+ GasUsed: big.NewInt(111111),
+ }
+ receipt2 := &types.Receipt{
+ PostState: []byte{0x02},
+ CumulativeGasUsed: big.NewInt(2),
+ Logs: vm.Logs{
+ &vm.Log{Address: common.BytesToAddress([]byte{0x22})},
+ &vm.Log{Address: common.BytesToAddress([]byte{0x02, 0x22})},
+ },
+ TxHash: common.BytesToHash([]byte{0x22, 0x22}),
+ ContractAddress: common.BytesToAddress([]byte{0x02, 0x22, 0x22}),
+ GasUsed: big.NewInt(222222),
+ }
+ receipts := []*types.Receipt{receipt1, receipt2}
+
+ // Check that no receipt entries are in a pristine database
+ for i, receipt := range receipts {
+ if r := GetReceipt(db, receipt.TxHash); r != nil {
+ t.Fatalf("receipt #%d [%x]: non existent receipt returned: %v", i, receipt.TxHash, r)
+ }
+ }
+ // Insert all the receipts into the database, and verify contents
+ if err := WriteReceipts(db, receipts); err != nil {
+ t.Fatalf("failed to write receipts: %v", err)
+ }
+ for i, receipt := range receipts {
+ if r := GetReceipt(db, receipt.TxHash); r == nil {
+ t.Fatalf("receipt #%d [%x]: receipt not found", i, receipt.TxHash)
+ } else {
+ rlpHave, _ := rlp.EncodeToBytes(r)
+ rlpWant, _ := rlp.EncodeToBytes(receipt)
+
+ if bytes.Compare(rlpHave, rlpWant) != 0 {
+ t.Fatalf("receipt #%d [%x]: receipt mismatch: have %v, want %v", i, receipt.TxHash, r, receipt)
+ }
+ }
+ }
+ // Delete the receipts and check purge
+ for i, receipt := range receipts {
+ DeleteReceipt(db, receipt.TxHash)
+ if r := GetReceipt(db, receipt.TxHash); r != nil {
+ t.Fatalf("receipt #%d [%x]: deleted receipt returned: %v", i, receipt.TxHash, r)
+ }
+ }
+}
+
+// Tests that receipts associated with a single block can be stored and retrieved.
+func TestBlockReceiptStorage(t *testing.T) {
+ db, _ := ethdb.NewMemDatabase()
+
+ receipt1 := &types.Receipt{
+ PostState: []byte{0x01},
+ CumulativeGasUsed: big.NewInt(1),
+ Logs: vm.Logs{
+ &vm.Log{Address: common.BytesToAddress([]byte{0x11})},
+ &vm.Log{Address: common.BytesToAddress([]byte{0x01, 0x11})},
+ },
+ TxHash: common.BytesToHash([]byte{0x11, 0x11}),
+ ContractAddress: common.BytesToAddress([]byte{0x01, 0x11, 0x11}),
+ GasUsed: big.NewInt(111111),
+ }
+ receipt2 := &types.Receipt{
+ PostState: []byte{0x02},
+ CumulativeGasUsed: big.NewInt(2),
+ Logs: vm.Logs{
+ &vm.Log{Address: common.BytesToAddress([]byte{0x22})},
+ &vm.Log{Address: common.BytesToAddress([]byte{0x02, 0x22})},
+ },
+ TxHash: common.BytesToHash([]byte{0x22, 0x22}),
+ ContractAddress: common.BytesToAddress([]byte{0x02, 0x22, 0x22}),
+ GasUsed: big.NewInt(222222),
+ }
+ receipts := []*types.Receipt{receipt1, receipt2}
+
+ // Check that no receipt entries are in a pristine database
+ hash := common.BytesToHash([]byte{0x03, 0x14})
+ if rs := GetBlockReceipts(db, hash); len(rs) != 0 {
+ t.Fatalf("non existent receipts returned: %v", rs)
+ }
+ // Insert the receipt slice into the database and check presence
+ if err := WriteBlockReceipts(db, hash, receipts); err != nil {
+ t.Fatalf("failed to write block receipts: %v", err)
+ }
+ if rs := GetBlockReceipts(db, hash); len(rs) == 0 {
+ t.Fatalf("no receipts returned")
+ } else {
+ for i := 0; i < len(receipts); i++ {
+ rlpHave, _ := rlp.EncodeToBytes(rs[i])
+ rlpWant, _ := rlp.EncodeToBytes(receipts[i])
+
+ if bytes.Compare(rlpHave, rlpWant) != 0 {
+ t.Fatalf("receipt #%d: receipt mismatch: have %v, want %v", i, rs[i], receipts[i])
+ }
+ }
+ }
+ // Delete the receipt slice and check purge
+ DeleteBlockReceipts(db, hash)
+ if rs := GetBlockReceipts(db, hash); len(rs) != 0 {
+ t.Fatalf("deleted receipts returned: %v", rs)
+ }
+}
+
func TestMipmapBloom(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
@@ -425,7 +583,7 @@ func TestMipmapChain(t *testing.T) {
}
// store the receipts
- err := PutReceipts(db, receipts)
+ err := WriteReceipts(db, receipts)
if err != nil {
t.Fatal(err)
}
@@ -439,7 +597,7 @@ func TestMipmapChain(t *testing.T) {
if err := WriteHeadBlockHash(db, block.Hash()); err != nil {
t.Fatalf("failed to insert block number: %v", err)
}
- if err := PutBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
+ if err := WriteBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
t.Fatal("error writing block receipts:", err)
}
}
diff --git a/core/gaspool.go b/core/gaspool.go
new file mode 100644
index 000000000..2ef07c754
--- /dev/null
+++ b/core/gaspool.go
@@ -0,0 +1,46 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import "math/big"
+
+// GasPool tracks the amount of gas available during
+// execution of the transactions in a block.
+// The zero value is a pool with zero gas available.
+type GasPool big.Int
+
+// AddGas makes gas available for execution.
+func (gp *GasPool) AddGas(amount *big.Int) *GasPool {
+ i := (*big.Int)(gp)
+ i.Add(i, amount)
+ return gp
+}
+
+// SubGas deducts the given amount from the pool if enough gas is
+// available and returns an error otherwise.
+func (gp *GasPool) SubGas(amount *big.Int) error {
+ i := (*big.Int)(gp)
+ if i.Cmp(amount) < 0 {
+ return &GasLimitErr{Have: new(big.Int).Set(i), Want: amount}
+ }
+ i.Sub(i, amount)
+ return nil
+}
+
+func (gp *GasPool) String() string {
+ return (*big.Int)(gp).String()
+}
diff --git a/core/genesis.go b/core/genesis.go
index dac5de92f..3fd8f42b0 100644
--- a/core/genesis.go
+++ b/core/genesis.go
@@ -103,7 +103,7 @@ func WriteGenesisBlock(chainDb ethdb.Database, reader io.Reader) (*types.Block,
if err := WriteBlock(chainDb, block); err != nil {
return nil, err
}
- if err := PutBlockReceipts(chainDb, block.Hash(), nil); err != nil {
+ if err := WriteBlockReceipts(chainDb, block.Hash(), nil); err != nil {
return nil, err
}
if err := WriteCanonicalHash(chainDb, block.Hash(), block.NumberU64()); err != nil {
diff --git a/core/state_processor.go b/core/state_processor.go
new file mode 100644
index 000000000..d9c24935d
--- /dev/null
+++ b/core/state_processor.go
@@ -0,0 +1,107 @@
+package core
+
+import (
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/core/state"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/core/vm"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+)
+
+var (
+ big8 = big.NewInt(8)
+ big32 = big.NewInt(32)
+)
+
+type StateProcessor struct {
+ bc *BlockChain
+}
+
+func NewStateProcessor(bc *BlockChain) *StateProcessor {
+ return &StateProcessor{bc}
+}
+
+// Process processes the state changes according to the Ethereum rules by running
+// the transaction messages using the statedb and applying any rewards to both
+// the processor (coinbase) and any included uncles.
+//
+// Process returns the receipts and logs accumulated during the process and
+// returns the amount of gas that was used in the process. If any of the
+// transactions failed to execute due to insufficient gas it will return an error.
+func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB) (types.Receipts, vm.Logs, *big.Int, error) {
+ var (
+ receipts types.Receipts
+ totalUsedGas = big.NewInt(0)
+ err error
+ header = block.Header()
+ allLogs vm.Logs
+ gp = new(GasPool).AddGas(block.GasLimit())
+ )
+
+ for i, tx := range block.Transactions() {
+ statedb.StartRecord(tx.Hash(), block.Hash(), i)
+
+ receipt, logs, _, err := ApplyTransaction(p.bc, gp, statedb, header, tx, totalUsedGas)
+ if err != nil {
+ return nil, nil, totalUsedGas, err
+ }
+ receipts = append(receipts, receipt)
+ allLogs = append(allLogs, logs...)
+ }
+ AccumulateRewards(statedb, header, block.Uncles())
+
+ return receipts, allLogs, totalUsedGas, err
+}
+
+// ApplyTransaction attemps to apply a transaction to the given state database
+// and uses the input parameters for its environment.
+//
+// ApplyTransactions returns the generated receipts and vm logs during the
+// execution of the state transition phase.
+func ApplyTransaction(bc *BlockChain, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *big.Int) (*types.Receipt, vm.Logs, *big.Int, error) {
+ _, gas, err := ApplyMessage(NewEnv(statedb, bc, tx, header), tx, gp)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
+ // Update the state with pending changes
+ usedGas.Add(usedGas, gas)
+ receipt := types.NewReceipt(statedb.IntermediateRoot().Bytes(), usedGas)
+ receipt.TxHash = tx.Hash()
+ receipt.GasUsed = new(big.Int).Set(gas)
+ if MessageCreatesContract(tx) {
+ from, _ := tx.From()
+ receipt.ContractAddress = crypto.CreateAddress(from, tx.Nonce())
+ }
+
+ logs := statedb.GetLogs(tx.Hash())
+ receipt.Logs = logs
+ receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
+
+ glog.V(logger.Debug).Infoln(receipt)
+
+ return receipt, logs, gas, err
+}
+
+// AccumulateRewards credits the coinbase of the given block with the
+// mining reward. The total reward consists of the static block reward
+// and rewards for included uncles. The coinbase of each uncle block is
+// also rewarded.
+func AccumulateRewards(statedb *state.StateDB, header *types.Header, uncles []*types.Header) {
+ reward := new(big.Int).Set(BlockReward)
+ r := new(big.Int)
+ for _, uncle := range uncles {
+ r.Add(uncle.Number, big8)
+ r.Sub(r, header.Number)
+ r.Mul(r, BlockReward)
+ r.Div(r, big8)
+ statedb.AddBalance(uncle.Coinbase, r)
+
+ r.Div(BlockReward, big32)
+ reward.Add(reward, r)
+ }
+ statedb.AddBalance(header.Coinbase, reward)
+}
diff --git a/core/transaction_util.go b/core/transaction_util.go
deleted file mode 100644
index e2e5b9aee..000000000
--- a/core/transaction_util.go
+++ /dev/null
@@ -1,171 +0,0 @@
-// Copyright 2015 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package core
-
-import (
- "fmt"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
- "github.com/ethereum/go-ethereum/rlp"
- "github.com/syndtr/goleveldb/leveldb"
-)
-
-var (
- receiptsPre = []byte("receipts-")
- blockReceiptsPre = []byte("receipts-block-")
-)
-
-// PutTransactions stores the transactions in the given database
-func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactions) error {
- batch := db.NewBatch()
-
- for i, tx := range block.Transactions() {
- rlpEnc, err := rlp.EncodeToBytes(tx)
- if err != nil {
- return fmt.Errorf("failed encoding tx: %v", err)
- }
-
- batch.Put(tx.Hash().Bytes(), rlpEnc)
-
- var txExtra struct {
- BlockHash common.Hash
- BlockIndex uint64
- Index uint64
- }
- txExtra.BlockHash = block.Hash()
- txExtra.BlockIndex = block.NumberU64()
- txExtra.Index = uint64(i)
- rlpMeta, err := rlp.EncodeToBytes(txExtra)
- if err != nil {
- return fmt.Errorf("failed encoding tx meta data: %v", err)
- }
-
- batch.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta)
- }
-
- if err := batch.Write(); err != nil {
- return fmt.Errorf("failed writing tx to db: %v", err)
- }
- return nil
-}
-
-func DeleteTransaction(db ethdb.Database, txHash common.Hash) {
- db.Delete(txHash[:])
-}
-
-func GetTransaction(db ethdb.Database, txhash common.Hash) *types.Transaction {
- data, _ := db.Get(txhash[:])
- if len(data) != 0 {
- var tx types.Transaction
- if err := rlp.DecodeBytes(data, &tx); err != nil {
- return nil
- }
- return &tx
- }
- return nil
-}
-
-// PutReceipts stores the receipts in the current database
-func PutReceipts(db ethdb.Database, receipts types.Receipts) error {
- batch := new(leveldb.Batch)
- _, batchWrite := db.(*ethdb.LDBDatabase)
-
- for _, receipt := range receipts {
- storageReceipt := (*types.ReceiptForStorage)(receipt)
- bytes, err := rlp.EncodeToBytes(storageReceipt)
- if err != nil {
- return err
- }
-
- if batchWrite {
- batch.Put(append(receiptsPre, receipt.TxHash[:]...), bytes)
- } else {
- err = db.Put(append(receiptsPre, receipt.TxHash[:]...), bytes)
- if err != nil {
- return err
- }
- }
- }
- if db, ok := db.(*ethdb.LDBDatabase); ok {
- if err := db.LDB().Write(batch, nil); err != nil {
- return err
- }
- }
-
- return nil
-}
-
-// Delete a receipts from the database
-func DeleteReceipt(db ethdb.Database, txHash common.Hash) {
- db.Delete(append(receiptsPre, txHash[:]...))
-}
-
-// GetReceipt returns a receipt by hash
-func GetReceipt(db ethdb.Database, txHash common.Hash) *types.Receipt {
- data, _ := db.Get(append(receiptsPre, txHash[:]...))
- if len(data) == 0 {
- return nil
- }
- var receipt types.ReceiptForStorage
- err := rlp.DecodeBytes(data, &receipt)
- if err != nil {
- glog.V(logger.Core).Infoln("GetReceipt err:", err)
- }
- return (*types.Receipt)(&receipt)
-}
-
-// GetBlockReceipts returns the receipts generated by the transactions
-// included in block's given hash.
-func GetBlockReceipts(db ethdb.Database, hash common.Hash) types.Receipts {
- data, _ := db.Get(append(blockReceiptsPre, hash[:]...))
- if len(data) == 0 {
- return nil
- }
- rs := []*types.ReceiptForStorage{}
- if err := rlp.DecodeBytes(data, &rs); err != nil {
- glog.V(logger.Error).Infof("invalid receipt array RLP for hash %x: %v", hash, err)
- return nil
- }
- receipts := make(types.Receipts, len(rs))
- for i, receipt := range rs {
- receipts[i] = (*types.Receipt)(receipt)
- }
- return receipts
-}
-
-// PutBlockReceipts stores the block's transactions associated receipts
-// and stores them by block hash in a single slice. This is required for
-// forks and chain reorgs
-func PutBlockReceipts(db ethdb.Database, hash common.Hash, receipts types.Receipts) error {
- rs := make([]*types.ReceiptForStorage, len(receipts))
- for i, receipt := range receipts {
- rs[i] = (*types.ReceiptForStorage)(receipt)
- }
- bytes, err := rlp.EncodeToBytes(rs)
- if err != nil {
- return err
- }
- err = db.Put(append(blockReceiptsPre, hash[:]...), bytes)
- if err != nil {
- return err
- }
- return nil
-}
diff --git a/core/types.go b/core/types.go
new file mode 100644
index 000000000..027f628b1
--- /dev/null
+++ b/core/types.go
@@ -0,0 +1,70 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/accounts"
+ "github.com/ethereum/go-ethereum/core/state"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/core/vm"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
+)
+
+// Validator is an interface which defines the standard for block validation.
+//
+// The validator is responsible for validating incoming block or, if desired,
+// validates headers for fast validation.
+//
+// ValidateBlock validates the given block and should return an error if it
+// failed to do so and should be used for "full" validation.
+//
+// ValidateHeader validates the given header and parent and returns an error
+// if it failed to do so.
+//
+// ValidateStack validates the given statedb and optionally the receipts and
+// gas used. The implementor should decide what to do with the given input.
+type Validator interface {
+ ValidateBlock(block *types.Block) error
+ ValidateHeader(header, parent *types.Header, checkPow bool) error
+ ValidateState(block, parent *types.Block, state *state.StateDB, receipts types.Receipts, usedGas *big.Int) error
+}
+
+// Processor is an interface for processing blocks using a given initial state.
+//
+// Process takes the block to be processed and the statedb upon which the
+// initial state is based. It should return the receipts generated, amount
+// of gas used in the process and return an error if any of the internal rules
+// failed.
+type Processor interface {
+ Process(block *types.Block, statedb *state.StateDB) (types.Receipts, vm.Logs, *big.Int, error)
+}
+
+// Backend is an interface defining the basic functionality for an operable node
+// with all the functionality to be a functional, valid Ethereum operator.
+//
+// TODO Remove this
+type Backend interface {
+ AccountManager() *accounts.Manager
+ BlockChain() *BlockChain
+ TxPool() *TxPool
+ ChainDb() ethdb.Database
+ DappDb() ethdb.Database
+ EventMux() *event.TypeMux
+}
diff --git a/core/types/common.go b/core/vm/runtime/doc.go
index fe682f98a..a3b464a7d 100644
--- a/core/types/common.go
+++ b/core/vm/runtime/doc.go
@@ -14,12 +14,5 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-package types
-
-import "github.com/ethereum/go-ethereum/core/vm"
-
-type BlockProcessor interface {
- Process(*Block) (vm.Logs, Receipts, error)
- ValidateHeader(*Header, bool, bool) error
- ValidateHeaderWithParent(*Header, *Header, bool, bool) error
-}
+// Package runtime provides a basic execution model for executing EVM code.
+package runtime
diff --git a/core/vm/runtime/env.go b/core/vm/runtime/env.go
new file mode 100644
index 000000000..22f9ea14d
--- /dev/null
+++ b/core/vm/runtime/env.go
@@ -0,0 +1,106 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package runtime
+
+import (
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/state"
+ "github.com/ethereum/go-ethereum/core/vm"
+)
+
+// Env is a basic runtime environment required for running the EVM.
+type Env struct {
+ depth int
+ state *state.StateDB
+
+ origin common.Address
+ coinbase common.Address
+
+ number *big.Int
+ time *big.Int
+ difficulty *big.Int
+ gasLimit *big.Int
+
+ logs []vm.StructLog
+
+ getHashFn func(uint64) common.Hash
+}
+
+// NewEnv returns a new vm.Environment
+func NewEnv(cfg *Config, state *state.StateDB) vm.Environment {
+ return &Env{
+ state: state,
+ origin: cfg.Origin,
+ coinbase: cfg.Coinbase,
+ number: cfg.BlockNumber,
+ time: cfg.Time,
+ difficulty: cfg.Difficulty,
+ gasLimit: cfg.GasLimit,
+ }
+}
+
+func (self *Env) StructLogs() []vm.StructLog {
+ return self.logs
+}
+
+func (self *Env) AddStructLog(log vm.StructLog) {
+ self.logs = append(self.logs, log)
+}
+
+func (self *Env) Origin() common.Address { return self.origin }
+func (self *Env) BlockNumber() *big.Int { return self.number }
+func (self *Env) Coinbase() common.Address { return self.coinbase }
+func (self *Env) Time() *big.Int { return self.time }
+func (self *Env) Difficulty() *big.Int { return self.difficulty }
+func (self *Env) Db() vm.Database { return self.state }
+func (self *Env) GasLimit() *big.Int { return self.gasLimit }
+func (self *Env) VmType() vm.Type { return vm.StdVmTy }
+func (self *Env) GetHash(n uint64) common.Hash {
+ return self.getHashFn(n)
+}
+func (self *Env) AddLog(log *vm.Log) {
+ self.state.AddLog(log)
+}
+func (self *Env) Depth() int { return self.depth }
+func (self *Env) SetDepth(i int) { self.depth = i }
+func (self *Env) CanTransfer(from common.Address, balance *big.Int) bool {
+ return self.state.GetBalance(from).Cmp(balance) >= 0
+}
+func (self *Env) MakeSnapshot() vm.Database {
+ return self.state.Copy()
+}
+func (self *Env) SetSnapshot(copy vm.Database) {
+ self.state.Set(copy.(*state.StateDB))
+}
+
+func (self *Env) Transfer(from, to vm.Account, amount *big.Int) {
+ core.Transfer(from, to, amount)
+}
+
+func (self *Env) Call(caller vm.ContractRef, addr common.Address, data []byte, gas, price, value *big.Int) ([]byte, error) {
+ return core.Call(self, caller, addr, data, gas, price, value)
+}
+func (self *Env) CallCode(caller vm.ContractRef, addr common.Address, data []byte, gas, price, value *big.Int) ([]byte, error) {
+ return core.CallCode(self, caller, addr, data, gas, price, value)
+}
+
+func (self *Env) Create(caller vm.ContractRef, data []byte, gas, price, value *big.Int) ([]byte, common.Address, error) {
+ return core.Create(self, caller, data, gas, price, value)
+}
diff --git a/core/vm/runtime/runtime.go b/core/vm/runtime/runtime.go
new file mode 100644
index 000000000..dd3aa1b0b
--- /dev/null
+++ b/core/vm/runtime/runtime.go
@@ -0,0 +1,121 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package runtime
+
+import (
+ "math/big"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/state"
+ "github.com/ethereum/go-ethereum/core/vm"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/ethdb"
+)
+
+// Config is a basic type specifing certain configuration flags for running
+// the EVM.
+type Config struct {
+ Difficulty *big.Int
+ Origin common.Address
+ Coinbase common.Address
+ BlockNumber *big.Int
+ Time *big.Int
+ GasLimit *big.Int
+ GasPrice *big.Int
+ Value *big.Int
+ DisableJit bool // "disable" so it's enabled by default
+ Debug bool
+
+ GetHashFn func(n uint64) common.Hash
+}
+
+// sets defaults on the config
+func setDefaults(cfg *Config) {
+ if cfg.Difficulty == nil {
+ cfg.Difficulty = new(big.Int)
+ }
+ if cfg.Time == nil {
+ cfg.Time = big.NewInt(time.Now().Unix())
+ }
+ if cfg.GasLimit == nil {
+ cfg.GasLimit = new(big.Int).Set(common.MaxBig)
+ }
+ if cfg.GasPrice == nil {
+ cfg.GasPrice = new(big.Int)
+ }
+ if cfg.Value == nil {
+ cfg.Value = new(big.Int)
+ }
+ if cfg.BlockNumber == nil {
+ cfg.BlockNumber = new(big.Int)
+ }
+ if cfg.GetHashFn == nil {
+ cfg.GetHashFn = func(n uint64) common.Hash {
+ return common.BytesToHash(crypto.Sha3([]byte(new(big.Int).SetUint64(n).String())))
+ }
+ }
+}
+
+// Execute executes the code using the input as call data during the execution.
+// It returns the EVM's return value, the new state and an error if it failed.
+//
+// Executes sets up a in memory, temporarily, environment for the execution of
+// the given code. It enabled the JIT by default and make sure that it's restored
+// to it's original state afterwards.
+func Execute(code, input []byte, cfg *Config) ([]byte, *state.StateDB, error) {
+ if cfg == nil {
+ cfg = new(Config)
+ }
+ setDefaults(cfg)
+
+ // defer the call to setting back the original values
+ defer func(debug, forceJit, enableJit bool) {
+ vm.Debug = debug
+ vm.ForceJit = forceJit
+ vm.EnableJit = enableJit
+ }(vm.Debug, vm.ForceJit, vm.EnableJit)
+
+ vm.ForceJit = !cfg.DisableJit
+ vm.EnableJit = !cfg.DisableJit
+ vm.Debug = cfg.Debug
+
+ var (
+ db, _ = ethdb.NewMemDatabase()
+ statedb, _ = state.New(common.Hash{}, db)
+ vmenv = NewEnv(cfg, statedb)
+ sender = statedb.CreateAccount(cfg.Origin)
+ receiver = statedb.CreateAccount(common.StringToAddress("contract"))
+ )
+ // set the receiver's (the executing contract) code for execution.
+ receiver.SetCode(code)
+
+ // Call the code with the given configuration.
+ ret, err := vmenv.Call(
+ sender,
+ receiver.Address(),
+ input,
+ cfg.GasLimit,
+ cfg.GasPrice,
+ cfg.Value,
+ )
+
+ if cfg.Debug {
+ vm.StdErrFormat(vmenv.StructLogs())
+ }
+ return ret, statedb, err
+}
diff --git a/core/manager.go b/core/vm/runtime/runtime_example_test.go
index 289c87c11..b7d0ddc38 100644
--- a/core/manager.go
+++ b/core/vm/runtime/runtime_example_test.go
@@ -14,21 +14,21 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-package core
+package runtime_test
import (
- "github.com/ethereum/go-ethereum/accounts"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/event"
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/vm/runtime"
)
-// TODO move this to types?
-type Backend interface {
- AccountManager() *accounts.Manager
- BlockProcessor() *BlockProcessor
- BlockChain() *BlockChain
- TxPool() *TxPool
- ChainDb() ethdb.Database
- DappDb() ethdb.Database
- EventMux() *event.TypeMux
+func ExampleExecute() {
+ ret, _, err := runtime.Execute(common.Hex2Bytes("6060604052600a8060106000396000f360606040526008565b00"), nil, nil)
+ if err != nil {
+ fmt.Println(err)
+ }
+ fmt.Println(ret)
+ // Output:
+ // [96 96 96 64 82 96 8 86 91 0]
}
diff --git a/core/vm/runtime/runtime_test.go b/core/vm/runtime/runtime_test.go
new file mode 100644
index 000000000..773a0163e
--- /dev/null
+++ b/core/vm/runtime/runtime_test.go
@@ -0,0 +1,120 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package runtime
+
+import (
+ "strings"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/accounts/abi"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/vm"
+)
+
+func TestDefaults(t *testing.T) {
+ cfg := new(Config)
+ setDefaults(cfg)
+
+ if cfg.Difficulty == nil {
+ t.Error("expected difficulty to be non nil")
+ }
+
+ if cfg.Time == nil {
+ t.Error("expected time to be non nil")
+ }
+ if cfg.GasLimit == nil {
+ t.Error("expected time to be non nil")
+ }
+ if cfg.GasPrice == nil {
+ t.Error("expected time to be non nil")
+ }
+ if cfg.Value == nil {
+ t.Error("expected time to be non nil")
+ }
+ if cfg.GetHashFn == nil {
+ t.Error("expected time to be non nil")
+ }
+ if cfg.BlockNumber == nil {
+ t.Error("expected block number to be non nil")
+ }
+}
+
+func TestEnvironment(t *testing.T) {
+ defer func() {
+ if r := recover(); r != nil {
+ t.Fatalf("crashed with: %v", r)
+ }
+ }()
+
+ Execute([]byte{
+ byte(vm.DIFFICULTY),
+ byte(vm.TIMESTAMP),
+ byte(vm.GASLIMIT),
+ byte(vm.PUSH1),
+ byte(vm.ORIGIN),
+ byte(vm.BLOCKHASH),
+ byte(vm.COINBASE),
+ }, nil, nil)
+}
+
+func TestRestoreDefaults(t *testing.T) {
+ Execute(nil, nil, &Config{Debug: true})
+ if vm.ForceJit {
+ t.Error("expected force jit to be disabled")
+ }
+
+ if vm.Debug {
+ t.Error("expected debug to be disabled")
+ }
+
+ if vm.EnableJit {
+ t.Error("expected jit to be disabled")
+ }
+}
+
+func BenchmarkCall(b *testing.B) {
+ var definition = `[{"constant":true,"inputs":[],"name":"seller","outputs":[{"name":"","type":"address"}],"type":"function"},{"constant":false,"inputs":[],"name":"abort","outputs":[],"type":"function"},{"constant":true,"inputs":[],"name":"value","outputs":[{"name":"","type":"uint256"}],"type":"function"},{"constant":false,"inputs":[],"name":"refund","outputs":[],"type":"function"},{"constant":true,"inputs":[],"name":"buyer","outputs":[{"name":"","type":"address"}],"type":"function"},{"constant":false,"inputs":[],"name":"confirmReceived","outputs":[],"type":"function"},{"constant":true,"inputs":[],"name":"state","outputs":[{"name":"","type":"uint8"}],"type":"function"},{"constant":false,"inputs":[],"name":"confirmPurchase","outputs":[],"type":"function"},{"inputs":[],"type":"constructor"},{"anonymous":false,"inputs":[],"name":"Aborted","type":"event"},{"anonymous":false,"inputs":[],"name":"PurchaseConfirmed","type":"event"},{"anonymous":false,"inputs":[],"name":"ItemReceived","type":"event"},{"anonymous":false,"inputs":[],"name":"Refunded","type":"event"}]`
+
+ var code = common.Hex2Bytes("6060604052361561006c5760e060020a600035046308551a53811461007457806335a063b4146100865780633fa4f245146100a6578063590e1ae3146100af5780637150d8ae146100cf57806373fac6f0146100e1578063c19d93fb146100fe578063d696069714610112575b610131610002565b610133600154600160a060020a031681565b610131600154600160a060020a0390811633919091161461015057610002565b61014660005481565b610131600154600160a060020a039081163391909116146102d557610002565b610133600254600160a060020a031681565b610131600254600160a060020a0333811691161461023757610002565b61014660025460ff60a060020a9091041681565b61013160025460009060ff60a060020a9091041681146101cc57610002565b005b600160a060020a03166060908152602090f35b6060908152602090f35b60025460009060a060020a900460ff16811461016b57610002565b600154600160a060020a03908116908290301631606082818181858883f150506002805460a060020a60ff02191660a160020a179055506040517f72c874aeff0b183a56e2b79c71b46e1aed4dee5e09862134b8821ba2fddbf8bf9250a150565b80546002023414806101dd57610002565b6002805460a060020a60ff021973ffffffffffffffffffffffffffffffffffffffff1990911633171660a060020a1790557fd5d55c8a68912e9a110618df8d5e2e83b8d83211c57a8ddd1203df92885dc881826060a15050565b60025460019060a060020a900460ff16811461025257610002565b60025460008054600160a060020a0390921691606082818181858883f150508354604051600160a060020a0391821694503090911631915082818181858883f150506002805460a060020a60ff02191660a160020a179055506040517fe89152acd703c9d8c7d28829d443260b411454d45394e7995815140c8cbcbcf79250a150565b60025460019060a060020a900460ff1681146102f057610002565b6002805460008054600160a060020a0390921692909102606082818181858883f150508354604051600160a060020a0391821694503090911631915082818181858883f150506002805460a060020a60ff02191660a160020a179055506040517f8616bbbbad963e4e65b1366f1d75dfb63f9e9704bbbf91fb01bec70849906cf79250a15056")
+
+ abi, err := abi.JSON(strings.NewReader(definition))
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ cpurchase, err := abi.Pack("confirmPurchase")
+ if err != nil {
+ b.Fatal(err)
+ }
+ creceived, err := abi.Pack("confirmReceived")
+ if err != nil {
+ b.Fatal(err)
+ }
+ refund, err := abi.Pack("refund")
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ for j := 0; j < 400; j++ {
+ Execute(code, cpurchase, nil)
+ Execute(code, creceived, nil)
+ Execute(code, refund, nil)
+ }
+ }
+}
diff --git a/crypto/secp256k1/panic_cb.go b/crypto/secp256k1/panic_cb.go
new file mode 100644
index 000000000..e0e9034ee
--- /dev/null
+++ b/crypto/secp256k1/panic_cb.go
@@ -0,0 +1,33 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package secp256k1
+
+import "C"
+import "unsafe"
+
+// Callbacks for converting libsecp256k1 internal faults into
+// recoverable Go panics.
+
+//export secp256k1GoPanicIllegal
+func secp256k1GoPanicIllegal(msg *C.char, data unsafe.Pointer) {
+ panic("illegal argument: " + C.GoString(msg))
+}
+
+//export secp256k1GoPanicError
+func secp256k1GoPanicError(msg *C.char, data unsafe.Pointer) {
+ panic("internal error: " + C.GoString(msg))
+}
diff --git a/crypto/secp256k1/secp256.go b/crypto/secp256k1/secp256.go
index a2104016a..41a5608a5 100644
--- a/crypto/secp256k1/secp256.go
+++ b/crypto/secp256k1/secp256.go
@@ -20,11 +20,11 @@ package secp256k1
/*
#cgo CFLAGS: -I./libsecp256k1
-#cgo darwin CFLAGS: -I/usr/local/include -I/opt/pkg/include
+#cgo darwin CFLAGS: -I/usr/local/include
#cgo freebsd CFLAGS: -I/usr/local/include
#cgo linux,arm CFLAGS: -I/usr/local/arm/include
#cgo LDFLAGS: -lgmp
-#cgo darwin LDFLAGS: -L/usr/local/lib -L/opt/pkg/lib
+#cgo darwin LDFLAGS: -L/usr/local/lib
#cgo freebsd LDFLAGS: -L/usr/local/lib
#cgo linux,arm LDFLAGS: -L/usr/local/arm/lib
#define USE_NUM_GMP
@@ -35,11 +35,14 @@ package secp256k1
#define NDEBUG
#include "./libsecp256k1/src/secp256k1.c"
#include "./libsecp256k1/src/modules/recovery/main_impl.h"
+
+typedef void (*callbackFunc) (const char* msg, void* data);
+extern void secp256k1GoPanicIllegal(const char* msg, void* data);
+extern void secp256k1GoPanicError(const char* msg, void* data);
*/
import "C"
import (
- "bytes"
"errors"
"unsafe"
@@ -62,8 +65,16 @@ var context *C.secp256k1_context
func init() {
// around 20 ms on a modern CPU.
context = C.secp256k1_context_create(3) // SECP256K1_START_SIGN | SECP256K1_START_VERIFY
+ C.secp256k1_context_set_illegal_callback(context, C.callbackFunc(C.secp256k1GoPanicIllegal), nil)
+ C.secp256k1_context_set_error_callback(context, C.callbackFunc(C.secp256k1GoPanicError), nil)
}
+var (
+ ErrInvalidMsgLen = errors.New("invalid message length for signature recovery")
+ ErrInvalidSignatureLen = errors.New("invalid signature length")
+ ErrInvalidRecoveryID = errors.New("invalid signature recovery id")
+)
+
func GenerateKeyPair() ([]byte, []byte) {
var seckey []byte = randentropy.GetEntropyCSPRNG(32)
var seckey_ptr *C.uchar = (*C.uchar)(unsafe.Pointer(&seckey[0]))
@@ -177,69 +188,20 @@ func VerifySeckeyValidity(seckey []byte) error {
return nil
}
-func VerifySignatureValidity(sig []byte) bool {
- //64+1
- if len(sig) != 65 {
- return false
- }
- //malleability check, highest bit must be 1
- if (sig[32] & 0x80) == 0x80 {
- return false
- }
- //recovery id check
- if sig[64] >= 4 {
- return false
- }
-
- return true
-}
-
-//for compressed signatures, does not need pubkey
-func VerifySignature(msg []byte, sig []byte, pubkey1 []byte) error {
- if msg == nil || sig == nil || pubkey1 == nil {
- return errors.New("inputs must be non-nil")
- }
- if len(sig) != 65 {
- return errors.New("invalid signature length")
- }
- if len(pubkey1) != 65 {
- return errors.New("Invalid public key length")
- }
-
- //to enforce malleability, highest bit of S must be 0
- //S starts at 32nd byte
- if (sig[32] & 0x80) == 0x80 { //highest bit must be 1
- return errors.New("Signature not malleable")
- }
-
- if sig[64] >= 4 {
- return errors.New("Recover byte invalid")
- }
-
- // if pubkey recovered, signature valid
- pubkey2, err := RecoverPubkey(msg, sig)
- if err != nil {
- return err
- }
- if len(pubkey2) != 65 {
- return errors.New("Invalid recovered public key length")
- }
- if !bytes.Equal(pubkey1, pubkey2) {
- return errors.New("Public key does not match recovered public key")
- }
-
- return nil
-}
-
-// recovers a public key from the signature
+// RecoverPubkey returns the the public key of the signer.
+// msg must be the 32-byte hash of the message to be signed.
+// sig must be a 65-byte compact ECDSA signature containing the
+// recovery id as the last element.
func RecoverPubkey(msg []byte, sig []byte) ([]byte, error) {
- if len(sig) != 65 {
- return nil, errors.New("Invalid signature length")
+ if len(msg) != 32 {
+ return nil, ErrInvalidMsgLen
+ }
+ if err := checkSignature(sig); err != nil {
+ return nil, err
}
msg_ptr := (*C.uchar)(unsafe.Pointer(&msg[0]))
sig_ptr := (*C.uchar)(unsafe.Pointer(&sig[0]))
-
pubkey := make([]byte, 64)
/*
this slice is used for both the recoverable signature and the
@@ -248,17 +210,15 @@ func RecoverPubkey(msg []byte, sig []byte) ([]byte, error) {
pubkey recovery is one bottleneck during load in Ethereum
*/
bytes65 := make([]byte, 65)
-
pubkey_ptr := (*C.secp256k1_pubkey)(unsafe.Pointer(&pubkey[0]))
recoverable_sig_ptr := (*C.secp256k1_ecdsa_recoverable_signature)(unsafe.Pointer(&bytes65[0]))
-
recid := C.int(sig[64])
+
ret := C.secp256k1_ecdsa_recoverable_signature_parse_compact(
context,
recoverable_sig_ptr,
sig_ptr,
recid)
-
if ret == C.int(0) {
return nil, errors.New("Failed to parse signature")
}
@@ -269,20 +229,28 @@ func RecoverPubkey(msg []byte, sig []byte) ([]byte, error) {
recoverable_sig_ptr,
msg_ptr,
)
-
if ret == C.int(0) {
return nil, errors.New("Failed to recover public key")
- } else {
- serialized_pubkey_ptr := (*C.uchar)(unsafe.Pointer(&bytes65[0]))
-
- var output_len C.size_t
- C.secp256k1_ec_pubkey_serialize( // always returns 1
- context,
- serialized_pubkey_ptr,
- &output_len,
- pubkey_ptr,
- 0, // SECP256K1_EC_COMPRESSED
- )
- return bytes65, nil
}
+
+ serialized_pubkey_ptr := (*C.uchar)(unsafe.Pointer(&bytes65[0]))
+ var output_len C.size_t
+ C.secp256k1_ec_pubkey_serialize( // always returns 1
+ context,
+ serialized_pubkey_ptr,
+ &output_len,
+ pubkey_ptr,
+ 0, // SECP256K1_EC_COMPRESSED
+ )
+ return bytes65, nil
+}
+
+func checkSignature(sig []byte) error {
+ if len(sig) != 65 {
+ return ErrInvalidSignatureLen
+ }
+ if sig[64] >= 4 {
+ return ErrInvalidRecoveryID
+ }
+ return nil
}
diff --git a/crypto/secp256k1/secp256_test.go b/crypto/secp256k1/secp256_test.go
index 45c448f3c..cb71ea5e7 100644
--- a/crypto/secp256k1/secp256_test.go
+++ b/crypto/secp256k1/secp256_test.go
@@ -56,6 +56,17 @@ func TestSignatureValidity(t *testing.T) {
}
}
+func TestInvalidRecoveryID(t *testing.T) {
+ _, seckey := GenerateKeyPair()
+ msg := randentropy.GetEntropyCSPRNG(32)
+ sig, _ := Sign(msg, seckey)
+ sig[64] = 99
+ _, err := RecoverPubkey(msg, sig)
+ if err != ErrInvalidRecoveryID {
+ t.Fatalf("got %q, want %q", err, ErrInvalidRecoveryID)
+ }
+}
+
func TestSignAndRecover(t *testing.T) {
pubkey1, seckey := GenerateKeyPair()
msg := randentropy.GetEntropyCSPRNG(32)
@@ -70,10 +81,6 @@ func TestSignAndRecover(t *testing.T) {
if !bytes.Equal(pubkey1, pubkey2) {
t.Errorf("pubkey mismatch: want: %x have: %x", pubkey1, pubkey2)
}
- err = VerifySignature(msg, sig, pubkey1)
- if err != nil {
- t.Errorf("signature verification error: %s", err)
- }
}
func TestRandomMessagesWithSameKey(t *testing.T) {
diff --git a/eth/backend.go b/eth/backend.go
index 761a17a8f..5bd6ac55d 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -231,9 +231,7 @@ type Ethereum struct {
chainDb ethdb.Database // Block chain database
dappDb ethdb.Database // Dapp database
- //*** SERVICES ***
- // State manager for processing new blocks and managing the over all states
- blockProcessor *core.BlockProcessor
+ // Handlers
txPool *core.TxPool
blockchain *core.BlockChain
accountManager *accounts.Manager
@@ -407,8 +405,6 @@ func New(config *Config) (*Ethereum, error) {
newPool := core.NewTxPool(eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit)
eth.txPool = newPool
- eth.blockProcessor = core.NewBlockProcessor(chainDb, eth.pow, eth.blockchain, eth.EventMux())
- eth.blockchain.SetProcessor(eth.blockProcessor)
if eth.protocolManager, err = NewProtocolManager(config.FastSync, config.NetworkId, eth.eventMux, eth.txPool, eth.pow, eth.blockchain, chainDb); err != nil {
return nil, err
}
@@ -485,24 +481,23 @@ func (s *Ethereum) IsMining() bool { return s.miner.Mining() }
func (s *Ethereum) Miner() *miner.Miner { return s.miner }
// func (s *Ethereum) Logger() logger.LogSystem { return s.logger }
-func (s *Ethereum) Name() string { return s.net.Name }
-func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager }
-func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain }
-func (s *Ethereum) BlockProcessor() *core.BlockProcessor { return s.blockProcessor }
-func (s *Ethereum) TxPool() *core.TxPool { return s.txPool }
-func (s *Ethereum) Whisper() *whisper.Whisper { return s.whisper }
-func (s *Ethereum) EventMux() *event.TypeMux { return s.eventMux }
-func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb }
-func (s *Ethereum) DappDb() ethdb.Database { return s.dappDb }
-func (s *Ethereum) IsListening() bool { return true } // Always listening
-func (s *Ethereum) PeerCount() int { return s.net.PeerCount() }
-func (s *Ethereum) Peers() []*p2p.Peer { return s.net.Peers() }
-func (s *Ethereum) MaxPeers() int { return s.net.MaxPeers }
-func (s *Ethereum) ClientVersion() string { return s.clientVersion }
-func (s *Ethereum) EthVersion() int { return int(s.protocolManager.SubProtocols[0].Version) }
-func (s *Ethereum) NetVersion() int { return s.netVersionId }
-func (s *Ethereum) ShhVersion() int { return s.shhVersionId }
-func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManager.downloader }
+func (s *Ethereum) Name() string { return s.net.Name }
+func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager }
+func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain }
+func (s *Ethereum) TxPool() *core.TxPool { return s.txPool }
+func (s *Ethereum) Whisper() *whisper.Whisper { return s.whisper }
+func (s *Ethereum) EventMux() *event.TypeMux { return s.eventMux }
+func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb }
+func (s *Ethereum) DappDb() ethdb.Database { return s.dappDb }
+func (s *Ethereum) IsListening() bool { return true } // Always listening
+func (s *Ethereum) PeerCount() int { return s.net.PeerCount() }
+func (s *Ethereum) Peers() []*p2p.Peer { return s.net.Peers() }
+func (s *Ethereum) MaxPeers() int { return s.net.MaxPeers }
+func (s *Ethereum) ClientVersion() string { return s.clientVersion }
+func (s *Ethereum) EthVersion() int { return int(s.protocolManager.SubProtocols[0].Version) }
+func (s *Ethereum) NetVersion() int { return s.netVersionId }
+func (s *Ethereum) ShhVersion() int { return s.shhVersionId }
+func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManager.downloader }
// Start the ethereum
func (s *Ethereum) Start() error {
diff --git a/eth/backend_test.go b/eth/backend_test.go
index 0379fc843..83219de62 100644
--- a/eth/backend_test.go
+++ b/eth/backend_test.go
@@ -32,7 +32,7 @@ func TestMipmapUpgrade(t *testing.T) {
}
// store the receipts
- err := core.PutReceipts(db, receipts)
+ err := core.WriteReceipts(db, receipts)
if err != nil {
t.Fatal(err)
}
@@ -45,7 +45,7 @@ func TestMipmapUpgrade(t *testing.T) {
if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil {
t.Fatalf("failed to insert block number: %v", err)
}
- if err := core.PutBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
+ if err := core.WriteBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
t.Fatal("error writing block receipts:", err)
}
}
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 153427ee4..c272d05af 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -45,16 +45,17 @@ var (
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Amount of node state values to allow fetching per request
- hashTTL = 5 * time.Second // [eth/61] Time it takes for a hash request to time out
- blockSoftTTL = 3 * time.Second // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth
- blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired
- headerTTL = 5 * time.Second // [eth/62] Time it takes for a header request to time out
- bodySoftTTL = 3 * time.Second // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth
- bodyHardTTL = 3 * bodySoftTTL // [eth/62] Maximum time allowance before a block body request is considered expired
- receiptSoftTTL = 3 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
- receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired
- stateSoftTTL = 2 * time.Second // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
- stateHardTTL = 3 * stateSoftTTL // [eth/63] Maximum time allowance before a node data request is considered expired
+ hashTTL = 3 * time.Second // [eth/61] Time it takes for a hash request to time out
+ blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
+ blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
+
+ headerTTL = 3 * time.Second // [eth/62] Time it takes for a header request to time out
+ bodyTargetRTT = 3 * time.Second / 2 // [eth/62] Target time for completing a block body retrieval request
+ bodyTTL = 3 * bodyTargetRTT // [eth/62] Maximum time allowance before a block body request is considered expired
+ receiptTargetRTT = 3 * time.Second / 2 // [eth/63] Target time for completing a receipt retrieval request
+ receiptTTL = 3 * receiptTargetRTT // [eth/63] Maximum time allowance before a receipt request is considered expired
+ stateTargetRTT = 2 * time.Second / 2 // [eth/63] Target time for completing a state trie retrieval request
+ stateTTL = 3 * stateTargetRTT // [eth/63] Maximum time allowance before a node data request is considered expired
maxQueuedHashes = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
@@ -74,7 +75,6 @@ var (
errBadPeer = errors.New("action from bad peer ignored")
errStallingPeer = errors.New("peer is stalling")
errNoPeers = errors.New("no peers to keep download active")
- errPendingQueue = errors.New("pending items in queue")
errTimeout = errors.New("timeout")
errEmptyHashSet = errors.New("empty hash set by peer")
errEmptyHeaderSet = errors.New("empty header set by peer")
@@ -90,6 +90,7 @@ var (
errCancelBodyFetch = errors.New("block body download canceled (requested)")
errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
errCancelStateFetch = errors.New("state data download canceled (requested)")
+ errCancelProcessing = errors.New("processing canceled (requested)")
errNoSyncActive = errors.New("no sync active")
)
@@ -129,7 +130,6 @@ type Downloader struct {
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32
- processing int32
notified int32
// Channels
@@ -215,7 +215,7 @@ func (d *Downloader) Progress() (uint64, uint64, uint64) {
// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
- return atomic.LoadInt32(&d.synchronising) > 0 || atomic.LoadInt32(&d.processing) > 0
+ return atomic.LoadInt32(&d.synchronising) > 0
}
// RegisterPeer injects a new download peer into the set of block source to be
@@ -263,9 +263,6 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
d.dropPeer(id)
- case errPendingQueue:
- glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
-
default:
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
}
@@ -290,10 +287,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
glog.V(logger.Info).Infoln("Block synchronisation started")
}
- // Abort if the queue still contains some leftover data
- if d.queue.GetHeadResult() != nil {
- return errPendingQueue
- }
// Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset()
d.peers.Reset()
@@ -335,7 +328,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
defer func() {
// reset on error
if err != nil {
- d.cancel()
d.mux.Post(FailedEvent{err})
} else {
d.mux.Post(DoneEvent{})
@@ -365,23 +357,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
d.syncStatsChainHeight = latest
d.syncStatsLock.Unlock()
- // Initiate the sync using a concurrent hash and block retrieval algorithm
+ // Initiate the sync using a concurrent hash and block retrieval algorithm
+ d.queue.Prepare(origin+1, d.mode, 0)
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- d.queue.Prepare(origin+1, d.mode, 0)
-
- errc := make(chan error, 2)
- go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
- go func() { errc <- d.fetchBlocks61(origin + 1) }()
-
- // If any fetcher fails, cancel the other
- if err := <-errc; err != nil {
- d.cancel()
- <-errc
- return err
- }
- return <-errc
+ return d.spawnSync(
+ func() error { return d.fetchHashes61(p, td, origin+1) },
+ func() error { return d.fetchBlocks61(origin + 1) },
+ )
case p.version >= 62:
// Look up the sync boundaries: the common ancestor and the target block
@@ -405,7 +389,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
switch d.mode {
case LightSync:
pivot = latest
-
case FastSync:
// Calculate the new fast/slow sync pivot point
pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
@@ -426,34 +409,51 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
}
d.queue.Prepare(origin+1, d.mode, pivot)
-
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
- errc := make(chan error, 4)
- go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved
- go func() { errc <- d.fetchBodies(origin + 1) }() // Bodies are retrieved during normal and fast sync
- go func() { errc <- d.fetchReceipts(origin + 1) }() // Receipts are retrieved during fast sync
- go func() { errc <- d.fetchNodeData() }() // Node state data is retrieved during fast sync
-
- // If any fetcher fails, cancel the others
- var fail error
- for i := 0; i < cap(errc); i++ {
- if err := <-errc; err != nil {
- if fail == nil {
- fail = err
- d.cancel()
- }
- }
- }
- return fail
+ return d.spawnSync(
+ func() error { return d.fetchHeaders(p, td, origin+1) }, // Headers are always retrieved
+ func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync
+ func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
+ func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
+ )
default:
// Something very wrong, stop right here
glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
return errBadPeer
}
- return nil
+}
+
+// spawnSync runs d.process and all given fetcher functions to completion in
+// separate goroutines, returning the first error that appears.
+func (d *Downloader) spawnSync(fetchers ...func() error) error {
+ var wg sync.WaitGroup
+ errc := make(chan error, len(fetchers)+1)
+ wg.Add(len(fetchers) + 1)
+ go func() { defer wg.Done(); errc <- d.process() }()
+ for _, fn := range fetchers {
+ fn := fn
+ go func() { defer wg.Done(); errc <- fn() }()
+ }
+ // Wait for the first error, then terminate the others.
+ var err error
+ for i := 0; i < len(fetchers)+1; i++ {
+ if i == len(fetchers) {
+ // Close the queue when all fetchers have exited.
+ // This will cause the block processor to end when
+ // it has processed the queue.
+ d.queue.Close()
+ }
+ if err = <-errc; err != nil {
+ break
+ }
+ }
+ d.queue.Close()
+ d.cancel()
+ wg.Wait()
+ return err
}
// cancel cancels all of the operations and resets the queue. It returns true
@@ -470,12 +470,10 @@ func (d *Downloader) cancel() {
}
}
d.cancelLock.Unlock()
-
- // Reset the queue
- d.queue.Reset()
}
// Terminate interrupts the downloader, canceling all pending operations.
+// The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() {
atomic.StoreInt32(&d.interrupt, 1)
d.cancel()
@@ -489,21 +487,12 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
// Request the advertised remote head block and wait for the response
go p.getBlocks([]common.Hash{p.head})
- timeout := time.After(blockSoftTTL)
+ timeout := time.After(hashTTL)
for {
select {
case <-d.cancelCh:
return 0, errCancelBlockFetch
- case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
- case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
- case <-d.hashCh:
- // Out of bounds hashes received, ignore them
-
case packet := <-d.blockCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
@@ -521,6 +510,16 @@ func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
case <-timeout:
glog.V(logger.Debug).Infof("%v: head block timeout", p)
return 0, errTimeout
+
+ case <-d.hashCh:
+ // Out of bounds hashes received, ignore them
+
+ case <-d.headerCh:
+ case <-d.bodyCh:
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -571,18 +570,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
}
}
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: head hash timeout", p)
+ return 0, errTimeout
+
case <-d.blockCh:
// Out of bounds blocks received, ignore them
case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: head hash timeout", p)
- return 0, errTimeout
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
// If the head fetch already found an ancestor, return
@@ -631,18 +631,19 @@ func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
}
start = check
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: search hash timeout", p)
+ return 0, errTimeout
+
case <-d.blockCh:
// Out of bounds blocks received, ignore them
case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: search hash timeout", p)
- return 0, errTimeout
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -676,12 +677,6 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
case <-d.cancelCh:
return errCancelHashFetch
- case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
- case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
case packet := <-d.hashCh:
// Make sure the active peer is giving us the hashes
if packet.PeerId() != p.id {
@@ -750,6 +745,13 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Debug).Infof("%v: hash request timed out", p)
hashTimeoutMeter.Mark(1)
return errTimeout
+
+ case <-d.headerCh:
+ case <-d.bodyCh:
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -774,59 +776,31 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
case <-d.cancelCh:
return errCancelBlockFetch
- case <-d.headerCh:
- // Out of bounds eth/62 block headers received, ignore them
-
- case <-d.bodyCh:
- // Out of bounds eth/62 block bodies received, ignore them
-
case packet := <-d.blockCh:
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
- // Deliver the received chunk of blocks, and demote in case of errors
blocks := packet.(*blockPack).blocks
- err := d.queue.DeliverBlocks(peer.id, blocks)
- switch err {
- case nil:
- // If no blocks were delivered, demote the peer (need the delivery above)
- if len(blocks) == 0 {
- peer.Demote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
- break
- }
- // All was successful, promote the peer and potentially start processing
- peer.Promote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
- go d.process()
- case errInvalidChain:
- // The hash chain is invalid (blocks are not ordered properly), abort
+ // Deliver the received chunk of blocks and check chain validity
+ accepted, err := d.queue.DeliverBlocks(peer.id, blocks)
+ if err == errInvalidChain {
return err
-
- case errNoFetchesPending:
- // Peer probably timed out with its delivery but came through
- // in the end, demote, but allow to to pull from this peer.
- peer.Demote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)
-
- case errStaleDelivery:
- // Delivered something completely else than requested, usually
- // caused by a timeout and delivery during a new sync cycle.
- // Don't set it to idle as the original request should still be
- // in flight.
- peer.Demote()
- glog.V(logger.Detail).Infof("%s: stale delivery", peer)
-
+ }
+ // Unless a peer delivered something completely else than requested (usually
+ // caused by a timed out request which came through in the end), set it to
+ // idle. If the delivery's stale, the peer should have already been idled.
+ if err != errStaleDelivery {
+ peer.SetBlocksIdle(accepted)
+ }
+ // Issue a log to the user to see what's going on
+ switch {
+ case err == nil && len(blocks) == 0:
+ glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
+ case err == nil:
+ glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
default:
- // Peer did something semi-useful, demote but keep it around
- peer.Demote()
- peer.SetBlocksIdle()
- glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
- go d.process()
+ glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err)
}
}
// Blocks arrived, try to update the progress
@@ -859,10 +833,15 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
return errNoPeers
}
// Check for block request timeouts and demote the responsible peers
- for _, pid := range d.queue.ExpireBlocks(blockHardTTL) {
+ for pid, fails := range d.queue.ExpireBlocks(blockTTL) {
if peer := d.peers.Peer(pid); peer != nil {
- peer.Demote()
- glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
+ if fails > 1 {
+ glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
+ peer.SetBlocksIdle(0)
+ } else {
+ glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer)
+ d.dropPeer(pid)
+ }
}
}
// If there's nothing more to fetch, wait or terminate
@@ -909,6 +888,13 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
if !throttled && !d.queue.InFlightBlocks() && len(idles) == total {
return errPeersUnavailable
}
+
+ case <-d.headerCh:
+ case <-d.bodyCh:
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Ignore eth/{62,63} packets because this is eth/61.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -941,18 +927,19 @@ func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
}
return headers[0].Number.Uint64(), nil
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: head header timeout", p)
+ return 0, errTimeout
+
case <-d.bodyCh:
- // Out of bounds block bodies received, ignore them
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Out of bounds delivery, ignore
case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: head header timeout", p)
- return 0, errTimeout
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -1008,18 +995,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
}
}
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: head header timeout", p)
+ return 0, errTimeout
+
case <-d.bodyCh:
- // Out of bounds block bodies received, ignore them
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Out of bounds delivery, ignore
case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: head header timeout", p)
- return 0, errTimeout
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
// If the head fetch already found an ancestor, return
@@ -1068,18 +1056,19 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
}
start = check
+ case <-timeout:
+ glog.V(logger.Debug).Infof("%v: search header timeout", p)
+ return 0, errTimeout
+
case <-d.bodyCh:
- // Out of bounds block bodies received, ignore them
+ case <-d.stateCh:
+ case <-d.receiptCh:
+ // Out of bounds delivery, ignore
case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
- case <-timeout:
- glog.V(logger.Debug).Infof("%v: search header timeout", p)
- return 0, errTimeout
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -1141,12 +1130,6 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
case <-d.cancelCh:
return errCancelHeaderFetch
- case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
- case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
case packet := <-d.headerCh:
// Make sure the active peer is giving us the headers
if packet.PeerId() != p.id {
@@ -1268,6 +1251,11 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
}
}
return nil
+
+ case <-d.hashCh:
+ case <-d.blockCh:
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
@@ -1279,14 +1267,14 @@ func (d *Downloader) fetchBodies(from uint64) error {
glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
var (
- deliver = func(packet dataPack) error {
+ deliver = func(packet dataPack) (int, error) {
pack := packet.(*bodyPack)
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
}
- expire = func() []string { return d.queue.ExpireBodies(bodyHardTTL) }
+ expire = func() map[string]int { return d.queue.ExpireBodies(bodyTTL) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peer) int { return p.BlockCapacity() }
- setIdle = func(p *peer) { p.SetBodiesIdle() }
+ setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
)
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
@@ -1303,14 +1291,14 @@ func (d *Downloader) fetchReceipts(from uint64) error {
glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)
var (
- deliver = func(packet dataPack) error {
+ deliver = func(packet dataPack) (int, error) {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
}
- expire = func() []string { return d.queue.ExpireReceipts(receiptHardTTL) }
+ expire = func() map[string]int { return d.queue.ExpireReceipts(receiptTTL) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
capacity = func(p *peer) int { return p.ReceiptCapacity() }
- setIdle = func(p *peer) { p.SetReceiptsIdle() }
+ setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
)
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
@@ -1327,7 +1315,7 @@ func (d *Downloader) fetchNodeData() error {
glog.V(logger.Debug).Infof("Downloading node state data")
var (
- deliver = func(packet dataPack) error {
+ deliver = func(packet dataPack) (int, error) {
start := time.Now()
return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(err error, delivered int) {
if err != nil {
@@ -1336,10 +1324,8 @@ func (d *Downloader) fetchNodeData() error {
d.cancel()
return
}
- // Processing succeeded, notify state fetcher and processor of continuation
- if d.queue.PendingNodeData() == 0 {
- go d.process()
- } else {
+ // Processing succeeded, notify state fetcher of continuation
+ if d.queue.PendingNodeData() > 0 {
select {
case d.stateWakeCh <- true:
default:
@@ -1348,19 +1334,18 @@ func (d *Downloader) fetchNodeData() error {
// Log a message to the user and return
d.syncStatsLock.Lock()
defer d.syncStatsLock.Unlock()
-
d.syncStatsStateDone += uint64(delivered)
glog.V(logger.Info).Infof("imported %d state entries in %v: processed %d in total", delivered, time.Since(start), d.syncStatsStateDone)
})
}
- expire = func() []string { return d.queue.ExpireNodeData(stateHardTTL) }
+ expire = func() map[string]int { return d.queue.ExpireNodeData(stateTTL) }
throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveNodeData(p, count), false, nil
}
fetch = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
capacity = func(p *peer) int { return p.NodeDataCapacity() }
- setIdle = func(p *peer) { p.SetNodeDataIdle() }
+ setIdle = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
)
err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
@@ -1373,10 +1358,10 @@ func (d *Downloader) fetchNodeData() error {
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
-func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool,
- expire func() []string, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
+func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
+ expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
- idle func() ([]*peer, int), setIdle func(*peer), kind string) error {
+ idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
// Create a ticker to detect expired retrieval tasks
ticker := time.NewTicker(100 * time.Millisecond)
@@ -1391,57 +1376,29 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
case <-d.cancelCh:
return errCancel
- case <-d.hashCh:
- // Out of bounds eth/61 hashes received, ignore them
-
- case <-d.blockCh:
- // Out of bounds eth/61 blocks received, ignore them
-
case packet := <-deliveryCh:
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
- // Deliver the received chunk of data, and demote in case of errors
- switch err := deliver(packet); err {
- case nil:
- // If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!)
- if packet.Items() == 0 {
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
- break
- }
- // All was successful, promote the peer and potentially start processing
- peer.Promote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
- go d.process()
-
- case errInvalidChain:
- // The hash chain is invalid (blocks are not ordered properly), abort
+ // Deliver the received chunk of data and check chain validity
+ accepted, err := deliver(packet)
+ if err == errInvalidChain {
return err
-
- case errNoFetchesPending:
- // Peer probably timed out with its delivery but came through
- // in the end, demote, but allow to to pull from this peer.
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind))
-
- case errStaleDelivery:
- // Delivered something completely else than requested, usually
- // caused by a timeout and delivery during a new sync cycle.
- // Don't set it to idle as the original request should still be
- // in flight.
- peer.Demote()
- glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind))
-
+ }
+ // Unless a peer delivered something completely else than requested (usually
+ // caused by a timed out request which came through in the end), set it to
+ // idle. If the delivery's stale, the peer should have already been idled.
+ if err != errStaleDelivery {
+ setIdle(peer, accepted)
+ }
+ // Issue a log to the user to see what's going on
+ switch {
+ case err == nil && packet.Items() == 0:
+ glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
+ case err == nil:
+ glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
default:
- // Peer did something semi-useful, demote but keep it around
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
- go d.process()
+ glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)
}
}
// Blocks assembled, try to update the progress
@@ -1474,11 +1431,15 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
return errNoPeers
}
// Check for fetch request timeouts and demote the responsible peers
- for _, pid := range expire() {
+ for pid, fails := range expire() {
if peer := d.peers.Peer(pid); peer != nil {
- peer.Demote()
- setIdle(peer)
- glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
+ if fails > 1 {
+ glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
+ setIdle(peer, 0)
+ } else {
+ glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))
+ d.dropPeer(pid)
+ }
}
}
// If there's nothing more to fetch, wait or terminate
@@ -1508,7 +1469,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
}
if progress {
progressed = true
- go d.process()
}
if request == nil {
continue
@@ -1540,51 +1500,23 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
return errPeersUnavailable
}
+
+ case <-d.hashCh:
+ case <-d.blockCh:
+ // Ignore eth/61 packets because this is eth/62+.
+ // These can arrive as a late delivery from a previous sync.
}
}
}
// process takes fetch results from the queue and tries to import them into the
-// chain. The type of import operation will depend on the result contents:
-// -
-//
-// The algorithmic flow is as follows:
-// - The `processing` flag is swapped to 1 to ensure singleton access
-// - The current `cancel` channel is retrieved to detect sync abortions
-// - Blocks are iteratively taken from the cache and inserted into the chain
-// - When the cache becomes empty, insertion stops
-// - The `processing` flag is swapped back to 0
-// - A post-exit check is made whether new blocks became available
-// - This step is important: it handles a potential race condition between
-// checking for no more work, and releasing the processing "mutex". In
-// between these state changes, a block may have arrived, but a processing
-// attempt denied, so we need to re-enter to ensure the block isn't left
-// to idle in the cache.
-func (d *Downloader) process() {
- // Make sure only one goroutine is ever allowed to process blocks at once
- if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
- return
- }
- // If the processor just exited, but there are freshly pending items, try to
- // reenter. This is needed because the goroutine spinned up for processing
- // the fresh results might have been rejected entry to to this present thread
- // not yet releasing the `processing` state.
- defer func() {
- if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil {
- d.process()
- }
- }()
- // Release the lock upon exit (note, before checking for reentry!)
- // the import statistics to zero.
- defer atomic.StoreInt32(&d.processing, 0)
-
- // Repeat the processing as long as there are results to process
+// chain. The type of import operation will depend on the result contents.
+func (d *Downloader) process() error {
+ pivot := d.queue.FastSyncPivot()
for {
- // Fetch the next batch of results
- pivot := d.queue.FastSyncPivot() // Fetch pivot before results to prevent reset race
- results := d.queue.TakeResults()
+ results := d.queue.WaitResults()
if len(results) == 0 {
- return
+ return nil // queue empty
}
if d.chainInsertHook != nil {
d.chainInsertHook(results)
@@ -1597,7 +1529,7 @@ func (d *Downloader) process() {
for len(results) != 0 {
// Check for any termination requests
if atomic.LoadInt32(&d.interrupt) == 1 {
- return
+ return errCancelProcessing
}
// Retrieve the a batch of results to import
var (
@@ -1633,8 +1565,7 @@ func (d *Downloader) process() {
}
if err != nil {
glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
- d.cancel()
- return
+ return err
}
// Shift the results to the next batch
results = results[items:]
@@ -1685,19 +1616,16 @@ func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, i
dropMeter.Mark(int64(packet.Items()))
}
}()
- // Make sure the downloader is active
- if atomic.LoadInt32(&d.synchronising) == 0 {
- return errNoSyncActive
- }
// Deliver or abort if the sync is canceled while queuing
d.cancelLock.RLock()
cancel := d.cancelCh
d.cancelLock.RUnlock()
-
+ if cancel == nil {
+ return errNoSyncActive
+ }
select {
case destCh <- packet:
return nil
-
case <-cancel:
return errNoSyncActive
}
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index ef6f74a6b..cfcc8a2ef 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -169,17 +169,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
}
}
dl.lock.RUnlock()
-
- err := dl.downloader.synchronise(id, hash, td, mode)
- for {
- // If the queue is empty and processing stopped, break
- if dl.downloader.queue.Idle() && atomic.LoadInt32(&dl.downloader.processing) == 0 {
- break
- }
- // Otherwise sleep a bit and retry
- time.Sleep(time.Millisecond)
- }
- return err
+ return dl.downloader.synchronise(id, hash, td, mode)
}
// hasHeader checks if a header is present in the testers canonical chain.
@@ -701,6 +691,8 @@ func TestCanonicalSynchronisation64Fast(t *testing.T) { testCanonicalSynchronis
func TestCanonicalSynchronisation64Light(t *testing.T) { testCanonicalSynchronisation(t, 64, LightSync) }
func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -725,6 +717,8 @@ func TestThrottling64Full(t *testing.T) { testThrottling(t, 64, FullSync) }
func TestThrottling64Fast(t *testing.T) { testThrottling(t, 64, FastSync) }
func testThrottling(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a long block chain to download and the tester
targetBlocks := 8 * blockCacheLimit
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -757,8 +751,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
for start := time.Now(); time.Since(start) < time.Second; {
time.Sleep(25 * time.Millisecond)
- tester.lock.RLock()
- tester.downloader.queue.lock.RLock()
+ tester.lock.Lock()
+ tester.downloader.queue.lock.Lock()
cached = len(tester.downloader.queue.blockDonePool)
if mode == FastSync {
if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached {
@@ -769,8 +763,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
}
frozen = int(atomic.LoadUint32(&blocked))
retrieved = len(tester.ownBlocks)
- tester.downloader.queue.lock.RUnlock()
- tester.lock.RUnlock()
+ tester.downloader.queue.lock.Unlock()
+ tester.lock.Unlock()
if cached == blockCacheLimit || retrieved+cached+frozen == targetBlocks+1 {
break
@@ -810,6 +804,8 @@ func TestForkedSynchronisation64Fast(t *testing.T) { testForkedSynchronisation(
func TestForkedSynchronisation64Light(t *testing.T) { testForkedSynchronisation(t, 64, LightSync) }
func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a long enough forked chain
common, fork := MaxHashFetch, 2*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil)
@@ -833,6 +829,7 @@ func testForkedSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Tests that an inactive downloader will not accept incoming hashes and blocks.
func TestInactiveDownloader61(t *testing.T) {
+ t.Parallel()
tester := newTester()
// Check that neither hashes nor blocks are accepted
@@ -847,6 +844,7 @@ func TestInactiveDownloader61(t *testing.T) {
// Tests that an inactive downloader will not accept incoming block headers and
// bodies.
func TestInactiveDownloader62(t *testing.T) {
+ t.Parallel()
tester := newTester()
// Check that neither block headers nor bodies are accepted
@@ -861,6 +859,7 @@ func TestInactiveDownloader62(t *testing.T) {
// Tests that an inactive downloader will not accept incoming block headers,
// bodies and receipts.
func TestInactiveDownloader63(t *testing.T) {
+ t.Parallel()
tester := newTester()
// Check that neither block headers nor bodies are accepted
@@ -885,6 +884,8 @@ func TestCancel64Fast(t *testing.T) { testCancel(t, 64, FastSync) }
func TestCancel64Light(t *testing.T) { testCancel(t, 64, LightSync) }
func testCancel(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download and the tester
targetBlocks := blockCacheLimit - 15
if targetBlocks >= MaxHashFetch {
@@ -923,6 +924,8 @@ func TestMultiSynchronisation64Fast(t *testing.T) { testMultiSynchronisation(t,
func TestMultiSynchronisation64Light(t *testing.T) { testMultiSynchronisation(t, 64, LightSync) }
func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create various peers with various parts of the chain
targetPeers := 8
targetBlocks := targetPeers*blockCacheLimit - 15
@@ -950,6 +953,8 @@ func TestMultiProtoSynchronisation64Fast(t *testing.T) { testMultiProtoSync(t,
func TestMultiProtoSynchronisation64Light(t *testing.T) { testMultiProtoSync(t, 64, LightSync) }
func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -986,6 +991,8 @@ func TestEmptyShortCircuit64Fast(t *testing.T) { testEmptyShortCircuit(t, 64, F
func TestEmptyShortCircuit64Light(t *testing.T) { testEmptyShortCircuit(t, 64, LightSync) }
func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a block chain to download
targetBlocks := 2*blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1037,6 +1044,8 @@ func TestMissingHeaderAttack64Fast(t *testing.T) { testMissingHeaderAttack(t, 6
func TestMissingHeaderAttack64Light(t *testing.T) { testMissingHeaderAttack(t, 64, LightSync) }
func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1188,6 +1197,8 @@ func TestHighTDStarvationAttack64Fast(t *testing.T) { testHighTDStarvationAttac
func TestHighTDStarvationAttack64Light(t *testing.T) { testHighTDStarvationAttack(t, 64, LightSync) }
func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
tester := newTester()
hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil)
@@ -1209,25 +1220,26 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
result error
drop bool
}{
- {nil, false}, // Sync succeeded, all is well
- {errBusy, false}, // Sync is already in progress, no problem
- {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
- {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
- {errStallingPeer, true}, // Peer was detected to be stalling, drop it
- {errNoPeers, false}, // No peers to download from, soft race, no issue
- {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue
- {errTimeout, true}, // No hashes received in due time, drop the peer
- {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
- {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
- {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
- {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
- {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
- {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
- {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
- {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
- {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {nil, false}, // Sync succeeded, all is well
+ {errBusy, false}, // Sync is already in progress, no problem
+ {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
+ {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
+ {errStallingPeer, true}, // Peer was detected to be stalling, drop it
+ {errNoPeers, false}, // No peers to download from, soft race, no issue
+ {errTimeout, true}, // No hashes received in due time, drop the peer
+ {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
+ {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
+ {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
+ {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
+ {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
+ {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
+ {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
+ {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
+ {errCancelProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
}
// Run the tests and check disconnection status
tester := newTester()
@@ -1261,6 +1273,8 @@ func TestSyncProgress64Fast(t *testing.T) { testSyncProgress(t, 64, FastSync) }
func TestSyncProgress64Light(t *testing.T) { testSyncProgress(t, 64, LightSync) }
func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1331,6 +1345,8 @@ func TestForkedSyncProgress64Fast(t *testing.T) { testForkedSyncProgress(t, 64,
func TestForkedSyncProgress64Light(t *testing.T) { testForkedSyncProgress(t, 64, LightSync) }
func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a forked chain to simulate origin revertal
common, fork := MaxHashFetch, 2*MaxHashFetch
hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil)
@@ -1404,6 +1420,8 @@ func TestFailedSyncProgress64Fast(t *testing.T) { testFailedSyncProgress(t, 64,
func TestFailedSyncProgress64Light(t *testing.T) { testFailedSyncProgress(t, 64, LightSync) }
func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small enough block chain to download
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil)
@@ -1478,6 +1496,8 @@ func TestFakedSyncProgress64Fast(t *testing.T) { testFakedSyncProgress(t, 64, F
func TestFakedSyncProgress64Light(t *testing.T) { testFakedSyncProgress(t, 64, LightSync) }
func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+
// Create a small block chain
targetBlocks := blockCacheLimit - 15
hashes, headers, blocks, receipts := makeChain(targetBlocks+3, 0, genesis, nil)
@@ -1541,3 +1561,50 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
t.Fatalf("Final progress mismatch: have %v/%v/%v, want 0-%v/%v/%v", origin, current, latest, targetBlocks, targetBlocks, targetBlocks)
}
}
+
+// This test reproduces an issue where unexpected deliveries would
+// block indefinitely if they arrived at the right time.
+func TestDeliverHeadersHang62(t *testing.T) { testDeliverHeadersHang(t, 62, FullSync) }
+func TestDeliverHeadersHang63Full(t *testing.T) { testDeliverHeadersHang(t, 63, FullSync) }
+func TestDeliverHeadersHang63Fast(t *testing.T) { testDeliverHeadersHang(t, 63, FastSync) }
+func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64, FullSync) }
+func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) }
+func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) }
+
+func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
+ t.Parallel()
+ hashes, headers, blocks, receipts := makeChain(5, 0, genesis, nil)
+ fakeHeads := []*types.Header{{}, {}, {}, {}}
+ for i := 0; i < 200; i++ {
+ tester := newTester()
+ tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
+ // Whenever the downloader requests headers, flood it with
+ // a lot of unrequested header deliveries.
+ tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error {
+ deliveriesDone := make(chan struct{}, 500)
+ for i := 0; i < cap(deliveriesDone); i++ {
+ peer := fmt.Sprintf("fake-peer%d", i)
+ go func() {
+ tester.downloader.DeliverHeaders(peer, fakeHeads)
+ deliveriesDone <- struct{}{}
+ }()
+ }
+ // Deliver the actual requested headers.
+ impl := tester.peerGetAbsHeadersFn("peer", 0)
+ go impl(from, count, skip, reverse)
+ // None of the extra deliveries should block.
+ timeout := time.After(5 * time.Second)
+ for i := 0; i < cap(deliveriesDone); i++ {
+ select {
+ case <-deliveriesDone:
+ case <-timeout:
+ panic("blocked")
+ }
+ }
+ return nil
+ }
+ if err := tester.sync("peer", nil, mode); err != nil {
+ t.Errorf("sync failed: %v", err)
+ }
+ }
+}
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 9ba6dabbd..80f08b68f 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -30,8 +30,10 @@ import (
"github.com/ethereum/go-ethereum/common"
)
-// Maximum number of entries allowed on the list or lacking items.
-const maxLackingHashes = 4096
+const (
+ maxLackingHashes = 4096 // Maximum number of entries allowed on the list or lacking items
+ throughputImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
+)
// Hash and block fetchers belonging to eth/61 and below
type relativeHashFetcherFn func(common.Hash) error
@@ -59,18 +61,16 @@ type peer struct {
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
- rep int32 // Simple peer reputation
- blockCapacity int32 // Number of blocks (bodies) allowed to fetch per request
- receiptCapacity int32 // Number of receipts allowed to fetch per request
- stateCapacity int32 // Number of node data pieces allowed to fetch per request
+ blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second
+ receiptThroughput float64 // Number of receipts measured to be retrievable per second
+ stateThroughput float64 // Number of node data pieces measured to be retrievable per second
blockStarted time.Time // Time instance when the last block (body)fetch was started
receiptStarted time.Time // Time instance when the last receipt fetch was started
stateStarted time.Time // Time instance when the last node data fetch was started
- lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
- lackingLock sync.RWMutex // Lock protecting the lacking hashes list
+ lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
@@ -84,6 +84,7 @@ type peer struct {
getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data
version int // Eth protocol version number to switch strategies
+ lock sync.RWMutex
}
// newPeer create a new downloader peer, with specific hash and block retrieval
@@ -93,12 +94,9 @@ func newPeer(id string, version int, head common.Hash,
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer {
return &peer{
- id: id,
- head: head,
- blockCapacity: 1,
- receiptCapacity: 1,
- stateCapacity: 1,
- lacking: make(map[common.Hash]struct{}),
+ id: id,
+ head: head,
+ lacking: make(map[common.Hash]struct{}),
getRelHashes: getRelHashes,
getAbsHashes: getAbsHashes,
@@ -117,15 +115,18 @@ func newPeer(id string, version int, head common.Hash,
// Reset clears the internal state of a peer entity.
func (p *peer) Reset() {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
atomic.StoreInt32(&p.blockIdle, 0)
atomic.StoreInt32(&p.receiptIdle, 0)
- atomic.StoreInt32(&p.blockCapacity, 1)
- atomic.StoreInt32(&p.receiptCapacity, 1)
- atomic.StoreInt32(&p.stateCapacity, 1)
+ atomic.StoreInt32(&p.stateIdle, 0)
+
+ p.blockThroughput = 0
+ p.receiptThroughput = 0
+ p.stateThroughput = 0
- p.lackingLock.Lock()
p.lacking = make(map[common.Hash]struct{})
- p.lackingLock.Unlock()
}
// Fetch61 sends a block retrieval request to the remote peer.
@@ -216,107 +217,86 @@ func (p *peer) FetchNodeData(request *fetchRequest) error {
return nil
}
-// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its block retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) SetBlocksIdle() {
- p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle)
+// SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval
+// requests. Its estimated block retrieval throughput is updated with that measured
+// just now.
+func (p *peer) SetBlocksIdle(delivered int) {
+ p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
}
-// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its block body retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) SetBodiesIdle() {
- p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBodyFetch, &p.blockCapacity, &p.blockIdle)
+// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval
+// requests. Its estimated body retrieval throughput is updated with that measured
+// just now.
+func (p *peer) SetBodiesIdle(delivered int) {
+ p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
}
-// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its receipt retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) SetReceiptsIdle() {
- p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle)
+// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt
+// retrieval requests. Its estimated receipt retrieval throughput is updated
+// with that measured just now.
+func (p *peer) SetReceiptsIdle(delivered int) {
+ p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle)
}
-// SetNodeDataIdle sets the peer to idle, allowing it to execute new retrieval
-// requests. Its node data retrieval allowance will also be updated either up- or
-// downwards, depending on whether the previous fetch completed in time.
-func (p *peer) SetNodeDataIdle() {
- p.setIdle(p.stateStarted, stateSoftTTL, stateSoftTTL, MaxStateFetch, &p.stateCapacity, &p.stateIdle)
+// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie
+// data retrieval requests. Its estimated state retrieval throughput is updated
+// with that measured just now.
+func (p *peer) SetNodeDataIdle(delivered int) {
+ p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle)
}
// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
-// Its data retrieval allowance will also be updated either up- or downwards,
-// depending on whether the previous fetch completed in time.
-func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) {
- // Update the peer's download allowance based on previous performance
- scale := 2.0
- if time.Since(started) > softTTL {
- scale = 0.5
- if time.Since(started) > hardTTL {
- scale = 1 / float64(maxFetch) // reduces capacity to 1
- }
- }
- for {
- // Calculate the new download bandwidth allowance
- prev := atomic.LoadInt32(capacity)
- next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale)))
-
- // Try to update the old value
- if atomic.CompareAndSwapInt32(capacity, prev, next) {
- // If we're having problems at 1 capacity, try to find better peers
- if next == 1 {
- p.Demote()
- }
- break
- }
+// Its estimated retrieval throughput is updated with that measured just now.
+func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) {
+ // Irrelevant of the scaling, make sure the peer ends up idle
+ defer atomic.StoreInt32(idle, 0)
+
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ // If nothing was delivered (hard timeout / unavailable data), reduce throughput to minimum
+ if delivered == 0 {
+ *throughput = 0
+ return
}
- // Set the peer to idle to allow further fetch requests
- atomic.StoreInt32(idle, 0)
+ // Otherwise update the throughput with a new measurement
+ measured := float64(delivered) / (float64(time.Since(started)+1) / float64(time.Second)) // +1 (ns) to ensure non-zero divisor
+ *throughput = (1-throughputImpact)*(*throughput) + throughputImpact*measured
}
// BlockCapacity retrieves the peers block download allowance based on its
-// previously discovered bandwidth capacity.
+// previously discovered throughput.
func (p *peer) BlockCapacity() int {
- return int(atomic.LoadInt32(&p.blockCapacity))
-}
+ p.lock.RLock()
+ defer p.lock.RUnlock()
-// ReceiptCapacity retrieves the peers block download allowance based on its
-// previously discovered bandwidth capacity.
-func (p *peer) ReceiptCapacity() int {
- return int(atomic.LoadInt32(&p.receiptCapacity))
+ return int(math.Max(1, math.Min(p.blockThroughput*float64(blockTargetRTT)/float64(time.Second), float64(MaxBlockFetch))))
}
-// NodeDataCapacity retrieves the peers block download allowance based on its
-// previously discovered bandwidth capacity.
-func (p *peer) NodeDataCapacity() int {
- return int(atomic.LoadInt32(&p.stateCapacity))
-}
+// ReceiptCapacity retrieves the peers receipt download allowance based on its
+// previously discovered throughput.
+func (p *peer) ReceiptCapacity() int {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
-// Promote increases the peer's reputation.
-func (p *peer) Promote() {
- atomic.AddInt32(&p.rep, 1)
+ return int(math.Max(1, math.Min(p.receiptThroughput*float64(receiptTargetRTT)/float64(time.Second), float64(MaxReceiptFetch))))
}
-// Demote decreases the peer's reputation or leaves it at 0.
-func (p *peer) Demote() {
- for {
- // Calculate the new reputation value
- prev := atomic.LoadInt32(&p.rep)
- next := prev / 2
+// NodeDataCapacity retrieves the peers state download allowance based on its
+// previously discovered throughput.
+func (p *peer) NodeDataCapacity() int {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
- // Try to update the old value
- if atomic.CompareAndSwapInt32(&p.rep, prev, next) {
- return
- }
- }
+ return int(math.Max(1, math.Min(p.stateThroughput*float64(stateTargetRTT)/float64(time.Second), float64(MaxStateFetch))))
}
// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
// that a peer is known not to have (i.e. have been requested before). If the
// set reaches its maximum allowed capacity, items are randomly dropped off.
func (p *peer) MarkLacking(hash common.Hash) {
- p.lackingLock.Lock()
- defer p.lackingLock.Unlock()
+ p.lock.Lock()
+ defer p.lock.Unlock()
for len(p.lacking) >= maxLackingHashes {
for drop, _ := range p.lacking {
@@ -330,8 +310,8 @@ func (p *peer) MarkLacking(hash common.Hash) {
// Lacks retrieves whether the hash of a blockchain item is on the peers lacking
// list (i.e. whether we know that the peer does not have it).
func (p *peer) Lacks(hash common.Hash) bool {
- p.lackingLock.RLock()
- defer p.lackingLock.RUnlock()
+ p.lock.RLock()
+ defer p.lock.RUnlock()
_, ok := p.lacking[hash]
return ok
@@ -339,13 +319,13 @@ func (p *peer) Lacks(hash common.Hash) bool {
// String implements fmt.Stringer.
func (p *peer) String() string {
- p.lackingLock.RLock()
- defer p.lackingLock.RUnlock()
+ p.lock.RLock()
+ defer p.lock.RUnlock()
return fmt.Sprintf("Peer %s [%s]", p.id,
- fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
- fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+
- fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+
+ fmt.Sprintf("blocks %3.2f/s, ", p.blockThroughput)+
+ fmt.Sprintf("receipts %3.2f/s, ", p.receiptThroughput)+
+ fmt.Sprintf("states %3.2f/s, ", p.stateThroughput)+
fmt.Sprintf("lacking %4d", len(p.lacking)),
)
}
@@ -377,6 +357,10 @@ func (ps *peerSet) Reset() {
// Register injects a new peer into the working set, or returns an error if the
// peer is already known.
+//
+// The method also sets the starting throughput values of the new peer to the
+// average of all existing peers, to give it a realistic change of being used
+// for data retrievals.
func (ps *peerSet) Register(p *peer) error {
ps.lock.Lock()
defer ps.lock.Unlock()
@@ -384,6 +368,20 @@ func (ps *peerSet) Register(p *peer) error {
if _, ok := ps.peers[p.id]; ok {
return errAlreadyRegistered
}
+ if len(ps.peers) > 0 {
+ p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0
+
+ for _, peer := range ps.peers {
+ peer.lock.RLock()
+ p.blockThroughput += peer.blockThroughput
+ p.receiptThroughput += peer.receiptThroughput
+ p.stateThroughput += peer.stateThroughput
+ peer.lock.RUnlock()
+ }
+ p.blockThroughput /= float64(len(ps.peers))
+ p.receiptThroughput /= float64(len(ps.peers))
+ p.stateThroughput /= float64(len(ps.peers))
+ }
ps.peers[p.id] = p
return nil
}
@@ -435,7 +433,12 @@ func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.blockIdle) == 0
}
- return ps.idlePeers(61, 61, idle)
+ throughput := func(p *peer) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.blockThroughput
+ }
+ return ps.idlePeers(61, 61, idle, throughput)
}
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
@@ -444,7 +447,12 @@ func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.blockIdle) == 0
}
- return ps.idlePeers(62, 64, idle)
+ throughput := func(p *peer) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.blockThroughput
+ }
+ return ps.idlePeers(62, 64, idle, throughput)
}
// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
@@ -453,7 +461,12 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.receiptIdle) == 0
}
- return ps.idlePeers(63, 64, idle)
+ throughput := func(p *peer) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.receiptThroughput
+ }
+ return ps.idlePeers(63, 64, idle, throughput)
}
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
@@ -462,12 +475,18 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.stateIdle) == 0
}
- return ps.idlePeers(63, 64, idle)
+ throughput := func(p *peer) float64 {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ return p.stateThroughput
+ }
+ return ps.idlePeers(63, 64, idle, throughput)
}
// idlePeers retrieves a flat list of all currently idle peers satisfying the
// protocol version constraints, using the provided function to check idleness.
-func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool) ([]*peer, int) {
+// The resulting set of peers are sorted by their measure throughput.
+func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool, throughput func(*peer) float64) ([]*peer, int) {
ps.lock.RLock()
defer ps.lock.RUnlock()
@@ -482,7 +501,7 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer)
}
for i := 0; i < len(idle); i++ {
for j := i + 1; j < len(idle); j++ {
- if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
+ if throughput(idle[i]) < throughput(idle[j]) {
idle[i], idle[j] = idle[j], idle[i]
}
}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 1fb5b6e12..1e55560db 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -101,11 +101,14 @@ type queue struct {
resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block chain
- lock sync.RWMutex
+ lock *sync.Mutex
+ active *sync.Cond
+ closed bool
}
// newQueue creates a new download queue for scheduling block retrieval.
func newQueue(stateDb ethdb.Database) *queue {
+ lock := new(sync.Mutex)
return &queue{
hashPool: make(map[common.Hash]int),
hashQueue: prque.New(),
@@ -122,6 +125,8 @@ func newQueue(stateDb ethdb.Database) *queue {
statePendPool: make(map[string]*fetchRequest),
stateDatabase: stateDb,
resultCache: make([]*fetchResult, blockCacheLimit),
+ active: sync.NewCond(lock),
+ lock: lock,
}
}
@@ -133,6 +138,7 @@ func (q *queue) Reset() {
q.stateSchedLock.Lock()
defer q.stateSchedLock.Unlock()
+ q.closed = false
q.mode = FullSync
q.fastSyncPivot = 0
@@ -162,18 +168,27 @@ func (q *queue) Reset() {
q.resultOffset = 0
}
+// Close marks the end of the sync, unblocking WaitResults.
+// It may be called even if the queue is already closed.
+func (q *queue) Close() {
+ q.lock.Lock()
+ q.closed = true
+ q.lock.Unlock()
+ q.active.Broadcast()
+}
+
// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
func (q *queue) PendingBlocks() int {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.hashQueue.Size() + q.blockTaskQueue.Size()
}
// PendingReceipts retrieves the number of block receipts pending for retrieval.
func (q *queue) PendingReceipts() int {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.receiptTaskQueue.Size()
}
@@ -192,8 +207,8 @@ func (q *queue) PendingNodeData() int {
// InFlightBlocks retrieves whether there are block fetch requests currently in
// flight.
func (q *queue) InFlightBlocks() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.blockPendPool) > 0
}
@@ -201,8 +216,8 @@ func (q *queue) InFlightBlocks() bool {
// InFlightReceipts retrieves whether there are receipt fetch requests currently
// in flight.
func (q *queue) InFlightReceipts() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.receiptPendPool) > 0
}
@@ -210,8 +225,8 @@ func (q *queue) InFlightReceipts() bool {
// InFlightNodeData retrieves whether there are node data entry fetch requests
// currently in flight.
func (q *queue) InFlightNodeData() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0
}
@@ -219,8 +234,8 @@ func (q *queue) InFlightNodeData() bool {
// Idle returns if the queue is fully idle or has some data still inside. This
// method is used by the tester to detect termination events.
func (q *queue) Idle() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
@@ -237,8 +252,8 @@ func (q *queue) Idle() bool {
// FastSyncPivot retrieves the currently used fast sync pivot point.
func (q *queue) FastSyncPivot() uint64 {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
return q.fastSyncPivot
}
@@ -246,8 +261,8 @@ func (q *queue) FastSyncPivot() uint64 {
// ShouldThrottleBlocks checks if the download should be throttled (active block (body)
// fetches exceed block cache).
func (q *queue) ShouldThrottleBlocks() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
// Calculate the currently in-flight block (body) requests
pending := 0
@@ -261,8 +276,8 @@ func (q *queue) ShouldThrottleBlocks() bool {
// ShouldThrottleReceipts checks if the download should be throttled (active receipt
// fetches exceed block cache).
func (q *queue) ShouldThrottleReceipts() bool {
- q.lock.RLock()
- defer q.lock.RUnlock()
+ q.lock.Lock()
+ defer q.lock.Unlock()
// Calculate the currently in-flight receipt requests
pending := 0
@@ -351,91 +366,74 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
return inserts
}
-// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't
-// been downloaded yet (or simply non existent).
-func (q *queue) GetHeadResult() *fetchResult {
- q.lock.RLock()
- defer q.lock.RUnlock()
+// WaitResults retrieves and permanently removes a batch of fetch
+// results from the cache. the result slice will be empty if the queue
+// has been closed.
+func (q *queue) WaitResults() []*fetchResult {
+ q.lock.Lock()
+ defer q.lock.Unlock()
- // If there are no results pending, return nil
- if len(q.resultCache) == 0 || q.resultCache[0] == nil {
- return nil
- }
- // If the next result is still incomplete, return nil
- if q.resultCache[0].Pending > 0 {
- return nil
+ nproc := q.countProcessableItems()
+ for nproc == 0 && !q.closed {
+ q.active.Wait()
+ nproc = q.countProcessableItems()
}
- // If the next result is the fast sync pivot...
- if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot {
- // If the pivot state trie is still being pulled, return nil
- if len(q.stateTaskPool) > 0 {
- return nil
+ results := make([]*fetchResult, nproc)
+ copy(results, q.resultCache[:nproc])
+ if len(results) > 0 {
+ // Mark results as done before dropping them from the cache.
+ for _, result := range results {
+ hash := result.Header.Hash()
+ delete(q.blockDonePool, hash)
+ delete(q.receiptDonePool, hash)
}
- if q.PendingNodeData() > 0 {
- return nil
- }
- // If the state is done, but not enough post-pivot headers were verified, stall...
- for i := 0; i < fsHeaderForceVerify; i++ {
- if i+1 >= len(q.resultCache) || q.resultCache[i+1] == nil {
- return nil
- }
+ // Delete the results from the cache and clear the tail.
+ copy(q.resultCache, q.resultCache[nproc:])
+ for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ {
+ q.resultCache[i] = nil
}
+ // Advance the expected block number of the first cache entry.
+ q.resultOffset += uint64(nproc)
}
- return q.resultCache[0]
+ return results
}
-// TakeResults retrieves and permanently removes a batch of fetch results from
-// the cache.
-func (q *queue) TakeResults() []*fetchResult {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- // Accumulate all available results
- results := []*fetchResult{}
+// countProcessableItems counts the processable items.
+func (q *queue) countProcessableItems() int {
for i, result := range q.resultCache {
- // Stop if no more results are ready
+ // Don't process incomplete or unavailable items.
if result == nil || result.Pending > 0 {
- break
+ return i
}
- // The fast sync pivot block may only be processed after state fetch completes
- if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot {
- if len(q.stateTaskPool) > 0 {
- break
- }
- if q.PendingNodeData() > 0 {
- break
- }
- // Even is state fetch is done, ensure post-pivot headers passed verifications
- safe := true
- for j := 0; j < fsHeaderForceVerify; j++ {
- if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
- safe = false
+ // Special handling for the fast-sync pivot block:
+ if q.mode == FastSync {
+ bnum := result.Header.Number.Uint64()
+ if bnum == q.fastSyncPivot {
+ // If the state of the pivot block is not
+ // available yet, we cannot proceed and return 0.
+ //
+ // Stop before processing the pivot block to ensure that
+ // resultCache has space for fsHeaderForceVerify items. Not
+ // doing this could leave us unable to download the required
+ // amount of headers.
+ if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 {
+ return i
+ }
+ for j := 0; j < fsHeaderForceVerify; j++ {
+ if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil {
+ return i
+ }
}
}
- if !safe {
- break
+ // If we're just the fast sync pivot, stop as well
+ // because the following batch needs different insertion.
+ // This simplifies handling the switchover in d.process.
+ if bnum == q.fastSyncPivot+1 && i > 0 {
+ return i
}
}
- // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion
- if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 {
- break
- }
- results = append(results, result)
-
- hash := result.Header.Hash()
- delete(q.blockDonePool, hash)
- delete(q.receiptDonePool, hash)
- }
- // Delete the results from the slice and let them be garbage collected
- // without this slice trick the results would stay in memory until nil
- // would be assigned to them.
- copy(q.resultCache, q.resultCache[len(results):])
- for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ {
- q.resultCache[k] = nil
}
- q.resultOffset += uint64(len(results))
-
- return results
+ return len(q.resultCache)
}
// ReserveBlocks reserves a set of block hashes for the given peer, skipping any
@@ -584,6 +582,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
// If we're the first to request this task, initialise the result container
index := int(header.Number.Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 {
+ common.Report("index allocation went beyond available resultCache space")
return nil, false, errInvalidChain
}
if q.resultCache[index] == nil {
@@ -617,6 +616,10 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
for _, header := range skip {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
+ if progress {
+ // Wake WaitResults, resultCache was modified
+ q.active.Signal()
+ }
// Assemble and return the block download request
if len(send) == 0 {
return nil, progress, nil
@@ -700,7 +703,7 @@ func (q *queue) Revoke(peerId string) {
// ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBlocks(timeout time.Duration) []string {
+func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -709,7 +712,7 @@ func (q *queue) ExpireBlocks(timeout time.Duration) []string {
// ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBodies(timeout time.Duration) []string {
+func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -718,7 +721,7 @@ func (q *queue) ExpireBodies(timeout time.Duration) []string {
// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireReceipts(timeout time.Duration) []string {
+func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -727,7 +730,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) []string {
// ExpireNodeData checks for in flight node data requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireNodeData(timeout time.Duration) []string {
+func (q *queue) ExpireNodeData(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
@@ -737,12 +740,12 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string {
// expire is the generic check that move expired tasks from a pending pool back
// into a task pool, returning all entities caught with expired tasks.
//
-// Note, this method expects the queue lock to be already held for writing. The
+// Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
-func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string {
+func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
// Iterate over the expired requests and return each to the queue
- peers := []string{}
+ expiries := make(map[string]int)
for id, request := range pendPool {
if time.Since(request.Time) > timeout {
// Update the metrics with the timeout
@@ -755,25 +758,32 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
- peers = append(peers, id)
+ // Add the peer to the expiry report along the the number of failed requests
+ expirations := len(request.Hashes)
+ if expirations < len(request.Headers) {
+ expirations = len(request.Headers)
+ }
+ expiries[id] = expirations
}
}
// Remove the expired requests from the pending pool
- for _, id := range peers {
+ for id, _ := range expiries {
delete(pendPool, id)
}
- return peers
+ return expiries
}
-// DeliverBlocks injects a block retrieval response into the download queue.
-func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
+// DeliverBlocks injects a block retrieval response into the download queue. The
+// method returns the number of blocks accepted from the delivery and also wakes
+// any threads waiting for data delivery.
+func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the blocks were never requested
request := q.blockPendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
blockReqTimer.UpdateSince(request.Time)
delete(q.blockPendPool, id)
@@ -785,7 +795,7 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
}
}
// Iterate over the downloaded blocks and add each of them
- errs := make([]error, 0)
+ accepted, errs := 0, make([]error, 0)
for _, block := range blocks {
// Skip any blocks that were not requested
hash := block.Hash()
@@ -808,29 +818,33 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
delete(request.Hashes, hash)
delete(q.hashPool, hash)
+ accepted++
}
// Return all failed or missing fetches to the queue
for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index))
}
+ // Wake up WaitResults
+ if accepted > 0 {
+ q.active.Signal()
+ }
// If none of the blocks were good, it's a stale delivery
switch {
case len(errs) == 0:
- return nil
-
+ return accepted, nil
case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
- return errs[0]
-
+ return accepted, errs[0]
case len(errs) == len(blocks):
- return errStaleDelivery
-
+ return accepted, errStaleDelivery
default:
- return fmt.Errorf("multiple failures: %v", errs)
+ return accepted, fmt.Errorf("multiple failures: %v", errs)
}
}
// DeliverBodies injects a block body retrieval response into the results queue.
-func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
+// The method returns the number of blocks bodies accepted from the delivery and
+// also wakes any threads waiting for data delivery.
+func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -846,7 +860,9 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi
}
// DeliverReceipts injects a receipt retrieval response into the results queue.
-func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error {
+// The method returns the number of transaction receipts accepted from the delivery
+// and also wakes any threads waiting for data delivery.
+func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@@ -865,12 +881,14 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
-func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest,
- donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error {
+func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+ pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
+ results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
+
// Short circuit if the data was never requested
request := pendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
reqTimer.UpdateSince(request.Time)
delete(pendPool, id)
@@ -883,8 +901,9 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
}
// Assemble each of the results with their headers and retrieved data parts
var (
- failure error
- useful bool
+ accepted int
+ failure error
+ useful bool
)
for i, header := range request.Headers {
// Short circuit assembly if no more fetch results are found
@@ -904,6 +923,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
donePool[header.Hash()] = struct{}{}
q.resultCache[index].Pending--
useful = true
+ accepted++
// Clean up a successful fetch
request.Headers[i] = nil
@@ -915,28 +935,32 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
taskQueue.Push(header, -float32(header.Number.Uint64()))
}
}
+ // Wake up WaitResults
+ if accepted > 0 {
+ q.active.Signal()
+ }
// If none of the data was good, it's a stale delivery
switch {
case failure == nil || failure == errInvalidChain:
- return failure
-
+ return accepted, failure
case useful:
- return fmt.Errorf("partial failure: %v", failure)
-
+ return accepted, fmt.Errorf("partial failure: %v", failure)
default:
- return errStaleDelivery
+ return accepted, errStaleDelivery
}
}
// DeliverNodeData injects a node state data retrieval response into the queue.
-func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) error {
+// The method returns the number of node state entries originally requested, and
+// the number of them actually accepted from the delivery.
+func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the data was never requested
request := q.statePendPool[id]
if request == nil {
- return errNoFetchesPending
+ return 0, errNoFetchesPending
}
stateReqTimer.UpdateSince(request.Time)
delete(q.statePendPool, id)
@@ -948,10 +972,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
}
}
// Iterate over the downloaded data and verify each of them
- errs := make([]error, 0)
+ accepted, errs := 0, make([]error, 0)
process := []trie.SyncResult{}
for _, blob := range data {
- // Skip any blocks that were not requested
+ // Skip any state trie entires that were not requested
hash := common.BytesToHash(crypto.Sha3(blob))
if _, ok := request.Hashes[hash]; !ok {
errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
@@ -959,6 +983,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
}
// Inject the next state trie item into the processing queue
process = append(process, trie.SyncResult{hash, blob})
+ accepted++
delete(request.Hashes, hash)
delete(q.stateTaskPool, hash)
@@ -976,19 +1001,21 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// If none of the data items were good, it's a stale delivery
switch {
case len(errs) == 0:
- return nil
-
+ return accepted, nil
case len(errs) == len(request.Hashes):
- return errStaleDelivery
-
+ return accepted, errStaleDelivery
default:
- return fmt.Errorf("multiple failures: %v", errs)
+ return accepted, fmt.Errorf("multiple failures: %v", errs)
}
}
// deliverNodeData is the asynchronous node data processor that injects a batch
// of sync results into the state scheduler.
func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) {
+ // Wake up WaitResults after the state has been written because it
+ // might be waiting for the pivot block state to get completed.
+ defer q.active.Signal()
+
// Process results one by one to permit task fetches in between
for i, result := range results {
q.stateSchedLock.Lock()
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index a5418e2e7..5772114b3 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -64,7 +64,7 @@ func BenchmarkMipmaps(b *testing.B) {
}
// store the receipts
- err := core.PutReceipts(db, receipts)
+ err := core.WriteReceipts(db, receipts)
if err != nil {
b.Fatal(err)
}
@@ -78,7 +78,7 @@ func BenchmarkMipmaps(b *testing.B) {
if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil {
b.Fatalf("failed to insert block number: %v", err)
}
- if err := core.PutBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
+ if err := core.WriteBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
b.Fatal("error writing block receipts:", err)
}
}
@@ -163,7 +163,7 @@ func TestFilters(t *testing.T) {
}
// store the receipts
- err := core.PutReceipts(db, receipts)
+ err := core.WriteReceipts(db, receipts)
if err != nil {
t.Fatal(err)
}
@@ -180,7 +180,7 @@ func TestFilters(t *testing.T) {
if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil {
t.Fatalf("failed to insert block number: %v", err)
}
- if err := core.PutBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
+ if err := core.WriteBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
t.Fatal("error writing block receipts:", err)
}
}
diff --git a/eth/gasprice.go b/eth/gasprice.go
index b752c22dd..e0de89e62 100644
--- a/eth/gasprice.go
+++ b/eth/gasprice.go
@@ -166,7 +166,7 @@ func (self *GasPriceOracle) processBlock(block *types.Block) {
func (self *GasPriceOracle) lowestPrice(block *types.Block) *big.Int {
gasUsed := big.NewInt(0)
- receipts := self.eth.BlockProcessor().GetBlockReceipts(block.Hash())
+ receipts := core.GetBlockReceipts(self.eth.ChainDb(), block.Hash())
if len(receipts) > 0 {
if cgu := receipts[len(receipts)-1].CumulativeGasUsed; cgu != nil {
gasUsed = receipts[len(receipts)-1].CumulativeGasUsed
diff --git a/eth/helper_test.go b/eth/helper_test.go
index 65fccf7b4..bbd1fb818 100644
--- a/eth/helper_test.go
+++ b/eth/helper_test.go
@@ -35,9 +35,7 @@ func newTestProtocolManager(fastSync bool, blocks int, generator func(int, *core
db, _ = ethdb.NewMemDatabase()
genesis = core.WriteGenesisBlockForTesting(db, core.GenesisAccount{testBankAddress, testBankFunds})
blockchain, _ = core.NewBlockChain(db, pow, evmux)
- blockproc = core.NewBlockProcessor(db, pow, blockchain, evmux)
)
- blockchain.SetProcessor(blockproc)
chain, _ := core.GenerateChain(genesis, db, blocks, generator)
if _, err := blockchain.InsertChain(chain); err != nil {
panic(err)
diff --git a/eth/sync.go b/eth/sync.go
index bbf2abc04..dd8aef8e4 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -175,10 +175,6 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
}
// If fast sync was enabled, and we synced up, disable it
if pm.fastSync {
- // Wait until all pending imports finish processing
- for pm.downloader.Synchronising() {
- time.Sleep(100 * time.Millisecond)
- }
// Disable fast sync if we indeed have something in our chain
if pm.blockchain.CurrentBlock().NumberU64() > 0 {
glog.V(logger.Info).Infof("fast sync complete, auto disabling")
diff --git a/miner/worker.go b/miner/worker.go
index 2d072ef60..aa0fa85cb 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -100,7 +100,7 @@ type worker struct {
eth core.Backend
chain *core.BlockChain
- proc *core.BlockProcessor
+ proc core.Validator
chainDb ethdb.Database
coinbase common.Address
@@ -131,7 +131,7 @@ func newWorker(coinbase common.Address, eth core.Backend) *worker {
recv: make(chan *Result, resultQueueSize),
gasPrice: new(big.Int),
chain: eth.BlockChain(),
- proc: eth.BlockProcessor(),
+ proc: eth.BlockChain().Validator(),
possibleUncles: make(map[common.Hash]*types.Block),
coinbase: coinbase,
txQueue: make(map[common.Hash]*types.Transaction),
@@ -244,7 +244,7 @@ func (self *worker) update() {
// Apply transaction to the pending state if we're not mining
if atomic.LoadInt32(&self.mining) == 0 {
self.currentMu.Lock()
- self.current.commitTransactions(types.Transactions{ev.Tx}, self.gasPrice, self.proc)
+ self.current.commitTransactions(types.Transactions{ev.Tx}, self.gasPrice, self.chain)
self.currentMu.Unlock()
}
}
@@ -290,7 +290,9 @@ func (self *worker) wait() {
glog.V(logger.Error).Infoln("Invalid block found during mining")
continue
}
- if err := core.ValidateHeader(self.eth.BlockProcessor().Pow, block.Header(), parent.Header(), true, false); err != nil && err != core.BlockFutureErr {
+
+ auxValidator := self.eth.BlockChain().AuxValidator()
+ if err := core.ValidateHeader(auxValidator, block.Header(), parent.Header(), true, false); err != nil && err != core.BlockFutureErr {
glog.V(logger.Error).Infoln("Invalid header on mined block:", err)
continue
}
@@ -303,9 +305,9 @@ func (self *worker) wait() {
// check if canon block and write transactions
if stat == core.CanonStatTy {
// This puts transactions in a extra db for rpc
- core.PutTransactions(self.chainDb, block, block.Transactions())
+ core.WriteTransactions(self.chainDb, block)
// store the receipts
- core.PutReceipts(self.chainDb, work.receipts)
+ core.WriteReceipts(self.chainDb, work.receipts)
// Write map map bloom filters
core.WriteMipmapBloom(self.chainDb, block.NumberU64(), work.receipts)
}
@@ -318,7 +320,7 @@ func (self *worker) wait() {
self.mux.Post(core.ChainHeadEvent{block})
self.mux.Post(logs)
}
- if err := core.PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
+ if err := core.WriteBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
glog.V(logger.Warn).Infoln("error writing block receipts:", err)
}
}(block, work.state.Logs(), work.receipts)
@@ -516,7 +518,7 @@ func (self *worker) commitNewWork() {
transactions := append(singleTxOwner, multiTxOwner...)
*/
- work.commitTransactions(transactions, self.gasPrice, self.proc)
+ work.commitTransactions(transactions, self.gasPrice, self.chain)
self.eth.TxPool().RemoveTransactions(work.lowGasTxs)
// compute uncles for the new block.
@@ -575,9 +577,8 @@ func (self *worker) commitUncle(work *Work, uncle *types.Header) error {
return nil
}
-func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *big.Int, proc *core.BlockProcessor) {
+func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) {
gp := new(core.GasPool).AddGas(env.header.GasLimit)
-
for _, tx := range transactions {
// We can skip err. It has already been validated in the tx pool
from, _ := tx.From()
@@ -615,7 +616,7 @@ func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *b
env.state.StartRecord(tx.Hash(), common.Hash{}, 0)
- err := env.commitTransaction(tx, proc, gp)
+ err := env.commitTransaction(tx, bc, gp)
switch {
case core.IsGasLimitErr(err):
// ignore the transactor so no nonce errors will be thrown for this account
@@ -635,9 +636,9 @@ func (env *Work) commitTransactions(transactions types.Transactions, gasPrice *b
}
}
-func (env *Work) commitTransaction(tx *types.Transaction, proc *core.BlockProcessor, gp *core.GasPool) error {
+func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, gp *core.GasPool) error {
snap := env.state.Copy()
- receipt, _, err := proc.ApplyTransaction(gp, env.state, env.header, tx, env.header.GasUsed, true)
+ receipt, _, _, err := core.ApplyTransaction(bc, gp, env.state, env.header, tx, env.header.GasUsed)
if err != nil {
env.state.Set(snap)
return err
diff --git a/rpc/api/debug.go b/rpc/api/debug.go
index d2cbc7f19..a6faa335e 100644
--- a/rpc/api/debug.go
+++ b/rpc/api/debug.go
@@ -22,6 +22,7 @@ import (
"time"
"github.com/ethereum/ethash"
+ "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth"
@@ -166,11 +167,30 @@ func (self *debugApi) ProcessBlock(req *shared.Request) (interface{}, error) {
defer func() { vm.Debug = old }()
vm.Debug = true
- _, err := self.ethereum.BlockProcessor().RetryProcess(block)
- if err == nil {
- return true, nil
+ var (
+ blockchain = self.ethereum.BlockChain()
+ validator = blockchain.Validator()
+ processor = blockchain.Processor()
+ )
+
+ err := core.ValidateHeader(blockchain.AuxValidator(), block.Header(), blockchain.GetHeader(block.ParentHash()), true, false)
+ if err != nil {
+ return false, err
}
- return false, err
+ statedb, err := state.New(blockchain.GetBlock(block.ParentHash()).Root(), self.ethereum.ChainDb())
+ if err != nil {
+ return false, err
+ }
+ receipts, _, usedGas, err := processor.Process(block, statedb)
+ if err != nil {
+ return false, err
+ }
+ err = validator.ValidateState(block, blockchain.GetBlock(block.ParentHash()), statedb, receipts, usedGas)
+ if err != nil {
+ return false, err
+ }
+
+ return true, nil
}
func (self *debugApi) SeedHash(req *shared.Request) (interface{}, error) {
diff --git a/rpc/api/utils.go b/rpc/api/utils.go
index 5a3ade46b..8351e88d3 100644
--- a/rpc/api/utils.go
+++ b/rpc/api/utils.go
@@ -130,7 +130,7 @@ var (
},
"shh": []string{
"post",
- "newIdentify",
+ "newIdentity",
"hasIdentity",
"newGroup",
"addToGroup",
diff --git a/xeth/xeth.go b/xeth/xeth.go
index 04fdea1d2..19c42a9a3 100644
--- a/xeth/xeth.go
+++ b/xeth/xeth.go
@@ -322,44 +322,11 @@ func (self *XEth) EthBlockByHash(strHash string) *types.Block {
return block
}
-func (self *XEth) EthTransactionByHash(hash string) (tx *types.Transaction, blhash common.Hash, blnum *big.Int, txi uint64) {
- // Due to increasing return params and need to determine if this is from transaction pool or
- // some chain, this probably needs to be refactored for more expressiveness
- data, _ := self.backend.ChainDb().Get(common.FromHex(hash))
- if len(data) != 0 {
- dtx := new(types.Transaction)
- if err := rlp.DecodeBytes(data, dtx); err != nil {
- glog.V(logger.Error).Infoln(err)
- return
- }
- tx = dtx
- } else { // check pending transactions
- tx = self.backend.TxPool().GetTransaction(common.HexToHash(hash))
- }
-
- // meta
- var txExtra struct {
- BlockHash common.Hash
- BlockIndex uint64
- Index uint64
- }
-
- v, dberr := self.backend.ChainDb().Get(append(common.FromHex(hash), 0x0001))
- // TODO check specifically for ErrNotFound
- if dberr != nil {
- return
+func (self *XEth) EthTransactionByHash(hash string) (*types.Transaction, common.Hash, uint64, uint64) {
+ if tx, hash, number, index := core.GetTransaction(self.backend.ChainDb(), common.HexToHash(hash)); tx != nil {
+ return tx, hash, number, index
}
- r := bytes.NewReader(v)
- err := rlp.Decode(r, &txExtra)
- if err == nil {
- blhash = txExtra.BlockHash
- blnum = big.NewInt(int64(txExtra.BlockIndex))
- txi = txExtra.Index
- } else {
- glog.V(logger.Error).Infoln(err)
- }
-
- return
+ return self.backend.TxPool().GetTransaction(common.HexToHash(hash)), common.Hash{}, 0, 0
}
func (self *XEth) BlockByNumber(num int64) *Block {
@@ -379,7 +346,7 @@ func (self *XEth) CurrentBlock() *types.Block {
}
func (self *XEth) GetBlockReceipts(bhash common.Hash) types.Receipts {
- return self.backend.BlockProcessor().GetBlockReceipts(bhash)
+ return core.GetBlockReceipts(self.backend.ChainDb(), bhash)
}
func (self *XEth) GetTxReceipt(txhash common.Hash) *types.Receipt {