aboutsummaryrefslogtreecommitdiffstats
path: root/core/chain_indexer.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/chain_indexer.go')
-rw-r--r--core/chain_indexer.go76
1 files changed, 59 insertions, 17 deletions
diff --git a/core/chain_indexer.go b/core/chain_indexer.go
index 9a88a5b1b..56360b59a 100644
--- a/core/chain_indexer.go
+++ b/core/chain_indexer.go
@@ -36,7 +36,7 @@ import (
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
- Reset(section uint64)
+ Reset(section uint64, lastSectionHead common.Hash)
// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
@@ -44,7 +44,7 @@ type ChainIndexerBackend interface {
// Commit finalizes the section metadata and stores it into the database. This
// interface will usually be a batch writer.
- Commit(db ethdb.Database) error
+ Commit() error
}
// ChainIndexer does a post-processing job for equally sized sections of the
@@ -101,10 +101,34 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken
return c
}
+// AddKnownSectionHead marks a new section head as known/processed if it is newer
+// than the already known best section head
+func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ if section < c.storedSections {
+ return
+ }
+ c.setSectionHead(section, shead)
+ c.setValidSections(section + 1)
+}
+
+// IndexerChain interface is used for connecting the indexer to a blockchain
+type IndexerChain interface {
+ CurrentHeader() *types.Header
+ SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
+}
+
// Start creates a goroutine to feed chain head events into the indexer for
-// cascading background processing.
-func (c *ChainIndexer) Start(currentHeader *types.Header, eventMux *event.TypeMux) {
- go c.eventLoop(currentHeader, eventMux)
+// cascading background processing. Children do not need to be started, they
+// are notified about new events by their parents.
+func (c *ChainIndexer) Start(chain IndexerChain) {
+ ch := make(chan ChainEvent, 10)
+ sub := chain.SubscribeChainEvent(ch)
+ currentHeader := chain.CurrentHeader()
+
+ go c.eventLoop(currentHeader, ch, sub)
}
// Close tears down all goroutines belonging to the indexer and returns any error
@@ -125,6 +149,14 @@ func (c *ChainIndexer) Close() error {
errs = append(errs, err)
}
}
+
+ // Close all children
+ for _, child := range c.children {
+ if err := child.Close(); err != nil {
+ errs = append(errs, err)
+ }
+ }
+
// Return any failures
switch {
case len(errs) == 0:
@@ -141,12 +173,10 @@ func (c *ChainIndexer) Close() error {
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
-func (c *ChainIndexer) eventLoop(currentHeader *types.Header, eventMux *event.TypeMux) {
+func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)
- // Subscribe to chain head events
- sub := eventMux.Subscribe(ChainEvent{})
defer sub.Unsubscribe()
// Fire the initial new head event to start any outstanding processing
@@ -163,14 +193,14 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, eventMux *event.Ty
errc <- nil
return
- case ev, ok := <-sub.Chan():
+ case ev, ok := <-ch:
// Received a new event, ensure it's not nil (closing) and update
if !ok {
errc := <-c.quit
errc <- nil
return
}
- header := ev.Data.(ChainEvent).Block.Header()
+ header := ev.Block.Header()
if header.ParentHash != prevHash {
c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true)
}
@@ -226,7 +256,10 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) {
// updateLoop is the main event loop of the indexer which pushes chain segments
// down into the processing backend.
func (c *ChainIndexer) updateLoop() {
- var updated time.Time
+ var (
+ updated time.Time
+ updateMsg bool
+ )
for {
select {
@@ -242,6 +275,7 @@ func (c *ChainIndexer) updateLoop() {
// Periodically print an upgrade log message to the user
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
+ updateMsg = true
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
}
updated = time.Now()
@@ -250,17 +284,24 @@ func (c *ChainIndexer) updateLoop() {
section := c.storedSections
var oldHead common.Hash
if section > 0 {
- oldHead = c.sectionHead(section - 1)
+ oldHead = c.SectionHead(section - 1)
}
// Process the newly defined section in the background
c.lock.Unlock()
newHead, err := c.processSection(section, oldHead)
+ if err != nil {
+ c.log.Error("Section processing failed", "error", err)
+ }
c.lock.Lock()
// If processing succeeded and no reorgs occcurred, mark the section completed
- if err == nil && oldHead == c.sectionHead(section-1) {
+ if err == nil && oldHead == c.SectionHead(section-1) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
+ if c.storedSections == c.knownSections && updateMsg {
+ updateMsg = false
+ c.log.Info("Finished upgrading chain index")
+ }
c.cascadedHead = c.storedSections*c.sectionSize - 1
for _, child := range c.children {
@@ -295,7 +336,7 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
c.log.Trace("Processing new chain section", "section", section)
// Reset and partial processing
- c.backend.Reset(section)
+ c.backend.Reset(section, lastHead)
for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
hash := GetCanonicalHash(c.chainDb, number)
@@ -311,7 +352,8 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
c.backend.Process(header)
lastHead = header.Hash()
}
- if err := c.backend.Commit(c.chainDb); err != nil {
+ if err := c.backend.Commit(); err != nil {
+ c.log.Error("Section commit failed", "error", err)
return common.Hash{}, err
}
return lastHead, nil
@@ -324,7 +366,7 @@ func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) {
c.lock.Lock()
defer c.lock.Unlock()
- return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1)
+ return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1)
}
// AddChildIndexer adds a child ChainIndexer that can use the output of this one
@@ -366,7 +408,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) {
// sectionHead retrieves the last block hash of a processed section from the
// index database.
-func (c *ChainIndexer) sectionHead(section uint64) common.Hash {
+func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
var data [8]byte
binary.BigEndian.PutUint64(data[:], section)