From 843db4978e876674ca111706880a58c84202880d Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 16 Mar 2015 23:10:26 +0100 Subject: updated blockpool --- blockpool/peers.go | 49 ++++++++++++++++++++++++------------------------- 1 file changed, 24 insertions(+), 25 deletions(-) (limited to 'blockpool/peers.go') diff --git a/blockpool/peers.go b/blockpool/peers.go index 5f4889792..d94d6ac46 100644 --- a/blockpool/peers.go +++ b/blockpool/peers.go @@ -1,16 +1,15 @@ package blockpool import ( - "bytes" "math/big" "math/rand" "sort" "sync" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/errs" - "github.com/ethereum/go-ethereum/common" ) type peer struct { @@ -18,20 +17,20 @@ type peer struct { // last known blockchain status td *big.Int - currentBlockHash []byte + currentBlockHash common.Hash currentBlock *types.Block - parentHash []byte + parentHash common.Hash headSection *section id string // peer callbacks - requestBlockHashes func([]byte) error - requestBlocks func([][]byte) error + requestBlockHashes func(common.Hash) error + requestBlocks func([]common.Hash) error peerError func(*errs.Error) errors *errs.Errors - sections [][]byte + sections []common.Hash // channels to push new head block and head section for peer a currentBlockC chan *types.Block @@ -66,10 +65,10 @@ type peers struct { // peer constructor func (self *peers) newPeer( td *big.Int, - currentBlockHash []byte, + currentBlockHash common.Hash, id string, - requestBlockHashes func([]byte) error, - requestBlocks func([][]byte) error, + requestBlockHashes func(common.Hash) error, + requestBlocks func([]common.Hash) error, peerError func(*errs.Error), ) (p *peer) { @@ -107,7 +106,7 @@ func (self *peer) addError(code int, format string, params ...interface{}) { self.peerError(err) } -func (self *peer) setChainInfo(td *big.Int, c []byte) { +func (self *peer) setChainInfo(td *big.Int, c common.Hash) { self.lock.Lock() defer self.lock.Unlock() @@ -115,7 +114,7 @@ func (self *peer) setChainInfo(td *big.Int, c []byte) { self.currentBlockHash = c self.currentBlock = nil - self.parentHash = nil + self.parentHash = common.Hash{} self.headSection = nil } @@ -139,7 +138,7 @@ func (self *peer) setChainInfoFromBlock(block *types.Block) { }() } -func (self *peers) requestBlocks(attempts int, hashes [][]byte) { +func (self *peers) requestBlocks(attempts int, hashes []common.Hash) { // distribute block request among known peers self.lock.RLock() defer self.lock.RUnlock() @@ -178,18 +177,18 @@ func (self *peers) requestBlocks(attempts int, hashes [][]byte) { // returns true iff peer is promoted as best peer in the pool func (self *peers) addPeer( td *big.Int, - currentBlockHash []byte, + currentBlockHash common.Hash, id string, - requestBlockHashes func([]byte) error, - requestBlocks func([][]byte) error, + requestBlockHashes func(common.Hash) error, + requestBlocks func([]common.Hash) error, peerError func(*errs.Error), ) (best bool) { - var previousBlockHash []byte + var previousBlockHash common.Hash self.lock.Lock() p, found := self.peers[id] if found { - if !bytes.Equal(p.currentBlockHash, currentBlockHash) { + if p.currentBlockHash != currentBlockHash { previousBlockHash = p.currentBlockHash plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash)) p.setChainInfo(td, currentBlockHash) @@ -221,7 +220,7 @@ func (self *peers) addPeer( // new block update for active current best peer -> request hashes plog.Debugf("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash)) - if previousBlockHash != nil { + if (previousBlockHash != common.Hash{}) { if entry := self.bp.get(previousBlockHash); entry != nil { p.headSectionC <- nil self.bp.activateChain(entry.section, p, nil) @@ -318,15 +317,15 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { } var connected = make(map[string]*section) - var sections [][]byte + var sections []common.Hash for _, hash := range newp.sections { plog.DebugDetailf("activate chain starting from section [%s]", hex(hash)) // if section not connected (ie, top of a contiguous sequence of sections) - if connected[string(hash)] == nil { + if connected[hash.Str()] == nil { // if not deleted, then reread from pool (it can be orphaned top half of a split section) if entry := self.get(hash); entry != nil { self.activateChain(entry.section, newp, connected) - connected[string(hash)] = entry.section + connected[hash.Str()] = entry.section sections = append(sections, hash) } } @@ -396,7 +395,7 @@ func (self *peer) getCurrentBlock(currentBlock *types.Block) { plog.DebugDetailf("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash)) } else { plog.DebugDetailf("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash)) - self.requestBlocks([][]byte{self.currentBlockHash}) + self.requestBlocks([]common.Hash{self.currentBlockHash}) self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval) return } @@ -427,9 +426,9 @@ func (self *peer) getBlockHashes() { self.addError(ErrInvalidBlock, "%v", err) self.bp.status.badPeers[self.id]++ } else { - headKey := string(self.parentHash) + headKey := self.parentHash.Str() height := self.bp.status.chain[headKey] + 1 - self.bp.status.chain[string(self.currentBlockHash)] = height + self.bp.status.chain[self.currentBlockHash.Str()] = height if height > self.bp.status.values.LongestChain { self.bp.status.values.LongestChain = height } -- cgit From 50661f0e683b4975894a0e8fe16024724adef72d Mon Sep 17 00:00:00 2001 From: zelig Date: Thu, 19 Mar 2015 22:46:54 +0000 Subject: peer suspension to disallow reconnect after disconnect on fatal error for set period (PeerSuspensionInterval) --- blockpool/peers.go | 49 +++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 39 insertions(+), 10 deletions(-) (limited to 'blockpool/peers.go') diff --git a/blockpool/peers.go b/blockpool/peers.go index d94d6ac46..81bab31e7 100644 --- a/blockpool/peers.go +++ b/blockpool/peers.go @@ -47,6 +47,8 @@ type peer struct { blocksRequestTimer <-chan time.Time suicideC <-chan time.Time + addToBlacklist func(id string) + idle bool } @@ -55,11 +57,12 @@ type peer struct { type peers struct { lock sync.RWMutex - bp *BlockPool - errors *errs.Errors - peers map[string]*peer - best *peer - status *status + bp *BlockPool + errors *errs.Errors + peers map[string]*peer + best *peer + status *status + blacklist map[string]time.Time } // peer constructor @@ -84,26 +87,46 @@ func (self *peers) newPeer( headSectionC: make(chan *section), bp: self.bp, idle: true, + addToBlacklist: self.addToBlacklist, } // at creation the peer is recorded in the peer pool self.peers[id] = p return } -// dispatches an error to a peer if still connected +// dispatches an error to a peer if still connected, adds it to the blacklist func (self *peers) peerError(id string, code int, format string, params ...interface{}) { self.lock.RLock() - defer self.lock.RUnlock() peer, ok := self.peers[id] + self.lock.RUnlock() if ok { peer.addError(code, format, params) } - // blacklisting comes here + self.addToBlacklist(id) +} + +func (self *peers) addToBlacklist(id string) { + self.lock.Lock() + defer self.lock.Unlock() + self.blacklist[id] = time.Now() +} + +func (self *peers) suspended(id string) (s bool) { + self.lock.Lock() + defer self.lock.Unlock() + if suspendedAt, ok := self.blacklist[id]; ok { + if s = suspendedAt.Add(self.bp.Config.PeerSuspensionInterval).After(time.Now()); !s { + // no longer suspended, delete entry + delete(self.blacklist, id) + } + } + return } func (self *peer) addError(code int, format string, params ...interface{}) { err := self.errors.New(code, format, params...) self.peerError(err) + self.addToBlacklist(self.id) } func (self *peer) setChainInfo(td *big.Int, c common.Hash) { @@ -182,9 +205,13 @@ func (self *peers) addPeer( requestBlockHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error), -) (best bool) { +) (best bool, suspended bool) { var previousBlockHash common.Hash + if self.suspended(id) { + suspended = true + return + } self.lock.Lock() p, found := self.peers[id] if found { @@ -213,7 +240,7 @@ func (self *peers) addPeer( if self.bp.hasBlock(currentBlockHash) { // peer not ahead plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash)) - return false + return false, false } if self.best == p { @@ -248,8 +275,10 @@ func (self *peers) addPeer( // removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects func (self *peers) removePeer(id string) { + plog.Debugf("addPeer: remove peer 0 <%v>", id) self.lock.Lock() defer self.lock.Unlock() + plog.Debugf("addPeer: remove peer 1 <%v>", id) p, found := self.peers[id] if !found { -- cgit From 391e89d70a43b4a2153db8acac9a6af7a4f76adf Mon Sep 17 00:00:00 2001 From: zelig Date: Thu, 19 Mar 2015 22:53:15 +0000 Subject: use own total difficulty to limit best peer - update blockpool td by subscribing to ChainHeadEvent - if ahead of best peer, demote it - addPeer now take own td as current td - removePeer now take own td as current td - add relevant tests to peers_test - eth: backend now calls blockpool with eth.eventMux and chainManager.Td --- blockpool/peers.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) (limited to 'blockpool/peers.go') diff --git a/blockpool/peers.go b/blockpool/peers.go index 81bab31e7..41782983c 100644 --- a/blockpool/peers.go +++ b/blockpool/peers.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/errs" ) @@ -256,7 +255,7 @@ func (self *peers) addPeer( } best = true } else { - currentTD := common.Big0 + currentTD := self.bp.getTD() if self.best != nil { currentTD = self.best.td } @@ -264,7 +263,7 @@ func (self *peers) addPeer( self.status.lock.Lock() self.status.bestPeers[p.id]++ self.status.lock.Unlock() - plog.Debugf("addPeer: peer <%v> promoted best peer", id) + plog.Debugf("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD) self.bp.switchPeer(self.best, p) self.best = p best = true @@ -275,10 +274,8 @@ func (self *peers) addPeer( // removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects func (self *peers) removePeer(id string) { - plog.Debugf("addPeer: remove peer 0 <%v>", id) self.lock.Lock() defer self.lock.Unlock() - plog.Debugf("addPeer: remove peer 1 <%v>", id) p, found := self.peers[id] if !found { @@ -286,13 +283,13 @@ func (self *peers) removePeer(id string) { } delete(self.peers, id) - plog.Debugf("addPeer: remove peer <%v>", id) + plog.Debugf("addPeer: remove peer <%v> (td: %v)", id, p.td) // if current best peer is removed, need to find a better one if self.best == p { var newp *peer - // FIXME: own TD - max := common.Big0 + // only peers that are ahead of us are considered + max := self.bp.getTD() // peer with the highest self-acclaimed TD is chosen for _, pp := range self.peers { if pp.td.Cmp(max) > 0 { @@ -304,7 +301,7 @@ func (self *peers) removePeer(id string) { self.status.lock.Lock() self.status.bestPeers[p.id]++ self.status.lock.Unlock() - plog.Debugf("addPeer: peer <%v> with td %v promoted best peer", newp.id, newp.td) + plog.Debugf("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td) } else { plog.Warnln("addPeer: no suitable peers found") } -- cgit From a9926a289dd21bcfd8e2def8f4005b43b728cb3d Mon Sep 17 00:00:00 2001 From: zelig Date: Thu, 19 Mar 2015 02:00:34 +0530 Subject: fix missing hexification on IdleTooLong error log --- blockpool/peers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'blockpool/peers.go') diff --git a/blockpool/peers.go b/blockpool/peers.go index 41782983c..b463137e3 100644 --- a/blockpool/peers.go +++ b/blockpool/peers.go @@ -564,7 +564,7 @@ LOOP: // quit case <-quit: - self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks (td: %v, head: %s)...quitting", self.td, self.currentBlockHash)) + self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks (td: %v, head: %s)...quitting", self.td, hex(self.currentBlockHash))) self.bp.status.lock.Lock() self.bp.status.badPeers[self.id]++ -- cgit From 137a9c9365dd9ec76d4a4aab7475d716457d00ae Mon Sep 17 00:00:00 2001 From: zelig Date: Thu, 19 Mar 2015 23:00:19 +0000 Subject: check and penalise td misreporting - add ErrIncorrectTD - checkTD called after insertChain successful - fix tests, use blockPoolTester.tds to map block index to TD --- blockpool/peers.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'blockpool/peers.go') diff --git a/blockpool/peers.go b/blockpool/peers.go index b463137e3..5cc483a3b 100644 --- a/blockpool/peers.go +++ b/blockpool/peers.go @@ -452,8 +452,12 @@ func (self *peer) getBlockHashes() { self.addError(ErrInvalidBlock, "%v", err) self.bp.status.badPeers[self.id]++ } else { + if self.currentBlock.Td != nil { + if self.td.Cmp(self.currentBlock.Td) != 0 { + self.addError(ErrIncorrectTD, "on block %x", self.currentBlockHash) + } + } headKey := self.parentHash.Str() - height := self.bp.status.chain[headKey] + 1 self.bp.status.chain[self.currentBlockHash.Str()] = height if height > self.bp.status.values.LongestChain { self.bp.status.values.LongestChain = height @@ -471,6 +475,7 @@ func (self *peer) getBlockHashes() { block: self.currentBlock, hashBy: self.id, blockBy: self.id, + td: self.td, } self.bp.newSection([]*node{n}).activate(self) } else { -- cgit From a578db5dae0ae82ac8e06be8a29c48a7db22ebe0 Mon Sep 17 00:00:00 2001 From: zelig Date: Thu, 19 Mar 2015 23:14:08 +0000 Subject: improve documentation and move one test --- blockpool/peers.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) (limited to 'blockpool/peers.go') diff --git a/blockpool/peers.go b/blockpool/peers.go index 5cc483a3b..1ace01fdf 100644 --- a/blockpool/peers.go +++ b/blockpool/peers.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/errs" ) +// the blockpool's model of a peer type peer struct { lock sync.RWMutex @@ -104,12 +105,14 @@ func (self *peers) peerError(id string, code int, format string, params ...inter self.addToBlacklist(id) } +// record time of offence in blacklist to implement suspension for PeerSuspensionInterval func (self *peers) addToBlacklist(id string) { self.lock.Lock() defer self.lock.Unlock() self.blacklist[id] = time.Now() } +// suspended checks if peer is still suspended func (self *peers) suspended(id string) (s bool) { self.lock.Lock() defer self.lock.Unlock() @@ -160,8 +163,8 @@ func (self *peer) setChainInfoFromBlock(block *types.Block) { }() } +// distribute block request among known peers func (self *peers) requestBlocks(attempts int, hashes []common.Hash) { - // distribute block request among known peers self.lock.RLock() defer self.lock.RUnlock() peerCount := len(self.peers) @@ -196,7 +199,9 @@ func (self *peers) requestBlocks(attempts int, hashes []common.Hash) { } // addPeer implements the logic for blockpool.AddPeer -// returns true iff peer is promoted as best peer in the pool +// returns 2 bool values +// 1. true iff peer is promoted as best peer in the pool +// 2. true iff peer is still suspended func (self *peers) addPeer( td *big.Int, currentBlockHash common.Hash, @@ -214,10 +219,13 @@ func (self *peers) addPeer( self.lock.Lock() p, found := self.peers[id] if found { + // when called on an already connected peer, it means a newBlockMsg is received + // peer head info is updated if p.currentBlockHash != currentBlockHash { previousBlockHash = p.currentBlockHash plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash)) p.setChainInfo(td, currentBlockHash) + self.status.lock.Lock() self.status.values.NewBlocks++ self.status.lock.Unlock() @@ -235,7 +243,7 @@ func (self *peers) addPeer( } self.lock.Unlock() - // check peer current head + // check if peer's current head block is known if self.bp.hasBlock(currentBlockHash) { // peer not ahead plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash)) @@ -255,6 +263,7 @@ func (self *peers) addPeer( } best = true } else { + // baseline is our own TD currentTD := self.bp.getTD() if self.best != nil { currentTD = self.best.td @@ -314,6 +323,7 @@ func (self *peers) removePeer(id string) { func (self *BlockPool) switchPeer(oldp, newp *peer) { // first quit AddBlockHashes, requestHeadSection and activateChain + // by closing the old peer's switchC channel if oldp != nil { plog.DebugDetailf("<%s> quit peer processes", oldp.id) close(oldp.switchC) @@ -366,11 +376,12 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) { // newp activating section process changes the quit channel for this reason if oldp != nil { plog.DebugDetailf("<%s> quit section processes", oldp.id) - // close(oldp.idleC) } } +// getPeer looks up peer by id, returns peer and a bool value +// that is true iff peer is current best peer func (self *peers) getPeer(id string) (p *peer, best bool) { self.lock.RLock() defer self.lock.RUnlock() @@ -381,6 +392,8 @@ func (self *peers) getPeer(id string) (p *peer, best bool) { return } +// head section process + func (self *peer) handleSection(sec *section) { self.lock.Lock() defer self.lock.Unlock() @@ -516,7 +529,7 @@ func (self *peer) run() { LOOP: for { select { - // to minitor section process behaviou + // to minitor section process behaviour case <-ping.C: plog.Debugf("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle) -- cgit From d7564a9a25c06f0c9ad9440f02b09e20e0ca30bc Mon Sep 17 00:00:00 2001 From: zelig Date: Thu, 19 Mar 2015 23:33:52 +0000 Subject: fix common.Hash conversion --- blockpool/peers.go | 2 ++ 1 file changed, 2 insertions(+) (limited to 'blockpool/peers.go') diff --git a/blockpool/peers.go b/blockpool/peers.go index 1ace01fdf..80168b206 100644 --- a/blockpool/peers.go +++ b/blockpool/peers.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/errs" ) @@ -471,6 +472,7 @@ func (self *peer) getBlockHashes() { } } headKey := self.parentHash.Str() + height := self.bp.status.chain[headKey] + 1 self.bp.status.chain[self.currentBlockHash.Str()] = height if height > self.bp.status.values.LongestChain { self.bp.status.values.LongestChain = height -- cgit