From a4a2343cdc1946e38da1aea1476642d1744c1354 Mon Sep 17 00:00:00 2001 From: gary rong Date: Mon, 2 Jul 2018 16:16:30 +0800 Subject: ethdb, core: implement delete for db batch (#17101) --- core/blockchain.go | 9 ++++++--- core/headerchain.go | 19 ++++++++++++------- ethdb/database.go | 10 ++++++++++ ethdb/interface.go | 8 +++++++- ethdb/memory_database.go | 9 +++++++++ light/txpool.go | 10 +++++++--- 6 files changed, 51 insertions(+), 14 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 0b50e3f37..2f1e78423 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -269,8 +269,8 @@ func (bc *BlockChain) SetHead(head uint64) error { defer bc.mu.Unlock() // Rewind the header chain, deleting all block bodies until then - delFn := func(hash common.Hash, num uint64) { - rawdb.DeleteBody(bc.db, hash, num) + delFn := func(db rawdb.DatabaseDeleter, hash common.Hash, num uint64) { + rawdb.DeleteBody(db, hash, num) } bc.hc.SetHead(head, delFn) currentHeader := bc.hc.CurrentHeader() @@ -1340,9 +1340,12 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { diff := types.TxDifference(deletedTxs, addedTxs) // When transactions get deleted from the database that means the // receipts that were created in the fork must also be deleted + batch := bc.db.NewBatch() for _, tx := range diff { - rawdb.DeleteTxLookupEntry(bc.db, tx.Hash()) + rawdb.DeleteTxLookupEntry(batch, tx.Hash()) } + batch.Write() + if len(deletedLogs) > 0 { go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) } diff --git a/core/headerchain.go b/core/headerchain.go index 6e759ed1c..da6716d67 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -156,13 +156,16 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) { // Delete any canonical number assignments above the new head + batch := hc.chainDb.NewBatch() for i := number + 1; ; i++ { hash := rawdb.ReadCanonicalHash(hc.chainDb, i) if hash == (common.Hash{}) { break } - rawdb.DeleteCanonicalHash(hc.chainDb, i) + rawdb.DeleteCanonicalHash(batch, i) } + batch.Write() + // Overwrite any stale canonical number assignments var ( headHash = header.ParentHash @@ -438,7 +441,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) { // DeleteCallback is a callback function that is called by SetHead before // each header is deleted. -type DeleteCallback func(common.Hash, uint64) +type DeleteCallback func(rawdb.DatabaseDeleter, common.Hash, uint64) // SetHead rewinds the local chain to a new head. Everything above the new head // will be deleted and the new one set. @@ -448,22 +451,24 @@ func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) { if hdr := hc.CurrentHeader(); hdr != nil { height = hdr.Number.Uint64() } - + batch := hc.chainDb.NewBatch() for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() { hash := hdr.Hash() num := hdr.Number.Uint64() if delFn != nil { - delFn(hash, num) + delFn(batch, hash, num) } - rawdb.DeleteHeader(hc.chainDb, hash, num) - rawdb.DeleteTd(hc.chainDb, hash, num) + rawdb.DeleteHeader(batch, hash, num) + rawdb.DeleteTd(batch, hash, num) hc.currentHeader.Store(hc.GetHeader(hdr.ParentHash, hdr.Number.Uint64()-1)) } // Roll back the canonical chain numbering for i := height; i > head; i-- { - rawdb.DeleteCanonicalHash(hc.chainDb, i) + rawdb.DeleteCanonicalHash(batch, i) } + batch.Write() + // Clear out any stale content from the caches hc.headerCache.Purge() hc.tdCache.Purge() diff --git a/ethdb/database.go b/ethdb/database.go index f4a5ce2c8..e32c912f9 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -388,6 +388,12 @@ func (b *ldbBatch) Put(key, value []byte) error { return nil } +func (b *ldbBatch) Delete(key []byte) error { + b.b.Delete(key) + b.size += 1 + return nil +} + func (b *ldbBatch) Write() error { return b.db.Write(b.b, nil) } @@ -453,6 +459,10 @@ func (tb *tableBatch) Put(key, value []byte) error { return tb.batch.Put(append([]byte(tb.prefix), key...), value) } +func (tb *tableBatch) Delete(key []byte) error { + return tb.batch.Delete(append([]byte(tb.prefix), key...)) +} + func (tb *tableBatch) Write() error { return tb.batch.Write() } diff --git a/ethdb/interface.go b/ethdb/interface.go index 537312003..af1355779 100644 --- a/ethdb/interface.go +++ b/ethdb/interface.go @@ -25,12 +25,17 @@ type Putter interface { Put(key []byte, value []byte) error } +// Deleter wraps the database delete operation supported by both batches and regular databases. +type Deleter interface { + Delete(key []byte) error +} + // Database wraps all database operations. All methods are safe for concurrent use. type Database interface { Putter + Deleter Get(key []byte) ([]byte, error) Has(key []byte) (bool, error) - Delete(key []byte) error Close() NewBatch() Batch } @@ -39,6 +44,7 @@ type Database interface { // when Write is called. Batch cannot be used concurrently. type Batch interface { Putter + Deleter ValueSize() int // amount of data in the batch Write() error // Reset resets the batch for reuse diff --git a/ethdb/memory_database.go b/ethdb/memory_database.go index c57042920..f28ff5481 100644 --- a/ethdb/memory_database.go +++ b/ethdb/memory_database.go @@ -110,11 +110,20 @@ func (b *memBatch) Put(key, value []byte) error { return nil } +func (b *memBatch) Delete(key []byte) error { + b.writes = append(b.writes, kv{common.CopyBytes(key), nil}) + return nil +} + func (b *memBatch) Write() error { b.db.lock.Lock() defer b.db.lock.Unlock() for _, kv := range b.writes { + if kv.v == nil { + delete(b.db.db, string(kv.k)) + continue + } b.db.db[string(kv.k)] = kv.v } return nil diff --git a/light/txpool.go b/light/txpool.go index 5b4d06d90..767a797bd 100644 --- a/light/txpool.go +++ b/light/txpool.go @@ -199,15 +199,17 @@ func (pool *TxPool) checkMinedTxs(ctx context.Context, hash common.Hash, number // rollbackTxs marks the transactions contained in recently rolled back blocks // as rolled back. It also removes any positional lookup entries. func (pool *TxPool) rollbackTxs(hash common.Hash, txc txStateChanges) { + batch := pool.chainDb.NewBatch() if list, ok := pool.mined[hash]; ok { for _, tx := range list { txHash := tx.Hash() - rawdb.DeleteTxLookupEntry(pool.chainDb, txHash) + rawdb.DeleteTxLookupEntry(batch, txHash) pool.pending[txHash] = tx txc.setState(txHash, false) } delete(pool.mined, hash) } + batch.Write() } // reorgOnNewHead sets a new head header, processing (and rolling back if necessary) @@ -504,14 +506,16 @@ func (self *TxPool) Content() (map[common.Address]types.Transactions, map[common func (self *TxPool) RemoveTransactions(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() + var hashes []common.Hash + batch := self.chainDb.NewBatch() for _, tx := range txs { - //self.RemoveTx(tx.Hash()) hash := tx.Hash() delete(self.pending, hash) - self.chainDb.Delete(hash[:]) + batch.Delete(hash.Bytes()) hashes = append(hashes, hash) } + batch.Write() self.relay.Discard(hashes) } -- cgit