diff options
Diffstat (limited to 'core/chain_indexer.go')
-rw-r--r-- | core/chain_indexer.go | 76 |
1 files changed, 59 insertions, 17 deletions
diff --git a/core/chain_indexer.go b/core/chain_indexer.go index 9a88a5b1b..56360b59a 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -36,7 +36,7 @@ import ( type ChainIndexerBackend interface { // Reset initiates the processing of a new chain segment, potentially terminating // any partially completed operations (in case of a reorg). - Reset(section uint64) + Reset(section uint64, lastSectionHead common.Hash) // Process crunches through the next header in the chain segment. The caller // will ensure a sequential order of headers. @@ -44,7 +44,7 @@ type ChainIndexerBackend interface { // Commit finalizes the section metadata and stores it into the database. This // interface will usually be a batch writer. - Commit(db ethdb.Database) error + Commit() error } // ChainIndexer does a post-processing job for equally sized sections of the @@ -101,10 +101,34 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken return c } +// AddKnownSectionHead marks a new section head as known/processed if it is newer +// than the already known best section head +func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) { + c.lock.Lock() + defer c.lock.Unlock() + + if section < c.storedSections { + return + } + c.setSectionHead(section, shead) + c.setValidSections(section + 1) +} + +// IndexerChain interface is used for connecting the indexer to a blockchain +type IndexerChain interface { + CurrentHeader() *types.Header + SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription +} + // Start creates a goroutine to feed chain head events into the indexer for -// cascading background processing. -func (c *ChainIndexer) Start(currentHeader *types.Header, eventMux *event.TypeMux) { - go c.eventLoop(currentHeader, eventMux) +// cascading background processing. Children do not need to be started, they +// are notified about new events by their parents. +func (c *ChainIndexer) Start(chain IndexerChain) { + ch := make(chan ChainEvent, 10) + sub := chain.SubscribeChainEvent(ch) + currentHeader := chain.CurrentHeader() + + go c.eventLoop(currentHeader, ch, sub) } // Close tears down all goroutines belonging to the indexer and returns any error @@ -125,6 +149,14 @@ func (c *ChainIndexer) Close() error { errs = append(errs, err) } } + + // Close all children + for _, child := range c.children { + if err := child.Close(); err != nil { + errs = append(errs, err) + } + } + // Return any failures switch { case len(errs) == 0: @@ -141,12 +173,10 @@ func (c *ChainIndexer) Close() error { // eventLoop is a secondary - optional - event loop of the indexer which is only // started for the outermost indexer to push chain head events into a processing // queue. -func (c *ChainIndexer) eventLoop(currentHeader *types.Header, eventMux *event.TypeMux) { +func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) { // Mark the chain indexer as active, requiring an additional teardown atomic.StoreUint32(&c.active, 1) - // Subscribe to chain head events - sub := eventMux.Subscribe(ChainEvent{}) defer sub.Unsubscribe() // Fire the initial new head event to start any outstanding processing @@ -163,14 +193,14 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, eventMux *event.Ty errc <- nil return - case ev, ok := <-sub.Chan(): + case ev, ok := <-ch: // Received a new event, ensure it's not nil (closing) and update if !ok { errc := <-c.quit errc <- nil return } - header := ev.Data.(ChainEvent).Block.Header() + header := ev.Block.Header() if header.ParentHash != prevHash { c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true) } @@ -226,7 +256,10 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) { // updateLoop is the main event loop of the indexer which pushes chain segments // down into the processing backend. func (c *ChainIndexer) updateLoop() { - var updated time.Time + var ( + updated time.Time + updateMsg bool + ) for { select { @@ -242,6 +275,7 @@ func (c *ChainIndexer) updateLoop() { // Periodically print an upgrade log message to the user if time.Since(updated) > 8*time.Second { if c.knownSections > c.storedSections+1 { + updateMsg = true c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections) } updated = time.Now() @@ -250,17 +284,24 @@ func (c *ChainIndexer) updateLoop() { section := c.storedSections var oldHead common.Hash if section > 0 { - oldHead = c.sectionHead(section - 1) + oldHead = c.SectionHead(section - 1) } // Process the newly defined section in the background c.lock.Unlock() newHead, err := c.processSection(section, oldHead) + if err != nil { + c.log.Error("Section processing failed", "error", err) + } c.lock.Lock() // If processing succeeded and no reorgs occcurred, mark the section completed - if err == nil && oldHead == c.sectionHead(section-1) { + if err == nil && oldHead == c.SectionHead(section-1) { c.setSectionHead(section, newHead) c.setValidSections(section + 1) + if c.storedSections == c.knownSections && updateMsg { + updateMsg = false + c.log.Info("Finished upgrading chain index") + } c.cascadedHead = c.storedSections*c.sectionSize - 1 for _, child := range c.children { @@ -295,7 +336,7 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com c.log.Trace("Processing new chain section", "section", section) // Reset and partial processing - c.backend.Reset(section) + c.backend.Reset(section, lastHead) for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ { hash := GetCanonicalHash(c.chainDb, number) @@ -311,7 +352,8 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com c.backend.Process(header) lastHead = header.Hash() } - if err := c.backend.Commit(c.chainDb); err != nil { + if err := c.backend.Commit(); err != nil { + c.log.Error("Section commit failed", "error", err) return common.Hash{}, err } return lastHead, nil @@ -324,7 +366,7 @@ func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) { c.lock.Lock() defer c.lock.Unlock() - return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1) + return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1) } // AddChildIndexer adds a child ChainIndexer that can use the output of this one @@ -366,7 +408,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) { // sectionHead retrieves the last block hash of a processed section from the // index database. -func (c *ChainIndexer) sectionHead(section uint64) common.Hash { +func (c *ChainIndexer) SectionHead(section uint64) common.Hash { var data [8]byte binary.BigEndian.PutUint64(data[:], section) |