aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/downloader.go80
-rw-r--r--eth/downloader/downloader_test.go95
-rw-r--r--eth/downloader/queue.go10
3 files changed, 119 insertions, 66 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 55455262a..cc75c3014 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -65,12 +65,15 @@ type Downloader struct {
// Status
synchronising int32
+ notified int32
// Channels
newPeerCh chan *peer
hashCh chan hashPack
blockCh chan blockPack
- cancelCh chan struct{}
+
+ cancelCh chan struct{} // Channel to cancel mid-flight syncs
+ cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
}
func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
@@ -83,7 +86,6 @@ func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
hashCh: make(chan hashPack, 1),
blockCh: make(chan blockPack, 1),
}
-
return downloader
}
@@ -123,8 +125,14 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
}
defer atomic.StoreInt32(&d.synchronising, 0)
- // Create cancel channel for aborting midflight
+ // Post a user notification of the sync (only once per session)
+ if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
+ glog.V(logger.Info).Infoln("Block synchronisation started")
+ }
+ // Create cancel channel for aborting mid-flight
+ d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
+ d.cancelLock.Unlock()
// Abort if the queue still contains some leftover data
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
@@ -142,16 +150,9 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
return d.syncWithPeer(p, hash)
}
-// TakeBlocks takes blocks from the queue and yields them to the blockTaker handler
-// it's possible it yields no blocks
+// TakeBlocks takes blocks from the queue and yields them to the caller.
func (d *Downloader) TakeBlocks() types.Blocks {
- // Check that there are blocks available and its parents are known
- head := d.queue.GetHeadBlock()
- if head == nil || !d.hasBlock(head.ParentHash()) {
- return nil
- }
- // Retrieve a full batch of blocks
- return d.queue.TakeBlocks(head)
+ return d.queue.TakeBlocks()
}
func (d *Downloader) Has(hash common.Hash) bool {
@@ -183,32 +184,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
// Cancel cancels all of the operations and resets the queue. It returns true
// if the cancel operation was completed.
func (d *Downloader) Cancel() bool {
- hs, bs := d.queue.Size()
// If we're not syncing just return.
+ hs, bs := d.queue.Size()
if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 {
return false
}
-
+ // Close the current cancel channel
+ d.cancelLock.RLock()
close(d.cancelCh)
-
- // clean up
-hashDone:
- for {
- select {
- case <-d.hashCh:
- default:
- break hashDone
- }
- }
-
-blockDone:
- for {
- select {
- case <-d.blockCh:
- default:
- break blockDone
- }
- }
+ d.cancelLock.RUnlock()
// reset the queue
d.queue.Reset()
@@ -421,9 +405,18 @@ func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {
if atomic.LoadInt32(&d.synchronising) == 0 {
return errNoSyncActive
}
- d.blockCh <- blockPack{id, blocks}
+ // Deliver or abort if the sync is canceled while queuing
+ d.cancelLock.RLock()
+ cancel := d.cancelCh
+ d.cancelLock.RUnlock()
- return nil
+ select {
+ case d.blockCh <- blockPack{id, blocks}:
+ return nil
+
+ case <-cancel:
+ return errNoSyncActive
+ }
}
// DeliverHashes injects a new batch of hashes received from a remote node into
@@ -434,11 +427,16 @@ func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error {
if atomic.LoadInt32(&d.synchronising) == 0 {
return errNoSyncActive
}
- if glog.V(logger.Debug) && len(hashes) != 0 {
- from, to := hashes[0], hashes[len(hashes)-1]
- glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id)
- }
- d.hashCh <- hashPack{id, hashes}
+ // Deliver or abort if the sync is canceled while queuing
+ d.cancelLock.RLock()
+ cancel := d.cancelCh
+ d.cancelLock.RUnlock()
- return nil
+ select {
+ case d.hashCh <- hashPack{id, hashes}:
+ return nil
+
+ case <-cancel:
+ return errNoSyncActive
+ }
}
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 78eff011a..cfa6257a3 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -10,7 +10,10 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)
-var knownHash = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
+var (
+ knownHash = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
+ unknownHash = common.Hash{9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9}
+)
func createHashes(start, amount int) (hashes []common.Hash) {
hashes = make([]common.Hash, amount+1)
@@ -27,7 +30,7 @@ func createBlock(i int, prevHash, hash common.Hash) *types.Block {
header := &types.Header{Number: big.NewInt(int64(i))}
block := types.NewBlockWithHeader(header)
block.HeaderHash = hash
- block.ParentHeaderHash = knownHash
+ block.ParentHeaderHash = prevHash
return block
}
@@ -42,9 +45,12 @@ func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block {
}
type downloadTester struct {
- downloader *Downloader
- hashes []common.Hash
- blocks map[common.Hash]*types.Block
+ downloader *Downloader
+
+ hashes []common.Hash // Chain of hashes simulating
+ blocks map[common.Hash]*types.Block // Blocks associated with the hashes
+ chain []common.Hash // Block-chain being constructed
+
t *testing.T
pcount int
done chan bool
@@ -52,7 +58,15 @@ type downloadTester struct {
}
func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types.Block) *downloadTester {
- tester := &downloadTester{t: t, hashes: hashes, blocks: blocks, done: make(chan bool)}
+ tester := &downloadTester{
+ t: t,
+
+ hashes: hashes,
+ blocks: blocks,
+ chain: []common.Hash{knownHash},
+
+ done: make(chan bool),
+ }
downloader := New(tester.hasBlock, tester.getBlock)
tester.downloader = downloader
@@ -64,9 +78,17 @@ func (dl *downloadTester) sync(peerId string, hash common.Hash) error {
return dl.downloader.Synchronise(peerId, hash)
}
+func (dl *downloadTester) insertBlocks(blocks types.Blocks) {
+ for _, block := range blocks {
+ dl.chain = append(dl.chain, block.Hash())
+ }
+}
+
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
- if knownHash == hash {
- return true
+ for _, h := range dl.chain {
+ if h == hash {
+ return true
+ }
}
return false
}
@@ -175,10 +197,9 @@ func TestTaking(t *testing.T) {
if err != nil {
t.Error("download error", err)
}
-
- bs1 := tester.downloader.TakeBlocks()
- if len(bs1) != 1000 {
- t.Error("expected to take 1000, got", len(bs1))
+ bs := tester.downloader.TakeBlocks()
+ if len(bs) != targetBlocks {
+ t.Error("retrieved block mismatch: have %v, want %v", len(bs), targetBlocks)
}
}
@@ -248,17 +269,17 @@ func TestThrottling(t *testing.T) {
done := make(chan struct{})
took := []*types.Block{}
go func() {
- for {
+ for running := true; running; {
select {
case <-done:
- took = append(took, tester.downloader.TakeBlocks()...)
- done <- struct{}{}
- return
+ running = false
default:
- took = append(took, tester.downloader.TakeBlocks()...)
time.Sleep(time.Millisecond)
}
+ // Take a batch of blocks and accumulate
+ took = append(took, tester.downloader.TakeBlocks()...)
}
+ done <- struct{}{}
}()
// Synchronise the two threads and verify
@@ -273,3 +294,43 @@ func TestThrottling(t *testing.T) {
t.Fatalf("downloaded block mismatch: have %v, want %v", len(took), targetBlocks)
}
}
+
+// Tests that if a peer returns an invalid chain with a block pointing to a non-
+// existing parent, it is correctly detected and handled.
+func TestNonExistingParentAttack(t *testing.T) {
+ // Forge a single-link chain with a forged header
+ hashes := createHashes(0, 1)
+ blocks := createBlocksFromHashes(hashes)
+
+ forged := blocks[hashes[0]]
+ forged.ParentHeaderHash = unknownHash
+
+ // Try and sync with the malicious node and check that it fails
+ tester := newTester(t, hashes, blocks)
+ tester.newPeer("attack", big.NewInt(10000), hashes[0])
+ if err := tester.sync("attack", hashes[0]); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ bs := tester.downloader.TakeBlocks()
+ if len(bs) != 1 {
+ t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
+ }
+ if tester.hasBlock(bs[0].ParentHash()) {
+ t.Fatalf("tester knows about the unknown hash")
+ }
+ tester.downloader.Cancel()
+
+ // Reconstruct a valid chain, and try to synchronize with it
+ forged.ParentHeaderHash = knownHash
+ tester.newPeer("valid", big.NewInt(20000), hashes[0])
+ if err := tester.sync("valid", hashes[0]); err != nil {
+ t.Fatalf("failed to synchronise blocks: %v", err)
+ }
+ bs = tester.downloader.TakeBlocks()
+ if len(bs) != 1 {
+ t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
+ }
+ if !tester.hasBlock(bs[0].ParentHash()) {
+ t.Fatalf("tester doesn't know about the origin hash")
+ }
+}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 40749698c..6ad915757 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -172,17 +172,11 @@ func (q *queue) GetBlock(hash common.Hash) *types.Block {
}
// TakeBlocks retrieves and permanently removes a batch of blocks from the cache.
-// The head parameter is required to prevent a race condition where concurrent
-// takes may fail parent verifications.
-func (q *queue) TakeBlocks(head *types.Block) types.Blocks {
+func (q *queue) TakeBlocks() types.Blocks {
q.lock.Lock()
defer q.lock.Unlock()
- // Short circuit if the head block's different
- if len(q.blockCache) == 0 || q.blockCache[0] != head {
- return nil
- }
- // Otherwise accumulate all available blocks
+ // Accumulate all available blocks
var blocks types.Blocks
for _, block := range q.blockCache {
if block == nil {