diff options
author | zsfelfoldi <zsfelfoldi@gmail.com> | 2016-04-05 21:22:04 +0800 |
---|---|---|
committer | zsfelfoldi <zsfelfoldi@gmail.com> | 2016-06-07 22:38:56 +0800 |
commit | f9917c8c7b6d16daadebd72977e56a8adc0382b0 (patch) | |
tree | eb11524c618a44dbd499918761ba176a9addba58 /eth/db_upgrade.go | |
parent | 5a458da42ae9a6525989c2d4515c6fa573ba8f8c (diff) | |
download | go-tangerine-f9917c8c7b6d16daadebd72977e56a8adc0382b0.tar.gz go-tangerine-f9917c8c7b6d16daadebd72977e56a8adc0382b0.tar.zst go-tangerine-f9917c8c7b6d16daadebd72977e56a8adc0382b0.zip |
core: improved chainDb using sequential keys
Diffstat (limited to 'eth/db_upgrade.go')
-rw-r--r-- | eth/db_upgrade.go | 343 |
1 files changed, 343 insertions, 0 deletions
diff --git a/eth/db_upgrade.go b/eth/db_upgrade.go new file mode 100644 index 000000000..12de60fe7 --- /dev/null +++ b/eth/db_upgrade.go @@ -0,0 +1,343 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package eth implements the Ethereum protocol. +package eth + +import ( + "bytes" + "encoding/binary" + "fmt" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/rlp" +) + +var useSequentialKeys = []byte("dbUpgrade_20160530sequentialKeys") + +// upgradeSequentialKeys checks the chain database version and +// starts a background process to make upgrades if necessary. +// Returns a stop function that blocks until the process has +// been safely stopped. +func upgradeSequentialKeys(db ethdb.Database) (stopFn func()) { + data, _ := db.Get(useSequentialKeys) + if len(data) > 0 && data[0] == 42 { + return nil // already converted + } + + if data, _ := db.Get([]byte("LastHeader")); len(data) == 0 { + db.Put(useSequentialKeys, []byte{42}) + return nil // empty database, nothing to do + } + + glog.V(logger.Info).Infof("Upgrading chain database to use sequential keys") + + stopChn := make(chan struct{}) + stoppedChn := make(chan struct{}) + + go func() { + stopFn := func() bool { + select { + case <-time.After(time.Microsecond * 100): // make sure other processes don't get starved + case <-stopChn: + return true + } + return false + } + + err, stopped := upgradeSequentialCanonicalNumbers(db, stopFn) + if err == nil && !stopped { + err, stopped = upgradeSequentialBlocks(db, stopFn) + } + if err == nil && !stopped { + err, stopped = upgradeSequentialOrphanedReceipts(db, stopFn) + } + if err == nil && !stopped { + glog.V(logger.Info).Infof("Database conversion successful") + db.Put(useSequentialKeys, []byte{42}) + } + if err != nil { + glog.V(logger.Error).Infof("Database conversion failed: %v", err) + } + close(stoppedChn) + }() + + return func() { + close(stopChn) + <-stoppedChn + } +} + +// upgradeSequentialCanonicalNumbers reads all old format canonical numbers from +// the database, writes them in new format and deletes the old ones if successful. +func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (error, bool) { + prefix := []byte("block-num-") + it := db.(*ethdb.LDBDatabase).NewIterator() + it.Seek(prefix) + cnt := 0 + for bytes.HasPrefix(it.Key(), prefix) { + keyPtr := it.Key() + if len(keyPtr) < 20 { + cnt++ + if cnt%100000 == 0 { + glog.V(logger.Info).Infof("converting %d canonical numbers...", cnt) + } + number := big.NewInt(0).SetBytes(keyPtr[10:]).Uint64() + newKey := []byte("h12345678n") + binary.BigEndian.PutUint64(newKey[1:9], number) + if err := db.Put(newKey, it.Value()); err != nil { + return err, false + } + if err := db.Delete(keyPtr); err != nil { + return err, false + } + } + + if stopFn() { + return nil, true + } + it.Next() + } + if cnt > 0 { + glog.V(logger.Info).Infof("converted %d canonical numbers...", cnt) + } + return nil, false +} + +// upgradeSequentialBlocks reads all old format block headers, bodies, TDs and block +// receipts from the database, writes them in new format and deletes the old ones +// if successful. +func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool) { + prefix := []byte("block-") + it := db.(*ethdb.LDBDatabase).NewIterator() + it.Seek(prefix) + cnt := 0 + for bytes.HasPrefix(it.Key(), prefix) { + keyPtr := it.Key() + if len(keyPtr) >= 38 { + cnt++ + if cnt%10000 == 0 { + glog.V(logger.Info).Infof("converting %d blocks...", cnt) + } + // convert header, body, td and block receipts + var keyPrefix [38]byte + copy(keyPrefix[:], keyPtr[0:38]) + hash := keyPrefix[6:38] + if err := upgradeSequentialBlockData(db, hash); err != nil { + return err, false + } + // delete old db entries belonging to this hash + for bytes.HasPrefix(it.Key(), keyPrefix[:]) { + if err := db.Delete(it.Key()); err != nil { + return err, false + } + it.Next() + } + if err := db.Delete(append([]byte("receipts-block-"), hash...)); err != nil { + return err, false + } + } else { + it.Next() + } + + if stopFn() { + return nil, true + } + } + if cnt > 0 { + glog.V(logger.Info).Infof("converted %d blocks...", cnt) + } + return nil, false +} + +// upgradeSequentialOrphanedReceipts removes any old format block receipts from the +// database that did not have a corresponding block +func upgradeSequentialOrphanedReceipts(db ethdb.Database, stopFn func() bool) (error, bool) { + prefix := []byte("receipts-block-") + it := db.(*ethdb.LDBDatabase).NewIterator() + it.Seek(prefix) + cnt := 0 + for bytes.HasPrefix(it.Key(), prefix) { + // phase 2 already converted receipts belonging to existing + // blocks, just remove if there's anything left + cnt++ + if err := db.Delete(it.Key()); err != nil { + return err, false + } + + if stopFn() { + return nil, true + } + it.Next() + } + if cnt > 0 { + glog.V(logger.Info).Infof("removed %d orphaned block receipts...", cnt) + } + return nil, false +} + +// upgradeSequentialBlockData upgrades the header, body, td and block receipts +// database entries belonging to a single hash (doesn't delete old data). +func upgradeSequentialBlockData(db ethdb.Database, hash []byte) error { + // get old chain data and block number + headerRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-header")...)) + if len(headerRLP) == 0 { + return nil + } + header := new(types.Header) + if err := rlp.Decode(bytes.NewReader(headerRLP), header); err != nil { + return err + } + number := header.Number.Uint64() + bodyRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-body")...)) + tdRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-td")...)) + receiptsRLP, _ := db.Get(append([]byte("receipts-block-"), hash...)) + // store new hash -> number association + encNum := make([]byte, 8) + binary.BigEndian.PutUint64(encNum, number) + if err := db.Put(append([]byte("H"), hash...), encNum); err != nil { + return err + } + // store new chain data + if err := db.Put(append(append([]byte("h"), encNum...), hash...), headerRLP); err != nil { + return err + } + if len(tdRLP) != 0 { + if err := db.Put(append(append(append([]byte("h"), encNum...), hash...), []byte("t")...), tdRLP); err != nil { + return err + } + } + if len(bodyRLP) != 0 { + if err := db.Put(append(append([]byte("b"), encNum...), hash...), bodyRLP); err != nil { + return err + } + } + if len(receiptsRLP) != 0 { + if err := db.Put(append(append([]byte("r"), encNum...), hash...), receiptsRLP); err != nil { + return err + } + } + return nil +} + +// upgradeChainDatabase ensures that the chain database stores block split into +// separate header and body entries. +func upgradeChainDatabase(db ethdb.Database) error { + // Short circuit if the head block is stored already as separate header and body + data, err := db.Get([]byte("LastBlock")) + if err != nil { + return nil + } + head := common.BytesToHash(data) + + if block := core.GetBlockByHashOld(db, head); block == nil { + return nil + } + // At least some of the database is still the old format, upgrade (skip the head block!) + glog.V(logger.Info).Info("Old database detected, upgrading...") + + if db, ok := db.(*ethdb.LDBDatabase); ok { + blockPrefix := []byte("block-hash-") + for it := db.NewIterator(); it.Next(); { + // Skip anything other than a combined block + if !bytes.HasPrefix(it.Key(), blockPrefix) { + continue + } + // Skip the head block (merge last to signal upgrade completion) + if bytes.HasSuffix(it.Key(), head.Bytes()) { + continue + } + // Load the block, split and serialize (order!) + block := core.GetBlockByHashOld(db, common.BytesToHash(bytes.TrimPrefix(it.Key(), blockPrefix))) + + if err := core.WriteTd(db, block.Hash(), block.NumberU64(), block.DeprecatedTd()); err != nil { + return err + } + if err := core.WriteBody(db, block.Hash(), block.NumberU64(), block.Body()); err != nil { + return err + } + if err := core.WriteHeader(db, block.Header()); err != nil { + return err + } + if err := db.Delete(it.Key()); err != nil { + return err + } + } + // Lastly, upgrade the head block, disabling the upgrade mechanism + current := core.GetBlockByHashOld(db, head) + + if err := core.WriteTd(db, current.Hash(), current.NumberU64(), current.DeprecatedTd()); err != nil { + return err + } + if err := core.WriteBody(db, current.Hash(), current.NumberU64(), current.Body()); err != nil { + return err + } + if err := core.WriteHeader(db, current.Header()); err != nil { + return err + } + } + return nil +} + +func addMipmapBloomBins(db ethdb.Database) (err error) { + const mipmapVersion uint = 2 + + // check if the version is set. We ignore data for now since there's + // only one version so we can easily ignore it for now + var data []byte + data, _ = db.Get([]byte("setting-mipmap-version")) + if len(data) > 0 { + var version uint + if err := rlp.DecodeBytes(data, &version); err == nil && version == mipmapVersion { + return nil + } + } + + defer func() { + if err == nil { + var val []byte + val, err = rlp.EncodeToBytes(mipmapVersion) + if err == nil { + err = db.Put([]byte("setting-mipmap-version"), val) + } + return + } + }() + latestHash := core.GetHeadBlockHash(db) + latestBlock := core.GetBlock(db, latestHash, core.GetBlockNumber(db, latestHash)) + if latestBlock == nil { // clean database + return + } + + tstart := time.Now() + glog.V(logger.Info).Infoln("upgrading db log bloom bins") + for i := uint64(0); i <= latestBlock.NumberU64(); i++ { + hash := core.GetCanonicalHash(db, i) + if (hash == common.Hash{}) { + return fmt.Errorf("chain db corrupted. Could not find block %d.", i) + } + core.WriteMipmapBloom(db, i, core.GetBlockReceipts(db, hash, i)) + } + glog.V(logger.Info).Infoln("upgrade completed in", time.Since(tstart)) + return nil +} |