diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2016-06-13 21:16:09 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-06-13 21:16:09 +0800 |
commit | a38be3eb488a349693a9c9905ab015278281f8db (patch) | |
tree | 34fa99ba38f421d9c7ffd308ed544cd5053df228 /eth | |
parent | 73c028c40a4f1336a0ab4b9773be0a9d7719777f (diff) | |
parent | f9917c8c7b6d16daadebd72977e56a8adc0382b0 (diff) | |
download | dexon-a38be3eb488a349693a9c9905ab015278281f8db.tar.gz dexon-a38be3eb488a349693a9c9905ab015278281f8db.tar.zst dexon-a38be3eb488a349693a9c9905ab015278281f8db.zip |
Merge pull request #2455 from zsfelfoldi/chaindb
core: improved chain db performance by using sequential keys
Diffstat (limited to 'eth')
-rw-r--r-- | eth/api.go | 26 | ||||
-rw-r--r-- | eth/backend.go | 118 | ||||
-rw-r--r-- | eth/backend_test.go | 2 | ||||
-rw-r--r-- | eth/db_upgrade.go | 343 | ||||
-rw-r--r-- | eth/filters/api.go | 2 | ||||
-rw-r--r-- | eth/filters/filter.go | 7 | ||||
-rw-r--r-- | eth/filters/filter_test.go | 4 | ||||
-rw-r--r-- | eth/gasprice.go | 2 | ||||
-rw-r--r-- | eth/handler.go | 32 | ||||
-rw-r--r-- | eth/handler_test.go | 10 | ||||
-rw-r--r-- | eth/sync.go | 3 |
11 files changed, 400 insertions, 149 deletions
diff --git a/eth/api.go b/eth/api.go index f5f942c27..3cb6c4d10 100644 --- a/eth/api.go +++ b/eth/api.go @@ -594,7 +594,7 @@ func (s *PublicBlockChainAPI) GetBlockByNumber(blockNr rpc.BlockNumber, fullTx b // GetBlockByHash returns the requested block. When fullTx is true all transactions in the block are returned in full // detail, otherwise only the transaction hash is returned. func (s *PublicBlockChainAPI) GetBlockByHash(blockHash common.Hash, fullTx bool) (map[string]interface{}, error) { - if block := s.bc.GetBlock(blockHash); block != nil { + if block := s.bc.GetBlockByHash(blockHash); block != nil { return s.rpcOutputBlock(block, true, fullTx) } return nil, nil @@ -618,7 +618,7 @@ func (s *PublicBlockChainAPI) GetUncleByBlockNumberAndIndex(blockNr rpc.BlockNum // GetUncleByBlockHashAndIndex returns the uncle block for the given block hash and index. When fullTx is true // all transactions in the block are returned in full detail, otherwise only the transaction hash is returned. func (s *PublicBlockChainAPI) GetUncleByBlockHashAndIndex(blockHash common.Hash, index rpc.HexNumber) (map[string]interface{}, error) { - if block := s.bc.GetBlock(blockHash); block != nil { + if block := s.bc.GetBlockByHash(blockHash); block != nil { uncles := block.Uncles() if index.Int() < 0 || index.Int() >= len(uncles) { glog.V(logger.Debug).Infof("uncle block on index %d not found for block %s", index.Int(), blockHash.Hex()) @@ -640,7 +640,7 @@ func (s *PublicBlockChainAPI) GetUncleCountByBlockNumber(blockNr rpc.BlockNumber // GetUncleCountByBlockHash returns number of uncles in the block for the given block hash func (s *PublicBlockChainAPI) GetUncleCountByBlockHash(blockHash common.Hash) *rpc.HexNumber { - if block := s.bc.GetBlock(blockHash); block != nil { + if block := s.bc.GetBlockByHash(blockHash); block != nil { return rpc.NewHexNumber(len(block.Uncles())) } return nil @@ -814,7 +814,7 @@ func (s *PublicBlockChainAPI) rpcOutputBlock(b *types.Block, inclTx bool, fullTx "stateRoot": b.Root(), "miner": b.Coinbase(), "difficulty": rpc.NewHexNumber(b.Difficulty()), - "totalDifficulty": rpc.NewHexNumber(s.bc.GetTd(b.Hash())), + "totalDifficulty": rpc.NewHexNumber(s.bc.GetTd(b.Hash(), b.NumberU64())), "extraData": fmt.Sprintf("0x%x", b.Extra()), "size": rpc.NewHexNumber(b.Size().Int64()), "gasLimit": rpc.NewHexNumber(b.GasLimit()), @@ -1004,7 +1004,7 @@ func (s *PublicTransactionPoolAPI) GetBlockTransactionCountByNumber(blockNr rpc. // GetBlockTransactionCountByHash returns the number of transactions in the block with the given hash. func (s *PublicTransactionPoolAPI) GetBlockTransactionCountByHash(blockHash common.Hash) *rpc.HexNumber { - if block := s.bc.GetBlock(blockHash); block != nil { + if block := s.bc.GetBlockByHash(blockHash); block != nil { return rpc.NewHexNumber(len(block.Transactions())) } return nil @@ -1020,7 +1020,7 @@ func (s *PublicTransactionPoolAPI) GetTransactionByBlockNumberAndIndex(blockNr r // GetTransactionByBlockHashAndIndex returns the transaction for the given block hash and index. func (s *PublicTransactionPoolAPI) GetTransactionByBlockHashAndIndex(blockHash common.Hash, index rpc.HexNumber) (*RPCTransaction, error) { - if block := s.bc.GetBlock(blockHash); block != nil { + if block := s.bc.GetBlockByHash(blockHash); block != nil { return newRPCTransactionFromBlockIndex(block, index.Int()) } return nil, nil @@ -1080,7 +1080,7 @@ func (s *PublicTransactionPoolAPI) GetTransactionByHash(txHash common.Hash) (*RP return nil, nil } - if block := s.bc.GetBlock(blockHash); block != nil { + if block := s.bc.GetBlockByHash(blockHash); block != nil { return newRPCTransaction(block, txHash) } @@ -1705,7 +1705,7 @@ func (api *PrivateDebugAPI) TraceBlockByNumber(number uint64, config *vm.Config) // TraceBlockByHash processes the block by hash. func (api *PrivateDebugAPI) TraceBlockByHash(hash common.Hash, config *vm.Config) BlockTraceResult { // Fetch the block that we aim to reprocess - block := api.eth.BlockChain().GetBlock(hash) + block := api.eth.BlockChain().GetBlockByHash(hash) if block == nil { return BlockTraceResult{Error: fmt.Sprintf("block #%x not found", hash)} } @@ -1745,10 +1745,10 @@ func (api *PrivateDebugAPI) traceBlock(block *types.Block, config *vm.Config) (b config.Debug = true // make sure debug is set. config.Logger.Collector = collector - if err := core.ValidateHeader(api.config, blockchain.AuxValidator(), block.Header(), blockchain.GetHeader(block.ParentHash()), true, false); err != nil { + if err := core.ValidateHeader(api.config, blockchain.AuxValidator(), block.Header(), blockchain.GetHeader(block.ParentHash(), block.NumberU64()-1), true, false); err != nil { return false, collector.traces, err } - statedb, err := state.New(blockchain.GetBlock(block.ParentHash()).Root(), api.eth.ChainDb()) + statedb, err := state.New(blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1).Root(), api.eth.ChainDb()) if err != nil { return false, collector.traces, err } @@ -1757,7 +1757,7 @@ func (api *PrivateDebugAPI) traceBlock(block *types.Block, config *vm.Config) (b if err != nil { return false, collector.traces, err } - if err := validator.ValidateState(block, blockchain.GetBlock(block.ParentHash()), statedb, receipts, usedGas); err != nil { + if err := validator.ValidateState(block, blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1), statedb, receipts, usedGas); err != nil { return false, collector.traces, err } return true, collector.traces, nil @@ -1841,12 +1841,12 @@ func (api *PrivateDebugAPI) TraceTransaction(txHash common.Hash, logger *vm.LogC if tx == nil { return nil, fmt.Errorf("transaction %x not found", txHash) } - block := api.eth.BlockChain().GetBlock(blockHash) + block := api.eth.BlockChain().GetBlockByHash(blockHash) if block == nil { return nil, fmt.Errorf("block %x not found", blockHash) } // Create the state database to mutate and eventually trace - parent := api.eth.BlockChain().GetBlock(block.ParentHash()) + parent := api.eth.BlockChain().GetBlock(block.ParentHash(), block.NumberU64()-1) if parent == nil { return nil, fmt.Errorf("block parent %x not found", block.ParentHash()) } diff --git a/eth/backend.go b/eth/backend.go index bb487650b..006523484 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -18,7 +18,6 @@ package eth import ( - "bytes" "errors" "fmt" "math/big" @@ -47,7 +46,6 @@ import ( "github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" ) @@ -104,9 +102,9 @@ type Config struct { } type Ethereum struct { - chainConfig *core.ChainConfig - // Channel for shutting down the ethereum - shutdownChan chan bool + chainConfig *core.ChainConfig + shutdownChan chan bool // Channel for shutting down the ethereum + stopDbUpgrade func() // stop chain db sequential key upgrade // DB interfaces chainDb ethdb.Database // Block chain database @@ -161,6 +159,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if err := addMipmapBloomBins(chainDb); err != nil { return nil, err } + stopDbUpgrade := upgradeSequentialKeys(chainDb) dappDb, err := ctx.OpenDatabase("dapp", config.DatabaseCache, config.DatabaseHandles) if err != nil { @@ -185,7 +184,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { chainDb = config.TestGenesisState } if config.TestGenesisBlock != nil { - core.WriteTd(chainDb, config.TestGenesisBlock.Hash(), config.TestGenesisBlock.Difficulty()) + core.WriteTd(chainDb, config.TestGenesisBlock.Hash(), config.TestGenesisBlock.NumberU64(), config.TestGenesisBlock.Difficulty()) core.WriteBlock(chainDb, config.TestGenesisBlock) core.WriteCanonicalHash(chainDb, config.TestGenesisBlock.Hash(), config.TestGenesisBlock.NumberU64()) core.WriteHeadBlockHash(chainDb, config.TestGenesisBlock.Hash()) @@ -202,6 +201,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { eth := &Ethereum{ shutdownChan: make(chan bool), + stopDbUpgrade: stopDbUpgrade, chainDb: chainDb, dappDb: dappDb, eventMux: ctx.EventMux, @@ -238,7 +238,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { // load the genesis block or write a new one if no genesis // block is prenent in the database. - genesis := core.GetBlock(chainDb, core.GetCanonicalHash(chainDb, 0)) + genesis := core.GetBlock(chainDb, core.GetCanonicalHash(chainDb, 0), 0) if genesis == nil { genesis, err = core.WriteDefaultGenesisBlock(chainDb) if err != nil { @@ -415,6 +415,9 @@ func (s *Ethereum) Start(srvr *p2p.Server) error { // Stop implements node.Service, terminating all internal goroutines used by the // Ethereum protocol. func (s *Ethereum) Stop() error { + if s.stopDbUpgrade != nil { + s.stopDbUpgrade() + } s.blockchain.Stop() s.protocolManager.Stop() s.txPool.Stop() @@ -526,104 +529,3 @@ func dagFiles(epoch uint64) (string, string) { dag := fmt.Sprintf("full-R%d-%x", ethashRevision, seedHash[:8]) return dag, "full-R" + dag } - -// upgradeChainDatabase ensures that the chain database stores block split into -// separate header and body entries. -func upgradeChainDatabase(db ethdb.Database) error { - // Short circuit if the head block is stored already as separate header and body - data, err := db.Get([]byte("LastBlock")) - if err != nil { - return nil - } - head := common.BytesToHash(data) - - if block := core.GetBlockByHashOld(db, head); block == nil { - return nil - } - // At least some of the database is still the old format, upgrade (skip the head block!) - glog.V(logger.Info).Info("Old database detected, upgrading...") - - if db, ok := db.(*ethdb.LDBDatabase); ok { - blockPrefix := []byte("block-hash-") - for it := db.NewIterator(); it.Next(); { - // Skip anything other than a combined block - if !bytes.HasPrefix(it.Key(), blockPrefix) { - continue - } - // Skip the head block (merge last to signal upgrade completion) - if bytes.HasSuffix(it.Key(), head.Bytes()) { - continue - } - // Load the block, split and serialize (order!) - block := core.GetBlockByHashOld(db, common.BytesToHash(bytes.TrimPrefix(it.Key(), blockPrefix))) - - if err := core.WriteTd(db, block.Hash(), block.DeprecatedTd()); err != nil { - return err - } - if err := core.WriteBody(db, block.Hash(), block.Body()); err != nil { - return err - } - if err := core.WriteHeader(db, block.Header()); err != nil { - return err - } - if err := db.Delete(it.Key()); err != nil { - return err - } - } - // Lastly, upgrade the head block, disabling the upgrade mechanism - current := core.GetBlockByHashOld(db, head) - - if err := core.WriteTd(db, current.Hash(), current.DeprecatedTd()); err != nil { - return err - } - if err := core.WriteBody(db, current.Hash(), current.Body()); err != nil { - return err - } - if err := core.WriteHeader(db, current.Header()); err != nil { - return err - } - } - return nil -} - -func addMipmapBloomBins(db ethdb.Database) (err error) { - const mipmapVersion uint = 2 - - // check if the version is set. We ignore data for now since there's - // only one version so we can easily ignore it for now - var data []byte - data, _ = db.Get([]byte("setting-mipmap-version")) - if len(data) > 0 { - var version uint - if err := rlp.DecodeBytes(data, &version); err == nil && version == mipmapVersion { - return nil - } - } - - defer func() { - if err == nil { - var val []byte - val, err = rlp.EncodeToBytes(mipmapVersion) - if err == nil { - err = db.Put([]byte("setting-mipmap-version"), val) - } - return - } - }() - latestBlock := core.GetBlock(db, core.GetHeadBlockHash(db)) - if latestBlock == nil { // clean database - return - } - - tstart := time.Now() - glog.V(logger.Info).Infoln("upgrading db log bloom bins") - for i := uint64(0); i <= latestBlock.NumberU64(); i++ { - hash := core.GetCanonicalHash(db, i) - if (hash == common.Hash{}) { - return fmt.Errorf("chain db corrupted. Could not find block %d.", i) - } - core.WriteMipmapBloom(db, i, core.GetBlockReceipts(db, hash)) - } - glog.V(logger.Info).Infoln("upgrade completed in", time.Since(tstart)) - return nil -} diff --git a/eth/backend_test.go b/eth/backend_test.go index c2bed879b..cb94adbf0 100644 --- a/eth/backend_test.go +++ b/eth/backend_test.go @@ -61,7 +61,7 @@ func TestMipmapUpgrade(t *testing.T) { if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil { t.Fatalf("failed to insert block number: %v", err) } - if err := core.WriteBlockReceipts(db, block.Hash(), receipts[i]); err != nil { + if err := core.WriteBlockReceipts(db, block.Hash(), block.NumberU64(), receipts[i]); err != nil { t.Fatal("error writing block receipts:", err) } } diff --git a/eth/db_upgrade.go b/eth/db_upgrade.go new file mode 100644 index 000000000..12de60fe7 --- /dev/null +++ b/eth/db_upgrade.go @@ -0,0 +1,343 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package eth implements the Ethereum protocol. +package eth + +import ( + "bytes" + "encoding/binary" + "fmt" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/rlp" +) + +var useSequentialKeys = []byte("dbUpgrade_20160530sequentialKeys") + +// upgradeSequentialKeys checks the chain database version and +// starts a background process to make upgrades if necessary. +// Returns a stop function that blocks until the process has +// been safely stopped. +func upgradeSequentialKeys(db ethdb.Database) (stopFn func()) { + data, _ := db.Get(useSequentialKeys) + if len(data) > 0 && data[0] == 42 { + return nil // already converted + } + + if data, _ := db.Get([]byte("LastHeader")); len(data) == 0 { + db.Put(useSequentialKeys, []byte{42}) + return nil // empty database, nothing to do + } + + glog.V(logger.Info).Infof("Upgrading chain database to use sequential keys") + + stopChn := make(chan struct{}) + stoppedChn := make(chan struct{}) + + go func() { + stopFn := func() bool { + select { + case <-time.After(time.Microsecond * 100): // make sure other processes don't get starved + case <-stopChn: + return true + } + return false + } + + err, stopped := upgradeSequentialCanonicalNumbers(db, stopFn) + if err == nil && !stopped { + err, stopped = upgradeSequentialBlocks(db, stopFn) + } + if err == nil && !stopped { + err, stopped = upgradeSequentialOrphanedReceipts(db, stopFn) + } + if err == nil && !stopped { + glog.V(logger.Info).Infof("Database conversion successful") + db.Put(useSequentialKeys, []byte{42}) + } + if err != nil { + glog.V(logger.Error).Infof("Database conversion failed: %v", err) + } + close(stoppedChn) + }() + + return func() { + close(stopChn) + <-stoppedChn + } +} + +// upgradeSequentialCanonicalNumbers reads all old format canonical numbers from +// the database, writes them in new format and deletes the old ones if successful. +func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (error, bool) { + prefix := []byte("block-num-") + it := db.(*ethdb.LDBDatabase).NewIterator() + it.Seek(prefix) + cnt := 0 + for bytes.HasPrefix(it.Key(), prefix) { + keyPtr := it.Key() + if len(keyPtr) < 20 { + cnt++ + if cnt%100000 == 0 { + glog.V(logger.Info).Infof("converting %d canonical numbers...", cnt) + } + number := big.NewInt(0).SetBytes(keyPtr[10:]).Uint64() + newKey := []byte("h12345678n") + binary.BigEndian.PutUint64(newKey[1:9], number) + if err := db.Put(newKey, it.Value()); err != nil { + return err, false + } + if err := db.Delete(keyPtr); err != nil { + return err, false + } + } + + if stopFn() { + return nil, true + } + it.Next() + } + if cnt > 0 { + glog.V(logger.Info).Infof("converted %d canonical numbers...", cnt) + } + return nil, false +} + +// upgradeSequentialBlocks reads all old format block headers, bodies, TDs and block +// receipts from the database, writes them in new format and deletes the old ones +// if successful. +func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool) { + prefix := []byte("block-") + it := db.(*ethdb.LDBDatabase).NewIterator() + it.Seek(prefix) + cnt := 0 + for bytes.HasPrefix(it.Key(), prefix) { + keyPtr := it.Key() + if len(keyPtr) >= 38 { + cnt++ + if cnt%10000 == 0 { + glog.V(logger.Info).Infof("converting %d blocks...", cnt) + } + // convert header, body, td and block receipts + var keyPrefix [38]byte + copy(keyPrefix[:], keyPtr[0:38]) + hash := keyPrefix[6:38] + if err := upgradeSequentialBlockData(db, hash); err != nil { + return err, false + } + // delete old db entries belonging to this hash + for bytes.HasPrefix(it.Key(), keyPrefix[:]) { + if err := db.Delete(it.Key()); err != nil { + return err, false + } + it.Next() + } + if err := db.Delete(append([]byte("receipts-block-"), hash...)); err != nil { + return err, false + } + } else { + it.Next() + } + + if stopFn() { + return nil, true + } + } + if cnt > 0 { + glog.V(logger.Info).Infof("converted %d blocks...", cnt) + } + return nil, false +} + +// upgradeSequentialOrphanedReceipts removes any old format block receipts from the +// database that did not have a corresponding block +func upgradeSequentialOrphanedReceipts(db ethdb.Database, stopFn func() bool) (error, bool) { + prefix := []byte("receipts-block-") + it := db.(*ethdb.LDBDatabase).NewIterator() + it.Seek(prefix) + cnt := 0 + for bytes.HasPrefix(it.Key(), prefix) { + // phase 2 already converted receipts belonging to existing + // blocks, just remove if there's anything left + cnt++ + if err := db.Delete(it.Key()); err != nil { + return err, false + } + + if stopFn() { + return nil, true + } + it.Next() + } + if cnt > 0 { + glog.V(logger.Info).Infof("removed %d orphaned block receipts...", cnt) + } + return nil, false +} + +// upgradeSequentialBlockData upgrades the header, body, td and block receipts +// database entries belonging to a single hash (doesn't delete old data). +func upgradeSequentialBlockData(db ethdb.Database, hash []byte) error { + // get old chain data and block number + headerRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-header")...)) + if len(headerRLP) == 0 { + return nil + } + header := new(types.Header) + if err := rlp.Decode(bytes.NewReader(headerRLP), header); err != nil { + return err + } + number := header.Number.Uint64() + bodyRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-body")...)) + tdRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-td")...)) + receiptsRLP, _ := db.Get(append([]byte("receipts-block-"), hash...)) + // store new hash -> number association + encNum := make([]byte, 8) + binary.BigEndian.PutUint64(encNum, number) + if err := db.Put(append([]byte("H"), hash...), encNum); err != nil { + return err + } + // store new chain data + if err := db.Put(append(append([]byte("h"), encNum...), hash...), headerRLP); err != nil { + return err + } + if len(tdRLP) != 0 { + if err := db.Put(append(append(append([]byte("h"), encNum...), hash...), []byte("t")...), tdRLP); err != nil { + return err + } + } + if len(bodyRLP) != 0 { + if err := db.Put(append(append([]byte("b"), encNum...), hash...), bodyRLP); err != nil { + return err + } + } + if len(receiptsRLP) != 0 { + if err := db.Put(append(append([]byte("r"), encNum...), hash...), receiptsRLP); err != nil { + return err + } + } + return nil +} + +// upgradeChainDatabase ensures that the chain database stores block split into +// separate header and body entries. +func upgradeChainDatabase(db ethdb.Database) error { + // Short circuit if the head block is stored already as separate header and body + data, err := db.Get([]byte("LastBlock")) + if err != nil { + return nil + } + head := common.BytesToHash(data) + + if block := core.GetBlockByHashOld(db, head); block == nil { + return nil + } + // At least some of the database is still the old format, upgrade (skip the head block!) + glog.V(logger.Info).Info("Old database detected, upgrading...") + + if db, ok := db.(*ethdb.LDBDatabase); ok { + blockPrefix := []byte("block-hash-") + for it := db.NewIterator(); it.Next(); { + // Skip anything other than a combined block + if !bytes.HasPrefix(it.Key(), blockPrefix) { + continue + } + // Skip the head block (merge last to signal upgrade completion) + if bytes.HasSuffix(it.Key(), head.Bytes()) { + continue + } + // Load the block, split and serialize (order!) + block := core.GetBlockByHashOld(db, common.BytesToHash(bytes.TrimPrefix(it.Key(), blockPrefix))) + + if err := core.WriteTd(db, block.Hash(), block.NumberU64(), block.DeprecatedTd()); err != nil { + return err + } + if err := core.WriteBody(db, block.Hash(), block.NumberU64(), block.Body()); err != nil { + return err + } + if err := core.WriteHeader(db, block.Header()); err != nil { + return err + } + if err := db.Delete(it.Key()); err != nil { + return err + } + } + // Lastly, upgrade the head block, disabling the upgrade mechanism + current := core.GetBlockByHashOld(db, head) + + if err := core.WriteTd(db, current.Hash(), current.NumberU64(), current.DeprecatedTd()); err != nil { + return err + } + if err := core.WriteBody(db, current.Hash(), current.NumberU64(), current.Body()); err != nil { + return err + } + if err := core.WriteHeader(db, current.Header()); err != nil { + return err + } + } + return nil +} + +func addMipmapBloomBins(db ethdb.Database) (err error) { + const mipmapVersion uint = 2 + + // check if the version is set. We ignore data for now since there's + // only one version so we can easily ignore it for now + var data []byte + data, _ = db.Get([]byte("setting-mipmap-version")) + if len(data) > 0 { + var version uint + if err := rlp.DecodeBytes(data, &version); err == nil && version == mipmapVersion { + return nil + } + } + + defer func() { + if err == nil { + var val []byte + val, err = rlp.EncodeToBytes(mipmapVersion) + if err == nil { + err = db.Put([]byte("setting-mipmap-version"), val) + } + return + } + }() + latestHash := core.GetHeadBlockHash(db) + latestBlock := core.GetBlock(db, latestHash, core.GetBlockNumber(db, latestHash)) + if latestBlock == nil { // clean database + return + } + + tstart := time.Now() + glog.V(logger.Info).Infoln("upgrading db log bloom bins") + for i := uint64(0); i <= latestBlock.NumberU64(); i++ { + hash := core.GetCanonicalHash(db, i) + if (hash == common.Hash{}) { + return fmt.Errorf("chain db corrupted. Could not find block %d.", i) + } + core.WriteMipmapBloom(db, i, core.GetBlockReceipts(db, hash, i)) + } + glog.V(logger.Info).Infoln("upgrade completed in", time.Since(tstart)) + return nil +} diff --git a/eth/filters/api.go b/eth/filters/api.go index 7278e20b9..393019f8b 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -361,7 +361,7 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error { if len(raw) >= 2 && raw[0] == '0' && (raw[1] == 'x' || raw[1] == 'X') { raw = raw[2:] } - if len(raw) != 2 * common.HashLength { + if len(raw) != 2*common.HashLength { return common.Hash{}, errors.New("invalid topic(s)") } if decAddr, err := hex.DecodeString(raw); err == nil { diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 469dfba4d..995b588fb 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -72,7 +72,8 @@ func (self *Filter) SetTopics(topics [][]common.Hash) { // Run filters logs with the current parameters set func (self *Filter) Find() vm.Logs { - latestBlock := core.GetBlock(self.db, core.GetHeadBlockHash(self.db)) + latestHash := core.GetHeadBlockHash(self.db) + latestBlock := core.GetBlock(self.db, latestHash, core.GetBlockNumber(self.db, latestHash)) var beginBlockNo uint64 = uint64(self.begin) if self.begin == -1 { beginBlockNo = latestBlock.NumberU64() @@ -127,7 +128,7 @@ func (self *Filter) getLogs(start, end uint64) (logs vm.Logs) { for i := start; i <= end; i++ { hash := core.GetCanonicalHash(self.db, i) if hash != (common.Hash{}) { - block = core.GetBlock(self.db, hash) + block = core.GetBlock(self.db, hash, i) } else { // block not found return logs } @@ -137,7 +138,7 @@ func (self *Filter) getLogs(start, end uint64) (logs vm.Logs) { if self.bloomFilter(block) { // Get the logs of the block var ( - receipts = core.GetBlockReceipts(self.db, block.Hash()) + receipts = core.GetBlockReceipts(self.db, block.Hash(), i) unfiltered vm.Logs ) for _, receipt := range receipts { diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index b0f88ffeb..a95adfce7 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -94,7 +94,7 @@ func BenchmarkMipmaps(b *testing.B) { if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil { b.Fatalf("failed to insert block number: %v", err) } - if err := core.WriteBlockReceipts(db, block.Hash(), receipts[i]); err != nil { + if err := core.WriteBlockReceipts(db, block.Hash(), block.NumberU64(), receipts[i]); err != nil { b.Fatal("error writing block receipts:", err) } } @@ -196,7 +196,7 @@ func TestFilters(t *testing.T) { if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil { t.Fatalf("failed to insert block number: %v", err) } - if err := core.WriteBlockReceipts(db, block.Hash(), receipts[i]); err != nil { + if err := core.WriteBlockReceipts(db, block.Hash(), block.NumberU64(), receipts[i]); err != nil { t.Fatal("error writing block receipts:", err) } } diff --git a/eth/gasprice.go b/eth/gasprice.go index e0de89e62..ef203f8fe 100644 --- a/eth/gasprice.go +++ b/eth/gasprice.go @@ -166,7 +166,7 @@ func (self *GasPriceOracle) processBlock(block *types.Block) { func (self *GasPriceOracle) lowestPrice(block *types.Block) *big.Int { gasUsed := big.NewInt(0) - receipts := core.GetBlockReceipts(self.eth.ChainDb(), block.Hash()) + receipts := core.GetBlockReceipts(self.eth.ChainDb(), block.Hash(), block.NumberU64()) if len(receipts) > 0 { if cgu := receipts[len(receipts)-1].CumulativeGasUsed; cgu != nil { gasUsed = receipts[len(receipts)-1].CumulativeGasUsed diff --git a/eth/handler.go b/eth/handler.go index 1e4dc1289..47a36cc0b 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -152,9 +152,9 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, return nil, errIncompatibleConfig } // Construct the different synchronisation mechanisms - manager.downloader = downloader.New(chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlockAndState, blockchain.GetHeader, - blockchain.GetBlock, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead, - blockchain.GetTd, blockchain.InsertHeaderChain, manager.insertChain, blockchain.InsertReceiptChain, blockchain.Rollback, + manager.downloader = downloader.New(chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlockAndState, blockchain.GetHeaderByHash, + blockchain.GetBlockByHash, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead, + blockchain.GetTdByHash, blockchain.InsertHeaderChain, manager.insertChain, blockchain.InsertReceiptChain, blockchain.Rollback, manager.removePeer) validator := func(block *types.Block, parent *types.Block) error { @@ -167,7 +167,7 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, atomic.StoreUint32(&manager.synced, 1) // Mark initial sync done on any fetcher import return manager.insertChain(blocks) } - manager.fetcher = fetcher.New(blockchain.GetBlock, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) + manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) if blockchain.Genesis().Hash().Hex() == defaultGenesisHash && networkId == 1 { glog.V(logger.Debug).Infoln("Bad Block Reporting is enabled") @@ -382,7 +382,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } // Retrieve the requested block, stopping if enough was found - if block := pm.blockchain.GetBlock(hash); block != nil { + if block := pm.blockchain.GetBlockByHash(hash); block != nil { blocks = append(blocks, block) bytes += block.Size() } @@ -425,13 +425,14 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Retrieve the next header satisfying the query var origin *types.Header if hashMode { - origin = pm.blockchain.GetHeader(query.Origin.Hash) + origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash) } else { origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number) } if origin == nil { break } + number := origin.Number.Uint64() headers = append(headers, origin) bytes += estHeaderRlpSize @@ -440,8 +441,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { case query.Origin.Hash != (common.Hash{}) && query.Reverse: // Hash based traversal towards the genesis block for i := 0; i < int(query.Skip)+1; i++ { - if header := pm.blockchain.GetHeader(query.Origin.Hash); header != nil { + if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil { query.Origin.Hash = header.ParentHash + number-- } else { unknown = true break @@ -602,9 +604,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } // Retrieve the requested block's receipts, skipping if unknown to us - results := core.GetBlockReceipts(pm.chaindb, hash) + results := core.GetBlockReceipts(pm.chaindb, hash, core.GetBlockNumber(pm.chaindb, hash)) if results == nil { - if header := pm.blockchain.GetHeader(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { + if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { continue } } @@ -697,7 +699,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Update the peers total difficulty if needed, schedule a download if gapped if request.TD.Cmp(p.Td()) > 0 { p.SetTd(request.TD) - td := pm.blockchain.GetTd(pm.blockchain.CurrentBlock().Hash()) + currentBlock := pm.blockchain.CurrentBlock() + td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) if request.TD.Cmp(new(big.Int).Add(td, request.Block.Difficulty())) > 0 { go pm.synchronise(p) } @@ -738,8 +741,8 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { if propagate { // Calculate the TD of the block (it's not imported yet, so block.Td is not valid) var td *big.Int - if parent := pm.blockchain.GetBlock(block.ParentHash()); parent != nil { - td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash())) + if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil { + td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1)) } else { glog.V(logger.Error).Infof("propagating dangling block #%d [%x]", block.NumberU64(), hash[:4]) return @@ -807,10 +810,11 @@ type EthNodeInfo struct { // NodeInfo retrieves some protocol metadata about the running host node. func (self *ProtocolManager) NodeInfo() *EthNodeInfo { + currentBlock := self.blockchain.CurrentBlock() return &EthNodeInfo{ Network: self.networkId, - Difficulty: self.blockchain.GetTd(self.blockchain.CurrentBlock().Hash()), + Difficulty: self.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()), Genesis: self.blockchain.Genesis().Hash(), - Head: self.blockchain.CurrentBlock().Hash(), + Head: currentBlock.Hash(), } } diff --git a/eth/handler_test.go b/eth/handler_test.go index 9e593f040..8418c28b2 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -91,7 +91,7 @@ func testGetBlockHashes(t *testing.T, protocol int) { // Assemble the hash response we would like to receive resp := make([]common.Hash, tt.result) if len(resp) > 0 { - from := pm.blockchain.GetBlock(tt.origin).NumberU64() - 1 + from := pm.blockchain.GetBlockByHash(tt.origin).NumberU64() - 1 for j := 0; j < len(resp); j++ { resp[j] = pm.blockchain.GetBlockByNumber(uint64(int(from) - j)).Hash() } @@ -204,7 +204,7 @@ func testGetBlocks(t *testing.T, protocol int) { for j, hash := range tt.explicit { hashes = append(hashes, hash) if tt.available[j] && len(blocks) < tt.expected { - blocks = append(blocks, pm.blockchain.GetBlock(hash)) + blocks = append(blocks, pm.blockchain.GetBlockByHash(hash)) } } // Send the hash request and verify the response @@ -339,7 +339,7 @@ func testGetBlockHeaders(t *testing.T, protocol int) { // Collect the headers to expect in the response headers := []*types.Header{} for _, hash := range tt.expect { - headers = append(headers, pm.blockchain.GetBlock(hash).Header()) + headers = append(headers, pm.blockchain.GetBlockByHash(hash).Header()) } // Send the hash request and verify the response p2p.Send(peer.app, 0x03, tt.query) @@ -420,7 +420,7 @@ func testGetBlockBodies(t *testing.T, protocol int) { for j, hash := range tt.explicit { hashes = append(hashes, hash) if tt.available[j] && len(bodies) < tt.expected { - block := pm.blockchain.GetBlock(hash) + block := pm.blockchain.GetBlockByHash(hash) bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()}) } } @@ -572,7 +572,7 @@ func testGetReceipt(t *testing.T, protocol int) { block := pm.blockchain.GetBlockByNumber(i) hashes = append(hashes, block.Hash()) - receipts = append(receipts, core.GetBlockReceipts(pm.chaindb, block.Hash())) + receipts = append(receipts, core.GetBlockReceipts(pm.chaindb, block.Hash(), block.NumberU64())) } // Send the hash request and verify the response p2p.Send(peer.app, 0x0f, hashes) diff --git a/eth/sync.go b/eth/sync.go index 52f7e90e7..23cf18c8d 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -162,7 +162,8 @@ func (pm *ProtocolManager) synchronise(peer *peer) { return } // Make sure the peer's TD is higher than our own. If not drop. - td := pm.blockchain.GetTd(pm.blockchain.CurrentBlock().Hash()) + currentBlock := pm.blockchain.CurrentBlock() + td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) if peer.Td().Cmp(td) <= 0 { return } |