From bf1e2631281e1e439533f2abcf1e99a7b2f9552a Mon Sep 17 00:00:00 2001 From: Miya Chen Date: Fri, 18 Aug 2017 18:58:36 +0800 Subject: core, light: send chain events using event.Feed (#14865) --- light/lightchain.go | 57 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 13 deletions(-) (limited to 'light/lightchain.go') diff --git a/light/lightchain.go b/light/lightchain.go index a51043975..df194ecad 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -44,11 +44,14 @@ var ( // headers, downloading block bodies and receipts on demand through an ODR // interface. It only does header validation during chain insertion. type LightChain struct { - hc *core.HeaderChain - chainDb ethdb.Database - odr OdrBackend - eventMux *event.TypeMux - genesisBlock *types.Block + hc *core.HeaderChain + chainDb ethdb.Database + odr OdrBackend + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block mu sync.RWMutex chainmu sync.RWMutex @@ -69,7 +72,7 @@ type LightChain struct { // NewLightChain returns a fully initialised light chain using information // available in the database. It initialises the default Ethereum header // validator. -func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.Engine, mux *event.TypeMux) (*LightChain, error) { +func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.Engine) (*LightChain, error) { bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) blockCache, _ := lru.New(blockCacheLimit) @@ -77,7 +80,6 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus. bc := &LightChain{ chainDb: odr.Database(), odr: odr, - eventMux: mux, quit: make(chan struct{}), bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, @@ -316,16 +318,18 @@ func (self *LightChain) Rollback(chain []common.Hash) { } // postChainEvents iterates over the events generated by a chain insertion and -// posts them into the event mux. +// posts them into the event feed. func (self *LightChain) postChainEvents(events []interface{}) { for _, event := range events { - if event, ok := event.(core.ChainEvent); ok { - if self.LastBlockHash() == event.Hash { - self.eventMux.Post(core.ChainHeadEvent{Block: event.Block}) + switch ev := event.(type) { + case core.ChainEvent: + if self.LastBlockHash() == ev.Hash { + self.chainHeadFeed.Send(core.ChainHeadEvent{Block: ev.Block}) } + self.chainFeed.Send(ev) + case core.ChainSideEvent: + self.chainSideFeed.Send(ev) } - // Fire the insertion events individually too - self.eventMux.Post(event) } } @@ -467,3 +471,30 @@ func (self *LightChain) LockChain() { func (self *LightChain) UnlockChain() { self.chainmu.RUnlock() } + +// SubscribeChainEvent registers a subscription of ChainEvent. +func (self *LightChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return self.scope.Track(self.chainFeed.Subscribe(ch)) +} + +// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent. +func (self *LightChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return self.scope.Track(self.chainHeadFeed.Subscribe(ch)) +} + +// SubscribeChainSideEvent registers a subscription of ChainSideEvent. +func (self *LightChain) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription { + return self.scope.Track(self.chainSideFeed.Subscribe(ch)) +} + +// SubscribeLogsEvent implements the interface of filters.Backend +// LightChain does not send logs events, so return an empty subscription. +func (self *LightChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return self.scope.Track(new(event.Feed).Subscribe(ch)) +} + +// SubscribeRemovedLogsEvent implements the interface of filters.Backend +// LightChain does not send core.RemovedLogsEvent, so return an empty subscription. +func (self *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return self.scope.Track(new(event.Feed).Subscribe(ch)) +} -- cgit