diff options
Diffstat (limited to 'eth/filters/filter_system.go')
-rw-r--r-- | eth/filters/filter_system.go | 82 |
1 files changed, 79 insertions, 3 deletions
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 04a55fd09..1e330b24f 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/net/context" ) // Type determines the kind of filter and is used to put the filter in to @@ -95,6 +96,9 @@ type subscription struct { type EventSystem struct { mux *event.TypeMux sub event.Subscription + backend Backend + lightMode bool + lastHead *types.Header install chan *subscription // install filter for event notification uninstall chan *subscription // remove filter for event notification } @@ -105,9 +109,11 @@ type EventSystem struct { // // The returned manager has a loop that needs to be stopped with the Stop function // or by stopping the given mux. -func NewEventSystem(mux *event.TypeMux) *EventSystem { +func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem { m := &EventSystem{ mux: mux, + backend: backend, + lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), } @@ -235,7 +241,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti type filterIndex map[Type]map[rpc.ID]*subscription // broadcast event to filters that match criteria. -func broadcast(filters filterIndex, ev *event.Event) { +func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) { if ev == nil { return } @@ -279,7 +285,77 @@ func broadcast(filters filterIndex, ev *event.Event) { f.headers <- e.Block.Header() } } + if es.lightMode && len(filters[LogsSubscription]) > 0 { + es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) { + for _, f := range filters[LogsSubscription] { + if ev.Time.After(f.created) { + if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } + } + }) + } + } +} + +func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) { + oldh := es.lastHead + es.lastHead = newHeader + if oldh == nil { + return + } + newh := newHeader + // find common ancestor, create list of rolled back and new block hashes + var oldHeaders, newHeaders []*types.Header + for oldh.Hash() != newh.Hash() { + if oldh.GetNumberU64() >= newh.GetNumberU64() { + oldHeaders = append(oldHeaders, oldh) + oldh = core.GetHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1) + } + if oldh.GetNumberU64() < newh.GetNumberU64() { + newHeaders = append(newHeaders, newh) + newh = core.GetHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1) + if newh == nil { + // happens when CHT syncing, nothing to do + newh = oldh + } + } + } + // roll back old blocks + for _, h := range oldHeaders { + callBack(h, true) + } + // check new blocks (array is in reverse order) + for i := len(newHeaders) - 1; i >= 0; i-- { + callBack(newHeaders[i], false) + } +} + +// filter logs of a single header in light client mode +func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []Log { + //fmt.Println("lightFilterLogs", header.Number.Uint64(), remove) + if bloomFilter(header.Bloom, addresses, topics) { + //fmt.Println("bloom match") + // Get the logs of the block + ctx, _ := context.WithTimeout(context.Background(), time.Second*5) + receipts, err := es.backend.GetReceipts(ctx, header.Hash()) + if err != nil { + return nil + } + var unfiltered []Log + for _, receipt := range receipts { + rl := make([]Log, len(receipt.Logs)) + for i, l := range receipt.Logs { + rl[i] = Log{l, remove} + } + unfiltered = append(unfiltered, rl...) + } + logs := filterLogs(unfiltered, addresses, topics) + //fmt.Println("found", len(logs)) + return logs } + return nil } // eventLoop (un)installs filters and processes mux events. @@ -294,7 +370,7 @@ func (es *EventSystem) eventLoop() { if !active { // system stopped return } - broadcast(index, ev) + es.broadcast(index, ev) case f := <-es.install: if _, found := index[f.typ]; !found { index[f.typ] = make(map[rpc.ID]*subscription) |