diff options
Diffstat (limited to 'miner')
-rw-r--r-- | miner/agent.go | 22 | ||||
-rw-r--r-- | miner/miner.go | 35 | ||||
-rw-r--r-- | miner/remote_agent.go | 4 | ||||
-rw-r--r-- | miner/worker.go | 231 |
4 files changed, 175 insertions, 117 deletions
diff --git a/miner/agent.go b/miner/agent.go index b2f89aaab..939f63fef 100644 --- a/miner/agent.go +++ b/miner/agent.go @@ -10,7 +10,7 @@ import ( "github.com/ethereum/go-ethereum/pow" ) -type CpuMiner struct { +type CpuAgent struct { chMu sync.Mutex c chan *types.Block quit chan struct{} @@ -21,8 +21,8 @@ type CpuMiner struct { pow pow.PoW } -func NewCpuMiner(index int, pow pow.PoW) *CpuMiner { - miner := &CpuMiner{ +func NewCpuAgent(index int, pow pow.PoW) *CpuAgent { + miner := &CpuAgent{ pow: pow, index: index, } @@ -30,16 +30,16 @@ func NewCpuMiner(index int, pow pow.PoW) *CpuMiner { return miner } -func (self *CpuMiner) Work() chan<- *types.Block { return self.c } -func (self *CpuMiner) Pow() pow.PoW { return self.pow } -func (self *CpuMiner) SetReturnCh(ch chan<- *types.Block) { self.returnCh = ch } +func (self *CpuAgent) Work() chan<- *types.Block { return self.c } +func (self *CpuAgent) Pow() pow.PoW { return self.pow } +func (self *CpuAgent) SetReturnCh(ch chan<- *types.Block) { self.returnCh = ch } -func (self *CpuMiner) Stop() { +func (self *CpuAgent) Stop() { close(self.quit) close(self.quitCurrentOp) } -func (self *CpuMiner) Start() { +func (self *CpuAgent) Start() { self.quit = make(chan struct{}) self.quitCurrentOp = make(chan struct{}, 1) self.c = make(chan *types.Block, 1) @@ -47,7 +47,7 @@ func (self *CpuMiner) Start() { go self.update() } -func (self *CpuMiner) update() { +func (self *CpuAgent) update() { out: for { select { @@ -76,7 +76,7 @@ done: } } -func (self *CpuMiner) mine(block *types.Block) { +func (self *CpuAgent) mine(block *types.Block) { glog.V(logger.Debug).Infof("(re)started agent[%d]. mining...\n", self.index) // Reset the channel @@ -95,6 +95,6 @@ func (self *CpuMiner) mine(block *types.Block) { } } -func (self *CpuMiner) GetHashRate() int64 { +func (self *CpuAgent) GetHashRate() int64 { return self.pow.GetHashrate() } diff --git a/miner/miner.go b/miner/miner.go index d5ea9a146..09342e250 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -7,6 +7,8 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/pow" ) @@ -21,16 +23,8 @@ type Miner struct { pow pow.PoW } -func New(eth core.Backend, pow pow.PoW, minerThreads int) *Miner { - // note: minerThreads is currently ignored because - // ethash is not thread safe. - miner := &Miner{eth: eth, pow: pow, worker: newWorker(common.Address{}, eth)} - for i := 0; i < minerThreads; i++ { - miner.worker.register(NewCpuMiner(i, pow)) - } - miner.threads = minerThreads - - return miner +func New(eth core.Backend, pow pow.PoW) *Miner { + return &Miner{eth: eth, pow: pow, worker: newWorker(common.Address{}, eth)} } func (self *Miner) Mining() bool { @@ -46,13 +40,27 @@ func (m *Miner) SetGasPrice(price *big.Int) { m.worker.gasPrice = price } -func (self *Miner) Start(coinbase common.Address) { +func (self *Miner) Start(coinbase common.Address, threads int) { + self.mining = true + + for i := 0; i < threads; i++ { + self.worker.register(NewCpuAgent(i, self.pow)) + } + self.threads = threads + + glog.V(logger.Info).Infof("Starting mining operation (CPU=%d TOT=%d)\n", threads, len(self.worker.agents)) + self.worker.coinbase = coinbase self.worker.start() self.worker.commitNewWork() } +func (self *Miner) Stop() { + self.worker.stop() + self.mining = false +} + func (self *Miner) Register(agent Agent) { if self.mining { agent.Start() @@ -61,11 +69,6 @@ func (self *Miner) Register(agent Agent) { self.worker.register(agent) } -func (self *Miner) Stop() { - self.mining = false - self.worker.stop() -} - func (self *Miner) HashRate() int64 { return self.worker.HashRate() } diff --git a/miner/remote_agent.go b/miner/remote_agent.go index 87456cfec..80cc9053e 100644 --- a/miner/remote_agent.go +++ b/miner/remote_agent.go @@ -64,13 +64,13 @@ func (a *RemoteAgent) GetWork() [3]string { res[0] = a.work.HashNoNonce().Hex() seedHash, _ := ethash.GetSeedHash(a.currentWork.NumberU64()) - res[1] = common.Bytes2Hex(seedHash) + res[1] = common.BytesToHash(seedHash).Hex() // Calculate the "target" to be returned to the external miner n := big.NewInt(1) n.Lsh(n, 255) n.Div(n, a.work.Difficulty()) n.Lsh(n, 1) - res[2] = common.Bytes2Hex(n.Bytes()) + res[2] = common.BytesToHash(n.Bytes()).Hex() } return res diff --git a/miner/worker.go b/miner/worker.go index 22493c235..f737be507 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -7,6 +7,7 @@ import ( "sync" "sync/atomic" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" @@ -20,15 +21,41 @@ import ( var jsonlogger = logger.NewJsonLogger() +// Work holds the current work +type Work struct { + Number uint64 + Nonce uint64 + MixDigest []byte + SeedHash []byte +} + +// Agent can register themself with the worker +type Agent interface { + Work() chan<- *types.Block + SetReturnCh(chan<- *types.Block) + Stop() + Start() + GetHashRate() int64 +} + +// environment is the workers current environment and holds +// all of the current state information type environment struct { - totalUsedGas *big.Int - state *state.StateDB - coinbase *state.StateObject - block *types.Block - family *set.Set - uncles *set.Set + totalUsedGas *big.Int // total gas usage in the cycle + state *state.StateDB // apply state changes here + coinbase *state.StateObject // the miner's account + block *types.Block // the new block + family *set.Set // family set (used for checking uncles) + uncles *set.Set // uncle set + remove *set.Set // tx which will be removed + tcount int // tx count in cycle + ignoredTransactors *set.Set + lowGasTransactors *set.Set + ownedAccounts *set.Set + lowGasTxs types.Transactions } +// env returns a new environment for the current cycle func env(block *types.Block, eth core.Backend) *environment { state := state.New(block.Root(), eth.StateDb()) env := &environment{ @@ -43,21 +70,7 @@ func env(block *types.Block, eth core.Backend) *environment { return env } -type Work struct { - Number uint64 - Nonce uint64 - MixDigest []byte - SeedHash []byte -} - -type Agent interface { - Work() chan<- *types.Block - SetReturnCh(chan<- *types.Block) - Stop() - Start() - GetHashRate() int64 -} - +// worker is the main object which takes care of applying messages to the new state type worker struct { mu sync.Mutex @@ -128,12 +141,12 @@ func (self *worker) start() { self.mu.Lock() defer self.mu.Unlock() + atomic.StoreInt32(&self.mining, 1) + // spin up agents for _, agent := range self.agents { agent.Start() } - - atomic.StoreInt32(&self.mining, 1) } func (self *worker) stop() { @@ -141,10 +154,16 @@ func (self *worker) stop() { defer self.mu.Unlock() if atomic.LoadInt32(&self.mining) == 1 { + var keep []Agent // stop all agents for _, agent := range self.agents { agent.Stop() + // keep all that's not a cpu agent + if _, ok := agent.(*CpuAgent); !ok { + keep = append(keep, agent) + } } + self.agents = keep } atomic.StoreInt32(&self.mining, 0) @@ -174,8 +193,11 @@ out: self.possibleUncles[ev.Block.Hash()] = ev.Block self.uncleMu.Unlock() case core.TxPreEvent: + // Apply transaction to the pending state if we're not mining if atomic.LoadInt32(&self.mining) == 0 { - self.commitNewWork() + self.mu.Lock() + self.commitTransactions(types.Transactions{ev.Tx}) + self.mu.Unlock() } } case <-self.quit: @@ -241,19 +263,33 @@ func (self *worker) makeCurrent() { } block.Header().Extra = self.extra - self.current = env(block, self.eth) + current := env(block, self.eth) for _, ancestor := range self.chain.GetAncestors(block, 7) { - self.current.family.Add(ancestor.Hash()) + current.family.Add(ancestor.Hash()) } + accounts, _ := self.eth.AccountManager().Accounts() + // Keep track of transactions which return errors so they can be removed + current.remove = set.New() + current.tcount = 0 + current.ignoredTransactors = set.New() + current.lowGasTransactors = set.New() + current.ownedAccounts = accountAddressesSet(accounts) + + parent := self.chain.GetBlock(current.block.ParentHash()) + current.coinbase.SetGasPool(core.CalcGasLimit(parent)) - parent := self.chain.GetBlock(self.current.block.ParentHash()) - self.current.coinbase.SetGasPool(core.CalcGasLimit(parent)) + self.current = current } func (w *worker) setGasPrice(p *big.Int) { w.mu.Lock() defer w.mu.Unlock() - w.gasPrice = p + + // calculate the minimal gas price the miner accepts when sorting out transactions. + const pct = int64(90) + w.gasPrice = gasprice(p, pct) + + w.mux.Post(core.GasPriceChanged{w.gasPrice}) } func (self *worker) commitNewWork() { @@ -265,68 +301,14 @@ func (self *worker) commitNewWork() { defer self.currentMu.Unlock() self.makeCurrent() + current := self.current transactions := self.eth.TxPool().GetTransactions() sort.Sort(types.TxByNonce{transactions}) - // Keep track of transactions which return errors so they can be removed - var ( - remove = set.New() - tcount = 0 - ignoredTransactors = set.New() - ) - - const pct = int64(90) - // calculate the minimal gas price the miner accepts when sorting out transactions. - minprice := gasprice(self.gasPrice, pct) - for _, tx := range transactions { - // We can skip err. It has already been validated in the tx pool - from, _ := tx.From() - - // check if it falls within margin - if tx.GasPrice().Cmp(minprice) < 0 { - // ignore the transaction and transactor. We ignore the transactor - // because nonce will fail after ignoring this transaction so there's - // no point - ignoredTransactors.Add(from) - glog.V(logger.Info).Infof("transaction(%x) below gas price (<%d%% ask price). All sequential txs from this address(%x) will fail\n", tx.Hash().Bytes()[:4], pct, from[:4]) - continue - } - - // Move on to the next transaction when the transactor is in ignored transactions set - // This may occur when a transaction hits the gas limit. When a gas limit is hit and - // the transaction is processed (that could potentially be included in the block) it - // will throw a nonce error because the previous transaction hasn't been processed. - // Therefor we need to ignore any transaction after the ignored one. - if ignoredTransactors.Has(from) { - continue - } - - self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0) - - err := self.commitTransaction(tx) - switch { - case core.IsNonceErr(err) || core.IsInvalidTxErr(err): - // Remove invalid transactions - from, _ := tx.From() - - self.chain.TxState().RemoveNonce(from, tx.Nonce()) - remove.Add(tx.Hash()) - - if glog.V(logger.Detail) { - glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) - } - case state.IsGasLimitErr(err): - from, _ := tx.From() - // ignore the transactor so no nonce errors will be thrown for this account - // next time the worker is run, they'll be picked up again. - ignoredTransactors.Add(from) - - glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4]) - default: - tcount++ - } - } + // commit transactions for this run + self.commitTransactions(transactions) + self.eth.TxPool().RemoveTransactions(current.lowGasTxs) var ( uncles []*types.Header @@ -352,7 +334,7 @@ func (self *worker) commitNewWork() { // We only care about logging if we're actually mining if atomic.LoadInt32(&self.mining) == 1 { - glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", self.current.block.Number(), tcount, len(uncles)) + glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", current.block.Number(), current.tcount, len(uncles)) } for _, hash := range badUncles { @@ -392,6 +374,71 @@ func (self *worker) commitUncle(uncle *types.Header) error { return nil } +func (self *worker) commitTransactions(transactions types.Transactions) { + current := self.current + + for _, tx := range transactions { + // We can skip err. It has already been validated in the tx pool + from, _ := tx.From() + + // Check if it falls within margin. Txs from owned accounts are always processed. + if tx.GasPrice().Cmp(self.gasPrice) < 0 && !current.ownedAccounts.Has(from) { + // ignore the transaction and transactor. We ignore the transactor + // because nonce will fail after ignoring this transaction so there's + // no point + current.lowGasTransactors.Add(from) + + glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(self.gasPrice), from[:4]) + } + + // Continue with the next transaction if the transaction sender is included in + // the low gas tx set. This will also remove the tx and all sequential transaction + // from this transactor + if current.lowGasTransactors.Has(from) { + // add tx to the low gas set. This will be removed at the end of the run + // owned accounts are ignored + if !current.ownedAccounts.Has(from) { + current.lowGasTxs = append(current.lowGasTxs, tx) + } + continue + } + + // Move on to the next transaction when the transactor is in ignored transactions set + // This may occur when a transaction hits the gas limit. When a gas limit is hit and + // the transaction is processed (that could potentially be included in the block) it + // will throw a nonce error because the previous transaction hasn't been processed. + // Therefor we need to ignore any transaction after the ignored one. + if current.ignoredTransactors.Has(from) { + continue + } + + self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0) + + err := self.commitTransaction(tx) + switch { + case core.IsNonceErr(err) || core.IsInvalidTxErr(err): + // Remove invalid transactions + from, _ := tx.From() + + self.chain.TxState().RemoveNonce(from, tx.Nonce()) + current.remove.Add(tx.Hash()) + + if glog.V(logger.Detail) { + glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) + } + case state.IsGasLimitErr(err): + from, _ := tx.From() + // ignore the transactor so no nonce errors will be thrown for this account + // next time the worker is run, they'll be picked up again. + current.ignoredTransactors.Add(from) + + glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4]) + default: + current.tcount++ + } + } +} + func (self *worker) commitTransaction(tx *types.Transaction) error { snap := self.current.state.Copy() receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true) @@ -423,3 +470,11 @@ func gasprice(price *big.Int, pct int64) *big.Int { p.Mul(p, big.NewInt(pct)) return p } + +func accountAddressesSet(accounts []accounts.Account) *set.Set { + accountSet := set.New() + for _, account := range accounts { + accountSet.Add(account.Address) + } + return accountSet +} |