diff options
Diffstat (limited to 'core/chain_indexer.go')
-rw-r--r-- | core/chain_indexer.go | 60 |
1 files changed, 46 insertions, 14 deletions
diff --git a/core/chain_indexer.go b/core/chain_indexer.go index f4c207dcc..7fb184aaa 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -36,7 +36,7 @@ import ( type ChainIndexerBackend interface { // Reset initiates the processing of a new chain segment, potentially terminating // any partially completed operations (in case of a reorg). - Reset(section uint64) + Reset(section uint64, prevHead common.Hash) error // Process crunches through the next header in the chain segment. The caller // will ensure a sequential order of headers. @@ -46,6 +46,15 @@ type ChainIndexerBackend interface { Commit() error } +// ChainIndexerChain interface is used for connecting the indexer to a blockchain +type ChainIndexerChain interface { + // CurrentHeader retrieves the latest locally known header. + CurrentHeader() *types.Header + + // SubscribeChainEvent subscribes to new head header notifications. + SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription +} + // ChainIndexer does a post-processing job for equally sized sections of the // canonical chain (like BlooomBits and CHT structures). A ChainIndexer is // connected to the blockchain through the event system by starting a @@ -100,11 +109,27 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken return c } +// AddKnownSectionHead marks a new section head as known/processed if it is newer +// than the already known best section head +func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) { + c.lock.Lock() + defer c.lock.Unlock() + + if section < c.storedSections { + return + } + c.setSectionHead(section, shead) + c.setValidSections(section + 1) +} + // Start creates a goroutine to feed chain head events into the indexer for // cascading background processing. Children do not need to be started, they // are notified about new events by their parents. -func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) { - go c.eventLoop(currentHeader, chainEventer) +func (c *ChainIndexer) Start(chain ChainIndexerChain) { + events := make(chan ChainEvent, 10) + sub := chain.SubscribeChainEvent(events) + + go c.eventLoop(chain.CurrentHeader(), events, sub) } // Close tears down all goroutines belonging to the indexer and returns any error @@ -147,12 +172,10 @@ func (c *ChainIndexer) Close() error { // eventLoop is a secondary - optional - event loop of the indexer which is only // started for the outermost indexer to push chain head events into a processing // queue. -func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) { +func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainEvent, sub event.Subscription) { // Mark the chain indexer as active, requiring an additional teardown atomic.StoreUint32(&c.active, 1) - events := make(chan ChainEvent, 10) - sub := chainEventer(events) defer sub.Unsubscribe() // Fire the initial new head event to start any outstanding processing @@ -178,7 +201,11 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func( } header := ev.Block.Header() if header.ParentHash != prevHash { - c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true) + // Reorg to the common ancestor (might not exist in light sync mode, skip reorg then) + // TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly? + if h := FindCommonAncestor(c.chainDb, prevHeader, header); h != nil { + c.newHead(h.Number.Uint64(), true) + } } c.newHead(header.Number.Uint64(), false) @@ -203,7 +230,7 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) { if changed < c.storedSections { c.setValidSections(changed) } - // Update the new head number to te finalized section end and notify children + // Update the new head number to the finalized section end and notify children head = changed * c.sectionSize if head < c.cascadedHead { @@ -236,6 +263,7 @@ func (c *ChainIndexer) updateLoop() { updating bool updated time.Time ) + for { select { case errc := <-c.quit: @@ -259,7 +287,7 @@ func (c *ChainIndexer) updateLoop() { section := c.storedSections var oldHead common.Hash if section > 0 { - oldHead = c.sectionHead(section - 1) + oldHead = c.SectionHead(section - 1) } // Process the newly defined section in the background c.lock.Unlock() @@ -270,7 +298,7 @@ func (c *ChainIndexer) updateLoop() { c.lock.Lock() // If processing succeeded and no reorgs occcurred, mark the section completed - if err == nil && oldHead == c.sectionHead(section-1) { + if err == nil && oldHead == c.SectionHead(section-1) { c.setSectionHead(section, newHead) c.setValidSections(section + 1) if c.storedSections == c.knownSections && updating { @@ -311,7 +339,11 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com c.log.Trace("Processing new chain section", "section", section) // Reset and partial processing - c.backend.Reset(section) + + if err := c.backend.Reset(section, lastHead); err != nil { + c.setValidSections(0) + return common.Hash{}, err + } for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ { hash := GetCanonicalHash(c.chainDb, number) @@ -341,7 +373,7 @@ func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) { c.lock.Lock() defer c.lock.Unlock() - return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1) + return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1) } // AddChildIndexer adds a child ChainIndexer that can use the output of this one @@ -381,9 +413,9 @@ func (c *ChainIndexer) setValidSections(sections uint64) { c.storedSections = sections // needed if new > old } -// sectionHead retrieves the last block hash of a processed section from the +// SectionHead retrieves the last block hash of a processed section from the // index database. -func (c *ChainIndexer) sectionHead(section uint64) common.Hash { +func (c *ChainIndexer) SectionHead(section uint64) common.Hash { var data [8]byte binary.BigEndian.PutUint64(data[:], section) |