From 8150597b3878368ad090c9846cdf50dd812b2181 Mon Sep 17 00:00:00 2001 From: Wei-Ning Huang Date: Thu, 15 Nov 2018 13:30:50 +0800 Subject: core: refactor validator and fix light node sync (#25) Remove custom Dexon validator by adding a new `ValidateWitnessData` method into the validator interface. This allow us to properly detect know blocks. This also allow other gdex "light" client to sync compaction chain. Also, setup a standalone RPC node for handling RPC reqeusts. --- cmd/utils/flags.go | 4 ++ core/block_validator.go | 41 +++++----------- core/blockchain.go | 121 ++++++++++------------------------------------ core/blockchain_test.go | 3 -- core/chain_makers_test.go | 1 + core/tx_pool.go | 35 +++++++++++++- core/tx_pool_test.go | 34 ++++++------- core/types.go | 3 ++ core/types/block.go | 1 + dex/backend.go | 7 ++- dex/config.go | 12 +++-- dex/handler.go | 103 ++++++++++++++++++++++++--------------- eth/backend.go | 2 +- p2p/server.go | 2 +- test/run_test.sh | 26 ++++++---- 15 files changed, 189 insertions(+), 206 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 6a1d84869..611db1975 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1171,6 +1171,10 @@ func SetDexConfig(ctx *cli.Context, stack *node.Node, cfg *dex.Config) { if ctx.GlobalIsSet(NetworkIdFlag.Name) { cfg.NetworkId = ctx.GlobalUint64(NetworkIdFlag.Name) } + if ctx.GlobalIsSet(BlockProposerEnabledFlag.Name) { + cfg.BlockProposerEnabled = ctx.GlobalBool(BlockProposerEnabledFlag.Name) + } + if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheDatabaseFlag.Name) { cfg.DatabaseCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheDatabaseFlag.Name) / 100 } diff --git a/core/block_validator.go b/core/block_validator.go index 65f311f9f..09539790b 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -101,37 +101,20 @@ func (v *BlockValidator) ValidateState(block, parent *types.Block, statedb *stat return nil } -// BlockValidator implements Validator. -type DexonBlockValidator struct { - config *params.ChainConfig // Chain configuration options - bc *BlockChain // Canonical block chain - engine consensus.Engine // Consensus engine used for validating -} +func (v *BlockValidator) ValidateWitnessData(height uint64, data types.WitnessData) error { + currentBlock := v.bc.CurrentBlock() + if height > currentBlock.NumberU64() && height != 0 { + pendingBlock := v.bc.GetPendingBlockByNumber(height) -// NewDexonBlockValidator returns a new block validator which is safe for re-use -func NewDexonBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engine consensus.Engine) *DexonBlockValidator { - validator := &DexonBlockValidator{ - config: config, - engine: engine, - bc: blockchain, + if pendingBlock.Root() != data.Root { + return fmt.Errorf("invalid witness root %s vs %s", + pendingBlock.Root().String(), data.Root.String()) + } + if pendingBlock.ReceiptHash() != data.ReceiptHash { + return fmt.Errorf("invalid witness receipt hash %s vs %s", + pendingBlock.ReceiptHash().String(), data.ReceiptHash.String()) + } } - return validator -} - -// ValidateBody validates the given block's uncles and verifies the block -// header's transaction and uncle roots. The headers are assumed to be already -// validated at this point. -func (v *DexonBlockValidator) ValidateBody(block *types.Block) error { - // TODO(Bojie): implement it - return nil -} - -// ValidateState validates the various changes that happen after a state -// transition, such as amount of used gas, the receipt roots and the state root -// itself. ValidateState returns a database batch if the validation was a success -// otherwise nil and an error is returned. -func (v *DexonBlockValidator) ValidateState(block, parent *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error { - // TODO(Bojie): implement it return nil } diff --git a/core/blockchain.go b/core/blockchain.go index 70394f0db..43d044875 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -237,79 +237,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par return bc, nil } -func NewBlockChainWithDexonValidator(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool) (*BlockChain, error) { - if cacheConfig == nil { - cacheConfig = &CacheConfig{ - TrieNodeLimit: 256 * 1024 * 1024, - TrieTimeLimit: 5 * time.Minute, - } - } - bodyCache, _ := lru.New(bodyCacheLimit) - bodyRLPCache, _ := lru.New(bodyCacheLimit) - receiptsCache, _ := lru.New(receiptsCacheLimit) - blockCache, _ := lru.New(blockCacheLimit) - futureBlocks, _ := lru.New(maxFutureBlocks) - badBlocks, _ := lru.New(badBlockLimit) - - bc := &BlockChain{ - chainConfig: chainConfig, - cacheConfig: cacheConfig, - db: db, - triegc: prque.New(nil), - stateCache: state.NewDatabase(db), - quit: make(chan struct{}), - bodyCache: bodyCache, - bodyRLPCache: bodyRLPCache, - receiptsCache: receiptsCache, - blockCache: blockCache, - futureBlocks: futureBlocks, - engine: engine, - vmConfig: vmConfig, - badBlocks: badBlocks, - pendingBlocks: make(map[uint64]struct { - block *types.Block - receipts types.Receipts - }), - confirmedBlocks: make(map[uint32]map[coreCommon.Hash]*blockInfo), - addressNonce: make(map[uint32]map[common.Address]uint64), - addressCost: make(map[uint32]map[common.Address]*big.Int), - addressCounter: make(map[uint32]map[common.Address]uint64), - chainLastHeight: make(map[uint32]uint64), - } - bc.SetValidator(NewDexonBlockValidator(chainConfig, bc, engine)) - bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine)) - - var err error - bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt) - if err != nil { - return nil, err - } - bc.genesisBlock = bc.GetBlockByNumber(0) - if bc.genesisBlock == nil { - return nil, ErrNoGenesis - } - if err := bc.loadLastState(); err != nil { - return nil, err - } - // Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain - for hash := range BadHashes { - if header := bc.GetHeaderByHash(hash); header != nil { - // get the canonical block corresponding to the offending header's number - headerByNumber := bc.GetHeaderByNumber(header.Number.Uint64()) - // make sure the headerByNumber (if present) is in our current canonical chain - if headerByNumber != nil && headerByNumber.Hash() == header.Hash() { - log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash) - bc.SetHead(header.Number.Uint64() - 1) - log.Error("Chain rewind was successful, resuming normal operation") - } - } - } - - // Take ownership of this particular state - go bc.update() - return bc, nil -} - func (bc *BlockChain) getProcInterrupt() bool { return atomic.LoadInt32(&bc.procInterrupt) == 1 } @@ -1600,7 +1527,15 @@ func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes return n, err } -func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, []interface{}, []*types.Log, error) { +func (bc *BlockChain) processPendingBlock( + block *types.Block, witness *coreTypes.Witness) (*common.Hash, []interface{}, []*types.Log, error) { + // Pre-checks passed, start the full block imports + bc.wg.Add(1) + defer bc.wg.Done() + + bc.chainmu.Lock() + defer bc.chainmu.Unlock() + // A queued approach to delivering events. This is generally // faster than direct delivery and requires much less mutex // acquiring. @@ -1611,12 +1546,6 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes coalescedLogs []*types.Log ) - var witnessData types.WitnessData - if err := rlp.Decode(bytes.NewReader(witness.Data), &witnessData); err != nil { - log.Error("Witness rlp decode failed", "error", err) - panic(err) - } - // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, block.Number()), []*types.Block{block}) @@ -1626,19 +1555,18 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes } bstart := time.Now() - currentBlock := bc.CurrentBlock() - if witness.Height > currentBlock.NumberU64() && witness.Height != 0 { - if bc.pendingBlocks[witness.Height].block.Root() != witnessData.Root { - return nil, nil, nil, fmt.Errorf("invalid witness root %s vs %s", - bc.pendingBlocks[witness.Height].block.Root().String(), witnessData.Root.String()) - } + var witnessData types.WitnessData + if err := rlp.Decode(bytes.NewReader(witness.Data), &witnessData); err != nil { + log.Error("Witness rlp decode failed", "error", err) + panic(err) + } - if bc.pendingBlocks[witness.Height].block.ReceiptHash() != witnessData.ReceiptHash { - return nil, nil, nil, fmt.Errorf("invalid witness receipt hash %s vs %s", - bc.pendingBlocks[witness.Height].block.ReceiptHash().String(), witnessData.ReceiptHash.String()) - } + if err := bc.Validator().ValidateWitnessData(witness.Height, witnessData); err != nil { + return nil, nil, nil, fmt.Errorf("valiadte witness data error: %v", err) } + currentBlock := bc.CurrentBlock() + var parentBlock *types.Block var pendingState *state.StateDB var err error @@ -1680,12 +1608,6 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes return nil, nil, nil, fmt.Errorf("finalize error: %v", err) } - // Validate the state using the default validator - err = bc.Validator().ValidateState(block, nil, pendingState, receipts, *usedGas) - if err != nil { - bc.reportBlock(block, receipts, err) - return nil, nil, nil, fmt.Errorf("valiadte state error: %v", err) - } proctime := time.Since(bstart) // commit state to refresh stateCache @@ -1790,6 +1712,13 @@ func (bc *BlockChain) GetPendingBlock() *types.Block { return bc.pendingBlocks[bc.lastPendingHeight].block } +func (bc *BlockChain) GetPendingBlockByNumber(number uint64) *types.Block { + bc.pendingBlockMu.RLock() + defer bc.pendingBlockMu.RUnlock() + + return bc.pendingBlocks[number].block +} + func (bc *BlockChain) GetPending() (*types.Block, *state.StateDB) { block := bc.GetPendingBlock() s, err := state.New(block.Header().Root, bc.stateCache) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 786c47064..11bb1317a 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -1074,9 +1074,6 @@ func TestEIP155Transition(t *testing.T) { } ) - dexConf := new(params.DexconConfig) - dexConf.BlockReward = new(big.Int) - gspec.Config.Dexcon = dexConf genesis := gspec.MustCommit(db) blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil) diff --git a/core/chain_makers_test.go b/core/chain_makers_test.go index cb01ae0c9..0fb995f5f 100644 --- a/core/chain_makers_test.go +++ b/core/chain_makers_test.go @@ -148,6 +148,7 @@ func ExampleGenerateChainWithRoundChange() { }, addr1: { Balance: big.NewInt(1000000), + Staked: big.NewInt(0), }, }, } diff --git a/core/tx_pool.go b/core/tx_pool.go index 622c8ce9d..54bad9eae 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -36,6 +36,9 @@ import ( ) const ( + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 + // blockConfirmedChanSize is the size of channel listening to BlockConfirmedEvent. blockConfirmedChanSize = 10 ) @@ -118,6 +121,7 @@ type blockChain interface { GetBlock(hash common.Hash, number uint64) *types.Block StateAt(root common.Hash) (*state.StateDB, error) + SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription } @@ -209,6 +213,8 @@ type TxPool struct { gasPrice *big.Int txFeed event.Feed scope event.SubscriptionScope + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription blockConfirmedCh chan BlockConfirmedEvent blockConfirmedSub event.Subscription signer types.Signer @@ -229,12 +235,13 @@ type TxPool struct { wg sync.WaitGroup // for shutdown sync - homestead bool + homestead bool + isBlockProposer bool } // NewTxPool creates a new transaction pool to gather, sort and filter inbound // transactions from the network. -func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool { +func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain, isBlockProposer bool) *TxPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() @@ -248,8 +255,10 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), all: newTxLookup(), + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), blockConfirmedCh: make(chan BlockConfirmedEvent, blockConfirmedChanSize), gasPrice: new(big.Int).SetUint64(config.PriceLimit), + isBlockProposer: isBlockProposer, } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -272,6 +281,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block } // Subscribe events from blockchain pool.blockConfirmedSub = pool.chain.SubscribeBlockConfirmedEvent(pool.blockConfirmedCh) + pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) // Start the event loop and return pool.wg.Add(1) @@ -304,8 +314,29 @@ func (pool *TxPool) loop() { // Keep waiting for and reacting to the various events for { select { + // Handle ChainHeadEvent + case ev := <-pool.chainHeadCh: + if pool.isBlockProposer { + break + } + if ev.Block != nil { + pool.mu.Lock() + if pool.chainconfig.IsHomestead(ev.Block.Number()) { + pool.homestead = true + } + pool.reset(head.Header(), ev.Block.Header()) + head = ev.Block + + pool.mu.Unlock() + } + // Be unsubscribed due to system stopped + case <-pool.chainHeadSub.Err(): + return // Handle BlockConfirmedEvent case ev := <-pool.blockConfirmedCh: + if !pool.isBlockProposer { + break + } if ev.Block != nil { pool.mu.Lock() if pool.chainconfig.IsHomestead(ev.Block.Number()) { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 2cc6c7903..4c1d78f7f 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -87,7 +87,7 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} key, _ := crypto.GenerateKey() - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false) return pool, key } @@ -197,7 +197,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) { tx0 := transaction(0, 100000, key) tx1 := transaction(1, 100000, key) - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false) defer pool.Stop() nonce := pool.State().GetNonce(address) @@ -562,7 +562,7 @@ func TestTransactionPostponing(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create two test accounts to produce different gap profiles with @@ -781,7 +781,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { config.NoLocals = nolocals config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create a number of test accounts and fund them (last one will be the local) @@ -869,7 +869,7 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { config.Lifetime = time.Second config.NoLocals = nolocals - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create two test accounts to ensure remotes expire but locals do not @@ -1022,7 +1022,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { config := testTxPoolConfig config.GlobalSlots = config.AccountSlots * 10 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create a number of test accounts and fund them @@ -1070,7 +1070,7 @@ func TestTransactionCapClearsFromAll(t *testing.T) { config.AccountQueue = 2 config.GlobalSlots = 8 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create a number of test accounts and fund them @@ -1102,7 +1102,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { config := testTxPoolConfig config.GlobalSlots = 1 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create a number of test accounts and fund them @@ -1147,7 +1147,7 @@ func TestTransactionPoolRepricing(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false) defer pool.Stop() // Keep track of transaction events to ensure all executables get announced @@ -1268,7 +1268,7 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create a number of test accounts and fund them @@ -1334,7 +1334,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) { config.GlobalSlots = 2 config.GlobalQueue = 2 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) defer pool.Stop() // Keep track of transaction events to ensure all executables get announced @@ -1440,7 +1440,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) { config.GlobalSlots = 128 config.GlobalQueue = 0 - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) defer pool.Stop() // Keep track of transaction events to ensure all executables get announced @@ -1502,7 +1502,7 @@ func TestTransactionReplacement(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false) defer pool.Stop() // Keep track of transaction events to ensure all executables get announced @@ -1601,7 +1601,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { config.Journal = journal config.Rejournal = time.Second - pool := NewTxPool(config, params.TestChainConfig, blockchain) + pool := NewTxPool(config, params.TestChainConfig, blockchain, false) // Create two test accounts to ensure remotes expire but locals do not local, _ := crypto.GenerateKey() @@ -1638,7 +1638,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) blockchain = &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} - pool = NewTxPool(config, params.TestChainConfig, blockchain) + pool = NewTxPool(config, params.TestChainConfig, blockchain, false) pending, queued = pool.Stats() if queued != 0 { @@ -1664,7 +1664,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) blockchain = &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} - pool = NewTxPool(config, params.TestChainConfig, blockchain) + pool = NewTxPool(config, params.TestChainConfig, blockchain, false) pending, queued = pool.Stats() if pending != 0 { @@ -1694,7 +1694,7 @@ func TestTransactionStatusCheck(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(ethdb.NewMemDatabase())) blockchain := &testBlockChain{statedb, 1000000, new(event.Feed), new(event.Feed)} - pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) + pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain, false) defer pool.Stop() // Create the test accounts to check various transaction statuses with diff --git a/core/types.go b/core/types.go index 38b8e51dc..327031b01 100644 --- a/core/types.go +++ b/core/types.go @@ -33,6 +33,9 @@ type Validator interface { // ValidateState validates the given statedb and optionally the receipts and // gas used. ValidateState(block, parent *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error + + // ValidateWitnessData validates the given witness result. + ValidateWitnessData(height uint64, data types.WitnessData) error } // Processor is an interface for processing blocks using a given initial state. diff --git a/core/types/block.go b/core/types/block.go index eb75dcb30..73b3ddfda 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -64,6 +64,7 @@ func (n *BlockNonce) UnmarshalText(input []byte) error { return hexutil.UnmarshalFixedText("BlockNonce", input, n[:]) } +// WitnessData represents the witness data. type WitnessData struct { Root common.Hash TxHash common.Hash diff --git a/dex/backend.go b/dex/backend.go index d7bc4630e..740b8cd6f 100644 --- a/dex/backend.go +++ b/dex/backend.go @@ -134,8 +134,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { } cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieCleanLimit: config.TrieCleanCache, TrieDirtyLimit: config.TrieDirtyCache, TrieTimeLimit: config.TrieTimeout} ) - dex.blockchain, err = core.NewBlockChainWithDexonValidator(chainDb, cacheConfig, - dex.chainConfig, dex.engine, vmConfig, nil) + dex.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, dex.chainConfig, dex.engine, vmConfig, nil) // Rewind the chain in case of an incompatible config upgrade. if compat, ok := genesisErr.(*params.ConfigCompatError); ok { @@ -148,7 +147,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { if config.TxPool.Journal != "" { config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) } - dex.txPool = core.NewTxPool(config.TxPool, dex.chainConfig, dex.blockchain) + dex.txPool = core.NewTxPool(config.TxPool, dex.chainConfig, dex.blockchain, config.BlockProposerEnabled) dex.APIBackend = &DexAPIBackend{dex, nil} gpoParams := config.GPO @@ -166,7 +165,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode, config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain, - chainDb, dex.governance) + chainDb, config.BlockProposerEnabled, dex.governance) if err != nil { return nil, err } diff --git a/dex/config.go b/dex/config.go index 2b8669fde..e78a698a9 100644 --- a/dex/config.go +++ b/dex/config.go @@ -46,10 +46,11 @@ var DefaultConfig = Config{ Blocks: 20, Percentile: 60, }, - DefaultGasPrice: big.NewInt(params.GWei), - GasFloor: 8000000, - GasCeil: 8000000, - GasLimitTolerance: 1000000, + BlockProposerEnabled: false, + DefaultGasPrice: big.NewInt(params.GWei), + GasFloor: 8000000, + GasCeil: 8000000, + GasLimitTolerance: 1000000, } func init() { @@ -106,6 +107,9 @@ type Config struct { // Gas Price Oracle options GPO gasprice.Config + // BlockProposer options + BlockProposerEnabled bool + // Enables tracking of SHA3 preimages in the VM EnablePreimageRecording bool diff --git a/dex/handler.go b/dex/handler.go index 2f8ed13fa..7bc9c297d 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -106,12 +106,11 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol - eventMux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription - metasCh chan newMetasEvent - metasSub event.Subscription - minedBlockSub *event.TypeMuxSubscription + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription + metasCh chan newMetasEvent + metasSub event.Subscription // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer @@ -132,6 +131,9 @@ type ProtocolManager struct { // wait group is used for graceful shutdowns during downloading // and processing wg sync.WaitGroup + + // Dexcon + isBlockProposer bool } // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -140,24 +142,25 @@ func NewProtocolManager( config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, - gov governance) (*ProtocolManager, error) { + isBlockProposer bool, gov governance) (*ProtocolManager, error) { tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkID: networkID, - eventMux: mux, - txpool: txpool, - nodeTable: tab, - gov: gov, - blockchain: blockchain, - cache: newCache(128), - chainconfig: config, - newPeerCh: make(chan *peer), - noMorePeers: make(chan struct{}), - txsyncCh: make(chan *txsync), - metasyncCh: make(chan *metasync), - quitSync: make(chan struct{}), - receiveCh: make(chan interface{}, 1024), + networkID: networkID, + eventMux: mux, + txpool: txpool, + nodeTable: tab, + gov: gov, + blockchain: blockchain, + cache: newCache(128), + chainconfig: config, + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + metasyncCh: make(chan *metasync), + quitSync: make(chan struct{}), + receiveCh: make(chan interface{}, 1024), + isBlockProposer: isBlockProposer, } // Figure out whether to allow fast sync or not @@ -258,10 +261,6 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh) go pm.metaBroadcastLoop() - // broadcast mined blocks - pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) - go pm.minedBroadcastLoop() - // run the peer set loop pm.chainHeadCh = make(chan core.ChainHeadEvent) pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh) @@ -306,8 +305,7 @@ func (pm *ProtocolManager) makeSelfNodeMeta() *NodeMeta { func (pm *ProtocolManager) Stop() { log.Info("Stopping Ethereum protocol") - pm.txsSub.Unsubscribe() // quits txBroadcastLoop - pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + pm.txsSub.Unsubscribe() // quits txBroadcastLoop pm.chainHeadSub.Unsubscribe() // Quit the sync loop. @@ -650,6 +648,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case msg.Code == NewBlockHashesMsg: + // Ignore new block hash messages in block proposer mode. + if pm.isBlockProposer { + break + } var announces newBlockHashesData if err := msg.Decode(&announces); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) @@ -670,6 +672,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case msg.Code == NewBlockMsg: + // Ignore new block messages in block proposer mode. + if pm.isBlockProposer { + break + } // Retrieve and decode the propagated block var request newBlockData if err := msg.Decode(&request); err != nil { @@ -732,7 +738,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.MarkNodeMeta(meta.Hash()) } pm.nodeTable.Add(metas) + + // Block proposer-only messages. + case msg.Code == LatticeBlockMsg: + if !pm.isBlockProposer { + break + } var block coreTypes.Block if err := msg.Decode(&block); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -740,6 +752,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.cache.addBlock(&block) pm.receiveCh <- &block case msg.Code == VoteMsg: + if !pm.isBlockProposer { + break + } var vote coreTypes.Vote if err := msg.Decode(&vote); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -749,6 +764,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &vote case msg.Code == AgreementMsg: + if !pm.isBlockProposer { + break + } // DKG set is receiver var agreement coreTypes.AgreementResult if err := msg.Decode(&agreement); err != nil { @@ -756,6 +774,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &agreement case msg.Code == RandomnessMsg: + if !pm.isBlockProposer { + break + } // Broadcast this to all peer var randomness coreTypes.BlockRandomnessResult if err := msg.Decode(&randomness); err != nil { @@ -763,6 +784,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &randomness case msg.Code == DKGPrivateShareMsg: + if !pm.isBlockProposer { + break + } // Do not relay this msg var ps dkgTypes.PrivateShare if err := msg.Decode(&ps); err != nil { @@ -770,6 +794,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &ps case msg.Code == DKGPartialSignatureMsg: + if !pm.isBlockProposer { + break + } // broadcast in DKG set var psig dkgTypes.PartialSignature if err := msg.Decode(&psig); err != nil { @@ -777,6 +804,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.receiveCh <- &psig case msg.Code == PullBlocksMsg: + if !pm.isBlockProposer { + break + } var hashes coreCommon.Hashes if err := msg.Decode(&hashes); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -789,6 +819,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } case msg.Code == PullVotesMsg: + if !pm.isBlockProposer { + break + } var pos coreTypes.Position if err := msg.Decode(&pos); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -1003,17 +1036,6 @@ func (pm *ProtocolManager) BroadcastPullVotes( } } -// Mined broadcast loop -func (pm *ProtocolManager) minedBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range pm.minedBlockSub.Chan() { - if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { - pm.BroadcastBlock(ev.Block, true) // First propagate block to peers - pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest - } - } -} - func (pm *ProtocolManager) txBroadcastLoop() { for { select { @@ -1053,7 +1075,10 @@ func (pm *ProtocolManager) peerSetLoop() { for { select { - case <-pm.chainHeadCh: + case event := <-pm.chainHeadCh: + pm.BroadcastBlock(event.Block, true) // First propagate block to peers + pm.BroadcastBlock(event.Block, false) // Only then announce to the rest + newRound := pm.gov.LenCRS() - 1 log.Trace("new round", "round", newRound) if newRound == round { diff --git a/eth/backend.go b/eth/backend.go index 04e4569f2..a6a558823 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -173,7 +173,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if config.TxPool.Journal != "" { config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) } - eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) + eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain, false) if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, config.Whitelist); err != nil { return nil, err diff --git a/p2p/server.go b/p2p/server.go index 8cd2863b3..36b1721a5 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -33,13 +33,13 @@ import ( "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/log" + "github.com/dexon-foundation/dexon/p2p/discover" "github.com/dexon-foundation/dexon/p2p/discv5" "github.com/dexon-foundation/dexon/p2p/enode" "github.com/dexon-foundation/dexon/p2p/enr" "github.com/dexon-foundation/dexon/p2p/nat" "github.com/dexon-foundation/dexon/p2p/netutil" "github.com/dexon-foundation/dexon/rlp" - "github.com/ethereum/go-ethereum/p2p/discover" ) const ( diff --git a/test/run_test.sh b/test/run_test.sh index a7dd934bc..f535a609d 100755 --- a/test/run_test.sh +++ b/test/run_test.sh @@ -15,14 +15,20 @@ rm -f log-latest ln -s $logsdir log-latest # A standalone RPC server for accepting RPC requests. -# datadir=$PWD/Dexon.rpc -# rm -rf $datadir -# $GDEX --datadir=$datadir init genesis.json -# $GDEX --verbosity=4 --gcmode=archive --datadir=$datadir \ -# --rpc --rpcapi=eth,net,web3,debug --rpcaddr=0.0.0.0 --rpcport=8543 \ -# --ws --wsapi=eth,net,web3,debug --wsaddr=0.0.0.0 --wsport=8544 \ -# --wsorigins='*' --rpcvhosts='*' --rpccorsdomain="*" \ -# > $logsdir/gdex.rpc.log 2>&1 & +datadir=$PWD/Dexon.rpc +rm -rf $datadir +$GDEX --datadir=$datadir init genesis.json +$GDEX \ + --testnet \ + --verbosity=4 \ + --gcmode=archive \ + --datadir=$datadir \ + --rpc --rpcapi=eth,net,web3,debug \ + --rpcaddr=0.0.0.0 --rpcport=8545 \ + --ws --wsapi=eth,net,web3,debug \ + --wsaddr=0.0.0.0 --wsport=8546 \ + --wsorigins='*' --rpcvhosts='*' --rpccorsdomain="*" \ + > $logsdir/gdex.rpc.log 2>&1 & # Nodes for i in $(seq 0 3); do @@ -37,9 +43,9 @@ for i in $(seq 0 3); do --datadir=$datadir --nodekey=test$i.nodekey \ --port=$((30305 + $i)) \ --rpc --rpcapi=eth,net,web3,debug \ - --rpcaddr=0.0.0.0 --rpcport=$((8545 + $i * 2)) \ + --rpcaddr=0.0.0.0 --rpcport=$((8547 + $i * 2)) \ --ws --wsapi=eth,net,web3,debug \ - --wsaddr=0.0.0.0 --wsport=$((8546 + $i * 2)) \ + --wsaddr=0.0.0.0 --wsport=$((8548 + $i * 2)) \ --wsorigins='*' --rpcvhosts='*' --rpccorsdomain="*" \ --pprof --pprofaddr=localhost --pprofport=$((6060 + $i)) \ > $logsdir/gdex.$i.log 2>&1 & -- cgit