aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-05-08 02:07:20 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-05-08 02:07:20 +0800
commit9d188f73b58ee1fe4bda00a9536bda4056755f2c (patch)
treef9f9361b8440865a890df900ea7f7986ba169098 /eth/downloader
parent43901c92825389b694fb5488c520cf5122f022de (diff)
downloadgo-tangerine-9d188f73b58ee1fe4bda00a9536bda4056755f2c.tar.gz
go-tangerine-9d188f73b58ee1fe4bda00a9536bda4056755f2c.tar.zst
go-tangerine-9d188f73b58ee1fe4bda00a9536bda4056755f2c.zip
eth, eth/downloader: make synchronize thread safe
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/downloader.go72
-rw-r--r--eth/downloader/downloader_test.go2
-rw-r--r--eth/downloader/queue.go10
3 files changed, 16 insertions, 68 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 25b251112..ef2a193ff 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -68,8 +68,7 @@ type Downloader struct {
getBlock getBlockFn
// Status
- fetchingHashes int32
- downloadingBlocks int32
+ synchronizing int32
// Channels
newPeerCh chan *peer
@@ -120,43 +119,26 @@ func (d *Downloader) UnregisterPeer(id string) {
delete(d.peers, id)
}
-// SynchroniseWithPeer will select the peer and use it for synchronizing. If an empty string is given
+// Synchronize will select the peer and use it for synchronizing. If an empty string is given
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
-func (d *Downloader) Synchronise(id string, hash common.Hash) error {
- // Make sure it's doing neither. Once done we can restart the
- // downloading process if the TD is higher. For now just get on
- // with whatever is going on. This prevents unnecessary switching.
- if d.isBusy() {
- return errBusy
+func (d *Downloader) Synchronize(id string, hash common.Hash) error {
+ // Make sure only one goroutine is ever allowed past this point at once
+ if !atomic.CompareAndSwapInt32(&d.synchronizing, 0, 1) {
+ return nil
}
+ defer atomic.StoreInt32(&d.synchronizing, 0)
- // When a synchronization attempt is made while the queue still
- // contains items we abort the sync attempt
- if done, pend := d.queue.Size(); done+pend > 0 {
+ // Abort if the queue still contains some leftover data
+ if _, cached := d.queue.Size(); cached > 0 {
return errPendingQueue
}
-
- // Fetch the peer using the id or throw an error if the peer couldn't be found
+ // Retrieve the origin peer and initiate the downloading process
p := d.peers[id]
if p == nil {
return errUnknownPeer
}
-
- // Get the hash from the peer and initiate the downloading progress.
- err := d.getFromPeer(p, hash, false)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-// Done lets the downloader know that whatever previous hashes were taken
-// are processed. If the block count reaches zero and done is called
-// we reset the queue for the next batch of incoming hashes and blocks.
-func (d *Downloader) Done() {
- d.queue.Done()
+ return d.getFromPeer(p, hash, false)
}
// TakeBlocks takes blocks from the queue and yields them to the blockTaker handler
@@ -176,6 +158,7 @@ func (d *Downloader) Has(hash common.Hash) bool {
}
func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) {
+
d.activePeer = p.id
defer func() {
// reset on error
@@ -184,7 +167,7 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
}
}()
- glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id)
+ glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id)
// Start the fetcher. This will block the update entirely
// interupts need to be send to the appropriate channels
// respectively.
@@ -200,20 +183,13 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
return err
}
- glog.V(logger.Detail).Infoln("Sync completed")
+ glog.V(logger.Debug).Infoln("Synchronization completed")
return nil
}
// XXX Make synchronous
func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error {
- atomic.StoreInt32(&d.fetchingHashes, 1)
- defer atomic.StoreInt32(&d.fetchingHashes, 0)
-
- if d.queue.Has(h) { // TODO: Is this possible? Shouldn't queue be empty for startFetchingHashes to be even called?
- return errAlreadyInPool
- }
-
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
start := time.Now()
@@ -312,10 +288,8 @@ out:
}
func (d *Downloader) startFetchingBlocks(p *peer) error {
- glog.V(logger.Detail).Infoln("Downloading", d.queue.Pending(), "block(s)")
+ glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)")
- atomic.StoreInt32(&d.downloadingBlocks, 1)
- defer atomic.StoreInt32(&d.downloadingBlocks, 0)
// Defer the peer reset. This will empty the peer requested set
// and makes sure there are no lingering peers with an incorrect
// state
@@ -439,19 +413,3 @@ func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
return nil
}
-
-func (d *Downloader) isFetchingHashes() bool {
- return atomic.LoadInt32(&d.fetchingHashes) == 1
-}
-
-func (d *Downloader) isDownloadingBlocks() bool {
- return atomic.LoadInt32(&d.downloadingBlocks) == 1
-}
-
-func (d *Downloader) isBusy() bool {
- return d.isFetchingHashes() || d.isDownloadingBlocks()
-}
-
-func (d *Downloader) IsBusy() bool {
- return d.isBusy()
-}
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index bd439d96a..f3402794b 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -61,7 +61,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types
func (dl *downloadTester) sync(peerId string, hash common.Hash) error {
dl.activePeerId = peerId
- return dl.downloader.Synchronise(peerId, hash)
+ return dl.downloader.Synchronize(peerId, hash)
}
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index d849d4d68..515440bca 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -63,16 +63,6 @@ func (q *queue) Reset() {
q.blockCache = nil
}
-// Done checks if all the downloads have been retrieved, wiping the queue.
-func (q *queue) Done() {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- if len(q.blockCache) == 0 {
- q.Reset()
- }
-}
-
// Size retrieves the number of hashes in the queue, returning separately for
// pending and already downloaded.
func (q *queue) Size() (int, int) {