aboutsummaryrefslogtreecommitdiffstats
path: root/core/blockchain.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/blockchain.go')
-rw-r--r--core/blockchain.go92
1 files changed, 69 insertions, 23 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index 3fb8be15f..f3ca4e08c 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -79,10 +79,16 @@ const (
type BlockChain struct {
config *params.ChainConfig // chain & network configuration
- hc *HeaderChain
- chainDb ethdb.Database
- eventMux *event.TypeMux
- genesisBlock *types.Block
+ hc *HeaderChain
+ chainDb ethdb.Database
+ rmTxFeed event.Feed
+ rmLogsFeed event.Feed
+ chainFeed event.Feed
+ chainSideFeed event.Feed
+ chainHeadFeed event.Feed
+ logsFeed event.Feed
+ scope event.SubscriptionScope
+ genesisBlock *types.Block
mu sync.RWMutex // global mutex for locking chain operations
chainmu sync.RWMutex // blockchain insertion lock
@@ -115,7 +121,7 @@ type BlockChain struct {
// NewBlockChain returns a fully initialised block chain using information
// available in the database. It initialises the default Ethereum Validator and
// Processor.
-func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, mux *event.TypeMux, vmConfig vm.Config) (*BlockChain, error) {
+func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) {
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
blockCache, _ := lru.New(blockCacheLimit)
@@ -126,7 +132,6 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co
config: config,
chainDb: chainDb,
stateCache: state.NewDatabase(chainDb),
- eventMux: mux,
quit: make(chan struct{}),
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
@@ -594,6 +599,8 @@ func (bc *BlockChain) Stop() {
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
return
}
+ // Unsubscribe all subscriptions registered from blockchain
+ bc.scope.Close()
close(bc.quit)
atomic.StoreInt32(&bc.procInterrupt, 1)
@@ -1000,6 +1007,12 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
blockInsertTimer.UpdateSince(bstart)
events = append(events, ChainEvent{block, block.Hash(), logs})
+ // We need some control over the mining operation. Acquiring locks and waiting
+ // for the miner to create new block takes too long and in most cases isn't
+ // even necessary.
+ if bc.LastBlockHash() == block.Hash() {
+ events = append(events, ChainHeadEvent{block})
+ }
// Write the positional metadata for transaction and receipt lookups
if err := WriteTxLookupEntries(bc.chainDb, block); err != nil {
@@ -1024,7 +1037,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
stats.usedGas += usedGas.Uint64()
stats.report(chain, i)
}
- go bc.postChainEvents(events, coalescedLogs)
+ go bc.PostChainEvents(events, coalescedLogs)
return 0, nil
}
@@ -1184,16 +1197,16 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// Must be posted in a goroutine because of the transaction pool trying
// to acquire the chain manager lock
if len(diff) > 0 {
- go bc.eventMux.Post(RemovedTransactionEvent{diff})
+ go bc.rmTxFeed.Send(RemovedTransactionEvent{diff})
}
if len(deletedLogs) > 0 {
- go bc.eventMux.Post(RemovedLogsEvent{deletedLogs})
+ go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
if len(oldChain) > 0 {
go func() {
for _, block := range oldChain {
- bc.eventMux.Post(ChainSideEvent{Block: block})
+ bc.chainSideFeed.Send(ChainSideEvent{Block: block})
}
}()
}
@@ -1201,22 +1214,25 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
return nil
}
-// postChainEvents iterates over the events generated by a chain insertion and
-// posts them into the event mux.
-func (bc *BlockChain) postChainEvents(events []interface{}, logs []*types.Log) {
+// PostChainEvents iterates over the events generated by a chain insertion and
+// posts them into the event feed.
+// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
+func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
// post event logs for further processing
- bc.eventMux.Post(logs)
+ if logs != nil {
+ bc.logsFeed.Send(logs)
+ }
for _, event := range events {
- if event, ok := event.(ChainEvent); ok {
- // We need some control over the mining operation. Acquiring locks and waiting
- // for the miner to create new block takes too long and in most cases isn't
- // even necessary.
- if bc.LastBlockHash() == event.Hash {
- bc.eventMux.Post(ChainHeadEvent{event.Block})
- }
+ switch ev := event.(type) {
+ case ChainEvent:
+ bc.chainFeed.Send(ev)
+
+ case ChainHeadEvent:
+ bc.chainHeadFeed.Send(ev)
+
+ case ChainSideEvent:
+ bc.chainSideFeed.Send(ev)
}
- // Fire the insertion events individually too
- bc.eventMux.Post(event)
}
}
@@ -1384,3 +1400,33 @@ func (bc *BlockChain) Config() *params.ChainConfig { return bc.config }
// Engine retrieves the blockchain's consensus engine.
func (bc *BlockChain) Engine() consensus.Engine { return bc.engine }
+
+// SubscribeRemovedTxEvent registers a subscription of RemovedTransactionEvent.
+func (bc *BlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription {
+ return bc.scope.Track(bc.rmTxFeed.Subscribe(ch))
+}
+
+// SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent.
+func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription {
+ return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch))
+}
+
+// SubscribeChainEvent registers a subscription of ChainEvent.
+func (bc *BlockChain) SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription {
+ return bc.scope.Track(bc.chainFeed.Subscribe(ch))
+}
+
+// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent.
+func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription {
+ return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
+}
+
+// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
+func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
+ return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
+}
+
+// SubscribeLogsEvent registers a subscription of []*types.Log.
+func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+ return bc.scope.Track(bc.logsFeed.Subscribe(ch))
+}