aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--eth/fetcher/fetcher.go21
-rw-r--r--eth/fetcher/fetcher_test.go28
-rw-r--r--eth/handler.go2
3 files changed, 51 insertions, 0 deletions
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go
index e8a9cc093..34d368780 100644
--- a/eth/fetcher/fetcher.go
+++ b/eth/fetcher/fetcher.go
@@ -56,6 +56,7 @@ type inject struct {
type Fetcher struct {
// Various event channels
notify chan *announce
+ insert chan *inject
filter chan chan []*types.Block
quit chan struct{}
@@ -69,6 +70,7 @@ type Fetcher struct {
func New(hasBlock hashCheckFn, importBlock blockImporterFn, chainHeight chainHeightFn) *Fetcher {
return &Fetcher{
notify: make(chan *announce),
+ insert: make(chan *inject),
filter: make(chan chan []*types.Block),
quit: make(chan struct{}),
hasBlock: hasBlock,
@@ -106,6 +108,20 @@ func (f *Fetcher) Notify(peer string, hash common.Hash, time time.Time, fetcher
}
}
+// Enqueue tries to fill gaps the the fetcher's future import queue.
+func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
+ op := &inject{
+ origin: peer,
+ block: block,
+ }
+ select {
+ case f.insert <- op:
+ return nil
+ case <-f.quit:
+ return errTerminated
+ }
+}
+
// Filter extracts all the blocks that were explicitly requested by the fetcher,
// returning those that should be handled differently.
func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks {
@@ -192,6 +208,11 @@ func (f *Fetcher) loop() {
}
announced[notification.hash] = append(announced[notification.hash], notification)
+ case op := <-f.insert:
+ // A direct block insertion was requested, try and fill any pending gaps
+ queued.Push(op, -float32(op.block.NumberU64()))
+ glog.V(logger.Detail).Infof("Peer %s: filled block %x, total %v", op.origin, op.block.Hash().Bytes()[:4], queued.Size())
+
case hash := <-done:
// A pending import finished, remove all traces of the notification
delete(announced, hash)
diff --git a/eth/fetcher/fetcher_test.go b/eth/fetcher/fetcher_test.go
index cde4bb70a..7c975841c 100644
--- a/eth/fetcher/fetcher_test.go
+++ b/eth/fetcher/fetcher_test.go
@@ -271,3 +271,31 @@ func TestRandomArrivalImport(t *testing.T) {
t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
}
}
+
+// Tests that direct block enqueues (due to block propagation vs. hash announce)
+// are correctly schedule, filling and import queue gaps.
+func TestQueueGapFill(t *testing.T) {
+ // Create a chain of blocks to import, and choose one to not announce at all
+ targetBlocks := 24
+ hashes := createHashes(targetBlocks, knownHash)
+ blocks := createBlocksFromHashes(hashes)
+ skip := targetBlocks / 2
+
+ tester := newTester()
+ fetcher := tester.makeFetcher(blocks)
+
+ // Iteratively announce blocks, skipping one entry
+ for i := len(hashes) - 1; i >= 0; i-- {
+ if i != skip {
+ tester.fetcher.Notify("valid", hashes[i], time.Now().Add(-arriveTimeout), fetcher)
+ time.Sleep(50 * time.Millisecond)
+ }
+ }
+ // Fill the missing block directly as if propagated
+ tester.fetcher.Enqueue("valid", blocks[hashes[skip]])
+ time.Sleep(50 * time.Millisecond)
+
+ if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
+ t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
+ }
+}
diff --git a/eth/handler.go b/eth/handler.go
index e5908dc88..604983a7b 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -340,6 +340,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
}
request.Block.ReceivedAt = msg.ReceivedAt
+ // Try to import the propagated block, also making it fill any fetcher gaps
+ self.fetcher.Enqueue(p.id, request.Block)
if err := self.importBlock(p, request.Block, request.TD); err != nil {
return err
}