aboutsummaryrefslogtreecommitdiffstats
path: root/light
diff options
context:
space:
mode:
authorMiya Chen <miyatlchen@gmail.com>2017-08-18 18:58:36 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-08-18 18:58:36 +0800
commitbf1e2631281e1e439533f2abcf1e99a7b2f9552a (patch)
treea8b86720edf085a6531e7042ef33f36a993540d5 /light
parenta4da8416eec6a00c358b6a612d21e7cdf859d588 (diff)
downloaddexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.gz
dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.zst
dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.zip
core, light: send chain events using event.Feed (#14865)
Diffstat (limited to 'light')
-rw-r--r--light/lightchain.go57
-rw-r--r--light/lightchain_test.go7
-rw-r--r--light/odr_test.go6
-rw-r--r--light/trie_test.go3
-rw-r--r--light/txpool.go95
-rw-r--r--light/txpool_test.go8
6 files changed, 111 insertions, 65 deletions
diff --git a/light/lightchain.go b/light/lightchain.go
index a51043975..df194ecad 100644
--- a/light/lightchain.go
+++ b/light/lightchain.go
@@ -44,11 +44,14 @@ var (
// headers, downloading block bodies and receipts on demand through an ODR
// interface. It only does header validation during chain insertion.
type LightChain struct {
- hc *core.HeaderChain
- chainDb ethdb.Database
- odr OdrBackend
- eventMux *event.TypeMux
- genesisBlock *types.Block
+ hc *core.HeaderChain
+ chainDb ethdb.Database
+ odr OdrBackend
+ chainFeed event.Feed
+ chainSideFeed event.Feed
+ chainHeadFeed event.Feed
+ scope event.SubscriptionScope
+ genesisBlock *types.Block
mu sync.RWMutex
chainmu sync.RWMutex
@@ -69,7 +72,7 @@ type LightChain struct {
// NewLightChain returns a fully initialised light chain using information
// available in the database. It initialises the default Ethereum header
// validator.
-func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.Engine, mux *event.TypeMux) (*LightChain, error) {
+func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.Engine) (*LightChain, error) {
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
blockCache, _ := lru.New(blockCacheLimit)
@@ -77,7 +80,6 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.
bc := &LightChain{
chainDb: odr.Database(),
odr: odr,
- eventMux: mux,
quit: make(chan struct{}),
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
@@ -316,16 +318,18 @@ func (self *LightChain) Rollback(chain []common.Hash) {
}
// postChainEvents iterates over the events generated by a chain insertion and
-// posts them into the event mux.
+// posts them into the event feed.
func (self *LightChain) postChainEvents(events []interface{}) {
for _, event := range events {
- if event, ok := event.(core.ChainEvent); ok {
- if self.LastBlockHash() == event.Hash {
- self.eventMux.Post(core.ChainHeadEvent{Block: event.Block})
+ switch ev := event.(type) {
+ case core.ChainEvent:
+ if self.LastBlockHash() == ev.Hash {
+ self.chainHeadFeed.Send(core.ChainHeadEvent{Block: ev.Block})
}
+ self.chainFeed.Send(ev)
+ case core.ChainSideEvent:
+ self.chainSideFeed.Send(ev)
}
- // Fire the insertion events individually too
- self.eventMux.Post(event)
}
}
@@ -467,3 +471,30 @@ func (self *LightChain) LockChain() {
func (self *LightChain) UnlockChain() {
self.chainmu.RUnlock()
}
+
+// SubscribeChainEvent registers a subscription of ChainEvent.
+func (self *LightChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+ return self.scope.Track(self.chainFeed.Subscribe(ch))
+}
+
+// SubscribeChainHeadEvent registers a subscription of ChainHeadEvent.
+func (self *LightChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
+ return self.scope.Track(self.chainHeadFeed.Subscribe(ch))
+}
+
+// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
+func (self *LightChain) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
+ return self.scope.Track(self.chainSideFeed.Subscribe(ch))
+}
+
+// SubscribeLogsEvent implements the interface of filters.Backend
+// LightChain does not send logs events, so return an empty subscription.
+func (self *LightChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+ return self.scope.Track(new(event.Feed).Subscribe(ch))
+}
+
+// SubscribeRemovedLogsEvent implements the interface of filters.Backend
+// LightChain does not send core.RemovedLogsEvent, so return an empty subscription.
+func (self *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+ return self.scope.Track(new(event.Feed).Subscribe(ch))
+}
diff --git a/light/lightchain_test.go b/light/lightchain_test.go
index 0ad640525..40a4d396a 100644
--- a/light/lightchain_test.go
+++ b/light/lightchain_test.go
@@ -26,7 +26,6 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
)
@@ -55,7 +54,7 @@ func newCanonical(n int) (ethdb.Database, *LightChain, error) {
db, _ := ethdb.NewMemDatabase()
gspec := core.Genesis{Config: params.TestChainConfig}
genesis := gspec.MustCommit(db)
- blockchain, _ := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFaker(), new(event.TypeMux))
+ blockchain, _ := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFaker())
// Create and inject the requested chain
if n == 0 {
@@ -75,7 +74,7 @@ func newTestLightChain() *LightChain {
Config: params.TestChainConfig,
}
gspec.MustCommit(db)
- lc, err := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFullFaker(), new(event.TypeMux))
+ lc, err := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFullFaker())
if err != nil {
panic(err)
}
@@ -339,7 +338,7 @@ func TestReorgBadHeaderHashes(t *testing.T) {
defer func() { delete(core.BadHashes, headers[3].Hash()) }()
// Create a new LightChain and check that it rolled back the state.
- ncm, err := NewLightChain(&dummyOdr{db: bc.chainDb}, params.TestChainConfig, ethash.NewFaker(), new(event.TypeMux))
+ ncm, err := NewLightChain(&dummyOdr{db: bc.chainDb}, params.TestChainConfig, ethash.NewFaker())
if err != nil {
t.Fatalf("failed to create new chain manager: %v", err)
}
diff --git a/light/odr_test.go b/light/odr_test.go
index 544b64eff..bd1e976e8 100644
--- a/light/odr_test.go
+++ b/light/odr_test.go
@@ -33,7 +33,6 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
@@ -233,7 +232,6 @@ func testChainGen(i int, block *core.BlockGen) {
func testChainOdr(t *testing.T, protocol int, fn odrTestFn) {
var (
- evmux = new(event.TypeMux)
sdb, _ = ethdb.NewMemDatabase()
ldb, _ = ethdb.NewMemDatabase()
gspec = core.Genesis{Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}}
@@ -241,14 +239,14 @@ func testChainOdr(t *testing.T, protocol int, fn odrTestFn) {
)
gspec.MustCommit(ldb)
// Assemble the test environment
- blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), evmux, vm.Config{})
+ blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{})
gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, sdb, 4, testChainGen)
if _, err := blockchain.InsertChain(gchain); err != nil {
t.Fatal(err)
}
odr := &testOdr{sdb: sdb, ldb: ldb}
- lightchain, err := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker(), evmux)
+ lightchain, err := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker())
if err != nil {
t.Fatal(err)
}
diff --git a/light/trie_test.go b/light/trie_test.go
index 9b2cf7c2b..5f45c01af 100644
--- a/light/trie_test.go
+++ b/light/trie_test.go
@@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)
@@ -41,7 +40,7 @@ func TestNodeIterator(t *testing.T) {
genesis = gspec.MustCommit(fulldb)
)
gspec.MustCommit(lightdb)
- blockchain, _ := core.NewBlockChain(fulldb, params.TestChainConfig, ethash.NewFullFaker(), new(event.TypeMux), vm.Config{})
+ blockchain, _ := core.NewBlockChain(fulldb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{})
gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, fulldb, 4, testChainGen)
if _, err := blockchain.InsertChain(gchain); err != nil {
panic(err)
diff --git a/light/txpool.go b/light/txpool.go
index 7cbb991e8..bd215b992 100644
--- a/light/txpool.go
+++ b/light/txpool.go
@@ -33,6 +33,11 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)
+const (
+ // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
+ chainHeadChanSize = 10
+)
+
// txPermanent is the number of mined blocks after a mined transaction is
// considered permanent and no rollback is expected
var txPermanent = uint64(500)
@@ -43,21 +48,23 @@ var txPermanent = uint64(500)
// always receive all locally signed transactions in the same order as they are
// created.
type TxPool struct {
- config *params.ChainConfig
- signer types.Signer
- quit chan bool
- eventMux *event.TypeMux
- events *event.TypeMuxSubscription
- mu sync.RWMutex
- chain *LightChain
- odr OdrBackend
- chainDb ethdb.Database
- relay TxRelayBackend
- head common.Hash
- nonce map[common.Address]uint64 // "pending" nonce
- pending map[common.Hash]*types.Transaction // pending transactions by tx hash
- mined map[common.Hash][]*types.Transaction // mined transactions by block hash
- clearIdx uint64 // earliest block nr that can contain mined tx info
+ config *params.ChainConfig
+ signer types.Signer
+ quit chan bool
+ txFeed event.Feed
+ scope event.SubscriptionScope
+ chainHeadCh chan core.ChainHeadEvent
+ chainHeadSub event.Subscription
+ mu sync.RWMutex
+ chain *LightChain
+ odr OdrBackend
+ chainDb ethdb.Database
+ relay TxRelayBackend
+ head common.Hash
+ nonce map[common.Address]uint64 // "pending" nonce
+ pending map[common.Hash]*types.Transaction // pending transactions by tx hash
+ mined map[common.Hash][]*types.Transaction // mined transactions by block hash
+ clearIdx uint64 // earliest block nr that can contain mined tx info
homestead bool
}
@@ -78,23 +85,24 @@ type TxRelayBackend interface {
}
// NewTxPool creates a new light transaction pool
-func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, chain *LightChain, relay TxRelayBackend) *TxPool {
+func NewTxPool(config *params.ChainConfig, chain *LightChain, relay TxRelayBackend) *TxPool {
pool := &TxPool{
- config: config,
- signer: types.NewEIP155Signer(config.ChainId),
- nonce: make(map[common.Address]uint64),
- pending: make(map[common.Hash]*types.Transaction),
- mined: make(map[common.Hash][]*types.Transaction),
- quit: make(chan bool),
- eventMux: eventMux,
- events: eventMux.Subscribe(core.ChainHeadEvent{}),
- chain: chain,
- relay: relay,
- odr: chain.Odr(),
- chainDb: chain.Odr().Database(),
- head: chain.CurrentHeader().Hash(),
- clearIdx: chain.CurrentHeader().Number.Uint64(),
- }
+ config: config,
+ signer: types.NewEIP155Signer(config.ChainId),
+ nonce: make(map[common.Address]uint64),
+ pending: make(map[common.Hash]*types.Transaction),
+ mined: make(map[common.Hash][]*types.Transaction),
+ quit: make(chan bool),
+ chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
+ chain: chain,
+ relay: relay,
+ odr: chain.Odr(),
+ chainDb: chain.Odr().Database(),
+ head: chain.CurrentHeader().Hash(),
+ clearIdx: chain.CurrentHeader().Number.Uint64(),
+ }
+ // Subscribe events from blockchain
+ pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
go pool.eventLoop()
return pool
@@ -274,13 +282,17 @@ const blockCheckTimeout = time.Second * 3
// eventLoop processes chain head events and also notifies the tx relay backend
// about the new head hash and tx state changes
func (pool *TxPool) eventLoop() {
- for ev := range pool.events.Chan() {
- switch ev.Data.(type) {
- case core.ChainHeadEvent:
- pool.setNewHead(ev.Data.(core.ChainHeadEvent).Block.Header())
+ for {
+ select {
+ case ev := <-pool.chainHeadCh:
+ pool.setNewHead(ev.Block.Header())
// hack in order to avoid hogging the lock; this part will
// be replaced by a subsequent PR.
time.Sleep(time.Millisecond)
+
+ // System stopped
+ case <-pool.chainHeadSub.Err():
+ return
}
}
}
@@ -301,11 +313,20 @@ func (pool *TxPool) setNewHead(head *types.Header) {
// Stop stops the light transaction pool
func (pool *TxPool) Stop() {
+ // Unsubscribe all subscriptions registered from txpool
+ pool.scope.Close()
+ // Unsubscribe subscriptions registered from blockchain
+ pool.chainHeadSub.Unsubscribe()
close(pool.quit)
- pool.events.Unsubscribe()
log.Info("Transaction pool stopped")
}
+// SubscribeTxPreEvent registers a subscription of core.TxPreEvent and
+// starts sending event to the given channel.
+func (pool *TxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+ return pool.scope.Track(pool.txFeed.Subscribe(ch))
+}
+
// Stats returns the number of currently pending (locally created) transactions
func (pool *TxPool) Stats() (pending int) {
pool.mu.RLock()
@@ -388,7 +409,7 @@ func (self *TxPool) add(ctx context.Context, tx *types.Transaction) error {
// Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock.
- go self.eventMux.Post(core.TxPreEvent{Tx: tx})
+ go self.txFeed.Send(core.TxPreEvent{Tx: tx})
}
// Print a log message if low enough level is set
diff --git a/light/txpool_test.go b/light/txpool_test.go
index f23832a41..fe7936ac2 100644
--- a/light/txpool_test.go
+++ b/light/txpool_test.go
@@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
)
@@ -82,7 +81,6 @@ func TestTxPool(t *testing.T) {
}
var (
- evmux = new(event.TypeMux)
sdb, _ = ethdb.NewMemDatabase()
ldb, _ = ethdb.NewMemDatabase()
gspec = core.Genesis{Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}}
@@ -90,7 +88,7 @@ func TestTxPool(t *testing.T) {
)
gspec.MustCommit(ldb)
// Assemble the test environment
- blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), evmux, vm.Config{})
+ blockchain, _ := core.NewBlockChain(sdb, params.TestChainConfig, ethash.NewFullFaker(), vm.Config{})
gchain, _ := core.GenerateChain(params.TestChainConfig, genesis, sdb, poolTestBlocks, txPoolTestChainGen)
if _, err := blockchain.InsertChain(gchain); err != nil {
panic(err)
@@ -102,9 +100,9 @@ func TestTxPool(t *testing.T) {
discard: make(chan int, 1),
mined: make(chan int, 1),
}
- lightchain, _ := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker(), evmux)
+ lightchain, _ := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker())
txPermanent = 50
- pool := NewTxPool(params.TestChainConfig, evmux, lightchain, relay)
+ pool := NewTxPool(params.TestChainConfig, lightchain, relay)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()