aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--eth/filters/api.go80
-rw-r--r--eth/filters/api_test.go8
-rw-r--r--eth/filters/filter.go15
-rw-r--r--eth/filters/filter_system.go131
-rw-r--r--eth/filters/filter_system_test.go155
-rw-r--r--eth/filters/filter_test.go3
-rw-r--r--miner/worker.go19
7 files changed, 319 insertions, 92 deletions
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 584f55afd..d5dd57743 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -239,11 +239,17 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
- rpcSub := notifier.CreateSubscription()
+ var (
+ rpcSub = notifier.CreateSubscription()
+ matchedLogs = make(chan []Log)
+ )
+
+ logsSub, err := api.events.SubscribeLogs(crit, matchedLogs)
+ if err != nil {
+ return nil, err
+ }
go func() {
- matchedLogs := make(chan []Log)
- logsSub := api.events.SubscribeLogs(crit, matchedLogs)
for {
select {
@@ -276,18 +282,20 @@ type FilterCriteria struct {
// used to retrieve logs when the state changes. This method cannot be
// used to fetch logs that are already stored in the state.
//
+// Default criteria for the from and to block are "latest".
+// Using "latest" as block number will return logs for mined blocks.
+// Using "pending" as block number returns logs for not yet mined (pending) blocks.
+// In case logs are removed (chain reorg) previously returned logs are returned
+// again but with the removed property set to true.
+//
+// In case "fromBlock" > "toBlock" an error is returned.
+//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
-func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID {
- var (
- logs = make(chan []Log)
- logsSub = api.events.SubscribeLogs(crit, logs)
- )
-
- if crit.FromBlock == nil {
- crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
- }
- if crit.ToBlock == nil {
- crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
+func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
+ logs := make(chan []Log)
+ logsSub, err := api.events.SubscribeLogs(crit, logs)
+ if err != nil {
+ return rpc.ID(""), err
}
api.filtersMu.Lock()
@@ -312,7 +320,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID {
}
}()
- return logsSub.ID
+ return logsSub.ID, nil
}
// GetLogs returns logs matching the given argument that are stored within the state.
@@ -363,28 +371,38 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]Log
api.filtersMu.Unlock()
if !found || f.typ != LogsSubscription {
- return []Log{}, nil
+ return nil, fmt.Errorf("filter not found")
}
filter := New(api.backend, api.useMipMap)
- filter.SetBeginBlock(f.crit.FromBlock.Int64())
- filter.SetEndBlock(f.crit.ToBlock.Int64())
+ if f.crit.FromBlock != nil {
+ filter.SetBeginBlock(f.crit.FromBlock.Int64())
+ } else {
+ filter.SetBeginBlock(rpc.LatestBlockNumber.Int64())
+ }
+ if f.crit.ToBlock != nil {
+ filter.SetEndBlock(f.crit.ToBlock.Int64())
+ } else {
+ filter.SetEndBlock(rpc.LatestBlockNumber.Int64())
+ }
filter.SetAddresses(f.crit.Addresses)
filter.SetTopics(f.crit.Topics)
- logs, err := filter.Find(ctx)
- return returnLogs(logs), err
+ logs, err:= filter.Find(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return returnLogs(logs), nil
}
// GetFilterChanges returns the logs for the filter with the given id since
// last time is was called. This can be used for polling.
//
// For pending transaction and block filters the result is []common.Hash.
-// (pending)Log filters return []Log. If the filter could not be found
-// []interface{}{} is returned.
+// (pending)Log filters return []Log.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
-func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} {
+func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
@@ -400,15 +418,15 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} {
case PendingTransactionsSubscription, BlocksSubscription:
hashes := f.hashes
f.hashes = nil
- return returnHashes(hashes)
- case PendingLogsSubscription, LogsSubscription:
+ return returnHashes(hashes), nil
+ case LogsSubscription:
logs := f.logs
f.logs = nil
- return returnLogs(logs)
+ return returnLogs(logs), nil
}
}
- return []interface{}{}
+ return []interface{}{}, fmt.Errorf("filter not found")
}
// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
@@ -443,15 +461,11 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
return err
}
- if raw.From == nil || raw.From.Int64() < 0 {
- args.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
- } else {
+ if raw.From != nil {
args.FromBlock = big.NewInt(raw.From.Int64())
}
- if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 {
- args.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
- } else {
+ if raw.ToBlock != nil {
args.ToBlock = big.NewInt(raw.ToBlock.Int64())
}
diff --git a/eth/filters/api_test.go b/eth/filters/api_test.go
index 98eb6cbaa..068a5ea24 100644
--- a/eth/filters/api_test.go
+++ b/eth/filters/api_test.go
@@ -42,11 +42,11 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
if err := json.Unmarshal([]byte("{}"), &test0); err != nil {
t.Fatal(err)
}
- if test0.FromBlock.Int64() != rpc.LatestBlockNumber.Int64() {
- t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.FromBlock)
+ if test0.FromBlock != nil {
+ t.Fatalf("expected nil, got %d", test0.FromBlock)
}
- if test0.ToBlock.Int64() != rpc.LatestBlockNumber.Int64() {
- t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.ToBlock)
+ if test0.ToBlock != nil {
+ t.Fatalf("expected nil, got %d", test0.ToBlock)
}
if len(test0.Addresses) != 0 {
t.Fatalf("expected 0 addresses, got %d", len(test0.Addresses))
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index 4004af300..ce7383fb3 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -20,6 +20,8 @@ import (
"math"
"time"
+ "math/big"
+
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
@@ -162,7 +164,7 @@ func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []Log, er
}
unfiltered = append(unfiltered, rl...)
}
- logs = append(logs, filterLogs(unfiltered, f.addresses, f.topics)...)
+ logs = append(logs, filterLogs(unfiltered, nil, nil, f.addresses, f.topics)...)
}
}
@@ -179,12 +181,18 @@ func includes(addresses []common.Address, a common.Address) bool {
return false
}
-func filterLogs(logs []Log, addresses []common.Address, topics [][]common.Hash) []Log {
+func filterLogs(logs []Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []Log {
var ret []Log
-
// Filter the logs for interesting stuff
Logs:
for _, log := range logs {
+ if fromBlock != nil && fromBlock.Int64() >= 0 && uint64(fromBlock.Int64()) > log.BlockNumber {
+ continue
+ }
+ if toBlock != nil && toBlock.Int64() >= 0 && uint64(toBlock.Int64()) < log.BlockNumber {
+ continue
+ }
+
if len(addresses) > 0 && !includes(addresses, log.Address) {
continue
}
@@ -211,7 +219,6 @@ Logs:
continue Logs
}
}
-
ret = append(ret, log)
}
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index c2c072a9f..b59718aea 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -43,13 +43,17 @@ const (
UnknownSubscription Type = iota
// LogsSubscription queries for new or removed (chain reorg) logs
LogsSubscription
- // PendingLogsSubscription queries for logs for the pending block
+ // PendingLogsSubscription queries for logs in pending blocks
PendingLogsSubscription
+ // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
+ MinedAndPendingLogsSubscription
// PendingTransactionsSubscription queries tx hashes for pending
// transactions entering the pending state
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
+ // LastSubscription keeps track of the last index
+ LastIndexSubscription
)
var (
@@ -63,19 +67,26 @@ type Log struct {
Removed bool `json:"removed"`
}
+// MarshalJSON returns *l as the JSON encoding of l.
func (l *Log) MarshalJSON() ([]byte, error) {
fields := map[string]interface{}{
"address": l.Address,
"data": fmt.Sprintf("0x%x", l.Data),
- "blockNumber": fmt.Sprintf("%#x", l.BlockNumber),
+ "blockNumber": nil,
"logIndex": fmt.Sprintf("%#x", l.Index),
- "blockHash": l.BlockHash,
+ "blockHash": nil,
"transactionHash": l.TxHash,
"transactionIndex": fmt.Sprintf("%#x", l.TxIndex),
"topics": l.Topics,
"removed": l.Removed,
}
+ // mined logs
+ if l.BlockHash != (common.Hash{}) {
+ fields["blockNumber"] = fmt.Sprintf("%#x", l.BlockNumber)
+ fields["blockHash"] = l.BlockHash
+ }
+
return json.Marshal(fields)
}
@@ -169,11 +180,50 @@ func (es *EventSystem) subscribe(sub *subscription) *Subscription {
}
// SubscribeLogs creates a subscription that will write all logs matching the
-// given criteria to the given logs channel.
-func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription {
+// given criteria to the given logs channel. Default value for the from and to
+// block is "latest". If the fromBlock > toBlock an error is returned.
+func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) (*Subscription, error) {
+ var from, to rpc.BlockNumber
+ if crit.FromBlock == nil {
+ from = rpc.LatestBlockNumber
+ } else {
+ from = rpc.BlockNumber(crit.FromBlock.Int64())
+ }
+ if crit.ToBlock == nil {
+ to = rpc.LatestBlockNumber
+ } else {
+ to = rpc.BlockNumber(crit.ToBlock.Int64())
+ }
+
+ // only interested in pending logs
+ if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber {
+ return es.subscribePendingLogs(crit, logs), nil
+ }
+ // only interested in new mined logs
+ if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {
+ return es.subscribeLogs(crit, logs), nil
+ }
+ // only interested in mined logs within a specific block range
+ if from >= 0 && to >= 0 && to >= from {
+ return es.subscribeLogs(crit, logs), nil
+ }
+ // interested in mined logs from a specific block number, new logs and pending logs
+ if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber {
+ return es.subscribeMinedPendingLogs(crit, logs), nil
+ }
+ // interested in logs from a specific block number to new mined blocks
+ if from >= 0 && to == rpc.LatestBlockNumber {
+ return es.subscribeLogs(crit, logs), nil
+ }
+ return nil, fmt.Errorf("invalid from and to block combination: from > to")
+}
+
+// subscribeMinedPendingLogs creates a subscription that returned mined and
+// pending logs that match the given criteria.
+func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan []Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
- typ: LogsSubscription,
+ typ: MinedAndPendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
@@ -186,12 +236,12 @@ func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subs
return es.subscribe(sub)
}
-// SubscribePendingLogs creates a subscription that will write pending logs matching the
-// given criteria to the given channel.
-func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription {
+// subscribeLogs creates a subscription that will write all logs matching the
+// given criteria to the given logs channel.
+func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
- typ: PendingLogsSubscription,
+ typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
@@ -204,15 +254,16 @@ func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log
return es.subscribe(sub)
}
-// SubscribePendingTxEvents creates a sbuscription that writes transaction hashes for
+// subscribePendingLogs creates a subscription that writes transaction hashes for
// transactions that enter the transaction pool.
-func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription {
+func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
- typ: PendingTransactionsSubscription,
+ typ: PendingLogsSubscription,
+ logsCrit: crit,
created: time.Now(),
- logs: make(chan []Log),
- hashes: hashes,
+ logs: logs,
+ hashes: make(chan common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
@@ -238,6 +289,23 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
return es.subscribe(sub)
}
+// SubscribePendingTxEvents creates a subscription that writes transaction hashes for
+// transactions that enter the transaction pool.
+func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription {
+ sub := &subscription{
+ id: rpc.NewID(),
+ typ: PendingTransactionsSubscription,
+ created: time.Now(),
+ logs: make(chan []Log),
+ hashes: hashes,
+ headers: make(chan *types.Header),
+ installed: make(chan struct{}),
+ err: make(chan error),
+ }
+
+ return es.subscribe(sub)
+}
+
type filterIndex map[Type]map[rpc.ID]*subscription
// broadcast event to filters that match criteria.
@@ -251,7 +319,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
if len(e) > 0 {
for _, f := range filters[LogsSubscription] {
if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
@@ -260,7 +328,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
case core.RemovedLogsEvent:
for _, f := range filters[LogsSubscription] {
if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
@@ -268,7 +336,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
case core.PendingLogsEvent:
for _, f := range filters[PendingLogsSubscription] {
if ev.Time.After(f.created) {
- if matchedLogs := filterLogs(convertLogs(e.Logs, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
+ if matchedLogs := filterLogs(convertLogs(e.Logs, false), nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
@@ -351,8 +419,8 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
}
unfiltered = append(unfiltered, rl...)
}
- logs := filterLogs(unfiltered, addresses, topics)
- //fmt.Println("found", len(logs))
+
+ logs := filterLogs(unfiltered, nil, nil, addresses, topics)
return logs
}
return nil
@@ -364,6 +432,11 @@ func (es *EventSystem) eventLoop() {
index = make(filterIndex)
sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, vm.Logs{}, core.TxPreEvent{}, core.ChainEvent{})
)
+
+ for i := UnknownSubscription; i < LastIndexSubscription; i++ {
+ index[i] = make(map[rpc.ID]*subscription)
+ }
+
for {
select {
case ev, active := <-sub.Chan():
@@ -372,13 +445,22 @@ func (es *EventSystem) eventLoop() {
}
es.broadcast(index, ev)
case f := <-es.install:
- if _, found := index[f.typ]; !found {
- index[f.typ] = make(map[rpc.ID]*subscription)
+ if f.typ == MinedAndPendingLogsSubscription {
+ // the type are logs and pending logs subscriptions
+ index[LogsSubscription][f.id] = f
+ index[PendingLogsSubscription][f.id] = f
+ } else {
+ index[f.typ][f.id] = f
}
- index[f.typ][f.id] = f
close(f.installed)
case f := <-es.uninstall:
- delete(index[f.typ], f.id)
+ if f.typ == MinedAndPendingLogsSubscription {
+ // the type are logs and pending logs subscriptions
+ delete(index[LogsSubscription], f.id)
+ delete(index[PendingLogsSubscription], f.id)
+ } else {
+ delete(index[f.typ], f.id)
+ }
close(f.err)
}
}
@@ -386,6 +468,7 @@ func (es *EventSystem) eventLoop() {
// convertLogs is a helper utility that converts vm.Logs to []filter.Log.
func convertLogs(in vm.Logs, removed bool) []Log {
+
logs := make([]Log, len(in))
for i, l := range in {
logs[i] = Log{l, removed}
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index d6d4199cc..e8591a2e4 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -34,13 +34,6 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)
-var (
- mux = new(event.TypeMux)
- db, _ = ethdb.NewMemDatabase()
- backend = &testBackend{mux, db}
- api = NewPublicFilterAPI(backend, false)
-)
-
type testBackend struct {
mux *event.TypeMux
db ethdb.Database
@@ -81,6 +74,11 @@ func TestBlockSubscription(t *testing.T) {
t.Parallel()
var (
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ backend = &testBackend{mux, db}
+ api = NewPublicFilterAPI(backend, false)
+
genesis = core.WriteGenesisBlockForTesting(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {})
chainEvents = []core.ChainEvent{}
@@ -130,6 +128,11 @@ func TestPendingTxFilter(t *testing.T) {
t.Parallel()
var (
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ backend = &testBackend{mux, db}
+ api = NewPublicFilterAPI(backend, false)
+
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
@@ -150,9 +153,13 @@ func TestPendingTxFilter(t *testing.T) {
}
for {
- h := api.GetFilterChanges(fid0).([]common.Hash)
- hashes = append(hashes, h...)
+ results, err := api.GetFilterChanges(fid0)
+ if err != nil {
+ t.Fatalf("Unable to retrieve logs: %v", err)
+ }
+ h := results.([]common.Hash)
+ hashes = append(hashes, h...)
if len(hashes) >= len(transactions) {
break
}
@@ -167,11 +174,86 @@ func TestPendingTxFilter(t *testing.T) {
}
}
+// TestLogFilterCreation test whether a given filter criteria makes sense.
+// If not it must return an error.
+func TestLogFilterCreation(t *testing.T) {
+ var (
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ backend = &testBackend{mux, db}
+ api = NewPublicFilterAPI(backend, false)
+
+ testCases = []struct {
+ crit FilterCriteria
+ success bool
+ }{
+ // defaults
+ {FilterCriteria{}, true},
+ // valid block number range
+ {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, true},
+ // "mined" block range to pending
+ {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, true},
+ // new mined and pending blocks
+ {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, true},
+ // from block "higher" than to block
+ {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}, false},
+ // from block "higher" than to block
+ {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(100)}, false},
+ // from block "higher" than to block
+ {FilterCriteria{FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(100)}, false},
+ // from block "higher" than to block
+ {FilterCriteria{FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, false},
+ }
+ )
+
+ for i, test := range testCases {
+ _, err := api.NewFilter(test.crit)
+ if test.success && err != nil {
+ t.Errorf("expected filter creation for case %d to success, got %v", i, err)
+ }
+ if !test.success && err == nil {
+ t.Errorf("expected testcase %d to fail with an error", i)
+ }
+ }
+}
+
+// TestInvalidLogFilterCreation tests whether invalid filter log criteria results in an error
+// when the filter is created.
+func TestInvalidLogFilterCreation(t *testing.T) {
+ t.Parallel()
+
+ var (
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ backend = &testBackend{mux, db}
+ api = NewPublicFilterAPI(backend, false)
+ )
+
+ // different situations where log filter creation should fail.
+ // Reason: fromBlock > toBlock
+ testCases := []FilterCriteria{
+ 0: {FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())},
+ 1: {FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(100)},
+ 2: {FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(100)},
+ }
+
+ for i, test := range testCases {
+ if _, err := api.NewFilter(test); err == nil {
+ t.Errorf("Expected NewFilter for case #%d to fail", i)
+ }
+ }
+}
+
// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux.
func TestLogFilter(t *testing.T) {
t.Parallel()
var (
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ backend = &testBackend{mux, db}
+ api = NewPublicFilterAPI(backend, false)
+
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333")
@@ -180,8 +262,8 @@ func TestLogFilter(t *testing.T) {
secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
+ // posted twice, once as vm.Logs and once as core.PendingLogsEvent
allLogs = vm.Logs{
- // Note, these are used for comparison of the test cases.
vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0),
vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1),
vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 1),
@@ -189,45 +271,64 @@ func TestLogFilter(t *testing.T) {
vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3),
}
+ expectedCase7 = vm.Logs{allLogs[3], allLogs[4], allLogs[0], allLogs[1], allLogs[2], allLogs[3], allLogs[4]}
+ expectedCase11 = vm.Logs{allLogs[1], allLogs[2], allLogs[1], allLogs[2]}
+
testCases = []struct {
crit FilterCriteria
expected vm.Logs
id rpc.ID
}{
// match all
- {FilterCriteria{}, allLogs, ""},
+ 0: {FilterCriteria{}, allLogs, ""},
// match none due to no matching addresses
- {FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, vm.Logs{}, ""},
+ 1: {FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, vm.Logs{}, ""},
// match logs based on addresses, ignore topics
- {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""},
+ 2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""},
// match none due to no matching topics (match with address)
- {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, ""},
+ 3: {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, ""},
// match logs based on addresses and topics
- {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[3:5], ""},
+ 4: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[3:5], ""},
// match logs based on multiple addresses and "or" topics
- {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[2:5], ""},
- // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criterias when the state changes
- {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, allLogs[:2], ""},
+ 5: {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[2:5], ""},
+ // logs in the pending block
+ 6: {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, allLogs[:2], ""},
+ // mined logs with block num >= 2 or pending logs
+ 7: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, expectedCase7, ""},
+ // all "mined" logs with block num >= 2
+ 8: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""},
+ // all "mined" logs
+ 9: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""},
+ // all "mined" logs with 1>= block num <=2 and topic secondTopic
+ 10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{[]common.Hash{secondTopic}}}, allLogs[3:4], ""},
+ // all "mined" and pending logs with topic firstTopic
+ 11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{[]common.Hash{firstTopic}}}, expectedCase11, ""},
}
-
- err error
)
// create all filters
for i := range testCases {
- testCases[i].id = api.NewFilter(testCases[i].crit)
+ testCases[i].id, _ = api.NewFilter(testCases[i].crit)
}
// raise events
time.Sleep(1 * time.Second)
- if err = mux.Post(allLogs); err != nil {
+ if err := mux.Post(allLogs); err != nil {
+ t.Fatal(err)
+ }
+ if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil {
t.Fatal(err)
}
for i, tt := range testCases {
var fetched []Log
for { // fetch all expected logs
- fetched = append(fetched, api.GetFilterChanges(tt.id).([]Log)...)
+ results, err := api.GetFilterChanges(tt.id)
+ if err != nil {
+ t.Fatalf("Unable to fetch logs: %v", err)
+ }
+
+ fetched = append(fetched, results.([]Log)...)
if len(fetched) >= len(tt.expected) {
break
}
@@ -247,7 +348,6 @@ func TestLogFilter(t *testing.T) {
if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) {
t.Errorf("invalid log on index %d for case %d", l, i)
}
-
}
}
}
@@ -257,6 +357,11 @@ func TestPendingLogsSubscription(t *testing.T) {
t.Parallel()
var (
+ mux = new(event.TypeMux)
+ db, _ = ethdb.NewMemDatabase()
+ backend = &testBackend{mux, db}
+ api = NewPublicFilterAPI(backend, false)
+
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333")
@@ -319,7 +424,7 @@ func TestPendingLogsSubscription(t *testing.T) {
// (some) events are posted.
for i := range testCases {
testCases[i].c = make(chan []Log)
- testCases[i].sub = api.events.SubscribePendingLogs(testCases[i].crit, testCases[i].c)
+ testCases[i].sub, _ = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c)
}
for n, test := range testCases {
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index a8c767ead..ab6a87851 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/event"
)
func makeReceipt(addr common.Address) *types.Receipt {
@@ -51,6 +52,7 @@ func BenchmarkMipmaps(b *testing.B) {
var (
db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
+ mux = new(event.TypeMux)
backend = &testBackend{mux, db}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
@@ -126,6 +128,7 @@ func TestFilters(t *testing.T) {
var (
db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
+ mux = new(event.TypeMux)
backend = &testBackend{mux, db}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key1.PublicKey)
diff --git a/miner/worker.go b/miner/worker.go
index 2933b6bd3..ca00c7229 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -262,6 +262,7 @@ func newLocalMinedBlock(blockNumber uint64, prevMinedBlocks *uint64RingBuffer) (
func (self *worker) wait() {
for {
+ mustCommitNewWork := true
for result := range self.recv {
atomic.AddInt32(&self.atWork, -1)
@@ -315,6 +316,8 @@ func (self *worker) wait() {
core.WriteReceipts(self.chainDb, work.receipts)
// Write map map bloom filters
core.WriteMipmapBloom(self.chainDb, block.NumberU64(), work.receipts)
+ // implicit by posting ChainHeadEvent
+ mustCommitNewWork = false
}
// broadcast before waiting for validation
@@ -343,7 +346,9 @@ func (self *worker) wait() {
}
glog.V(logger.Info).Infof("🔨 Mined %sblock (#%v / %x). %s", stale, block.Number(), block.Hash().Bytes()[:4], confirm)
- self.commitNewWork()
+ if mustCommitNewWork {
+ self.commitNewWork()
+ }
}
}
}
@@ -451,6 +456,7 @@ func (self *worker) commitNewWork() {
tstart := time.Now()
parent := self.chain.CurrentBlock()
+
tstamp := tstart.Unix()
if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 {
tstamp = parent.Time().Int64() + 1
@@ -618,7 +624,16 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
txs.Shift()
}
}
+
if len(coalescedLogs) > 0 || env.tcount > 0 {
+ // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
+ // logs by filling in the block hash when the block was mined by the local miner. This can
+ // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
+ cpy := make(vm.Logs, len(coalescedLogs))
+ for i, l := range coalescedLogs {
+ cpy[i] = new(vm.Log)
+ *cpy[i] = *l
+ }
go func(logs vm.Logs, tcount int) {
if len(logs) > 0 {
mux.Post(core.PendingLogsEvent{Logs: logs})
@@ -626,7 +641,7 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
if tcount > 0 {
mux.Post(core.PendingStateEvent{})
}
- }(coalescedLogs, env.tcount)
+ }(cpy, env.tcount)
}
}