aboutsummaryrefslogtreecommitdiffstats
path: root/miner
diff options
context:
space:
mode:
Diffstat (limited to 'miner')
-rw-r--r--miner/agent.go22
-rw-r--r--miner/miner.go35
-rw-r--r--miner/remote_agent.go4
-rw-r--r--miner/worker.go231
4 files changed, 175 insertions, 117 deletions
diff --git a/miner/agent.go b/miner/agent.go
index b2f89aaab..939f63fef 100644
--- a/miner/agent.go
+++ b/miner/agent.go
@@ -10,7 +10,7 @@ import (
"github.com/ethereum/go-ethereum/pow"
)
-type CpuMiner struct {
+type CpuAgent struct {
chMu sync.Mutex
c chan *types.Block
quit chan struct{}
@@ -21,8 +21,8 @@ type CpuMiner struct {
pow pow.PoW
}
-func NewCpuMiner(index int, pow pow.PoW) *CpuMiner {
- miner := &CpuMiner{
+func NewCpuAgent(index int, pow pow.PoW) *CpuAgent {
+ miner := &CpuAgent{
pow: pow,
index: index,
}
@@ -30,16 +30,16 @@ func NewCpuMiner(index int, pow pow.PoW) *CpuMiner {
return miner
}
-func (self *CpuMiner) Work() chan<- *types.Block { return self.c }
-func (self *CpuMiner) Pow() pow.PoW { return self.pow }
-func (self *CpuMiner) SetReturnCh(ch chan<- *types.Block) { self.returnCh = ch }
+func (self *CpuAgent) Work() chan<- *types.Block { return self.c }
+func (self *CpuAgent) Pow() pow.PoW { return self.pow }
+func (self *CpuAgent) SetReturnCh(ch chan<- *types.Block) { self.returnCh = ch }
-func (self *CpuMiner) Stop() {
+func (self *CpuAgent) Stop() {
close(self.quit)
close(self.quitCurrentOp)
}
-func (self *CpuMiner) Start() {
+func (self *CpuAgent) Start() {
self.quit = make(chan struct{})
self.quitCurrentOp = make(chan struct{}, 1)
self.c = make(chan *types.Block, 1)
@@ -47,7 +47,7 @@ func (self *CpuMiner) Start() {
go self.update()
}
-func (self *CpuMiner) update() {
+func (self *CpuAgent) update() {
out:
for {
select {
@@ -76,7 +76,7 @@ done:
}
}
-func (self *CpuMiner) mine(block *types.Block) {
+func (self *CpuAgent) mine(block *types.Block) {
glog.V(logger.Debug).Infof("(re)started agent[%d]. mining...\n", self.index)
// Reset the channel
@@ -95,6 +95,6 @@ func (self *CpuMiner) mine(block *types.Block) {
}
}
-func (self *CpuMiner) GetHashRate() int64 {
+func (self *CpuAgent) GetHashRate() int64 {
return self.pow.GetHashrate()
}
diff --git a/miner/miner.go b/miner/miner.go
index d5ea9a146..09342e250 100644
--- a/miner/miner.go
+++ b/miner/miner.go
@@ -7,6 +7,8 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/pow"
)
@@ -21,16 +23,8 @@ type Miner struct {
pow pow.PoW
}
-func New(eth core.Backend, pow pow.PoW, minerThreads int) *Miner {
- // note: minerThreads is currently ignored because
- // ethash is not thread safe.
- miner := &Miner{eth: eth, pow: pow, worker: newWorker(common.Address{}, eth)}
- for i := 0; i < minerThreads; i++ {
- miner.worker.register(NewCpuMiner(i, pow))
- }
- miner.threads = minerThreads
-
- return miner
+func New(eth core.Backend, pow pow.PoW) *Miner {
+ return &Miner{eth: eth, pow: pow, worker: newWorker(common.Address{}, eth)}
}
func (self *Miner) Mining() bool {
@@ -46,13 +40,27 @@ func (m *Miner) SetGasPrice(price *big.Int) {
m.worker.gasPrice = price
}
-func (self *Miner) Start(coinbase common.Address) {
+func (self *Miner) Start(coinbase common.Address, threads int) {
+
self.mining = true
+
+ for i := 0; i < threads; i++ {
+ self.worker.register(NewCpuAgent(i, self.pow))
+ }
+ self.threads = threads
+
+ glog.V(logger.Info).Infof("Starting mining operation (CPU=%d TOT=%d)\n", threads, len(self.worker.agents))
+
self.worker.coinbase = coinbase
self.worker.start()
self.worker.commitNewWork()
}
+func (self *Miner) Stop() {
+ self.worker.stop()
+ self.mining = false
+}
+
func (self *Miner) Register(agent Agent) {
if self.mining {
agent.Start()
@@ -61,11 +69,6 @@ func (self *Miner) Register(agent Agent) {
self.worker.register(agent)
}
-func (self *Miner) Stop() {
- self.mining = false
- self.worker.stop()
-}
-
func (self *Miner) HashRate() int64 {
return self.worker.HashRate()
}
diff --git a/miner/remote_agent.go b/miner/remote_agent.go
index 87456cfec..80cc9053e 100644
--- a/miner/remote_agent.go
+++ b/miner/remote_agent.go
@@ -64,13 +64,13 @@ func (a *RemoteAgent) GetWork() [3]string {
res[0] = a.work.HashNoNonce().Hex()
seedHash, _ := ethash.GetSeedHash(a.currentWork.NumberU64())
- res[1] = common.Bytes2Hex(seedHash)
+ res[1] = common.BytesToHash(seedHash).Hex()
// Calculate the "target" to be returned to the external miner
n := big.NewInt(1)
n.Lsh(n, 255)
n.Div(n, a.work.Difficulty())
n.Lsh(n, 1)
- res[2] = common.Bytes2Hex(n.Bytes())
+ res[2] = common.BytesToHash(n.Bytes()).Hex()
}
return res
diff --git a/miner/worker.go b/miner/worker.go
index 22493c235..f737be507 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -7,6 +7,7 @@ import (
"sync"
"sync/atomic"
+ "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
@@ -20,15 +21,41 @@ import (
var jsonlogger = logger.NewJsonLogger()
+// Work holds the current work
+type Work struct {
+ Number uint64
+ Nonce uint64
+ MixDigest []byte
+ SeedHash []byte
+}
+
+// Agent can register themself with the worker
+type Agent interface {
+ Work() chan<- *types.Block
+ SetReturnCh(chan<- *types.Block)
+ Stop()
+ Start()
+ GetHashRate() int64
+}
+
+// environment is the workers current environment and holds
+// all of the current state information
type environment struct {
- totalUsedGas *big.Int
- state *state.StateDB
- coinbase *state.StateObject
- block *types.Block
- family *set.Set
- uncles *set.Set
+ totalUsedGas *big.Int // total gas usage in the cycle
+ state *state.StateDB // apply state changes here
+ coinbase *state.StateObject // the miner's account
+ block *types.Block // the new block
+ family *set.Set // family set (used for checking uncles)
+ uncles *set.Set // uncle set
+ remove *set.Set // tx which will be removed
+ tcount int // tx count in cycle
+ ignoredTransactors *set.Set
+ lowGasTransactors *set.Set
+ ownedAccounts *set.Set
+ lowGasTxs types.Transactions
}
+// env returns a new environment for the current cycle
func env(block *types.Block, eth core.Backend) *environment {
state := state.New(block.Root(), eth.StateDb())
env := &environment{
@@ -43,21 +70,7 @@ func env(block *types.Block, eth core.Backend) *environment {
return env
}
-type Work struct {
- Number uint64
- Nonce uint64
- MixDigest []byte
- SeedHash []byte
-}
-
-type Agent interface {
- Work() chan<- *types.Block
- SetReturnCh(chan<- *types.Block)
- Stop()
- Start()
- GetHashRate() int64
-}
-
+// worker is the main object which takes care of applying messages to the new state
type worker struct {
mu sync.Mutex
@@ -128,12 +141,12 @@ func (self *worker) start() {
self.mu.Lock()
defer self.mu.Unlock()
+ atomic.StoreInt32(&self.mining, 1)
+
// spin up agents
for _, agent := range self.agents {
agent.Start()
}
-
- atomic.StoreInt32(&self.mining, 1)
}
func (self *worker) stop() {
@@ -141,10 +154,16 @@ func (self *worker) stop() {
defer self.mu.Unlock()
if atomic.LoadInt32(&self.mining) == 1 {
+ var keep []Agent
// stop all agents
for _, agent := range self.agents {
agent.Stop()
+ // keep all that's not a cpu agent
+ if _, ok := agent.(*CpuAgent); !ok {
+ keep = append(keep, agent)
+ }
}
+ self.agents = keep
}
atomic.StoreInt32(&self.mining, 0)
@@ -174,8 +193,11 @@ out:
self.possibleUncles[ev.Block.Hash()] = ev.Block
self.uncleMu.Unlock()
case core.TxPreEvent:
+ // Apply transaction to the pending state if we're not mining
if atomic.LoadInt32(&self.mining) == 0 {
- self.commitNewWork()
+ self.mu.Lock()
+ self.commitTransactions(types.Transactions{ev.Tx})
+ self.mu.Unlock()
}
}
case <-self.quit:
@@ -241,19 +263,33 @@ func (self *worker) makeCurrent() {
}
block.Header().Extra = self.extra
- self.current = env(block, self.eth)
+ current := env(block, self.eth)
for _, ancestor := range self.chain.GetAncestors(block, 7) {
- self.current.family.Add(ancestor.Hash())
+ current.family.Add(ancestor.Hash())
}
+ accounts, _ := self.eth.AccountManager().Accounts()
+ // Keep track of transactions which return errors so they can be removed
+ current.remove = set.New()
+ current.tcount = 0
+ current.ignoredTransactors = set.New()
+ current.lowGasTransactors = set.New()
+ current.ownedAccounts = accountAddressesSet(accounts)
+
+ parent := self.chain.GetBlock(current.block.ParentHash())
+ current.coinbase.SetGasPool(core.CalcGasLimit(parent))
- parent := self.chain.GetBlock(self.current.block.ParentHash())
- self.current.coinbase.SetGasPool(core.CalcGasLimit(parent))
+ self.current = current
}
func (w *worker) setGasPrice(p *big.Int) {
w.mu.Lock()
defer w.mu.Unlock()
- w.gasPrice = p
+
+ // calculate the minimal gas price the miner accepts when sorting out transactions.
+ const pct = int64(90)
+ w.gasPrice = gasprice(p, pct)
+
+ w.mux.Post(core.GasPriceChanged{w.gasPrice})
}
func (self *worker) commitNewWork() {
@@ -265,68 +301,14 @@ func (self *worker) commitNewWork() {
defer self.currentMu.Unlock()
self.makeCurrent()
+ current := self.current
transactions := self.eth.TxPool().GetTransactions()
sort.Sort(types.TxByNonce{transactions})
- // Keep track of transactions which return errors so they can be removed
- var (
- remove = set.New()
- tcount = 0
- ignoredTransactors = set.New()
- )
-
- const pct = int64(90)
- // calculate the minimal gas price the miner accepts when sorting out transactions.
- minprice := gasprice(self.gasPrice, pct)
- for _, tx := range transactions {
- // We can skip err. It has already been validated in the tx pool
- from, _ := tx.From()
-
- // check if it falls within margin
- if tx.GasPrice().Cmp(minprice) < 0 {
- // ignore the transaction and transactor. We ignore the transactor
- // because nonce will fail after ignoring this transaction so there's
- // no point
- ignoredTransactors.Add(from)
- glog.V(logger.Info).Infof("transaction(%x) below gas price (<%d%% ask price). All sequential txs from this address(%x) will fail\n", tx.Hash().Bytes()[:4], pct, from[:4])
- continue
- }
-
- // Move on to the next transaction when the transactor is in ignored transactions set
- // This may occur when a transaction hits the gas limit. When a gas limit is hit and
- // the transaction is processed (that could potentially be included in the block) it
- // will throw a nonce error because the previous transaction hasn't been processed.
- // Therefor we need to ignore any transaction after the ignored one.
- if ignoredTransactors.Has(from) {
- continue
- }
-
- self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0)
-
- err := self.commitTransaction(tx)
- switch {
- case core.IsNonceErr(err) || core.IsInvalidTxErr(err):
- // Remove invalid transactions
- from, _ := tx.From()
-
- self.chain.TxState().RemoveNonce(from, tx.Nonce())
- remove.Add(tx.Hash())
-
- if glog.V(logger.Detail) {
- glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
- }
- case state.IsGasLimitErr(err):
- from, _ := tx.From()
- // ignore the transactor so no nonce errors will be thrown for this account
- // next time the worker is run, they'll be picked up again.
- ignoredTransactors.Add(from)
-
- glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4])
- default:
- tcount++
- }
- }
+ // commit transactions for this run
+ self.commitTransactions(transactions)
+ self.eth.TxPool().RemoveTransactions(current.lowGasTxs)
var (
uncles []*types.Header
@@ -352,7 +334,7 @@ func (self *worker) commitNewWork() {
// We only care about logging if we're actually mining
if atomic.LoadInt32(&self.mining) == 1 {
- glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", self.current.block.Number(), tcount, len(uncles))
+ glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", current.block.Number(), current.tcount, len(uncles))
}
for _, hash := range badUncles {
@@ -392,6 +374,71 @@ func (self *worker) commitUncle(uncle *types.Header) error {
return nil
}
+func (self *worker) commitTransactions(transactions types.Transactions) {
+ current := self.current
+
+ for _, tx := range transactions {
+ // We can skip err. It has already been validated in the tx pool
+ from, _ := tx.From()
+
+ // Check if it falls within margin. Txs from owned accounts are always processed.
+ if tx.GasPrice().Cmp(self.gasPrice) < 0 && !current.ownedAccounts.Has(from) {
+ // ignore the transaction and transactor. We ignore the transactor
+ // because nonce will fail after ignoring this transaction so there's
+ // no point
+ current.lowGasTransactors.Add(from)
+
+ glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(self.gasPrice), from[:4])
+ }
+
+ // Continue with the next transaction if the transaction sender is included in
+ // the low gas tx set. This will also remove the tx and all sequential transaction
+ // from this transactor
+ if current.lowGasTransactors.Has(from) {
+ // add tx to the low gas set. This will be removed at the end of the run
+ // owned accounts are ignored
+ if !current.ownedAccounts.Has(from) {
+ current.lowGasTxs = append(current.lowGasTxs, tx)
+ }
+ continue
+ }
+
+ // Move on to the next transaction when the transactor is in ignored transactions set
+ // This may occur when a transaction hits the gas limit. When a gas limit is hit and
+ // the transaction is processed (that could potentially be included in the block) it
+ // will throw a nonce error because the previous transaction hasn't been processed.
+ // Therefor we need to ignore any transaction after the ignored one.
+ if current.ignoredTransactors.Has(from) {
+ continue
+ }
+
+ self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0)
+
+ err := self.commitTransaction(tx)
+ switch {
+ case core.IsNonceErr(err) || core.IsInvalidTxErr(err):
+ // Remove invalid transactions
+ from, _ := tx.From()
+
+ self.chain.TxState().RemoveNonce(from, tx.Nonce())
+ current.remove.Add(tx.Hash())
+
+ if glog.V(logger.Detail) {
+ glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
+ }
+ case state.IsGasLimitErr(err):
+ from, _ := tx.From()
+ // ignore the transactor so no nonce errors will be thrown for this account
+ // next time the worker is run, they'll be picked up again.
+ current.ignoredTransactors.Add(from)
+
+ glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4])
+ default:
+ current.tcount++
+ }
+ }
+}
+
func (self *worker) commitTransaction(tx *types.Transaction) error {
snap := self.current.state.Copy()
receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true)
@@ -423,3 +470,11 @@ func gasprice(price *big.Int, pct int64) *big.Int {
p.Mul(p, big.NewInt(pct))
return p
}
+
+func accountAddressesSet(accounts []accounts.Account) *set.Set {
+ accountSet := set.New()
+ for _, account := range accounts {
+ accountSet.Add(account.Address)
+ }
+ return accountSet
+}