aboutsummaryrefslogtreecommitdiffstats
path: root/eth/fetcher
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-10-13 17:04:25 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-10-21 21:49:55 +0800
commit5b0ee8ec304663898073b7a4c659e1def23716df (patch)
tree8f2f49a8d26dc1c29e1d360fb787ab420d90a2ae /eth/fetcher
parentaa0538db0b5de2bb2c609d629b65d083649f9171 (diff)
downloaddexon-5b0ee8ec304663898073b7a4c659e1def23716df.tar.gz
dexon-5b0ee8ec304663898073b7a4c659e1def23716df.tar.zst
dexon-5b0ee8ec304663898073b7a4c659e1def23716df.zip
core, eth, trie: fix data races and merge/review issues
Diffstat (limited to 'eth/fetcher')
-rw-r--r--eth/fetcher/fetcher.go26
-rw-r--r--eth/fetcher/fetcher_test.go49
2 files changed, 59 insertions, 16 deletions
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go
index b8ec1fc55..d88d91982 100644
--- a/eth/fetcher/fetcher.go
+++ b/eth/fetcher/fetcher.go
@@ -142,9 +142,11 @@ type Fetcher struct {
dropPeer peerDropFn // Drops a peer for misbehaving
// Testing hooks
- fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
- completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
- importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62)
+ announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
+ queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
+ fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
+ completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
+ importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62)
}
// New creates a block fetcher to retrieve blocks based on hash announcements.
@@ -324,11 +326,16 @@ func (f *Fetcher) loop() {
height := f.chainHeight()
for !f.queue.Empty() {
op := f.queue.PopItem().(*inject)
-
+ if f.queueChangeHook != nil {
+ f.queueChangeHook(op.block.Hash(), false)
+ }
// If too high up the chain or phase, continue later
number := op.block.NumberU64()
if number > height+1 {
f.queue.Push(op, -float32(op.block.NumberU64()))
+ if f.queueChangeHook != nil {
+ f.queueChangeHook(op.block.Hash(), true)
+ }
break
}
// Otherwise if fresh and still unknown, try and import
@@ -372,6 +379,9 @@ func (f *Fetcher) loop() {
}
f.announces[notification.origin] = count
f.announced[notification.hash] = append(f.announced[notification.hash], notification)
+ if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
+ f.announceChangeHook(notification.hash, true)
+ }
if len(f.announced) == 1 {
f.rescheduleFetch(fetchTimer)
}
@@ -714,7 +724,9 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
f.queues[peer] = count
f.queued[hash] = op
f.queue.Push(op, -float32(block.NumberU64()))
-
+ if f.queueChangeHook != nil {
+ f.queueChangeHook(op.block.Hash(), true)
+ }
if glog.V(logger.Debug) {
glog.Infof("Peer %s: queued block #%d [%x…], total %v", peer, block.NumberU64(), hash.Bytes()[:4], f.queue.Size())
}
@@ -781,7 +793,9 @@ func (f *Fetcher) forgetHash(hash common.Hash) {
}
}
delete(f.announced, hash)
-
+ if f.announceChangeHook != nil {
+ f.announceChangeHook(hash, false)
+ }
// Remove any pending fetches and decrement the DOS counters
if announce := f.fetching[hash]; announce != nil {
f.announces[announce.origin]--
diff --git a/eth/fetcher/fetcher_test.go b/eth/fetcher/fetcher_test.go
index 170a80aba..2404c8cfa 100644
--- a/eth/fetcher/fetcher_test.go
+++ b/eth/fetcher/fetcher_test.go
@@ -145,6 +145,9 @@ func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
// dropPeer is an emulator for the peer removal, simply accumulating the various
// peers dropped by the fetcher.
func (f *fetcherTester) dropPeer(peer string) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+
f.drops[peer] = true
}
@@ -608,8 +611,11 @@ func TestDistantPropagationDiscarding(t *testing.T) {
// Create a tester and simulate a head block being the middle of the above chain
tester := newTester()
+
+ tester.lock.Lock()
tester.hashes = []common.Hash{head}
tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
+ tester.lock.Unlock()
// Ensure that a block with a lower number than the threshold is discarded
tester.fetcher.Enqueue("lower", blocks[hashes[low]])
@@ -641,8 +647,11 @@ func testDistantAnnouncementDiscarding(t *testing.T, protocol int) {
// Create a tester and simulate a head block being the middle of the above chain
tester := newTester()
+
+ tester.lock.Lock()
tester.hashes = []common.Hash{head}
tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
+ tester.lock.Unlock()
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher(blocks, 0)
@@ -687,14 +696,22 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
verifyImportEvent(t, imported, false)
- if !tester.drops["bad"] {
+ tester.lock.RLock()
+ dropped := tester.drops["bad"]
+ tester.lock.RUnlock()
+
+ if !dropped {
t.Fatalf("peer with invalid numbered announcement not dropped")
}
// Make sure a good announcement passes without a drop
tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
verifyImportEvent(t, imported, true)
- if tester.drops["good"] {
+ tester.lock.RLock()
+ dropped = tester.drops["good"]
+ tester.lock.RUnlock()
+
+ if dropped {
t.Fatalf("peer with valid numbered announcement dropped")
}
verifyImportDone(t, imported)
@@ -752,9 +769,15 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
// Create a tester with instrumented import hooks
tester := newTester()
- imported := make(chan *types.Block)
+ imported, announces := make(chan *types.Block), int32(0)
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
-
+ tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
+ if added {
+ atomic.AddInt32(&announces, 1)
+ } else {
+ atomic.AddInt32(&announces, -1)
+ }
+ }
// Create a valid chain and an infinite junk chain
targetBlocks := hashLimit + 2*maxQueueDist
hashes, blocks := makeChain(targetBlocks, 0, genesis)
@@ -782,8 +805,8 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), nil, attackerHeaderFetcher, attackerBodyFetcher)
}
}
- if len(tester.fetcher.announced) != hashLimit+maxQueueDist {
- t.Fatalf("queued announce count mismatch: have %d, want %d", len(tester.fetcher.announced), hashLimit+maxQueueDist)
+ if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist {
+ t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist)
}
// Wait for fetches to complete
verifyImportCount(t, imported, maxQueueDist)
@@ -807,9 +830,15 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) {
// Create a tester with instrumented import hooks
tester := newTester()
- imported := make(chan *types.Block)
+ imported, enqueued := make(chan *types.Block), int32(0)
tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
-
+ tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
+ if added {
+ atomic.AddInt32(&enqueued, 1)
+ } else {
+ atomic.AddInt32(&enqueued, -1)
+ }
+ }
// Create a valid chain and a batch of dangling (but in range) blocks
targetBlocks := hashLimit + 2*maxQueueDist
hashes, blocks := makeChain(targetBlocks, 0, genesis)
@@ -825,7 +854,7 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) {
tester.fetcher.Enqueue("attacker", block)
}
time.Sleep(200 * time.Millisecond)
- if queued := tester.fetcher.queue.Size(); queued != blockLimit {
+ if queued := atomic.LoadInt32(&enqueued); queued != blockLimit {
t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit)
}
// Queue up a batch of valid blocks, and check that a new peer is allowed to do so
@@ -833,7 +862,7 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) {
tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]])
}
time.Sleep(100 * time.Millisecond)
- if queued := tester.fetcher.queue.Size(); queued != blockLimit+maxQueueDist-1 {
+ if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 {
t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1)
}
// Insert the missing piece (and sanity check the import)