aboutsummaryrefslogtreecommitdiffstats
path: root/core/chain_indexer.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/chain_indexer.go')
-rw-r--r--core/chain_indexer.go60
1 files changed, 46 insertions, 14 deletions
diff --git a/core/chain_indexer.go b/core/chain_indexer.go
index f4c207dcc..7fb184aaa 100644
--- a/core/chain_indexer.go
+++ b/core/chain_indexer.go
@@ -36,7 +36,7 @@ import (
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
- Reset(section uint64)
+ Reset(section uint64, prevHead common.Hash) error
// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
@@ -46,6 +46,15 @@ type ChainIndexerBackend interface {
Commit() error
}
+// ChainIndexerChain interface is used for connecting the indexer to a blockchain
+type ChainIndexerChain interface {
+ // CurrentHeader retrieves the latest locally known header.
+ CurrentHeader() *types.Header
+
+ // SubscribeChainEvent subscribes to new head header notifications.
+ SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
+}
+
// ChainIndexer does a post-processing job for equally sized sections of the
// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is
// connected to the blockchain through the event system by starting a
@@ -100,11 +109,27 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken
return c
}
+// AddKnownSectionHead marks a new section head as known/processed if it is newer
+// than the already known best section head
+func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ if section < c.storedSections {
+ return
+ }
+ c.setSectionHead(section, shead)
+ c.setValidSections(section + 1)
+}
+
// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.
-func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
- go c.eventLoop(currentHeader, chainEventer)
+func (c *ChainIndexer) Start(chain ChainIndexerChain) {
+ events := make(chan ChainEvent, 10)
+ sub := chain.SubscribeChainEvent(events)
+
+ go c.eventLoop(chain.CurrentHeader(), events, sub)
}
// Close tears down all goroutines belonging to the indexer and returns any error
@@ -147,12 +172,10 @@ func (c *ChainIndexer) Close() error {
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
-func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
+func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)
- events := make(chan ChainEvent, 10)
- sub := chainEventer(events)
defer sub.Unsubscribe()
// Fire the initial new head event to start any outstanding processing
@@ -178,7 +201,11 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(
}
header := ev.Block.Header()
if header.ParentHash != prevHash {
- c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true)
+ // Reorg to the common ancestor (might not exist in light sync mode, skip reorg then)
+ // TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?
+ if h := FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
+ c.newHead(h.Number.Uint64(), true)
+ }
}
c.newHead(header.Number.Uint64(), false)
@@ -203,7 +230,7 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) {
if changed < c.storedSections {
c.setValidSections(changed)
}
- // Update the new head number to te finalized section end and notify children
+ // Update the new head number to the finalized section end and notify children
head = changed * c.sectionSize
if head < c.cascadedHead {
@@ -236,6 +263,7 @@ func (c *ChainIndexer) updateLoop() {
updating bool
updated time.Time
)
+
for {
select {
case errc := <-c.quit:
@@ -259,7 +287,7 @@ func (c *ChainIndexer) updateLoop() {
section := c.storedSections
var oldHead common.Hash
if section > 0 {
- oldHead = c.sectionHead(section - 1)
+ oldHead = c.SectionHead(section - 1)
}
// Process the newly defined section in the background
c.lock.Unlock()
@@ -270,7 +298,7 @@ func (c *ChainIndexer) updateLoop() {
c.lock.Lock()
// If processing succeeded and no reorgs occcurred, mark the section completed
- if err == nil && oldHead == c.sectionHead(section-1) {
+ if err == nil && oldHead == c.SectionHead(section-1) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
if c.storedSections == c.knownSections && updating {
@@ -311,7 +339,11 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
c.log.Trace("Processing new chain section", "section", section)
// Reset and partial processing
- c.backend.Reset(section)
+
+ if err := c.backend.Reset(section, lastHead); err != nil {
+ c.setValidSections(0)
+ return common.Hash{}, err
+ }
for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
hash := GetCanonicalHash(c.chainDb, number)
@@ -341,7 +373,7 @@ func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) {
c.lock.Lock()
defer c.lock.Unlock()
- return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1)
+ return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1)
}
// AddChildIndexer adds a child ChainIndexer that can use the output of this one
@@ -381,9 +413,9 @@ func (c *ChainIndexer) setValidSections(sections uint64) {
c.storedSections = sections // needed if new > old
}
-// sectionHead retrieves the last block hash of a processed section from the
+// SectionHead retrieves the last block hash of a processed section from the
// index database.
-func (c *ChainIndexer) sectionHead(section uint64) common.Hash {
+func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
var data [8]byte
binary.BigEndian.PutUint64(data[:], section)