diff options
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 48 |
1 files changed, 25 insertions, 23 deletions
diff --git a/eth/handler.go b/eth/handler.go index 7e9ec593a..acc16812a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -18,15 +18,6 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) -const ( - forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available - blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process - notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching - notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested - minDesiredPeerCount = 5 // Amount of peers desired to start syncing - blockProcAmount = 256 -) - func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -57,9 +48,11 @@ type ProtocolManager struct { txSub event.Subscription minedBlockSub event.Subscription - newPeerCh chan *peer - newHashCh chan []*blockAnnounce - quitSync chan struct{} + newPeerCh chan *peer + newHashCh chan []*blockAnnounce + newBlockCh chan chan []*types.Block + quitSync chan struct{} + // wait group is used for graceful shutdowns during downloading // and processing wg sync.WaitGroup @@ -77,6 +70,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo peers: newPeerSet(), newPeerCh: make(chan *peer, 1), newHashCh: make(chan []*blockAnnounce, 1), + newBlockCh: make(chan chan []*types.Block), quitSync: make(chan struct{}), } @@ -274,21 +268,26 @@ func (self *ProtocolManager) handleMsg(p *peer) error { return p.sendBlocks(blocks) case BlocksMsg: - var blocks []*types.Block - + // Decode the arrived block message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + + var blocks []*types.Block if err := msgStream.Decode(&blocks); err != nil { glog.V(logger.Detail).Infoln("Decode error", err) blocks = nil } - - // Either deliver to the downloader or the importer - if self.downloader.Synchronising() { - self.downloader.DeliverBlocks(p.id, blocks) - } else { - for _, block := range blocks { - if err := self.importBlock(p, block, nil); err != nil { - return err + // Filter out any explicitly requested blocks (cascading select to get blocking back to peer) + filter := make(chan []*types.Block) + select { + case <-self.quitSync: + case self.newBlockCh <- filter: + select { + case <-self.quitSync: + case filter <- blocks: + select { + case <-self.quitSync: + case blocks := <-filter: + self.downloader.DeliverBlocks(p.id, blocks) } } } @@ -322,7 +321,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error { } } if len(announces) > 0 { - self.newHashCh <- announces + select { + case self.newHashCh <- announces: + case <-self.quitSync: + } } case NewBlockMsg: |