diff options
Diffstat (limited to 'core/chain_indexer_test.go')
-rw-r--r-- | core/chain_indexer_test.go | 251 |
1 files changed, 125 insertions, 126 deletions
diff --git a/core/chain_indexer_test.go b/core/chain_indexer_test.go index 827976d51..780e46e43 100644 --- a/core/chain_indexer_test.go +++ b/core/chain_indexer_test.go @@ -14,11 +14,10 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. -// Package core implements the Ethereum consensus protocol. package core import ( - "encoding/binary" + "fmt" "math/big" "math/rand" "testing" @@ -28,208 +27,208 @@ import ( "github.com/ethereum/go-ethereum/ethdb" ) +// Runs multiple tests with randomized parameters. func TestChainIndexerSingle(t *testing.T) { - // run multiple tests with randomized parameters for i := 0; i < 10; i++ { testChainIndexer(t, 1) } } +// Runs multiple tests with randomized parameters and different number of +// chain backends. func TestChainIndexerWithChildren(t *testing.T) { - // run multiple tests with randomized parameters and different number of - // chained indexers for i := 2; i < 8; i++ { testChainIndexer(t, i) } } -// testChainIndexer runs a test with either a single ChainIndexer or a chain of multiple indexers -// sectionSize and confirmReq parameters are randomized -func testChainIndexer(t *testing.T, tciCount int) { +// testChainIndexer runs a test with either a single chain indexer or a chain of +// multiple backends. The section size and required confirmation count parameters +// are randomized. +func testChainIndexer(t *testing.T, count int) { db, _ := ethdb.NewMemDatabase() - stop := make(chan struct{}) - tciList := make([]*testChainIndex, tciCount) - var lastIndexer *ChainIndexer - for i, _ := range tciList { - tci := &testChainIndex{t: t, sectionSize: uint64(rand.Intn(100) + 1), confirmReq: uint64(rand.Intn(10)), processCh: make(chan uint64)} - tciList[i] = tci - tci.indexer = NewChainIndexer(db, ethdb.NewTable(db, string([]byte{byte(i)})), tci, tci.sectionSize, tci.confirmReq, 0, stop) - if cs := tci.indexer.CanonicalSections(); cs != 0 { - t.Errorf("Expected 0 canonical sections, got %d", cs) + defer db.Close() + + // Create a chain of indexers and ensure they all report empty + backends := make([]*testChainIndexBackend, count) + for i := 0; i < count; i++ { + var ( + sectionSize = uint64(rand.Intn(100) + 1) + confirmsReq = uint64(rand.Intn(10)) + ) + backends[i] = &testChainIndexBackend{t: t, processCh: make(chan uint64)} + backends[i].indexer = NewChainIndexer(db, ethdb.NewTable(db, string([]byte{byte(i)})), backends[i], sectionSize, confirmsReq, 0, fmt.Sprintf("indexer-%d", i)) + defer backends[i].indexer.Close() + + if sections, _, _ := backends[i].indexer.Sections(); sections != 0 { + t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, 0) } - if lastIndexer != nil { - lastIndexer.AddChildIndexer(tci.indexer) + if i > 0 { + backends[i-1].indexer.AddChildIndexer(backends[i].indexer) } - lastIndexer = tci.indexer } - - // expectCs expects a certain number of available canonical sections - expectCs := func(indexer *ChainIndexer, expCs uint64) { - cnt := 0 - for { - cs := indexer.CanonicalSections() - if cs == expCs { - return + // notify pings the root indexer about a new head or reorg, then expect + // processed blocks if a section is processable + notify := func(headNum, failNum uint64, reorg bool) { + backends[0].indexer.newHead(headNum, reorg) + if reorg { + for _, backend := range backends { + headNum = backend.reorg(headNum) + backend.assertSections() } - // keep trying for 10 seconds if it does not match - cnt++ - if cnt == 10000 { - t.Fatalf("Expected %d canonical sections, got %d", expCs, cs) + return + } + var cascade bool + for _, backend := range backends { + headNum, cascade = backend.assertBlocks(headNum, failNum) + if !cascade { + break } - time.Sleep(time.Millisecond) + backend.assertSections() } } - - // notify the indexer about a new head or rollback, then expect processed blocks if a section is processable - notify := func(headNum, expFailAfter uint64, rollback bool) { - tciList[0].indexer.newHead(headNum, rollback) - if rollback { - for _, tci := range tciList { - headNum = tci.rollback(headNum) - expectCs(tci.indexer, tci.stored) - } - } else { - for _, tci := range tciList { - var more bool - headNum, more = tci.newBlocks(headNum, expFailAfter) - if !more { - break - } - expectCs(tci.indexer, tci.stored) - } + // inject inserts a new random canonical header into the database directly + inject := func(number uint64) { + header := &types.Header{Number: big.NewInt(int64(number)), Extra: big.NewInt(rand.Int63()).Bytes()} + if number > 0 { + header.ParentHash = GetCanonicalHash(db, number-1) } + WriteHeader(db, header) + WriteCanonicalHash(db, header.Hash(), number) } - + // Start indexer with an already existing chain for i := uint64(0); i <= 100; i++ { - testCanonicalHeader(db, i) + inject(i) } - // start indexer with an already existing chain notify(100, 100, false) - // add new blocks one by one + + // Add new blocks one by one for i := uint64(101); i <= 1000; i++ { - testCanonicalHeader(db, i) + inject(i) notify(i, i, false) } - // do a rollback + // Do a reorg notify(500, 500, true) - // create new fork + + // Create new fork for i := uint64(501); i <= 1000; i++ { - testCanonicalHeader(db, i) + inject(i) notify(i, i, false) } - for i := uint64(1001); i <= 1500; i++ { - testCanonicalHeader(db, i) + inject(i) } - // create a failed processing scenario where less blocks are available at processing time than notified + // Failed processing scenario where less blocks are available than notified notify(2000, 1500, false) - // notify about a rollback (which could have caused the missing blocks if happened during processing) + + // Notify about a reorg (which could have caused the missing blocks if happened during processing) notify(1500, 1500, true) - // create new fork + // Create new fork for i := uint64(1501); i <= 2000; i++ { - testCanonicalHeader(db, i) + inject(i) notify(i, i, false) } - close(stop) - db.Close() } -func testCanonicalHeader(db ethdb.Database, idx uint64) { - var rnd [8]byte - binary.BigEndian.PutUint64(rnd[:], uint64(rand.Int63())) - header := &types.Header{Number: big.NewInt(int64(idx)), Extra: rnd[:]} - if idx > 0 { - header.ParentHash = GetCanonicalHash(db, idx-1) - } - WriteHeader(db, header) - WriteCanonicalHash(db, header.Hash(), idx) -} - -// testChainIndex implements ChainIndexerBackend -type testChainIndex struct { +// testChainIndexBackend implements ChainIndexerBackend +type testChainIndexBackend struct { t *testing.T - sectionSize, confirmReq uint64 - section, headerCnt, stored uint64 indexer *ChainIndexer + section, headerCnt, stored uint64 processCh chan uint64 } -// newBlocks expects process calls after new blocks have arrived. If expFailAfter < headNum then -// we are simulating a scenario where a rollback has happened after the processing has started and -// the processing of a section fails. -func (t *testChainIndex) newBlocks(headNum, expFailAfter uint64) (uint64, bool) { - var newCount uint64 - if headNum >= t.confirmReq { - newCount = (headNum + 1 - t.confirmReq) / t.sectionSize - if newCount > t.stored { +// assertSections verifies if a chain indexer has the correct number of section. +func (b *testChainIndexBackend) assertSections() { + // Keep trying for 3 seconds if it does not match + var sections uint64 + for i := 0; i < 300; i++ { + sections, _, _ = b.indexer.Sections() + if sections == b.stored { + return + } + time.Sleep(10 * time.Millisecond) + } + b.t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, b.stored) +} + +// assertBlocks expects processing calls after new blocks have arrived. If the +// failNum < headNum then we are simulating a scenario where a reorg has happened +// after the processing has started and the processing of a section fails. +func (b *testChainIndexBackend) assertBlocks(headNum, failNum uint64) (uint64, bool) { + var sections uint64 + if headNum >= b.indexer.confirmsReq { + sections = (headNum + 1 - b.indexer.confirmsReq) / b.indexer.sectionSize + if sections > b.stored { // expect processed blocks - for exp := t.stored * t.sectionSize; exp < newCount*t.sectionSize; exp++ { - if exp > expFailAfter { + for expectd := b.stored * b.indexer.sectionSize; expectd < sections*b.indexer.sectionSize; expectd++ { + if expectd > failNum { // rolled back after processing started, no more process calls expected // wait until updating is done to make sure that processing actually fails - for { - t.indexer.lock.Lock() - u := t.indexer.updating - t.indexer.lock.Unlock() - if !u { + var updating bool + for i := 0; i < 300; i++ { + b.indexer.lock.Lock() + updating = b.indexer.knownSections > b.indexer.storedSections + b.indexer.lock.Unlock() + if !updating { break } - time.Sleep(time.Millisecond) + time.Sleep(10 * time.Millisecond) } - - newCount = exp / t.sectionSize + if updating { + b.t.Fatalf("update did not finish") + } + sections = expectd / b.indexer.sectionSize break } select { case <-time.After(10 * time.Second): - t.t.Fatalf("Expected processed block #%d, got nothing", exp) - case proc := <-t.processCh: - if proc != exp { - t.t.Errorf("Expected processed block #%d, got #%d", exp, proc) + b.t.Fatalf("Expected processed block #%d, got nothing", expectd) + case processed := <-b.processCh: + if processed != expectd { + b.t.Errorf("Expected processed block #%d, got #%d", expectd, processed) } } } - t.stored = newCount + b.stored = sections } } - if t.stored == 0 { + if b.stored == 0 { return 0, false } - return t.stored*t.sectionSize - 1, true + return b.stored*b.indexer.sectionSize - 1, true } -func (t *testChainIndex) rollback(headNum uint64) uint64 { - firstChanged := headNum / t.sectionSize - if firstChanged < t.stored { - t.stored = firstChanged +func (b *testChainIndexBackend) reorg(headNum uint64) uint64 { + firstChanged := headNum / b.indexer.sectionSize + if firstChanged < b.stored { + b.stored = firstChanged } - return t.stored * t.sectionSize + return b.stored * b.indexer.sectionSize } -func (t *testChainIndex) Reset(section uint64) { - t.section = section - t.headerCnt = 0 +func (b *testChainIndexBackend) Reset(section uint64) { + b.section = section + b.headerCnt = 0 } -func (t *testChainIndex) Process(header *types.Header) { - t.headerCnt++ - if t.headerCnt > t.sectionSize { - t.t.Error("Processing too many headers") +func (b *testChainIndexBackend) Process(header *types.Header) { + b.headerCnt++ + if b.headerCnt > b.indexer.sectionSize { + b.t.Error("Processing too many headers") } //t.processCh <- header.Number.Uint64() select { case <-time.After(10 * time.Second): - t.t.Fatal("Unexpected call to Process") - case t.processCh <- header.Number.Uint64(): + b.t.Fatal("Unexpected call to Process") + case b.processCh <- header.Number.Uint64(): } } -func (t *testChainIndex) Commit(db ethdb.Database) error { - if t.headerCnt != t.sectionSize { - t.t.Error("Not enough headers processed") +func (b *testChainIndexBackend) Commit(db ethdb.Database) error { + if b.headerCnt != b.indexer.sectionSize { + b.t.Error("Not enough headers processed") } return nil } - -func (t *testChainIndex) UpdateMsg(done, all uint64) {} |