diff options
author | Péter Szilágyi <peterke@gmail.com> | 2017-08-04 00:25:06 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2017-08-07 22:38:33 +0800 |
commit | 8edaaa227db9dfc0a3993f9d33413771c7963330 (patch) | |
tree | 3011a93a134a8c912d4316a3cf520022203aa394 /core/chain_indexer.go | |
parent | bd74882d83509ba4a8477dd21b86d46bc7d12eb4 (diff) | |
download | go-tangerine-8edaaa227db9dfc0a3993f9d33413771c7963330.tar.gz go-tangerine-8edaaa227db9dfc0a3993f9d33413771c7963330.tar.zst go-tangerine-8edaaa227db9dfc0a3993f9d33413771c7963330.zip |
core: polish chain indexer a bit
Diffstat (limited to 'core/chain_indexer.go')
-rw-r--r-- | core/chain_indexer.go | 454 |
1 files changed, 278 insertions, 176 deletions
diff --git a/core/chain_indexer.go b/core/chain_indexer.go index f1ead526d..9a88a5b1b 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -14,261 +14,361 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. -// Package core implements the Ethereum consensus protocol. package core import ( "encoding/binary" + "fmt" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" ) -// ChainIndexer does a post-processing job for equally sized sections of the canonical -// chain (like BlooomBits and CHT structures). A ChainIndexer is connected to the blockchain -// through the event system by starting a ChainEventLoop in a goroutine. -// Further child ChainIndexers can be added which use the output of the parent section -// indexer. These child indexers receive new head notifications only after an entire section -// has been finished or in case of rollbacks that might affect already finished sections. -type ChainIndexer struct { - chainDb, indexDb ethdb.Database - backend ChainIndexerBackend - sectionSize, confirmReq uint64 - stop chan struct{} - lock sync.Mutex - procWait time.Duration - tryUpdate chan struct{} - stored, targetCount, calcIdx, lastForwarded uint64 - updating bool - children []*ChainIndexer +// ChainIndexerBackend defines the methods needed to process chain segments in +// the background and write the segment results into the database. These can be +// used to create filter blooms or CHTs. +type ChainIndexerBackend interface { + // Reset initiates the processing of a new chain segment, potentially terminating + // any partially completed operations (in case of a reorg). + Reset(section uint64) + + // Process crunches through the next header in the chain segment. The caller + // will ensure a sequential order of headers. + Process(header *types.Header) + + // Commit finalizes the section metadata and stores it into the database. This + // interface will usually be a batch writer. + Commit(db ethdb.Database) error } -// ChainIndexerBackend interface is a backend for the indexer doing the actual post-processing job -type ChainIndexerBackend interface { - Reset(section uint64) // start processing a new section - Process(header *types.Header) // process a single block (called for each block in the section) - Commit(db ethdb.Database) error // do some more processing if necessary and store the results in the database - UpdateMsg(done, all uint64) // print a progress update message if necessary (only called when multiple sections need to be processed) +// ChainIndexer does a post-processing job for equally sized sections of the +// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is +// connected to the blockchain through the event system by starting a +// ChainEventLoop in a goroutine. +// +// Further child ChainIndexers can be added which use the output of the parent +// section indexer. These child indexers receive new head notifications only +// after an entire section has been finished or in case of rollbacks that might +// affect already finished sections. +type ChainIndexer struct { + chainDb ethdb.Database // Chain database to index the data from + indexDb ethdb.Database // Prefixed table-view of the db to write index metadata into + backend ChainIndexerBackend // Background processor generating the index data content + children []*ChainIndexer // Child indexers to cascade chain updates to + + active uint32 // Flag whether the event loop was started + update chan struct{} // Notification channel that headers should be processed + quit chan chan error // Quit channel to tear down running goroutines + + sectionSize uint64 // Number of blocks in a single chain segment to process + confirmsReq uint64 // Number of confirmations before processing a completed segment + + storedSections uint64 // Number of sections successfully indexed into the database + knownSections uint64 // Number of sections known to be complete (block wise) + cascadedHead uint64 // Block number of the last completed section cascaded to subindexers + + throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources + + log log.Logger + lock sync.RWMutex } -// NewChainIndexer creates a new ChainIndexer -// db: database where the index of available processed sections is stored (the index is stored by the -// indexer, the actual processed chain data is stored by the backend) -// dbKey: key prefix where the index is stored -// backend: an implementation of ChainIndexerBackend -// sectionSize: the size of processable sections -// confirmReq: required number of confirmation blocks before a new section is being processed -// procWait: waiting time between processing sections (simple way of limiting the resource usage of a db upgrade) -// stop: quit channel -func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, sectionSize, confirmReq uint64, procWait time.Duration, stop chan struct{}) *ChainIndexer { +// NewChainIndexer creates a new chain indexer to do background processing on +// chain segments of a given size after certain number of confirmations passed. +// The throttling parameter might be used to prevent database thrashing. +func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer { c := &ChainIndexer{ chainDb: chainDb, indexDb: indexDb, backend: backend, - sectionSize: sectionSize, - confirmReq: confirmReq, - tryUpdate: make(chan struct{}, 1), - stop: stop, - procWait: procWait, + update: make(chan struct{}, 1), + quit: make(chan chan error), + sectionSize: section, + confirmsReq: confirm, + throttling: throttling, + log: log.New("type", kind), } - c.stored = c.getValidSections() + // Initialize database dependent fields and start the updater + c.loadValidSections() go c.updateLoop() + return c } -// updateLoop is the main event loop of the indexer -func (c *ChainIndexer) updateLoop() { - updateMsg := false - - for { - select { - case <-c.stop: - return - case <-c.tryUpdate: - c.lock.Lock() - if c.targetCount > c.stored { - if !updateMsg && c.targetCount > c.stored+1 { - updateMsg = true - c.backend.UpdateMsg(c.stored, c.targetCount) - } - c.calcIdx = c.stored +// Start creates a goroutine to feed chain head events into the indexer for +// cascading background processing. +func (c *ChainIndexer) Start(currentHeader *types.Header, eventMux *event.TypeMux) { + go c.eventLoop(currentHeader, eventMux) +} - var lastSectionHead common.Hash - if c.calcIdx > 0 { - lastSectionHead = c.getSectionHead(c.calcIdx - 1) - } +// Close tears down all goroutines belonging to the indexer and returns any error +// that might have occurred internally. +func (c *ChainIndexer) Close() error { + var errs []error - c.lock.Unlock() - sectionHead, ok := c.processSection(c.calcIdx, lastSectionHead) - c.lock.Lock() + // Tear down the primary update loop + errc := make(chan error) + c.quit <- errc + if err := <-errc; err != nil { + errs = append(errs, err) + } + // If needed, tear down the secondary event loop + if atomic.LoadUint32(&c.active) != 0 { + c.quit <- errc + if err := <-errc; err != nil { + errs = append(errs, err) + } + } + // Return any failures + switch { + case len(errs) == 0: + return nil - if ok && lastSectionHead == c.getSectionHead(c.calcIdx-1) { - c.stored = c.calcIdx + 1 - c.setSectionHead(c.calcIdx, sectionHead) - c.setValidSections(c.stored) - if updateMsg { - c.backend.UpdateMsg(c.stored, c.targetCount) - if c.stored >= c.targetCount { - updateMsg = false - } - } - c.lastForwarded = c.stored*c.sectionSize - 1 - for _, cp := range c.children { - cp.newHead(c.lastForwarded, false) - } - } else { - // if processing has failed, do not retry until further notification - c.targetCount = c.stored - } - } + case len(errs) == 1: + return errs[0] - if c.targetCount > c.stored { - go func() { - time.Sleep(c.procWait) - c.tryUpdate <- struct{}{} - }() - } else { - c.updating = false - } - c.lock.Unlock() - } + default: + return fmt.Errorf("%v", errs) } } -// ChainEventLoop runs in a goroutine and feeds blockchain events to the indexer by calling newHead -// (not needed for child indexers where the parent calls newHead) -func (c *ChainIndexer) ChainEventLoop(currentHeader *types.Header, eventMux *event.TypeMux) { +// eventLoop is a secondary - optional - event loop of the indexer which is only +// started for the outermost indexer to push chain head events into a processing +// queue. +func (c *ChainIndexer) eventLoop(currentHeader *types.Header, eventMux *event.TypeMux) { + // Mark the chain indexer as active, requiring an additional teardown + atomic.StoreUint32(&c.active, 1) + + // Subscribe to chain head events sub := eventMux.Subscribe(ChainEvent{}) + defer sub.Unsubscribe() + + // Fire the initial new head event to start any outstanding processing c.newHead(currentHeader.Number.Uint64(), false) - lastHead := currentHeader.Hash() + + var ( + prevHeader = currentHeader + prevHash = currentHeader.Hash() + ) for { select { - case <-c.stop: + case errc := <-c.quit: + // Chain indexer terminating, report no failure and abort + errc <- nil return - case ev := <-sub.Chan(): + + case ev, ok := <-sub.Chan(): + // Received a new event, ensure it's not nil (closing) and update + if !ok { + errc := <-c.quit + errc <- nil + return + } header := ev.Data.(ChainEvent).Block.Header() - c.newHead(header.Number.Uint64(), header.ParentHash != lastHead) - lastHead = header.Hash() + if header.ParentHash != prevHash { + c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true) + } + c.newHead(header.Number.Uint64(), false) + + prevHeader, prevHash = header, header.Hash() } } } -// AddChildIndexer adds a child ChainIndexer that can use the output of this one -func (c *ChainIndexer) AddChildIndexer(ci *ChainIndexer) { - c.children = append(c.children, ci) -} - -// newHead notifies the indexer about new chain heads or rollbacks -func (c *ChainIndexer) newHead(headNum uint64, rollback bool) { +// newHead notifies the indexer about new chain heads and/or reorgs. +func (c *ChainIndexer) newHead(head uint64, reorg bool) { c.lock.Lock() defer c.lock.Unlock() - if rollback { - firstChanged := headNum / c.sectionSize - if firstChanged < c.targetCount { - c.targetCount = firstChanged + // If a reorg happened, invalidate all sections until that point + if reorg { + // Revert the known section number to the reorg point + changed := head / c.sectionSize + if changed < c.knownSections { + c.knownSections = changed } - if firstChanged < c.stored { - c.stored = firstChanged - c.setValidSections(c.stored) + // Revert the stored sections from the database to the reorg point + if changed < c.storedSections { + c.setValidSections(changed) } - headNum = firstChanged * c.sectionSize + // Update the new head number to te finalized section end and notify children + head = changed * c.sectionSize - if headNum < c.lastForwarded { - c.lastForwarded = headNum - for _, cp := range c.children { - cp.newHead(c.lastForwarded, true) + if head < c.cascadedHead { + c.cascadedHead = head + for _, child := range c.children { + child.newHead(c.cascadedHead, true) } } + return + } + // No reorg, calculate the number of newly known sections and update if high enough + var sections uint64 + if head >= c.confirmsReq { + sections = (head + 1 - c.confirmsReq) / c.sectionSize + if sections > c.knownSections { + c.knownSections = sections + + select { + case c.update <- struct{}{}: + default: + } + } + } +} + +// updateLoop is the main event loop of the indexer which pushes chain segments +// down into the processing backend. +func (c *ChainIndexer) updateLoop() { + var updated time.Time + + for { + select { + case errc := <-c.quit: + // Chain indexer terminating, report no failure and abort + errc <- nil + return + + case <-c.update: + // Section headers completed (or rolled back), update the index + c.lock.Lock() + if c.knownSections > c.storedSections { + // Periodically print an upgrade log message to the user + if time.Since(updated) > 8*time.Second { + if c.knownSections > c.storedSections+1 { + c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections) + } + updated = time.Now() + } + // Cache the current section count and head to allow unlocking the mutex + section := c.storedSections + var oldHead common.Hash + if section > 0 { + oldHead = c.sectionHead(section - 1) + } + // Process the newly defined section in the background + c.lock.Unlock() + newHead, err := c.processSection(section, oldHead) + c.lock.Lock() + + // If processing succeeded and no reorgs occcurred, mark the section completed + if err == nil && oldHead == c.sectionHead(section-1) { + c.setSectionHead(section, newHead) + c.setValidSections(section + 1) - } else { - var newCount uint64 - if headNum >= c.confirmReq { - newCount = (headNum + 1 - c.confirmReq) / c.sectionSize - if newCount > c.targetCount { - c.targetCount = newCount - if !c.updating { - c.updating = true - c.tryUpdate <- struct{}{} + c.cascadedHead = c.storedSections*c.sectionSize - 1 + for _, child := range c.children { + c.log.Trace("Cascading chain index update", "head", c.cascadedHead) + child.newHead(c.cascadedHead, false) + } + } else { + // If processing failed, don't retry until further notification + c.log.Debug("Chain index processing failed", "section", section, "err", err) + c.knownSections = c.storedSections } } + // If there are still further sections to process, reschedule + if c.knownSections > c.storedSections { + time.AfterFunc(c.throttling, func() { + select { + case c.update <- struct{}{}: + default: + } + }) + } + c.lock.Unlock() } } } -// processSection processes an entire section by calling backend functions while ensuring -// the continuity of the passed headers. Since the chain mutex is not held while processing, -// the continuity can be broken by a long reorg, in which case the function returns with ok == false. -func (c *ChainIndexer) processSection(section uint64, lastSectionHead common.Hash) (sectionHead common.Hash, ok bool) { +// processSection processes an entire section by calling backend functions while +// ensuring the continuity of the passed headers. Since the chain mutex is not +// held while processing, the continuity can be broken by a long reorg, in which +// case the function returns with an error. +func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) { + c.log.Trace("Processing new chain section", "section", section) + + // Reset and partial processing c.backend.Reset(section) - head := lastSectionHead - for i := section * c.sectionSize; i < (section+1)*c.sectionSize; i++ { - hash := GetCanonicalHash(c.chainDb, i) + for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ { + hash := GetCanonicalHash(c.chainDb, number) if hash == (common.Hash{}) { - return common.Hash{}, false + return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number) } - header := GetHeader(c.chainDb, hash, i) - if header == nil || header.ParentHash != head { - return common.Hash{}, false + header := GetHeader(c.chainDb, hash, number) + if header == nil { + return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4]) + } else if header.ParentHash != lastHead { + return common.Hash{}, fmt.Errorf("chain reorged during section processing") } c.backend.Process(header) - head = header.Hash() + lastHead = header.Hash() } if err := c.backend.Commit(c.chainDb); err != nil { - return common.Hash{}, false + return common.Hash{}, err } - return head, true + return lastHead, nil } -// CanonicalSections returns the number of processed sections that are consistent with -// the current canonical chain -func (c *ChainIndexer) CanonicalSections() uint64 { +// Sections returns the number of processed sections maintained by the indexer +// and also the information about the last header indexed for potential canonical +// verifications. +func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) { c.lock.Lock() defer c.lock.Unlock() - cnt := c.getValidSections() - for cnt > 0 { - if c.getSectionHead(cnt-1) == GetCanonicalHash(c.chainDb, cnt*c.sectionSize-1) { - break - } - cnt-- - c.setValidSections(cnt) + return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1) +} + +// AddChildIndexer adds a child ChainIndexer that can use the output of this one +func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) { + c.lock.Lock() + defer c.lock.Unlock() + + c.children = append(c.children, indexer) + + // Cascade any pending updates to new children too + if c.storedSections > 0 { + indexer.newHead(c.storedSections*c.sectionSize-1, false) } - return cnt } -// getValidSections reads the number of valid sections from the index database -func (c *ChainIndexer) getValidSections() uint64 { +// loadValidSections reads the number of valid sections from the index database +// and caches is into the local state. +func (c *ChainIndexer) loadValidSections() { data, _ := c.indexDb.Get([]byte("count")) if len(data) == 8 { - return binary.BigEndian.Uint64(data[:]) + c.storedSections = binary.BigEndian.Uint64(data[:]) } - return 0 } // setValidSections writes the number of valid sections to the index database -func (c *ChainIndexer) setValidSections(cnt uint64) { - oldCnt := c.getValidSections() - if cnt < oldCnt { - for i := cnt; i < oldCnt; i++ { - c.removeSectionHead(i) - } - } - +func (c *ChainIndexer) setValidSections(sections uint64) { + // Set the current number of valid sections in the database var data [8]byte - binary.BigEndian.PutUint64(data[:], cnt) + binary.BigEndian.PutUint64(data[:], sections) c.indexDb.Put([]byte("count"), data[:]) + + // Remove any reorged sections, caching the valids in the mean time + for c.storedSections > sections { + c.storedSections-- + c.removeSectionHead(c.storedSections) + } + c.storedSections = sections // needed if new > old } -// getSectionHead reads the last block hash of a processed section from the index database -func (c *ChainIndexer) getSectionHead(idx uint64) common.Hash { +// sectionHead retrieves the last block hash of a processed section from the +// index database. +func (c *ChainIndexer) sectionHead(section uint64) common.Hash { var data [8]byte - binary.BigEndian.PutUint64(data[:], idx) + binary.BigEndian.PutUint64(data[:], section) hash, _ := c.indexDb.Get(append([]byte("shead"), data[:]...)) if len(hash) == len(common.Hash{}) { @@ -277,18 +377,20 @@ func (c *ChainIndexer) getSectionHead(idx uint64) common.Hash { return common.Hash{} } -// setSectionHead writes the last block hash of a processed section to the index database -func (c *ChainIndexer) setSectionHead(idx uint64, shead common.Hash) { +// setSectionHead writes the last block hash of a processed section to the index +// database. +func (c *ChainIndexer) setSectionHead(section uint64, hash common.Hash) { var data [8]byte - binary.BigEndian.PutUint64(data[:], idx) + binary.BigEndian.PutUint64(data[:], section) - c.indexDb.Put(append([]byte("shead"), data[:]...), shead.Bytes()) + c.indexDb.Put(append([]byte("shead"), data[:]...), hash.Bytes()) } -// removeSectionHead removes the reference to a processed section from the index database -func (c *ChainIndexer) removeSectionHead(idx uint64) { +// removeSectionHead removes the reference to a processed section from the index +// database. +func (c *ChainIndexer) removeSectionHead(section uint64) { var data [8]byte - binary.BigEndian.PutUint64(data[:], idx) + binary.BigEndian.PutUint64(data[:], section) c.indexDb.Delete(append([]byte("shead"), data[:]...)) } |