aboutsummaryrefslogtreecommitdiffstats
path: root/les
diff options
context:
space:
mode:
authorMiya Chen <miyatlchen@gmail.com>2017-08-18 18:58:36 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-08-18 18:58:36 +0800
commitbf1e2631281e1e439533f2abcf1e99a7b2f9552a (patch)
treea8b86720edf085a6531e7042ef33f36a993540d5 /les
parenta4da8416eec6a00c358b6a612d21e7cdf859d588 (diff)
downloaddexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.gz
dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.zst
dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.zip
core, light: send chain events using event.Feed (#14865)
Diffstat (limited to 'les')
-rw-r--r--les/api_backend.go24
-rw-r--r--les/backend.go4
-rw-r--r--les/handler.go1
-rw-r--r--les/helper_test.go4
-rw-r--r--les/server.go9
5 files changed, 34 insertions, 8 deletions
diff --git a/les/api_backend.go b/les/api_backend.go
index 7a3c2447c..1323e8864 100644
--- a/les/api_backend.go
+++ b/les/api_backend.go
@@ -124,6 +124,30 @@ func (b *LesApiBackend) TxPoolContent() (map[common.Address]types.Transactions,
return b.eth.txPool.Content()
}
+func (b *LesApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+ return b.eth.txPool.SubscribeTxPreEvent(ch)
+}
+
+func (b *LesApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+ return b.eth.blockchain.SubscribeChainEvent(ch)
+}
+
+func (b *LesApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
+ return b.eth.blockchain.SubscribeChainHeadEvent(ch)
+}
+
+func (b *LesApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
+ return b.eth.blockchain.SubscribeChainSideEvent(ch)
+}
+
+func (b *LesApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+ return b.eth.blockchain.SubscribeLogsEvent(ch)
+}
+
+func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+ return b.eth.blockchain.SubscribeRemovedLogsEvent(ch)
+}
+
func (b *LesApiBackend) Downloader() *downloader.Downloader {
return b.eth.Downloader()
}
diff --git a/les/backend.go b/les/backend.go
index a4e772671..4c33417c0 100644
--- a/les/backend.go
+++ b/les/backend.go
@@ -103,7 +103,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
eth.serverPool = newServerPool(chainDb, quitSync, &eth.wg)
eth.retriever = newRetrieveManager(peers, eth.reqDist, eth.serverPool)
eth.odr = NewLesOdr(chainDb, eth.retriever)
- if eth.blockchain, err = light.NewLightChain(eth.odr, eth.chainConfig, eth.engine, eth.eventMux); err != nil {
+ if eth.blockchain, err = light.NewLightChain(eth.odr, eth.chainConfig, eth.engine); err != nil {
return nil, err
}
// Rewind the chain in case of an incompatible config upgrade.
@@ -113,7 +113,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
core.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
- eth.txPool = light.NewTxPool(eth.chainConfig, eth.eventMux, eth.blockchain, eth.relay)
+ eth.txPool = light.NewTxPool(eth.chainConfig, eth.blockchain, eth.relay)
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, true, config.NetworkId, eth.eventMux, eth.engine, eth.peers, eth.blockchain, nil, chainDb, eth.odr, eth.relay, quitSync, &eth.wg); err != nil {
return nil, err
}
diff --git a/les/handler.go b/les/handler.go
index 234b6e998..1a75cd369 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -82,6 +82,7 @@ type BlockChain interface {
GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash
LastBlockHash() common.Hash
Genesis() *types.Block
+ SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
}
type txPool interface {
diff --git a/les/helper_test.go b/les/helper_test.go
index 7dccfc458..b33454e1d 100644
--- a/les/helper_test.go
+++ b/les/helper_test.go
@@ -146,9 +146,9 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
}
if lightSync {
- chain, _ = light.NewLightChain(odr, gspec.Config, engine, evmux)
+ chain, _ = light.NewLightChain(odr, gspec.Config, engine)
} else {
- blockchain, _ := core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{})
+ blockchain, _ := core.NewBlockChain(db, gspec.Config, engine, vm.Config{})
gchain, _ := core.GenerateChain(gspec.Config, genesis, db, blocks, generator)
if _, err := blockchain.InsertChain(gchain); err != nil {
panic(err)
diff --git a/les/server.go b/les/server.go
index 39ef0efff..8b2730714 100644
--- a/les/server.go
+++ b/les/server.go
@@ -271,7 +271,8 @@ func (s *requestCostStats) update(msgCode, reqCnt, cost uint64) {
func (pm *ProtocolManager) blockLoop() {
pm.wg.Add(1)
- sub := pm.eventMux.Subscribe(core.ChainHeadEvent{})
+ headCh := make(chan core.ChainHeadEvent, 10)
+ headSub := pm.blockchain.SubscribeChainHeadEvent(headCh)
newCht := make(chan struct{}, 10)
newCht <- struct{}{}
go func() {
@@ -280,10 +281,10 @@ func (pm *ProtocolManager) blockLoop() {
lastBroadcastTd := common.Big0
for {
select {
- case ev := <-sub.Chan():
+ case ev := <-headCh:
peers := pm.peers.AllPeers()
if len(peers) > 0 {
- header := ev.Data.(core.ChainHeadEvent).Block.Header()
+ header := ev.Block.Header()
hash := header.Hash()
number := header.Number.Uint64()
td := core.GetTd(pm.chainDb, hash, number)
@@ -319,7 +320,7 @@ func (pm *ProtocolManager) blockLoop() {
}
}()
case <-pm.quitSync:
- sub.Unsubscribe()
+ headSub.Unsubscribe()
pm.wg.Done()
return
}