aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
Diffstat (limited to 'eth')
-rw-r--r--eth/backend.go29
-rw-r--r--eth/downloader/downloader.go70
-rw-r--r--eth/protocol.go12
3 files changed, 79 insertions, 32 deletions
diff --git a/eth/backend.go b/eth/backend.go
index a71d5721e..3d5c4ba09 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -4,6 +4,7 @@ import (
"crypto/ecdsa"
"fmt"
"io/ioutil"
+ "math"
"path"
"strings"
@@ -43,6 +44,9 @@ type Config struct {
ProtocolVersion int
NetworkId int
+ BlockChainVersion int
+ SkipBcVersionCheck bool // e.g. blockchain export
+
DataDir string
LogFile string
LogLevel int
@@ -151,7 +155,7 @@ type Ethereum struct {
}
func New(config *Config) (*Ethereum, error) {
- // Boostrap database
+ // Bootstrap database
logger.New(config.DataDir, config.LogFile, config.LogLevel)
if len(config.LogJSON) > 0 {
logger.NewJSONsystem(config.DataDir, config.LogJSON)
@@ -181,6 +185,16 @@ func New(config *Config) (*Ethereum, error) {
saveProtocolVersion(blockDb, config.ProtocolVersion)
glog.V(logger.Info).Infof("Protocol Version: %v, Network Id: %v", config.ProtocolVersion, config.NetworkId)
+ if !config.SkipBcVersionCheck {
+ b, _ := blockDb.Get([]byte("BlockchainVersion"))
+ bcVersion := int(common.NewValue(b).Uint())
+ if bcVersion != config.BlockChainVersion && bcVersion != 0 {
+ return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, config.BlockChainVersion)
+ }
+ saveBlockchainVersion(blockDb, config.BlockChainVersion)
+ }
+ glog.V(logger.Info).Infof("Blockchain DB Version: %d", config.BlockChainVersion)
+
eth := &Ethereum{
shutdownChan: make(chan bool),
blockDb: blockDb,
@@ -439,7 +453,7 @@ func (self *Ethereum) txBroadcastLoop() {
// automatically stops if unsubscribe
for obj := range self.txSub.Chan() {
event := obj.(core.TxPreEvent)
- self.net.Broadcast("eth", TxMsg, []*types.Transaction{event.Tx})
+ self.net.BroadcastLimited("eth", TxMsg, math.Sqrt, []*types.Transaction{event.Tx})
self.syncAccounts(event.Tx)
}
}
@@ -463,7 +477,7 @@ func (self *Ethereum) blockBroadcastLoop() {
for obj := range self.blockSub.Chan() {
switch ev := obj.(type) {
case core.ChainHeadEvent:
- self.net.Broadcast("eth", NewBlockMsg, []interface{}{ev.Block, ev.Block.Td})
+ self.net.BroadcastLimited("eth", NewBlockMsg, math.Sqrt, []interface{}{ev.Block, ev.Block.Td})
}
}
}
@@ -476,3 +490,12 @@ func saveProtocolVersion(db common.Database, protov int) {
db.Put([]byte("ProtocolVersion"), common.NewValue(protov).Bytes())
}
}
+
+func saveBlockchainVersion(db common.Database, bcVersion int) {
+ d, _ := db.Get([]byte("BlockchainVersion"))
+ blockchainVersion := common.NewValue(d).Uint()
+
+ if blockchainVersion == 0 {
+ db.Put([]byte("BlockchainVersion"), common.NewValue(bcVersion).Bytes())
+ }
+}
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 83e6b8d32..1707e1395 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -54,8 +54,9 @@ type blockPack struct {
}
type syncPack struct {
- peer *peer
- hash common.Hash
+ peer *peer
+ hash common.Hash
+ ignoreInitial bool
}
func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn) *Downloader {
@@ -104,11 +105,13 @@ func (d *Downloader) UnregisterPeer(id string) {
func (d *Downloader) peerHandler() {
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
- itimer := time.NewTicker(5 * time.Second)
+ //itimer := time.NewTicker(5 * time.Second)
+ itimer := time.NewTimer(5 * time.Second)
out:
for {
select {
case <-d.newPeerCh:
+ itimer.Stop()
// Meet the `minDesiredPeerCount` before we select our best peer
if len(d.peers) < minDesiredPeerCount {
break
@@ -137,7 +140,7 @@ func (d *Downloader) selectPeer(p *peer) {
}
glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td)
- d.syncCh <- syncPack{p, p.recentHash}
+ d.syncCh <- syncPack{p, p.recentHash, false}
}
}
@@ -147,11 +150,11 @@ out:
select {
case sync := <-d.syncCh:
selectedPeer := sync.peer
- glog.V(logger.Detail).Infoln("Synchronising with network using:", selectedPeer.id)
+ glog.V(logger.Detail).Infoln("Synchronising with the network using:", selectedPeer.id)
// Start the fetcher. This will block the update entirely
// interupts need to be send to the appropriate channels
// respectively.
- if err := d.startFetchingHashes(selectedPeer, sync.hash); err != nil {
+ if err := d.startFetchingHashes(selectedPeer, sync.hash, sync.ignoreInitial); err != nil {
// handle error
glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
// XXX Reset
@@ -178,11 +181,18 @@ out:
}
// XXX Make synchronous
-func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash) error {
- glog.V(logger.Debug).Infoln("Downloading hashes")
+func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error {
+ glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", hash.Bytes()[:4], p.id)
start := time.Now()
+ // We ignore the initial hash in some cases (e.g. we received a block without it's parent)
+ // In such circumstances we don't need to download the block so don't add it to the queue.
+ if !ignoreInitial {
+ // Add the hash to the queue first
+ d.queue.hashPool.Add(hash)
+ }
+
// Get the first batch of hashes
p.getHashes(hash)
atomic.StoreInt32(&d.fetchingHashes, 1)
@@ -195,7 +205,7 @@ out:
hashSet := set.New()
for _, hash := range hashes {
if d.hasBlock(hash) {
- glog.V(logger.Debug).Infof("Found common hash %x\n", hash)
+ glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4])
done = true
break
@@ -207,7 +217,7 @@ out:
// Add hashes to the chunk set
// Check if we're done fetching
- if !done {
+ if !done && len(hashes) > 0 {
//fmt.Println("re-fetch. current =", d.queue.hashPool.Size())
// Get the next set of hashes
p.getHashes(hashes[len(hashes)-1])
@@ -218,7 +228,7 @@ out:
}
}
}
- glog.V(logger.Detail).Infoln("Download hashes: done. Took", time.Since(start))
+ glog.V(logger.Detail).Infof("Downloaded hashes (%d). Took %v\n", d.queue.hashPool.Size(), time.Since(start))
return nil
}
@@ -242,6 +252,10 @@ out:
// from the available peers.
if d.queue.hashPool.Size() > 0 {
availablePeers := d.peers.get(idleState)
+ if len(availablePeers) == 0 {
+ glog.V(logger.Detail).Infoln("No peers available out of", len(d.peers))
+ }
+
for _, peer := range availablePeers {
// Get a possible chunk. If nil is returned no chunk
// could be returned due to no hashes available.
@@ -317,21 +331,33 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
return
}
- glog.V(logger.Detail).Infoln("Inserting new block from:", id)
- d.queue.addBlock(id, block, td)
-
+ peer := d.peers.getPeer(id)
// if the peer is in our healthy list of peers; update the td
- // here is a good chance to add the peer back to the list
- if peer := d.peers.getPeer(id); peer != nil {
- peer.mu.Lock()
- peer.td = td
- peer.recentHash = block.Hash()
- peer.mu.Unlock()
+ // and add the block. Otherwise just ignore it
+ if peer == nil {
+ glog.V(logger.Detail).Infof("Ignored block from bad peer %s\n", id)
+ return
}
+ peer.mu.Lock()
+ peer.td = td
+ peer.recentHash = block.Hash()
+ peer.mu.Unlock()
+
+ glog.V(logger.Detail).Infoln("Inserting new block from:", id)
+ d.queue.addBlock(id, block, td)
+
// if neither go ahead to process
if !(d.isFetchingHashes() || d.isDownloadingBlocks()) {
- d.process()
+ // Check if the parent of the received block is known.
+ // If the block is not know, request it otherwise, request.
+ phash := block.ParentHash()
+ if !d.hasBlock(phash) {
+ glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4])
+ d.syncCh <- syncPack{peer, peer.recentHash, true}
+ } else {
+ d.process()
+ }
}
}
@@ -369,7 +395,7 @@ func (d *Downloader) process() error {
// TODO change this. This shite
for i, block := range blocks[:max] {
if !d.hasBlock(block.ParentHash()) {
- d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash()}
+ d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash(), true}
// remove processed blocks
blocks = blocks[i:]
diff --git a/eth/protocol.go b/eth/protocol.go
index b15868898..a85d15a0c 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -324,7 +324,7 @@ func (self *ethProtocol) handle() error {
// to simplify backend interface adding a new block
// uses AddPeer followed by AddBlock only if peer is the best peer
// (or selected as new best peer)
- if best, _ := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); best {
+ if _, suspended := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); !suspended {
self.blockPool.AddBlock(request.Block, self.id)
}
@@ -415,11 +415,9 @@ func (self *ethProtocol) sendStatus() error {
}
func (self *ethProtocol) protoErrorDisconnect(err *errs.Error) {
- //err.Log(self.peer.Logger)
err.Log(glog.V(logger.Info))
- /*
- if err.Fatal() {
- self.peer.Disconnect(p2p.DiscSubprotocolError)
- }
- */
+ if err.Fatal() {
+ self.peer.Disconnect(p2p.DiscSubprotocolError)
+ }
+
}