diff options
Diffstat (limited to 'ethstats/ethstats.go')
-rw-r--r-- | ethstats/ethstats.go | 63 |
1 files changed, 44 insertions, 19 deletions
diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index b75c5e6da..bb03dc72b 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -44,9 +44,27 @@ import ( "golang.org/x/net/websocket" ) -// historyUpdateRange is the number of blocks a node should report upon login or -// history request. -const historyUpdateRange = 50 +const ( + // historyUpdateRange is the number of blocks a node should report upon login or + // history request. + historyUpdateRange = 50 + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 +) + +type txPool interface { + // SubscribeTxPreEvent should return an event subscription of + // TxPreEvent and send events to the given channel. + SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription +} + +type blockChain interface { + SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription +} // Service implements an Ethereum netstats reporting daemon that pushes local // chain statistics up to a monitoring server. @@ -118,16 +136,22 @@ func (s *Service) Stop() error { // until termination. func (s *Service) loop() { // Subscribe to chain events to execute updates on - var emux *event.TypeMux + var blockchain blockChain + var txpool txPool if s.eth != nil { - emux = s.eth.EventMux() + blockchain = s.eth.BlockChain() + txpool = s.eth.TxPool() } else { - emux = s.les.EventMux() + blockchain = s.les.BlockChain() + txpool = s.les.TxPool() } - headSub := emux.Subscribe(core.ChainHeadEvent{}) + + chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize) + headSub := blockchain.SubscribeChainHeadEvent(chainHeadCh) defer headSub.Unsubscribe() - txSub := emux.Subscribe(core.TxPreEvent{}) + txEventCh := make(chan core.TxPreEvent, txChanSize) + txSub := txpool.SubscribeTxPreEvent(txEventCh) defer txSub.Unsubscribe() // Start a goroutine that exhausts the subsciptions to avoid events piling up @@ -139,25 +163,18 @@ func (s *Service) loop() { go func() { var lastTx mclock.AbsTime + HandleLoop: for { select { // Notify of chain head events, but drop if too frequent - case head, ok := <-headSub.Chan(): - if !ok { // node stopped - close(quitCh) - return - } + case head := <-chainHeadCh: select { - case headCh <- head.Data.(core.ChainHeadEvent).Block: + case headCh <- head.Block: default: } // Notify of new transaction events, but drop if too frequent - case _, ok := <-txSub.Chan(): - if !ok { // node stopped - close(quitCh) - return - } + case <-txEventCh: if time.Duration(mclock.Now()-lastTx) < time.Second { continue } @@ -167,8 +184,16 @@ func (s *Service) loop() { case txCh <- struct{}{}: default: } + + // node stopped + case <-txSub.Err(): + break HandleLoop + case <-headSub.Err(): + break HandleLoop } } + close(quitCh) + return }() // Loop reporting until termination for { |