aboutsummaryrefslogtreecommitdiffstats
path: root/core/chain_manager.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/chain_manager.go')
-rw-r--r--core/chain_manager.go355
1 files changed, 188 insertions, 167 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 3b9b7517b..fc1922b3b 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -11,15 +11,19 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/compression/rle"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp"
+ "github.com/hashicorp/golang-lru"
"github.com/rcrowley/go-metrics"
+ "github.com/syndtr/goleveldb/leveldb"
)
var (
@@ -33,35 +37,40 @@ var (
)
const (
- blockCacheLimit = 10000
+ blockCacheLimit = 256
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
)
-func CalcDifficulty(block, parent *types.Header) *big.Int {
+// CalcDifficulty is the difficulty adjustment algorithm. It returns
+// the difficulty that a new block b should have when created at time
+// given the parent block's time and difficulty.
+func CalcDifficulty(time int64, parentTime int64, parentDiff *big.Int) *big.Int {
diff := new(big.Int)
-
- adjust := new(big.Int).Div(parent.Difficulty, params.DifficultyBoundDivisor)
- if big.NewInt(int64(block.Time)-int64(parent.Time)).Cmp(params.DurationLimit) < 0 {
- diff.Add(parent.Difficulty, adjust)
+ adjust := new(big.Int).Div(parentDiff, params.DifficultyBoundDivisor)
+ if big.NewInt(time-parentTime).Cmp(params.DurationLimit) < 0 {
+ diff.Add(parentDiff, adjust)
} else {
- diff.Sub(parent.Difficulty, adjust)
+ diff.Sub(parentDiff, adjust)
}
-
if diff.Cmp(params.MinimumDifficulty) < 0 {
return params.MinimumDifficulty
}
-
return diff
}
+// CalcTD computes the total difficulty of block.
func CalcTD(block, parent *types.Block) *big.Int {
if parent == nil {
return block.Difficulty()
}
- return new(big.Int).Add(parent.Td, block.Header().Difficulty)
+ d := block.Difficulty()
+ d.Add(d, parent.Td)
+ return d
}
+// CalcGasLimit computes the gas limit of the next block after parent.
+// The result may be modified by the caller.
func CalcGasLimit(parent *types.Block) *big.Int {
decay := new(big.Int).Div(parent.GasLimit(), params.GasLimitBoundDivisor)
contrib := new(big.Int).Mul(parent.GasUsed(), big.NewInt(3))
@@ -71,11 +80,11 @@ func CalcGasLimit(parent *types.Block) *big.Int {
gl := new(big.Int).Sub(parent.GasLimit(), decay)
gl = gl.Add(gl, contrib)
gl = gl.Add(gl, big.NewInt(1))
- gl = common.BigMax(gl, params.MinGasLimit)
+ gl.Set(common.BigMax(gl, params.MinGasLimit))
if gl.Cmp(params.GenesisGasLimit) < 0 {
- gl2 := new(big.Int).Add(parent.GasLimit(), decay)
- return common.BigMin(params.GenesisGasLimit, gl2)
+ gl.Add(parent.GasLimit(), decay)
+ gl.Set(common.BigMin(gl, params.GenesisGasLimit))
}
return gl
}
@@ -100,8 +109,9 @@ type ChainManager struct {
transState *state.StateDB
txState *state.ManagedState
- cache *BlockCache
- futureBlocks *BlockCache
+ cache *lru.Cache // cache is the LRU caching
+ futureBlocks *lru.Cache // future blocks are blocks added for later processing
+ pendingBlocks *lru.Cache // pending blocks contain blocks not yet written to the db
quit chan struct{}
// procInterrupt must be atomically called
@@ -112,13 +122,14 @@ type ChainManager struct {
}
func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow pow.PoW, mux *event.TypeMux) (*ChainManager, error) {
+ cache, _ := lru.New(blockCacheLimit)
bc := &ChainManager{
blockDb: blockDb,
stateDb: stateDb,
genesisBlock: GenesisBlock(42, stateDb),
eventMux: mux,
quit: make(chan struct{}),
- cache: NewBlockCache(blockCacheLimit),
+ cache: cache,
pow: pow,
}
// Check the genesis block given to the chain manager. If the genesis block mismatches block number 0
@@ -147,7 +158,7 @@ func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow
// Take ownership of this particular state
bc.txState = state.ManageState(bc.State().Copy())
- bc.futureBlocks = NewBlockCache(maxFutureBlocks)
+ bc.futureBlocks, _ = lru.New(maxFutureBlocks)
bc.makeCache()
go bc.update()
@@ -159,11 +170,11 @@ func (bc *ChainManager) SetHead(head *types.Block) {
bc.mu.Lock()
defer bc.mu.Unlock()
- for block := bc.currentBlock; block != nil && block.Hash() != head.Hash(); block = bc.GetBlock(block.Header().ParentHash) {
+ for block := bc.currentBlock; block != nil && block.Hash() != head.Hash(); block = bc.GetBlock(block.ParentHash()) {
bc.removeBlock(block)
}
- bc.cache = NewBlockCache(blockCacheLimit)
+ bc.cache, _ = lru.New(blockCacheLimit)
bc.currentBlock = head
bc.makeCache()
@@ -251,65 +262,23 @@ func (bc *ChainManager) setLastState() {
}
func (bc *ChainManager) makeCache() {
- if bc.cache == nil {
- bc.cache = NewBlockCache(blockCacheLimit)
- }
+ bc.cache, _ = lru.New(blockCacheLimit)
// load in last `blockCacheLimit` - 1 blocks. Last block is the current.
- ancestors := bc.GetAncestors(bc.currentBlock, blockCacheLimit-1)
- ancestors = append(ancestors, bc.currentBlock)
- for _, block := range ancestors {
- bc.cache.Push(block)
+ bc.cache.Add(bc.genesisBlock.Hash(), bc.genesisBlock)
+ for _, block := range bc.GetBlocksFromHash(bc.currentBlock.Hash(), blockCacheLimit) {
+ bc.cache.Add(block.Hash(), block)
}
}
-// Block creation & chain handling
-func (bc *ChainManager) NewBlock(coinbase common.Address) *types.Block {
- bc.mu.RLock()
- defer bc.mu.RUnlock()
-
- var (
- root common.Hash
- parentHash common.Hash
- )
-
- if bc.currentBlock != nil {
- root = bc.currentBlock.Header().Root
- parentHash = bc.lastBlockHash
- }
-
- block := types.NewBlock(
- parentHash,
- coinbase,
- root,
- common.BigPow(2, 32),
- 0,
- nil)
- block.SetUncles(nil)
- block.SetTransactions(nil)
- block.SetReceipts(nil)
-
- parent := bc.currentBlock
- if parent != nil {
- header := block.Header()
- header.Difficulty = CalcDifficulty(block.Header(), parent.Header())
- header.Number = new(big.Int).Add(parent.Header().Number, common.Big1)
- header.GasLimit = CalcGasLimit(parent)
- }
-
- return block
-}
-
func (bc *ChainManager) Reset() {
bc.mu.Lock()
defer bc.mu.Unlock()
- for block := bc.currentBlock; block != nil; block = bc.GetBlock(block.Header().ParentHash) {
+ for block := bc.currentBlock; block != nil; block = bc.GetBlock(block.ParentHash()) {
bc.removeBlock(block)
}
- if bc.cache == nil {
- bc.cache = NewBlockCache(blockCacheLimit)
- }
+ bc.cache, _ = lru.New(blockCacheLimit)
// Prepare the genesis block
bc.write(bc.genesisBlock)
@@ -328,7 +297,7 @@ func (bc *ChainManager) ResetWithGenesisBlock(gb *types.Block) {
bc.mu.Lock()
defer bc.mu.Unlock()
- for block := bc.currentBlock; block != nil; block = bc.GetBlock(block.Header().ParentHash) {
+ for block := bc.currentBlock; block != nil; block = bc.GetBlock(block.ParentHash()) {
bc.removeBlock(block)
}
@@ -393,15 +362,20 @@ func (bc *ChainManager) insert(block *types.Block) {
}
func (bc *ChainManager) write(block *types.Block) {
- enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block))
- key := append(blockHashPre, block.Hash().Bytes()...)
- err := bc.blockDb.Put(key, enc)
- if err != nil {
- glog.Fatal("db write fail:", err)
- }
+ tstart := time.Now()
+
+ go func() {
+ enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block))
+ key := append(blockHashPre, block.Hash().Bytes()...)
+ err := bc.blockDb.Put(key, enc)
+ if err != nil {
+ glog.Fatal("db write fail:", err)
+ }
+ }()
- // Push block to cache
- bc.cache.Push(block)
+ if glog.V(logger.Debug) {
+ glog.Infof("wrote block #%v %s. Took %v\n", block.Number(), common.PP(block.Hash().Bytes()), time.Since(tstart))
+ }
}
// Accessors
@@ -411,6 +385,16 @@ func (bc *ChainManager) Genesis() *types.Block {
// Block fetching methods
func (bc *ChainManager) HasBlock(hash common.Hash) bool {
+ if bc.cache.Contains(hash) {
+ return true
+ }
+
+ if bc.pendingBlocks != nil {
+ if _, exist := bc.pendingBlocks.Get(hash); exist {
+ return true
+ }
+ }
+
data, _ := bc.blockDb.Get(append(blockHashPre, hash[:]...))
return len(data) != 0
}
@@ -437,11 +421,15 @@ func (self *ChainManager) GetBlockHashesFromHash(hash common.Hash, max uint64) (
}
func (self *ChainManager) GetBlock(hash common.Hash) *types.Block {
- /*
- if block := self.cache.Get(hash); block != nil {
- return block
+ if block, ok := self.cache.Get(hash); ok {
+ return block.(*types.Block)
+ }
+
+ if self.pendingBlocks != nil {
+ if block, _ := self.pendingBlocks.Get(hash); block != nil {
+ return block.(*types.Block)
}
- */
+ }
data, _ := self.blockDb.Get(append(blockHashPre, hash[:]...))
if len(data) == 0 {
@@ -452,6 +440,10 @@ func (self *ChainManager) GetBlock(hash common.Hash) *types.Block {
glog.V(logger.Error).Infof("invalid block RLP for hash %x: %v", hash, err)
return nil
}
+
+ // Add the block to the cache
+ self.cache.Add(hash, (*types.Block)(&block))
+
return (*types.Block)(&block)
}
@@ -463,6 +455,19 @@ func (self *ChainManager) GetBlockByNumber(num uint64) *types.Block {
}
+// GetBlocksFromHash returns the block corresponding to hash and up to n-1 ancestors.
+func (self *ChainManager) GetBlocksFromHash(hash common.Hash, n int) (blocks []*types.Block) {
+ for i := 0; i < n; i++ {
+ block := self.GetBlock(hash)
+ if block == nil {
+ break
+ }
+ blocks = append(blocks, block)
+ hash = block.ParentHash()
+ }
+ return
+}
+
// non blocking version
func (self *ChainManager) getBlockByNumber(num uint64) *types.Block {
key, _ := self.blockDb.Get(append(blockNumPre, big.NewInt(int64(num)).Bytes()...))
@@ -482,45 +487,12 @@ func (self *ChainManager) GetUnclesInChain(block *types.Block, length int) (uncl
return
}
-func (self *ChainManager) GetAncestors(block *types.Block, length int) (blocks []*types.Block) {
- for i := 0; i < length; i++ {
- block = self.GetBlock(block.ParentHash())
- if block == nil {
- break
- }
-
- blocks = append(blocks, block)
- }
-
- return
-}
-
// setTotalDifficulty updates the TD of the chain manager. Note, this function
// assumes that the `mu` mutex is held!
func (bc *ChainManager) setTotalDifficulty(td *big.Int) {
bc.td = new(big.Int).Set(td)
}
-func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) {
- parent := self.GetBlock(block.Header().ParentHash)
- if parent == nil {
- return nil, fmt.Errorf("Unable to calculate total diff without known parent %x", block.Header().ParentHash)
- }
-
- parentTd := parent.Td
-
- uncleDiff := new(big.Int)
- for _, uncle := range block.Uncles() {
- uncleDiff = uncleDiff.Add(uncleDiff, uncle.Difficulty)
- }
-
- td := new(big.Int)
- td = td.Add(parentTd, uncleDiff)
- td = td.Add(td, block.Header().Difficulty)
-
- return td, nil
-}
-
func (bc *ChainManager) Stop() {
close(bc.quit)
atomic.StoreInt32(&bc.procInterrupt, 1)
@@ -538,16 +510,94 @@ type queueEvent struct {
}
func (self *ChainManager) procFutureBlocks() {
- var blocks []*types.Block
- self.futureBlocks.Each(func(i int, block *types.Block) {
- blocks = append(blocks, block)
- })
+ blocks := make([]*types.Block, self.futureBlocks.Len())
+ for i, hash := range self.futureBlocks.Keys() {
+ block, _ := self.futureBlocks.Get(hash)
+ blocks[i] = block.(*types.Block)
+ }
if len(blocks) > 0 {
types.BlockBy(types.Number).Sort(blocks)
self.InsertChain(blocks)
}
}
+func (self *ChainManager) enqueueForWrite(block *types.Block) {
+ self.pendingBlocks.Add(block.Hash(), block)
+}
+
+func (self *ChainManager) flushQueuedBlocks() {
+ db, batchWrite := self.blockDb.(*ethdb.LDBDatabase)
+ batch := new(leveldb.Batch)
+ for _, key := range self.pendingBlocks.Keys() {
+ b, _ := self.pendingBlocks.Get(key)
+ block := b.(*types.Block)
+
+ enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block))
+ key := append(blockHashPre, block.Hash().Bytes()...)
+ if batchWrite {
+ batch.Put(key, rle.Compress(enc))
+ } else {
+ self.blockDb.Put(key, enc)
+ }
+ }
+
+ if batchWrite {
+ db.LDB().Write(batch, nil)
+ }
+}
+
+type writeStatus byte
+
+const (
+ nonStatTy writeStatus = iota
+ canonStatTy
+ splitStatTy
+ sideStatTy
+)
+
+func (self *ChainManager) WriteBlock(block *types.Block) (status writeStatus, err error) {
+ self.wg.Add(1)
+ defer self.wg.Done()
+
+ cblock := self.currentBlock
+ // Compare the TD of the last known block in the canonical chain to make sure it's greater.
+ // At this point it's possible that a different chain (fork) becomes the new canonical chain.
+ if block.Td.Cmp(self.Td()) > 0 {
+ // chain fork
+ if block.ParentHash() != cblock.Hash() {
+ // during split we merge two different chains and create the new canonical chain
+ err := self.merge(cblock, block)
+ if err != nil {
+ return nonStatTy, err
+ }
+
+ status = splitStatTy
+ }
+
+ self.mu.Lock()
+ self.setTotalDifficulty(block.Td)
+ self.insert(block)
+ self.mu.Unlock()
+
+ self.setTransState(state.New(block.Root(), self.stateDb))
+ self.txState.SetState(state.New(block.Root(), self.stateDb))
+
+ status = canonStatTy
+ } else {
+ status = sideStatTy
+ }
+
+ // Write block to database. Eventually we'll have to improve on this and throw away blocks that are
+ // not in the canonical chain.
+ self.mu.Lock()
+ self.enqueueForWrite(block)
+ self.mu.Unlock()
+ // Delete from future blocks
+ self.futureBlocks.Remove(block.Hash())
+
+ return
+}
+
// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned
// it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go).
func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
@@ -557,6 +607,8 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
self.chainmu.Lock()
defer self.chainmu.Unlock()
+ self.pendingBlocks, _ = lru.New(len(chain))
+
// A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex
// acquiring.
@@ -574,6 +626,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
// Start the parallel nonce verifier.
go verifyNonces(self.pow, chain, nonceQuit, nonceDone)
defer close(nonceQuit)
+ defer self.flushQueuedBlocks()
txcount := 0
for i, block := range chain {
@@ -621,15 +674,13 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
}
- block.SetQueued(true)
- self.futureBlocks.Push(block)
+ self.futureBlocks.Add(block.Hash(), block)
stats.queued++
continue
}
- if IsParentErr(err) && self.futureBlocks.Has(block.ParentHash()) {
- block.SetQueued(true)
- self.futureBlocks.Push(block)
+ if IsParentErr(err) && self.futureBlocks.Contains(block.ParentHash()) {
+ self.futureBlocks.Add(block.Hash(), block)
stats.queued++
continue
}
@@ -641,59 +692,29 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
txcount += len(block.Transactions())
- cblock := self.currentBlock
- // Compare the TD of the last known block in the canonical chain to make sure it's greater.
- // At this point it's possible that a different chain (fork) becomes the new canonical chain.
- if block.Td.Cmp(self.Td()) > 0 {
- // chain fork
- if block.ParentHash() != cblock.Hash() {
- // during split we merge two different chains and create the new canonical chain
- err := self.merge(cblock, block)
- if err != nil {
- return i, err
- }
-
- queue[i] = ChainSplitEvent{block, logs}
- queueEvent.splitCount++
- }
-
- self.mu.Lock()
- self.setTotalDifficulty(block.Td)
- self.insert(block)
- self.mu.Unlock()
-
- jsonlogger.LogJson(&logger.EthChainNewHead{
- BlockHash: block.Hash().Hex(),
- BlockNumber: block.Number(),
- ChainHeadHash: cblock.Hash().Hex(),
- BlockPrevHash: block.ParentHash().Hex(),
- })
-
- self.setTransState(state.New(block.Root(), self.stateDb))
- self.txState.SetState(state.New(block.Root(), self.stateDb))
-
- queue[i] = ChainEvent{block, block.Hash(), logs}
- queueEvent.canonicalCount++
-
+ // write the block to the chain and get the status
+ status, err := self.WriteBlock(block)
+ if err != nil {
+ return i, err
+ }
+ switch status {
+ case canonStatTy:
if glog.V(logger.Debug) {
glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
}
- } else {
+ queue[i] = ChainEvent{block, block.Hash(), logs}
+ queueEvent.canonicalCount++
+ case sideStatTy:
if glog.V(logger.Detail) {
glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
}
-
queue[i] = ChainSideEvent{block, logs}
queueEvent.sideCount++
+ case splitStatTy:
+ queue[i] = ChainSplitEvent{block, logs}
+ queueEvent.splitCount++
}
- // Write block to database. Eventually we'll have to improve on this and throw away blocks that are
- // not in the canonical chain.
- self.write(block)
- // Delete from future blocks
- self.futureBlocks.Delete(block.Hash())
-
stats.processed++
- blockInsertTimer.UpdateSince(bstart)
}
if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) {
@@ -752,9 +773,9 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, e
}
}
- if glog.V(logger.Info) {
+ if glog.V(logger.Debug) {
commonHash := commonBlock.Hash()
- glog.Infof("Fork detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4])
+ glog.Infof("Chain split detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4])
}
return newChain, nil