diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-10-13 17:04:25 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-10-21 21:49:55 +0800 |
commit | 5b0ee8ec304663898073b7a4c659e1def23716df (patch) | |
tree | 8f2f49a8d26dc1c29e1d360fb787ab420d90a2ae /eth/fetcher | |
parent | aa0538db0b5de2bb2c609d629b65d083649f9171 (diff) | |
download | dexon-5b0ee8ec304663898073b7a4c659e1def23716df.tar.gz dexon-5b0ee8ec304663898073b7a4c659e1def23716df.tar.zst dexon-5b0ee8ec304663898073b7a4c659e1def23716df.zip |
core, eth, trie: fix data races and merge/review issues
Diffstat (limited to 'eth/fetcher')
-rw-r--r-- | eth/fetcher/fetcher.go | 26 | ||||
-rw-r--r-- | eth/fetcher/fetcher_test.go | 49 |
2 files changed, 59 insertions, 16 deletions
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index b8ec1fc55..d88d91982 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -142,9 +142,11 @@ type Fetcher struct { dropPeer peerDropFn // Drops a peer for misbehaving // Testing hooks - fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch - completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62) - importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62) + announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list + queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue + fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch + completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62) + importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62) } // New creates a block fetcher to retrieve blocks based on hash announcements. @@ -324,11 +326,16 @@ func (f *Fetcher) loop() { height := f.chainHeight() for !f.queue.Empty() { op := f.queue.PopItem().(*inject) - + if f.queueChangeHook != nil { + f.queueChangeHook(op.block.Hash(), false) + } // If too high up the chain or phase, continue later number := op.block.NumberU64() if number > height+1 { f.queue.Push(op, -float32(op.block.NumberU64())) + if f.queueChangeHook != nil { + f.queueChangeHook(op.block.Hash(), true) + } break } // Otherwise if fresh and still unknown, try and import @@ -372,6 +379,9 @@ func (f *Fetcher) loop() { } f.announces[notification.origin] = count f.announced[notification.hash] = append(f.announced[notification.hash], notification) + if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 { + f.announceChangeHook(notification.hash, true) + } if len(f.announced) == 1 { f.rescheduleFetch(fetchTimer) } @@ -714,7 +724,9 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) { f.queues[peer] = count f.queued[hash] = op f.queue.Push(op, -float32(block.NumberU64())) - + if f.queueChangeHook != nil { + f.queueChangeHook(op.block.Hash(), true) + } if glog.V(logger.Debug) { glog.Infof("Peer %s: queued block #%d [%x…], total %v", peer, block.NumberU64(), hash.Bytes()[:4], f.queue.Size()) } @@ -781,7 +793,9 @@ func (f *Fetcher) forgetHash(hash common.Hash) { } } delete(f.announced, hash) - + if f.announceChangeHook != nil { + f.announceChangeHook(hash, false) + } // Remove any pending fetches and decrement the DOS counters if announce := f.fetching[hash]; announce != nil { f.announces[announce.origin]-- diff --git a/eth/fetcher/fetcher_test.go b/eth/fetcher/fetcher_test.go index 170a80aba..2404c8cfa 100644 --- a/eth/fetcher/fetcher_test.go +++ b/eth/fetcher/fetcher_test.go @@ -145,6 +145,9 @@ func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) { // dropPeer is an emulator for the peer removal, simply accumulating the various // peers dropped by the fetcher. func (f *fetcherTester) dropPeer(peer string) { + f.lock.Lock() + defer f.lock.Unlock() + f.drops[peer] = true } @@ -608,8 +611,11 @@ func TestDistantPropagationDiscarding(t *testing.T) { // Create a tester and simulate a head block being the middle of the above chain tester := newTester() + + tester.lock.Lock() tester.hashes = []common.Hash{head} tester.blocks = map[common.Hash]*types.Block{head: blocks[head]} + tester.lock.Unlock() // Ensure that a block with a lower number than the threshold is discarded tester.fetcher.Enqueue("lower", blocks[hashes[low]]) @@ -641,8 +647,11 @@ func testDistantAnnouncementDiscarding(t *testing.T, protocol int) { // Create a tester and simulate a head block being the middle of the above chain tester := newTester() + + tester.lock.Lock() tester.hashes = []common.Hash{head} tester.blocks = map[common.Hash]*types.Block{head: blocks[head]} + tester.lock.Unlock() headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) bodyFetcher := tester.makeBodyFetcher(blocks, 0) @@ -687,14 +696,22 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) { tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher) verifyImportEvent(t, imported, false) - if !tester.drops["bad"] { + tester.lock.RLock() + dropped := tester.drops["bad"] + tester.lock.RUnlock() + + if !dropped { t.Fatalf("peer with invalid numbered announcement not dropped") } // Make sure a good announcement passes without a drop tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher) verifyImportEvent(t, imported, true) - if tester.drops["good"] { + tester.lock.RLock() + dropped = tester.drops["good"] + tester.lock.RUnlock() + + if dropped { t.Fatalf("peer with valid numbered announcement dropped") } verifyImportDone(t, imported) @@ -752,9 +769,15 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) { // Create a tester with instrumented import hooks tester := newTester() - imported := make(chan *types.Block) + imported, announces := make(chan *types.Block), int32(0) tester.fetcher.importedHook = func(block *types.Block) { imported <- block } - + tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) { + if added { + atomic.AddInt32(&announces, 1) + } else { + atomic.AddInt32(&announces, -1) + } + } // Create a valid chain and an infinite junk chain targetBlocks := hashLimit + 2*maxQueueDist hashes, blocks := makeChain(targetBlocks, 0, genesis) @@ -782,8 +805,8 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) { tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), nil, attackerHeaderFetcher, attackerBodyFetcher) } } - if len(tester.fetcher.announced) != hashLimit+maxQueueDist { - t.Fatalf("queued announce count mismatch: have %d, want %d", len(tester.fetcher.announced), hashLimit+maxQueueDist) + if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist { + t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist) } // Wait for fetches to complete verifyImportCount(t, imported, maxQueueDist) @@ -807,9 +830,15 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) { // Create a tester with instrumented import hooks tester := newTester() - imported := make(chan *types.Block) + imported, enqueued := make(chan *types.Block), int32(0) tester.fetcher.importedHook = func(block *types.Block) { imported <- block } - + tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) { + if added { + atomic.AddInt32(&enqueued, 1) + } else { + atomic.AddInt32(&enqueued, -1) + } + } // Create a valid chain and a batch of dangling (but in range) blocks targetBlocks := hashLimit + 2*maxQueueDist hashes, blocks := makeChain(targetBlocks, 0, genesis) @@ -825,7 +854,7 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) { tester.fetcher.Enqueue("attacker", block) } time.Sleep(200 * time.Millisecond) - if queued := tester.fetcher.queue.Size(); queued != blockLimit { + if queued := atomic.LoadInt32(&enqueued); queued != blockLimit { t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit) } // Queue up a batch of valid blocks, and check that a new peer is allowed to do so @@ -833,7 +862,7 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) { tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]]) } time.Sleep(100 * time.Millisecond) - if queued := tester.fetcher.queue.Size(); queued != blockLimit+maxQueueDist-1 { + if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 { t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1) } // Insert the missing piece (and sanity check the import) |