aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--eth/handler.go48
-rw-r--r--eth/sync.go56
2 files changed, 81 insertions, 23 deletions
diff --git a/eth/handler.go b/eth/handler.go
index 7e9ec593a..acc16812a 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -18,15 +18,6 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)
-const (
- forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
- blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process
- notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
- notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
- minDesiredPeerCount = 5 // Amount of peers desired to start syncing
- blockProcAmount = 256
-)
-
func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}
@@ -57,9 +48,11 @@ type ProtocolManager struct {
txSub event.Subscription
minedBlockSub event.Subscription
- newPeerCh chan *peer
- newHashCh chan []*blockAnnounce
- quitSync chan struct{}
+ newPeerCh chan *peer
+ newHashCh chan []*blockAnnounce
+ newBlockCh chan chan []*types.Block
+ quitSync chan struct{}
+
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
@@ -77,6 +70,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
peers: newPeerSet(),
newPeerCh: make(chan *peer, 1),
newHashCh: make(chan []*blockAnnounce, 1),
+ newBlockCh: make(chan chan []*types.Block),
quitSync: make(chan struct{}),
}
@@ -274,21 +268,26 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
return p.sendBlocks(blocks)
case BlocksMsg:
- var blocks []*types.Block
-
+ // Decode the arrived block message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+
+ var blocks []*types.Block
if err := msgStream.Decode(&blocks); err != nil {
glog.V(logger.Detail).Infoln("Decode error", err)
blocks = nil
}
-
- // Either deliver to the downloader or the importer
- if self.downloader.Synchronising() {
- self.downloader.DeliverBlocks(p.id, blocks)
- } else {
- for _, block := range blocks {
- if err := self.importBlock(p, block, nil); err != nil {
- return err
+ // Filter out any explicitly requested blocks (cascading select to get blocking back to peer)
+ filter := make(chan []*types.Block)
+ select {
+ case <-self.quitSync:
+ case self.newBlockCh <- filter:
+ select {
+ case <-self.quitSync:
+ case filter <- blocks:
+ select {
+ case <-self.quitSync:
+ case blocks := <-filter:
+ self.downloader.DeliverBlocks(p.id, blocks)
}
}
}
@@ -322,7 +321,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
}
}
if len(announces) > 0 {
- self.newHashCh <- announces
+ select {
+ case self.newHashCh <- announces:
+ case <-self.quitSync:
+ }
}
case NewBlockMsg:
diff --git a/eth/sync.go b/eth/sync.go
index 1a1cbdb47..f761f3cd1 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -12,6 +12,16 @@ import (
"github.com/ethereum/go-ethereum/logger/glog"
)
+const (
+ forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
+ blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process
+ notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
+ notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
+ notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ blockProcAmount = 256
+)
+
// blockAnnounce is the hash notification of the availability of a new block in
// the network.
type blockAnnounce struct {
@@ -25,6 +35,7 @@ type blockAnnounce struct {
func (pm *ProtocolManager) fetcher() {
announces := make(map[common.Hash]*blockAnnounce)
request := make(map[*peer][]common.Hash)
+ pending := make(map[common.Hash]*blockAnnounce)
cycle := time.Tick(notifyCheckCycle)
// Iterate the block fetching until a quit is requested
@@ -38,11 +49,18 @@ func (pm *ProtocolManager) fetcher() {
}
case <-cycle:
+ // Clean up any expired block fetches
+ for hash, announce := range pending {
+ if time.Since(announce.time) > notifyFetchTimeout {
+ delete(pending, hash)
+ }
+ }
// Check if any notified blocks failed to arrive
for hash, announce := range announces {
if time.Since(announce.time) > notifyArriveTimeout {
if !pm.chainman.HasBlock(hash) {
request[announce.peer] = append(request[announce.peer], hash)
+ pending[hash] = announce
}
delete(announces, hash)
}
@@ -57,6 +75,44 @@ func (pm *ProtocolManager) fetcher() {
}
request = make(map[*peer][]common.Hash)
+ case filter := <-pm.newBlockCh:
+ // Blocks arrived, extract any explicit requests, return all else
+ var blocks types.Blocks
+ select {
+ case blocks = <-filter:
+ case <-pm.quitSync:
+ return
+ }
+
+ fetch, sync := []*types.Block{}, []*types.Block{}
+ for _, block := range blocks {
+ hash := block.Hash()
+ if _, ok := pending[hash]; ok {
+ fetch = append(fetch, block)
+ } else {
+ sync = append(sync, block)
+ }
+ }
+
+ select {
+ case filter <- sync:
+ case <-pm.quitSync:
+ return
+ }
+ // If any explicit fetches were replied to, import them
+ if len(fetch) > 0 {
+ go func() {
+ for _, block := range fetch {
+ if announce := pending[block.Hash()]; announce != nil {
+ if err := pm.importBlock(announce.peer, block, nil); err != nil {
+ glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err)
+ return
+ }
+ }
+ }
+ }()
+ }
+
case <-pm.quitSync:
return
}