diff options
Diffstat (limited to 'core/bloombits/matcher_test.go')
-rw-r--r-- | core/bloombits/matcher_test.go | 283 |
1 files changed, 163 insertions, 120 deletions
diff --git a/core/bloombits/matcher_test.go b/core/bloombits/matcher_test.go index bef1491b8..fc49b43b8 100644 --- a/core/bloombits/matcher_test.go +++ b/core/bloombits/matcher_test.go @@ -13,6 +13,7 @@ // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + package bloombits import ( @@ -20,177 +21,219 @@ import ( "sync/atomic" "testing" "time" - - "github.com/ethereum/go-ethereum/core/types" ) const testSectionSize = 4096 -func matcherTestVector(b uint, s uint64) []byte { - r := make([]byte, testSectionSize/8) - for i, _ := range r { - var bb byte - for bit := 0; bit < 8; bit++ { - blockIdx := s*testSectionSize + uint64(i*8+bit) - bb += bb - if (blockIdx % uint64(b)) == 0 { - bb++ - } - } - r[i] = bb - } - return r +// Tests the matcher pipeline on a single continuous workflow without interrupts. +func TestMatcherContinuous(t *testing.T) { + testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 100000, false, 75) + testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 100000, false, 81) + testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, false, 36) } -func expMatch1(idxs types.BloomIndexList, i uint64) bool { - for _, ii := range idxs { - if (i % uint64(ii)) != 0 { - return false - } - } - return true +// Tests the matcher pipeline on a constantly interrupted and resumed work pattern +// with the aim of ensuring data items are requested only once. +func TestMatcherIntermittent(t *testing.T) { + testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 100000, true, 75) + testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 100000, true, 81) + testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, true, 36) } -func expMatch2(idxs []types.BloomIndexList, i uint64) bool { - for _, ii := range idxs { - if expMatch1(ii, i) { - return true - } +// Tests the matcher pipeline on random input to hopefully catch anomalies. +func TestMatcherRandom(t *testing.T) { + for i := 0; i < 10; i++ { + testMatcherBothModes(t, makeRandomIndexes([]int{1}, 50), 10000, 0) + testMatcherBothModes(t, makeRandomIndexes([]int{3}, 50), 10000, 0) + testMatcherBothModes(t, makeRandomIndexes([]int{2, 2, 2}, 20), 10000, 0) + testMatcherBothModes(t, makeRandomIndexes([]int{5, 5, 5}, 50), 10000, 0) + testMatcherBothModes(t, makeRandomIndexes([]int{4, 4, 4}, 20), 10000, 0) } - return false } -func expMatch3(idxs [][]types.BloomIndexList, i uint64) bool { - for _, ii := range idxs { - if !expMatch2(ii, i) { - return false +// makeRandomIndexes generates a random filter system, composed on multiple filter +// criteria, each having one bloom list component for the address and arbitrarilly +// many topic bloom list components. +func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes { + res := make([][]bloomIndexes, len(lengths)) + for i, topics := range lengths { + res[i] = make([]bloomIndexes, topics) + for j := 0; j < topics; j++ { + for k := 0; k < len(res[i][j]); k++ { + res[i][j][k] = uint(rand.Intn(max-1) + 2) + } } } - return true + return res } -func testServeMatcher(m *Matcher, stop chan struct{}, cnt *uint32, maxRequestLen int) { - // serve matcher with test vectors - for i := 0; i < 10; i++ { - go func() { - for { - select { - case <-stop: - return - default: - } - b, ok := m.AllocSectionQueue() - if !ok { - return - } - if m.SectionCount(b) < maxRequestLen { - time.Sleep(time.Microsecond * 100) - } - s := m.FetchSections(b, maxRequestLen) - res := make([][]byte, len(s)) - for i, ss := range s { - res[i] = matcherTestVector(b, ss) - atomic.AddUint32(cnt, 1) - } - m.Deliver(b, s, res) - } - }() +// testMatcherDiffBatches runs the given matches test in single-delivery and also +// in batches delivery mode, verifying that all kinds of deliveries are handled +// correctly withn. +func testMatcherDiffBatches(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermittent bool, retrievals uint32) { + singleton := testMatcher(t, filter, blocks, intermittent, retrievals, 1) + batched := testMatcher(t, filter, blocks, intermittent, retrievals, 16) + + if singleton != batched { + t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, %v in signleton vs. %v in batched mode", filter, blocks, intermittent, singleton, batched) } } -func testMatcher(t *testing.T, idxs [][]types.BloomIndexList, cnt uint64, stopOnMatches bool, expCount uint32) uint32 { - count1 := testMatcherWithReqCount(t, idxs, cnt, stopOnMatches, expCount, 1) - count16 := testMatcherWithReqCount(t, idxs, cnt, stopOnMatches, expCount, 16) - if count1 != count16 { - t.Errorf("Error matching idxs = %v count = %v stopOnMatches = %v: request count mismatch, %v with maxReqCount = 1 vs. %v with maxReqCount = 16", idxs, cnt, stopOnMatches, count1, count16) +// testMatcherBothModes runs the given matcher test in both continuous as well as +// in intermittent mode, verifying that the request counts match each other. +func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, blocks uint64, retrievals uint32) { + continuous := testMatcher(t, filter, blocks, false, retrievals, 16) + intermittent := testMatcher(t, filter, blocks, true, retrievals, 16) + + if continuous != intermittent { + t.Errorf("filter = %v blocks = %v: request count mismatch, %v in continuous vs. %v in intermittent mode", filter, blocks, continuous, intermittent) } - return count1 } -func testMatcherWithReqCount(t *testing.T, idxs [][]types.BloomIndexList, cnt uint64, stopOnMatches bool, expCount uint32, maxReqCount int) uint32 { - m := NewMatcher(testSectionSize, nil, nil) +// testMatcher is a generic tester to run the given matcher test and return the +// number of requests made for cross validation between different modes. +func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermittent bool, retrievals uint32, maxReqCount int) uint32 { + // Create a new matcher an simulate our explicit random bitsets + matcher := NewMatcher(testSectionSize, nil, nil) - for _, idxss := range idxs { - for _, idxs := range idxss { - for _, idx := range idxs { - m.newFetcher(idx) + matcher.addresses = filter[0] + matcher.topics = filter[1:] + + for _, rule := range filter { + for _, topic := range rule { + for _, bit := range topic { + matcher.addScheduler(bit) } } } + // Track the number of retrieval requests made + var requested uint32 - m.addresses = idxs[0] - m.topics = idxs[1:] - var reqCount uint32 + // Start the matching session for the filter and the retriver goroutines + quit := make(chan struct{}) + matches := make(chan uint64, 16) - stop := make(chan struct{}) - chn := m.Start(0, cnt-1) - testServeMatcher(m, stop, &reqCount, maxReqCount) + session, err := matcher.Start(0, blocks-1, matches) + if err != nil { + t.Fatalf("failed to stat matcher session: %v", err) + } + startRetrievers(session, quit, &requested, maxReqCount) - for i := uint64(0); i < cnt; i++ { - if expMatch3(idxs, i) { - match, ok := <-chn + // Iterate over all the blocks and verify that the pipeline produces the correct matches + for i := uint64(0); i < blocks; i++ { + if expMatch3(filter, i) { + match, ok := <-matches if !ok { - t.Errorf("Error matching idxs = %v count = %v stopOnMatches = %v: expected #%v, results channel closed", idxs, cnt, stopOnMatches, i) + t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, results channel closed", filter, blocks, intermittent, i) return 0 } if match != i { - t.Errorf("Error matching idxs = %v count = %v stopOnMatches = %v: expected #%v, got #%v", idxs, cnt, stopOnMatches, i, match) + t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, got #%v", filter, blocks, intermittent, i, match) } - if stopOnMatches { - m.Stop() - close(stop) - stop = make(chan struct{}) - chn = m.Start(i+1, cnt-1) - testServeMatcher(m, stop, &reqCount, maxReqCount) + // If we're testing intermittent mode, abort and restart the pipeline + if intermittent { + session.Close(time.Second) + close(quit) + + quit = make(chan struct{}) + matches = make(chan uint64, 16) + + session, err = matcher.Start(i+1, blocks-1, matches) + if err != nil { + t.Fatalf("failed to stat matcher session: %v", err) + } + startRetrievers(session, quit, &requested, maxReqCount) } } } - match, ok := <-chn + // Ensure the result channel is torn down after the last block + match, ok := <-matches if ok { - t.Errorf("Error matching idxs = %v count = %v stopOnMatches = %v: expected closed channel, got #%v", idxs, cnt, stopOnMatches, match) + t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match) } - m.Stop() - close(stop) + // Clean up the session and ensure we match the expected retrieval count + session.Close(time.Second) + close(quit) - if expCount != 0 && expCount != reqCount { - t.Errorf("Error matching idxs = %v count = %v stopOnMatches = %v: request count mismatch, expected #%v, got #%v", idxs, cnt, stopOnMatches, expCount, reqCount) + if retrievals != 0 && requested != retrievals { + t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested, retrievals) } + return requested +} + +// startRetrievers starts a batch of goroutines listening for section requests +// and serving them. +func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *uint32, batch int) { + requests := make(chan chan *Retrieval) + + for i := 0; i < 10; i++ { + // Start a multiplexer to test multiple threaded execution + go session.Multiplex(batch, 100*time.Microsecond, requests) - return reqCount + // Start a services to match the above multiplexer + go func() { + for { + // Wait for a service request or a shutdown + select { + case <-quit: + return + + case request := <-requests: + task := <-request + + task.Bitsets = make([][]byte, len(task.Sections)) + for i, section := range task.Sections { + if rand.Int()%4 != 0 { // Handle occasional missing deliveries + task.Bitsets[i] = generateBitset(task.Bit, section) + atomic.AddUint32(retrievals, 1) + } + } + request <- task + } + } + }() + } } -func testRandomIdxs(l []int, max int) [][]types.BloomIndexList { - res := make([][]types.BloomIndexList, len(l)) - for i, ll := range l { - res[i] = make([]types.BloomIndexList, ll) - for j, _ := range res[i] { - for k, _ := range res[i][j] { - res[i][j][k] = uint(rand.Intn(max-1) + 2) +// generateBitset generates the rotated bitset for the given bloom bit and section +// numbers. +func generateBitset(bit uint, section uint64) []byte { + bitset := make([]byte, testSectionSize/8) + for i := 0; i < len(bitset); i++ { + for b := 0; b < 8; b++ { + blockIdx := section*testSectionSize + uint64(i*8+b) + bitset[i] += bitset[i] + if (blockIdx % uint64(bit)) == 0 { + bitset[i]++ } } } - return res + return bitset } -func TestMatcher(t *testing.T) { - testMatcher(t, [][]types.BloomIndexList{{{10, 20, 30}}}, 100000, false, 75) - testMatcher(t, [][]types.BloomIndexList{{{32, 3125, 100}}, {{40, 50, 10}}}, 100000, false, 81) - testMatcher(t, [][]types.BloomIndexList{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, false, 36) +func expMatch1(filter bloomIndexes, i uint64) bool { + for _, ii := range filter { + if (i % uint64(ii)) != 0 { + return false + } + } + return true } -func TestMatcherStopOnMatches(t *testing.T) { - testMatcher(t, [][]types.BloomIndexList{{{10, 20, 30}}}, 100000, true, 75) - testMatcher(t, [][]types.BloomIndexList{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, true, 36) +func expMatch2(filter []bloomIndexes, i uint64) bool { + for _, ii := range filter { + if expMatch1(ii, i) { + return true + } + } + return false } -func TestMatcherRandom(t *testing.T) { - for i := 0; i < 20; i++ { - testMatcher(t, testRandomIdxs([]int{1}, 50), 100000, false, 0) - testMatcher(t, testRandomIdxs([]int{3}, 50), 100000, false, 0) - testMatcher(t, testRandomIdxs([]int{2, 2, 2}, 20), 100000, false, 0) - testMatcher(t, testRandomIdxs([]int{5, 5, 5}, 50), 100000, false, 0) - idxs := testRandomIdxs([]int{2, 2, 2}, 20) - reqCount := testMatcher(t, idxs, 10000, false, 0) - testMatcher(t, idxs, 10000, true, reqCount) +func expMatch3(filter [][]bloomIndexes, i uint64) bool { + for _, ii := range filter { + if !expMatch2(ii, i) { + return false + } } + return true } |