From 88f174a014c1f2f99fa6d6a8054ada28a0b43504 Mon Sep 17 00:00:00 2001 From: Péter Szilágyi Date: Wed, 1 Jun 2016 18:07:25 +0300 Subject: eth/downloader: adaptive quality of service tuning --- eth/downloader/downloader_test.go | 58 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 54 insertions(+), 4 deletions(-) (limited to 'eth/downloader/downloader_test.go') diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index f3a0e38f1..a9c069a92 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -179,6 +179,12 @@ func newTester() *downloadTester { return tester } +// terminate aborts any operations on the embedded downloader and releases all +// held resources. +func (dl *downloadTester) terminate() { + dl.downloader.Terminate() +} + // sync starts synchronizing with a remote peer, blocking until it completes. func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { dl.lock.RLock() @@ -740,6 +746,8 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Synchronise with the peer and make sure all relevant data was retrieved @@ -764,6 +772,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Wrap the importer to allow stepping @@ -851,6 +861,8 @@ func testForkedSync(t *testing.T, protocol int, mode SyncMode) { hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true) tester := newTester() + defer tester.terminate() + tester.newPeer("fork A", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("fork B", protocol, hashesB, headersB, blocksB, receiptsB) @@ -885,6 +897,8 @@ func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false) tester := newTester() + defer tester.terminate() + tester.newPeer("light", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("heavy", protocol, hashesB[fork/2:], headersB, blocksB, receiptsB) @@ -934,6 +948,8 @@ func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) { hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, true) tester := newTester() + defer tester.terminate() + tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("rewriter", protocol, hashesB, headersB, blocksB, receiptsB) @@ -968,6 +984,8 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { hashesA, hashesB, headersA, headersB, blocksA, blocksB, receiptsA, receiptsB := makeChainFork(common+fork, fork, genesis, nil, false) tester := newTester() + defer tester.terminate() + tester.newPeer("original", protocol, hashesA, headersA, blocksA, receiptsA) tester.newPeer("heavy-rewriter", protocol, hashesB[MaxForkAncestry-17:], headersB, blocksB, receiptsB) // Root the fork below the ancestor limit @@ -987,7 +1005,9 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { // bodies. func TestInactiveDownloader62(t *testing.T) { t.Parallel() + tester := newTester() + defer tester.terminate() // Check that neither block headers nor bodies are accepted if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive { @@ -1002,7 +1022,9 @@ func TestInactiveDownloader62(t *testing.T) { // bodies and receipts. func TestInactiveDownloader63(t *testing.T) { t.Parallel() + tester := newTester() + defer tester.terminate() // Check that neither block headers nor bodies are accepted if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive { @@ -1039,6 +1061,8 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Make sure canceling works with a pristine downloader @@ -1074,6 +1098,8 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() + for i := 0; i < targetPeers; i++ { id := fmt.Sprintf("peer #%d", i) tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], headers, blocks, receipts) @@ -1103,6 +1129,8 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { // Create peers of every type tester := newTester() + defer tester.terminate() + tester.newPeer("peer 61", 61, hashes, nil, blocks, nil) tester.newPeer("peer 62", 62, hashes, headers, blocks, nil) tester.newPeer("peer 63", 63, hashes, headers, blocks, receipts) @@ -1140,6 +1168,8 @@ func testEmptyShortCircuit(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Instrument the downloader to signal body requests @@ -1193,6 +1223,7 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() // Attempt a full sync with an attacker feeding gapped headers tester.newPeer("attack", protocol, hashes, headers, blocks, receipts) @@ -1225,6 +1256,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() // Attempt a full sync with an attacker feeding shifted headers tester.newPeer("attack", protocol, hashes, headers, blocks, receipts) @@ -1256,6 +1288,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { hashes, headers, blocks, receipts := makeChain(targetBlocks, 0, genesis, nil, false) tester := newTester() + defer tester.terminate() // Attempt to sync with an attacker that feeds junk during the fast sync phase. // This should result in the last fsHeaderSafetyNet headers being rolled back. @@ -1347,9 +1380,11 @@ func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) { t.Parallel() tester := newTester() - hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil, false) + defer tester.terminate() + hashes, headers, blocks, receipts := makeChain(0, 0, genesis, nil, false) tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, headers, blocks, receipts) + if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } @@ -1392,6 +1427,8 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) { } // Run the tests and check disconnection status tester := newTester() + defer tester.terminate() + for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) @@ -1433,6 +1470,8 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { progress := make(chan struct{}) tester := newTester() + defer tester.terminate() + tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -1505,6 +1544,8 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) { progress := make(chan struct{}) tester := newTester() + defer tester.terminate() + tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -1580,6 +1621,8 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { progress := make(chan struct{}) tester := newTester() + defer tester.terminate() + tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -1656,6 +1699,8 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { progress := make(chan struct{}) tester := newTester() + defer tester.terminate() + tester.downloader.syncInitHook = func(origin, latest uint64) { starting <- struct{}{} <-progress @@ -1742,7 +1787,7 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { impl := tester.peerGetAbsHeadersFn("peer", 0) go impl(from, count, skip, reverse) // None of the extra deliveries should block. - timeout := time.After(5 * time.Second) + timeout := time.After(15 * time.Second) for i := 0; i < cap(deliveriesDone); i++ { select { case <-deliveriesDone: @@ -1755,6 +1800,7 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { if err := tester.sync("peer", nil, mode); err != nil { t.Errorf("sync failed: %v", err) } + tester.terminate() } } @@ -1772,8 +1818,9 @@ func testFastCriticalRestarts(t *testing.T, protocol int) { // Create a tester peer with the critical section state roots missing (force failures) tester := newTester() - tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) + defer tester.terminate() + tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) for i := 0; i < fsPivotInterval; i++ { tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true } @@ -1783,11 +1830,14 @@ func testFastCriticalRestarts(t *testing.T, protocol int) { if err := tester.sync("peer", nil, FastSync); err == nil { t.Fatalf("failing fast sync succeeded: %v", err) } + time.Sleep(500 * time.Millisecond) // Make sure no in-flight requests remain + // If it's the first failure, pivot should be locked => reenable all others to detect pivot changes if i == 0 { + tester.lock.Lock() tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true} + tester.lock.Unlock() } - time.Sleep(100 * time.Millisecond) // Make sure no in-flight requests remain } // Retry limit exhausted, downloader will switch to full sync, should succeed if err := tester.sync("peer", nil, FastSync); err != nil { -- cgit