From b5be6b72cb06ded22075a41db6abc670d33c5ea7 Mon Sep 17 00:00:00 2001 From: bas-vk Date: Mon, 28 Nov 2016 14:59:06 +0100 Subject: eth/filter: add support for pending logs (#3219) --- eth/filters/filter_system_test.go | 155 ++++++++++++++++++++++++++++++++------ 1 file changed, 130 insertions(+), 25 deletions(-) (limited to 'eth/filters/filter_system_test.go') diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index d6d4199cc..e8591a2e4 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -34,13 +34,6 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) -) - type testBackend struct { mux *event.TypeMux db ethdb.Database @@ -81,6 +74,11 @@ func TestBlockSubscription(t *testing.T) { t.Parallel() var ( + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + backend = &testBackend{mux, db} + api = NewPublicFilterAPI(backend, false) + genesis = core.WriteGenesisBlockForTesting(db) chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {}) chainEvents = []core.ChainEvent{} @@ -130,6 +128,11 @@ func TestPendingTxFilter(t *testing.T) { t.Parallel() var ( + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + backend = &testBackend{mux, db} + api = NewPublicFilterAPI(backend, false) + transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), @@ -150,9 +153,13 @@ func TestPendingTxFilter(t *testing.T) { } for { - h := api.GetFilterChanges(fid0).([]common.Hash) - hashes = append(hashes, h...) + results, err := api.GetFilterChanges(fid0) + if err != nil { + t.Fatalf("Unable to retrieve logs: %v", err) + } + h := results.([]common.Hash) + hashes = append(hashes, h...) if len(hashes) >= len(transactions) { break } @@ -167,11 +174,86 @@ func TestPendingTxFilter(t *testing.T) { } } +// TestLogFilterCreation test whether a given filter criteria makes sense. +// If not it must return an error. +func TestLogFilterCreation(t *testing.T) { + var ( + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + backend = &testBackend{mux, db} + api = NewPublicFilterAPI(backend, false) + + testCases = []struct { + crit FilterCriteria + success bool + }{ + // defaults + {FilterCriteria{}, true}, + // valid block number range + {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, true}, + // "mined" block range to pending + {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, true}, + // new mined and pending blocks + {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, true}, + // from block "higher" than to block + {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}, false}, + // from block "higher" than to block + {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(100)}, false}, + // from block "higher" than to block + {FilterCriteria{FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(100)}, false}, + // from block "higher" than to block + {FilterCriteria{FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, false}, + } + ) + + for i, test := range testCases { + _, err := api.NewFilter(test.crit) + if test.success && err != nil { + t.Errorf("expected filter creation for case %d to success, got %v", i, err) + } + if !test.success && err == nil { + t.Errorf("expected testcase %d to fail with an error", i) + } + } +} + +// TestInvalidLogFilterCreation tests whether invalid filter log criteria results in an error +// when the filter is created. +func TestInvalidLogFilterCreation(t *testing.T) { + t.Parallel() + + var ( + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + backend = &testBackend{mux, db} + api = NewPublicFilterAPI(backend, false) + ) + + // different situations where log filter creation should fail. + // Reason: fromBlock > toBlock + testCases := []FilterCriteria{ + 0: {FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, + 1: {FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(100)}, + 2: {FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(100)}, + } + + for i, test := range testCases { + if _, err := api.NewFilter(test); err == nil { + t.Errorf("Expected NewFilter for case #%d to fail", i) + } + } +} + // TestLogFilter tests whether log filters match the correct logs that are posted to the event mux. func TestLogFilter(t *testing.T) { t.Parallel() var ( + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + backend = &testBackend{mux, db} + api = NewPublicFilterAPI(backend, false) + firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") @@ -180,8 +262,8 @@ func TestLogFilter(t *testing.T) { secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999") + // posted twice, once as vm.Logs and once as core.PendingLogsEvent allLogs = vm.Logs{ - // Note, these are used for comparison of the test cases. vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0), vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1), vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 1), @@ -189,45 +271,64 @@ func TestLogFilter(t *testing.T) { vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3), } + expectedCase7 = vm.Logs{allLogs[3], allLogs[4], allLogs[0], allLogs[1], allLogs[2], allLogs[3], allLogs[4]} + expectedCase11 = vm.Logs{allLogs[1], allLogs[2], allLogs[1], allLogs[2]} + testCases = []struct { crit FilterCriteria expected vm.Logs id rpc.ID }{ // match all - {FilterCriteria{}, allLogs, ""}, + 0: {FilterCriteria{}, allLogs, ""}, // match none due to no matching addresses - {FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, vm.Logs{}, ""}, + 1: {FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, vm.Logs{}, ""}, // match logs based on addresses, ignore topics - {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""}, + 2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""}, // match none due to no matching topics (match with address) - {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, ""}, + 3: {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, ""}, // match logs based on addresses and topics - {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[3:5], ""}, + 4: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[3:5], ""}, // match logs based on multiple addresses and "or" topics - {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[2:5], ""}, - // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criterias when the state changes - {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, allLogs[:2], ""}, + 5: {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[2:5], ""}, + // logs in the pending block + 6: {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, allLogs[:2], ""}, + // mined logs with block num >= 2 or pending logs + 7: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, expectedCase7, ""}, + // all "mined" logs with block num >= 2 + 8: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""}, + // all "mined" logs + 9: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""}, + // all "mined" logs with 1>= block num <=2 and topic secondTopic + 10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{[]common.Hash{secondTopic}}}, allLogs[3:4], ""}, + // all "mined" and pending logs with topic firstTopic + 11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{[]common.Hash{firstTopic}}}, expectedCase11, ""}, } - - err error ) // create all filters for i := range testCases { - testCases[i].id = api.NewFilter(testCases[i].crit) + testCases[i].id, _ = api.NewFilter(testCases[i].crit) } // raise events time.Sleep(1 * time.Second) - if err = mux.Post(allLogs); err != nil { + if err := mux.Post(allLogs); err != nil { + t.Fatal(err) + } + if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil { t.Fatal(err) } for i, tt := range testCases { var fetched []Log for { // fetch all expected logs - fetched = append(fetched, api.GetFilterChanges(tt.id).([]Log)...) + results, err := api.GetFilterChanges(tt.id) + if err != nil { + t.Fatalf("Unable to fetch logs: %v", err) + } + + fetched = append(fetched, results.([]Log)...) if len(fetched) >= len(tt.expected) { break } @@ -247,7 +348,6 @@ func TestLogFilter(t *testing.T) { if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) { t.Errorf("invalid log on index %d for case %d", l, i) } - } } } @@ -257,6 +357,11 @@ func TestPendingLogsSubscription(t *testing.T) { t.Parallel() var ( + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + backend = &testBackend{mux, db} + api = NewPublicFilterAPI(backend, false) + firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") @@ -319,7 +424,7 @@ func TestPendingLogsSubscription(t *testing.T) { // (some) events are posted. for i := range testCases { testCases[i].c = make(chan []Log) - testCases[i].sub = api.events.SubscribePendingLogs(testCases[i].crit, testCases[i].c) + testCases[i].sub, _ = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c) } for n, test := range testCases { -- cgit