diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-04-30 06:52:25 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-04-30 06:52:25 +0800 |
commit | 3fef60190384106af390dd23a65384b9cc6e4a28 (patch) | |
tree | 49f52b8bba6af0228bb20758dd1465f8dce6842d | |
parent | 764e81bf12bc45b00cec7db216e72d6396cf0c13 (diff) | |
parent | 30b921ef463247da63dece1cb81887f7e66668ff (diff) | |
download | go-tangerine-3fef60190384106af390dd23a65384b9cc6e4a28.tar.gz go-tangerine-3fef60190384106af390dd23a65384b9cc6e4a28.tar.zst go-tangerine-3fef60190384106af390dd23a65384b9cc6e4a28.zip |
Merge pull request #830 from obscuren/downloader-missing-parent
eth/downloader: missing parent improvement
-rw-r--r-- | cmd/geth/main.go | 2 | ||||
-rw-r--r-- | cmd/utils/cmd.go | 4 | ||||
-rw-r--r-- | core/chain_makers.go | 2 | ||||
-rw-r--r-- | core/chain_manager.go | 10 | ||||
-rw-r--r-- | core/chain_manager_test.go | 37 | ||||
-rw-r--r-- | core/transaction_pool.go | 23 | ||||
-rw-r--r-- | core/transaction_pool_test.go | 27 | ||||
-rw-r--r-- | core/types/block.go | 2 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 21 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 4 | ||||
-rw-r--r-- | eth/handler.go | 6 | ||||
-rw-r--r-- | miner/worker.go | 2 | ||||
-rw-r--r-- | p2p/message.go | 7 | ||||
-rw-r--r-- | p2p/peer.go | 1 | ||||
-rw-r--r-- | tests/block_test_util.go | 2 |
15 files changed, 117 insertions, 33 deletions
diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 365390a07..ef007051c 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -47,7 +47,7 @@ import _ "net/http/pprof" const ( ClientIdentifier = "Geth" - Version = "0.9.13" + Version = "0.9.14" ) var ( diff --git a/cmd/utils/cmd.go b/cmd/utils/cmd.go index 64faf6ad1..cbb2d42aa 100644 --- a/cmd/utils/cmd.go +++ b/cmd/utils/cmd.go @@ -172,7 +172,7 @@ func ImportChain(chainmgr *core.ChainManager, fn string) error { n++ if n == batchSize { - if err := chainmgr.InsertChain(blocks); err != nil { + if _, err := chainmgr.InsertChain(blocks); err != nil { return fmt.Errorf("invalid block %v", err) } n = 0 @@ -181,7 +181,7 @@ func ImportChain(chainmgr *core.ChainManager, fn string) error { } if n > 0 { - if err := chainmgr.InsertChain(blocks[:n]); err != nil { + if _, err := chainmgr.InsertChain(blocks[:n]); err != nil { return fmt.Errorf("invalid block %v", err) } } diff --git a/core/chain_makers.go b/core/chain_makers.go index 4512a5493..73c2205f4 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -141,6 +141,6 @@ func newCanonical(n int, db common.Database) (*BlockProcessor, error) { return bman, nil } lchain := makeChain(bman, parent, n, db, CanonicalSeed) - err := bman.bc.InsertChain(lchain) + _, err := bman.bc.InsertChain(lchain) return bman, err } diff --git a/core/chain_manager.go b/core/chain_manager.go index 32ad4a2ba..4fdb2edce 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -497,7 +497,9 @@ func (self *ChainManager) procFutureBlocks() { self.InsertChain(blocks) } -func (self *ChainManager) InsertChain(chain types.Blocks) error { +// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned +// it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go). +func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { // A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring. var ( queue = make([]interface{}, len(chain)) @@ -540,7 +542,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { glog.V(logger.Error).Infoln(err) glog.V(logger.Debug).Infoln(block) - return err + return i, err } block.Td = new(big.Int).Set(CalculateTD(block, self.GetBlock(block.ParentHash()))) @@ -591,7 +593,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { } } else { if glog.V(logger.Detail) { - glog.Infof("inserted forked block #%d (%d TXs %d UNCs) (%x...)\n", block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4]) + glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...)\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4]) } queue[i] = ChainSideEvent{block, logs} @@ -613,7 +615,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { go self.eventMux.Post(queueEvent) - return nil + return 0, nil } // diff takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them diff --git a/core/chain_manager_test.go b/core/chain_manager_test.go index a88afd7c8..50915459b 100644 --- a/core/chain_manager_test.go +++ b/core/chain_manager_test.go @@ -9,6 +9,7 @@ import ( "strconv" "testing" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" @@ -44,7 +45,7 @@ func testFork(t *testing.T, bman *BlockProcessor, i, N int, f func(td1, td2 *big // extend the fork parent := bman2.bc.CurrentBlock() chainB := makeChain(bman2, parent, N, db, ForkSeed) - err = bman2.bc.InsertChain(chainB) + _, err = bman2.bc.InsertChain(chainB) if err != nil { t.Fatal("Insert chain error for fork:", err) } @@ -108,7 +109,7 @@ func loadChain(fn string, t *testing.T) (types.Blocks, error) { } func insertChain(done chan bool, chainMan *ChainManager, chain types.Blocks, t *testing.T) { - err := chainMan.InsertChain(chain) + _, err := chainMan.InsertChain(chain) if err != nil { fmt.Println(err) t.FailNow() @@ -369,11 +370,8 @@ func makeChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Block return chain } -func TestReorg(t *testing.T) { - db, _ := ethdb.NewMemDatabase() +func chm(genesis *types.Block, db common.Database) *ChainManager { var eventMux event.TypeMux - - genesis := GenesisBlock(db) bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: &eventMux} bc.cache = NewBlockCache(100) bc.futureBlocks = NewBlockCache(100) @@ -381,6 +379,14 @@ func TestReorg(t *testing.T) { bc.ResetWithGenesisBlock(genesis) bc.txState = state.ManageState(bc.State()) + return bc +} + +func TestReorgLongest(t *testing.T) { + db, _ := ethdb.NewMemDatabase() + genesis := GenesisBlock(db) + bc := chm(genesis, db) + chain1 := makeChainWithDiff(genesis, []int{1, 2, 4}, 10) chain2 := makeChainWithDiff(genesis, []int{1, 2, 3, 4}, 11) @@ -394,3 +400,22 @@ func TestReorg(t *testing.T) { } } } + +func TestReorgShortest(t *testing.T) { + db, _ := ethdb.NewMemDatabase() + genesis := GenesisBlock(db) + bc := chm(genesis, db) + + chain1 := makeChainWithDiff(genesis, []int{1, 2, 3, 4}, 10) + chain2 := makeChainWithDiff(genesis, []int{1, 10}, 11) + + bc.InsertChain(chain1) + bc.InsertChain(chain2) + + prev := bc.CurrentBlock() + for block := bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - 1); block.NumberU64() != 0; prev, block = block, bc.GetBlockByNumber(block.NumberU64()-1) { + if prev.ParentHash() != block.Hash() { + t.Errorf("parent hash mismatch %x - %x", prev.ParentHash(), block.Hash()) + } + } +} diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 8543aa017..22a804e1d 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -306,6 +306,27 @@ func (pool *TxPool) checkQueue() { } } +func (pool *TxPool) removeTx(hash common.Hash) { + // delete from pending pool + delete(pool.txs, hash) + + // delete from queue +out: + for address, txs := range pool.queue { + for i, tx := range txs { + if tx.Hash() == hash { + if len(txs) == 1 { + // if only one tx, remove entire address entry + delete(pool.queue, address) + } else { + pool.queue[address][len(txs)-1], pool.queue[address] = nil, append(txs[:i], txs[i+1:]...) + } + break out + } + } + } +} + func (pool *TxPool) validatePool() { pool.mu.Lock() defer pool.mu.Unlock() @@ -316,7 +337,7 @@ func (pool *TxPool) validatePool() { glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err) } - delete(pool.txs, hash) + pool.removeTx(hash) } } } diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go index 4d66776f0..49224be5b 100644 --- a/core/transaction_pool_test.go +++ b/core/transaction_pool_test.go @@ -111,3 +111,30 @@ func TestTransactionQueue(t *testing.T) { t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) } } + +func TestRemoveTx(t *testing.T) { + pool, key := setupTxPool() + tx := transaction() + tx.SignECDSA(key) + from, _ := tx.From() + pool.currentState().AddBalance(from, big.NewInt(1)) + pool.queueTx(tx) + pool.addTx(tx) + if len(pool.queue) != 1 { + t.Error("expected queue to be 1, got", len(pool.queue)) + } + + if len(pool.txs) != 1 { + t.Error("expected txs to be 1, got", len(pool.txs)) + } + + pool.removeTx(tx.Hash()) + + if len(pool.queue) > 0 { + t.Error("expected queue to be 0, got", len(pool.queue)) + } + + if len(pool.txs) > 0 { + t.Error("expected txs to be 0, got", len(pool.txs)) + } +} diff --git a/core/types/block.go b/core/types/block.go index 19cf49c12..c93452fa7 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -99,6 +99,8 @@ type Block struct { Td *big.Int queued bool // flag for blockpool to skip TD check + ReceivedAt time.Time + receipts Receipts } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index a3917854f..4cd927fd5 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -37,7 +37,7 @@ var ( ) type hashCheckFn func(common.Hash) bool -type chainInsertFn func(types.Blocks) error +type chainInsertFn func(types.Blocks) (int, error) type hashIterFn func() (common.Hash, error) type blockPack struct { @@ -418,27 +418,30 @@ func (d *Downloader) process(peer *peer) error { // link). We should at least check whihc queue match. This code could move // to a seperate goroutine where it periodically checks for linked pieces. types.BlockBy(types.Number).Sort(d.queue.blocks) - blocks := d.queue.blocks - if len(blocks) == 0 { + if len(d.queue.blocks) == 0 { return nil } + var ( + blocks = d.queue.blocks + err error + ) glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number()) - var err error // Loop untill we're out of blocks for len(blocks) != 0 { max := int(math.Min(float64(len(blocks)), 256)) // TODO check for parent error. When there's a parent error we should stop // processing and start requesting the `block.hash` so that it's parent and // grandparents can be requested and queued. - err = d.insertChain(blocks[:max]) + var i int + i, err = d.insertChain(blocks[:max]) if err != nil && core.IsParentErr(err) { - glog.V(logger.Debug).Infoln("Aborting process due to missing parent.") + // Ignore the missing blocks. Handler should take care of anything that's missing. + glog.V(logger.Debug).Infof("Ignored block with missing parent (%d)\n", i) + blocks = blocks[i+1:] - // XXX this needs a lot of attention - blocks = nil - break + continue } else if err != nil { // immediatly unregister the false peer but do not disconnect d.UnregisterPeer(d.activePeer) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 8843ca0c7..5518163ca 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -62,10 +62,10 @@ func (dl *downloadTester) hasBlock(hash common.Hash) bool { return false } -func (dl *downloadTester) insertChain(blocks types.Blocks) error { +func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { dl.insertedBlocks += len(blocks) - return nil + return 0, nil } func (dl *downloadTester) getHashes(hash common.Hash) error { diff --git a/eth/handler.go b/eth/handler.go index 61149049e..fecd71632 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -346,6 +346,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if err := request.Block.ValidateFields(); err != nil { return errResp(ErrDecode, "block validation %v: %v", msg, err) } + request.Block.ReceivedAt = msg.ReceivedAt + hash := request.Block.Hash() // Add the block hash as a known hash to the peer. This will later be used to determine // who should receive this. @@ -376,7 +378,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error { // if the parent exists we process the block and propagate to our peers // if the parent does not exists we delegate to the downloader. if self.chainman.HasBlock(request.Block.ParentHash()) { - if err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil { + if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil { // handle error return nil } @@ -419,7 +421,7 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) for _, peer := range peers { peer.sendNewBlock(block) } - glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers") + glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers. Total propagation time:", time.Since(block.ReceivedAt)) } // BroadcastTx will propagate the block to its connected peers. It will sort diff --git a/miner/worker.go b/miner/worker.go index a38b8a5d4..87d17dfd6 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -184,7 +184,7 @@ func (self *worker) wait() { continue } - if err := self.chain.InsertChain(types.Blocks{block}); err == nil { + if _, err := self.chain.InsertChain(types.Blocks{block}); err == nil { for _, uncle := range block.Uncles() { delete(self.possibleUncles, uncle.Hash()) } diff --git a/p2p/message.go b/p2p/message.go index be6405d6f..5ab5ab73e 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -22,9 +22,10 @@ import ( // structure, encode the payload into a byte array and create a // separate Msg with a bytes.Reader as Payload for each send. type Msg struct { - Code uint64 - Size uint32 // size of the paylod - Payload io.Reader + Code uint64 + Size uint32 // size of the paylod + Payload io.Reader + ReceivedAt time.Time } // Decode parses the RLP content of a message into diff --git a/p2p/peer.go b/p2p/peer.go index 1262ba64a..bc0e6eb5f 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -177,6 +177,7 @@ func (p *Peer) readLoop(errc chan<- error) { errc <- err return } + msg.ReceivedAt = time.Now() if err = p.handle(msg); err != nil { errc <- err return diff --git a/tests/block_test_util.go b/tests/block_test_util.go index 06f082ca3..093c9be0c 100644 --- a/tests/block_test_util.go +++ b/tests/block_test_util.go @@ -162,7 +162,7 @@ func (t *BlockTest) TryBlocksInsert(chainManager *core.ChainManager) error { } } // RLP decoding worked, try to insert into chain: - err = chainManager.InsertChain(types.Blocks{cb}) + _, err = chainManager.InsertChain(types.Blocks{cb}) if err != nil { if b.BlockHeader == nil { continue // OK - block is supposed to be invalid, continue with next block |