aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/downloader.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-05-13 18:47:21 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-05-13 18:47:21 +0800
commitee0c8923035f44a44cfc9120255b807e90baada9 (patch)
tree595a05dbe8288356fc2ba5c45dea459c5c3c5ecd /eth/downloader/downloader.go
parent7cb0e242450fd15c6ee8f5d70af41504a047e0aa (diff)
downloadgo-tangerine-ee0c8923035f44a44cfc9120255b807e90baada9.tar.gz
go-tangerine-ee0c8923035f44a44cfc9120255b807e90baada9.tar.zst
go-tangerine-ee0c8923035f44a44cfc9120255b807e90baada9.zip
eth/downloader: fix deliveries to check for sync cancels
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--eth/downloader/downloader.go41
1 files changed, 31 insertions, 10 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index c6eecfe2f..04e9c3a21 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -70,7 +70,9 @@ type Downloader struct {
newPeerCh chan *peer
hashCh chan hashPack
blockCh chan blockPack
- cancelCh chan struct{}
+
+ cancelCh chan struct{} // Channel to cancel mid-flight syncs
+ cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
}
func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
@@ -83,6 +85,9 @@ func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
hashCh: make(chan hashPack, 1),
blockCh: make(chan blockPack, 1),
}
+ // Set the initial downloader state as canceled (sanity check)
+ downloader.cancelCh = make(chan struct{})
+ close(downloader.cancelCh)
return downloader
}
@@ -123,8 +128,10 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
}
defer atomic.StoreInt32(&d.synchronising, 0)
- // Create cancel channel for aborting midflight
+ // Create cancel channel for aborting mid-flight
+ d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
+ d.cancelLock.Unlock()
// Abort if the queue still contains some leftover data
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
@@ -421,9 +428,18 @@ func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {
if atomic.LoadInt32(&d.synchronising) == 0 {
return errNoSyncActive
}
- d.blockCh <- blockPack{id, blocks}
+ // Deliver or abort if the sync is canceled while queuing
+ d.cancelLock.RLock()
+ cancel := d.cancelCh
+ d.cancelLock.RUnlock()
- return nil
+ select {
+ case d.blockCh <- blockPack{id, blocks}:
+ return nil
+
+ case <-cancel:
+ return errNoSyncActive
+ }
}
// DeliverHashes injects a new batch of hashes received from a remote node into
@@ -434,11 +450,16 @@ func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error {
if atomic.LoadInt32(&d.synchronising) == 0 {
return errNoSyncActive
}
- if glog.V(logger.Debug) && len(hashes) != 0 {
- from, to := hashes[0], hashes[len(hashes)-1]
- glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id)
- }
- d.hashCh <- hashPack{id, hashes}
+ // Deliver or abort if the sync is canceled while queuing
+ d.cancelLock.RLock()
+ cancel := d.cancelCh
+ d.cancelLock.RUnlock()
- return nil
+ select {
+ case d.hashCh <- hashPack{id, hashes}:
+ return nil
+
+ case <-cancel:
+ return errNoSyncActive
+ }
}