author    Péter Szilágyi <peterke@gmail.com>  2015-06-18 05:04:57 +0800
committer Péter Szilágyi <peterke@gmail.com>  2015-06-18 05:04:57 +0800
commit    2f4cbe22f5207b830f2685caae175cce70bcd231 (patch)
tree      eac133c9dee26ec44418afc0fda6a1adf6e76555 /eth
parent    ae36beb38f356a08370e95559d04243140105c32 (diff)
eth, eth/downloader: fix processing interrupt caused by temp cancel
Diffstat (limited to 'eth')
-rw-r--r--  eth/downloader/downloader.go       40
-rw-r--r--  eth/downloader/downloader_test.go   4
-rw-r--r--  eth/sync.go                         2
3 files changed, 22 insertions, 24 deletions
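
In short, the patch stops the downloader's block-processing loop from selecting on the per-synchronisation cancel channel (which is closed on every transient cancel) and has it poll a dedicated atomic interrupt flag instead, set only by the new Terminate method. A minimal, self-contained sketch of that pattern in Go, using hypothetical names rather than the real downloader API:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// worker mirrors the idea of the patch: transient cancels never touch the
// interrupt flag, so only a real Terminate can abort an in-flight import.
type worker struct {
	interrupt int32 // atomic boolean, set to 1 once Terminate has been called
}

// Terminate flags the worker for permanent shutdown.
func (w *worker) Terminate() { atomic.StoreInt32(&w.interrupt, 1) }

// process imports batches until done, or until the interrupt flag is set.
func (w *worker) process(batches [][]int) error {
	for _, batch := range batches {
		if atomic.LoadInt32(&w.interrupt) == 1 {
			return errors.New("import interrupted")
		}
		fmt.Printf("imported %d blocks\n", len(batch))
	}
	return nil
}

func main() {
	w := &worker{}
	if err := w.process([][]int{{1, 2, 3}, {4, 5}}); err != nil {
		fmt.Println(err)
	}
}
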
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index c7a05eb35..a79eabb3c 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -87,6 +87,8 @@ type Downloader struct {
checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain
banned *set.Set // Set of hashes we've received and banned
+ interrupt int32 // Atomic boolean to signal termination
+
// Statistics
importStart time.Time // Instance when the last blocks were taken from the cache
importQueue []*Block // Previously taken blocks to check import progress
@@ -245,12 +247,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error {
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
glog.V(logger.Info).Infoln("Block synchronisation started")
}
-
- // Create cancel channel for aborting mid-flight
- d.cancelLock.Lock()
- d.cancelCh = make(chan struct{})
- d.cancelLock.Unlock()
-
// Abort if the queue still contains some leftover data
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
return errPendingQueue
@@ -260,12 +256,16 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error {
d.peers.Reset()
d.checks = make(map[common.Hash]*crossCheck)
+ // Create cancel channel for aborting mid-flight
+ d.cancelLock.Lock()
+ d.cancelCh = make(chan struct{})
+ d.cancelLock.Unlock()
+
// Retrieve the origin peer and initiate the downloading process
p := d.peers.Peer(id)
if p == nil {
return errUnknownPeer
}
-
return d.syncWithPeer(p, hash)
}
@@ -282,7 +282,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
defer func() {
// reset on error
if err != nil {
- d.Cancel()
+ d.cancel()
d.mux.Post(FailedEvent{err})
} else {
d.mux.Post(DoneEvent{})
@@ -301,9 +301,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
return nil
}
-// Cancel cancels all of the operations and resets the queue. It returns true
+// cancel cancels all of the operations and resets the queue. It returns true
// if the cancel operation was completed.
-func (d *Downloader) Cancel() {
+func (d *Downloader) cancel() {
// Close the current cancel channel
d.cancelLock.Lock()
if d.cancelCh != nil {
@@ -320,6 +320,12 @@ func (d *Downloader) Cancel() {
d.queue.Reset()
}
+// Terminate interrupts the downloader, canceling all pending operations.
+func (d *Downloader) Terminate() {
+ atomic.StoreInt32(&d.interrupt, 1)
+ d.cancel()
+}
+
// fetchHahes starts retrieving hashes backwards from a specific peer and hash,
// up until it finds a common ancestor. If the source peer times out, alternative
// ones are tried for continuation.
@@ -737,12 +743,6 @@ func (d *Downloader) process() (err error) {
atomic.StoreInt32(&d.processing, 0)
}()
-
- // Fetch the current cancel channel to allow termination
- d.cancelLock.RLock()
- cancel := d.cancelCh
- d.cancelLock.RUnlock()
-
// Repeat the processing as long as there are blocks to import
for {
// Fetch the next batch of blocks
@@ -759,12 +759,10 @@ func (d *Downloader) process() (err error) {
// Actually import the blocks
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
- for len(blocks) != 0 { // TODO: quit
+ for len(blocks) != 0 {
// Check for any termination requests
- select {
- case <-cancel:
+ if atomic.LoadInt32(&d.interrupt) == 1 {
return errCancelChainImport
- default:
}
// Retrieve the first batch of blocks to insert
max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
@@ -777,7 +775,7 @@ func (d *Downloader) process() (err error) {
if err != nil {
glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err)
d.dropPeer(blocks[index].OriginPeer)
- d.Cancel()
+ d.cancel()
return errCancelChainImport
}
blocks = blocks[max:]
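
For context on why the select was removed from process(): the loop captured the cancel channel once at start, so a "temporary" cancel from a failed sync attempt would still fire on that stale channel and abort an unrelated import. A toy illustration of that failure mode, with hypothetical names rather than the downloader's actual types:

package main

import (
	"fmt"
	"sync"
)

// toyDownloader recreates its cancel channel on every synchronisation,
// the way the real downloader does.
type toyDownloader struct {
	lock     sync.Mutex
	cancelCh chan struct{}
}

func (d *toyDownloader) newSync() {
	d.lock.Lock()
	d.cancelCh = make(chan struct{})
	d.lock.Unlock()
}

func (d *toyDownloader) cancel() {
	d.lock.Lock()
	if d.cancelCh != nil {
		close(d.cancelCh)
	}
	d.lock.Unlock()
}

func main() {
	d := &toyDownloader{}
	d.newSync()

	captured := d.cancelCh // snapshot taken at goroutine start (the old behaviour)

	d.cancel()  // transient cancel, e.g. from a failed sync attempt
	d.newSync() // the downloader itself carries on with a fresh channel

	select {
	case <-captured:
		fmt.Println("stale channel fired: the import loop would have aborted")
	default:
		fmt.Println("no interruption")
	}
}
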
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 53eb5f81d..f97e6077b 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -247,7 +247,7 @@ func TestCancel(t *testing.T) {
tester.newPeer("peer", hashes, blocks)
// Make sure canceling works with a pristine downloader
- tester.downloader.Cancel()
+ tester.downloader.cancel()
hashCount, blockCount := tester.downloader.queue.Size()
if hashCount > 0 || blockCount > 0 {
t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
@@ -256,7 +256,7 @@ func TestCancel(t *testing.T) {
if err := tester.sync("peer"); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
- tester.downloader.Cancel()
+ tester.downloader.cancel()
hashCount, blockCount = tester.downloader.queue.Size()
if hashCount > 0 || blockCount > 0 {
t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
diff --git a/eth/sync.go b/eth/sync.go
index a3b177a4d..751bc1a2a 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -251,7 +251,7 @@ func (pm *ProtocolManager) fetcher() {
// downloading hashes and blocks as well as retrieving cached ones.
func (pm *ProtocolManager) syncer() {
// Abort any pending syncs if we terminate
- defer pm.downloader.Cancel()
+ defer pm.downloader.Terminate()
forceSync := time.Tick(forceSyncCycle)
for {