From a1d9ef48c505ab4314ca8e3ee1fc272032da3034 Mon Sep 17 00:00:00 2001 From: Jeffrey Wilcke Date: Mon, 19 Oct 2015 16:08:17 +0200 Subject: core, eth, rpc: split out block validator and state processor This removes the burden on a single object to take care of all validation and state processing. Now instead the validation is done by the `core.BlockValidator` (`types.Validator`) that takes care of both header and uncle validation through the `ValidateBlock` method and state validation through the `ValidateState` method. The state processing is done by a new object `core.StateProcessor` (`types.Processor`) and accepts a new state as input and uses that to process the given block's transactions (and uncles for rewords) to calculate the state root for the next block (P_n + 1). --- core/blockchain.go | 134 +++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 114 insertions(+), 20 deletions(-) (limited to 'core/blockchain.go') diff --git a/core/blockchain.go b/core/blockchain.go index cea346e38..b6b00ca04 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -61,17 +62,34 @@ const ( blockCacheLimit = 256 maxFutureBlocks = 256 maxTimeFutureBlocks = 30 + // must be bumped when consensus algorithm is changed, this forces the upgradedb + // command to be run (forces the blocks to be imported again using the new algorithm) + BlockChainVersion = 3 ) +// BlockChain represents the canonical chain given a database with a genesis +// block. The Blockchain manages chain imports, reverts, chain reorganisations. +// +// Importing blocks in to the block chain happens according to the set of rules +// defined by the two stage Validator. Processing of blocks is done using the +// Processor which processes the included transaction. The validation of the state +// is done in the second part of the Validator. Failing results in aborting of +// the import. +// +// The BlockChain also helps in returning blocks from **any** chain included +// in the database as well as blocks that represents the canonical chain. It's +// important to note that GetBlock can return any block and does not need to be +// included in the canonical one where as GetBlockByNumber always represents the +// canonical chain. type BlockChain struct { chainDb ethdb.Database - processor types.BlockProcessor eventMux *event.TypeMux genesisBlock *types.Block // Last known total difficulty mu sync.RWMutex chainmu sync.RWMutex tsmu sync.RWMutex + procmu sync.RWMutex checkpoint int // checkpoint counts towards the new checkpoint currentHeader *types.Header // Current head of the header chain (may be above the block chain!) @@ -91,10 +109,15 @@ type BlockChain struct { procInterrupt int32 // interrupt signaler for block processing wg sync.WaitGroup - pow pow.PoW - rand *mrand.Rand + pow pow.PoW + rand *mrand.Rand + processor Processor + validator Validator } +// NewBlockChain returns a fully initialised block chain using information +// available in the database. It initialiser the default Ethereum Validator and +// Processor. func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*BlockChain, error) { headerCache, _ := lru.New(headerCacheLimit) bodyCache, _ := lru.New(bodyCacheLimit) @@ -121,6 +144,8 @@ func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*Bl return nil, err } bc.rand = mrand.New(mrand.NewSource(seed.Int64())) + bc.SetValidator(NewBlockValidator(bc, pow)) + bc.SetProcessor(NewStateProcessor(bc)) bc.genesisBlock = bc.GetBlockByNumber(0) if bc.genesisBlock == nil { @@ -292,6 +317,7 @@ func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error { return nil } +// GasLimit returns the gas limit of the current HEAD block. func (self *BlockChain) GasLimit() *big.Int { self.mu.RLock() defer self.mu.RUnlock() @@ -299,6 +325,7 @@ func (self *BlockChain) GasLimit() *big.Int { return self.currentBlock.GasLimit() } +// LastBlockHash return the hash of the HEAD block. func (self *BlockChain) LastBlockHash() common.Hash { self.mu.RLock() defer self.mu.RUnlock() @@ -333,6 +360,8 @@ func (self *BlockChain) CurrentFastBlock() *types.Block { return self.currentFastBlock } +// Status returns status information about the current chain such as the HEAD Td, +// the HEAD hash and the hash of the genesis block. func (self *BlockChain) Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) { self.mu.RLock() defer self.mu.RUnlock() @@ -340,10 +369,38 @@ func (self *BlockChain) Status() (td *big.Int, currentBlock common.Hash, genesis return self.GetTd(self.currentBlock.Hash()), self.currentBlock.Hash(), self.genesisBlock.Hash() } -func (self *BlockChain) SetProcessor(proc types.BlockProcessor) { - self.processor = proc +// SetProcessor sets the processor required for making state modifications. +func (self *BlockChain) SetProcessor(processor Processor) { + self.procmu.Lock() + defer self.procmu.Unlock() + self.processor = processor +} + +// SetValidator sets the validator which is used to validate incoming blocks. +func (self *BlockChain) SetValidator(validator Validator) { + self.procmu.Lock() + defer self.procmu.Unlock() + self.validator = validator } +// Validator returns the current validator. +func (self *BlockChain) Validator() Validator { + self.procmu.RLock() + defer self.procmu.RUnlock() + return self.validator +} + +// Processor returns the current processor. +func (self *BlockChain) Processor() Processor { + self.procmu.RLock() + defer self.procmu.RUnlock() + return self.processor +} + +// AuxValidator returns the auxiliary validator (Proof of work atm) +func (self *BlockChain) AuxValidator() pow.PoW { return self.pow } + +// State returns a new mutable state based on the current HEAD block. func (self *BlockChain) State() (*state.StateDB, error) { return state.New(self.CurrentBlock().Root(), self.chainDb) } @@ -606,6 +663,8 @@ func (self *BlockChain) GetUnclesInChain(block *types.Block, length int) []*type return uncles } +// Stop stops the blockchain service. If any imports are currently in progress +// it will abort them using the procInterrupt. func (bc *BlockChain) Stop() { if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) { return @@ -758,9 +817,9 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) var err error if index == 0 { - err = self.processor.ValidateHeader(header, checkPow, false) + err = self.Validator().ValidateHeader(header, self.GetHeader(header.ParentHash), checkPow) } else { - err = self.processor.ValidateHeaderWithParent(header, chain[index-1], checkPow, false) + err = self.Validator().ValidateHeader(header, chain[index-1], checkPow) } if err != nil { errs[index] = err @@ -1025,9 +1084,10 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { // faster than direct delivery and requires much less mutex // acquiring. var ( - stats struct{ queued, processed, ignored int } - events = make([]interface{}, 0, len(chain)) - tstart = time.Now() + stats struct{ queued, processed, ignored int } + events = make([]interface{}, 0, len(chain)) + coalescedLogs vm.Logs + tstart = time.Now() nonceChecked = make([]bool, len(chain)) ) @@ -1057,12 +1117,12 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { if BadHashes[block.Hash()] { err := BadHashError(block.Hash()) - blockErr(block, err) + reportBlock(block, err) return i, err } - // Call in to the block processor and check for errors. It's likely that if one block fails - // all others will fail too (unless a known block is returned). - logs, receipts, err := self.processor.Process(block) + // Stage 1 validation of the block using the chain's validator + // interface. + err := self.Validator().ValidateBlock(block) if err != nil { if IsKnownBlockErr(err) { stats.ignored++ @@ -1089,14 +1149,41 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { continue } - blockErr(block, err) + reportBlock(block, err) - go ReportBlock(block, err) + return i, err + } + // Create a new statedb using the parent block and report an + // error if it fails. + statedb, err := state.New(self.GetBlock(block.ParentHash()).Root(), self.chainDb) + if err != nil { + reportBlock(block, err) return i, err } + // Process block using the parent state as reference point. + receipts, logs, usedGas, err := self.processor.Process(block, statedb) + if err != nil { + reportBlock(block, err) + return i, err + } + // Validate the state using the default validator + err = self.Validator().ValidateState(block, self.GetBlock(block.ParentHash()), statedb, receipts, usedGas) + if err != nil { + reportBlock(block, err) + return i, err + } + // Write state changes to database + _, err = statedb.Commit() + if err != nil { + return i, err + } + + // coalesce logs for later processing + coalescedLogs = append(coalescedLogs, logs...) + if err := PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil { - glog.V(logger.Warn).Infoln("error writing block receipts:", err) + return i, err } txcount += len(block.Transactions()) @@ -1105,6 +1192,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { if err != nil { return i, err } + switch status { case CanonStatTy: if glog.V(logger.Debug) { @@ -1141,7 +1229,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { start, end := chain[0], chain[len(chain)-1] glog.Infof("imported %d block(s) (%d queued %d ignored) including %d txs in %v. #%v [%x / %x]\n", stats.processed, stats.queued, stats.ignored, txcount, tend, end.Number(), start.Hash().Bytes()[:4], end.Hash().Bytes()[:4]) } - go self.postChainEvents(events) + go self.postChainEvents(events, coalescedLogs) return 0, nil } @@ -1239,7 +1327,9 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // postChainEvents iterates over the events generated by a chain insertion and // posts them into the event mux. -func (self *BlockChain) postChainEvents(events []interface{}) { +func (self *BlockChain) postChainEvents(events []interface{}, logs vm.Logs) { + // post event logs for further processing + self.eventMux.Post(logs) for _, event := range events { if event, ok := event.(ChainEvent); ok { // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long @@ -1265,9 +1355,13 @@ func (self *BlockChain) update() { } } -func blockErr(block *types.Block, err error) { +// reportBlock reports the given block and error using the canonical block +// reporting tool. Reporting the block to the service is handled in a separate +// goroutine. +func reportBlock(block *types.Block, err error) { if glog.V(logger.Error) { glog.Errorf("Bad block #%v (%s)\n", block.Number(), block.Hash().Hex()) glog.Errorf(" %v", err) } + go ReportBlock(block, err) } -- cgit