diff options
author | Bas van Kervel <bas@ethdev.com> | 2016-07-27 23:47:46 +0800 |
---|---|---|
committer | Bas van Kervel <basvankervel@gmail.com> | 2016-08-17 18:59:58 +0800 |
commit | 47ff8130124b479f1f051312eed50c33f0a38e6f (patch) | |
tree | cb29e4550f63f3a763dd04b267261e354e56d7eb /eth/filters/api.go | |
parent | 3b39d4d1c15df2697284c3d7a61564f98ab45c70 (diff) | |
download | go-tangerine-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.gz go-tangerine-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.zst go-tangerine-47ff8130124b479f1f051312eed50c33f0a38e6f.zip |
rpc: refactor subscriptions and filters
Diffstat (limited to 'eth/filters/api.go')
-rw-r--r-- | eth/filters/api.go | 784 |
1 files changed, 327 insertions, 457 deletions
diff --git a/eth/filters/api.go b/eth/filters/api.go index 65c5b9380..3bc220348 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -17,292 +17,414 @@ package filters import ( - "crypto/rand" "encoding/hex" "encoding/json" "errors" "fmt" + "math/big" "sync" "time" + "golang.org/x/net/context" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" - - "golang.org/x/net/context" ) var ( - filterTickerTime = 5 * time.Minute + deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline ) -// byte will be inferred -const ( - unknownFilterTy = iota - blockFilterTy - transactionFilterTy - logFilterTy -) +// filter is a helper struct that holds meta information over the filter type +// and associated subscription in the event system. +type filter struct { + typ Type + deadline *time.Timer // filter is inactiv when deadline triggers + hashes []common.Hash + crit FilterCriteria + logs []Log + s *Subscription // associated subscription in event system +} // PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various // information related to the Ethereum protocol such als blocks, transactions and logs. type PublicFilterAPI struct { - mux *event.TypeMux - - quit chan struct{} - chainDb ethdb.Database - - filterManager *FilterSystem - - filterMapMu sync.RWMutex - filterMapping map[string]int // maps between filter internal filter identifiers and external filter identifiers - - logMu sync.RWMutex - logQueue map[int]*logQueue - - blockMu sync.RWMutex - blockQueue map[int]*hashQueue - - transactionMu sync.RWMutex - transactionQueue map[int]*hashQueue + mux *event.TypeMux + quit chan struct{} + chainDb ethdb.Database + events *EventSystem + filtersMu sync.Mutex + filters map[rpc.ID]*filter } // NewPublicFilterAPI returns a new PublicFilterAPI instance. func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI { - svc := &PublicFilterAPI{ - mux: mux, - chainDb: chainDb, - filterManager: NewFilterSystem(mux), - filterMapping: make(map[string]int), - logQueue: make(map[int]*logQueue), - blockQueue: make(map[int]*hashQueue), - transactionQueue: make(map[int]*hashQueue), + api := &PublicFilterAPI{ + mux: mux, + chainDb: chainDb, + events: NewEventSystem(mux), + filters: make(map[rpc.ID]*filter), } - go svc.start() - return svc -} -// Stop quits the work loop. -func (s *PublicFilterAPI) Stop() { - close(s.quit) + go api.timeoutLoop() + + return api } -// start the work loop, wait and process events. -func (s *PublicFilterAPI) start() { - timer := time.NewTicker(2 * time.Second) - defer timer.Stop() -done: +// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used. +// Tt is started when the api is created. +func (api *PublicFilterAPI) timeoutLoop() { + ticker := time.NewTicker(5 * time.Minute) for { - select { - case <-timer.C: - s.filterManager.Lock() // lock order like filterLoop() - s.logMu.Lock() - for id, filter := range s.logQueue { - if time.Since(filter.timeout) > filterTickerTime { - s.filterManager.Remove(id) - delete(s.logQueue, id) - } + <-ticker.C + api.filtersMu.Lock() + for id, f := range api.filters { + select { + case <-f.deadline.C: + f.s.Unsubscribe() + delete(api.filters, id) + default: + continue } - s.logMu.Unlock() + } + api.filtersMu.Unlock() + } +} - s.blockMu.Lock() - for id, filter := range s.blockQueue { - if time.Since(filter.timeout) > filterTickerTime { - s.filterManager.Remove(id) - delete(s.blockQueue, id) - } - } - s.blockMu.Unlock() +// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes +// as transactions enter the pending state. +// +// It is part of the filter package because this filter can be used throug the +// `eth_getFilterChanges` polling method that is also used for log filters. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter +func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { + var ( + pendingTxs = make(chan common.Hash) + pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs) + ) - s.transactionMu.Lock() - for id, filter := range s.transactionQueue { - if time.Since(filter.timeout) > filterTickerTime { - s.filterManager.Remove(id) - delete(s.transactionQueue, id) + api.filtersMu.Lock() + api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case ph := <-pendingTxs: + api.filtersMu.Lock() + if f, found := api.filters[pendingTxSub.ID]; found { + f.hashes = append(f.hashes, ph) } + api.filtersMu.Unlock() + case <-pendingTxSub.Err(): + api.filtersMu.Lock() + delete(api.filters, pendingTxSub.ID) + api.filtersMu.Unlock() + return } - s.transactionMu.Unlock() - s.filterManager.Unlock() - case <-s.quit: - break done } - } + }() + return pendingTxSub.ID } -// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain. -func (s *PublicFilterAPI) NewBlockFilter() (string, error) { - // protect filterManager.Add() and setting of filter fields - s.filterManager.Lock() - defer s.filterManager.Unlock() - - externalId, err := newFilterId() - if err != nil { - return "", err +// NewPendingTransactions creates a subscription that is triggered each time a transaction +// enters the transaction pool and was signed from one of the transactions this nodes manages. +func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } - filter := New(s.chainDb) - id, err := s.filterManager.Add(filter, ChainFilter) - if err != nil { - return "", err - } + rpcSub := notifier.CreateSubscription() + + go func() { + txHashes := make(chan common.Hash) + pendingTxSub := api.events.SubscribePendingTxEvents(txHashes) + + for { + select { + case h := <-txHashes: + notifier.Notify(rpcSub.ID, h) + case <-rpcSub.Err(): + pendingTxSub.Unsubscribe() + return + case <-notifier.Closed(): + pendingTxSub.Unsubscribe() + return + } + } + }() - s.blockMu.Lock() - s.blockQueue[id] = &hashQueue{timeout: time.Now()} - s.blockMu.Unlock() + return rpcSub, nil +} - filter.BlockCallback = func(block *types.Block, logs vm.Logs) { - s.blockMu.Lock() - defer s.blockMu.Unlock() +// NewBlockFilter creates a filter that fetches blocks that are imported into the chain. +// It is part of the filter package since polling goes with eth_getFilterChanges. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter +func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { + var ( + headers = make(chan *types.Header) + headerSub = api.events.SubscribeNewHeads(headers) + ) - if queue := s.blockQueue[id]; queue != nil { - queue.add(block.Hash()) + api.filtersMu.Lock() + api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case h := <-headers: + api.filtersMu.Lock() + if f, found := api.filters[headerSub.ID]; found { + f.hashes = append(f.hashes, h.Hash()) + } + api.filtersMu.Unlock() + case <-headerSub.Err(): + api.filtersMu.Lock() + delete(api.filters, headerSub.ID) + api.filtersMu.Unlock() + return + } } + }() + + return headerSub.ID +} + +// NewHeads send a notification each time a new (header) block is appended to the chain. +func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } - s.filterMapMu.Lock() - s.filterMapping[externalId] = id - s.filterMapMu.Unlock() + rpcSub := notifier.CreateSubscription() - return externalId, nil -} + go func() { + headers := make(chan *types.Header) + headersSub := api.events.SubscribeNewHeads(headers) -// NewPendingTransactionFilter creates a filter that returns new pending transactions. -func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) { - // protect filterManager.Add() and setting of filter fields - s.filterManager.Lock() - defer s.filterManager.Unlock() + for { + select { + case h := <-headers: + notifier.Notify(rpcSub.ID, h) + case <-rpcSub.Err(): + headersSub.Unsubscribe() + return + case <-notifier.Closed(): + headersSub.Unsubscribe() + return + } + } + }() - externalId, err := newFilterId() - if err != nil { - return "", err - } + return rpcSub, nil +} - filter := New(s.chainDb) - id, err := s.filterManager.Add(filter, PendingTxFilter) - if err != nil { - return "", err +// Logs creates a subscription that fires for all new log that match the given filter criteria. +func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } - s.transactionMu.Lock() - s.transactionQueue[id] = &hashQueue{timeout: time.Now()} - s.transactionMu.Unlock() + rpcSub := notifier.CreateSubscription() - filter.TransactionCallback = func(tx *types.Transaction) { - s.transactionMu.Lock() - defer s.transactionMu.Unlock() + go func() { + matchedLogs := make(chan []Log) + logsSub := api.events.SubscribeLogs(crit, matchedLogs) - if queue := s.transactionQueue[id]; queue != nil { - queue.add(tx.Hash()) + for { + select { + case logs := <-matchedLogs: + for _, log := range logs { + notifier.Notify(rpcSub.ID, &log) + } + case <-rpcSub.Err(): // client send an unsubscribe request + logsSub.Unsubscribe() + return + case <-notifier.Closed(): // connection dropped + logsSub.Unsubscribe() + return + } } - } + }() - s.filterMapMu.Lock() - s.filterMapping[externalId] = id - s.filterMapMu.Unlock() + return rpcSub, nil +} - return externalId, nil +// FilterCriteria represents a request to create a new filter. +type FilterCriteria struct { + FromBlock *big.Int + ToBlock *big.Int + Addresses []common.Address + Topics [][]common.Hash } -// newLogFilter creates a new log filter. -func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) { - // protect filterManager.Add() and setting of filter fields - s.filterManager.Lock() - defer s.filterManager.Unlock() +// NewFilter creates a new filter and returns the filter id. It can be +// used to retrieve logs when the state changes. This method cannot be +// used to fetch logs that are already stored in the state. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter +func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID { + var ( + logs = make(chan []Log) + logsSub = api.events.SubscribeLogs(crit, logs) + ) - filter := New(s.chainDb) - id, err := s.filterManager.Add(filter, LogFilter) - if err != nil { - return 0, err + if crit.FromBlock == nil { + crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) + } + if crit.ToBlock == nil { + crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } - s.logMu.Lock() - s.logQueue[id] = &logQueue{timeout: time.Now()} - s.logMu.Unlock() - - filter.SetBeginBlock(earliest) - filter.SetEndBlock(latest) - filter.SetAddresses(addresses) - filter.SetTopics(topics) - filter.LogCallback = func(log *vm.Log, removed bool) { - if callback != nil { - callback(log, removed) - } else { - s.logMu.Lock() - defer s.logMu.Unlock() - if queue := s.logQueue[id]; queue != nil { - queue.add(vmlog{log, removed}) + api.filtersMu.Lock() + api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]Log, 0), s: logsSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case l := <-logs: + api.filtersMu.Lock() + if f, found := api.filters[logsSub.ID]; found { + f.logs = append(f.logs, l...) + } + api.filtersMu.Unlock() + case <-logsSub.Err(): + api.filtersMu.Lock() + delete(api.filters, logsSub.ID) + api.filtersMu.Unlock() + return } } - } + }() - return id, nil + return logsSub.ID } -// Logs creates a subscription that fires for all new log that match the given filter criteria. -func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) { - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return nil, rpc.ErrNotificationsUnsupported +// GetLogs returns logs matching the given argument that are stored within the state. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs +func (api *PublicFilterAPI) GetLogs(crit FilterCriteria) []Log { + if crit.FromBlock == nil { + crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) + } + if crit.ToBlock == nil { + crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } - var ( - externalId string - subscription rpc.Subscription - err error - ) + filter := New(api.chainDb) + filter.SetBeginBlock(crit.FromBlock.Int64()) + filter.SetEndBlock(crit.ToBlock.Int64()) + filter.SetAddresses(crit.Addresses) + filter.SetTopics(crit.Topics) - if externalId, err = newFilterId(); err != nil { - return nil, err - } + return returnLogs(filter.Find()) +} - // uninstall filter when subscription is unsubscribed/cancelled - if subscription, err = notifier.NewSubscription(func(string) { - s.UninstallFilter(externalId) - }); err != nil { - return nil, err +// UninstallFilter removes the filter with the given filter id. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter +func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool { + api.filtersMu.Lock() + f, found := api.filters[id] + if found { + delete(api.filters, id) } - - notifySubscriber := func(log *vm.Log, removed bool) { - rpcLog := toRPCLogs(vm.Logs{log}, removed) - if err := subscription.Notify(rpcLog); err != nil { - subscription.Cancel() - } + api.filtersMu.Unlock() + if found { + f.s.Unsubscribe() } - // from and to block number are not used since subscriptions don't allow you to travel to "time" - var id int - if len(args.Addresses) > 0 { - id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber) - } else { - id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber) + return found +} + +// GetFilterLogs returns the logs for the filter with the given id. +// If the filter could not be found an empty array of logs is returned. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs +func (api *PublicFilterAPI) GetFilterLogs(id rpc.ID) []Log { + api.filtersMu.Lock() + f, found := api.filters[id] + api.filtersMu.Unlock() + + if !found || f.typ != LogsSubscription { + return []Log{} } - if err != nil { - subscription.Cancel() - return nil, err + filter := New(api.chainDb) + filter.SetBeginBlock(f.crit.FromBlock.Int64()) + filter.SetEndBlock(f.crit.ToBlock.Int64()) + filter.SetAddresses(f.crit.Addresses) + filter.SetTopics(f.crit.Topics) + + return returnLogs(filter.Find()) +} + +// GetFilterChanges returns the logs for the filter with the given id since +// last time is was called. This can be used for polling. +// +// For pending transaction and block filters the result is []common.Hash. +// (pending)Log filters return []Log. If the filter could not be found +// []interface{}{} is returned. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges +func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} { + api.filtersMu.Lock() + defer api.filtersMu.Unlock() + + if f, found := api.filters[id]; found { + if !f.deadline.Stop() { + // timer expired but filter is not yet removed in timeout loop + // receive timer value and reset timer + <-f.deadline.C + } + f.deadline.Reset(deadline) + + switch f.typ { + case PendingTransactionsSubscription, BlocksSubscription: + hashes := f.hashes + f.hashes = nil + return returnHashes(hashes) + case PendingLogsSubscription, LogsSubscription: + logs := f.logs + f.logs = nil + return returnLogs(logs) + } } - s.filterMapMu.Lock() - s.filterMapping[externalId] = id - s.filterMapMu.Unlock() + return []interface{}{} +} - return subscription, err +// returnHashes is a helper that will return an empty hash array case the given hash array is nil, +// otherwise the given hashes array is returned. +func returnHashes(hashes []common.Hash) []common.Hash { + if hashes == nil { + return []common.Hash{} + } + return hashes } -// NewFilterArgs represents a request to create a new filter. -type NewFilterArgs struct { - FromBlock rpc.BlockNumber - ToBlock rpc.BlockNumber - Addresses []common.Address - Topics [][]common.Hash +// returnLogs is a helper that will return an empty log array in case the given logs array is nil, +// otherwise the given logs array is returned. +func returnLogs(logs []Log) []Log { + if logs == nil { + return []Log{} + } + return logs } // UnmarshalJSON sets *args fields with given data. -func (args *NewFilterArgs) UnmarshalJSON(data []byte) error { +func (args *FilterCriteria) UnmarshalJSON(data []byte) error { type input struct { From *rpc.BlockNumber `json:"fromBlock"` ToBlock *rpc.BlockNumber `json:"toBlock"` @@ -316,15 +438,15 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error { } if raw.From == nil || raw.From.Int64() < 0 { - args.FromBlock = rpc.LatestBlockNumber + args.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } else { - args.FromBlock = *raw.From + args.FromBlock = big.NewInt(raw.From.Int64()) } if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 { - args.ToBlock = rpc.LatestBlockNumber + args.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } else { - args.ToBlock = *raw.ToBlock + args.ToBlock = big.NewInt(raw.ToBlock.Int64()) } args.Addresses = []common.Address{} @@ -414,255 +536,3 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error { return nil } - -// NewFilter creates a new filter and returns the filter id. It can be uses to retrieve logs. -func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) { - externalId, err := newFilterId() - if err != nil { - return "", err - } - - var id int - if len(args.Addresses) > 0 { - id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil) - } else { - id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil) - } - if err != nil { - return "", err - } - - s.filterMapMu.Lock() - s.filterMapping[externalId] = id - s.filterMapMu.Unlock() - - return externalId, nil -} - -// GetLogs returns the logs matching the given argument. -func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog { - filter := New(s.chainDb) - filter.SetBeginBlock(args.FromBlock.Int64()) - filter.SetEndBlock(args.ToBlock.Int64()) - filter.SetAddresses(args.Addresses) - filter.SetTopics(args.Topics) - - return toRPCLogs(filter.Find(), false) -} - -// UninstallFilter removes the filter with the given filter id. -func (s *PublicFilterAPI) UninstallFilter(filterId string) bool { - s.filterManager.Lock() - defer s.filterManager.Unlock() - - s.filterMapMu.Lock() - id, ok := s.filterMapping[filterId] - if !ok { - s.filterMapMu.Unlock() - return false - } - delete(s.filterMapping, filterId) - s.filterMapMu.Unlock() - - s.filterManager.Remove(id) - - s.logMu.Lock() - if _, ok := s.logQueue[id]; ok { - delete(s.logQueue, id) - s.logMu.Unlock() - return true - } - s.logMu.Unlock() - - s.blockMu.Lock() - if _, ok := s.blockQueue[id]; ok { - delete(s.blockQueue, id) - s.blockMu.Unlock() - return true - } - s.blockMu.Unlock() - - s.transactionMu.Lock() - if _, ok := s.transactionQueue[id]; ok { - delete(s.transactionQueue, id) - s.transactionMu.Unlock() - return true - } - s.transactionMu.Unlock() - - return false -} - -// getFilterType is a helper utility that determine the type of filter for the given filter id. -func (s *PublicFilterAPI) getFilterType(id int) byte { - if _, ok := s.blockQueue[id]; ok { - return blockFilterTy - } else if _, ok := s.transactionQueue[id]; ok { - return transactionFilterTy - } else if _, ok := s.logQueue[id]; ok { - return logFilterTy - } - - return unknownFilterTy -} - -// blockFilterChanged returns a collection of block hashes for the block filter with the given id. -func (s *PublicFilterAPI) blockFilterChanged(id int) []common.Hash { - s.blockMu.Lock() - defer s.blockMu.Unlock() - - if s.blockQueue[id] != nil { - return s.blockQueue[id].get() - } - return nil -} - -// transactionFilterChanged returns a collection of transaction hashes for the pending -// transaction filter with the given id. -func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash { - s.blockMu.Lock() - defer s.blockMu.Unlock() - - if s.transactionQueue[id] != nil { - return s.transactionQueue[id].get() - } - return nil -} - -// logFilterChanged returns a collection of logs for the log filter with the given id. -func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog { - s.logMu.Lock() - defer s.logMu.Unlock() - - if s.logQueue[id] != nil { - return s.logQueue[id].get() - } - return nil -} - -// GetFilterLogs returns the logs for the filter with the given id. -func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog { - s.filterMapMu.RLock() - id, ok := s.filterMapping[filterId] - s.filterMapMu.RUnlock() - if !ok { - return toRPCLogs(nil, false) - } - - if filter := s.filterManager.Get(id); filter != nil { - return toRPCLogs(filter.Find(), false) - } - - return toRPCLogs(nil, false) -} - -// GetFilterChanges returns the logs for the filter with the given id since last time is was called. -// This can be used for polling. -func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} { - s.filterMapMu.RLock() - id, ok := s.filterMapping[filterId] - s.filterMapMu.RUnlock() - - if !ok { // filter not found - return []interface{}{} - } - - switch s.getFilterType(id) { - case blockFilterTy: - return returnHashes(s.blockFilterChanged(id)) - case transactionFilterTy: - return returnHashes(s.transactionFilterChanged(id)) - case logFilterTy: - return s.logFilterChanged(id) - } - - return []interface{}{} -} - -type vmlog struct { - *vm.Log - Removed bool `json:"removed"` -} - -type logQueue struct { - mu sync.Mutex - - logs []vmlog - timeout time.Time - id int -} - -func (l *logQueue) add(logs ...vmlog) { - l.mu.Lock() - defer l.mu.Unlock() - - l.logs = append(l.logs, logs...) -} - -func (l *logQueue) get() []vmlog { - l.mu.Lock() - defer l.mu.Unlock() - - l.timeout = time.Now() - tmp := l.logs - l.logs = nil - return tmp -} - -type hashQueue struct { - mu sync.Mutex - - hashes []common.Hash - timeout time.Time - id int -} - -func (l *hashQueue) add(hashes ...common.Hash) { - l.mu.Lock() - defer l.mu.Unlock() - - l.hashes = append(l.hashes, hashes...) -} - -func (l *hashQueue) get() []common.Hash { - l.mu.Lock() - defer l.mu.Unlock() - - l.timeout = time.Now() - tmp := l.hashes - l.hashes = nil - return tmp -} - -// newFilterId generates a new random filter identifier that can be exposed to the outer world. By publishing random -// identifiers it is not feasible for DApp's to guess filter id's for other DApp's and uninstall or poll for them -// causing the affected DApp to miss data. -func newFilterId() (string, error) { - var subid [16]byte - n, _ := rand.Read(subid[:]) - if n != 16 { - return "", errors.New("Unable to generate filter id") - } - return "0x" + hex.EncodeToString(subid[:]), nil -} - -// toRPCLogs is a helper that will convert a vm.Logs array to an structure which -// can hold additional information about the logs such as whether it was deleted. -// Additionally when nil is given it will by default instead create an empty slice -// instead. This is required by the RPC specification. -func toRPCLogs(logs vm.Logs, removed bool) []vmlog { - convertedLogs := make([]vmlog, len(logs)) - for i, log := range logs { - convertedLogs[i] = vmlog{Log: log, Removed: removed} - } - return convertedLogs -} - -// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will -// return the given hashes. The RPC interfaces defines that always an array is returned. -func returnHashes(hashes []common.Hash) []common.Hash { - if hashes == nil { - return []common.Hash{} - } - return hashes -} |