From b5be6b72cb06ded22075a41db6abc670d33c5ea7 Mon Sep 17 00:00:00 2001 From: bas-vk Date: Mon, 28 Nov 2016 14:59:06 +0100 Subject: eth/filter: add support for pending logs (#3219) --- eth/filters/api.go | 80 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 47 insertions(+), 33 deletions(-) (limited to 'eth/filters/api.go') diff --git a/eth/filters/api.go b/eth/filters/api.go index 584f55afd..d5dd57743 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -239,11 +239,17 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } - rpcSub := notifier.CreateSubscription() + var ( + rpcSub = notifier.CreateSubscription() + matchedLogs = make(chan []Log) + ) + + logsSub, err := api.events.SubscribeLogs(crit, matchedLogs) + if err != nil { + return nil, err + } go func() { - matchedLogs := make(chan []Log) - logsSub := api.events.SubscribeLogs(crit, matchedLogs) for { select { @@ -276,18 +282,20 @@ type FilterCriteria struct { // used to retrieve logs when the state changes. This method cannot be // used to fetch logs that are already stored in the state. // +// Default criteria for the from and to block are "latest". +// Using "latest" as block number will return logs for mined blocks. +// Using "pending" as block number returns logs for not yet mined (pending) blocks. +// In case logs are removed (chain reorg) previously returned logs are returned +// again but with the removed property set to true. +// +// In case "fromBlock" > "toBlock" an error is returned. +// // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter -func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID { - var ( - logs = make(chan []Log) - logsSub = api.events.SubscribeLogs(crit, logs) - ) - - if crit.FromBlock == nil { - crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) - } - if crit.ToBlock == nil { - crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) +func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { + logs := make(chan []Log) + logsSub, err := api.events.SubscribeLogs(crit, logs) + if err != nil { + return rpc.ID(""), err } api.filtersMu.Lock() @@ -312,7 +320,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID { } }() - return logsSub.ID + return logsSub.ID, nil } // GetLogs returns logs matching the given argument that are stored within the state. @@ -363,28 +371,38 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]Log api.filtersMu.Unlock() if !found || f.typ != LogsSubscription { - return []Log{}, nil + return nil, fmt.Errorf("filter not found") } filter := New(api.backend, api.useMipMap) - filter.SetBeginBlock(f.crit.FromBlock.Int64()) - filter.SetEndBlock(f.crit.ToBlock.Int64()) + if f.crit.FromBlock != nil { + filter.SetBeginBlock(f.crit.FromBlock.Int64()) + } else { + filter.SetBeginBlock(rpc.LatestBlockNumber.Int64()) + } + if f.crit.ToBlock != nil { + filter.SetEndBlock(f.crit.ToBlock.Int64()) + } else { + filter.SetEndBlock(rpc.LatestBlockNumber.Int64()) + } filter.SetAddresses(f.crit.Addresses) filter.SetTopics(f.crit.Topics) - logs, err := filter.Find(ctx) - return returnLogs(logs), err + logs, err:= filter.Find(ctx) + if err != nil { + return nil, err + } + return returnLogs(logs), nil } // GetFilterChanges returns the logs for the filter with the given id since // last time is was called. This can be used for polling. // // For pending transaction and block filters the result is []common.Hash. -// (pending)Log filters return []Log. If the filter could not be found -// []interface{}{} is returned. +// (pending)Log filters return []Log. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges -func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} { +func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { api.filtersMu.Lock() defer api.filtersMu.Unlock() @@ -400,15 +418,15 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} { case PendingTransactionsSubscription, BlocksSubscription: hashes := f.hashes f.hashes = nil - return returnHashes(hashes) - case PendingLogsSubscription, LogsSubscription: + return returnHashes(hashes), nil + case LogsSubscription: logs := f.logs f.logs = nil - return returnLogs(logs) + return returnLogs(logs), nil } } - return []interface{}{} + return []interface{}{}, fmt.Errorf("filter not found") } // returnHashes is a helper that will return an empty hash array case the given hash array is nil, @@ -443,15 +461,11 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error { return err } - if raw.From == nil || raw.From.Int64() < 0 { - args.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) - } else { + if raw.From != nil { args.FromBlock = big.NewInt(raw.From.Int64()) } - if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 { - args.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) - } else { + if raw.ToBlock != nil { args.ToBlock = big.NewInt(raw.ToBlock.Int64()) } -- cgit