diff options
author | Péter Szilágyi <peterke@gmail.com> | 2017-07-15 00:39:53 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-07-15 00:39:53 +0800 |
commit | 0ff35e170d1b913082313089d13e3e6d5490839b (patch) | |
tree | 42b8eafa61c6e5894768c41a97d51e4e5427b50f | |
parent | 8d6a5a3581ce6221786eb464bfef7e8c31e7ad95 (diff) | |
download | dexon-0ff35e170d1b913082313089d13e3e6d5490839b.tar.gz dexon-0ff35e170d1b913082313089d13e3e6d5490839b.tar.zst dexon-0ff35e170d1b913082313089d13e3e6d5490839b.zip |
core: remove redundant storage of transactions and receipts (#14801)
* core: remove redundant storage of transactions and receipts
* core, eth, internal: new transaction schema usage polishes
* eth: implement upgrade mechanism for db deduplication
* core, eth: drop old sequential key db upgrader
* eth: close last iterator on successful db upgrage
* core: prefix the lookup entries to make their purpose clearer
-rw-r--r-- | accounts/abi/bind/backends/simulated.go | 3 | ||||
-rw-r--r-- | core/blockchain.go | 33 | ||||
-rw-r--r-- | core/blockchain_test.go | 8 | ||||
-rw-r--r-- | core/database_util.go | 194 | ||||
-rw-r--r-- | core/database_util_test.go | 74 | ||||
-rw-r--r-- | eth/backend.go | 6 | ||||
-rw-r--r-- | eth/backend_test.go | 9 | ||||
-rw-r--r-- | eth/db_upgrade.go | 279 | ||||
-rw-r--r-- | eth/filters/filter_test.go | 12 | ||||
-rw-r--r-- | internal/ethapi/api.go | 214 | ||||
-rw-r--r-- | light/txpool.go | 66 | ||||
-rw-r--r-- | miner/worker.go | 4 |
12 files changed, 274 insertions, 628 deletions
diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 7ac8b5820..14bf7bd75 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -144,7 +144,8 @@ func (b *SimulatedBackend) StorageAt(ctx context.Context, contract common.Addres // TransactionReceipt returns the receipt of a transaction. func (b *SimulatedBackend) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { - return core.GetReceipt(b.database, txHash), nil + receipt, _, _, _ := core.GetReceipt(b.database, txHash) + return receipt, nil } // PendingCodeAt returns the code associated with an account in the pending state. diff --git a/core/blockchain.go b/core/blockchain.go index 6772ea284..bb1c14f43 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -759,16 +759,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ log.Crit("Failed to write log blooms", "err", err) return } - if err := WriteTransactions(bc.chainDb, block); err != nil { - errs[index] = fmt.Errorf("failed to write individual transactions: %v", err) + if err := WriteTxLookupEntries(bc.chainDb, block); err != nil { + errs[index] = fmt.Errorf("failed to write lookup metadata: %v", err) atomic.AddInt32(&failed, 1) - log.Crit("Failed to write individual transactions", "err", err) - return - } - if err := WriteReceipts(bc.chainDb, receipts); err != nil { - errs[index] = fmt.Errorf("failed to write individual receipts: %v", err) - atomic.AddInt32(&failed, 1) - log.Crit("Failed to write individual receipts", "err", err) + log.Crit("Failed to write lookup metadata", "err", err) return } atomic.AddInt32(&stats.processed, 1) @@ -1002,12 +996,8 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { blockInsertTimer.UpdateSince(bstart) events = append(events, ChainEvent{block, block.Hash(), logs}) - // This puts transactions in a extra db for rpc - if err := WriteTransactions(bc.chainDb, block); err != nil { - return i, err - } - // store the receipts - if err := WriteReceipts(bc.chainDb, receipts); err != nil { + // Write the positional metadata for transaction and receipt lookups + if err := WriteTxLookupEntries(bc.chainDb, block); err != nil { return i, err } // Write map map bloom filters @@ -1167,16 +1157,12 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { for _, block := range newChain { // insert the block in the canonical way, re-writing history bc.insert(block) - // write canonical receipts and transactions - if err := WriteTransactions(bc.chainDb, block); err != nil { - return err - } - receipts := GetBlockReceipts(bc.chainDb, block.Hash(), block.NumberU64()) - // write receipts - if err := WriteReceipts(bc.chainDb, receipts); err != nil { + // write lookup entries for hash based transaction/receipt searches + if err := WriteTxLookupEntries(bc.chainDb, block); err != nil { return err } // Write map map bloom filters + receipts := GetBlockReceipts(bc.chainDb, block.Hash(), block.NumberU64()) if err := WriteMipmapBloom(bc.chainDb, block.NumberU64(), receipts); err != nil { return err } @@ -1188,8 +1174,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // When transactions get deleted from the database that means the // receipts that were created in the fork must also be deleted for _, tx := range diff { - DeleteReceipt(bc.chainDb, tx.Hash()) - DeleteTransaction(bc.chainDb, tx.Hash()) + DeleteTxLookupEntry(bc.chainDb, tx.Hash()) } // Must be posted in a goroutine because of the transaction pool trying // to acquire the chain manager lock diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 371522ab7..5fa671e2b 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -806,8 +806,8 @@ func TestChainTxReorgs(t *testing.T) { if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn != nil { t.Errorf("drop %d: tx %v found while shouldn't have been", i, txn) } - if GetReceipt(db, tx.Hash()) != nil { - t.Errorf("drop %d: receipt found while shouldn't have been", i) + if rcpt, _, _, _ := GetReceipt(db, tx.Hash()); rcpt != nil { + t.Errorf("drop %d: receipt %v found while shouldn't have been", i, rcpt) } } // added tx @@ -815,7 +815,7 @@ func TestChainTxReorgs(t *testing.T) { if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn == nil { t.Errorf("add %d: expected tx to be found", i) } - if GetReceipt(db, tx.Hash()) == nil { + if rcpt, _, _, _ := GetReceipt(db, tx.Hash()); rcpt == nil { t.Errorf("add %d: expected receipt to be found", i) } } @@ -824,7 +824,7 @@ func TestChainTxReorgs(t *testing.T) { if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn == nil { t.Errorf("share %d: expected tx to be found", i) } - if GetReceipt(db, tx.Hash()) == nil { + if rcpt, _, _, _ := GetReceipt(db, tx.Hash()); rcpt == nil { t.Errorf("share %d: expected receipt to be found", i) } } diff --git a/core/database_util.go b/core/database_util.go index b4a230c9c..697111394 100644 --- a/core/database_util.go +++ b/core/database_util.go @@ -45,24 +45,17 @@ var ( blockHashPrefix = []byte("H") // blockHashPrefix + hash -> num (uint64 big endian) bodyPrefix = []byte("b") // bodyPrefix + num (uint64 big endian) + hash -> block body blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts + lookupPrefix = []byte("l") // lookupPrefix + hash -> transaction/receipt lookup metadata preimagePrefix = "secure-key-" // preimagePrefix + hash -> preimage - txMetaSuffix = []byte{0x01} - receiptsPrefix = []byte("receipts-") - mipmapPre = []byte("mipmap-log-bloom-") MIPMapLevels = []uint64{1000000, 500000, 100000, 50000, 1000} configPrefix = []byte("ethereum-config-") // config prefix for the db - // used by old (non-sequential keys) db, now only used for conversion - oldBlockPrefix = []byte("block-") - oldHeaderSuffix = []byte("-header") - oldTdSuffix = []byte("-td") // headerPrefix + num (uint64 big endian) + hash + tdSuffix -> td - oldBodySuffix = []byte("-body") - oldBlockNumPrefix = []byte("block-num-") - oldBlockReceiptsPrefix = []byte("receipts-block-") - oldBlockHashPrefix = []byte("block-hash-") // [deprecated by the header/block split, remove eventually] + // used by old db, now only used for conversion + oldReceiptsPrefix = []byte("receipts-") + oldTxMetaSuffix = []byte{0x01} ErrChainConfigNotFound = errors.New("ChainConfig not found") // general config not found error @@ -72,6 +65,14 @@ var ( preimageHitCounter = metrics.NewCounter("db/preimage/hits") ) +// txLookupEntry is a positional metadata to help looking up the data content of +// a transaction or receipt given only its hash. +type txLookupEntry struct { + BlockHash common.Hash + BlockIndex uint64 + Index uint64 +} + // encodeBlockNumber encodes a block number as big endian uint64 func encodeBlockNumber(number uint64) []byte { enc := make([]byte, 8) @@ -83,10 +84,7 @@ func encodeBlockNumber(number uint64) []byte { func GetCanonicalHash(db ethdb.Database, number uint64) common.Hash { data, _ := db.Get(append(append(headerPrefix, encodeBlockNumber(number)...), numSuffix...)) if len(data) == 0 { - data, _ = db.Get(append(oldBlockNumPrefix, big.NewInt(int64(number)).Bytes()...)) - if len(data) == 0 { - return common.Hash{} - } + return common.Hash{} } return common.BytesToHash(data) } @@ -100,15 +98,7 @@ const missingNumber = uint64(0xffffffffffffffff) func GetBlockNumber(db ethdb.Database, hash common.Hash) uint64 { data, _ := db.Get(append(blockHashPrefix, hash.Bytes()...)) if len(data) != 8 { - data, _ := db.Get(append(append(oldBlockPrefix, hash.Bytes()...), oldHeaderSuffix...)) - if len(data) == 0 { - return missingNumber - } - header := new(types.Header) - if err := rlp.Decode(bytes.NewReader(data), header); err != nil { - log.Crit("Failed to decode block header", "err", err) - } - return header.Number.Uint64() + return missingNumber } return binary.BigEndian.Uint64(data) } @@ -151,9 +141,6 @@ func GetHeadFastBlockHash(db ethdb.Database) common.Hash { // if the header's not found. func GetHeaderRLP(db ethdb.Database, hash common.Hash, number uint64) rlp.RawValue { data, _ := db.Get(append(append(headerPrefix, encodeBlockNumber(number)...), hash.Bytes()...)) - if len(data) == 0 { - data, _ = db.Get(append(append(oldBlockPrefix, hash.Bytes()...), oldHeaderSuffix...)) - } return data } @@ -175,9 +162,6 @@ func GetHeader(db ethdb.Database, hash common.Hash, number uint64) *types.Header // GetBodyRLP retrieves the block body (transactions and uncles) in RLP encoding. func GetBodyRLP(db ethdb.Database, hash common.Hash, number uint64) rlp.RawValue { data, _ := db.Get(append(append(bodyPrefix, encodeBlockNumber(number)...), hash.Bytes()...)) - if len(data) == 0 { - data, _ = db.Get(append(append(oldBlockPrefix, hash.Bytes()...), oldBodySuffix...)) - } return data } @@ -201,10 +185,7 @@ func GetBody(db ethdb.Database, hash common.Hash, number uint64) *types.Body { func GetTd(db ethdb.Database, hash common.Hash, number uint64) *big.Int { data, _ := db.Get(append(append(append(headerPrefix, encodeBlockNumber(number)...), hash[:]...), tdSuffix...)) if len(data) == 0 { - data, _ = db.Get(append(append(oldBlockPrefix, hash.Bytes()...), oldTdSuffix...)) - if len(data) == 0 { - return nil - } + return nil } td := new(big.Int) if err := rlp.Decode(bytes.NewReader(data), td); err != nil { @@ -239,10 +220,7 @@ func GetBlock(db ethdb.Database, hash common.Hash, number uint64) *types.Block { func GetBlockReceipts(db ethdb.Database, hash common.Hash, number uint64) types.Receipts { data, _ := db.Get(append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash[:]...)) if len(data) == 0 { - data, _ = db.Get(append(oldBlockReceiptsPrefix, hash.Bytes()...)) - if len(data) == 0 { - return nil - } + return nil } storageReceipts := []*types.ReceiptForStorage{} if err := rlp.DecodeBytes(data, &storageReceipts); err != nil { @@ -256,10 +234,38 @@ func GetBlockReceipts(db ethdb.Database, hash common.Hash, number uint64) types. return receipts } +// GetTxLookupEntry retrieves the positional metadata associated with a transaction +// hash to allow retrieving the transaction or receipt by hash. +func GetTxLookupEntry(db ethdb.Database, hash common.Hash) (common.Hash, uint64, uint64) { + // Load the positional metadata from disk and bail if it fails + data, _ := db.Get(append(lookupPrefix, hash.Bytes()...)) + if len(data) == 0 { + return common.Hash{}, 0, 0 + } + // Parse and return the contents of the lookup entry + var entry txLookupEntry + if err := rlp.DecodeBytes(data, &entry); err != nil { + log.Error("Invalid lookup entry RLP", "hash", hash, "err", err) + return common.Hash{}, 0, 0 + } + return entry.BlockHash, entry.BlockIndex, entry.Index +} + // GetTransaction retrieves a specific transaction from the database, along with // its added positional metadata. func GetTransaction(db ethdb.Database, hash common.Hash) (*types.Transaction, common.Hash, uint64, uint64) { - // Retrieve the transaction itself from the database + // Retrieve the lookup metadata and resolve the transaction from the body + blockHash, blockNumber, txIndex := GetTxLookupEntry(db, hash) + + if blockHash != (common.Hash{}) { + body := GetBody(db, blockHash, blockNumber) + if body == nil || len(body.Transactions) <= int(txIndex) { + log.Error("Transaction referenced missing", "number", blockNumber, "hash", blockHash, "index", txIndex) + return nil, common.Hash{}, 0, 0 + } + return body.Transactions[txIndex], blockHash, blockNumber, txIndex + } + // Old transaction representation, load the transaction and it's metadata separately data, _ := db.Get(hash.Bytes()) if len(data) == 0 { return nil, common.Hash{}, 0, 0 @@ -269,33 +275,42 @@ func GetTransaction(db ethdb.Database, hash common.Hash) (*types.Transaction, co return nil, common.Hash{}, 0, 0 } // Retrieve the blockchain positional metadata - data, _ = db.Get(append(hash.Bytes(), txMetaSuffix...)) + data, _ = db.Get(append(hash.Bytes(), oldTxMetaSuffix...)) if len(data) == 0 { return nil, common.Hash{}, 0, 0 } - var meta struct { - BlockHash common.Hash - BlockIndex uint64 - Index uint64 - } - if err := rlp.DecodeBytes(data, &meta); err != nil { + var entry txLookupEntry + if err := rlp.DecodeBytes(data, &entry); err != nil { return nil, common.Hash{}, 0, 0 } - return &tx, meta.BlockHash, meta.BlockIndex, meta.Index + return &tx, entry.BlockHash, entry.BlockIndex, entry.Index } -// GetReceipt returns a receipt by hash -func GetReceipt(db ethdb.Database, hash common.Hash) *types.Receipt { - data, _ := db.Get(append(receiptsPrefix, hash[:]...)) +// GetReceipt retrieves a specific transaction receipt from the database, along with +// its added positional metadata. +func GetReceipt(db ethdb.Database, hash common.Hash) (*types.Receipt, common.Hash, uint64, uint64) { + // Retrieve the lookup metadata and resolve the receipt from the receipts + blockHash, blockNumber, receiptIndex := GetTxLookupEntry(db, hash) + + if blockHash != (common.Hash{}) { + receipts := GetBlockReceipts(db, blockHash, blockNumber) + if len(receipts) <= int(receiptIndex) { + log.Error("Receipt refereced missing", "number", blockNumber, "hash", blockHash, "index", receiptIndex) + return nil, common.Hash{}, 0, 0 + } + return receipts[receiptIndex], blockHash, blockNumber, receiptIndex + } + // Old receipt representation, load the receipt and set an unknown metadata + data, _ := db.Get(append(oldReceiptsPrefix, hash[:]...)) if len(data) == 0 { - return nil + return nil, common.Hash{}, 0, 0 } var receipt types.ReceiptForStorage err := rlp.DecodeBytes(data, &receipt) if err != nil { log.Error("Invalid receipt RLP", "hash", hash, "err", err) } - return (*types.Receipt)(&receipt) + return (*types.Receipt)(&receipt), common.Hash{}, 0, 0 } // WriteCanonicalHash stores the canonical hash for the given block number. @@ -416,76 +431,29 @@ func WriteBlockReceipts(db ethdb.Database, hash common.Hash, number uint64, rece return nil } -// WriteTransactions stores the transactions associated with a specific block -// into the given database. Beside writing the transaction, the function also -// stores a metadata entry along with the transaction, detailing the position -// of this within the blockchain. -func WriteTransactions(db ethdb.Database, block *types.Block) error { +// WriteTxLookupEntries stores a positional metadata for every transaction from +// a block, enabling hash based transaction and receipt lookups. +func WriteTxLookupEntries(db ethdb.Database, block *types.Block) error { batch := db.NewBatch() - // Iterate over each transaction and encode it with its metadata + // Iterate over each transaction and encode its metadata for i, tx := range block.Transactions() { - // Encode and queue up the transaction for storage - data, err := rlp.EncodeToBytes(tx) - if err != nil { - return err - } - if err = batch.Put(tx.Hash().Bytes(), data); err != nil { - return err - } - // Encode and queue up the transaction metadata for storage - meta := struct { - BlockHash common.Hash - BlockIndex uint64 - Index uint64 - }{ + entry := txLookupEntry{ BlockHash: block.Hash(), BlockIndex: block.NumberU64(), Index: uint64(i), } - data, err = rlp.EncodeToBytes(meta) - if err != nil { - return err - } - if err := batch.Put(append(tx.Hash().Bytes(), txMetaSuffix...), data); err != nil { - return err - } - } - // Write the scheduled data into the database - if err := batch.Write(); err != nil { - log.Crit("Failed to store transactions", "err", err) - } - return nil -} - -// WriteReceipt stores a single transaction receipt into the database. -func WriteReceipt(db ethdb.Database, receipt *types.Receipt) error { - storageReceipt := (*types.ReceiptForStorage)(receipt) - data, err := rlp.EncodeToBytes(storageReceipt) - if err != nil { - return err - } - return db.Put(append(receiptsPrefix, receipt.TxHash.Bytes()...), data) -} - -// WriteReceipts stores a batch of transaction receipts into the database. -func WriteReceipts(db ethdb.Database, receipts types.Receipts) error { - batch := db.NewBatch() - - // Iterate over all the receipts and queue them for database injection - for _, receipt := range receipts { - storageReceipt := (*types.ReceiptForStorage)(receipt) - data, err := rlp.EncodeToBytes(storageReceipt) + data, err := rlp.EncodeToBytes(entry) if err != nil { return err } - if err := batch.Put(append(receiptsPrefix, receipt.TxHash.Bytes()...), data); err != nil { + if err := batch.Put(append(lookupPrefix, tx.Hash().Bytes()...), data); err != nil { return err } } // Write the scheduled data into the database if err := batch.Write(); err != nil { - log.Crit("Failed to store receipts", "err", err) + log.Crit("Failed to store lookup entries", "err", err) } return nil } @@ -524,15 +492,9 @@ func DeleteBlockReceipts(db ethdb.Database, hash common.Hash, number uint64) { db.Delete(append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...)) } -// DeleteTransaction removes all transaction data associated with a hash. -func DeleteTransaction(db ethdb.Database, hash common.Hash) { - db.Delete(hash.Bytes()) - db.Delete(append(hash.Bytes(), txMetaSuffix...)) -} - -// DeleteReceipt removes all receipt data associated with a transaction hash. -func DeleteReceipt(db ethdb.Database, hash common.Hash) { - db.Delete(append(receiptsPrefix, hash.Bytes()...)) +// DeleteTxLookupEntry removes all transaction data associated with a hash. +func DeleteTxLookupEntry(db ethdb.Database, hash common.Hash) { + db.Delete(append(lookupPrefix, hash.Bytes()...)) } // returns a formatted MIP mapped key by adding prefix, canonical number and level diff --git a/core/database_util_test.go b/core/database_util_test.go index 9f16b660a..e9a6df97b 100644 --- a/core/database_util_test.go +++ b/core/database_util_test.go @@ -290,8 +290,8 @@ func TestHeadStorage(t *testing.T) { } } -// Tests that transactions and associated metadata can be stored and retrieved. -func TestTransactionStorage(t *testing.T) { +// Tests that positional lookup metadata can be stored and retrieved. +func TestLookupStorage(t *testing.T) { db, _ := ethdb.NewMemDatabase() tx1 := types.NewTransaction(1, common.BytesToAddress([]byte{0x11}), big.NewInt(111), big.NewInt(1111), big.NewInt(11111), []byte{0x11, 0x11, 0x11}) @@ -308,7 +308,10 @@ func TestTransactionStorage(t *testing.T) { } } // Insert all the transactions into the database, and verify contents - if err := WriteTransactions(db, block); err != nil { + if err := WriteBlock(db, block); err != nil { + t.Fatalf("failed to write block contents: %v", err) + } + if err := WriteTxLookupEntries(db, block); err != nil { t.Fatalf("failed to write transactions: %v", err) } for i, tx := range txs { @@ -325,72 +328,13 @@ func TestTransactionStorage(t *testing.T) { } // Delete the transactions and check purge for i, tx := range txs { - DeleteTransaction(db, tx.Hash()) + DeleteTxLookupEntry(db, tx.Hash()) if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn != nil { t.Fatalf("tx #%d [%x]: deleted transaction returned: %v", i, tx.Hash(), txn) } } } -// Tests that receipts can be stored and retrieved. -func TestReceiptStorage(t *testing.T) { - db, _ := ethdb.NewMemDatabase() - - receipt1 := &types.Receipt{ - PostState: []byte{0x01}, - CumulativeGasUsed: big.NewInt(1), - Logs: []*types.Log{ - {Address: common.BytesToAddress([]byte{0x11})}, - {Address: common.BytesToAddress([]byte{0x01, 0x11})}, - }, - TxHash: common.BytesToHash([]byte{0x11, 0x11}), - ContractAddress: common.BytesToAddress([]byte{0x01, 0x11, 0x11}), - GasUsed: big.NewInt(111111), - } - receipt2 := &types.Receipt{ - PostState: []byte{0x02}, - CumulativeGasUsed: big.NewInt(2), - Logs: []*types.Log{ - {Address: common.BytesToAddress([]byte{0x22})}, - {Address: common.BytesToAddress([]byte{0x02, 0x22})}, - }, - TxHash: common.BytesToHash([]byte{0x22, 0x22}), - ContractAddress: common.BytesToAddress([]byte{0x02, 0x22, 0x22}), - GasUsed: big.NewInt(222222), - } - receipts := []*types.Receipt{receipt1, receipt2} - - // Check that no receipt entries are in a pristine database - for i, receipt := range receipts { - if r := GetReceipt(db, receipt.TxHash); r != nil { - t.Fatalf("receipt #%d [%x]: non existent receipt returned: %v", i, receipt.TxHash, r) - } - } - // Insert all the receipts into the database, and verify contents - if err := WriteReceipts(db, receipts); err != nil { - t.Fatalf("failed to write receipts: %v", err) - } - for i, receipt := range receipts { - if r := GetReceipt(db, receipt.TxHash); r == nil { - t.Fatalf("receipt #%d [%x]: receipt not found", i, receipt.TxHash) - } else { - rlpHave, _ := rlp.EncodeToBytes(r) - rlpWant, _ := rlp.EncodeToBytes(receipt) - - if !bytes.Equal(rlpHave, rlpWant) { - t.Fatalf("receipt #%d [%x]: receipt mismatch: have %v, want %v", i, receipt.TxHash, r, receipt) - } - } - } - // Delete the receipts and check purge - for i, receipt := range receipts { - DeleteReceipt(db, receipt.TxHash) - if r := GetReceipt(db, receipt.TxHash); r != nil { - t.Fatalf("receipt #%d [%x]: deleted receipt returned: %v", i, receipt.TxHash, r) - } - } -} - // Tests that receipts associated with a single block can be stored and retrieved. func TestBlockReceiptStorage(t *testing.T) { db, _ := ethdb.NewMemDatabase() @@ -530,10 +474,6 @@ func TestMipmapChain(t *testing.T) { } // store the receipts - err := WriteReceipts(db, receipts) - if err != nil { - t.Fatal(err) - } WriteMipmapBloom(db, uint64(i+1), receipts) }) for i, block := range chain { diff --git a/eth/backend.go b/eth/backend.go index 78478e86e..c7df517c0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -59,8 +59,8 @@ type LesServer interface { type Ethereum struct { chainConfig *params.ChainConfig // Channel for shutting down the service - shutdownChan chan bool // Channel for shutting down the ethereum - stopDbUpgrade func() // stop chain db sequential key upgrade + shutdownChan chan bool // Channel for shutting down the ethereum + stopDbUpgrade func() error // stop chain db sequential key upgrade // Handlers txPool *core.TxPool blockchain *core.BlockChain @@ -103,7 +103,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if err != nil { return nil, err } - stopDbUpgrade := upgradeSequentialKeys(chainDb) + stopDbUpgrade := upgradeDeduplicateData(chainDb) chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis) if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok { return nil, genesisErr diff --git a/eth/backend_test.go b/eth/backend_test.go index f60e3214c..4351b24cf 100644 --- a/eth/backend_test.go +++ b/eth/backend_test.go @@ -33,24 +33,15 @@ func TestMipmapUpgrade(t *testing.T) { genesis := new(core.Genesis).MustCommit(db) chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) { - var receipts types.Receipts switch i { case 1: receipt := types.NewReceipt(nil, new(big.Int)) receipt.Logs = []*types.Log{{Address: addr}} gen.AddUncheckedReceipt(receipt) - receipts = types.Receipts{receipt} case 2: receipt := types.NewReceipt(nil, new(big.Int)) receipt.Logs = []*types.Log{{Address: addr}} gen.AddUncheckedReceipt(receipt) - receipts = types.Receipts{receipt} - } - - // store the receipts - err := core.WriteReceipts(db, receipts) - if err != nil { - t.Fatal(err) } }) for i, block := range chain { diff --git a/eth/db_upgrade.go b/eth/db_upgrade.go index 82cdd7e55..90111b2b3 100644 --- a/eth/db_upgrade.go +++ b/eth/db_upgrade.go @@ -19,237 +19,120 @@ package eth import ( "bytes" - "encoding/binary" "fmt" - "math/big" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" ) -var useSequentialKeys = []byte("dbUpgrade_20160530sequentialKeys") +var deduplicateData = []byte("dbUpgrade_20170714deduplicateData") -// upgradeSequentialKeys checks the chain database version and +// upgradeDeduplicateData checks the chain database version and // starts a background process to make upgrades if necessary. // Returns a stop function that blocks until the process has // been safely stopped. -func upgradeSequentialKeys(db ethdb.Database) (stopFn func()) { - data, _ := db.Get(useSequentialKeys) +func upgradeDeduplicateData(db ethdb.Database) func() error { + // If the database is already converted or empty, bail out + data, _ := db.Get(deduplicateData) if len(data) > 0 && data[0] == 42 { - return nil // already converted + return nil } - if data, _ := db.Get([]byte("LastHeader")); len(data) == 0 { - db.Put(useSequentialKeys, []byte{42}) - return nil // empty database, nothing to do + db.Put(deduplicateData, []byte{42}) + return nil } - - log.Warn("Upgrading chain database to use sequential keys") - - stopChn := make(chan struct{}) - stoppedChn := make(chan struct{}) + // Start the deduplication upgrade on a new goroutine + log.Warn("Upgrading database to use lookup entries") + stop := make(chan chan error) go func() { - stopFn := func() bool { - select { - case <-time.After(time.Microsecond * 100): // make sure other processes don't get starved - case <-stopChn: - return true - } - return false - } - - err, stopped := upgradeSequentialCanonicalNumbers(db, stopFn) - if err == nil && !stopped { - err, stopped = upgradeSequentialBlocks(db, stopFn) - } - if err == nil && !stopped { - err, stopped = upgradeSequentialOrphanedReceipts(db, stopFn) - } - if err == nil && !stopped { - log.Info("Database conversion successful") - db.Put(useSequentialKeys, []byte{42}) - } - if err != nil { - log.Error("Database conversion failed", "err", err) - } - close(stoppedChn) - }() - - return func() { - close(stopChn) - <-stoppedChn - } -} - -// upgradeSequentialCanonicalNumbers reads all old format canonical numbers from -// the database, writes them in new format and deletes the old ones if successful. -func upgradeSequentialCanonicalNumbers(db ethdb.Database, stopFn func() bool) (error, bool) { - prefix := []byte("block-num-") - it := db.(*ethdb.LDBDatabase).NewIterator() - defer func() { - it.Release() - }() - it.Seek(prefix) - cnt := 0 - for bytes.HasPrefix(it.Key(), prefix) { - keyPtr := it.Key() - if len(keyPtr) < 20 { - cnt++ - if cnt%100000 == 0 { + // Create an iterator to read the entire database and covert old lookup entires + it := db.(*ethdb.LDBDatabase).NewIterator() + defer func() { + if it != nil { it.Release() - it = db.(*ethdb.LDBDatabase).NewIterator() - it.Seek(keyPtr) - log.Info("Converting canonical numbers", "count", cnt) } - number := big.NewInt(0).SetBytes(keyPtr[10:]).Uint64() - newKey := []byte("h12345678n") - binary.BigEndian.PutUint64(newKey[1:9], number) - if err := db.Put(newKey, it.Value()); err != nil { - return err, false + }() + + var ( + converted uint64 + failed error + ) + for failed == nil && it.Next() { + // Skip any entries that don't look like old transaction meta entires (<hash>0x01) + key := it.Key() + if len(key) != common.HashLength+1 || key[common.HashLength] != 0x01 { + continue } - if err := db.Delete(keyPtr); err != nil { - return err, false + // Skip any entries that don't contain metadata (name clash between <hash>0x01 and <some-prefix><hash>) + var meta struct { + BlockHash common.Hash + BlockIndex uint64 + Index uint64 } - } - - if stopFn() { - return nil, true - } - it.Next() - } - if cnt > 0 { - log.Info("converted canonical numbers", "count", cnt) - } - return nil, false -} - -// upgradeSequentialBlocks reads all old format block headers, bodies, TDs and block -// receipts from the database, writes them in new format and deletes the old ones -// if successful. -func upgradeSequentialBlocks(db ethdb.Database, stopFn func() bool) (error, bool) { - prefix := []byte("block-") - it := db.(*ethdb.LDBDatabase).NewIterator() - defer func() { - it.Release() - }() - it.Seek(prefix) - cnt := 0 - for bytes.HasPrefix(it.Key(), prefix) { - keyPtr := it.Key() - if len(keyPtr) >= 38 { - cnt++ - if cnt%10000 == 0 { - it.Release() - it = db.(*ethdb.LDBDatabase).NewIterator() - it.Seek(keyPtr) - log.Info("Converting blocks", "count", cnt) + if err := rlp.DecodeBytes(it.Value(), &meta); err != nil { + continue } - // convert header, body, td and block receipts - var keyPrefix [38]byte - copy(keyPrefix[:], keyPtr[0:38]) - hash := keyPrefix[6:38] - if err := upgradeSequentialBlockData(db, hash); err != nil { - return err, false - } - // delete old db entries belonging to this hash - for bytes.HasPrefix(it.Key(), keyPrefix[:]) { - if err := db.Delete(it.Key()); err != nil { - return err, false + // Skip any already upgraded entries (clash due to <hash> ending with 0x01 (old suffix)) + hash := key[:common.HashLength] + + if hash[0] == byte('l') { + // Potential clash, the "old" `hash` must point to a live transaction. + if tx, _, _, _ := core.GetTransaction(db, common.BytesToHash(hash)); tx == nil || !bytes.Equal(tx.Hash().Bytes(), hash) { + continue } - it.Next() } - if err := db.Delete(append([]byte("receipts-block-"), hash...)); err != nil { - return err, false + // Convert the old metadata to a new lookup entry, delete duplicate data + if failed = db.Put(append([]byte("l"), hash...), it.Value()); failed == nil { // Write the new looku entry + if failed = db.Delete(hash); failed == nil { // Delete the duplicate transaction data + if failed = db.Delete(append([]byte("receipts-"), hash...)); failed == nil { // Delete the duplicate receipt data + if failed = db.Delete(key); failed != nil { // Delete the old transaction metadata + break + } + } + } } - } else { - it.Next() - } - - if stopFn() { - return nil, true - } - } - if cnt > 0 { - log.Info("Converted blocks", "count", cnt) - } - return nil, false -} + // Bump the conversion counter, and recreate the iterator occasionally to + // avoid too high memory consumption. + converted++ + if converted%100000 == 0 { + it.Release() + it = db.(*ethdb.LDBDatabase).NewIterator() + it.Seek(key) -// upgradeSequentialOrphanedReceipts removes any old format block receipts from the -// database that did not have a corresponding block -func upgradeSequentialOrphanedReceipts(db ethdb.Database, stopFn func() bool) (error, bool) { - prefix := []byte("receipts-block-") - it := db.(*ethdb.LDBDatabase).NewIterator() - defer it.Release() - it.Seek(prefix) - cnt := 0 - for bytes.HasPrefix(it.Key(), prefix) { - // phase 2 already converted receipts belonging to existing - // blocks, just remove if there's anything left - cnt++ - if err := db.Delete(it.Key()); err != nil { - return err, false + log.Info("Deduplicating database entries", "deduped", converted) + } + // Check for termination, or continue after a bit of a timeout + select { + case errc := <-stop: + errc <- nil + return + case <-time.After(time.Microsecond * 100): + } } - - if stopFn() { - return nil, true + // Upgrade finished, mark a such and terminate + if failed == nil { + log.Info("Database deduplication successful", "deduped", converted) + db.Put(deduplicateData, []byte{42}) + } else { + log.Error("Database deduplication failed", "deduped", converted, "err", failed) } - it.Next() - } - if cnt > 0 { - log.Info("Removed orphaned block receipts", "count", cnt) - } - return nil, false -} + it.Release() + it = nil -// upgradeSequentialBlockData upgrades the header, body, td and block receipts -// database entries belonging to a single hash (doesn't delete old data). -func upgradeSequentialBlockData(db ethdb.Database, hash []byte) error { - // get old chain data and block number - headerRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-header")...)) - if len(headerRLP) == 0 { - return nil - } - header := new(types.Header) - if err := rlp.Decode(bytes.NewReader(headerRLP), header); err != nil { - return err - } - number := header.Number.Uint64() - bodyRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-body")...)) - tdRLP, _ := db.Get(append(append([]byte("block-"), hash...), []byte("-td")...)) - receiptsRLP, _ := db.Get(append([]byte("receipts-block-"), hash...)) - // store new hash -> number association - encNum := make([]byte, 8) - binary.BigEndian.PutUint64(encNum, number) - if err := db.Put(append([]byte("H"), hash...), encNum); err != nil { - return err - } - // store new chain data - if err := db.Put(append(append([]byte("h"), encNum...), hash...), headerRLP); err != nil { - return err - } - if len(tdRLP) != 0 { - if err := db.Put(append(append(append([]byte("h"), encNum...), hash...), []byte("t")...), tdRLP); err != nil { - return err - } - } - if len(bodyRLP) != 0 { - if err := db.Put(append(append([]byte("b"), encNum...), hash...), bodyRLP); err != nil { - return err - } - } - if len(receiptsRLP) != 0 { - if err := db.Put(append(append([]byte("r"), encNum...), hash...), receiptsRLP); err != nil { - return err - } + errc := <-stop + errc <- failed + }() + // Assembly the cancellation callback + return func() error { + errc := make(chan error) + stop <- errc + return <-errc } - return nil } func addMipmapBloomBins(db ethdb.Database) (err error) { diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index cd5e7cafd..b6cfd4bbc 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -82,12 +82,6 @@ func BenchmarkMipmaps(b *testing.B) { gen.AddUncheckedReceipt(receipt) } - - // store the receipts - err := core.WriteReceipts(db, receipts) - if err != nil { - b.Fatal(err) - } core.WriteMipmapBloom(db, uint64(i+1), receipts) }) for i, block := range chain { @@ -183,12 +177,6 @@ func TestFilters(t *testing.T) { gen.AddUncheckedReceipt(receipt) receipts = types.Receipts{receipt} } - - // store the receipts - err := core.WriteReceipts(db, receipts) - if err != nil { - t.Fatal(err) - } // i is used as block number for the writes but since the i // starts at 0 and block 0 (genesis) is already present increment // by one diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index c22c56dfb..1b23ac559 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -17,7 +17,6 @@ package ethapi import ( - "bytes" "context" "errors" "fmt" @@ -35,7 +34,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/params" @@ -769,7 +767,7 @@ func (s *PublicBlockChainAPI) rpcOutputBlock(b *types.Block, inclTx bool, fullTx if fullTx { formatTx = func(tx *types.Transaction) (interface{}, error) { - return newRPCTransaction(b, tx.Hash()) + return newRPCTransactionFromBlockHash(b, tx.Hash()), nil } } @@ -812,15 +810,17 @@ type RPCTransaction struct { S *hexutil.Big `json:"s"` } -// newRPCPendingTransaction returns a pending transaction that will serialize to the RPC representation -func newRPCPendingTransaction(tx *types.Transaction) *RPCTransaction { +// newRPCTransaction returns a transaction that will serialize to the RPC +// representation, with the given location metadata set (if available). +func newRPCTransaction(tx *types.Transaction, blockHash common.Hash, blockNumber uint64, index uint64) *RPCTransaction { var signer types.Signer = types.FrontierSigner{} if tx.Protected() { signer = types.NewEIP155Signer(tx.ChainId()) } from, _ := types.Sender(signer, tx) v, r, s := tx.RawSignatureValues() - return &RPCTransaction{ + + result := &RPCTransaction{ From: from, Gas: (*hexutil.Big)(tx.Gas()), GasPrice: (*hexutil.Big)(tx.GasPrice()), @@ -833,58 +833,46 @@ func newRPCPendingTransaction(tx *types.Transaction) *RPCTransaction { R: (*hexutil.Big)(r), S: (*hexutil.Big)(s), } + if blockHash != (common.Hash{}) { + result.BlockHash = blockHash + result.BlockNumber = (*hexutil.Big)(new(big.Int).SetUint64(blockNumber)) + result.TransactionIndex = hexutil.Uint(index) + } + return result } -// newRPCTransaction returns a transaction that will serialize to the RPC representation. -func newRPCTransactionFromBlockIndex(b *types.Block, txIndex uint) (*RPCTransaction, error) { - if txIndex < uint(len(b.Transactions())) { - tx := b.Transactions()[txIndex] - var signer types.Signer = types.FrontierSigner{} - if tx.Protected() { - signer = types.NewEIP155Signer(tx.ChainId()) - } - from, _ := types.Sender(signer, tx) - v, r, s := tx.RawSignatureValues() - return &RPCTransaction{ - BlockHash: b.Hash(), - BlockNumber: (*hexutil.Big)(b.Number()), - From: from, - Gas: (*hexutil.Big)(tx.Gas()), - GasPrice: (*hexutil.Big)(tx.GasPrice()), - Hash: tx.Hash(), - Input: hexutil.Bytes(tx.Data()), - Nonce: hexutil.Uint64(tx.Nonce()), - To: tx.To(), - TransactionIndex: hexutil.Uint(txIndex), - Value: (*hexutil.Big)(tx.Value()), - V: (*hexutil.Big)(v), - R: (*hexutil.Big)(r), - S: (*hexutil.Big)(s), - }, nil - } - - return nil, nil +// newRPCPendingTransaction returns a pending transaction that will serialize to the RPC representation +func newRPCPendingTransaction(tx *types.Transaction) *RPCTransaction { + return newRPCTransaction(tx, common.Hash{}, 0, 0) } -// newRPCRawTransactionFromBlockIndex returns the bytes of a transaction given a block and a transaction index. -func newRPCRawTransactionFromBlockIndex(b *types.Block, txIndex uint) (hexutil.Bytes, error) { - if txIndex < uint(len(b.Transactions())) { - tx := b.Transactions()[txIndex] - return rlp.EncodeToBytes(tx) +// newRPCTransactionFromBlockIndex returns a transaction that will serialize to the RPC representation. +func newRPCTransactionFromBlockIndex(b *types.Block, index uint64) *RPCTransaction { + txs := b.Transactions() + if index >= uint64(len(txs)) { + return nil } + return newRPCTransaction(txs[index], b.Hash(), b.NumberU64(), index) +} - return nil, nil +// newRPCRawTransactionFromBlockIndex returns the bytes of a transaction given a block and a transaction index. +func newRPCRawTransactionFromBlockIndex(b *types.Block, index uint64) hexutil.Bytes { + txs := b.Transactions() + if index >= uint64(len(txs)) { + return nil + } + blob, _ := rlp.EncodeToBytes(txs[index]) + return blob } -// newRPCTransaction returns a transaction that will serialize to the RPC representation. -func newRPCTransaction(b *types.Block, txHash common.Hash) (*RPCTransaction, error) { +// newRPCTransactionFromBlockHash returns a transaction that will serialize to the RPC representation. +func newRPCTransactionFromBlockHash(b *types.Block, hash common.Hash) *RPCTransaction { for idx, tx := range b.Transactions() { - if tx.Hash() == txHash { - return newRPCTransactionFromBlockIndex(b, uint(idx)) + if tx.Hash() == hash { + return newRPCTransactionFromBlockIndex(b, uint64(idx)) } } - - return nil, nil + return nil } // PublicTransactionPoolAPI exposes methods for the RPC interface @@ -898,24 +886,6 @@ func NewPublicTransactionPoolAPI(b Backend, nonceLock *AddrLocker) *PublicTransa return &PublicTransactionPoolAPI{b, nonceLock} } -func getTransaction(chainDb ethdb.Database, b Backend, txHash common.Hash) (*types.Transaction, bool, error) { - txData, err := chainDb.Get(txHash.Bytes()) - isPending := false - tx := new(types.Transaction) - - if err == nil && len(txData) > 0 { - if err := rlp.DecodeBytes(txData, tx); err != nil { - return nil, isPending, err - } - } else { - // pending transaction? - tx = b.GetPoolTransaction(txHash) - isPending = true - } - - return tx, isPending, nil -} - // GetBlockTransactionCountByNumber returns the number of transactions in the block with the given block number. func (s *PublicTransactionPoolAPI) GetBlockTransactionCountByNumber(ctx context.Context, blockNr rpc.BlockNumber) *hexutil.Uint { if block, _ := s.b.BlockByNumber(ctx, blockNr); block != nil { @@ -935,35 +905,35 @@ func (s *PublicTransactionPoolAPI) GetBlockTransactionCountByHash(ctx context.Co } // GetTransactionByBlockNumberAndIndex returns the transaction for the given block number and index. -func (s *PublicTransactionPoolAPI) GetTransactionByBlockNumberAndIndex(ctx context.Context, blockNr rpc.BlockNumber, index hexutil.Uint) (*RPCTransaction, error) { +func (s *PublicTransactionPoolAPI) GetTransactionByBlockNumberAndIndex(ctx context.Context, blockNr rpc.BlockNumber, index hexutil.Uint) *RPCTransaction { if block, _ := s.b.BlockByNumber(ctx, blockNr); block != nil { - return newRPCTransactionFromBlockIndex(block, uint(index)) + return newRPCTransactionFromBlockIndex(block, uint64(index)) } - return nil, nil + return nil } // GetTransactionByBlockHashAndIndex returns the transaction for the given block hash and index. -func (s *PublicTransactionPoolAPI) GetTransactionByBlockHashAndIndex(ctx context.Context, blockHash common.Hash, index hexutil.Uint) (*RPCTransaction, error) { +func (s *PublicTransactionPoolAPI) GetTransactionByBlockHashAndIndex(ctx context.Context, blockHash common.Hash, index hexutil.Uint) *RPCTransaction { if block, _ := s.b.GetBlock(ctx, blockHash); block != nil { - return newRPCTransactionFromBlockIndex(block, uint(index)) + return newRPCTransactionFromBlockIndex(block, uint64(index)) } - return nil, nil + return nil } // GetRawTransactionByBlockNumberAndIndex returns the bytes of the transaction for the given block number and index. -func (s *PublicTransactionPoolAPI) GetRawTransactionByBlockNumberAndIndex(ctx context.Context, blockNr rpc.BlockNumber, index hexutil.Uint) (hexutil.Bytes, error) { +func (s *PublicTransactionPoolAPI) GetRawTransactionByBlockNumberAndIndex(ctx context.Context, blockNr rpc.BlockNumber, index hexutil.Uint) hexutil.Bytes { if block, _ := s.b.BlockByNumber(ctx, blockNr); block != nil { - return newRPCRawTransactionFromBlockIndex(block, uint(index)) + return newRPCRawTransactionFromBlockIndex(block, uint64(index)) } - return nil, nil + return nil } // GetRawTransactionByBlockHashAndIndex returns the bytes of the transaction for the given block hash and index. -func (s *PublicTransactionPoolAPI) GetRawTransactionByBlockHashAndIndex(ctx context.Context, blockHash common.Hash, index hexutil.Uint) (hexutil.Bytes, error) { +func (s *PublicTransactionPoolAPI) GetRawTransactionByBlockHashAndIndex(ctx context.Context, blockHash common.Hash, index hexutil.Uint) hexutil.Bytes { if block, _ := s.b.GetBlock(ctx, blockHash); block != nil { - return newRPCRawTransactionFromBlockIndex(block, uint(index)) + return newRPCRawTransactionFromBlockIndex(block, uint64(index)) } - return nil, nil + return nil } // GetTransactionCount returns the number of transactions the given address has sent for the given block number @@ -976,90 +946,42 @@ func (s *PublicTransactionPoolAPI) GetTransactionCount(ctx context.Context, addr return (*hexutil.Uint64)(&nonce), state.Error() } -// getTransactionBlockData fetches the meta data for the given transaction from the chain database. This is useful to -// retrieve block information for a hash. It returns the block hash, block index and transaction index. -func getTransactionBlockData(chainDb ethdb.Database, txHash common.Hash) (common.Hash, uint64, uint64, error) { - var txBlock struct { - BlockHash common.Hash - BlockIndex uint64 - Index uint64 - } - - blockData, err := chainDb.Get(append(txHash.Bytes(), 0x0001)) - if err != nil { - return common.Hash{}, uint64(0), uint64(0), err - } - - reader := bytes.NewReader(blockData) - if err = rlp.Decode(reader, &txBlock); err != nil { - return common.Hash{}, uint64(0), uint64(0), err - } - - return txBlock.BlockHash, txBlock.BlockIndex, txBlock.Index, nil -} - // GetTransactionByHash returns the transaction for the given hash -func (s *PublicTransactionPoolAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) { - var tx *types.Transaction - var isPending bool - var err error - - if tx, isPending, err = getTransaction(s.b.ChainDb(), s.b, hash); err != nil { - log.Debug("Failed to retrieve transaction", "hash", hash, "err", err) - return nil, nil - } else if tx == nil { - return nil, nil +func (s *PublicTransactionPoolAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) *RPCTransaction { + // Try to return an already finalized transaction + if tx, blockHash, blockNumber, index := core.GetTransaction(s.b.ChainDb(), hash); tx != nil { + return newRPCTransaction(tx, blockHash, blockNumber, index) } - if isPending { - return newRPCPendingTransaction(tx), nil - } - - blockHash, _, _, err := getTransactionBlockData(s.b.ChainDb(), hash) - if err != nil { - log.Debug("Failed to retrieve transaction block", "hash", hash, "err", err) - return nil, nil + // No finalized transaction, try to retrieve it from the pool + if tx := s.b.GetPoolTransaction(hash); tx != nil { + return newRPCPendingTransaction(tx) } - - if block, _ := s.b.GetBlock(ctx, blockHash); block != nil { - return newRPCTransaction(block, hash) - } - return nil, nil + // Transaction unknown, return as such + return nil } // GetRawTransactionByHash returns the bytes of the transaction for the given hash. func (s *PublicTransactionPoolAPI) GetRawTransactionByHash(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) { var tx *types.Transaction - var err error - if tx, _, err = getTransaction(s.b.ChainDb(), s.b, hash); err != nil { - log.Debug("Failed to retrieve transaction", "hash", hash, "err", err) - return nil, nil - } else if tx == nil { - return nil, nil + // Retrieve a finalized transaction, or a pooled otherwise + if tx, _, _, _ = core.GetTransaction(s.b.ChainDb(), hash); tx == nil { + if tx = s.b.GetPoolTransaction(hash); tx == nil { + // Transaction not found anywhere, abort + return nil, nil + } } - + // Serialize to RLP and return return rlp.EncodeToBytes(tx) } // GetTransactionReceipt returns the transaction receipt for the given transaction hash. func (s *PublicTransactionPoolAPI) GetTransactionReceipt(hash common.Hash) (map[string]interface{}, error) { - receipt := core.GetReceipt(s.b.ChainDb(), hash) - if receipt == nil { - log.Debug("Receipt not found for transaction", "hash", hash) - return nil, nil - } - - tx, _, err := getTransaction(s.b.ChainDb(), s.b, hash) - if err != nil { - log.Debug("Failed to retrieve transaction", "hash", hash, "err", err) - return nil, nil - } - - txBlock, blockIndex, index, err := getTransactionBlockData(s.b.ChainDb(), hash) - if err != nil { - log.Debug("Failed to retrieve transaction block", "hash", hash, "err", err) + tx, blockHash, blockNumber, index := core.GetTransaction(s.b.ChainDb(), hash) + if tx == nil { return nil, nil } + receipt, _, _, _ := core.GetReceipt(s.b.ChainDb(), hash) // Old receipts don't have the lookup data available var signer types.Signer = types.FrontierSigner{} if tx.Protected() { @@ -1069,8 +991,8 @@ func (s *PublicTransactionPoolAPI) GetTransactionReceipt(hash common.Hash) (map[ fields := map[string]interface{}{ "root": hexutil.Bytes(receipt.PostState), - "blockHash": txBlock, - "blockNumber": hexutil.Uint64(blockIndex), + "blockHash": blockHash, + "blockNumber": hexutil.Uint64(blockNumber), "transactionHash": hash, "transactionIndex": hexutil.Uint64(index), "from": from, diff --git a/light/txpool.go b/light/txpool.go index 0430b280f..416148b7e 100644 --- a/light/txpool.go +++ b/light/txpool.go @@ -130,19 +130,6 @@ type txBlockData struct { Index uint64 } -// storeTxBlockData stores the block position of a mined tx in the local db -func (pool *TxPool) storeTxBlockData(txh common.Hash, tbd txBlockData) { - //fmt.Println("storeTxBlockData", txh, tbd) - data, _ := rlp.EncodeToBytes(tbd) - pool.chainDb.Put(append(txh[:], byte(1)), data) -} - -// removeTxBlockData removes the stored block position of a rolled back tx -func (pool *TxPool) removeTxBlockData(txh common.Hash) { - //fmt.Println("removeTxBlockData", txh) - pool.chainDb.Delete(append(txh[:], byte(1))) -} - // txStateChanges stores the recent changes between pending/mined states of // transactions. True means mined, false means rolled back, no entry means no change type txStateChanges map[common.Hash]bool @@ -172,59 +159,48 @@ func (txc txStateChanges) getLists() (mined []common.Hash, rollback []common.Has // checkMinedTxs checks newly added blocks for the currently pending transactions // and marks them as mined if necessary. It also stores block position in the db // and adds them to the received txStateChanges map. -func (pool *TxPool) checkMinedTxs(ctx context.Context, hash common.Hash, idx uint64, txc txStateChanges) error { - //fmt.Println("checkMinedTxs") +func (pool *TxPool) checkMinedTxs(ctx context.Context, hash common.Hash, number uint64, txc txStateChanges) error { + // If no transactions are pending, we don't care about anything if len(pool.pending) == 0 { return nil } - //fmt.Println("len(pool) =", len(pool.pending)) - - block, err := GetBlock(ctx, pool.odr, hash, idx) - var receipts types.Receipts + block, err := GetBlock(ctx, pool.odr, hash, number) if err != nil { - //fmt.Println(err) return err } - //fmt.Println("len(block.Transactions()) =", len(block.Transactions())) - + // Gather all the local transaction mined in this block list := pool.mined[hash] - for i, tx := range block.Transactions() { - txHash := tx.Hash() - //fmt.Println(" txHash:", txHash) - if tx, ok := pool.pending[txHash]; ok { - //fmt.Println("TX FOUND") - if receipts == nil { - receipts, err = GetBlockReceipts(ctx, pool.odr, hash, idx) - if err != nil { - return err - } - if len(receipts) != len(block.Transactions()) { - panic(nil) // should never happen if hashes did match - } - core.SetReceiptsData(pool.config, block, receipts) - } - //fmt.Println("WriteReceipt", receipts[i].TxHash) - core.WriteReceipt(pool.chainDb, receipts[i]) - pool.storeTxBlockData(txHash, txBlockData{hash, idx, uint64(i)}) - delete(pool.pending, txHash) + for _, tx := range block.Transactions() { + if _, ok := pool.pending[tx.Hash()]; ok { list = append(list, tx) - txc.setState(txHash, true) } } + // If some transactions have been mined, write the needed data to disk and update if list != nil { + // Retrieve all the receipts belonging to this block and write the loopup table + if _, err := GetBlockReceipts(ctx, pool.odr, hash, number); err != nil { // ODR caches, ignore results + return err + } + if err := core.WriteTxLookupEntries(pool.chainDb, block); err != nil { + return err + } + // Update the transaction pool's state + for _, tx := range list { + delete(pool.pending, tx.Hash()) + txc.setState(tx.Hash(), true) + } pool.mined[hash] = list } return nil } // rollbackTxs marks the transactions contained in recently rolled back blocks -// as rolled back. It also removes block position info from the db and adds them -// to the received txStateChanges map. +// as rolled back. It also removes any positional lookup entries. func (pool *TxPool) rollbackTxs(hash common.Hash, txc txStateChanges) { if list, ok := pool.mined[hash]; ok { for _, tx := range list { txHash := tx.Hash() - pool.removeTxBlockData(txHash) + core.DeleteTxLookupEntry(pool.chainDb, txHash) pool.pending[txHash] = tx txc.setState(txHash, false) } diff --git a/miner/worker.go b/miner/worker.go index e44514755..411bc4e1b 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -293,9 +293,7 @@ func (self *worker) wait() { // check if canon block and write transactions if stat == core.CanonStatTy { // This puts transactions in a extra db for rpc - core.WriteTransactions(self.chainDb, block) - // store the receipts - core.WriteReceipts(self.chainDb, work.receipts) + core.WriteTxLookupEntries(self.chainDb, block) // Write map map bloom filters core.WriteMipmapBloom(self.chainDb, block.NumberU64(), work.receipts) // implicit by posting ChainHeadEvent |