diff options
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r-- | eth/downloader/queue.go | 44 |
1 files changed, 38 insertions, 6 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index b68c5bc82..4d1aa4e93 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -2,16 +2,20 @@ package downloader import ( "math" + "math/big" "sync" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "gopkg.in/fatih/set.v0" ) // queue represents hashes that are either need fetching or are being fetched type queue struct { - hashPool *set.Set + hashPool *set.Set + fetchPool *set.Set + blockHashes *set.Set mu sync.Mutex fetching map[string]*chunk @@ -20,8 +24,10 @@ type queue struct { func newqueue() *queue { return &queue{ - hashPool: set.New(), - fetching: make(map[string]*chunk), + hashPool: set.New(), + fetchPool: set.New(), + blockHashes: set.New(), + fetching: make(map[string]*chunk), } } @@ -50,6 +56,8 @@ func (c *queue) get(p *peer, max int) *chunk { }) // remove the fetchable hashes from hash pool c.hashPool.Separate(hashes) + c.fetchPool.Merge(hashes) + // Create a new chunk for the seperated hashes. The time is being used // to reset the chunk (timeout) chunk := &chunk{hashes, time.Now()} @@ -60,6 +68,22 @@ func (c *queue) get(p *peer, max int) *chunk { return chunk } +func (c *queue) has(hash common.Hash) bool { + return c.hashPool.Has(hash) || c.fetchPool.Has(hash) +} + +func (c *queue) addBlock(id string, block *types.Block, td *big.Int) { + c.mu.Lock() + defer c.mu.Unlock() + + // when adding a block make sure it doesn't already exist + if !c.blockHashes.Has(block.Hash()) { + c.hashPool.Remove(block.Hash()) + c.blocks = append(c.blocks, block) + } +} + +// deliver delivers a chunk to the queue that was requested of the peer func (c *queue) deliver(id string, blocks []*types.Block) { c.mu.Lock() defer c.mu.Unlock() @@ -70,15 +94,19 @@ func (c *queue) deliver(id string, blocks []*types.Block) { delete(c.fetching, id) // seperate the blocks and the hashes - chunk.seperate(blocks) + blockHashes := chunk.fetchedHashes(blocks) + // merge block hashes + c.blockHashes.Merge(blockHashes) // Add the blocks c.blocks = append(c.blocks, blocks...) // Add back whatever couldn't be delivered c.hashPool.Merge(chunk.hashes) + c.fetchPool.Separate(chunk.hashes) } } +// puts puts sets of hashes on to the queue for fetching func (c *queue) put(hashes *set.Set) { c.mu.Lock() defer c.mu.Unlock() @@ -91,8 +119,12 @@ type chunk struct { itime time.Time } -func (ch *chunk) seperate(blocks []*types.Block) { +func (ch *chunk) fetchedHashes(blocks []*types.Block) *set.Set { + fhashes := set.New() for _, block := range blocks { - ch.hashes.Remove(block.Hash()) + fhashes.Add(block.Hash()) } + ch.hashes.Separate(fhashes) + + return fhashes } |