path: root/eth
Diffstat (limited to 'eth')
-rw-r--r--  eth/api.go                         |    7
-rw-r--r--  eth/api_backend.go                 |   40
-rw-r--r--  eth/backend.go                     |   53
-rw-r--r--  eth/backend_test.go                |   74
-rw-r--r--  eth/bloombits.go                   |  142
-rw-r--r--  eth/config.go                      |    1
-rw-r--r--  eth/db_upgrade.go                  |   44
-rw-r--r--  eth/downloader/downloader.go       |   11
-rw-r--r--  eth/downloader/downloader_test.go  |    2
-rw-r--r--  eth/downloader/statesync.go        |   77
-rw-r--r--  eth/filters/api.go                 |   47
-rw-r--r--  eth/filters/bench_test.go          |  201
-rw-r--r--  eth/filters/filter.go              |  251
-rw-r--r--  eth/filters/filter_system.go       |   98
-rw-r--r--  eth/filters/filter_system_test.go  |  151
-rw-r--r--  eth/filters/filter_test.go         |  124
-rw-r--r--  eth/gen_config.go                  |    4
-rw-r--r--  eth/handler.go                     |   36
-rw-r--r--  eth/handler_test.go                |    6
-rw-r--r--  eth/helper_test.go                 |   15
-rw-r--r--  eth/protocol.go                    |    6
-rw-r--r--  eth/sync.go                        |   20
22 files changed, 907 insertions(+), 503 deletions(-)
diff --git a/eth/api.go b/eth/api.go
index f5214fc37..a5b6e7076 100644
--- a/eth/api.go
+++ b/eth/api.go
@@ -241,7 +241,7 @@ func (api *PrivateAdminAPI) ExportChain(file string) (bool, error) {
func hasAllBlocks(chain *core.BlockChain, bs []*types.Block) bool {
for _, b := range bs {
- if !chain.HasBlock(b.Hash()) {
+ if !chain.HasBlock(b.Hash(), b.NumberU64()) {
return false
}
}
@@ -523,7 +523,8 @@ func (api *PrivateDebugAPI) TraceTransaction(ctx context.Context, txHash common.
// Run the transaction with tracing enabled.
vmenv := vm.NewEVM(context, statedb, api.config, vm.Config{Debug: true, Tracer: tracer})
- ret, gas, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas()))
+ // TODO utilize failed flag
+ ret, gas, _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas()))
if err != nil {
return nil, fmt.Errorf("tracing failed: %v", err)
}
@@ -570,7 +571,7 @@ func (api *PrivateDebugAPI) computeTxEnv(blockHash common.Hash, txIndex int) (co
vmenv := vm.NewEVM(context, statedb, api.config, vm.Config{})
gp := new(core.GasPool).AddGas(tx.Gas())
- _, _, err := core.ApplyMessage(vmenv, msg, gp)
+ _, _, _, err := core.ApplyMessage(vmenv, msg, gp)
if err != nil {
return nil, vm.Context{}, nil, fmt.Errorf("tx %x failed: %v", tx.Hash(), err)
}
diff --git a/eth/api_backend.go b/eth/api_backend.go
index 7ef7c030d..91f392f94 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
@@ -115,12 +116,28 @@ func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state *sta
return vm.NewEVM(context, state, b.eth.chainConfig, vmCfg), vmError, nil
}
-func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
- return b.eth.txPool.AddLocal(signedTx)
+func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch)
+}
+
+func (b *EthApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeChainEvent(ch)
+}
+
+func (b *EthApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeChainHeadEvent(ch)
+}
+
+func (b *EthApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
+ return b.eth.BlockChain().SubscribeChainSideEvent(ch)
}
-func (b *EthApiBackend) RemoveTx(txHash common.Hash) {
- b.eth.txPool.Remove(txHash)
+func (b *EthApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+ return b.eth.BlockChain().SubscribeLogsEvent(ch)
+}
+
+func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
+ return b.eth.txPool.AddLocal(signedTx)
}
func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) {
@@ -151,6 +168,10 @@ func (b *EthApiBackend) TxPoolContent() (map[common.Address]types.Transactions,
return b.eth.TxPool().Content()
}
+func (b *EthApiBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+ return b.eth.TxPool().SubscribeTxPreEvent(ch)
+}
+
func (b *EthApiBackend) Downloader() *downloader.Downloader {
return b.eth.Downloader()
}
@@ -174,3 +195,14 @@ func (b *EthApiBackend) EventMux() *event.TypeMux {
func (b *EthApiBackend) AccountManager() *accounts.Manager {
return b.eth.AccountManager()
}
+
+func (b *EthApiBackend) BloomStatus() (uint64, uint64) {
+ sections, _, _ := b.eth.bloomIndexer.Sections()
+ return params.BloomBitsBlocks, sections
+}
+
+func (b *EthApiBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
+ for i := 0; i < bloomFilterThreads; i++ {
+ go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests)
+ }
+}
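
The new BloomStatus and ServiceFilter backend methods wire filter sessions into the shared bloomRequests channel. Below is a self-contained sketch of that handshake, using the core/bloombits API exactly as this diff does; the stub server, its all-zero bitset responses, and the single zero-address clause are invented for illustration only.

// Sketch of the ServiceFilter handshake (assumptions noted above).
package main

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/core/bloombits"
)

func main() {
	const sectionSize = 4096 // params.BloomBitsBlocks in this diff

	// One clause with one 20-byte (address-sized) entry to match against.
	matcher := bloombits.NewMatcher(sectionSize, [][][]byte{{make([]byte, 20)}})

	matches := make(chan uint64, 64)
	session, err := matcher.Start(0, sectionSize-1, matches)
	if err != nil {
		panic(err)
	}
	defer session.Close(time.Second)

	// The ServiceFilter side: multiplex session retrievals onto a shared
	// channel and answer them from a stub server goroutine.
	requests := make(chan chan *bloombits.Retrieval)
	go session.Multiplex(16, 0, requests)
	go func() {
		for request := range requests {
			task := <-request
			task.Bitsets = make([][]byte, len(task.Sections))
			for i := range task.Sections {
				task.Bitsets[i] = make([]byte, sectionSize/8) // all zeroes: no hits
			}
			request <- task
		}
	}()

	// Drain potential matches; the channel closes once the range is done.
	for number := range matches {
		fmt.Println("potential match at block", number)
	}
}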
diff --git a/eth/backend.go b/eth/backend.go
index 8a837f7b8..99a1928fe 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/consensus/clique"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
@@ -57,15 +58,19 @@ type LesServer interface {
// Ethereum implements the Ethereum full node service.
type Ethereum struct {
+ config *Config
chainConfig *params.ChainConfig
+
// Channel for shutting down the service
shutdownChan chan bool // Channel for shutting down the ethereum
stopDbUpgrade func() error // stop chain db sequential key upgrade
+
// Handlers
txPool *core.TxPool
blockchain *core.BlockChain
protocolManager *ProtocolManager
lesServer LesServer
+
// DB interfaces
chainDb ethdb.Database // Block chain database
@@ -73,6 +78,9 @@ type Ethereum struct {
engine consensus.Engine
accountManager *accounts.Manager
+ bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
+ bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
+
ApiBackend *EthApiBackend
miner *miner.Miner
@@ -98,7 +106,6 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if !config.SyncMode.IsValid() {
return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
}
-
chainDb, err := CreateDB(ctx, config, "chaindata")
if err != nil {
return nil, err
@@ -111,6 +118,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
log.Info("Initialised chain configuration", "config", chainConfig)
eth := &Ethereum{
+ config: config,
chainDb: chainDb,
chainConfig: chainConfig,
eventMux: ctx.EventMux,
@@ -121,11 +129,10 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
networkId: config.NetworkId,
gasPrice: config.GasPrice,
etherbase: config.Etherbase,
+ bloomRequests: make(chan chan *bloombits.Retrieval),
+ bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks),
}
- if err := addMipmapBloomBins(chainDb); err != nil {
- return nil, err
- }
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
if !config.SkipBcVersionCheck {
@@ -137,7 +144,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
}
vmConfig := vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
- eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, eth.eventMux, vmConfig)
+ eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, vmConfig)
if err != nil {
return nil, err
}
@@ -147,27 +154,16 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
eth.blockchain.SetHead(compat.RewindTo)
core.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
+ eth.bloomIndexer.Start(eth.blockchain.CurrentHeader(), eth.blockchain.SubscribeChainEvent)
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
- eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit)
-
- maxPeers := config.MaxPeers
- if config.LightServ > 0 {
- // if we are running a light server, limit the number of ETH peers so that we reserve some space for incoming LES connections
- // temporary solution until the new peer connectivity API is finished
- halfPeers := maxPeers / 2
- maxPeers -= config.LightPeers
- if maxPeers < halfPeers {
- maxPeers = halfPeers
- }
- }
+ eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)
- if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, maxPeers, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
+ if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err
}
-
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
eth.miner.SetExtra(makeExtraData(config.ExtraData))
@@ -366,17 +362,29 @@ func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManage
func (s *Ethereum) Protocols() []p2p.Protocol {
if s.lesServer == nil {
return s.protocolManager.SubProtocols
- } else {
- return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...)
}
+ return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...)
}
// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start(srvr *p2p.Server) error {
+ // Start the bloom bits servicing goroutines
+ s.startBloomHandlers()
+
+ // Start the RPC service
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
- s.protocolManager.Start()
+ // Figure out a max peers count based on the server limits
+ maxPeers := srvr.MaxPeers
+ if s.config.LightServ > 0 {
+ maxPeers -= s.config.LightPeers
+ if maxPeers < srvr.MaxPeers/2 {
+ maxPeers = srvr.MaxPeers / 2
+ }
+ }
+ // Start the networking layer and the light server if requested
+ s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(srvr)
}
@@ -389,6 +397,7 @@ func (s *Ethereum) Stop() error {
if s.stopDbUpgrade != nil {
s.stopDbUpgrade()
}
+ s.bloomIndexer.Close()
s.blockchain.Stop()
s.protocolManager.Stop()
if s.lesServer != nil {
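
With MaxPeers gone from eth.Config, the ETH peer budget is now derived in Start from the p2p server's limit. A stand-alone sketch of that arithmetic, with hypothetical values (25 server slots, 20 reserved light peers):

// Mirrors the computation in Ethereum.Start: reserve slots for LES
// clients, but never hand them more than half of the server's budget.
package main

import "fmt"

func ethMaxPeers(serverMax, lightPeers int, lightServ bool) int {
	maxPeers := serverMax
	if lightServ {
		maxPeers -= lightPeers
		if maxPeers < serverMax/2 {
			maxPeers = serverMax / 2
		}
	}
	return maxPeers
}

func main() {
	fmt.Println(ethMaxPeers(25, 20, true)) // 12 (half of 25), not 5
}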
diff --git a/eth/backend_test.go b/eth/backend_test.go
deleted file mode 100644
index 4351b24cf..000000000
--- a/eth/backend_test.go
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright 2015 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package eth
-
-import (
- "math/big"
- "testing"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/params"
-)
-
-func TestMipmapUpgrade(t *testing.T) {
- db, _ := ethdb.NewMemDatabase()
- addr := common.BytesToAddress([]byte("jeff"))
- genesis := new(core.Genesis).MustCommit(db)
-
- chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {
- switch i {
- case 1:
- receipt := types.NewReceipt(nil, new(big.Int))
- receipt.Logs = []*types.Log{{Address: addr}}
- gen.AddUncheckedReceipt(receipt)
- case 2:
- receipt := types.NewReceipt(nil, new(big.Int))
- receipt.Logs = []*types.Log{{Address: addr}}
- gen.AddUncheckedReceipt(receipt)
- }
- })
- for i, block := range chain {
- core.WriteBlock(db, block)
- if err := core.WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil {
- t.Fatalf("failed to insert block number: %v", err)
- }
- if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil {
- t.Fatalf("failed to insert block number: %v", err)
- }
- if err := core.WriteBlockReceipts(db, block.Hash(), block.NumberU64(), receipts[i]); err != nil {
- t.Fatal("error writing block receipts:", err)
- }
- }
-
- err := addMipmapBloomBins(db)
- if err != nil {
- t.Fatal(err)
- }
-
- bloom := core.GetMipmapBloom(db, 1, core.MIPMapLevels[0])
- if (bloom == types.Bloom{}) {
- t.Error("got empty bloom filter")
- }
-
- data, _ := db.Get([]byte("setting-mipmap-version"))
- if len(data) == 0 {
- t.Error("setting-mipmap-version not written to database")
- }
-}
diff --git a/eth/bloombits.go b/eth/bloombits.go
new file mode 100644
index 000000000..32f6c7b31
--- /dev/null
+++ b/eth/bloombits.go
@@ -0,0 +1,142 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/bitutil"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/params"
+)
+
+const (
+ // bloomServiceThreads is the number of goroutines used globally by an Ethereum
+ // instance to service bloombits lookups for all running filters.
+ bloomServiceThreads = 16
+
+ // bloomFilterThreads is the number of goroutines used locally per filter to
+ // multiplex requests onto the global servicing goroutines.
+ bloomFilterThreads = 3
+
+ // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
+ // in a single batch.
+ bloomRetrievalBatch = 16
+
+ // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
+// to accumulate before requesting an entire batch (avoiding hysteresis).
+ bloomRetrievalWait = time.Duration(0)
+)
+
+// startBloomHandlers starts a batch of goroutines to accept bloom bit database
+// retrievals from possibly a range of filters and serve the data to satisfy them.
+func (eth *Ethereum) startBloomHandlers() {
+ for i := 0; i < bloomServiceThreads; i++ {
+ go func() {
+ for {
+ select {
+ case <-eth.shutdownChan:
+ return
+
+ case request := <-eth.bloomRequests:
+ task := <-request
+
+ task.Bitsets = make([][]byte, len(task.Sections))
+ for i, section := range task.Sections {
+ head := core.GetCanonicalHash(eth.chainDb, (section+1)*params.BloomBitsBlocks-1)
+ blob, err := bitutil.DecompressBytes(core.GetBloomBits(eth.chainDb, task.Bit, section, head), int(params.BloomBitsBlocks)/8)
+ if err != nil {
+ panic(err)
+ }
+ task.Bitsets[i] = blob
+ }
+ request <- task
+ }
+ }
+ }()
+ }
+}
+
+const (
+ // bloomConfirms is the number of confirmation blocks before a bloom section is
+ // considered probably final and its rotated bits are calculated.
+ bloomConfirms = 256
+
+ // bloomThrottling is the time to wait between processing two consecutive index
+ // sections. It's useful during chain upgrades to prevent disk overload.
+ bloomThrottling = 100 * time.Millisecond
+)
+
+// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
+// for the Ethereum header bloom filters, permitting blazing fast filtering.
+type BloomIndexer struct {
+ size uint64 // section size to generate bloombits for
+
+ db ethdb.Database // database instance to write index data and metadata into
+ gen *bloombits.Generator // generator to rotate the bloom bits creating the bloom index
+
+ section uint64 // Section is the section number being processed currently
+ head common.Hash // Head is the hash of the last header processed
+}
+
+// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
+// canonical chain for fast logs filtering.
+func NewBloomIndexer(db ethdb.Database, size uint64) *core.ChainIndexer {
+ backend := &BloomIndexer{
+ db: db,
+ size: size,
+ }
+ table := ethdb.NewTable(db, string(core.BloomBitsIndexPrefix))
+
+ return core.NewChainIndexer(db, table, backend, size, bloomConfirms, bloomThrottling, "bloombits")
+}
+
+// Reset implements core.ChainIndexerBackend, starting a new bloombits index
+// section.
+func (b *BloomIndexer) Reset(section uint64) {
+ gen, err := bloombits.NewGenerator(uint(b.size))
+ if err != nil {
+ panic(err)
+ }
+ b.gen, b.section, b.head = gen, section, common.Hash{}
+}
+
+// Process implements core.ChainIndexerBackend, adding a new header's bloom into
+// the index.
+func (b *BloomIndexer) Process(header *types.Header) {
+ b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom)
+ b.head = header.Hash()
+}
+
+// Commit implements core.ChainIndexerBackend, finalizing the bloom section and
+// writing it out into the database.
+func (b *BloomIndexer) Commit() error {
+ batch := b.db.NewBatch()
+
+ for i := 0; i < types.BloomBitLength; i++ {
+ bits, err := b.gen.Bitset(uint(i))
+ if err != nil {
+ return err
+ }
+ core.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits))
+ }
+ return batch.Write()
+}
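
The three backend methods above map directly onto bloombits.Generator calls. A minimal stand-alone sketch of one section's Reset/Process/Commit lifecycle, with synthetic headers and the final database write elided:

// Sketch of the ChainIndexerBackend lifecycle BloomIndexer implements.
package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/core/bloombits"
	"github.com/ethereum/go-ethereum/core/types"
)

func main() {
	const size = 4096 // section size, params.BloomBitsBlocks in this diff

	// Reset: start a fresh generator for the section.
	gen, err := bloombits.NewGenerator(size)
	if err != nil {
		panic(err)
	}
	// Process: feed every header's bloom at its in-section offset.
	for i := uint64(0); i < size; i++ {
		header := &types.Header{Number: new(big.Int).SetUint64(i)}
		gen.AddBloom(uint(i), header.Bloom)
	}
	// Commit: extract each of the rotated bit rows for storage.
	bits, err := gen.Bitset(0) // one of types.BloomBitLength rows
	if err != nil {
		panic(err)
	}
	fmt.Println("bit row 0 is", len(bits), "bytes before compression") // size/8
}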
diff --git a/eth/config.go b/eth/config.go
index 4109cff8b..7bcfd403e 100644
--- a/eth/config.go
+++ b/eth/config.go
@@ -79,7 +79,6 @@ type Config struct {
// Light client options
LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests
LightPeers int `toml:",omitempty"` // Maximum number of LES client peers
- MaxPeers int `toml:"-"` // Maximum number of global peers
// Database options
SkipBcVersionCheck bool `toml:"-"`
diff --git a/eth/db_upgrade.go b/eth/db_upgrade.go
index 90111b2b3..d41afa17c 100644
--- a/eth/db_upgrade.go
+++ b/eth/db_upgrade.go
@@ -19,7 +19,6 @@ package eth
import (
"bytes"
- "fmt"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -134,46 +133,3 @@ func upgradeDeduplicateData(db ethdb.Database) func() error {
return <-errc
}
}
-
-func addMipmapBloomBins(db ethdb.Database) (err error) {
- const mipmapVersion uint = 2
-
- // check if the version is set. We ignore data for now since there's
- // only one version so we can easily ignore it for now
- var data []byte
- data, _ = db.Get([]byte("setting-mipmap-version"))
- if len(data) > 0 {
- var version uint
- if err := rlp.DecodeBytes(data, &version); err == nil && version == mipmapVersion {
- return nil
- }
- }
-
- defer func() {
- if err == nil {
- var val []byte
- val, err = rlp.EncodeToBytes(mipmapVersion)
- if err == nil {
- err = db.Put([]byte("setting-mipmap-version"), val)
- }
- return
- }
- }()
- latestHash := core.GetHeadBlockHash(db)
- latestBlock := core.GetBlock(db, latestHash, core.GetBlockNumber(db, latestHash))
- if latestBlock == nil { // clean database
- return
- }
-
- tstart := time.Now()
- log.Warn("Upgrading db log bloom bins")
- for i := uint64(0); i <= latestBlock.NumberU64(); i++ {
- hash := core.GetCanonicalHash(db, i)
- if (hash == common.Hash{}) {
- return fmt.Errorf("chain db corrupted. Could not find block %d.", i)
- }
- core.WriteMipmapBloom(db, i, core.GetBlockReceipts(db, hash, i))
- }
- log.Info("Bloom-bin upgrade completed", "elapsed", common.PrettyDuration(time.Since(tstart)))
- return nil
-}
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 13f3b11c9..5782d4cf5 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -156,7 +156,7 @@ type Downloader struct {
// LightChain encapsulates functions required to synchronise a light chain.
type LightChain interface {
// HasHeader verifies a header's presence in the local chain.
- HasHeader(common.Hash) bool
+ HasHeader(h common.Hash, number uint64) bool
// GetHeaderByHash retrieves a header from the local chain.
GetHeaderByHash(common.Hash) *types.Header
@@ -666,7 +666,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
continue
}
// Otherwise check if we already know the header or not
- if (d.mode == FullSync && d.blockchain.HasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash())) {
+ if (d.mode == FullSync && d.blockchain.HasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) {
number, hash = headers[i].Number.Uint64(), headers[i].Hash()
// If every header is known, even future ones, the peer straight out lied about its head
@@ -731,7 +731,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err
arrived = true
// Modify the search interval based on the response
- if (d.mode == FullSync && !d.blockchain.HasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash())) {
+ if (d.mode == FullSync && !d.blockchain.HasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) {
end = check
break
}
@@ -1174,7 +1174,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// If we're already past the pivot point, this could be an attack, tread carefully
if rollback[len(rollback)-1].Number.Uint64() > pivot {
- // If we didn't ever fail, lock in te pivot header (must! not! change!)
+ // If we didn't ever fail, lock in the pivot header (must! not! change!)
if atomic.LoadUint32(&d.fsPivotFails) == 0 {
for _, header := range rollback {
if header.Number.Uint64() == pivot {
@@ -1260,7 +1260,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
// Collect the yet unknown headers to mark them as uncertain
unknown := make([]*types.Header, 0, len(headers))
for _, header := range chunk {
- if !d.lightchain.HasHeader(header.Hash()) {
+ if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
unknown = append(unknown, header)
}
}
@@ -1396,7 +1396,6 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
stateSync.Cancel()
if err := d.commitPivotBlock(P); err != nil {
return err
-
}
}
if err := d.importBlockResults(afterP); err != nil {
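
For context on the HasHeader signature change rippling through this file: headers in the chain database are keyed by block number plus hash, so carrying the number turns the presence check into a single key probe. A toy stand-in for that keying scheme, with the prefix and stored value simplified:

// Hypothetical miniature of number+hash header keying.
package main

import (
	"encoding/binary"
	"fmt"
)

func headerKey(number uint64, hash [32]byte) []byte {
	key := make([]byte, 0, 1+8+32)
	key = append(key, 'h') // stand-in for the real header prefix
	key = binary.BigEndian.AppendUint64(key, number)
	return append(key, hash[:]...)
}

func main() {
	db := map[string][]byte{}

	var hash [32]byte
	hash[0] = 0xaa
	db[string(headerKey(42, hash))] = []byte("header-rlp")

	_, ok := db[string(headerKey(42, hash))]
	fmt.Println("has header:", ok) // true, found with one lookup
}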
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index d66aafe94..58f6e9a62 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -217,7 +217,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error {
}
// HasHeader checks if a header is present in the testers canonical chain.
-func (dl *downloadTester) HasHeader(hash common.Hash) bool {
+func (dl *downloadTester) HasHeader(hash common.Hash, number uint64) bool {
return dl.GetHeaderByHash(hash) != nil
}
diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go
index a5ce8c42d..eb5416f63 100644
--- a/eth/downloader/statesync.go
+++ b/eth/downloader/statesync.go
@@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/crypto/sha3"
+ "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie"
)
@@ -187,10 +188,13 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync {
type stateSync struct {
d *Downloader // Downloader instance to access and manage current peerset
- sched *state.StateSync // State trie sync scheduler defining the tasks
+ sched *trie.TrieSync // State trie sync scheduler defining the tasks
keccak hash.Hash // Keccak256 hasher to verify deliveries with
tasks map[common.Hash]*stateTask // Set of tasks currently queued for retrieval
+ numUncommitted int
+ bytesUncommitted int
+
deliver chan *stateReq // Delivery channel multiplexing peer responses
cancel chan struct{} // Channel to signal a termination request
cancelOnce sync.Once // Ensures cancel only ever gets called once
@@ -252,9 +256,10 @@ func (s *stateSync) loop() error {
// Keep assigning new tasks until the sync completes or aborts
for s.sched.Pending() > 0 {
- if err := s.assignTasks(); err != nil {
+ if err := s.commit(false); err != nil {
return err
}
+ s.assignTasks()
// Tasks assigned, wait for something to happen
select {
case <-newPeer:
@@ -284,12 +289,28 @@ func (s *stateSync) loop() error {
}
}
}
+ return s.commit(true)
+}
+
+func (s *stateSync) commit(force bool) error {
+ if !force && s.bytesUncommitted < ethdb.IdealBatchSize {
+ return nil
+ }
+ start := time.Now()
+ b := s.d.stateDB.NewBatch()
+ s.sched.Commit(b)
+ if err := b.Write(); err != nil {
+ return fmt.Errorf("DB write error: %v", err)
+ }
+ s.updateStats(s.numUncommitted, 0, 0, time.Since(start))
+ s.numUncommitted = 0
+ s.bytesUncommitted = 0
return nil
}
// assignTasks attempts to assign new tasks to all idle peers, either from the
// batch currently being retried, or fetching new data from the trie sync itself.
-func (s *stateSync) assignTasks() error {
+func (s *stateSync) assignTasks() {
// Iterate over all idle peers and try to assign them state fetches
peers, _ := s.d.peers.NodeDataIdlePeers()
for _, p := range peers {
@@ -301,7 +322,6 @@ func (s *stateSync) assignTasks() error {
// If the peer was assigned tasks to fetch, send the network request
if len(req.items) > 0 {
req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
-
select {
case s.d.trackStateReq <- req:
req.peer.FetchNodeData(req.items)
@@ -309,7 +329,6 @@ func (s *stateSync) assignTasks() error {
}
}
}
- return nil
}
// fillTasks fills the given request object with a maximum of n state download
@@ -347,11 +366,11 @@ func (s *stateSync) fillTasks(n int, req *stateReq) {
// delivered.
func (s *stateSync) process(req *stateReq) (bool, error) {
// Collect processing stats and update progress if valid data was received
- processed, written, duplicate, unexpected := 0, 0, 0, 0
+ duplicate, unexpected := 0, 0
defer func(start time.Time) {
- if processed+written+duplicate+unexpected > 0 {
- s.updateStats(processed, written, duplicate, unexpected, time.Since(start))
+ if duplicate > 0 || unexpected > 0 {
+ s.updateStats(0, duplicate, unexpected, time.Since(start))
}
}(time.Now())
@@ -362,7 +381,9 @@ func (s *stateSync) process(req *stateReq) (bool, error) {
prog, hash, err := s.processNodeData(blob)
switch err {
case nil:
- processed++
+ s.numUncommitted++
+ s.bytesUncommitted += len(blob)
+ progress = progress || prog
case trie.ErrNotRequested:
unexpected++
case trie.ErrAlreadyProcessed:
@@ -370,38 +391,20 @@ func (s *stateSync) process(req *stateReq) (bool, error) {
default:
return stale, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
}
- if prog {
- progress = true
- }
// If the node delivered a requested item, mark the delivery non-stale
if _, ok := req.tasks[hash]; ok {
delete(req.tasks, hash)
stale = false
}
}
- // If some data managed to hit the database, flush and reset failure counters
- if progress {
- // Flush any accumulated data out to disk
- batch := s.d.stateDB.NewBatch()
-
- count, err := s.sched.Commit(batch)
- if err != nil {
- return stale, err
- }
- if err := batch.Write(); err != nil {
- return stale, err
- }
- written = count
-
- // If we're inside the critical section, reset fail counter since we progressed
- if atomic.LoadUint32(&s.d.fsPivotFails) > 1 {
- log.Trace("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&s.d.fsPivotFails))
- atomic.StoreUint32(&s.d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
- }
+ // If we're inside the critical section, reset fail counter since we progressed.
+ if progress && atomic.LoadUint32(&s.d.fsPivotFails) > 1 {
+ log.Trace("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&s.d.fsPivotFails))
+ atomic.StoreUint32(&s.d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
}
+
// Put unfulfilled tasks back into the retry queue
npeers := s.d.peers.Len()
-
for hash, task := range req.tasks {
// If the node did deliver something, missing items may be due to a protocol
// limit or a previous timeout + delayed delivery. Both cases should permit
@@ -425,25 +428,25 @@ func (s *stateSync) process(req *stateReq) (bool, error) {
// error occurred.
func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) {
res := trie.SyncResult{Data: blob}
-
s.keccak.Reset()
s.keccak.Write(blob)
s.keccak.Sum(res.Hash[:0])
-
committed, _, err := s.sched.Process([]trie.SyncResult{res})
return committed, res.Hash, err
}
// updateStats bumps the various state sync progress counters and displays a log
// message for the user to see.
-func (s *stateSync) updateStats(processed, written, duplicate, unexpected int, duration time.Duration) {
+func (s *stateSync) updateStats(written, duplicate, unexpected int, duration time.Duration) {
s.d.syncStatsLock.Lock()
defer s.d.syncStatsLock.Unlock()
s.d.syncStatsState.pending = uint64(s.sched.Pending())
- s.d.syncStatsState.processed += uint64(processed)
+ s.d.syncStatsState.processed += uint64(written)
s.d.syncStatsState.duplicate += uint64(duplicate)
s.d.syncStatsState.unexpected += uint64(unexpected)
- log.Info("Imported new state entries", "count", processed, "flushed", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "retry", len(s.tasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected)
+ if written > 0 || duplicate > 0 || unexpected > 0 {
+ log.Info("Imported new state entries", "count", written, "elapsed", common.PrettyDuration(duration), "processed", s.d.syncStatsState.processed, "pending", s.d.syncStatsState.pending, "retry", len(s.tasks), "duplicate", s.d.syncStatsState.duplicate, "unexpected", s.d.syncStatsState.unexpected)
+ }
}
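
The new commit method batches trie node writes until roughly ethdb.IdealBatchSize bytes have accumulated, instead of flushing after every delivery. A minimal sketch of the same pattern against an in-memory database; the keys, value sizes, and loop bounds here are made up:

// Sketch of size-gated batch flushing, as in commit(force bool) above.
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/ethdb"
)

func main() {
	db, _ := ethdb.NewMemDatabase()

	batch := db.NewBatch()
	uncommitted := 0

	flush := func(force bool) {
		if !force && uncommitted < ethdb.IdealBatchSize {
			return // keep accumulating, like commit(false)
		}
		if err := batch.Write(); err != nil {
			panic(err)
		}
		batch = db.NewBatch()
		uncommitted = 0
	}

	for i := 0; i < 100000; i++ {
		batch.Put([]byte(fmt.Sprintf("node-%d", i)), make([]byte, 32))
		uncommitted += 32
		flush(false) // only writes once IdealBatchSize is crossed
	}
	flush(true) // final forced flush, like commit(true) at loop exit
}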
diff --git a/eth/filters/api.go b/eth/filters/api.go
index fff58a268..6e1d48adb 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -52,8 +52,8 @@ type filter struct {
// information related to the Ethereum protocol such as blocks, transactions and logs.
type PublicFilterAPI struct {
backend Backend
- useMipMap bool
mux *event.TypeMux
+ quit chan struct{}
chainDb ethdb.Database
events *EventSystem
filtersMu sync.Mutex
@@ -63,14 +63,12 @@ type PublicFilterAPI struct {
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
api := &PublicFilterAPI{
- backend: backend,
- useMipMap: !lightMode,
- mux: backend.EventMux(),
- chainDb: backend.ChainDb(),
- events: NewEventSystem(backend.EventMux(), backend, lightMode),
- filters: make(map[rpc.ID]*filter),
+ backend: backend,
+ mux: backend.EventMux(),
+ chainDb: backend.ChainDb(),
+ events: NewEventSystem(backend.EventMux(), backend, lightMode),
+ filters: make(map[rpc.ID]*filter),
}
-
go api.timeoutLoop()
return api
@@ -325,20 +323,20 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
+ // Convert the RPC block numbers into internal representations
if crit.FromBlock == nil {
crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
if crit.ToBlock == nil {
crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
+ // Create and run the filter to get all the logs
+ filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics)
- filter := New(api.backend, api.useMipMap)
- filter.SetBeginBlock(crit.FromBlock.Int64())
- filter.SetEndBlock(crit.ToBlock.Int64())
- filter.SetAddresses(crit.Addresses)
- filter.SetTopics(crit.Topics)
-
- logs, err := filter.Find(ctx)
+ logs, err := filter.Logs(ctx)
+ if err != nil {
+ return nil, err
+ }
return returnLogs(logs), err
}
@@ -372,21 +370,18 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
return nil, fmt.Errorf("filter not found")
}
- filter := New(api.backend, api.useMipMap)
+ begin := rpc.LatestBlockNumber.Int64()
if f.crit.FromBlock != nil {
- filter.SetBeginBlock(f.crit.FromBlock.Int64())
- } else {
- filter.SetBeginBlock(rpc.LatestBlockNumber.Int64())
+ begin = f.crit.FromBlock.Int64()
}
+ end := rpc.LatestBlockNumber.Int64()
if f.crit.ToBlock != nil {
- filter.SetEndBlock(f.crit.ToBlock.Int64())
- } else {
- filter.SetEndBlock(rpc.LatestBlockNumber.Int64())
+ end = f.crit.ToBlock.Int64()
}
- filter.SetAddresses(f.crit.Addresses)
- filter.SetTopics(f.crit.Topics)
+ // Create and run the filter to get all the logs
+ filter := New(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
- logs, err := filter.Find(ctx)
+ logs, err := filter.Logs(ctx)
if err != nil {
return nil, err
}
@@ -394,7 +389,7 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
}
// GetFilterChanges returns the logs for the filter with the given id since
-// last time is was called. This can be used for polling.
+// last time it was called. This can be used for polling.
//
// For pending transaction and block filters the result is []common.Hash.
// (pending)Log filters return []Log.
diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go
new file mode 100644
index 000000000..abbf4593e
--- /dev/null
+++ b/eth/filters/bench_test.go
@@ -0,0 +1,201 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package filters
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/bitutil"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/node"
+)
+
+func BenchmarkBloomBits512(b *testing.B) {
+ benchmarkBloomBits(b, 512)
+}
+
+func BenchmarkBloomBits1k(b *testing.B) {
+ benchmarkBloomBits(b, 1024)
+}
+
+func BenchmarkBloomBits2k(b *testing.B) {
+ benchmarkBloomBits(b, 2048)
+}
+
+func BenchmarkBloomBits4k(b *testing.B) {
+ benchmarkBloomBits(b, 4096)
+}
+
+func BenchmarkBloomBits8k(b *testing.B) {
+ benchmarkBloomBits(b, 8192)
+}
+
+func BenchmarkBloomBits16k(b *testing.B) {
+ benchmarkBloomBits(b, 16384)
+}
+
+func BenchmarkBloomBits32k(b *testing.B) {
+ benchmarkBloomBits(b, 32768)
+}
+
+const benchFilterCnt = 2000
+
+func benchmarkBloomBits(b *testing.B, sectionSize uint64) {
+ benchDataDir := node.DefaultDataDir() + "/geth/chaindata"
+ fmt.Println("Running bloombits benchmark section size:", sectionSize)
+
+ db, err := ethdb.NewLDBDatabase(benchDataDir, 128, 1024)
+ if err != nil {
+ b.Fatalf("error opening database at %v: %v", benchDataDir, err)
+ }
+ head := core.GetHeadBlockHash(db)
+ if head == (common.Hash{}) {
+ b.Fatalf("chain data not found at %v", benchDataDir)
+ }
+
+ clearBloomBits(db)
+ fmt.Println("Generating bloombits data...")
+ headNum := core.GetBlockNumber(db, head)
+ if headNum < sectionSize+512 {
+ b.Fatalf("not enough blocks for running a benchmark")
+ }
+
+ start := time.Now()
+ cnt := (headNum - 512) / sectionSize
+ var dataSize, compSize uint64
+ for sectionIdx := uint64(0); sectionIdx < cnt; sectionIdx++ {
+ bc, err := bloombits.NewGenerator(uint(sectionSize))
+ if err != nil {
+ b.Fatalf("failed to create generator: %v", err)
+ }
+ var header *types.Header
+ for i := sectionIdx * sectionSize; i < (sectionIdx+1)*sectionSize; i++ {
+ hash := core.GetCanonicalHash(db, i)
+ header = core.GetHeader(db, hash, i)
+ if header == nil {
+ b.Fatalf("Error creating bloomBits data")
+ }
+ bc.AddBloom(uint(i-sectionIdx*sectionSize), header.Bloom)
+ }
+ sectionHead := core.GetCanonicalHash(db, (sectionIdx+1)*sectionSize-1)
+ for i := 0; i < types.BloomBitLength; i++ {
+ data, err := bc.Bitset(uint(i))
+ if err != nil {
+ b.Fatalf("failed to retrieve bitset: %v", err)
+ }
+ comp := bitutil.CompressBytes(data)
+ dataSize += uint64(len(data))
+ compSize += uint64(len(comp))
+ core.WriteBloomBits(db, uint(i), sectionIdx, sectionHead, comp)
+ }
+ //if sectionIdx%50 == 0 {
+ // fmt.Println(" section", sectionIdx, "/", cnt)
+ //}
+ }
+
+ d := time.Since(start)
+ fmt.Println("Finished generating bloombits data")
+ fmt.Println(" ", d, "total ", d/time.Duration(cnt*sectionSize), "per block")
+ fmt.Println(" data size:", dataSize, " compressed size:", compSize, " compression ratio:", float64(compSize)/float64(dataSize))
+
+ fmt.Println("Running filter benchmarks...")
+ start = time.Now()
+ mux := new(event.TypeMux)
+ var backend *testBackend
+
+ for i := 0; i < benchFilterCnt; i++ {
+ if i%20 == 0 {
+ db.Close()
+ db, _ = ethdb.NewLDBDatabase(benchDataDir, 128, 1024)
+ backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
+ }
+ var addr common.Address
+ addr[0] = byte(i)
+ addr[1] = byte(i / 256)
+ filter := New(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
+ if _, err := filter.Logs(context.Background()); err != nil {
+ b.Error("filter.Find error:", err)
+ }
+ }
+ d = time.Since(start)
+ fmt.Println("Finished running filter benchmarks")
+ fmt.Println(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks")
+ db.Close()
+}
+
+func forEachKey(db ethdb.Database, startPrefix, endPrefix []byte, fn func(key []byte)) {
+ it := db.(*ethdb.LDBDatabase).NewIterator()
+ it.Seek(startPrefix)
+ for it.Valid() {
+ key := it.Key()
+ cmpLen := len(key)
+ if len(endPrefix) < cmpLen {
+ cmpLen = len(endPrefix)
+ }
+ if bytes.Compare(key[:cmpLen], endPrefix) == 1 {
+ break
+ }
+ fn(common.CopyBytes(key))
+ it.Next()
+ }
+ it.Release()
+}
+
+var bloomBitsPrefix = []byte("bloomBits-")
+
+func clearBloomBits(db ethdb.Database) {
+ fmt.Println("Clearing bloombits data...")
+ forEachKey(db, bloomBitsPrefix, bloomBitsPrefix, func(key []byte) {
+ db.Delete(key)
+ })
+}
+
+func BenchmarkNoBloomBits(b *testing.B) {
+ benchDataDir := node.DefaultDataDir() + "/geth/chaindata"
+ fmt.Println("Running benchmark without bloombits")
+ db, err := ethdb.NewLDBDatabase(benchDataDir, 128, 1024)
+ if err != nil {
+ b.Fatalf("error opening database at %v: %v", benchDataDir, err)
+ }
+ head := core.GetHeadBlockHash(db)
+ if head == (common.Hash{}) {
+ b.Fatalf("chain data not found at %v", benchDataDir)
+ }
+ headNum := core.GetBlockNumber(db, head)
+
+ clearBloomBits(db)
+
+ fmt.Println("Running filter benchmarks...")
+ start := time.Now()
+ mux := new(event.TypeMux)
+ backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
+ filter := New(backend, 0, int64(headNum), []common.Address{common.Address{}}, nil)
+ filter.Logs(context.Background())
+ d := time.Since(start)
+ fmt.Println("Finished running filter benchmarks")
+ fmt.Println(" ", d, "total ", d*time.Duration(1000000)/time.Duration(headNum+1), "per million blocks")
+ db.Close()
+}
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index f27b76929..4f6c30058 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -18,11 +18,12 @@ package filters
import (
"context"
- "math"
"math/big"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -34,163 +35,179 @@ type Backend interface {
EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
+
+ SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
+ SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
+ SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
+ SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
+
+ BloomStatus() (uint64, uint64)
+ ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
}
// Filter can be used to retrieve and filter logs.
type Filter struct {
- backend Backend
- useMipMap bool
+ backend Backend
db ethdb.Database
begin, end int64
addresses []common.Address
topics [][]common.Hash
+
+ matcher *bloombits.Matcher
}
// New creates a new filter which uses a bloom filter on blocks to figure out whether
// a particular block is interesting or not.
-// MipMaps allow past blocks to be searched much more efficiently, but are not available
-// to light clients.
-func New(backend Backend, useMipMap bool) *Filter {
+func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
+ // Flatten the address and topic filter clauses into a single filter system
+ var filters [][][]byte
+ if len(addresses) > 0 {
+ filter := make([][]byte, len(addresses))
+ for i, address := range addresses {
+ filter[i] = address.Bytes()
+ }
+ filters = append(filters, filter)
+ }
+ for _, topicList := range topics {
+ filter := make([][]byte, len(topicList))
+ for i, topic := range topicList {
+ filter[i] = topic.Bytes()
+ }
+ filters = append(filters, filter)
+ }
+ // Assemble and return the filter
+ size, _ := backend.BloomStatus()
+
return &Filter{
backend: backend,
- useMipMap: useMipMap,
+ begin: begin,
+ end: end,
+ addresses: addresses,
+ topics: topics,
db: backend.ChainDb(),
+ matcher: bloombits.NewMatcher(size, filters),
}
}
-// SetBeginBlock sets the earliest block for filtering.
-// -1 = latest block (i.e., the current block)
-// hash = particular hash from-to
-func (f *Filter) SetBeginBlock(begin int64) {
- f.begin = begin
-}
-
-// SetEndBlock sets the latest block for filtering.
-func (f *Filter) SetEndBlock(end int64) {
- f.end = end
-}
-
-// SetAddresses matches only logs that are generated from addresses that are included
-// in the given addresses.
-func (f *Filter) SetAddresses(addr []common.Address) {
- f.addresses = addr
-}
-
-// SetTopics matches only logs that have topics matching the given topics.
-func (f *Filter) SetTopics(topics [][]common.Hash) {
- f.topics = topics
-}
-
-// FindOnce searches the blockchain for matching log entries, returning
-// all matching entries from the first block that contains matches,
-// updating the start point of the filter accordingly. If no results are
-// found, a nil slice is returned.
-func (f *Filter) FindOnce(ctx context.Context) ([]*types.Log, error) {
- head, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
- if head == nil {
+// Logs searches the blockchain for matching log entries, returning all from the
+// first block that contains matches, updating the start of the filter accordingly.
+func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
+ // Figure out the limits of the filter range
+ header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
+ if header == nil {
return nil, nil
}
- headBlockNumber := head.Number.Uint64()
+ head := header.Number.Uint64()
- var beginBlockNo uint64 = uint64(f.begin)
if f.begin == -1 {
- beginBlockNo = headBlockNumber
+ f.begin = int64(head)
}
- var endBlockNo uint64 = uint64(f.end)
+ end := uint64(f.end)
if f.end == -1 {
- endBlockNo = headBlockNumber
- }
-
- // if no addresses are present we can't make use of fast search which
- // uses the mipmap bloom filters to check for fast inclusion and uses
- // higher range probability in order to ensure at least a false positive
- if !f.useMipMap || len(f.addresses) == 0 {
- logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo)
- f.begin = int64(blockNumber + 1)
- return logs, err
+ end = head
}
-
- logs, blockNumber := f.mipFind(beginBlockNo, endBlockNo, 0)
- f.begin = int64(blockNumber + 1)
- return logs, nil
-}
-
-// Run filters logs with the current parameters set
-func (f *Filter) Find(ctx context.Context) (logs []*types.Log, err error) {
- for {
- newLogs, err := f.FindOnce(ctx)
- if len(newLogs) == 0 || err != nil {
+ // Gather all indexed logs, and finish with non-indexed ones
+ var (
+ logs []*types.Log
+ err error
+ )
+ size, sections := f.backend.BloomStatus()
+ if indexed := sections * size; indexed > uint64(f.begin) {
+ if indexed > end {
+ logs, err = f.indexedLogs(ctx, end)
+ } else {
+ logs, err = f.indexedLogs(ctx, indexed-1)
+ }
+ if err != nil {
return logs, err
}
- logs = append(logs, newLogs...)
}
+ rest, err := f.unindexedLogs(ctx, end)
+ logs = append(logs, rest...)
+ return logs, err
}
-func (f *Filter) mipFind(start, end uint64, depth int) (logs []*types.Log, blockNumber uint64) {
- level := core.MIPMapLevels[depth]
- // normalise numerator so we can work in level specific batches and
- // work with the proper range checks
- for num := start / level * level; num <= end; num += level {
- // find addresses in bloom filters
- bloom := core.GetMipmapBloom(f.db, num, level)
- // Don't bother checking the first time through the loop - we're probably picking
- // up where a previous run left off.
- first := true
- for _, addr := range f.addresses {
- if first || bloom.TestBytes(addr[:]) {
- first = false
- // range check normalised values and make sure that
- // we're resolving the correct range instead of the
- // normalised values.
- start := uint64(math.Max(float64(num), float64(start)))
- end := uint64(math.Min(float64(num+level-1), float64(end)))
- if depth+1 == len(core.MIPMapLevels) {
- l, blockNumber, _ := f.getLogs(context.Background(), start, end)
- if len(l) > 0 {
- return l, blockNumber
- }
- } else {
- l, blockNumber := f.mipFind(start, end, depth+1)
- if len(l) > 0 {
- return l, blockNumber
- }
- }
+// indexedLogs returns the logs matching the filter criteria based on the bloom
+// bits index, available locally or via the network.
+func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ // Create a matcher session and request servicing from the backend
+ matches := make(chan uint64, 64)
+
+ session, err := f.matcher.Start(uint64(f.begin), end, matches)
+ if err != nil {
+ return nil, err
+ }
+ defer session.Close(time.Second)
+
+ f.backend.ServiceFilter(ctx, session)
+
+ // Iterate over the matches until exhausted or context closed
+ var logs []*types.Log
+
+ for {
+ select {
+ case number, ok := <-matches:
+ // Abort if all matches have been fulfilled
+ if !ok {
+ f.begin = int64(end) + 1
+ return logs, nil
+ }
+ // Retrieve the suggested block and pull any truly matching logs
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
+ if header == nil || err != nil {
+ return logs, err
+ }
+ found, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return logs, err
}
+ logs = append(logs, found...)
+
+ case <-ctx.Done():
+ return logs, ctx.Err()
}
}
-
- return nil, end
}
-func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*types.Log, blockNumber uint64, err error) {
- for i := start; i <= end; i++ {
- blockNumber := rpc.BlockNumber(i)
- header, err := f.backend.HeaderByNumber(ctx, blockNumber)
+// unindexedLogs returns the logs matching the filter criteria based on raw block
+// iteration and bloom matching.
+func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
+ var logs []*types.Log
+
+ for ; f.begin <= int64(end); f.begin++ {
+ header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
if header == nil || err != nil {
- return logs, end, err
+ return logs, err
}
-
- // Use bloom filtering to see if this block is interesting given the
- // current parameters
- if f.bloomFilter(header.Bloom) {
- // Get the logs of the block
- receipts, err := f.backend.GetReceipts(ctx, header.Hash())
+ if bloomFilter(header.Bloom, f.addresses, f.topics) {
+ found, err := f.checkMatches(ctx, header)
if err != nil {
- return nil, end, err
- }
- var unfiltered []*types.Log
- for _, receipt := range receipts {
- unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...)
- }
- logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
- if len(logs) > 0 {
- return logs, uint64(blockNumber), nil
+ return logs, err
}
+ logs = append(logs, found...)
}
}
+ return logs, nil
+}
- return logs, end, nil
+// checkMatches checks if the receipts belonging to the given header contain any log events that
+// match the filter criteria. This function is called when the bloom filter signals a potential match.
+func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
+ // Get the logs of the block
+ receipts, err := f.backend.GetReceipts(ctx, header.Hash())
+ if err != nil {
+ return nil, err
+ }
+ var unfiltered []*types.Log
+ for _, receipt := range receipts {
+ unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...)
+ }
+ logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
+ if len(logs) > 0 {
+ return logs, nil
+ }
+ return nil, nil
}
func includes(addresses []common.Address, a common.Address) bool {
@@ -247,10 +264,6 @@ Logs:
return ret
}
-func (f *Filter) bloomFilter(bloom types.Bloom) bool {
- return bloomFilter(bloom, f.addresses, f.topics)
-}
-
func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
if len(addresses) > 0 {
var included bool
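
The rewritten constructor flattens the criteria into bloombits clauses: the addresses form one clause and each topic position another, where entries inside a clause are alternatives (OR) while the clauses themselves must all match (AND). A stand-alone copy of that flattening, mirroring New above; the address and topic values are arbitrary examples:

// Sketch of the address/topic clause flattening performed by New.
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

func flatten(addresses []common.Address, topics [][]common.Hash) [][][]byte {
	var filters [][][]byte
	if len(addresses) > 0 {
		clause := make([][]byte, len(addresses))
		for i, address := range addresses {
			clause[i] = address.Bytes()
		}
		filters = append(filters, clause)
	}
	for _, topicList := range topics {
		clause := make([][]byte, len(topicList))
		for i, topic := range topicList {
			clause[i] = topic.Bytes()
		}
		filters = append(filters, clause)
	}
	return filters
}

func main() {
	addr := common.HexToAddress("0x00000000000000000000000000000000deadbeef")
	topic := common.HexToHash("0x01")

	clauses := flatten([]common.Address{addr}, [][]common.Hash{{topic}})
	fmt.Println("clauses:", len(clauses)) // 2: one for addresses, one per topic slot
}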
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index ab0b7473e..00ade0ffb 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -54,6 +54,19 @@ const (
LastIndexSubscription
)
+const (
+
+ // txChanSize is the size of channel listening to TxPreEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
+ // rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
+ rmLogsChanSize = 10
+ // logsChanSize is the size of channel listening to LogsEvent.
+ logsChanSize = 10
+ // chainEvChanSize is the size of channel listening to ChainEvent.
+ chainEvChanSize = 10
+)
+
var (
ErrInvalidSubscriptionID = errors.New("invalid id")
)
@@ -276,57 +289,50 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr
type filterIndex map[Type]map[rpc.ID]*subscription
// broadcast event to filters that match criteria.
-func (es *EventSystem) broadcast(filters filterIndex, ev *event.TypeMuxEvent) {
+func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
if ev == nil {
return
}
- switch e := ev.Data.(type) {
+ switch e := ev.(type) {
case []*types.Log:
if len(e) > 0 {
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
}
case core.RemovedLogsEvent:
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
- case core.PendingLogsEvent:
- for _, f := range filters[PendingLogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(e.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
+ case *event.TypeMuxEvent:
+ switch muxe := e.Data.(type) {
+ case core.PendingLogsEvent:
+ for _, f := range filters[PendingLogsSubscription] {
+ if e.Time.After(f.created) {
+ if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
+ }
}
}
}
case core.TxPreEvent:
for _, f := range filters[PendingTransactionsSubscription] {
- if ev.Time.After(f.created) {
- f.hashes <- e.Tx.Hash()
- }
+ f.hashes <- e.Tx.Hash()
}
case core.ChainEvent:
for _, f := range filters[BlocksSubscription] {
- if ev.Time.After(f.created) {
- f.headers <- e.Block.Header()
- }
+ f.headers <- e.Block.Header()
}
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
- if ev.Time.After(f.created) {
- if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
- f.logs <- matchedLogs
- }
+ if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
+ f.logs <- matchedLogs
}
}
})
@@ -395,9 +401,28 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
func (es *EventSystem) eventLoop() {
var (
index = make(filterIndex)
- sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, []*types.Log{}, core.TxPreEvent{}, core.ChainEvent{})
+ sub = es.mux.Subscribe(core.PendingLogsEvent{})
+ // Subscribe TxPreEvent from txpool
+ txCh = make(chan core.TxPreEvent, txChanSize)
+ txSub = es.backend.SubscribeTxPreEvent(txCh)
+ // Subscribe RemovedLogsEvent
+ rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize)
+ rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh)
+ // Subscribe []*types.Log
+ logsCh = make(chan []*types.Log, logsChanSize)
+ logsSub = es.backend.SubscribeLogsEvent(logsCh)
+ // Subscribe ChainEvent
+ chainEvCh = make(chan core.ChainEvent, chainEvChanSize)
+ chainEvSub = es.backend.SubscribeChainEvent(chainEvCh)
)
+ // Unsubscribe all events
+ defer sub.Unsubscribe()
+ defer txSub.Unsubscribe()
+ defer rmLogsSub.Unsubscribe()
+ defer logsSub.Unsubscribe()
+ defer chainEvSub.Unsubscribe()
+
for i := UnknownSubscription; i < LastIndexSubscription; i++ {
index[i] = make(map[rpc.ID]*subscription)
}
@@ -409,6 +434,17 @@ func (es *EventSystem) eventLoop() {
return
}
es.broadcast(index, ev)
+
+ // Handle subscribed events
+ case ev := <-txCh:
+ es.broadcast(index, ev)
+ case ev := <-rmLogsCh:
+ es.broadcast(index, ev)
+ case ev := <-logsCh:
+ es.broadcast(index, ev)
+ case ev := <-chainEvCh:
+ es.broadcast(index, ev)
+
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
@@ -427,6 +463,16 @@ func (es *EventSystem) eventLoop() {
delete(index[f.typ], f.id)
}
close(f.err)
+
+ // System stopped
+ case <-txSub.Err():
+ return
+ case <-rmLogsSub.Err():
+ return
+ case <-logsSub.Err():
+ return
+ case <-chainEvSub.Err():
+ return
}
}
}
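
The event loop now draws most events from typed event.Feed subscriptions rather than the TypeMux. A minimal sketch of the subscribe/send/Err pattern those channels follow, with a synthetic integer event standing in for something like core.TxPreEvent:

// Sketch of the event.Feed pattern: a typed, buffered channel per event
// kind, explicit Unsubscribe, and an Err channel doubling as shutdown.
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/event"
)

func main() {
	var feed event.Feed

	ch := make(chan int, 10) // buffered, like txChanSize and friends
	sub := feed.Subscribe(ch)
	defer sub.Unsubscribe()

	go feed.Send(42) // what the blockchain/txpool side does

	select {
	case v := <-ch:
		fmt.Println("event:", v)
	case err := <-sub.Err():
		fmt.Println("subscription closed:", err)
	}
}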
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index 23e6d66e1..664ce07a5 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -20,12 +20,14 @@ import (
"context"
"fmt"
"math/big"
+ "math/rand"
"reflect"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -34,8 +36,13 @@ import (
)
type testBackend struct {
- mux *event.TypeMux
- db ethdb.Database
+ mux *event.TypeMux
+ db ethdb.Database
+ sections uint64
+ txFeed *event.Feed
+ rmLogsFeed *event.Feed
+ logsFeed *event.Feed
+ chainFeed *event.Feed
}
func (b *testBackend) ChainDb() ethdb.Database {
@@ -64,6 +71,53 @@ func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (t
return core.GetBlockReceipts(b.db, blockHash, num), nil
}
+func (b *testBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+ return b.txFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
+ return b.rmLogsFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
+ return b.logsFeed.Subscribe(ch)
+}
+
+func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
+ return b.chainFeed.Subscribe(ch)
+}
+
+func (b *testBackend) BloomStatus() (uint64, uint64) {
+ return params.BloomBitsBlocks, b.sections
+}
+
+func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
+ requests := make(chan chan *bloombits.Retrieval)
+
+ go session.Multiplex(16, 0, requests)
+ go func() {
+ for {
+ // Wait for a service request or a shutdown
+ select {
+ case <-ctx.Done():
+ return
+
+ case request := <-requests:
+ task := <-request
+
+ task.Bitsets = make([][]byte, len(task.Sections))
+ for i, section := range task.Sections {
+ if rand.Int()%4 != 0 { // Handle occasional missing deliveries
+ head := core.GetCanonicalHash(b.db, (section+1)*params.BloomBitsBlocks-1)
+ task.Bitsets[i] = core.GetBloomBits(b.db, task.Bit, section, head)
+ }
+ }
+ request <- task
+ }
+ }
+ }()
+}
+
// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
// It creates multiple subscriptions:
// - one at the start and should receive all posted chain events and a second (blockHashes)
@@ -75,7 +129,11 @@ func TestBlockSubscription(t *testing.T) {
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
api = NewPublicFilterAPI(backend, false)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {})
@@ -114,7 +172,7 @@ func TestBlockSubscription(t *testing.T) {
time.Sleep(1 * time.Second)
for _, e := range chainEvents {
- mux.Post(e)
+ chainFeed.Send(e)
}
<-sub0.Err()
@@ -126,10 +184,14 @@ func TestPendingTxFilter(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
@@ -147,9 +209,10 @@ func TestPendingTxFilter(t *testing.T) {
time.Sleep(1 * time.Second)
for _, tx := range transactions {
ev := core.TxPreEvent{Tx: tx}
- mux.Post(ev)
+ txFeed.Send(ev)
}
+ timeout := time.Now().Add(1 * time.Second)
for {
results, err := api.GetFilterChanges(fid0)
if err != nil {
@@ -161,10 +224,18 @@ func TestPendingTxFilter(t *testing.T) {
if len(hashes) >= len(transactions) {
break
}
+ // check timeout
+ if time.Now().After(timeout) {
+ break
+ }
time.Sleep(100 * time.Millisecond)
}
+ if len(hashes) != len(transactions) {
+ t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
+ return
+ }
for i := range hashes {
if hashes[i] != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
@@ -176,10 +247,14 @@ func TestPendingTxFilter(t *testing.T) {
// If not it must return an error.
func TestLogFilterCreation(t *testing.T) {
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
testCases = []struct {
crit FilterCriteria
@@ -221,10 +296,14 @@ func TestInvalidLogFilterCreation(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
)
// different situations where log filter creation should fail.
@@ -242,15 +321,19 @@ func TestInvalidLogFilterCreation(t *testing.T) {
}
}
-// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux.
+// TestLogFilter tests whether log filters match the correct logs that are posted to the event feed.
func TestLogFilter(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -311,8 +394,8 @@ func TestLogFilter(t *testing.T) {
// raise events
time.Sleep(1 * time.Second)
- if err := mux.Post(allLogs); err != nil {
- t.Fatal(err)
+ if nsend := logsFeed.Send(allLogs); nsend == 0 {
+ t.Fatal("Shoud have at least one subscription")
}
if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil {
t.Fatal(err)
@@ -320,6 +403,7 @@ func TestLogFilter(t *testing.T) {
for i, tt := range testCases {
var fetched []*types.Log
+ timeout := time.Now().Add(1 * time.Second)
for { // fetch all expected logs
results, err := api.GetFilterChanges(tt.id)
if err != nil {
@@ -330,6 +414,10 @@ func TestLogFilter(t *testing.T) {
if len(fetched) >= len(tt.expected) {
break
}
+ // check timeout
+ if time.Now().After(timeout) {
+ break
+ }
time.Sleep(100 * time.Millisecond)
}
@@ -350,15 +438,19 @@ func TestLogFilter(t *testing.T) {
}
}
-// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux.
+// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed.
func TestPendingLogsSubscription(t *testing.T) {
t.Parallel()
var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -456,6 +548,7 @@ func TestPendingLogsSubscription(t *testing.T) {
// raise events
time.Sleep(1 * time.Second)
+ // allLogs are of type core.PendingLogsEvent
for _, l := range allLogs {
if err := mux.Post(l); err != nil {
t.Fatal(err)
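
Two test idioms recur throughout these changes: checking Feed.Send's return value to be sure a consumer is actually attached, and polling a filter-changes API with a deadline instead of blocking forever. A stripped-down sketch of both, assuming nothing beyond the event package:

package main

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/event"
)

func main() {
	var feed event.Feed
	ch := make(chan int, 1)
	sub := feed.Subscribe(ch)
	defer sub.Unsubscribe()

	// Send reports how many subscribers received the value; zero means
	// the consumer was never wired up, which the tests now treat as fatal.
	if nsent := feed.Send(42); nsent == 0 {
		panic("should have at least one subscription")
	}

	// Poll with a deadline rather than spinning forever, as the reworked
	// TestPendingTxFilter and TestLogFilter do.
	deadline := time.Now().Add(time.Second)
	for {
		select {
		case v := <-ch:
			fmt.Println("received", v)
			return
		default:
			if time.Now().After(deadline) {
				fmt.Println("timed out waiting for event")
				return
			}
			time.Sleep(10 * time.Millisecond)
		}
	}
}
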
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index b6cfd4bbc..11235e95a 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -33,7 +33,7 @@ import (
)
func makeReceipt(addr common.Address) *types.Receipt {
- receipt := types.NewReceipt(nil, new(big.Int))
+ receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
{Address: addr},
}
@@ -41,48 +41,46 @@ func makeReceipt(addr common.Address) *types.Receipt {
return receipt
}
-func BenchmarkMipmaps(b *testing.B) {
- dir, err := ioutil.TempDir("", "mipmap")
+func BenchmarkFilters(b *testing.B) {
+ dir, err := ioutil.TempDir("", "filtertest")
if err != nil {
b.Fatal(err)
}
defer os.RemoveAll(dir)
var (
- db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
- mux = new(event.TypeMux)
- backend = &testBackend{mux, db}
- key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
- addr1 = crypto.PubkeyToAddress(key1.PublicKey)
- addr2 = common.BytesToAddress([]byte("jeff"))
- addr3 = common.BytesToAddress([]byte("ethereum"))
- addr4 = common.BytesToAddress([]byte("random addresses please"))
+ db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
+ mux = new(event.TypeMux)
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ addr1 = crypto.PubkeyToAddress(key1.PublicKey)
+ addr2 = common.BytesToAddress([]byte("jeff"))
+ addr3 = common.BytesToAddress([]byte("ethereum"))
+ addr4 = common.BytesToAddress([]byte("random addresses please"))
)
defer db.Close()
genesis := core.GenesisBlockForTesting(db, addr1, big.NewInt(1000000))
chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 100010, func(i int, gen *core.BlockGen) {
- var receipts types.Receipts
switch i {
case 2403:
receipt := makeReceipt(addr1)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
case 1034:
receipt := makeReceipt(addr2)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
case 34:
receipt := makeReceipt(addr3)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
case 99999:
receipt := makeReceipt(addr4)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
}
- core.WriteMipmapBloom(db, uint64(i+1), receipts)
})
for i, block := range chain {
core.WriteBlock(db, block)
@@ -98,13 +96,10 @@ func BenchmarkMipmaps(b *testing.B) {
}
b.ResetTimer()
- filter := New(backend, true)
- filter.SetAddresses([]common.Address{addr1, addr2, addr3, addr4})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter := New(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
for i := 0; i < b.N; i++ {
- logs, _ := filter.Find(context.Background())
+ logs, _ := filter.Logs(context.Background())
if len(logs) != 4 {
b.Fatal("expected 4 logs, got", len(logs))
}
@@ -112,18 +107,22 @@ func BenchmarkMipmaps(b *testing.B) {
}
func TestFilters(t *testing.T) {
- dir, err := ioutil.TempDir("", "mipmap")
+ dir, err := ioutil.TempDir("", "filtertest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
var (
- db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
- mux = new(event.TypeMux)
- backend = &testBackend{mux, db}
- key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
- addr = crypto.PubkeyToAddress(key1.PublicKey)
+ db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
+ mux = new(event.TypeMux)
+ txFeed = new(event.Feed)
+ rmLogsFeed = new(event.Feed)
+ logsFeed = new(event.Feed)
+ chainFeed = new(event.Feed)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ addr = crypto.PubkeyToAddress(key1.PublicKey)
hash1 = common.BytesToHash([]byte("topic1"))
hash2 = common.BytesToHash([]byte("topic2"))
@@ -134,10 +133,9 @@ func TestFilters(t *testing.T) {
genesis := core.GenesisBlockForTesting(db, addr, big.NewInt(1000000))
chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 1000, func(i int, gen *core.BlockGen) {
- var receipts types.Receipts
switch i {
case 1:
- receipt := types.NewReceipt(nil, new(big.Int))
+ receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
{
Address: addr,
@@ -145,9 +143,8 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
case 2:
- receipt := types.NewReceipt(nil, new(big.Int))
+ receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
{
Address: addr,
@@ -155,9 +152,8 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
case 998:
- receipt := types.NewReceipt(nil, new(big.Int))
+ receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
{
Address: addr,
@@ -165,9 +161,8 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
case 999:
- receipt := types.NewReceipt(nil, new(big.Int))
+ receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
{
Address: addr,
@@ -175,12 +170,7 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
}
- // i is used as block number for the writes but since the i
- // starts at 0 and block 0 (genesis) is already present increment
- // by one
- core.WriteMipmapBloom(db, uint64(i+1), receipts)
})
for i, block := range chain {
core.WriteBlock(db, block)
@@ -195,23 +185,15 @@ func TestFilters(t *testing.T) {
}
}
- filter := New(backend, true)
- filter.SetAddresses([]common.Address{addr})
- filter.SetTopics([][]common.Hash{{hash1, hash2, hash3, hash4}})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter := New(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})
- logs, _ := filter.Find(context.Background())
+ logs, _ := filter.Logs(context.Background())
if len(logs) != 4 {
t.Error("expected 4 log, got", len(logs))
}
- filter = New(backend, true)
- filter.SetAddresses([]common.Address{addr})
- filter.SetTopics([][]common.Hash{{hash3}})
- filter.SetBeginBlock(900)
- filter.SetEndBlock(999)
- logs, _ = filter.Find(context.Background())
+ filter = New(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}})
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 1 {
t.Error("expected 1 log, got", len(logs))
}
@@ -219,12 +201,8 @@ func TestFilters(t *testing.T) {
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
}
- filter = New(backend, true)
- filter.SetAddresses([]common.Address{addr})
- filter.SetTopics([][]common.Hash{{hash3}})
- filter.SetBeginBlock(990)
- filter.SetEndBlock(-1)
- logs, _ = filter.Find(context.Background())
+ filter = New(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}})
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 1 {
t.Error("expected 1 log, got", len(logs))
}
@@ -232,44 +210,32 @@ func TestFilters(t *testing.T) {
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
}
- filter = New(backend, true)
- filter.SetTopics([][]common.Hash{{hash1, hash2}})
- filter.SetBeginBlock(1)
- filter.SetEndBlock(10)
+ filter = New(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 2 {
t.Error("expected 2 log, got", len(logs))
}
failHash := common.BytesToHash([]byte("fail"))
- filter = New(backend, true)
- filter.SetTopics([][]common.Hash{{failHash}})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}
failAddr := common.BytesToAddress([]byte("failmenow"))
- filter = New(backend, true)
- filter.SetAddresses([]common.Address{failAddr})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter = New(backend, 0, -1, []common.Address{failAddr}, nil)
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}
- filter = New(backend, true)
- filter.SetTopics([][]common.Hash{{failHash}, {hash1}})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}})
- logs, _ = filter.Find(context.Background())
+ logs, _ = filter.Logs(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}
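
The filter API change above collapses the old SetAddresses/SetTopics/SetBeginBlock/SetEndBlock mutators into the New constructor and renames Find to Logs. A compile-only sketch of the new call shape; the backend parameter is assumed to satisfy the package's exported Backend interface:

package example

import (
	"context"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/eth/filters"
)

// queryLogs filters blocks 0 through latest (-1) for one address and one
// topic position, using the constructor-style API from this diff.
func queryLogs(ctx context.Context, backend filters.Backend) ([]*types.Log, error) {
	addr := common.HexToAddress("0x1111111111111111111111111111111111111111")
	topic := common.BytesToHash([]byte("topic1"))

	f := filters.New(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{topic}})
	return f.Logs(ctx)
}
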
diff --git a/eth/gen_config.go b/eth/gen_config.go
index 477479419..4a4cd7b9c 100644
--- a/eth/gen_config.go
+++ b/eth/gen_config.go
@@ -47,7 +47,6 @@ func (c Config) MarshalTOML() (interface{}, error) {
enc.SyncMode = c.SyncMode
enc.LightServ = c.LightServ
enc.LightPeers = c.LightPeers
- enc.MaxPeers = c.MaxPeers
enc.SkipBcVersionCheck = c.SkipBcVersionCheck
enc.DatabaseHandles = c.DatabaseHandles
enc.DatabaseCache = c.DatabaseCache
@@ -119,9 +118,6 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
if dec.LightPeers != nil {
c.LightPeers = *dec.LightPeers
}
- if dec.MaxPeers != nil {
- c.MaxPeers = *dec.MaxPeers
- }
if dec.SkipBcVersionCheck != nil {
c.SkipBcVersionCheck = *dec.SkipBcVersionCheck
}
diff --git a/eth/handler.go b/eth/handler.go
index 6c6449340..cee719ddb 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -45,6 +45,10 @@ import (
const (
softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
+
+ // txChanSize is the size of the channel listening to TxPreEvent.
+ // The number is referenced from the size of the tx pool.
+ txChanSize = 4096
)
var (
@@ -78,7 +82,8 @@ type ProtocolManager struct {
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
- txSub *event.TypeMuxSubscription
+ txCh chan core.TxPreEvent
+ txSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
// channels for fetcher, syncer, txsyncLoop
@@ -94,7 +99,7 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// of communicating with the ethereum network.
-func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, maxPeers int, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
+func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkId: networkId,
@@ -103,7 +108,6 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
blockchain: blockchain,
chaindb: chaindb,
chainconfig: config,
- maxPeers: maxPeers,
peers: newPeerSet(),
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
@@ -198,10 +202,14 @@ func (pm *ProtocolManager) removePeer(id string) {
}
}
-func (pm *ProtocolManager) Start() {
+func (pm *ProtocolManager) Start(maxPeers int) {
+ pm.maxPeers = maxPeers
+
// broadcast transactions
- pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
+ pm.txCh = make(chan core.TxPreEvent, txChanSize)
+ pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
go pm.txBroadcastLoop()
+
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
@@ -601,7 +609,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Schedule all the unknown hashes for retrieval
unknown := make(newBlockHashesData, 0, len(announces))
for _, block := range announces {
- if !pm.blockchain.HasBlock(block.Hash) {
+ if !pm.blockchain.HasBlock(block.Hash, block.Number) {
unknown = append(unknown, block)
}
}
@@ -688,9 +696,10 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
peer.SendNewBlock(block, td)
}
log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
+ return
}
// Otherwise if the block is indeed in our own chain, announce it
- if pm.blockchain.HasBlock(hash) {
+ if pm.blockchain.HasBlock(hash, block.NumberU64()) {
for _, peer := range peers {
peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
}
@@ -723,10 +732,15 @@ func (self *ProtocolManager) minedBroadcastLoop() {
}
func (self *ProtocolManager) txBroadcastLoop() {
- // automatically stops if unsubscribe
- for obj := range self.txSub.Chan() {
- event := obj.Data.(core.TxPreEvent)
- self.BroadcastTx(event.Tx.Hash(), event.Tx)
+ for {
+ select {
+ case event := <-self.txCh:
+ self.BroadcastTx(event.Tx.Hash(), event.Tx)
+
+ // Err() channel will be closed when unsubscribing.
+ case <-self.txSub.Err():
+ return
+ }
}
}
diff --git a/eth/handler_test.go b/eth/handler_test.go
index ca9c9e1b4..6752cd2a8 100644
--- a/eth/handler_test.go
+++ b/eth/handler_test.go
@@ -474,13 +474,13 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool
config = &params.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked}
gspec = &core.Genesis{Config: config}
genesis = gspec.MustCommit(db)
- blockchain, _ = core.NewBlockChain(db, config, pow, evmux, vm.Config{})
+ blockchain, _ = core.NewBlockChain(db, config, pow, vm.Config{})
)
- pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, 1000, evmux, new(testTxPool), pow, blockchain, db)
+ pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
- pm.Start()
+ pm.Start(1000)
defer pm.Stop()
// Connect a new peer and check that we receive the DAO challenge
diff --git a/eth/helper_test.go b/eth/helper_test.go
index 546478a3e..b66553135 100644
--- a/eth/helper_test.go
+++ b/eth/helper_test.go
@@ -59,18 +59,18 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000)}},
}
genesis = gspec.MustCommit(db)
- blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, evmux, vm.Config{})
+ blockchain, _ = core.NewBlockChain(db, gspec.Config, engine, vm.Config{})
)
chain, _ := core.GenerateChain(gspec.Config, genesis, db, blocks, generator)
if _, err := blockchain.InsertChain(chain); err != nil {
panic(err)
}
- pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, 1000, evmux, &testTxPool{added: newtx}, engine, blockchain, db)
+ pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db)
if err != nil {
return nil, err
}
- pm.Start()
+ pm.Start(1000)
return pm, nil
}
@@ -88,8 +88,9 @@ func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks i
// testTxPool is a fake, helper transaction pool for testing purposes
type testTxPool struct {
- pool []*types.Transaction // Collection of all transactions
- added chan<- []*types.Transaction // Notification channel for new transactions
+ txFeed event.Feed
+ pool []*types.Transaction // Collection of all transactions
+ added chan<- []*types.Transaction // Notification channel for new transactions
lock sync.RWMutex // Protects the transaction pool
}
@@ -124,6 +125,10 @@ func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) {
return batches, nil
}
+func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
+ return p.txFeed.Subscribe(ch)
+}
+
// newTestTransaction creates a new dummy transaction.
func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction {
tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), big.NewInt(100000), big.NewInt(0), make([]byte, datasize))
diff --git a/eth/protocol.go b/eth/protocol.go
index 376e4663e..2c41376fa 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -22,7 +22,9 @@ import (
"math/big"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -100,6 +102,10 @@ type txPool interface {
// Pending should return pending transactions.
// The slice should be modifiable by the caller.
Pending() (map[common.Address]types.Transactions, error)
+
+ // SubscribeTxPreEvent should return an event subscription of
+ // TxPreEvent and send events to the given channel.
+ SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
}
// statusData is the network packet for the status message.
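
A consumer of the extended txPool interface follows the same receive loop as the handler's txBroadcastLoop. A hedged sketch; txPreSubscriber and drainTxEvents are hypothetical names, and the channel size mirrors txChanSize:

package example

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/event"
)

// txPreSubscriber is the slice of the txPool interface exercised here.
type txPreSubscriber interface {
	SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
}

// drainTxEvents relays transaction hashes until the subscription is torn down.
func drainTxEvents(pool txPreSubscriber, relay func(common.Hash)) {
	ch := make(chan core.TxPreEvent, 4096) // same sizing rationale as txChanSize
	sub := pool.SubscribeTxPreEvent(ch)
	defer sub.Unsubscribe()

	for {
		select {
		case ev := <-ch:
			relay(ev.Tx.Hash())
		case <-sub.Err(): // closed when the subscription ends
			return
		}
	}
}
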
diff --git a/eth/sync.go b/eth/sync.go
index 7442f912c..a8ae64617 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -188,7 +188,17 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
atomic.StoreUint32(&pm.fastSync, 1)
mode = downloader.FastSync
}
- if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
+ // Run the sync cycle, and disable fast sync if we've gone past the pivot block
+ err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode)
+
+ if atomic.LoadUint32(&pm.fastSync) == 1 {
+ // Disable fast sync if we indeed have something in our chain
+ if pm.blockchain.CurrentBlock().NumberU64() > 0 {
+ log.Info("Fast sync complete, auto disabling")
+ atomic.StoreUint32(&pm.fastSync, 0)
+ }
+ }
+ if err != nil {
return
}
atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done
@@ -201,12 +211,4 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
// more reliably update peers or the local TD state.
go pm.BroadcastBlock(head, false)
}
- // If fast sync was enabled, and we synced up, disable it
- if atomic.LoadUint32(&pm.fastSync) == 1 {
- // Disable fast sync if we indeed have something in our chain
- if pm.blockchain.CurrentBlock().NumberU64() > 0 {
- log.Info("Fast sync complete, auto disabling")
- atomic.StoreUint32(&pm.fastSync, 0)
- }
- }
}
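
The sync.go hunk moves the fast-sync downgrade ahead of the error return, so a cycle that failed after importing some blocks still disables fast sync. A toy, runnable sketch of just that ordering; every value below is a stand-in:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

func main() {
	var fastSync uint32 = 1
	currentBlock := uint64(7) // pretend the failed cycle still imported blocks

	err := errors.New("sync aborted past the pivot") // stand-in for downloader.Synchronise

	// The downgrade now runs before the error check, so even a failed
	// cycle that made progress leaves fast sync off afterwards.
	if atomic.LoadUint32(&fastSync) == 1 && currentBlock > 0 {
		fmt.Println("Fast sync complete, auto disabling")
		atomic.StoreUint32(&fastSync, 0)
	}
	if err != nil {
		return
	}
	fmt.Println("marking initial sync done")
}
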