diff options
author | Wei-Ning Huang <w@dexon.org> | 2018-11-15 13:30:50 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-03-12 12:19:09 +0800 |
commit | 8150597b3878368ad090c9846cdf50dd812b2181 (patch) | |
tree | e6990143005d574424330aeb51af6dc2a193fd06 /core | |
parent | a27488515c91066a30989b6c751d214544367a1a (diff) | |
download | dexon-8150597b3878368ad090c9846cdf50dd812b2181.tar.gz dexon-8150597b3878368ad090c9846cdf50dd812b2181.tar.zst dexon-8150597b3878368ad090c9846cdf50dd812b2181.zip |
core: refactor validator and fix light node sync (#25)
Remove custom Dexon validator by adding a new `ValidateWitnessData`
method into the validator interface. This allow us to properly detect
know blocks. This also allow other gdex "light" client to sync
compaction chain. Also, setup a standalone RPC node for handling RPC
reqeusts.
Diffstat (limited to 'core')
-rw-r--r-- | core/block_validator.go | 41 | ||||
-rw-r--r-- | core/blockchain.go | 121 | ||||
-rw-r--r-- | core/blockchain_test.go | 3 | ||||
-rw-r--r-- | core/chain_makers_test.go | 1 | ||||
-rw-r--r-- | core/tx_pool.go | 35 | ||||
-rw-r--r-- | core/tx_pool_test.go | 34 | ||||
-rw-r--r-- | core/types.go | 3 | ||||
-rw-r--r-- | core/types/block.go | 1 |
8 files changed, 92 insertions, 147 deletions
diff --git a/core/block_validator.go b/core/block_validator.go index 65f311f9f..09539790b 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -101,37 +101,20 @@ func (v *BlockValidator) ValidateState(block, parent *types.Block, statedb *stat return nil } -// BlockValidator implements Validator. -type DexonBlockValidator struct { - config *params.ChainConfig // Chain configuration options - bc *BlockChain // Canonical block chain - engine consensus.Engine // Consensus engine used for validating -} +func (v *BlockValidator) ValidateWitnessData(height uint64, data types.WitnessData) error { + currentBlock := v.bc.CurrentBlock() + if height > currentBlock.NumberU64() && height != 0 { + pendingBlock := v.bc.GetPendingBlockByNumber(height) -// NewDexonBlockValidator returns a new block validator which is safe for re-use -func NewDexonBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engine consensus.Engine) *DexonBlockValidator { - validator := &DexonBlockValidator{ - config: config, - engine: engine, - bc: blockchain, + if pendingBlock.Root() != data.Root { + return fmt.Errorf("invalid witness root %s vs %s", + pendingBlock.Root().String(), data.Root.String()) + } + if pendingBlock.ReceiptHash() != data.ReceiptHash { + return fmt.Errorf("invalid witness receipt hash %s vs %s", + pendingBlock.ReceiptHash().String(), data.ReceiptHash.String()) + } } - return validator -} - -// ValidateBody validates the given block's uncles and verifies the block -// header's transaction and uncle roots. The headers are assumed to be already -// validated at this point. -func (v *DexonBlockValidator) ValidateBody(block *types.Block) error { - // TODO(Bojie): implement it - return nil -} - -// ValidateState validates the various changes that happen after a state -// transition, such as amount of used gas, the receipt roots and the state root -// itself. ValidateState returns a database batch if the validation was a success -// otherwise nil and an error is returned. -func (v *DexonBlockValidator) ValidateState(block, parent *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error { - // TODO(Bojie): implement it return nil } diff --git a/core/blockchain.go b/core/blockchain.go index 70394f0db..43d044875 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -237,79 +237,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par return bc, nil } -func NewBlockChainWithDexonValidator(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool) (*BlockChain, error) { - if cacheConfig == nil { - cacheConfig = &CacheConfig{ - TrieNodeLimit: 256 * 1024 * 1024, - TrieTimeLimit: 5 * time.Minute, - } - } - bodyCache, _ := lru.New(bodyCacheLimit) - bodyRLPCache, _ := lru.New(bodyCacheLimit) - receiptsCache, _ := lru.New(receiptsCacheLimit) - blockCache, _ := lru.New(blockCacheLimit) - futureBlocks, _ := lru.New(maxFutureBlocks) - badBlocks, _ := lru.New(badBlockLimit) - - bc := &BlockChain{ - chainConfig: chainConfig, - cacheConfig: cacheConfig, - db: db, - triegc: prque.New(nil), - stateCache: state.NewDatabase(db), - quit: make(chan struct{}), - bodyCache: bodyCache, - bodyRLPCache: bodyRLPCache, - receiptsCache: receiptsCache, - blockCache: blockCache, - futureBlocks: futureBlocks, - engine: engine, - vmConfig: vmConfig, - badBlocks: badBlocks, - pendingBlocks: make(map[uint64]struct { - block *types.Block - receipts types.Receipts - }), - confirmedBlocks: make(map[uint32]map[coreCommon.Hash]*blockInfo), - addressNonce: make(map[uint32]map[common.Address]uint64), - addressCost: make(map[uint32]map[common.Address]*big.Int), - addressCounter: make(map[uint32]map[common.Address]uint64), - chainLastHeight: make(map[uint32]uint64), - } - bc.SetValidator(NewDexonBlockValidator(chainConfig, bc, engine)) - bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine)) - - var err error - bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt) - if err != nil { - return nil, err - } - bc.genesisBlock = bc.GetBlockByNumber(0) - if bc.genesisBlock == nil { - return nil, ErrNoGenesis - } - if err := bc.loadLastState(); err != nil { - return nil, err - } - // Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain - for hash := range BadHashes { - if header := bc.GetHeaderByHash(hash); header != nil { - // get the canonical block corresponding to the offending header's number - headerByNumber := bc.GetHeaderByNumber(header.Number.Uint64()) - // make sure the headerByNumber (if present) is in our current canonical chain - if headerByNumber != nil && headerByNumber.Hash() == header.Hash() { - log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash) - bc.SetHead(header.Number.Uint64() - 1) - log.Error("Chain rewind was successful, resuming normal operation") - } - } - } - - // Take ownership of this particular state - go bc.update() - return bc, nil -} - func (bc *BlockChain) getProcInterrupt() bool { return atomic.LoadInt32(&bc.procInterrupt) == 1 } @@ -1600,7 +1527,15 @@ func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes return n, err } -func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, []interface{}, []*types.Log, error) { +func (bc *BlockChain) processPendingBlock( + block *types.Block, witness *coreTypes.Witness) (*common.Hash, []interface{}, []*types.Log, error) { + // Pre-checks passed, start the full block imports + bc.wg.Add(1) + defer bc.wg.Done() + + bc.chainmu.Lock() + defer bc.chainmu.Unlock() + // A queued approach to delivering events. This is generally // faster than direct delivery and requires much less mutex // acquiring. @@ -1611,12 +1546,6 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes coalescedLogs []*types.Log ) - var witnessData types.WitnessData - if err := rlp.Decode(bytes.NewReader(witness.Data), &witnessData); err != nil { - log.Error("Witness rlp decode failed", "error", err) - panic(err) - } - // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, block.Number()), []*types.Block{block}) @@ -1626,19 +1555,18 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes } bstart := time.Now() - currentBlock := bc.CurrentBlock() - if witness.Height > currentBlock.NumberU64() && witness.Height != 0 { - if bc.pendingBlocks[witness.Height].block.Root() != witnessData.Root { - return nil, nil, nil, fmt.Errorf("invalid witness root %s vs %s", - bc.pendingBlocks[witness.Height].block.Root().String(), witnessData.Root.String()) - } + var witnessData types.WitnessData + if err := rlp.Decode(bytes.NewReader(witness.Data), &witnessData); err != nil { + log.Error("Witness rlp decode failed", "error", err) + panic(err) + } - if bc.pendingBlocks[witness.Height].block.ReceiptHash() != witnessData.ReceiptHash { - return nil, nil, nil, fmt.Errorf("invalid witness receipt hash %s vs %s", - bc.pendingBlocks[witness.Height].block.ReceiptHash().String(), witnessData.ReceiptHash.String()) - } + if err := bc.Validator().ValidateWitnessData(witness.Height, witnessData); err != nil { + return nil, nil, nil, fmt.Errorf("valiadte witness data error: %v", err) } + currentBlock := bc.CurrentBlock() + var parentBlock *types.Block var pendingState *state.StateDB var err error @@ -1680,12 +1608,6 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes return nil, nil, nil, fmt.Errorf("finalize error: %v", err) } - // Validate the state using the default validator - err = bc.Validator().ValidateState(block, nil, pendingState, receipts, *usedGas) - if err != nil { - bc.reportBlock(block, receipts, err) - return nil, nil, nil, fmt.Errorf("valiadte state error: %v", err) - } proctime := time.Since(bstart) // commit state to refresh stateCache @@ -1790,6 +1712,13 @@ func (bc *BlockChain) GetPendingBlock() *types.Block { return bc.pendingBlocks[bc.lastPendingHeight].block } +func (bc *BlockChain) GetPendingBlockByNumber(number uint64) *types.Block { + bc.pendingBlockMu.RLock() + defer bc.pendingBlockMu.RUnlock() + + return bc.pendingBlocks[number].block +} + func (bc *BlockChain) GetPending() (*types.Block, *state.StateDB) { block := bc.GetPendingBlock() s, err := state.New(block.Header().Root, bc.stateCache) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 786c47064..11bb1317a 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -1074,9 +1074,6 @@ func TestEIP155Transition(t *testing.T) { } ) - dexConf := new(params.DexconConfig) - dexConf.BlockReward = new(big.Int) - gspec.Config.Dexcon = dexConf genesis := gspec.MustCommit(db) blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go index cb01ae0c9..0fb995f5f 100644 --- a/core/chain_makers_test.go +++ b/core/chain_makers_test.go @@ -148,6 +148,7 @@ func ExampleGenerateChainWithRoundChange() { }, addr1: { Balance: big.NewInt(1000000), + Staked: big.NewInt(0), }, }, } diff --git a/core/tx_pool.go b/core/tx_pool.go index 622c8ce9d..54bad9eae 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -36,6 +36,9 @@ import ( ) const ( + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 + // blockConfirmedChanSize is the size of channel listening to BlockConfirmedEvent. blockConfirmedChanSize = 10 ) @@ -118,6 +121,7 @@ type blockChain interface { GetBlock(hash common.Hash, number uint64) *types.Block StateAt(root common.Hash) (*state.StateDB, error) + SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription } @@ -209,6 +213,8 @@ type TxPool struct { gasPrice *big.Int txFeed event.Feed scope event.SubscriptionScope + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription blockConfirmedCh chan BlockConfirmedEvent blockConfirmedSub event.Subscription signer types.Signer @@ -229,12 +235,13 @@ type TxPool struct { wg sync.WaitGroup // for shutdown sync - homestead bool + homestead bool + isBlockProposer bool } // NewTxPool creates a new transaction pool to gather, sort and filter inbound // transactions from the network. -func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool { +func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain, isBlockProposer bool) *TxPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() @@ -248,8 +255,10 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), all: newTxLookup(), + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), blockConfirmedCh: make(chan BlockConfirmedEvent, blockConfirmedChanSize), gasPrice: new(big.Int).SetUint64(config.PriceLimit), + isBlockProposer: isBlockProposer, } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -272,6 +281,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block } // Subscribe events from blockchain pool.blockConfirmedSub = pool.chain.SubscribeBlockConfirmedEvent(pool.blockConfirmedCh) + pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) // Start the event loop and return pool.wg.Add(1) @@ -304,8 +314,29 @@ func (pool *TxPool) loop() { // Keep waiting for and reacting to the various events for { select { + // Handle ChainHeadEvent + case ev := <-pool.chainHeadCh: + if pool.isBlockProposer { + break + } + if ev.Block != nil { + pool.mu.Lock() + if pool.chainconfig.IsHomestead(ev.Block.Number()) { + pool.homestead = true + } + pool.reset(head.Header(), ev.Block.Header()) + head = ev.Block + + pool.mu.Unlock() + } + // Be unsubscribed due to system stopped + case <-pool.chainHeadSub.Err(): + return // Handle BlockConfirmedEvent case ev := <-pool.blockConfirmedCh: + if !pool.isBlockProposer { + break + } if ev.Block != nil { pool.mu.Lock() if pool.chainconfig.IsHomestead(ev.Block.Number()) { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 2cc6c7903..4c1d78f7f 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -87,7 +87,7 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} key, _ := crypto.GenerateKey() - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false) return pool, key } @@ -197,7 +197,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) { tx0 := transaction(0, 100000, key) tx1 := transaction(1, 100000, key) - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false) defer pool.Stop() nonce := pool.State().GetNonce(address) @@ -562,7 +562,7 @@ func TestTransactionPostponing(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create two test accounts to produce different gap profiles with @@ -781,7 +781,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { config.NoLocals = nolocals config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create a number of test accounts and fund them (last one will be the local) @@ -869,7 +869,7 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { config.Lifetime = time.Second config.NoLocals = nolocals - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create two test accounts to ensure remotes expire but locals do not @@ -1022,7 +1022,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { config := testTxPoolConfig config.GlobalSlots = config.AccountSlots * 10 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create a number of test accounts and fund them @@ -1070,7 +1070,7 @@ func TestTransactionCapClearsFromAll(t *testing.T) { config.AccountQueue = 2 config.GlobalSlots = 8 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create a number of test accounts and fund them @@ -1102,7 +1102,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { config := testTxPoolConfig config.GlobalSlots = 1 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create a number of test accounts and fund them @@ -1147,7 +1147,7 @@ func TestTransactionPoolRepricing(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false) defer pool.Stop() // Keep track of transaction events to ensure all executables get announced @@ -1268,7 +1268,7 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create a number of test accounts and fund them @@ -1334,7 +1334,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) { config.GlobalSlots = 2 config.GlobalQueue = 2 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) defer pool.Stop() // Keep track of transaction events to ensure all executables get announced @@ -1440,7 +1440,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) { config.GlobalSlots = 128 config.GlobalQueue = 0 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) defer pool.Stop() // Keep track of transaction events to ensure all executables get announced @@ -1502,7 +1502,7 @@ func TestTransactionReplacement(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false) defer pool.Stop() // Keep track of transaction events to ensure all executables get announced @@ -1601,7 +1601,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { config.Journal = journal config.Rejournal = time.Second - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) // Create two test accounts to ensure remotes expire but locals do not local, _ := crypto.GenerateKey() @@ -1638,7 +1638,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) blockchain = &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} - pool = NewTxPool(config, params.TestChainConfig, blockchain) + pool = NewTxPool(config, params.TestChainConfig, blockchain, false) pending, queued = pool.Stats() if queued != 0 { @@ -1664,7 +1664,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) blockchain = &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} - pool = NewTxPool(config, params.TestChainConfig, blockchain) + pool = NewTxPool(config, params.TestChainConfig, blockchain, false) pending, queued = pool.Stats() if pending != 0 { @@ -1694,7 +1694,7 @@ func TestTransactionStatusCheck(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create the test accounts to check various transaction statuses with diff --git a/core/types.go b/core/types.go index 38b8e51dc..327031b01 100644 --- a/core/types.go +++ b/core/types.go @@ -33,6 +33,9 @@ type Validator interface { // ValidateState validates the given statedb and optionally the receipts and // gas used. ValidateState(block, parent *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error + + // ValidateWitnessData validates the given witness result. + ValidateWitnessData(height uint64, data types.WitnessData) error } // Processor is an interface for processing blocks using a given initial state. diff --git a/core/types/block.go b/core/types/block.go index eb75dcb30..73b3ddfda 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -64,6 +64,7 @@ func (n *BlockNonce) UnmarshalText(input []byte) error { return hexutil.UnmarshalFixedText("BlockNonce", input, n[:]) } +// WitnessData represents the witness data. type WitnessData struct { Root common.Hash TxHash common.Hash |