aboutsummaryrefslogtreecommitdiffstats
path: root/eth
diff options
context:
space:
mode:
authorZsolt Felfoldi <zsfelfoldi@gmail.com>2017-08-19 03:52:20 +0800
committerPéter Szilágyi <peterke@gmail.com>2017-09-06 16:13:13 +0800
commit4ea4d2dc3473afd9d2eda6ef6b359accce1f0946 (patch)
treee651cfc2e3aa36083b333bf34dc3cccef2623f26 /eth
parent1e67378df879b1ce566f17dd95a3b126056254b5 (diff)
downloaddexon-4ea4d2dc3473afd9d2eda6ef6b359accce1f0946.tar.gz
dexon-4ea4d2dc3473afd9d2eda6ef6b359accce1f0946.tar.zst
dexon-4ea4d2dc3473afd9d2eda6ef6b359accce1f0946.zip
core, eth: add bloombit indexer, filter based on it
Diffstat (limited to 'eth')
-rw-r--r--eth/api_backend.go27
-rw-r--r--eth/backend.go10
-rw-r--r--eth/backend_test.go74
-rw-r--r--eth/db_upgrade.go70
-rw-r--r--eth/filters/api.go48
-rw-r--r--eth/filters/bench_test.go237
-rw-r--r--eth/filters/filter.go232
-rw-r--r--eth/filters/filter_system_test.go50
-rw-r--r--eth/filters/filter_test.go70
-rw-r--r--eth/handler.go5
10 files changed, 534 insertions, 289 deletions
diff --git a/eth/api_backend.go b/eth/api_backend.go
index 19ef79f23..fa3cf3f80 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -194,3 +195,29 @@ func (b *EthApiBackend) EventMux() *event.TypeMux {
func (b *EthApiBackend) AccountManager() *accounts.Manager {
return b.eth.AccountManager()
}
+
+func (b *EthApiBackend) GetBloomBits(ctx context.Context, bitIdx uint64, sectionIdxList []uint64) ([][]byte, error) {
+ results := make([][]byte, len(sectionIdxList))
+ var err error
+ for i, sectionIdx := range sectionIdxList {
+ sectionHead := core.GetCanonicalHash(b.eth.chainDb, (sectionIdx+1)*bloomBitsSection-1)
+ results[i], err = core.GetBloomBits(b.eth.chainDb, bitIdx, sectionIdx, sectionHead)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return results, nil
+}
+
+func (b *EthApiBackend) BloomBitsSections() uint64 {
+ sections, _, _ := b.eth.bbIndexer.Sections()
+ return sections
+}
+
+func (b *EthApiBackend) BloomBitsConfig() filters.BloomConfig {
+ return filters.BloomConfig{
+ SectionSize: bloomBitsSection,
+ MaxRequestLen: 16,
+ MaxRequestWait: 0,
+ }
+}
diff --git a/eth/backend.go b/eth/backend.go
index 5837c8564..efc0a2317 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -77,6 +77,8 @@ type Ethereum struct {
engine consensus.Engine
accountManager *accounts.Manager
+ bbIndexer *core.ChainIndexer
+
ApiBackend *EthApiBackend
miner *miner.Miner
@@ -125,11 +127,9 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
networkId: config.NetworkId,
gasPrice: config.GasPrice,
etherbase: config.Etherbase,
+ bbIndexer: NewBloomBitsProcessor(chainDb, bloomBitsSection),
}
- if err := addMipmapBloomBins(chainDb); err != nil {
- return nil, err
- }
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
if !config.SkipBcVersionCheck {
@@ -151,6 +151,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
eth.blockchain.SetHead(compat.RewindTo)
core.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
+ eth.bbIndexer.Start(eth.blockchain)
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
@@ -260,7 +261,7 @@ func (s *Ethereum) APIs() []rpc.API {
}, {
Namespace: "eth",
Version: "1.0",
- Service: filters.NewPublicFilterAPI(s.ApiBackend, false),
+ Service: filters.NewPublicFilterAPI(s.ApiBackend, false, bloomBitsSection),
Public: true,
}, {
Namespace: "admin",
@@ -389,6 +390,7 @@ func (s *Ethereum) Stop() error {
if s.stopDbUpgrade != nil {
s.stopDbUpgrade()
}
+ s.bbIndexer.Close()
s.blockchain.Stop()
s.protocolManager.Stop()
if s.lesServer != nil {
diff --git a/eth/backend_test.go b/eth/backend_test.go
deleted file mode 100644
index 1fd25e95a..000000000
--- a/eth/backend_test.go
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright 2015 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package eth
-
-import (
- "math/big"
- "testing"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/params"
-)
-
-func TestMipmapUpgrade(t *testing.T) {
- db, _ := ethdb.NewMemDatabase()
- addr := common.BytesToAddress([]byte("jeff"))
- genesis := new(core.Genesis).MustCommit(db)
-
- chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {
- switch i {
- case 1:
- receipt := types.NewReceipt(nil, false, new(big.Int))
- receipt.Logs = []*types.Log{{Address: addr}}
- gen.AddUncheckedReceipt(receipt)
- case 2:
- receipt := types.NewReceipt(nil, false, new(big.Int))
- receipt.Logs = []*types.Log{{Address: addr}}
- gen.AddUncheckedReceipt(receipt)
- }
- })
- for i, block := range chain {
- core.WriteBlock(db, block)
- if err := core.WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil {
- t.Fatalf("failed to insert block number: %v", err)
- }
- if err := core.WriteHeadBlockHash(db, block.Hash()); err != nil {
- t.Fatalf("failed to insert block number: %v", err)
- }
- if err := core.WriteBlockReceipts(db, block.Hash(), block.NumberU64(), receipts[i]); err != nil {
- t.Fatal("error writing block receipts:", err)
- }
- }
-
- err := addMipmapBloomBins(db)
- if err != nil {
- t.Fatal(err)
- }
-
- bloom := core.GetMipmapBloom(db, 1, core.MIPMapLevels[0])
- if (bloom == types.Bloom{}) {
- t.Error("got empty bloom filter")
- }
-
- data, _ := db.Get([]byte("setting-mipmap-version"))
- if len(data) == 0 {
- t.Error("setting-mipmap-version not written to database")
- }
-}
diff --git a/eth/db_upgrade.go b/eth/db_upgrade.go
index 90111b2b3..ce8ce699a 100644
--- a/eth/db_upgrade.go
+++ b/eth/db_upgrade.go
@@ -19,11 +19,13 @@ package eth
import (
"bytes"
- "fmt"
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
@@ -135,45 +137,37 @@ func upgradeDeduplicateData(db ethdb.Database) func() error {
}
}
-func addMipmapBloomBins(db ethdb.Database) (err error) {
- const mipmapVersion uint = 2
-
- // check if the version is set. We ignore data for now since there's
- // only one version so we can easily ignore it for now
- var data []byte
- data, _ = db.Get([]byte("setting-mipmap-version"))
- if len(data) > 0 {
- var version uint
- if err := rlp.DecodeBytes(data, &version); err == nil && version == mipmapVersion {
- return nil
- }
- }
+// BloomBitsIndex implements ChainIndex
+type BloomBitsIndex struct {
+ db ethdb.Database
+ bc *bloombits.BloomBitsCreator
+ section, sectionSize uint64
+ sectionHead common.Hash
+}
- defer func() {
- if err == nil {
- var val []byte
- val, err = rlp.EncodeToBytes(mipmapVersion)
- if err == nil {
- err = db.Put([]byte("setting-mipmap-version"), val)
- }
- return
- }
- }()
- latestHash := core.GetHeadBlockHash(db)
- latestBlock := core.GetBlock(db, latestHash, core.GetBlockNumber(db, latestHash))
- if latestBlock == nil { // clean database
- return
- }
+// number of confirmation blocks before a section is considered probably final and its bloom bits are calculated
+const bloomBitsConfirmations = 256
- tstart := time.Now()
- log.Warn("Upgrading db log bloom bins")
- for i := uint64(0); i <= latestBlock.NumberU64(); i++ {
- hash := core.GetCanonicalHash(db, i)
- if (hash == common.Hash{}) {
- return fmt.Errorf("chain db corrupted. Could not find block %d.", i)
- }
- core.WriteMipmapBloom(db, i, core.GetBlockReceipts(db, hash, i))
+// NewBloomBitsProcessor returns a chain processor that generates bloom bits data for the canonical chain
+func NewBloomBitsProcessor(db ethdb.Database, sectionSize uint64) *core.ChainIndexer {
+ backend := &BloomBitsIndex{db: db, sectionSize: sectionSize}
+ return core.NewChainIndexer(db, ethdb.NewTable(db, "bbIndex-"), backend, sectionSize, bloomBitsConfirmations, time.Millisecond*100, "bloombits")
+}
+
+func (b *BloomBitsIndex) Reset(section uint64, lastSectionHead common.Hash) {
+ b.bc = bloombits.NewBloomBitsCreator(b.sectionSize)
+ b.section = section
+}
+
+func (b *BloomBitsIndex) Process(header *types.Header) {
+ b.bc.AddHeaderBloom(header.Bloom)
+ b.sectionHead = header.Hash()
+}
+
+func (b *BloomBitsIndex) Commit() error {
+ for i := 0; i < bloombits.BloomLength; i++ {
+ compVector := bitutil.CompressBytes(b.bc.GetBitVector(uint(i)))
+ core.StoreBloomBits(b.db, uint64(i), b.section, b.sectionHead, compVector)
}
- log.Info("Bloom-bin upgrade completed", "elapsed", common.PrettyDuration(time.Since(tstart)))
return nil
}
diff --git a/eth/filters/api.go b/eth/filters/api.go
index fff58a268..11767753e 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -51,24 +51,25 @@ type filter struct {
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type PublicFilterAPI struct {
- backend Backend
- useMipMap bool
- mux *event.TypeMux
- chainDb ethdb.Database
- events *EventSystem
- filtersMu sync.Mutex
- filters map[rpc.ID]*filter
+ backend Backend
+ bloomBitsSection uint64
+ mux *event.TypeMux
+ quit chan struct{}
+ chainDb ethdb.Database
+ events *EventSystem
+ filtersMu sync.Mutex
+ filters map[rpc.ID]*filter
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
-func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
+func NewPublicFilterAPI(backend Backend, lightMode bool, bloomBitsSection uint64) *PublicFilterAPI {
api := &PublicFilterAPI{
- backend: backend,
- useMipMap: !lightMode,
- mux: backend.EventMux(),
- chainDb: backend.ChainDb(),
- events: NewEventSystem(backend.EventMux(), backend, lightMode),
- filters: make(map[rpc.ID]*filter),
+ backend: backend,
+ bloomBitsSection: bloomBitsSection,
+ mux: backend.EventMux(),
+ chainDb: backend.ChainDb(),
+ events: NewEventSystem(backend.EventMux(), backend, lightMode),
+ filters: make(map[rpc.ID]*filter),
}
go api.timeoutLoop()
@@ -332,11 +333,7 @@ func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([
crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
- filter := New(api.backend, api.useMipMap)
- filter.SetBeginBlock(crit.FromBlock.Int64())
- filter.SetEndBlock(crit.ToBlock.Int64())
- filter.SetAddresses(crit.Addresses)
- filter.SetTopics(crit.Topics)
+ filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics)
logs, err := filter.Find(ctx)
return returnLogs(logs), err
@@ -372,19 +369,18 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
return nil, fmt.Errorf("filter not found")
}
- filter := New(api.backend, api.useMipMap)
+ var begin, end int64
if f.crit.FromBlock != nil {
- filter.SetBeginBlock(f.crit.FromBlock.Int64())
+ begin = f.crit.FromBlock.Int64()
} else {
- filter.SetBeginBlock(rpc.LatestBlockNumber.Int64())
+ begin = rpc.LatestBlockNumber.Int64()
}
if f.crit.ToBlock != nil {
- filter.SetEndBlock(f.crit.ToBlock.Int64())
+ end = f.crit.ToBlock.Int64()
} else {
- filter.SetEndBlock(rpc.LatestBlockNumber.Int64())
+ end = rpc.LatestBlockNumber.Int64()
}
- filter.SetAddresses(f.crit.Addresses)
- filter.SetTopics(f.crit.Topics)
+ filter := New(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
logs, err := filter.Find(ctx)
if err != nil {
diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go
new file mode 100644
index 000000000..2487bc0eb
--- /dev/null
+++ b/eth/filters/bench_test.go
@@ -0,0 +1,237 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package filters
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/bitutil"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/golang/snappy"
+)
+
+func BenchmarkBloomBits512(b *testing.B) {
+ benchmarkBloomBitsForSize(b, 512)
+}
+
+func BenchmarkBloomBits1k(b *testing.B) {
+ benchmarkBloomBitsForSize(b, 1024)
+}
+
+func BenchmarkBloomBits2k(b *testing.B) {
+ benchmarkBloomBitsForSize(b, 2048)
+}
+
+func BenchmarkBloomBits4k(b *testing.B) {
+ benchmarkBloomBitsForSize(b, 4096)
+}
+
+func BenchmarkBloomBits8k(b *testing.B) {
+ benchmarkBloomBitsForSize(b, 8192)
+}
+
+func BenchmarkBloomBits16k(b *testing.B) {
+ benchmarkBloomBitsForSize(b, 16384)
+}
+
+func BenchmarkBloomBits32k(b *testing.B) {
+ benchmarkBloomBitsForSize(b, 32768)
+}
+
+func benchmarkBloomBitsForSize(b *testing.B, sectionSize uint64) {
+ benchmarkBloomBits(b, sectionSize, 0)
+ benchmarkBloomBits(b, sectionSize, 1)
+ benchmarkBloomBits(b, sectionSize, 2)
+}
+
+const benchFilterCnt = 2000
+
+func benchmarkBloomBits(b *testing.B, sectionSize uint64, comp int) {
+ benchDataDir := node.DefaultDataDir() + "/geth/chaindata"
+ fmt.Println("Running bloombits benchmark section size:", sectionSize, " compression method:", comp)
+
+ var (
+ compressFn func([]byte) []byte
+ decompressFn func([]byte, int) ([]byte, error)
+ )
+ switch comp {
+ case 0:
+ // no compression
+ compressFn = func(data []byte) []byte {
+ return data
+ }
+ decompressFn = func(data []byte, target int) ([]byte, error) {
+ if len(data) != target {
+ panic(nil)
+ }
+ return data, nil
+ }
+ case 1:
+ // bitutil/compress.go
+ compressFn = bitutil.CompressBytes
+ decompressFn = bitutil.DecompressBytes
+ case 2:
+ // go snappy
+ compressFn = func(data []byte) []byte {
+ return snappy.Encode(nil, data)
+ }
+ decompressFn = func(data []byte, target int) ([]byte, error) {
+ decomp, err := snappy.Decode(nil, data)
+ if err != nil || len(decomp) != target {
+ panic(err)
+ }
+ return decomp, nil
+ }
+ }
+
+ db, err := ethdb.NewLDBDatabase(benchDataDir, 128, 1024)
+ if err != nil {
+ b.Fatalf("error opening database at %v: %v", benchDataDir, err)
+ }
+ head := core.GetHeadBlockHash(db)
+ if head == (common.Hash{}) {
+ b.Fatalf("chain data not found at %v", benchDataDir)
+ }
+
+ clearBloomBits(db)
+ fmt.Println("Generating bloombits data...")
+ headNum := core.GetBlockNumber(db, head)
+ if headNum < sectionSize+512 {
+ b.Fatalf("not enough blocks for running a benchmark")
+ }
+
+ start := time.Now()
+ cnt := (headNum - 512) / sectionSize
+ var dataSize, compSize uint64
+ for sectionIdx := uint64(0); sectionIdx < cnt; sectionIdx++ {
+ bc := bloombits.NewBloomBitsCreator(sectionSize)
+ var header *types.Header
+ for i := sectionIdx * sectionSize; i < (sectionIdx+1)*sectionSize; i++ {
+ hash := core.GetCanonicalHash(db, i)
+ header = core.GetHeader(db, hash, i)
+ if header == nil {
+ b.Fatalf("Error creating bloomBits data")
+ }
+ bc.AddHeaderBloom(header.Bloom)
+ }
+ sectionHead := core.GetCanonicalHash(db, (sectionIdx+1)*sectionSize-1)
+ for i := 0; i < bloombits.BloomLength; i++ {
+ data := bc.GetBitVector(uint(i))
+ comp := compressFn(data)
+ dataSize += uint64(len(data))
+ compSize += uint64(len(comp))
+ core.StoreBloomBits(db, uint64(i), sectionIdx, sectionHead, comp)
+ }
+ //if sectionIdx%50 == 0 {
+ // fmt.Println(" section", sectionIdx, "/", cnt)
+ //}
+ }
+
+ d := time.Since(start)
+ fmt.Println("Finished generating bloombits data")
+ fmt.Println(" ", d, "total ", d/time.Duration(cnt*sectionSize), "per block")
+ fmt.Println(" data size:", dataSize, " compressed size:", compSize, " compression ratio:", float64(compSize)/float64(dataSize))
+
+ fmt.Println("Running filter benchmarks...")
+ start = time.Now()
+ mux := new(event.TypeMux)
+ var backend *testBackend
+
+ for i := 0; i < benchFilterCnt; i++ {
+ if i%20 == 0 {
+ db.Close()
+ db, _ = ethdb.NewLDBDatabase(benchDataDir, 128, 1024)
+ backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
+ }
+ var addr common.Address
+ addr[0] = byte(i)
+ addr[1] = byte(i / 256)
+ filter := New(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil)
+ filter.decompress = decompressFn
+ if _, err := filter.Find(context.Background()); err != nil {
+ b.Error("filter.Find error:", err)
+ }
+ }
+ d = time.Since(start)
+ fmt.Println("Finished running filter benchmarks")
+ fmt.Println(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks")
+ db.Close()
+}
+
+func forEachKey(db ethdb.Database, startPrefix, endPrefix []byte, fn func(key []byte)) {
+ it := db.(*ethdb.LDBDatabase).NewIterator()
+ it.Seek(startPrefix)
+ for it.Valid() {
+ key := it.Key()
+ cmpLen := len(key)
+ if len(endPrefix) < cmpLen {
+ cmpLen = len(endPrefix)
+ }
+ if bytes.Compare(key[:cmpLen], endPrefix) == 1 {
+ break
+ }
+ fn(common.CopyBytes(key))
+ it.Next()
+ }
+ it.Release()
+}
+
+var bloomBitsPrefix = []byte("bloomBits-")
+
+func clearBloomBits(db ethdb.Database) {
+ fmt.Println("Clearing bloombits data...")
+ forEachKey(db, bloomBitsPrefix, bloomBitsPrefix, func(key []byte) {
+ db.Delete(key)
+ })
+}
+
+func BenchmarkNoBloomBits(b *testing.B) {
+ benchDataDir := node.DefaultDataDir() + "/geth/chaindata"
+ fmt.Println("Running benchmark without bloombits")
+ db, err := ethdb.NewLDBDatabase(benchDataDir, 128, 1024)
+ if err != nil {
+ b.Fatalf("error opening database at %v: %v", benchDataDir, err)
+ }
+ head := core.GetHeadBlockHash(db)
+ if head == (common.Hash{}) {
+ b.Fatalf("chain data not found at %v", benchDataDir)
+ }
+ headNum := core.GetBlockNumber(db, head)
+
+ clearBloomBits(db)
+
+ fmt.Println("Running filter benchmarks...")
+ start := time.Now()
+ mux := new(event.TypeMux)
+ backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
+ filter := New(backend, 0, int64(headNum), []common.Address{common.Address{}}, nil)
+ filter.Find(context.Background())
+ d := time.Since(start)
+ fmt.Println("Finished running filter benchmarks")
+ fmt.Println(" ", d, "total ", d*time.Duration(1000000)/time.Duration(headNum+1), "per million blocks")
+ db.Close()
+}
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index f848bc6af..ea9ccf2f9 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -18,11 +18,14 @@ package filters
import (
"context"
- "math"
"math/big"
+ "sync"
+ "time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
@@ -34,58 +37,51 @@ type Backend interface {
EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
+ BloomBitsSections() uint64
+ BloomBitsConfig() BloomConfig
SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
+ GetBloomBits(ctx context.Context, bitIdx uint64, sectionIdxList []uint64) ([][]byte, error)
+}
+
+type BloomConfig struct {
+ SectionSize uint64
+ MaxRequestLen int
+ MaxRequestWait time.Duration
}
// Filter can be used to retrieve and filter logs.
type Filter struct {
- backend Backend
- useMipMap bool
+ backend Backend
+ bloomBitsConfig BloomConfig
db ethdb.Database
begin, end int64
addresses []common.Address
topics [][]common.Hash
+
+ decompress func([]byte, int) ([]byte, error)
+ matcher *bloombits.Matcher
}
// New creates a new filter which uses a bloom filter on blocks to figure out whether
// a particular block is interesting or not.
-// MipMaps allow past blocks to be searched much more efficiently, but are not available
-// to light clients.
-func New(backend Backend, useMipMap bool) *Filter {
+func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
return &Filter{
- backend: backend,
- useMipMap: useMipMap,
- db: backend.ChainDb(),
+ backend: backend,
+ begin: begin,
+ end: end,
+ addresses: addresses,
+ topics: topics,
+ bloomBitsConfig: backend.BloomBitsConfig(),
+ db: backend.ChainDb(),
+ matcher: bloombits.NewMatcher(backend.BloomBitsConfig().SectionSize, addresses, topics),
+ decompress: bitutil.DecompressBytes,
}
}
-// SetBeginBlock sets the earliest block for filtering.
-// -1 = latest block (i.e., the current block)
-// hash = particular hash from-to
-func (f *Filter) SetBeginBlock(begin int64) {
- f.begin = begin
-}
-
-// SetEndBlock sets the latest block for filtering.
-func (f *Filter) SetEndBlock(end int64) {
- f.end = end
-}
-
-// SetAddresses matches only logs that are generated from addresses that are included
-// in the given addresses.
-func (f *Filter) SetAddresses(addr []common.Address) {
- f.addresses = addr
-}
-
-// SetTopics matches only logs that have topics matching the given topics.
-func (f *Filter) SetTopics(topics [][]common.Hash) {
- f.topics = topics
-}
-
// FindOnce searches the blockchain for matching log entries, returning
// all matching entries from the first block that contains matches,
// updating the start point of the filter accordingly. If no results are
@@ -106,18 +102,9 @@ func (f *Filter) FindOnce(ctx context.Context) ([]*types.Log, error) {
endBlockNo = headBlockNumber
}
- // if no addresses are present we can't make use of fast search which
- // uses the mipmap bloom filters to check for fast inclusion and uses
- // higher range probability in order to ensure at least a false positive
- if !f.useMipMap || len(f.addresses) == 0 {
- logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo)
- f.begin = int64(blockNumber + 1)
- return logs, err
- }
-
- logs, blockNumber := f.mipFind(beginBlockNo, endBlockNo, 0)
+ logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo)
f.begin = int64(blockNumber + 1)
- return logs, nil
+ return logs, err
}
// Run filters logs with the current parameters set
@@ -131,43 +118,134 @@ func (f *Filter) Find(ctx context.Context) (logs []*types.Log, err error) {
}
}
-func (f *Filter) mipFind(start, end uint64, depth int) (logs []*types.Log, blockNumber uint64) {
- level := core.MIPMapLevels[depth]
- // normalise numerator so we can work in level specific batches and
- // work with the proper range checks
- for num := start / level * level; num <= end; num += level {
- // find addresses in bloom filters
- bloom := core.GetMipmapBloom(f.db, num, level)
- // Don't bother checking the first time through the loop - we're probably picking
- // up where a previous run left off.
- first := true
- for _, addr := range f.addresses {
- if first || bloom.TestBytes(addr[:]) {
- first = false
- // range check normalised values and make sure that
- // we're resolving the correct range instead of the
- // normalised values.
- start := uint64(math.Max(float64(num), float64(start)))
- end := uint64(math.Min(float64(num+level-1), float64(end)))
- if depth+1 == len(core.MIPMapLevels) {
- l, blockNumber, _ := f.getLogs(context.Background(), start, end)
- if len(l) > 0 {
- return l, blockNumber
+// nextRequest returns the next request to retrieve for the bloombits matcher
+func (f *Filter) nextRequest() (bloombits uint, sections []uint64) {
+ bloomIndex, ok := f.matcher.AllocSectionQueue()
+ if !ok {
+ return 0, nil
+ }
+ if f.bloomBitsConfig.MaxRequestWait > 0 &&
+ (f.bloomBitsConfig.MaxRequestLen <= 1 || // SectionCount is always greater than zero after a successful alloc
+ f.matcher.SectionCount(bloomIndex) < f.bloomBitsConfig.MaxRequestLen) {
+ time.Sleep(f.bloomBitsConfig.MaxRequestWait)
+ }
+ return bloomIndex, f.matcher.FetchSections(bloomIndex, f.bloomBitsConfig.MaxRequestLen)
+}
+
+// serveMatcher serves the bloombits matcher by fetching the requested vectors
+// through the filter backend
+func (f *Filter) serveMatcher(ctx context.Context, stop chan struct{}, wg *sync.WaitGroup) chan error {
+ errChn := make(chan error, 1)
+ wg.Add(10)
+ for i := 0; i < 10; i++ {
+ go func(i int) {
+ defer wg.Done()
+
+ for {
+ b, s := f.nextRequest()
+ if s == nil {
+ return
+ }
+ data, err := f.backend.GetBloomBits(ctx, uint64(b), s)
+ if err != nil {
+ select {
+ case errChn <- err:
+ case <-stop:
}
- } else {
- l, blockNumber := f.mipFind(start, end, depth+1)
- if len(l) > 0 {
- return l, blockNumber
+ return
+ }
+ decomp := make([][]byte, len(data))
+ for i, d := range data {
+ var err error
+ if decomp[i], err = f.decompress(d, int(f.bloomBitsConfig.SectionSize/8)); err != nil {
+ select {
+ case errChn <- err:
+ case <-stop:
+ }
+ return
}
}
+ f.matcher.Deliver(b, s, decomp)
}
- }
+ }(i)
}
- return nil, end
+ return errChn
+}
+
+// checkMatches checks if the receipts belonging to the given header contain any log events that
+// match the filter criteria. This function is called when the bloom filter signals a potential match.
+func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
+ // Get the logs of the block
+ receipts, err := f.backend.GetReceipts(ctx, header.Hash())
+ if err != nil {
+ return nil, err
+ }
+ var unfiltered []*types.Log
+ for _, receipt := range receipts {
+ unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...)
+ }
+ logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
+ if len(logs) > 0 {
+ return logs, nil
+ }
+ return nil, nil
}
func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*types.Log, blockNumber uint64, err error) {
+ haveBloomBitsBefore := f.backend.BloomBitsSections() * f.bloomBitsConfig.SectionSize
+ if haveBloomBitsBefore > start {
+ e := end
+ if haveBloomBitsBefore <= e {
+ e = haveBloomBitsBefore - 1
+ }
+
+ stop := make(chan struct{})
+ var wg sync.WaitGroup
+ matches := f.matcher.Start(start, e)
+ errChn := f.serveMatcher(ctx, stop, &wg)
+
+ defer func() {
+ f.matcher.Stop()
+ close(stop)
+ wg.Wait()
+ }()
+
+ loop:
+ for {
+ select {
+ case i, ok := <-matches:
+ if !ok {
+ break loop
+ }
+
+ blockNumber := rpc.BlockNumber(i)
+ header, err := f.backend.HeaderByNumber(ctx, blockNumber)
+ if header == nil || err != nil {
+ return logs, end, err
+ }
+
+ logs, err := f.checkMatches(ctx, header)
+ if err != nil {
+ return nil, end, err
+ }
+ if logs != nil {
+ return logs, i, nil
+ }
+ case err := <-errChn:
+ return logs, end, err
+ case <-ctx.Done():
+ return nil, end, ctx.Err()
+ }
+ }
+
+ if end < haveBloomBitsBefore {
+ return logs, end, nil
+ }
+ start = haveBloomBitsBefore
+ }
+
+ // search the rest with regular block-by-block bloom filtering
for i := start; i <= end; i++ {
blockNumber := rpc.BlockNumber(i)
header, err := f.backend.HeaderByNumber(ctx, blockNumber)
@@ -178,18 +256,12 @@ func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*types.
// Use bloom filtering to see if this block is interesting given the
// current parameters
if f.bloomFilter(header.Bloom) {
- // Get the logs of the block
- receipts, err := f.backend.GetReceipts(ctx, header.Hash())
+ logs, err := f.checkMatches(ctx, header)
if err != nil {
return nil, end, err
}
- var unfiltered []*types.Log
- for _, receipt := range receipts {
- unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...)
- }
- logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
- if len(logs) > 0 {
- return logs, uint64(blockNumber), nil
+ if logs != nil {
+ return logs, i, nil
}
}
}
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index fcc888b8c..140fad555 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -36,6 +36,7 @@ import (
type testBackend struct {
mux *event.TypeMux
db ethdb.Database
+ sections uint64
txFeed *event.Feed
rmLogsFeed *event.Feed
logsFeed *event.Feed
@@ -84,6 +85,31 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch)
}
+func (b *testBackend) GetBloomBits(ctx context.Context, bitIdx uint64, sectionIdxList []uint64) ([][]byte, error) {
+ results := make([][]byte, len(sectionIdxList))
+ var err error
+ for i, sectionIdx := range sectionIdxList {
+ sectionHead := core.GetCanonicalHash(b.db, (sectionIdx+1)*testBloomBitsSection-1)
+ results[i], err = core.GetBloomBits(b.db, bitIdx, sectionIdx, sectionHead)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return results, nil
+}
+
+func (b *testBackend) BloomBitsSections() uint64 {
+ return b.sections
+}
+
+func (b *testBackend) BloomBitsConfig() BloomConfig {
+ return BloomConfig{
+ SectionSize: testBloomBitsSection,
+ MaxRequestLen: 16,
+ MaxRequestWait: 0,
+ }
+}
+
// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
// It creates multiple subscriptions:
// - one at the start and should receive all posted chain events and a second (blockHashes)
@@ -99,8 +125,8 @@ func TestBlockSubscription(t *testing.T) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false, 0)
genesis = new(core.Genesis).MustCommit(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {})
chainEvents = []core.ChainEvent{}
@@ -156,8 +182,8 @@ func TestPendingTxFilter(t *testing.T) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false, 0)
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
@@ -219,8 +245,8 @@ func TestLogFilterCreation(t *testing.T) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false, 0)
testCases = []struct {
crit FilterCriteria
@@ -268,8 +294,8 @@ func TestInvalidLogFilterCreation(t *testing.T) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false, 0)
)
// different situations where log filter creation should fail.
@@ -298,8 +324,8 @@ func TestLogFilter(t *testing.T) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false, 0)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -415,8 +441,8 @@ func TestPendingLogsSubscription(t *testing.T) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false)
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ api = NewPublicFilterAPI(backend, false, 0)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index cf508a218..f1c6481d7 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -32,6 +32,8 @@ import (
"github.com/ethereum/go-ethereum/params"
)
+const testBloomBitsSection = 4096
+
func makeReceipt(addr common.Address) *types.Receipt {
receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
@@ -41,8 +43,8 @@ func makeReceipt(addr common.Address) *types.Receipt {
return receipt
}
-func BenchmarkMipmaps(b *testing.B) {
- dir, err := ioutil.TempDir("", "mipmap")
+func BenchmarkFilters(b *testing.B) {
+ dir, err := ioutil.TempDir("", "filtertest")
if err != nil {
b.Fatal(err)
}
@@ -55,7 +57,7 @@ func BenchmarkMipmaps(b *testing.B) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))
@@ -66,27 +68,21 @@ func BenchmarkMipmaps(b *testing.B) {
genesis := core.GenesisBlockForTesting(db, addr1, big.NewInt(1000000))
chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 100010, func(i int, gen *core.BlockGen) {
- var receipts types.Receipts
switch i {
case 2403:
receipt := makeReceipt(addr1)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
case 1034:
receipt := makeReceipt(addr2)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
case 34:
receipt := makeReceipt(addr3)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
case 99999:
receipt := makeReceipt(addr4)
- receipts = types.Receipts{receipt}
gen.AddUncheckedReceipt(receipt)
}
- core.WriteMipmapBloom(db, uint64(i+1), receipts)
})
for i, block := range chain {
core.WriteBlock(db, block)
@@ -102,10 +98,7 @@ func BenchmarkMipmaps(b *testing.B) {
}
b.ResetTimer()
- filter := New(backend, true)
- filter.SetAddresses([]common.Address{addr1, addr2, addr3, addr4})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter := New(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
for i := 0; i < b.N; i++ {
logs, _ := filter.Find(context.Background())
@@ -116,7 +109,7 @@ func BenchmarkMipmaps(b *testing.B) {
}
func TestFilters(t *testing.T) {
- dir, err := ioutil.TempDir("", "mipmap")
+ dir, err := ioutil.TempDir("", "filtertest")
if err != nil {
t.Fatal(err)
}
@@ -129,7 +122,7 @@ func TestFilters(t *testing.T) {
rmLogsFeed = new(event.Feed)
logsFeed = new(event.Feed)
chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, txFeed, rmLogsFeed, logsFeed, chainFeed}
+ backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key1.PublicKey)
@@ -142,7 +135,6 @@ func TestFilters(t *testing.T) {
genesis := core.GenesisBlockForTesting(db, addr, big.NewInt(1000000))
chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, db, 1000, func(i int, gen *core.BlockGen) {
- var receipts types.Receipts
switch i {
case 1:
receipt := types.NewReceipt(nil, false, new(big.Int))
@@ -153,7 +145,6 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
case 2:
receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
@@ -163,7 +154,6 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
case 998:
receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
@@ -173,7 +163,6 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
case 999:
receipt := types.NewReceipt(nil, false, new(big.Int))
receipt.Logs = []*types.Log{
@@ -183,12 +172,7 @@ func TestFilters(t *testing.T) {
},
}
gen.AddUncheckedReceipt(receipt)
- receipts = types.Receipts{receipt}
}
- // i is used as block number for the writes but since the i
- // starts at 0 and block 0 (genesis) is already present increment
- // by one
- core.WriteMipmapBloom(db, uint64(i+1), receipts)
})
for i, block := range chain {
core.WriteBlock(db, block)
@@ -203,22 +187,14 @@ func TestFilters(t *testing.T) {
}
}
- filter := New(backend, true)
- filter.SetAddresses([]common.Address{addr})
- filter.SetTopics([][]common.Hash{{hash1, hash2, hash3, hash4}})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter := New(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})
logs, _ := filter.Find(context.Background())
if len(logs) != 4 {
t.Error("expected 4 log, got", len(logs))
}
- filter = New(backend, true)
- filter.SetAddresses([]common.Address{addr})
- filter.SetTopics([][]common.Hash{{hash3}})
- filter.SetBeginBlock(900)
- filter.SetEndBlock(999)
+ filter = New(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}})
logs, _ = filter.Find(context.Background())
if len(logs) != 1 {
t.Error("expected 1 log, got", len(logs))
@@ -227,11 +203,7 @@ func TestFilters(t *testing.T) {
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
}
- filter = New(backend, true)
- filter.SetAddresses([]common.Address{addr})
- filter.SetTopics([][]common.Hash{{hash3}})
- filter.SetBeginBlock(990)
- filter.SetEndBlock(-1)
+ filter = New(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}})
logs, _ = filter.Find(context.Background())
if len(logs) != 1 {
t.Error("expected 1 log, got", len(logs))
@@ -240,10 +212,7 @@ func TestFilters(t *testing.T) {
t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0])
}
- filter = New(backend, true)
- filter.SetTopics([][]common.Hash{{hash1, hash2}})
- filter.SetBeginBlock(1)
- filter.SetEndBlock(10)
+ filter = New(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}})
logs, _ = filter.Find(context.Background())
if len(logs) != 2 {
@@ -251,10 +220,7 @@ func TestFilters(t *testing.T) {
}
failHash := common.BytesToHash([]byte("fail"))
- filter = New(backend, true)
- filter.SetTopics([][]common.Hash{{failHash}})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}})
logs, _ = filter.Find(context.Background())
if len(logs) != 0 {
@@ -262,20 +228,14 @@ func TestFilters(t *testing.T) {
}
failAddr := common.BytesToAddress([]byte("failmenow"))
- filter = New(backend, true)
- filter.SetAddresses([]common.Address{failAddr})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter = New(backend, 0, -1, []common.Address{failAddr}, nil)
logs, _ = filter.Find(context.Background())
if len(logs) != 0 {
t.Error("expected 0 log, got", len(logs))
}
- filter = New(backend, true)
- filter.SetTopics([][]common.Hash{{failHash}, {hash1}})
- filter.SetBeginBlock(0)
- filter.SetEndBlock(-1)
+ filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}})
logs, _ = filter.Find(context.Background())
if len(logs) != 0 {
diff --git a/eth/handler.go b/eth/handler.go
index 28ae208c0..aadd771df 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -49,6 +49,8 @@ const (
// txChanSize is the size of channel listening to TxPreEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
+
+ bloomBitsSection = 4096
)
var (
@@ -92,6 +94,8 @@ type ProtocolManager struct {
quitSync chan struct{}
noMorePeers chan struct{}
+ lesServer LesServer
+
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
@@ -114,6 +118,7 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
+
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
log.Warn("Blockchain not empty, fast sync disabled")