diff options
author | Péter Szilágyi <peterke@gmail.com> | 2017-07-15 00:39:53 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-07-15 00:39:53 +0800 |
commit | 0ff35e170d1b913082313089d13e3e6d5490839b (patch) | |
tree | 42b8eafa61c6e5894768c41a97d51e4e5427b50f /eth | |
parent | 8d6a5a3581ce6221786eb464bfef7e8c31e7ad95 (diff) | |
download | dexon-0ff35e170d1b913082313089d13e3e6d5490839b.tar.gz dexon-0ff35e170d1b913082313089d13e3e6d5490839b.tar.zst dexon-0ff35e170d1b913082313089d13e3e6d5490839b.zip |
core: remove redundant storage of transactions and receipts (#14801)
* core: remove redundant storage of transactions and receipts
* core, eth, internal: new transaction schema usage polishes
* eth: implement upgrade mechanism for db deduplication
* core, eth: drop old sequential key db upgrader
* eth: close last iterator on successful db upgrage
* core: prefix the lookup entries to make their purpose clearer
Diffstat (limited to 'eth')
-rw-r--r-- | eth/backend.go | 6 | ||||
-rw-r--r-- | eth/backend_test.go | 9 | ||||
-rw-r--r-- | eth/db_upgrade.go | 279 | ||||
-rw-r--r-- | eth/filters/filter_test.go | 12 |
4 files changed, 84 insertions, 222 deletions
diff --git a/eth/backend.go b/eth/backend.go index 78478e86e..c7df517c0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -59,8 +59,8 @@ type LesServer interface { type Ethereum struct { chainConfig *params.ChainConfig // Channel for shutting down the service - shutdownChan chan bool // Channel for shutting down the ethereum - stopDbUpgrade func() // stop chain db sequential key upgrade + shutdownChan chan bool // Channel for shutting down the ethereum + stopDbUpgrade func() error // stop chain db sequential key upgrade // Handlers txPool *core.TxPool blockchain *core.BlockChain @@ -103,7 +103,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if err != nil { return nil, err } - stopDbUpgrade := upgradeSequentialKeys(chainDb) + stopDbUpgrade := upgradeDeduplicateData(chainDb) chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis) if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok { return nil, genesisErr diff --git a/eth/backend_test.go b/eth/backend_test.go index f60e3214c..4351b24cf 100644 --- a/eth/backend_test.go +++ b/eth/backend_test.go @@ -33,24 +33,15 @@ func TestMipmapUpgrade(t *testing.T) { genesis := new(core.Genesis).MustCommit(db) chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) { - var receipts types.Receipts switch i { case 1: receipt := types.NewReceipt(nil, new(big.Int)) receipt.Logs = []*types.Log{{Address: addr}} gen.AddUncheckedReceipt(receipt) - receipts = types.Receipts{receipt} case 2: receipt := types.NewReceipt(nil, new(big.Int)) receipt.Logs = []*types.Log{{Address: addr}} gen.AddUncheckedReceipt(receipt) - receipts = types.Receipts{receipt} - } - - // store the receipts - err := core.WriteReceipts(db, receipts) - if err != nil { - t.Fatal(err) } }) for i, block := range chain { diff --git a/eth/db_upgrade.go b/eth/db_upgrade.go index 82cdd7e55..90111b2b3 100644 --- a/eth/db_upgrade.go +++ b/eth/db_upgrade.go @@ -19,237 +19,120 @@ package eth import ( "bytes" - "encoding/binary" "fmt" - "math/big" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" ) -var useSequentialKeys = []byte("dbUpgrade_20160530sequentialKeys") +var deduplicateData = []byte("dbUpgrade_20170714deduplicateData") -// upgradeSequentialKeys checks the chain database version and +// upgradeDeduplicateData checks the chain database version and // starts a background process to make upgrades if necessary. // Returns a stop function that blocks until the process has // been safely stopped. -func upgradeSequentialKeys(db ethdb.Database) (stopFn func()) { - data, _ := db.Get(useSequentialKeys) +func upgradeDeduplicateData(db ethdb.Database) func() error { + // If the database is already converted or empty, bail out + data, _ := db.Get(deduplicateData) if len(data) > 0 && data[0] == 42 { - return nil // already converted + return nil } - if data, _ := db.Get([]byte("LastHeader")); len(data) == 0 { - db.Put(useSequentialKeys, []byte{42}) - return nil // empty database, nothing to do + db.Put(deduplicateData, []byte{42}) + return nil } - - log.Warn("Upgrading chain database to use sequential keys") - - stopChn := make(chan struct{}) - stoppedChn := make(chan struct{}) + // Start the deduplication upgrade on a new goroutine + log.Warn("Upgrading database to use lookup entries") + stop := make(chan chan error) go func() { - stopFn := func() bool { - select { - case <-time.After(time.Microsecond * 100): // make sure other processes don't get starved - case <-stopChn: - return true - } - return false - } - - err, stopped := upgradeSequentialCanonicalNumbers(db, stopFn) - if err == nil && !stopped { - err, stopped = upgradeSequentialBlocks(db, stopFn) - } - if err == nil && !stopped { - err, stopped = upgradeSequentialOrphanedReceipts(db, stopFn) - } - if err == nil && !stopped { - log.Info("Database conversion successful") - db.Put(useSequentialKeys, []byte{42}) - } - if err != nil { - log.Error("Database conversion failed", "err", err) - } - close(stoppedChn) - }() - - return func() { - close(stopChn) - <-stoppedChn - } -} - -// upgradeSequentialCanonicalNumbers reads all old format canonical numbers from -// the database, writes them in new format and deletes the old ones if successful. -func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (error, bool) { - prefix := []byte("block-num-") - it := db.(*ethdb.LDBDatabase).NewIterator() - defer func() { - it.Release() - }() - it.Seek(prefix) - cnt := 0 - for bytes.HasPrefix(it.Key(), prefix) { - keyPtr := it.Key() - if len(keyPtr) < 20 { - cnt++ - if cnt%100000 == 0 { + // Create an iterator to read the entire database and covert old lookup entires + it := db.(*ethdb.LDBDatabase).NewIterator() + defer func() { + if it != nil { it.Release() - it = db.(*ethdb.LDBDatabase).NewIterator() - it.Seek(keyPtr) - log.Info("Converting canonical numbers", "count", cnt) } - number := big.NewInt(0).SetBytes(keyPtr[10:]).Uint64() - newKey := []byte("h12345678n") - binary.BigEndian.PutUint64(newKey[1:9], number) - if err := db.Put(newKey, it.Value()); err != nil { - return err, false + }() + + var ( + converted uint64 + failed error + ) + for failed == nil && it.Next() { + // Skip any entries that don't look like old transaction meta entires (<hash>0x01) + key := it.Key() + if len(key) != common.HashLength+1 || key[common.HashLength] != 0x01 { + continue } - if err := db.Delete(keyPtr); err != nil { - return err, false + // Skip any entries that don't contain metadata (name clash between <hash>0x01 and <some-prefix><hash>) + var meta struct { + BlockHash common.Hash + BlockIndex uint64 + Index uint64 } - } - - if stopFn() { - return nil, true - } - it.Next() - } - if cnt > 0 { - log.Info("converted canonical numbers", "count", cnt) - } - return nil, false -} - -// upgradeSequentialBlocks reads all old format block headers, bodies, TDs and block -// receipts from the database, writes them in new format and deletes the old ones -// if successful. -func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool) { - prefix := []byte("block-") - it := db.(*ethdb.LDBDatabase).NewIterator() - defer func() { - it.Release() - }() - it.Seek(prefix) - cnt := 0 - for bytes.HasPrefix(it.Key(), prefix) { - keyPtr := it.Key() - if len(keyPtr) >= 38 { - cnt++ - if cnt%10000 == 0 { - it.Release() - it = db.(*ethdb.LDBDatabase).NewIterator() - it.Seek(keyPtr) - log.Info("Converting blocks", "count", cnt) + if err := rlp.DecodeBytes(it.Value(), &meta); err != nil { + continue } - // convert header, body, td and block receipts - var keyPrefix [38]byte - copy(keyPrefix[:], keyPtr[0:38]) - hash := keyPrefix[6:38] - if err := upgradeSequentialBlockData(db, hash); err != nil { - return err, false - } - // delete old db entries belonging to this hash - for bytes.HasPrefix(it.Key(), keyPrefix[:]) { - if err := db.Delete(it.Key()); err != nil { - return err, false + // Skip any already upgraded entries (clash due to <hash> ending with 0x01 (old suffix)) + hash := key[:common.HashLength] + + if hash[0] == byte('l') { + // Potential clash, the "old" `hash` must point to a live transaction. + if tx, _, _, _ := core.GetTransaction(db, common.BytesToHash(hash)); tx == nil || !bytes.Equal(tx.Hash().Bytes(), hash) { + continue } - it.Next() } - if err := db.Delete(append([]byte("receipts-block-"), hash...)); err != nil { - return err, false + // Convert the old metadata to a new lookup entry, delete duplicate data + if failed = db.Put(append([]byte("l"), hash...), it.Value()); failed == nil { // Write the new looku entry + if failed = db.Delete(hash); failed == nil { // Delete the duplicate transaction data + if failed = db.Delete(append([]byte("receipts-"), hash...)); failed == nil { // Delete the duplicate receipt data + if failed = db.Delete(key); failed != nil { // Delete the old transaction metadata + break + } + } + } } - } else { - it.Next() - } - - if stopFn() { - return nil, true - } - } - if cnt > 0 { - log.Info("Converted blocks", "count", cnt) - } - return nil, false -} + // Bump the conversion counter, and recreate the iterator occasionally to + // avoid too high memory consumption. + converted++ + if converted%100000 == 0 { + it.Release() + it = db.(*ethdb.LDBDatabase).NewIterator() + it.Seek(key) -// upgradeSequentialOrphanedReceipts removes any old format block receipts from the -// database that did not have a corresponding block -func upgradeSequentialOrphanedReceipts(db ethdb.Database, stopFn func() bool) (error, bool) { - prefix := []byte("receipts-block-") - it := db.(*ethdb.LDBDatabase).NewIterator() - defer it.Release() - it.Seek(prefix) - cnt := 0 - for bytes.HasPrefix(it.Key(), prefix) { - // phase 2 already converted receipts belonging to existing - // blocks, just remove if there's anything left - cnt++ - if err := db.Delete(it.Key()); err != nil { - return err, false + log.Info("Deduplicating database entries", "deduped", converted) + } + // Check for termination, or continue after a bit of a timeout + select { + case errc := <-stop: + errc <- nil + return + case <-time.After(time.Microsecond * 100): + } } - - if stopFn() { - return nil, true + // Upgrade finished, mark a such and terminate + if failed == nil { + log.Info("Database deduplication successful", "deduped", converted) + db.Put(deduplicateData, []byte{42}) + } else { + log.Error("Database deduplication failed", "deduped", converted, "err", failed) } - it.Next() - } - if cnt > 0 { - log.Info("Removed orphaned block receipts", "count", cnt) - } - return nil, false -} + it.Release() + it = nil -// upgradeSequentialBlockData upgrades the header, body, td and block receipts -// database entries belonging to a single hash (doesn't delete old data). -func upgradeSequentialBlockData(db ethdb.Database, hash []byte) error { - // get old chain data and block number - headerRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-header")...)) - if len(headerRLP) == 0 { - return nil - } - header := new(types.Header) - if err := rlp.Decode(bytes.NewReader(headerRLP), header); err != nil { - return err - } - number := header.Number.Uint64() - bodyRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-body")...)) - tdRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-td")...)) - receiptsRLP, _ := db.Get(append([]byte("receipts-block-"), hash...)) - // store new hash -> number association - encNum := make([]byte, 8) - binary.BigEndian.PutUint64(encNum, number) - if err := db.Put(append([]byte("H"), hash...), encNum); err != nil { - return err - } - // store new chain data - if err := db.Put(append(append([]byte("h"), encNum...), hash...), headerRLP); err != nil { - return err - } - if len(tdRLP) != 0 { - if err := db.Put(append(append(append([]byte("h"), encNum...), hash...), []byte("t")...), tdRLP); err != nil { - return err - } - } - if len(bodyRLP) != 0 { - if err := db.Put(append(append([]byte("b"), encNum...), hash...), bodyRLP); err != nil { - return err - } - } - if len(receiptsRLP) != 0 { - if err := db.Put(append(append([]byte("r"), encNum...), hash...), receiptsRLP); err != nil { - return err - } + errc := <-stop + errc <- failed + }() + // Assembly the cancellation callback + return func() error { + errc := make(chan error) + stop <- errc + return <-errc } - return nil } func addMipmapBloomBins(db ethdb.Database) (err error) { diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index cd5e7cafd..b6cfd4bbc 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -82,12 +82,6 @@ func BenchmarkMipmaps(b *testing.B) { gen.AddUncheckedReceipt(receipt) } - - // store the receipts - err := core.WriteReceipts(db, receipts) - if err != nil { - b.Fatal(err) - } core.WriteMipmapBloom(db, uint64(i+1), receipts) }) for i, block := range chain { @@ -183,12 +177,6 @@ func TestFilters(t *testing.T) { gen.AddUncheckedReceipt(receipt) receipts = types.Receipts{receipt} } - - // store the receipts - err := core.WriteReceipts(db, receipts) - if err != nil { - t.Fatal(err) - } // i is used as block number for the writes but since the i // starts at 0 and block 0 (genesis) is already present increment // by one |