diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-06-17 21:53:28 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-06-18 20:56:08 +0800 |
commit | 37c5ff392f71fddaa6acd92925f69e81876fe22e (patch) | |
tree | 30602b2ab8bd498ba3ace24d2d6447fcee107eb9 /eth/fetcher/fetcher.go | |
parent | 2a7411bc9605dae9798ed29ed237e28c8fc98895 (diff) | |
download | go-tangerine-37c5ff392f71fddaa6acd92925f69e81876fe22e.tar.gz go-tangerine-37c5ff392f71fddaa6acd92925f69e81876fe22e.tar.zst go-tangerine-37c5ff392f71fddaa6acd92925f69e81876fe22e.zip |
eth/fetcher: build longest chain until proven otherwise
Diffstat (limited to 'eth/fetcher/fetcher.go')
-rw-r--r-- | eth/fetcher/fetcher.go | 67 |
1 files changed, 37 insertions, 30 deletions
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index 78d25f72e..a70fcbeed 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -3,7 +3,6 @@ package fetcher import ( "errors" - "math" "math/rand" "time" @@ -57,8 +56,9 @@ type inject struct { type Fetcher struct { // Various event channels notify chan *announce - insert chan *inject + inject chan *inject filter chan chan []*types.Block + done chan common.Hash quit chan struct{} // Announce states @@ -79,8 +79,9 @@ type Fetcher struct { func New(hasBlock hashCheckFn, importBlock blockImporterFn, chainHeight chainHeightFn) *Fetcher { return &Fetcher{ notify: make(chan *announce), - insert: make(chan *inject), + inject: make(chan *inject), filter: make(chan chan []*types.Block), + done: make(chan common.Hash), quit: make(chan struct{}), announced: make(map[common.Hash][]*announce), fetching: make(map[common.Hash]*announce), @@ -128,7 +129,7 @@ func (f *Fetcher) Enqueue(peer string, block *types.Block) error { block: block, } select { - case f.insert <- op: + case f.inject <- op: return nil case <-f.quit: return errTerminated @@ -166,8 +167,6 @@ func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks { func (f *Fetcher) loop() { // Iterate the block fetching until a quit is requested fetch := time.NewTimer(0) - done := make(chan common.Hash) - for { // Clean up any expired block fetches for hash, announce := range f.fetching { @@ -179,27 +178,19 @@ func (f *Fetcher) loop() { // Import any queued blocks that could potentially fit height := f.chainHeight() for !f.queue.Empty() { - // If too high up the chain, continue later op := f.queue.PopItem().(*inject) - if number := op.block.NumberU64(); number > height+1 { + number := op.block.NumberU64() + + // If too high up the chain or phase, continue later + if number > height+1 { f.queue.Push(op, -float32(op.block.NumberU64())) break } - // Otherwise if not known yet, try and import - hash := op.block.Hash() - if f.hasBlock(hash) { + // Otherwise if fresh and still unknown, try and import + if number <= height || f.hasBlock(op.block.Hash()) { continue } - // Block may just fit, try to import it - glog.V(logger.Debug).Infof("Peer %s: importing block #%d [%x]", op.origin, op.block.NumberU64(), hash.Bytes()[:4]) - go func() { - defer func() { done <- hash }() - - if err := f.importBlock(op.origin, op.block); err != nil { - glog.V(logger.Detail).Infof("Peer %s: block #%d [%x] import failed: %v", op.origin, op.block.NumberU64(), hash.Bytes()[:4], err) - return - } - }() + f.insert(op.origin, op.block) } // Wait for an outside event to occur select { @@ -209,7 +200,6 @@ func (f *Fetcher) loop() { case notification := <-f.notify: // A block was announced, schedule if it's not yet downloading - glog.V(logger.Debug).Infof("Peer %s: scheduling %x", notification.origin, notification.hash[:4]) if _, ok := f.fetching[notification.hash]; ok { break } @@ -218,11 +208,11 @@ func (f *Fetcher) loop() { f.reschedule(fetch) } - case op := <-f.insert: + case op := <-f.inject: // A direct block insertion was requested, try and fill any pending gaps f.enqueue(op.origin, op.block) - case hash := <-done: + case hash := <-f.done: // A pending import finished, remove all traces of the notification delete(f.announced, hash) delete(f.fetching, hash) @@ -243,8 +233,7 @@ func (f *Fetcher) loop() { } } // Send out all block requests - for peer, hashes := range request { - glog.V(logger.Debug).Infof("Peer %s: explicitly fetching %d blocks", peer, len(hashes)) + for _, hashes := range request { go f.fetching[hashes[0]].fetch(hashes) } // Schedule the next fetch if blocks are still pending @@ -304,7 +293,6 @@ func (f *Fetcher) reschedule(fetch *time.Timer) { earliest = announces[0].time } } - glog.V(logger.Detail).Infof("Scheduling next fetch in %v", arriveTimeout-time.Since(earliest)) fetch.Reset(arriveTimeout - time.Since(earliest)) } @@ -313,9 +301,9 @@ func (f *Fetcher) reschedule(fetch *time.Timer) { func (f *Fetcher) enqueue(peer string, block *types.Block) { hash := block.Hash() - // Make sure the block isn't in some weird place - if math.Abs(float64(f.chainHeight())-float64(block.NumberU64())) > maxQueueDist { - glog.Infof("Peer %s: discarded block #%d [%x] too far from head", peer, block.NumberU64(), hash.Bytes()[:4]) + // Discard any past or too distant blocks + if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist <= 0 || dist > maxQueueDist { + glog.Infof("Peer %s: discarded block #%d [%x], distance %d", peer, block.NumberU64(), hash.Bytes()[:4], dist) return } // Schedule the block for future importing @@ -328,3 +316,22 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) { } } } + +// insert spawns a new goroutine to run a block insertion into the chain. If the +// block's number is at the same height as the current import phase, if updates +// the phase states accordingly. +func (f *Fetcher) insert(peer string, block *types.Block) { + hash := block.Hash() + + // Run the import on a new thread + glog.V(logger.Debug).Infof("Peer %s: importing block #%d [%x]", peer, block.NumberU64(), hash[:4]) + go func() { + defer func() { f.done <- hash }() + + // Run the actual import and log any issues + if err := f.importBlock(peer, block); err != nil { + glog.V(logger.Detail).Infof("Peer %s: block #%d [%x] import failed: %v", peer, block.NumberU64(), hash[:4], err) + return + } + }() +} |