diff options
-rw-r--r-- | core/blockchain.go | 7 | ||||
-rw-r--r-- | core/state/statedb.go | 51 | ||||
-rw-r--r-- | eth/api.go | 6 | ||||
-rw-r--r-- | eth/api_backend.go | 2 | ||||
-rw-r--r-- | ethdb/database.go | 2 | ||||
-rw-r--r-- | internal/ethapi/api.go | 2 | ||||
-rw-r--r-- | miner/worker.go | 2 | ||||
-rw-r--r-- | trie/secure_trie.go | 33 | ||||
-rw-r--r-- | trie/secure_trie_test.go | 71 |
9 files changed, 151 insertions, 25 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index a5f146a2d..1fbcdfc6f 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -357,7 +357,12 @@ func (self *BlockChain) AuxValidator() pow.PoW { return self.pow } // State returns a new mutable state based on the current HEAD block. func (self *BlockChain) State() (*state.StateDB, error) { - return state.New(self.CurrentBlock().Root(), self.chainDb) + return self.StateAt(self.CurrentBlock().Root()) +} + +// StateAt returns a new mutable state based on a particular point in time. +func (self *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) { + return self.stateCache.New(root) } // Reset purges the entire blockchain, restoring it to its genesis state. diff --git a/core/state/statedb.go b/core/state/statedb.go index 802f37ba0..5c51e3b59 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -20,6 +20,7 @@ package state import ( "fmt" "math/big" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/vm" @@ -66,6 +67,8 @@ type StateDB struct { txIndex int logs map[common.Hash]vm.Logs logSize uint + + lock sync.Mutex } // Create a new state from a given trie @@ -86,32 +89,53 @@ func New(root common.Hash, db ethdb.Database) (*StateDB, error) { }, nil } -// Reset clears out all emphemeral state objects from the state db, but keeps -// the underlying state trie to avoid reloading data for the next operations. -func (self *StateDB) Reset(root common.Hash) error { +// New creates a new statedb by reusing any journalled tries to avoid costly +// disk io. +func (self *StateDB) New(root common.Hash) (*StateDB, error) { + self.lock.Lock() + defer self.lock.Unlock() + tr, err := self.openTrie(root) if err != nil { - return err + return nil, err } - *self = StateDB{ + return &StateDB{ db: self.db, trie: tr, - pastTries: self.pastTries, codeSizeCache: self.codeSizeCache, stateObjects: make(map[common.Address]*StateObject), stateObjectsDirty: make(map[common.Address]struct{}), refund: new(big.Int), logs: make(map[common.Hash]vm.Logs), + }, nil +} + +// Reset clears out all emphemeral state objects from the state db, but keeps +// the underlying state trie to avoid reloading data for the next operations. +func (self *StateDB) Reset(root common.Hash) error { + self.lock.Lock() + defer self.lock.Unlock() + + tr, err := self.openTrie(root) + if err != nil { + return err } + self.trie = tr + self.stateObjects = make(map[common.Address]*StateObject) + self.stateObjectsDirty = make(map[common.Address]struct{}) + self.refund = new(big.Int) + self.thash = common.Hash{} + self.bhash = common.Hash{} + self.txIndex = 0 + self.logs = make(map[common.Hash]vm.Logs) + self.logSize = 0 + return nil } // openTrie creates a trie. It uses an existing trie if one is available // from the journal if available. func (self *StateDB) openTrie(root common.Hash) (*trie.SecureTrie, error) { - if self.trie != nil && self.trie.Hash() == root { - return self.trie, nil - } for i := len(self.pastTries) - 1; i >= 0; i-- { if self.pastTries[i].Hash() == root { tr := *self.pastTries[i] @@ -122,6 +146,9 @@ func (self *StateDB) openTrie(root common.Hash) (*trie.SecureTrie, error) { } func (self *StateDB) pushTrie(t *trie.SecureTrie) { + self.lock.Lock() + defer self.lock.Unlock() + if len(self.pastTries) >= maxJournalLength { copy(self.pastTries, self.pastTries[1:]) self.pastTries[len(self.pastTries)-1] = t @@ -381,6 +408,9 @@ func (self *StateDB) CreateAccount(addr common.Address) vm.Account { // func (self *StateDB) Copy() *StateDB { + self.lock.Lock() + defer self.lock.Unlock() + // Copy all the basic fields, initialize the memory ones state := &StateDB{ db: self.db, @@ -406,6 +436,9 @@ func (self *StateDB) Copy() *StateDB { } func (self *StateDB) Set(state *StateDB) { + self.lock.Lock() + defer self.lock.Unlock() + self.db = state.db self.trie = state.trie self.pastTries = state.pastTries diff --git a/eth/api.go b/eth/api.go index d6c0826ed..c2fdbe99c 100644 --- a/eth/api.go +++ b/eth/api.go @@ -293,7 +293,7 @@ func (api *PublicDebugAPI) DumpBlock(number uint64) (state.Dump, error) { if block == nil { return state.Dump{}, fmt.Errorf("block #%d not found", number) } - stateDb, err := state.New(block.Root(), api.eth.ChainDb()) + stateDb, err := api.eth.BlockChain().StateAt(block.Root()) if err != nil { return state.Dump{}, err } @@ -406,7 +406,7 @@ func (api *PrivateDebugAPI) traceBlock(block *types.Block, logConfig *vm.LogConf if err := core.ValidateHeader(api.config, blockchain.AuxValidator(), block.Header(), blockchain.GetHeader(block.ParentHash(), block.NumberU64()-1), true, false); err != nil { return false, structLogger.StructLogs(), err } - statedb, err := state.New(blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1).Root(), api.eth.ChainDb()) + statedb, err := blockchain.StateAt(blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1).Root()) if err != nil { return false, structLogger.StructLogs(), err } @@ -501,7 +501,7 @@ func (api *PrivateDebugAPI) TraceTransaction(ctx context.Context, txHash common. if parent == nil { return nil, fmt.Errorf("block parent %x not found", block.ParentHash()) } - stateDb, err := state.New(parent.Root(), api.eth.ChainDb()) + stateDb, err := api.eth.BlockChain().StateAt(parent.Root()) if err != nil { return nil, err } diff --git a/eth/api_backend.go b/eth/api_backend.go index 4f8f06529..4adeb0aa0 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -81,7 +81,7 @@ func (b *EthApiBackend) StateAndHeaderByNumber(blockNr rpc.BlockNumber) (ethapi. if header == nil { return nil, nil, nil } - stateDb, err := state.New(header.Root, b.eth.chainDb) + stateDb, err := b.eth.BlockChain().StateAt(header.Root) return EthApiState{stateDb}, header, err } diff --git a/ethdb/database.go b/ethdb/database.go index f93731cfe..a4a27303a 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/filter" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" @@ -84,6 +85,7 @@ func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) { OpenFilesCacheCapacity: handles, BlockCacheCapacity: cache / 2 * opt.MiB, WriteBuffer: cache / 4 * opt.MiB, // Two of these are used internally + Filter: filter.NewBloomFilter(10), }) if _, corrupted := err.(*errors.ErrCorrupted); corrupted { db, err = leveldb.RecoverFile(file, nil) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 6480085dd..9a97be25f 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -454,6 +454,8 @@ type CallArgs struct { } func (s *PublicBlockChainAPI) doCall(ctx context.Context, args CallArgs, blockNr rpc.BlockNumber) (string, *big.Int, error) { + defer func(start time.Time) { glog.V(logger.Debug).Infof("call took %v", time.Since(start)) }(time.Now()) + state, header, err := s.b.StateAndHeaderByNumber(blockNr) if state == nil || err != nil { return "0x", common.Big0, err diff --git a/miner/worker.go b/miner/worker.go index 1676036d8..ac1ef5ba3 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -361,7 +361,7 @@ func (self *worker) push(work *Work) { // makeCurrent creates a new environment for the current cycle. func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error { - state, err := state.New(parent.Root(), self.eth.ChainDb()) + state, err := self.chain.StateAt(parent.Root()) if err != nil { return err } diff --git a/trie/secure_trie.go b/trie/secure_trie.go index efe875bc8..2a8b57214 100644 --- a/trie/secure_trie.go +++ b/trie/secure_trie.go @@ -24,6 +24,8 @@ import ( var secureKeyPrefix = []byte("secure-key-") +const secureKeyLength = 11 + 32 // Length of the above prefix + 32byte hash + // SecureTrie wraps a trie with key hashing. In a secure trie, all // access operations hash the key using keccak256. This prevents // calling code from creating long chains of nodes that @@ -35,10 +37,11 @@ var secureKeyPrefix = []byte("secure-key-") // // SecureTrie is not safe for concurrent use. type SecureTrie struct { - trie Trie - hashKeyBuf []byte - secKeyBuf [200]byte - secKeyCache map[string][]byte + trie Trie + hashKeyBuf [secureKeyLength]byte + secKeyBuf [200]byte + secKeyCache map[string][]byte + secKeyCacheOwner *SecureTrie // Pointer to self, replace the key cache on mismatch } // NewSecure creates a trie with an existing root node from db. @@ -56,8 +59,7 @@ func NewSecure(root common.Hash, db Database) (*SecureTrie, error) { return nil, err } return &SecureTrie{ - trie: *trie, - secKeyCache: make(map[string][]byte), + trie: *trie, }, nil } @@ -104,7 +106,7 @@ func (t *SecureTrie) TryUpdate(key, value []byte) error { if err != nil { return err } - t.secKeyCache[string(hk)] = common.CopyBytes(key) + t.getSecKeyCache()[string(hk)] = common.CopyBytes(key) return nil } @@ -119,14 +121,14 @@ func (t *SecureTrie) Delete(key []byte) { // If a node was not found in the database, a MissingNodeError is returned. func (t *SecureTrie) TryDelete(key []byte) error { hk := t.hashKey(key) - delete(t.secKeyCache, string(hk)) + delete(t.getSecKeyCache(), string(hk)) return t.trie.TryDelete(hk) } // GetKey returns the sha3 preimage of a hashed key that was // previously used to store a value. func (t *SecureTrie) GetKey(shaKey []byte) []byte { - if key, ok := t.secKeyCache[string(shaKey)]; ok { + if key, ok := t.getSecKeyCache()[string(shaKey)]; ok { return key } key, _ := t.trie.db.Get(t.secKey(shaKey)) @@ -165,7 +167,7 @@ func (t *SecureTrie) NodeIterator() *NodeIterator { // the trie's database. Calling code must ensure that the changes made to db are // written back to the trie's attached database before using the trie. func (t *SecureTrie) CommitTo(db DatabaseWriter) (root common.Hash, err error) { - if len(t.secKeyCache) > 0 { + if len(t.getSecKeyCache()) > 0 { for hk, key := range t.secKeyCache { if err := db.Put(t.secKey([]byte(hk)), key); err != nil { return common.Hash{}, err @@ -196,3 +198,14 @@ func (t *SecureTrie) hashKey(key []byte) []byte { returnHasherToPool(h) return buf } + +// getSecKeyCache returns the current secure key cache, creating a new one if +// ownership changed (i.e. the current secure trie is a copy of another owning +// the actual cache). +func (t *SecureTrie) getSecKeyCache() map[string][]byte { + if t != t.secKeyCacheOwner { + t.secKeyCacheOwner = t + t.secKeyCache = make(map[string][]byte) + } + return t.secKeyCache +} diff --git a/trie/secure_trie_test.go b/trie/secure_trie_test.go index 0be5b3d15..3171b8c31 100644 --- a/trie/secure_trie_test.go +++ b/trie/secure_trie_test.go @@ -18,6 +18,8 @@ package trie import ( "bytes" + "runtime" + "sync" "testing" "github.com/ethereum/go-ethereum/common" @@ -31,6 +33,37 @@ func newEmptySecure() *SecureTrie { return trie } +// makeTestSecureTrie creates a large enough secure trie for testing. +func makeTestSecureTrie() (ethdb.Database, *SecureTrie, map[string][]byte) { + // Create an empty trie + db, _ := ethdb.NewMemDatabase() + trie, _ := NewSecure(common.Hash{}, db) + + // Fill it with some arbitrary data + content := make(map[string][]byte) + for i := byte(0); i < 255; i++ { + // Map the same data under multiple keys + key, val := common.LeftPadBytes([]byte{1, i}, 32), []byte{i} + content[string(key)] = val + trie.Update(key, val) + + key, val = common.LeftPadBytes([]byte{2, i}, 32), []byte{i} + content[string(key)] = val + trie.Update(key, val) + + // Add some other data to inflate th trie + for j := byte(3); j < 13; j++ { + key, val = common.LeftPadBytes([]byte{j, i}, 32), []byte{j, i} + content[string(key)] = val + trie.Update(key, val) + } + } + trie.Commit() + + // Return the generated trie + return db, trie, content +} + func TestSecureDelete(t *testing.T) { trie := newEmptySecure() vals := []struct{ k, v string }{ @@ -72,3 +105,41 @@ func TestSecureGetKey(t *testing.T) { t.Errorf("GetKey returned %q, want %q", k, key) } } + +func TestSecureTrieConcurrency(t *testing.T) { + // Create an initial trie and copy if for concurrent access + _, trie, _ := makeTestSecureTrie() + + threads := runtime.NumCPU() + tries := make([]*SecureTrie, threads) + for i := 0; i < threads; i++ { + cpy := *trie + tries[i] = &cpy + } + // Start a batch of goroutines interactng with the trie + pend := new(sync.WaitGroup) + pend.Add(threads) + for i := 0; i < threads; i++ { + go func(index int) { + defer pend.Done() + + for j := byte(0); j < 255; j++ { + // Map the same data under multiple keys + key, val := common.LeftPadBytes([]byte{byte(index), 1, j}, 32), []byte{j} + tries[index].Update(key, val) + + key, val = common.LeftPadBytes([]byte{byte(index), 2, j}, 32), []byte{j} + tries[index].Update(key, val) + + // Add some other data to inflate the trie + for k := byte(3); k < 13; k++ { + key, val = common.LeftPadBytes([]byte{byte(index), k, j}, 32), []byte{k, j} + tries[index].Update(key, val) + } + } + tries[index].Commit() + }(i) + } + // Wait for all threads to finish + pend.Wait() +} |