aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZsolt Felfoldi <zsfelfoldi@gmail.com>2017-09-06 08:33:10 +0800
committerPéter Szilágyi <peterke@gmail.com>2017-09-06 16:14:21 +0800
commit451ffdb62b43bab66188f3f7eeb2131b65415ccb (patch)
treed13c6e0a01d22e90c0f3222dcd261255b5dd1efb
parent6ff2c02991e60a8db54c2f60027442277fd889c0 (diff)
downloaddexon-451ffdb62b43bab66188f3f7eeb2131b65415ccb.tar.gz
dexon-451ffdb62b43bab66188f3f7eeb2131b65415ccb.tar.zst
dexon-451ffdb62b43bab66188f3f7eeb2131b65415ccb.zip
core/bloombits: use general filters instead of addresses and topics
-rw-r--r--core/bloombits/matcher.go54
-rw-r--r--core/bloombits/matcher_test.go6
-rw-r--r--eth/filters/filter.go19
3 files changed, 33 insertions, 46 deletions
diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go
index e365fd6d0..df0967a12 100644
--- a/core/bloombits/matcher.go
+++ b/core/bloombits/matcher.go
@@ -24,7 +24,6 @@ import (
"sync/atomic"
"time"
- "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/crypto"
)
@@ -68,8 +67,7 @@ type Retrieval struct {
type Matcher struct {
sectionSize uint64 // Size of the data batches to filter on
- addresses []bloomIndexes // Addresses the system is filtering for
- topics [][]bloomIndexes // Topics the system is filtering for
+ filters [][]bloomIndexes // Filter the system is matching for
schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits
retrievers chan chan uint // Retriever processes waiting for bit allocations
@@ -82,7 +80,8 @@ type Matcher struct {
// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
// address and topic filtering on them.
-func NewMatcher(sectionSize uint64, addresses []common.Address, topics [][]common.Hash) *Matcher {
+func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher {
+ // Create the matcher instance
m := &Matcher{
sectionSize: sectionSize,
schedulers: make(map[uint]*scheduler),
@@ -91,48 +90,25 @@ func NewMatcher(sectionSize uint64, addresses []common.Address, topics [][]commo
retrievals: make(chan chan *Retrieval),
deliveries: make(chan *Retrieval),
}
- m.setAddresses(addresses)
- m.setTopics(topics)
- return m
-}
+ // Calculate the bloom bit indexes for the groups we're interested in
+ m.filters = nil
-// setAddresses configures the matcher to only return logs that are generated
-// from addresses that are included in the given list.
-func (m *Matcher) setAddresses(addresses []common.Address) {
- // Calculate the bloom bit indexes for the addresses we're interested in
- m.addresses = make([]bloomIndexes, len(addresses))
- for i, address := range addresses {
- m.addresses[i] = calcBloomIndexes(address.Bytes())
- }
- // For every bit, create a scheduler to load/download the bit vectors
- for _, bloomIndexList := range m.addresses {
- for _, bloomIndex := range bloomIndexList {
- m.addScheduler(bloomIndex)
+ for _, filter := range filters {
+ bloomBits := make([]bloomIndexes, len(filter))
+ for i, clause := range filter {
+ bloomBits[i] = calcBloomIndexes(clause)
}
- }
-}
-
-// setTopics configures the matcher to only return logs that have topics matching
-// the given list.
-func (m *Matcher) setTopics(topicsList [][]common.Hash) {
- // Calculate the bloom bit indexes for the topics we're interested in
- m.topics = nil
-
- for _, topics := range topicsList {
- bloomBits := make([]bloomIndexes, len(topics))
- for i, topic := range topics {
- bloomBits[i] = calcBloomIndexes(topic.Bytes())
- }
- m.topics = append(m.topics, bloomBits)
+ m.filters = append(m.filters, bloomBits)
}
// For every bit, create a scheduler to load/download the bit vectors
- for _, bloomIndexLists := range m.topics {
+ for _, bloomIndexLists := range m.filters {
for _, bloomIndexList := range bloomIndexLists {
for _, bloomIndex := range bloomIndexList {
m.addScheduler(bloomIndex)
}
}
}
+ return m
}
// addScheduler adds a bit stream retrieval scheduler for the given bit index if
@@ -250,14 +226,10 @@ func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) ch
}
}()
// Assemble the daisy-chained filtering pipeline
- blooms := m.topics
- if len(m.addresses) > 0 {
- blooms = append([][]bloomIndexes{m.addresses}, blooms...)
- }
next := source
dist := make(chan *request, buffer)
- for _, bloom := range blooms {
+ for _, bloom := range m.filters {
next = m.subMatch(next, dist, bloom, session)
}
// Start the request distribution
diff --git a/core/bloombits/matcher_test.go b/core/bloombits/matcher_test.go
index fc49b43b8..177e1b792 100644
--- a/core/bloombits/matcher_test.go
+++ b/core/bloombits/matcher_test.go
@@ -94,10 +94,8 @@ func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, blocks uint64,
// number of requests made for cross validation between different modes.
func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermittent bool, retrievals uint32, maxReqCount int) uint32 {
// Create a new matcher an simulate our explicit random bitsets
- matcher := NewMatcher(testSectionSize, nil, nil)
-
- matcher.addresses = filter[0]
- matcher.topics = filter[1:]
+ matcher := NewMatcher(testSectionSize, nil)
+ matcher.filters = filter
for _, rule := range filter {
for _, topic := range rule {
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index 3a2226f6b..4f6c30058 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -60,6 +60,23 @@ type Filter struct {
// New creates a new filter which uses a bloom filter on blocks to figure out whether
// a particular block is interesting or not.
func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
+ // Flatten the address and topic filter clauses into a single filter system
+ var filters [][][]byte
+ if len(addresses) > 0 {
+ filter := make([][]byte, len(addresses))
+ for i, address := range addresses {
+ filter[i] = address.Bytes()
+ }
+ filters = append(filters, filter)
+ }
+ for _, topicList := range topics {
+ filter := make([][]byte, len(topicList))
+ for i, topic := range topicList {
+ filter[i] = topic.Bytes()
+ }
+ filters = append(filters, filter)
+ }
+ // Assemble and return the filter
size, _ := backend.BloomStatus()
return &Filter{
@@ -69,7 +86,7 @@ func New(backend Backend, begin, end int64, addresses []common.Address, topics [
addresses: addresses,
topics: topics,
db: backend.ChainDb(),
- matcher: bloombits.NewMatcher(size, addresses, topics),
+ matcher: bloombits.NewMatcher(size, filters),
}
}