aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-06-11 20:56:08 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-06-15 14:22:36 +0800
commit66d3dc8690e0aa551e7b35a17006a2135b51c9bd (patch)
tree2e58d41dd5995e1b7c8d9c6f52b881e0c7934fbd
parent6f5c6150b7060b6b2ee68ac95b30f46c5c2c7f90 (diff)
downloaddexon-66d3dc8690e0aa551e7b35a17006a2135b51c9bd.tar.gz
dexon-66d3dc8690e0aa551e7b35a17006a2135b51c9bd.tar.zst
dexon-66d3dc8690e0aa551e7b35a17006a2135b51c9bd.zip
eth, eth/downloader: move peer removal into downloader
-rw-r--r--eth/backend.go8
-rw-r--r--eth/downloader/downloader.go87
-rw-r--r--eth/downloader/downloader_test.go32
-rw-r--r--eth/downloader/queue.go2
-rw-r--r--eth/handler.go4
-rw-r--r--eth/sync.go32
6 files changed, 83 insertions, 82 deletions
diff --git a/eth/backend.go b/eth/backend.go
index d2ec0cc62..4ebf21811 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -193,7 +193,6 @@ type Ethereum struct {
whisper *whisper.Whisper
pow *ethash.Ethash
protocolManager *ProtocolManager
- downloader *downloader.Downloader
SolcPath string
solc *compiler.Solidity
@@ -290,14 +289,13 @@ func New(config *Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
- eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock)
eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit)
eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.chainManager, eth.EventMux())
eth.chainManager.SetProcessor(eth.blockProcessor)
+ eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager)
+
eth.miner = miner.New(eth, eth.EventMux(), eth.pow)
eth.miner.SetGasPrice(config.GasPrice)
-
- eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader)
if config.Shh {
eth.whisper = whisper.New()
eth.shhVersionId = int(eth.whisper.Version())
@@ -447,7 +445,7 @@ func (s *Ethereum) ClientVersion() string { return s.clientVersio
func (s *Ethereum) EthVersion() int { return s.ethVersionId }
func (s *Ethereum) NetVersion() int { return s.netVersionId }
func (s *Ethereum) ShhVersion() int { return s.shhVersionId }
-func (s *Ethereum) Downloader() *downloader.Downloader { return s.downloader }
+func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManager.downloader }
// Start the ethereum
func (s *Ethereum) Start() error {
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index f0a515d12..499b3a585 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -32,28 +32,32 @@ var (
var (
errLowTd = errors.New("peers TD is too low")
- ErrBusy = errors.New("busy")
+ errBusy = errors.New("busy")
errUnknownPeer = errors.New("peer is unknown or unhealthy")
- ErrBadPeer = errors.New("action from bad peer ignored")
- ErrStallingPeer = errors.New("peer is stalling")
+ errBadPeer = errors.New("action from bad peer ignored")
+ errStallingPeer = errors.New("peer is stalling")
errBannedHead = errors.New("peer head hash already banned")
errNoPeers = errors.New("no peers to keep download active")
- ErrPendingQueue = errors.New("pending items in queue")
- ErrTimeout = errors.New("timeout")
- ErrEmptyHashSet = errors.New("empty hash set by peer")
+ errPendingQueue = errors.New("pending items in queue")
+ errTimeout = errors.New("timeout")
+ errEmptyHashSet = errors.New("empty hash set by peer")
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
errAlreadyInPool = errors.New("hash already in pool")
- ErrInvalidChain = errors.New("retrieved hash chain is invalid")
- ErrCrossCheckFailed = errors.New("block cross-check failed")
+ errInvalidChain = errors.New("retrieved hash chain is invalid")
+ errCrossCheckFailed = errors.New("block cross-check failed")
errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
errNoSyncActive = errors.New("no sync active")
)
+// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
type hashCheckFn func(common.Hash) bool
-type getBlockFn func(common.Hash) *types.Block
-type chainInsertFn func(types.Blocks) (int, error)
-type hashIterFn func() (common.Hash, error)
+
+// blockRetrievalFn is a callback type for retrieving a block from the local chain.
+type blockRetrievalFn func(common.Hash) *types.Block
+
+// peerDropFn is a callback type for dropping a peer detected as malicious.
+type peerDropFn func(id string)
type blockPack struct {
peerId string
@@ -85,8 +89,9 @@ type Downloader struct {
importLock sync.Mutex
// Callbacks
- hasBlock hashCheckFn
- getBlock getBlockFn
+ hasBlock hashCheckFn // Checks if a block is present in the chain
+ getBlock blockRetrievalFn // Retrieves a block from the chain
+ dropPeer peerDropFn // Retrieved the TD of our own chain
// Status
synchronising int32
@@ -107,7 +112,8 @@ type Block struct {
OriginPeer string
}
-func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
+// New creates a new downloader to fetch hashes and blocks from remote peers.
+func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, dropPeer peerDropFn) *Downloader {
// Create the base downloader
downloader := &Downloader{
mux: mux,
@@ -115,6 +121,7 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloa
peers: newPeerSet(),
hasBlock: hasBlock,
getBlock: getBlock,
+ dropPeer: dropPeer,
newPeerCh: make(chan *peer, 1),
hashCh: make(chan hashPack, 1),
blockCh: make(chan blockPack, 1),
@@ -183,19 +190,43 @@ func (d *Downloader) UnregisterPeer(id string) error {
return nil
}
-// Synchronise will select the peer and use it for synchronising. If an empty string is given
+// Synchronise tries to sync up our local block chain with a remote peer, both
+// adding various sanity checks as well as wrapping it with various log entries.
+func (d *Downloader) Synchronise(id string, head common.Hash) {
+ glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", id, head)
+
+ switch err := d.synchronise(id, head); err {
+ case nil:
+ glog.V(logger.Detail).Infof("Synchronisation completed")
+
+ case errBusy:
+ glog.V(logger.Detail).Infof("Synchronisation already in progress")
+
+ case errTimeout, errBadPeer, errEmptyHashSet, errInvalidChain, errCrossCheckFailed:
+ glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
+ d.dropPeer(id)
+
+ case errPendingQueue:
+ glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
+
+ default:
+ glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
+ }
+}
+
+// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
-func (d *Downloader) Synchronise(id string, hash common.Hash) error {
+func (d *Downloader) synchronise(id string, hash common.Hash) error {
// Make sure only one goroutine is ever allowed past this point at once
if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
- return ErrBusy
+ return errBusy
}
defer atomic.StoreInt32(&d.synchronising, 0)
// If the head hash is banned, terminate immediately
if d.banned.Has(hash) {
- return ErrInvalidChain
+ return errInvalidChain
}
// Post a user notification of the sync (only once per session)
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
@@ -209,7 +240,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
// Abort if the queue still contains some leftover data
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
- return ErrPendingQueue
+ return errPendingQueue
}
// Reset the queue and peer set to clean any internal leftover state
d.queue.Reset()
@@ -342,7 +373,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
// Make sure the peer actually gave something valid
if len(hashPack.hashes) == 0 {
glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id)
- return ErrEmptyHashSet
+ return errEmptyHashSet
}
for index, hash := range hashPack.hashes {
if d.banned.Has(hash) {
@@ -352,7 +383,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
if err := d.banBlocks(active.id, hash); err != nil {
glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err)
}
- return ErrInvalidChain
+ return errInvalidChain
}
}
// Determine if we're done fetching hashes (queue up all pending), and continue if not done
@@ -369,12 +400,12 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
inserts := d.queue.Insert(hashPack.hashes)
if len(inserts) == 0 && !done {
glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id)
- return ErrBadPeer
+ return errBadPeer
}
if !done {
// Check that the peer is not stalling the sync
if len(inserts) < MinHashFetch {
- return ErrStallingPeer
+ return errStallingPeer
}
// Try and fetch a random block to verify the hash batch
// Skip the last hash as the cross check races with the next hash fetch
@@ -408,7 +439,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
block := blockPack.blocks[0]
if check, ok := d.checks[block.Hash()]; ok {
if block.ParentHash() != check.parent {
- return ErrCrossCheckFailed
+ return errCrossCheckFailed
}
delete(d.checks, block.Hash())
}
@@ -418,7 +449,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
for hash, check := range d.checks {
if time.Now().After(check.expire) {
glog.V(logger.Debug).Infof("Cross check timeout for %x", hash)
- return ErrCrossCheckFailed
+ return errCrossCheckFailed
}
}
@@ -438,7 +469,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
// if all peers have been tried, abort the process entirely or if the hash is
// the zero hash.
if p == nil || (head == common.Hash{}) {
- return ErrTimeout
+ return errTimeout
}
// set p to the active peer. this will invalidate any hashes that may be returned
// by our previous (delayed) peer.
@@ -500,7 +531,7 @@ out:
peer.SetIdle()
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
- case ErrInvalidChain:
+ case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort
return err
@@ -617,7 +648,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
return errCancelBlockFetch
case <-timeout:
- return ErrTimeout
+ return errTimeout
case <-d.hashCh:
// Out of bounds hashes received, ignore them
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 5f10fb41f..5e79c10c9 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -73,7 +73,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types
done: make(chan bool),
}
var mux event.TypeMux
- downloader := New(&mux, tester.hasBlock, tester.getBlock)
+ downloader := New(&mux, tester.hasBlock, tester.getBlock, nil)
tester.downloader = downloader
return tester
@@ -83,7 +83,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types
// block until it returns
func (dl *downloadTester) sync(peerId string, head common.Hash) error {
dl.activePeerId = peerId
- return dl.downloader.Synchronise(peerId, head)
+ return dl.downloader.synchronise(peerId, head)
}
// syncTake is starts synchronising with a remote peer, but concurrently it also
@@ -415,8 +415,8 @@ func TestInvalidHashOrderAttack(t *testing.T) {
// Try and sync with the malicious node and check that it fails
tester := newTester(t, reverse, blocks)
tester.newPeer("attack", big.NewInt(10000), reverse[0])
- if _, err := tester.syncTake("attack", reverse[0]); err != ErrInvalidChain {
- t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain)
+ if _, err := tester.syncTake("attack", reverse[0]); err != errInvalidChain {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain)
}
// Ensure that a valid chain can still pass sync
tester.hashes = hashes
@@ -438,8 +438,8 @@ func TestMadeupHashChainAttack(t *testing.T) {
// Try and sync with the malicious node and check that it fails
tester := newTester(t, hashes, nil)
tester.newPeer("attack", big.NewInt(10000), hashes[0])
- if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed {
- t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
+ if _, err := tester.syncTake("attack", hashes[0]); err != errCrossCheckFailed {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
}
}
@@ -455,8 +455,8 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) {
// Try and sync with the attacker, one hash at a time
tester.maxHashFetch = 1
tester.newPeer("attack", big.NewInt(10000), hashes[0])
- if _, err := tester.syncTake("attack", hashes[0]); err != ErrStallingPeer {
- t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrStallingPeer)
+ if _, err := tester.syncTake("attack", hashes[0]); err != errStallingPeer {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
}
}
@@ -480,8 +480,8 @@ func TestMadeupBlockChainAttack(t *testing.T) {
// Try and sync with the malicious node and check that it fails
tester := newTester(t, gapped, blocks)
tester.newPeer("attack", big.NewInt(10000), gapped[0])
- if _, err := tester.syncTake("attack", gapped[0]); err != ErrCrossCheckFailed {
- t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
+ if _, err := tester.syncTake("attack", gapped[0]); err != errCrossCheckFailed {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
}
// Ensure that a valid chain can still pass sync
blockSoftTTL = defaultBlockTTL
@@ -514,8 +514,8 @@ func TestMadeupParentBlockChainAttack(t *testing.T) {
// Try and sync with the malicious node and check that it fails
tester := newTester(t, hashes, forges)
tester.newPeer("attack", big.NewInt(10000), hashes[0])
- if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed {
- t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
+ if _, err := tester.syncTake("attack", hashes[0]); err != errCrossCheckFailed {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed)
}
// Ensure that a valid chain can still pass sync
blockSoftTTL = defaultBlockTTL
@@ -547,8 +547,8 @@ func TestBannedChainStarvationAttack(t *testing.T) {
tester.newPeer("attack", big.NewInt(10000), hashes[0])
for banned := tester.downloader.banned.Size(); ; {
// Try to sync with the attacker, check hash chain failure
- if _, err := tester.syncTake("attack", hashes[0]); err != ErrInvalidChain {
- t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain)
+ if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain)
}
// Check that the ban list grew with at least 1 new item, or all banned
bans := tester.downloader.banned.Size()
@@ -592,8 +592,8 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) {
tester.newPeer("attack", big.NewInt(10000), hashes[0])
for {
// Try to sync with the attacker, check hash chain failure
- if _, err := tester.syncTake("attack", hashes[0]); err != ErrInvalidChain {
- t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain)
+ if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain {
+ t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain)
}
// Short circuit if the entire chain was banned
if tester.downloader.banned.Has(hashes[0]) {
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 7abbd42fd..903f043eb 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -320,7 +320,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
// If a requested block falls out of the range, the hash chain is invalid
index := int(block.NumberU64()) - q.blockOffset
if index >= len(q.blockCache) || index < 0 {
- return ErrInvalidChain
+ return errInvalidChain
}
// Otherwise merge the block and mark the hash block
q.blockCache[index] = &Block{
diff --git a/eth/handler.go b/eth/handler.go
index f002727f3..ac7fb8fcf 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -68,12 +68,11 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
-func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager {
+func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager) *ProtocolManager {
manager := &ProtocolManager{
eventMux: mux,
txpool: txpool,
chainman: chainman,
- downloader: downloader,
peers: newPeerSet(),
newPeerCh: make(chan *peer, 1),
newHashCh: make(chan []*blockAnnounce, 1),
@@ -81,6 +80,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
+ manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.removePeer)
manager.SubProtocol = p2p.Protocol{
Name: "eth",
Version: uint(protocolVersion),
diff --git a/eth/sync.go b/eth/sync.go
index 8fee21d7b..b127ca979 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -8,7 +8,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p/discover"
@@ -332,33 +331,6 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
if peer.Td().Cmp(pm.chainman.Td()) <= 0 {
return
}
- // FIXME if we have the hash in our chain and the TD of the peer is
- // much higher than ours, something is wrong with us or the peer.
- // Check if the hash is on our own chain
- head := peer.Head()
- if pm.chainman.HasBlock(head) {
- glog.V(logger.Debug).Infoln("Synchronisation canceled: head already known")
- return
- }
- // Get the hashes from the peer (synchronously)
- glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", peer.id, head)
-
- err := pm.downloader.Synchronise(peer.id, head)
- switch err {
- case nil:
- glog.V(logger.Detail).Infof("Synchronisation completed")
-
- case downloader.ErrBusy:
- glog.V(logger.Detail).Infof("Synchronisation already in progress")
-
- case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrEmptyHashSet, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed:
- glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err)
- pm.removePeer(peer.id)
-
- case downloader.ErrPendingQueue:
- glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
-
- default:
- glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
- }
+ // Otherwise try to sync with the downloader
+ pm.downloader.Synchronise(peer.id, peer.Head())
}