diff options
Diffstat (limited to 'core/blockchain.go')
-rw-r--r-- | core/blockchain.go | 266 |
1 files changed, 180 insertions, 86 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 6c8a24751..3e7dfa9ee 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -22,6 +22,8 @@ import ( "fmt" "io" "math/big" + "math/rand" + "runtime" "sync" "sync/atomic" "time" @@ -671,7 +673,7 @@ func (self *BlockChain) writeHeader(header *types.Header) error { // should be done or not. The reason behind the optional check is because some // of the header retrieval mechanisms already need to verfy nonces, as well as // because nonces can be verified sparsely, not needing to check each. -func (self *BlockChain) InsertHeaderChain(chain []*types.Header, verify bool) (int, error) { +func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) { self.wg.Add(1) defer self.wg.Done() @@ -683,16 +685,85 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, verify bool) (i stats := struct{ processed, ignored int }{} start := time.Now() - // Start the parallel nonce verifier, with a fake nonce if not requested - verifier := self.pow - if !verify { - verifier = FakePow{} + // Generate the list of headers that should be POW verified + verify := make([]bool, len(chain)) + for i := 0; i < len(verify)/checkFreq; i++ { + index := i*checkFreq + rand.Intn(checkFreq) + if index >= len(verify) { + index = len(verify) - 1 + } + verify[index] = true } - nonceAbort, nonceResults := verifyNoncesFromHeaders(verifier, chain) - defer close(nonceAbort) + verify[len(verify)-1] = true // Last should always be verified to avoid junk + + // Create the header verification task queue and worker functions + tasks := make(chan int, len(chain)) + for i := 0; i < len(chain); i++ { + tasks <- i + } + close(tasks) - // Iterate over the headers, inserting any new ones - complete := make([]bool, len(chain)) + errs, failed := make([]error, len(tasks)), int32(0) + process := func(worker int) { + for index := range tasks { + header, hash := chain[index], chain[index].Hash() + + // Short circuit insertion if shutting down or processing failed + if atomic.LoadInt32(&self.procInterrupt) == 1 { + return + } + if atomic.LoadInt32(&failed) > 0 { + return + } + // Short circuit if the header is bad or already known + if BadHashes[hash] { + errs[index] = BadHashError(hash) + atomic.AddInt32(&failed, 1) + return + } + if self.HasHeader(hash) { + continue + } + // Verify that the header honors the chain parameters + checkPow := verify[index] + + var err error + if index == 0 { + err = self.processor.ValidateHeader(header, checkPow, false) + } else { + err = self.processor.ValidateHeaderWithParent(header, chain[index-1], checkPow, false) + } + if err != nil { + errs[index] = err + atomic.AddInt32(&failed, 1) + return + } + } + } + // Start as many worker threads as goroutines allowed + pending := new(sync.WaitGroup) + for i := 0; i < runtime.GOMAXPROCS(0); i++ { + pending.Add(1) + go func(id int) { + defer pending.Done() + process(id) + }(i) + } + pending.Wait() + + // If anything failed, report + if atomic.LoadInt32(&self.procInterrupt) == 1 { + glog.V(logger.Debug).Infoln("premature abort during receipt chain processing") + return 0, nil + } + if failed > 0 { + for i, err := range errs { + if err != nil { + return i, err + } + } + } + // All headers passed verification, import them into the database for i, header := range chain { // Short circuit insertion if shutting down if atomic.LoadInt32(&self.procInterrupt) == 1 { @@ -701,24 +772,7 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, verify bool) (i } hash := header.Hash() - // Accumulate verification results until the next header is verified - for !complete[i] { - if res := <-nonceResults; res.valid { - complete[res.index] = true - } else { - header := chain[res.index] - return res.index, &BlockNonceErr{ - Hash: header.Hash(), - Number: new(big.Int).Set(header.Number), - Nonce: header.Nonce.Uint64(), - } - } - } - if BadHashes[hash] { - glog.V(logger.Error).Infof("bad header %d [%x…], known bad hash", header.Number, hash) - return i, BadHashError(hash) - } - // Write the header to the chain and get the status + // If the header's already known, skip it, otherwise store if self.HasHeader(hash) { stats.ignored++ continue @@ -743,76 +797,116 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain defer self.wg.Done() // Collect some import statistics to report on - stats := struct{ processed, ignored int }{} + stats := struct{ processed, ignored int32 }{} start := time.Now() - // Iterate over the blocks and receipts, inserting any new ones + // Create the block importing task queue and worker functions + tasks := make(chan int, len(blockChain)) for i := 0; i < len(blockChain) && i < len(receiptChain); i++ { - block, receipts := blockChain[i], receiptChain[i] + tasks <- i + } + close(tasks) - // Short circuit insertion if shutting down - if atomic.LoadInt32(&self.procInterrupt) == 1 { - glog.V(logger.Debug).Infoln("premature abort during receipt chain processing") - break - } - // Short circuit if the owner header is unknown - if !self.HasHeader(block.Hash()) { - glog.V(logger.Debug).Infof("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) - return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) - } - // Skip if the entire data is already known - if self.HasBlock(block.Hash()) { - stats.ignored++ - continue - } - // Compute all the non-consensus fields of the receipts - transactions, logIndex := block.Transactions(), uint(0) - for j := 0; j < len(receipts); j++ { - // The transaction hash can be retrieved from the transaction itself - receipts[j].TxHash = transactions[j].Hash() - - // The contract address can be derived from the transaction itself - if MessageCreatesContract(transactions[j]) { - from, _ := transactions[j].From() - receipts[j].ContractAddress = crypto.CreateAddress(from, transactions[j].Nonce()) + errs, failed := make([]error, len(tasks)), int32(0) + process := func(worker int) { + for index := range tasks { + block, receipts := blockChain[index], receiptChain[index] + + // Short circuit insertion if shutting down or processing failed + if atomic.LoadInt32(&self.procInterrupt) == 1 { + return } - // The used gas can be calculated based on previous receipts - if j == 0 { - receipts[j].GasUsed = new(big.Int).Set(receipts[j].CumulativeGasUsed) - } else { - receipts[j].GasUsed = new(big.Int).Sub(receipts[j].CumulativeGasUsed, receipts[j-1].CumulativeGasUsed) + if atomic.LoadInt32(&failed) > 0 { + return } - // The derived log fields can simply be set from the block and transaction - for k := 0; k < len(receipts[j].Logs); k++ { - receipts[j].Logs[k].BlockNumber = block.NumberU64() - receipts[j].Logs[k].BlockHash = block.Hash() - receipts[j].Logs[k].TxHash = receipts[j].TxHash - receipts[j].Logs[k].TxIndex = uint(j) - receipts[j].Logs[k].Index = logIndex - logIndex++ + // Short circuit if the owner header is unknown + if !self.HasHeader(block.Hash()) { + errs[index] = fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) + atomic.AddInt32(&failed, 1) + return } + // Skip if the entire data is already known + if self.HasBlock(block.Hash()) { + atomic.AddInt32(&stats.ignored, 1) + continue + } + // Compute all the non-consensus fields of the receipts + transactions, logIndex := block.Transactions(), uint(0) + for j := 0; j < len(receipts); j++ { + // The transaction hash can be retrieved from the transaction itself + receipts[j].TxHash = transactions[j].Hash() + + // The contract address can be derived from the transaction itself + if MessageCreatesContract(transactions[j]) { + from, _ := transactions[j].From() + receipts[j].ContractAddress = crypto.CreateAddress(from, transactions[j].Nonce()) + } + // The used gas can be calculated based on previous receipts + if j == 0 { + receipts[j].GasUsed = new(big.Int).Set(receipts[j].CumulativeGasUsed) + } else { + receipts[j].GasUsed = new(big.Int).Sub(receipts[j].CumulativeGasUsed, receipts[j-1].CumulativeGasUsed) + } + // The derived log fields can simply be set from the block and transaction + for k := 0; k < len(receipts[j].Logs); k++ { + receipts[j].Logs[k].BlockNumber = block.NumberU64() + receipts[j].Logs[k].BlockHash = block.Hash() + receipts[j].Logs[k].TxHash = receipts[j].TxHash + receipts[j].Logs[k].TxIndex = uint(j) + receipts[j].Logs[k].Index = logIndex + logIndex++ + } + } + // Write all the data out into the database + if err := WriteBody(self.chainDb, block.Hash(), &types.Body{block.Transactions(), block.Uncles()}); err != nil { + errs[index] = fmt.Errorf("failed to write block body: %v", err) + atomic.AddInt32(&failed, 1) + glog.Fatal(errs[index]) + return + } + if err := PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil { + errs[index] = fmt.Errorf("failed to write block receipts: %v", err) + atomic.AddInt32(&failed, 1) + glog.Fatal(errs[index]) + return + } + atomic.AddInt32(&stats.processed, 1) } - // Write all the data out into the database - if err := WriteBody(self.chainDb, block.Hash(), &types.Body{block.Transactions(), block.Uncles()}); err != nil { - glog.Fatalf("failed to write block body: %v", err) - return i, err - } - if err := PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil { - glog.Fatalf("failed to write block receipts: %v", err) - return i, err - } - // Update the head fast sync block if better - self.mu.Lock() - if self.GetTd(self.currentFastBlock.Hash()).Cmp(self.GetTd(block.Hash())) < 0 { - if err := WriteHeadFastBlockHash(self.chainDb, block.Hash()); err != nil { - glog.Fatalf("failed to update head fast block hash: %v", err) + } + // Start as many worker threads as goroutines allowed + pending := new(sync.WaitGroup) + for i := 0; i < runtime.GOMAXPROCS(0); i++ { + pending.Add(1) + go func(id int) { + defer pending.Done() + process(id) + }(i) + } + pending.Wait() + + // If anything failed, report + if atomic.LoadInt32(&self.procInterrupt) == 1 { + glog.V(logger.Debug).Infoln("premature abort during receipt chain processing") + return 0, nil + } + if failed > 0 { + for i, err := range errs { + if err != nil { + return i, err } - self.currentFastBlock = block } - self.mu.Unlock() - - stats.processed++ } + // Update the head fast sync block if better + self.mu.Lock() + head := blockChain[len(errs)-1] + if self.GetTd(self.currentFastBlock.Hash()).Cmp(self.GetTd(head.Hash())) < 0 { + if err := WriteHeadFastBlockHash(self.chainDb, head.Hash()); err != nil { + glog.Fatalf("failed to update head fast block hash: %v", err) + } + self.currentFastBlock = head + } + self.mu.Unlock() + // Report some public statistics so the user has a clue what's going on first, last := blockChain[0], blockChain[len(blockChain)-1] glog.V(logger.Info).Infof("imported %d receipt(s) (%d ignored) in %v. #%d [%x… / %x…]", stats.processed, stats.ignored, |