diff options
Diffstat (limited to 'eth/filters/filter.go')
-rw-r--r-- | eth/filters/filter.go | 232 |
1 files changed, 152 insertions, 80 deletions
diff --git a/eth/filters/filter.go b/eth/filters/filter.go index f848bc6af..ea9ccf2f9 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -18,11 +18,14 @@ package filters import ( "context" - "math" "math/big" + "sync" + "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/bitutil" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -34,58 +37,51 @@ type Backend interface { EventMux() *event.TypeMux HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) + BloomBitsSections() uint64 + BloomBitsConfig() BloomConfig SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + GetBloomBits(ctx context.Context, bitIdx uint64, sectionIdxList []uint64) ([][]byte, error) +} + +type BloomConfig struct { + SectionSize uint64 + MaxRequestLen int + MaxRequestWait time.Duration } // Filter can be used to retrieve and filter logs. type Filter struct { - backend Backend - useMipMap bool + backend Backend + bloomBitsConfig BloomConfig db ethdb.Database begin, end int64 addresses []common.Address topics [][]common.Hash + + decompress func([]byte, int) ([]byte, error) + matcher *bloombits.Matcher } // New creates a new filter which uses a bloom filter on blocks to figure out whether // a particular block is interesting or not. -// MipMaps allow past blocks to be searched much more efficiently, but are not available -// to light clients. -func New(backend Backend, useMipMap bool) *Filter { +func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { return &Filter{ - backend: backend, - useMipMap: useMipMap, - db: backend.ChainDb(), + backend: backend, + begin: begin, + end: end, + addresses: addresses, + topics: topics, + bloomBitsConfig: backend.BloomBitsConfig(), + db: backend.ChainDb(), + matcher: bloombits.NewMatcher(backend.BloomBitsConfig().SectionSize, addresses, topics), + decompress: bitutil.DecompressBytes, } } -// SetBeginBlock sets the earliest block for filtering. -// -1 = latest block (i.e., the current block) -// hash = particular hash from-to -func (f *Filter) SetBeginBlock(begin int64) { - f.begin = begin -} - -// SetEndBlock sets the latest block for filtering. -func (f *Filter) SetEndBlock(end int64) { - f.end = end -} - -// SetAddresses matches only logs that are generated from addresses that are included -// in the given addresses. -func (f *Filter) SetAddresses(addr []common.Address) { - f.addresses = addr -} - -// SetTopics matches only logs that have topics matching the given topics. -func (f *Filter) SetTopics(topics [][]common.Hash) { - f.topics = topics -} - // FindOnce searches the blockchain for matching log entries, returning // all matching entries from the first block that contains matches, // updating the start point of the filter accordingly. If no results are @@ -106,18 +102,9 @@ func (f *Filter) FindOnce(ctx context.Context) ([]*types.Log, error) { endBlockNo = headBlockNumber } - // if no addresses are present we can't make use of fast search which - // uses the mipmap bloom filters to check for fast inclusion and uses - // higher range probability in order to ensure at least a false positive - if !f.useMipMap || len(f.addresses) == 0 { - logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo) - f.begin = int64(blockNumber + 1) - return logs, err - } - - logs, blockNumber := f.mipFind(beginBlockNo, endBlockNo, 0) + logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo) f.begin = int64(blockNumber + 1) - return logs, nil + return logs, err } // Run filters logs with the current parameters set @@ -131,43 +118,134 @@ func (f *Filter) Find(ctx context.Context) (logs []*types.Log, err error) { } } -func (f *Filter) mipFind(start, end uint64, depth int) (logs []*types.Log, blockNumber uint64) { - level := core.MIPMapLevels[depth] - // normalise numerator so we can work in level specific batches and - // work with the proper range checks - for num := start / level * level; num <= end; num += level { - // find addresses in bloom filters - bloom := core.GetMipmapBloom(f.db, num, level) - // Don't bother checking the first time through the loop - we're probably picking - // up where a previous run left off. - first := true - for _, addr := range f.addresses { - if first || bloom.TestBytes(addr[:]) { - first = false - // range check normalised values and make sure that - // we're resolving the correct range instead of the - // normalised values. - start := uint64(math.Max(float64(num), float64(start))) - end := uint64(math.Min(float64(num+level-1), float64(end))) - if depth+1 == len(core.MIPMapLevels) { - l, blockNumber, _ := f.getLogs(context.Background(), start, end) - if len(l) > 0 { - return l, blockNumber +// nextRequest returns the next request to retrieve for the bloombits matcher +func (f *Filter) nextRequest() (bloombits uint, sections []uint64) { + bloomIndex, ok := f.matcher.AllocSectionQueue() + if !ok { + return 0, nil + } + if f.bloomBitsConfig.MaxRequestWait > 0 && + (f.bloomBitsConfig.MaxRequestLen <= 1 || // SectionCount is always greater than zero after a successful alloc + f.matcher.SectionCount(bloomIndex) < f.bloomBitsConfig.MaxRequestLen) { + time.Sleep(f.bloomBitsConfig.MaxRequestWait) + } + return bloomIndex, f.matcher.FetchSections(bloomIndex, f.bloomBitsConfig.MaxRequestLen) +} + +// serveMatcher serves the bloombits matcher by fetching the requested vectors +// through the filter backend +func (f *Filter) serveMatcher(ctx context.Context, stop chan struct{}, wg *sync.WaitGroup) chan error { + errChn := make(chan error, 1) + wg.Add(10) + for i := 0; i < 10; i++ { + go func(i int) { + defer wg.Done() + + for { + b, s := f.nextRequest() + if s == nil { + return + } + data, err := f.backend.GetBloomBits(ctx, uint64(b), s) + if err != nil { + select { + case errChn <- err: + case <-stop: } - } else { - l, blockNumber := f.mipFind(start, end, depth+1) - if len(l) > 0 { - return l, blockNumber + return + } + decomp := make([][]byte, len(data)) + for i, d := range data { + var err error + if decomp[i], err = f.decompress(d, int(f.bloomBitsConfig.SectionSize/8)); err != nil { + select { + case errChn <- err: + case <-stop: + } + return } } + f.matcher.Deliver(b, s, decomp) } - } + }(i) } - return nil, end + return errChn +} + +// checkMatches checks if the receipts belonging to the given header contain any log events that +// match the filter criteria. This function is called when the bloom filter signals a potential match. +func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { + // Get the logs of the block + receipts, err := f.backend.GetReceipts(ctx, header.Hash()) + if err != nil { + return nil, err + } + var unfiltered []*types.Log + for _, receipt := range receipts { + unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...) + } + logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) + if len(logs) > 0 { + return logs, nil + } + return nil, nil } func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*types.Log, blockNumber uint64, err error) { + haveBloomBitsBefore := f.backend.BloomBitsSections() * f.bloomBitsConfig.SectionSize + if haveBloomBitsBefore > start { + e := end + if haveBloomBitsBefore <= e { + e = haveBloomBitsBefore - 1 + } + + stop := make(chan struct{}) + var wg sync.WaitGroup + matches := f.matcher.Start(start, e) + errChn := f.serveMatcher(ctx, stop, &wg) + + defer func() { + f.matcher.Stop() + close(stop) + wg.Wait() + }() + + loop: + for { + select { + case i, ok := <-matches: + if !ok { + break loop + } + + blockNumber := rpc.BlockNumber(i) + header, err := f.backend.HeaderByNumber(ctx, blockNumber) + if header == nil || err != nil { + return logs, end, err + } + + logs, err := f.checkMatches(ctx, header) + if err != nil { + return nil, end, err + } + if logs != nil { + return logs, i, nil + } + case err := <-errChn: + return logs, end, err + case <-ctx.Done(): + return nil, end, ctx.Err() + } + } + + if end < haveBloomBitsBefore { + return logs, end, nil + } + start = haveBloomBitsBefore + } + + // search the rest with regular block-by-block bloom filtering for i := start; i <= end; i++ { blockNumber := rpc.BlockNumber(i) header, err := f.backend.HeaderByNumber(ctx, blockNumber) @@ -178,18 +256,12 @@ func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*types. // Use bloom filtering to see if this block is interesting given the // current parameters if f.bloomFilter(header.Bloom) { - // Get the logs of the block - receipts, err := f.backend.GetReceipts(ctx, header.Hash()) + logs, err := f.checkMatches(ctx, header) if err != nil { return nil, end, err } - var unfiltered []*types.Log - for _, receipt := range receipts { - unfiltered = append(unfiltered, ([]*types.Log)(receipt.Logs)...) - } - logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) - if len(logs) > 0 { - return logs, uint64(blockNumber), nil + if logs != nil { + return logs, i, nil } } } |