diff options
author | Martin Holst Swende <martin@swende.se> | 2018-10-20 16:43:59 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2018-11-20 18:28:43 +0800 |
commit | 493903eedecaae3d14966cd99aa84d146ea0ce13 (patch) | |
tree | f90bd1c68a95cc96493bb46b9ddb24d6f11d1d50 | |
parent | 3d997b6decfaa42e37521ae20bf58886c8b2de8f (diff) | |
download | dexon-493903eedecaae3d14966cd99aa84d146ea0ce13.tar.gz dexon-493903eedecaae3d14966cd99aa84d146ea0ce13.tar.zst dexon-493903eedecaae3d14966cd99aa84d146ea0ce13.zip |
core: better side-chain importing
-rw-r--r-- | core/blockchain.go | 369 | ||||
-rw-r--r-- | node/node.go | 2 | ||||
-rw-r--r-- | p2p/discover/table.go | 2 |
3 files changed, 275 insertions, 98 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index d173b2de2..74ac30e70 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1048,6 +1048,80 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { return n, err } +// addFutureBlock checks if the block is within the max allowed window to get accepted for future processing, and +// returns an error if the block is too far ahead and was not added. +func (bc *BlockChain) addFutureBlock(block *types.Block) error { + max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks) + if block.Time().Cmp(max) > 0 { + return fmt.Errorf("future block: %v > %v", block.Time(), max) + } + bc.futureBlocks.Add(block.Hash(), block) + return nil +} + +// importBatch is a helper function to assist during chain import +type importBatch struct { + chain types.Blocks + results <-chan error + index int + validator Validator +} + +// newBatch creates a new batch based on the given blocks, which are assumed to be a contiguous chain +func newBatch(chain types.Blocks, results <-chan error, validator Validator) *importBatch { + return &importBatch{ + chain: chain, + results: results, + index: -1, + validator: validator, + } +} + +// next returns the next block in the batch, along with any potential validation error for that block +// When the end is reached, it will return (nil, nil), but Current() will always return the last element. +func (batch *importBatch) next() (*types.Block, error) { + if batch.index+1 >= len(batch.chain) { + return nil, nil + } + batch.index++ + if err := <-batch.results; err != nil { + return batch.chain[batch.index], err + } + return batch.chain[batch.index], batch.validator.ValidateBody(batch.chain[batch.index]) +} + +// current returns the current block that's being processed. Even after the next() has progressed the entire +// chain, current will always return the last element +func (batch *importBatch) current() *types.Block { + if batch.index < 0 { + return nil + } + return batch.chain[batch.index] +} + +// previous returns the previous block was being processed, or nil +func (batch *importBatch) previous() *types.Block { + if batch.index < 1 { + return nil + } + return batch.chain[batch.index-1] +} + +// first returns the first block in the batch +func (batch *importBatch) first() *types.Block { + return batch.chain[0] +} + +// remaining returns the number of remaining blocks +func (batch *importBatch) remaining() int { + return len(batch.chain) - batch.index +} + +// processed returns the number of processed blocks +func (batch *importBatch) processed() int { + return batch.index + 1 +} + // insertChain will execute the actual chain insertion and event aggregation. The // only reason this method exists as a separate one is to make locking cleaner // with deferred statements. @@ -1067,12 +1141,27 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4]) } } + log.Info("insertChain", "from", chain[0].NumberU64(), "to", chain[len(chain)-1].NumberU64()) + // Pre-checks passed, start the full block imports bc.wg.Add(1) defer bc.wg.Done() - bc.chainmu.Lock() defer bc.chainmu.Unlock() + return bc.insertChainInternal(chain, true) +} + +//insertChainInternal is the internal implementation of insertChain, which assumes that +// 1. chains are contiguous, and +// 2. The `chainMu` lock is held +// This method is split out so that import batches that require re-injecting historical blocks can do +// so without releasing the lock, which could lead to racey behaviour. If a sidechain import is in progress, +// and the historic state is imported, but then new canon-head is added before the actual sidechain completes, +// then the historic state could be pruned again +func (bc *BlockChain) insertChainInternal(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { + + // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) + senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) // A queued approach to delivering events. This is generally // faster than direct delivery and requires much less mutex @@ -1082,6 +1171,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty events = make([]interface{}, 0, len(chain)) lastCanon *types.Block coalescedLogs []*types.Log + block *types.Block + err error ) // Start the parallel header verifier headers := make([]*types.Header, len(chain)) @@ -1089,16 +1180,57 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty for i, block := range chain { headers[i] = block.Header() - seals[i] = true + seals[i] = verifySeals } abort, results := bc.engine.VerifyHeaders(bc, headers, seals) defer close(abort) - // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) - senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) - - // Iterate over the blocks and insert when the verifier permits - for i, block := range chain { + // Peek the error for the first block + batch := newBatch(chain, results, bc.Validator()) + if block, err = batch.next(); err != nil { + if err == consensus.ErrPrunedAncestor { + return bc.insertSidechainInternal(batch, err) + } else if err == consensus.ErrFutureBlock || + (err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(batch.first().ParentHash())) { + + // The first block is a future block + // We can shove that one and any child blocks (that fail because of UnknownAncestor) into the future-queue + for block != nil && (batch.index == 0 || err == consensus.ErrUnknownAncestor) { + block := batch.current() + if futureError := bc.addFutureBlock(block); futureError != nil { + return batch.index, events, coalescedLogs, futureError + } + block, err = batch.next() + } + stats.queued += batch.processed() + stats.ignored += batch.remaining() + + // If there are any still remaining, mark as ignored + return batch.index, events, coalescedLogs, err + } else if err == ErrKnownBlock { + + // Block and state both already known -- there can be two explanations. + // 1. We did a roll-back, and should now do a re-import + // 2. The block is stored as a sidechain, and is lying about it's stateroot, and passes a stateroot + // from the canonical chain, which has not been verified. + + // Skip all known blocks that are blocks behind us + currentNum := bc.CurrentBlock().NumberU64() + for block != nil && err == ErrKnownBlock && currentNum >= block.NumberU64() { + // We ignore these + stats.ignored++ + block, err = batch.next() + } + // Falls through to the block import + } else { + // Some other error + stats.ignored += len(batch.chain) + bc.reportBlock(block, nil, err) + return batch.index, events, coalescedLogs, err + } + } + // No validation errors + for ; block != nil && err == nil; block, err = batch.next() { // If the chain is terminating, stop processing blocks if atomic.LoadInt32(&bc.procInterrupt) == 1 { log.Debug("Premature abort during blocks processing") @@ -1107,115 +1239,45 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty // If the header is a banned one, straight out abort if BadHashes[block.Hash()] { bc.reportBlock(block, nil, ErrBlacklistedHash) - return i, events, coalescedLogs, ErrBlacklistedHash + return batch.index, events, coalescedLogs, ErrBlacklistedHash } - // Wait for the block's verification to complete bstart := time.Now() - - err := <-results - if err == nil { - err = bc.Validator().ValidateBody(block) - } - switch { - case err == ErrKnownBlock: - // Block and state both already known. However if the current block is below - // this number we did a rollback and we should reimport it nonetheless. - if bc.CurrentBlock().NumberU64() >= block.NumberU64() { - stats.ignored++ - continue - } - - case err == consensus.ErrFutureBlock: - // Allow up to MaxFuture second in the future blocks. If this limit is exceeded - // the chain is discarded and processed at a later time if given. - max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks) - if block.Time().Cmp(max) > 0 { - return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max) - } - bc.futureBlocks.Add(block.Hash(), block) - stats.queued++ - continue - - case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()): - bc.futureBlocks.Add(block.Hash(), block) - stats.queued++ - continue - - case err == consensus.ErrPrunedAncestor: - // Block competing with the canonical chain, store in the db, but don't process - // until the competitor TD goes above the canonical TD - currentBlock := bc.CurrentBlock() - localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) - externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty()) - if localTd.Cmp(externTd) > 0 { - if err = bc.WriteBlockWithoutState(block, externTd); err != nil { - return i, events, coalescedLogs, err - } - continue - } - // Competitor chain beat canonical, gather all blocks from the common ancestor - var winner []*types.Block - - parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1) - for !bc.HasState(parent.Root()) { - winner = append(winner, parent) - parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1) - } - for j := 0; j < len(winner)/2; j++ { - winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j] - } - // Import all the pruned blocks to make the state available - bc.chainmu.Unlock() - _, evs, logs, err := bc.insertChain(winner) - bc.chainmu.Lock() - events, coalescedLogs = evs, logs - - if err != nil { - return i, events, coalescedLogs, err - } - - case err != nil: - bc.reportBlock(block, nil, err) - return i, events, coalescedLogs, err - } - // Create a new statedb using the parent block and report an - // error if it fails. var parent *types.Block - if i == 0 { + parent = batch.previous() + if parent == nil { parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1) - } else { - parent = chain[i-1] } + state, err := state.New(parent.Root(), bc.stateCache) if err != nil { - return i, events, coalescedLogs, err + return batch.index, events, coalescedLogs, err } // Process block using the parent state as reference point. receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig) if err != nil { bc.reportBlock(block, receipts, err) - return i, events, coalescedLogs, err + return batch.index, events, coalescedLogs, err } // Validate the state using the default validator - err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas) - if err != nil { + if err := bc.Validator().ValidateState(block, parent, state, receipts, usedGas); err != nil { bc.reportBlock(block, receipts, err) - return i, events, coalescedLogs, err + return batch.index, events, coalescedLogs, err } proctime := time.Since(bstart) // Write the block to the chain and get the status. status, err := bc.WriteBlockWithState(block, receipts, state) if err != nil { - return i, events, coalescedLogs, err + return batch.index, events, coalescedLogs, err } switch status { case CanonStatTy: - log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), - "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) + log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), + "uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(), + "elapsed", common.PrettyDuration(time.Since(bstart)), + "root", block.Root().String()) coalescedLogs = append(coalescedLogs, logs...) - blockInsertTimer.UpdateSince(bstart) events = append(events, ChainEvent{block, block.Hash(), logs}) lastCanon = block @@ -1223,23 +1285,138 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty bc.gcproc += proctime case SideStatTy: - log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", - common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles())) - - blockInsertTimer.UpdateSince(bstart) + log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), + "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(bstart)), + "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()), + "root", block.Root().String()) events = append(events, ChainSideEvent{block}) } + blockInsertTimer.UpdateSince(bstart) stats.processed++ stats.usedGas += usedGas cache, _ := bc.stateCache.TrieDB().Size() - stats.report(chain, i, cache) + stats.report(chain, batch.index, cache) } + + // Any blocks remaining here? If so, the only ones we need to care about are + // shoving future blocks into queue + if block != nil && err == consensus.ErrFutureBlock { + if futureErr := bc.addFutureBlock(block); futureErr != nil { + return batch.index, events, coalescedLogs, futureErr + } + for block, err = batch.next(); block != nil && err == consensus.ErrUnknownAncestor; { + if futureErr := bc.addFutureBlock(block); futureErr != nil { + return batch.index, events, coalescedLogs, futureErr + } + stats.queued++ + block, err = batch.next() + } + } + stats.ignored += batch.remaining() // Append a single chain head event if we've progressed the chain if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { events = append(events, ChainHeadEvent{lastCanon}) } - return 0, events, coalescedLogs, nil + return 0, events, coalescedLogs, err +} + +// insertSidechainInternal should be called when an import batch hits upon a pruned ancestor error, which happens when +// an sidechain with a sufficiently old fork-block is found. It writes all (header-and-body-valid) blocks to disk, then +// tries to switch over to the new chain if the TD exceeded the current chain. +// It assumes that relevant locks are held already (hence 'Internal') +func (bc *BlockChain) insertSidechainInternal(batch *importBatch, err error) (int, []interface{}, []*types.Log, error) { + // If we're given a chain of blocks, and the first one is pruned, that means we're getting a + // sidechain imported. On the sidechain, we validate headers, but do not validate body and state + // (and actually import them) until the sidechain reaches a higher TD. + // Until then, we store them in the database (assuming that the header PoW check works out) + var ( + externTd *big.Int + canonHeadNumber = bc.CurrentBlock().NumberU64() + events = make([]interface{}, 0) + coalescedLogs []*types.Log + ) + // The first sidechain block error is already verified to be ErrPrunedAncestor. Since we don't import + // them here, we expect ErrUnknownAncestor for the remaining ones. Any other errors means that + // the block is invalid, and should not be written to disk. + block := batch.current() + for block != nil && (err == consensus.ErrPrunedAncestor) { + // Check the canonical stateroot for that number + if remoteNum := block.NumberU64(); canonHeadNumber >= remoteNum { + canonBlock := bc.GetBlockByNumber(remoteNum) + if canonBlock != nil && canonBlock.Root() == block.Root() { + // This is most likely a shadow-state attack. + // When a fork is imported into the database, and it eventually reaches a block height which is + // not pruned, we just found that the state already exist! This means that the sidechain block + // refers to a state which already exists in our canon chain. + // If left unchecked, we would now proceed importing the blocks, without actually having verified + // the state of the previous blocks. + log.Warn("Sidechain ghost-state attack detected", "blocknum", block.NumberU64(), + "sidechain root", block.Root(), "canon root", canonBlock.Root()) + // If someone legitimately side-mines blocks, they would still be imported as usual. However, + // we cannot risk writing unverified blocks to disk when they obviously target the pruning + // mechanism. + return batch.index, events, coalescedLogs, fmt.Errorf("sidechain ghost-state attack detected") + } + } + if externTd == nil { + externTd = bc.GetTd(block.ParentHash(), block.NumberU64()-1) + } + externTd = new(big.Int).Add(externTd, block.Difficulty()) + if !bc.HasBlock(block.Hash(), block.NumberU64()) { + if err := bc.WriteBlockWithoutState(block, externTd); err != nil { + return batch.index, events, coalescedLogs, err + } + } + block, err = batch.next() + } + // At this point, we've written all sidechain blocks to database. Loop ended either on some other error, + // or all were processed. If there was some other error, we can ignore the rest of those blocks. + // + // If the externTd was larger than our local TD, we now need to reimport the previous + // blocks to regenerate the required state + currentBlock := bc.CurrentBlock() + localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) + // don't process until the competitor TD goes above the canonical TD + if localTd.Cmp(externTd) > 0 { + // If we have hit a sidechain, we may have to reimport pruned blocks + log.Info("Sidechain stored", "start", batch.first().NumberU64(), "end", batch.current().NumberU64(), "sidechain TD", externTd, "local TD", localTd) + return batch.index, events, coalescedLogs, err + } + // Competitor chain beat canonical. Before we reprocess to get the common ancestor, investigate if + // any blocks in the chain are 'known bad' blocks. + for index, b := range batch.chain { + if bc.badBlocks.Contains(b.Hash()) { + log.Info("Sidechain import aborted, bad block found", "index", index, "hash", b.Hash()) + return index, events, coalescedLogs, fmt.Errorf("known bad block %d 0x%x", b.NumberU64(), b.Hash()) + } + } + // gather all blocks from the common ancestor + var parents []*types.Block + // Import all the pruned blocks to make the state available + parent := bc.GetBlock(batch.first().ParentHash(), batch.first().NumberU64()-1) + for !bc.HasState(parent.Root()) { + if bc.badBlocks.Contains(parent.Hash()) { + log.Info("Sidechain parent processing aborted, bad block found", "number", parent.NumberU64(), "hash", parent.Hash()) + return 0, events, coalescedLogs, fmt.Errorf("known bad block %d 0x%x", parent.NumberU64(), parent.Hash()) + } + parents = append(parents, parent) + parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1) + } + for j := 0; j < len(parents)/2; j++ { + parents[j], parents[len(parents)-1-j] = parents[len(parents)-1-j], parents[j] + } + // Import all the pruned blocks to make the state available + // During re-import, we can disable PoW-verification, since these are already verified + log.Info("Inserting parent blocks for reprocessing", "first", parents[0].NumberU64(), "count", len(parents), "last", parents[len(parents)-1].NumberU64) + _, evs, logs, err := bc.insertChainInternal(parents, false) + events, coalescedLogs = evs, logs + if err != nil { + return 0, events, coalescedLogs, err + } + log.Info("Inserting sidechain blocks for processing") + errindex, events, coalescedLogs, err := bc.insertChainInternal(batch.chain[0:batch.index], false) + return errindex, events, coalescedLogs, err } // insertStats tracks and reports on block insertion. diff --git a/node/node.go b/node/node.go index 85299dba7..846100839 100644 --- a/node/node.go +++ b/node/node.go @@ -287,7 +287,7 @@ func (n *Node) startInProc(apis []rpc.API) error { if err := handler.RegisterName(api.Namespace, api.Service); err != nil { return err } - n.log.Debug("InProc registered", "service", api.Service, "namespace", api.Namespace) + n.log.Debug("InProc registered", "namespace", api.Namespace) } n.inprocHandler = handler return nil diff --git a/p2p/discover/table.go b/p2p/discover/table.go index afd4c9a27..9f7f1d41b 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -434,7 +434,7 @@ func (tab *Table) loadSeedNodes() { for i := range seeds { seed := seeds[i] age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID())) }} - log.Debug("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age) + log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age) tab.add(seed) } } |