aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go149
1 files changed, 112 insertions, 37 deletions
diff --git a/eth/handler.go b/eth/handler.go
index b3890d365..858ae2958 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -1,10 +1,46 @@
package eth
+// XXX Fair warning, most of the code is re-used from the old protocol. Please be aware that most of this will actually change
+// The idea is that most of the calls within the protocol will become synchronous.
+// Block downloading and block processing will be complete seperate processes
+/*
+# Possible scenarios
+
+// Synching scenario
+// Use the best peer to synchronise
+blocks, err := pm.downloader.Synchronise()
+if err != nil {
+ // handle
+ break
+}
+pm.chainman.InsertChain(blocks)
+
+// Receiving block with known parent
+if parent_exist {
+ if err := pm.chainman.InsertChain(block); err != nil {
+ // handle
+ break
+ }
+ pm.BroadcastBlock(block)
+}
+
+// Receiving block with unknown parent
+blocks, err := pm.downloader.SynchroniseWithPeer(peer)
+if err != nil {
+ // handle
+ break
+}
+pm.chainman.InsertChain(blocks)
+
+*/
+
import (
"fmt"
+ "math"
"sync"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/logger"
@@ -17,27 +53,6 @@ func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}
-// main entrypoint, wrappers starting a server running the eth protocol
-// use this constructor to attach the protocol ("class") to server caps
-// the Dev p2p layer then runs the protocol instance on each peer
-func EthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, downloader *downloader.Downloader) p2p.Protocol {
- protocol := newProtocolManager(txPool, chainManager, downloader)
-
- return p2p.Protocol{
- Name: "eth",
- Version: uint(protocolVersion),
- Length: ProtocolLength,
- Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
- //return runEthProtocol(protocolVersion, networkId, txPool, chainManager, downloader, p, rw)
- peer := protocol.newPeer(protocolVersion, networkId, p, rw)
- err := protocol.handle(peer)
- glog.V(logger.Detail).Infof("[%s]: %v\n", peer.id, err)
-
- return err
- },
- }
-}
-
type hashFetcherFn func(common.Hash) error
type blockFetcherFn func([]common.Hash) error
@@ -51,44 +66,66 @@ type extProt struct {
func (ep extProt) GetHashes(hash common.Hash) error { return ep.getHashes(hash) }
func (ep extProt) GetBlock(hashes []common.Hash) error { return ep.getBlocks(hashes) }
-type EthProtocolManager struct {
+type ProtocolManager struct {
protVer, netId int
txpool txPool
- chainman chainManager
+ chainman *core.ChainManager
downloader *downloader.Downloader
pmu sync.Mutex
peers map[string]*peer
+
+ SubProtocol p2p.Protocol
}
-func newProtocolManager(txpool txPool, chainman chainManager, downloader *downloader.Downloader) *EthProtocolManager {
- return &EthProtocolManager{
+// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
+// with the ethereum network.
+func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager {
+ manager := &ProtocolManager{
txpool: txpool,
chainman: chainman,
downloader: downloader,
peers: make(map[string]*peer),
}
+
+ manager.SubProtocol = p2p.Protocol{
+ Name: "eth",
+ Version: uint(protocolVersion),
+ Length: ProtocolLength,
+ Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ peer := manager.newPeer(protocolVersion, networkId, p, rw)
+ err := manager.handle(peer)
+ glog.V(logger.Detail).Infof("[%s]: %v\n", peer.id, err)
+
+ return err
+ },
+ }
+
+ return manager
}
-func (pm *EthProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
- pm.pmu.Lock()
- defer pm.pmu.Unlock()
+func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
td, current, genesis := pm.chainman.Status()
- peer := newPeer(pv, nv, genesis, current, td, p, rw)
- pm.peers[peer.id] = peer
-
- return peer
+ return newPeer(pv, nv, genesis, current, td, p, rw)
}
-func (pm *EthProtocolManager) handle(p *peer) error {
+func (pm *ProtocolManager) handle(p *peer) error {
if err := p.handleStatus(); err != nil {
return err
}
+ pm.pmu.Lock()
+ pm.peers[p.id] = p
+ pm.pmu.Unlock()
pm.downloader.RegisterPeer(p.id, p.td, p.currentHash, p.requestHashes, p.requestBlocks)
- defer pm.downloader.UnregisterPeer(p.id)
+ defer func() {
+ pm.pmu.Lock()
+ defer pm.pmu.Unlock()
+ delete(pm.peers, p.id)
+ pm.downloader.UnregisterPeer(p.id)
+ }()
// propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
@@ -106,7 +143,7 @@ func (pm *EthProtocolManager) handle(p *peer) error {
return nil
}
-func (self *EthProtocolManager) handleMsg(p *peer) error {
+func (self *ProtocolManager) handleMsg(p *peer) error {
msg, err := p.rw.ReadMsg()
if err != nil {
return err
@@ -192,7 +229,6 @@ func (self *EthProtocolManager) handleMsg(p *peer) error {
var blocks []*types.Block
if err := msgStream.Decode(&blocks); err != nil {
glog.V(logger.Detail).Infoln("Decode error", err)
- fmt.Println("decode error", err)
blocks = nil
}
self.downloader.DeliverChunk(p.id, blocks)
@@ -206,6 +242,10 @@ func (self *EthProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "block validation %v: %v", msg, err)
}
hash := request.Block.Hash()
+ // Add the block hash as a known hash to the peer. This will later be used to detirmine
+ // who should receive this.
+ p.blockHashes.Add(hash)
+
_, chainHead, _ := self.chainman.Status()
jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
@@ -215,10 +255,45 @@ func (self *EthProtocolManager) handleMsg(p *peer) error {
BlockPrevHash: request.Block.ParentHash().Hex(),
RemoteId: p.ID().String(),
})
- self.downloader.AddBlock(p.id, request.Block, request.TD)
+ // Attempt to insert the newly received by checking if the parent exists.
+ // if the parent exists we process the block and propagate to our peers
+ // if the parent does not exists we delegate to the downloader.
+ // NOTE we can reduce chatter by dropping blocks with Td < currentTd
+ if self.chainman.HasBlock(request.Block.ParentHash()) {
+ if err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
+ // handle error
+ return nil
+ }
+ self.BroadcastBlock(hash, request.Block)
+ } else {
+ self.downloader.AddBlock(p.id, request.Block, request.TD)
+ }
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
return nil
}
+
+// BroadcastBlock will propagate the block to its connected peers. It will sort
+// out which peers do not contain the block in their block set and will do a
+// sqrt(peers) to determine the amount of peers we broadcast to.
+func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) {
+ pm.pmu.Lock()
+ defer pm.pmu.Unlock()
+
+ // Find peers who don't know anything about the given hash. Peers that
+ // don't know about the hash will be a candidate for the broadcast loop
+ var peers []*peer
+ for _, peer := range pm.peers {
+ if !peer.blockHashes.Has(hash) {
+ peers = append(peers, peer)
+ }
+ }
+ // Broadcast block to peer set
+ peers = peers[:int(math.Sqrt(float64(len(peers))))]
+ for _, peer := range peers {
+ peer.sendNewBlock(block)
+ }
+ glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers")
+}