diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2017-10-24 21:19:09 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-10-24 21:19:09 +0800 |
commit | ca376ead88a5a26626a90abdb62f4de7f6313822 (patch) | |
tree | 71d11e3b6cd40d2bf29033b7e23d30d04e086558 /core/chain_indexer.go | |
parent | 6d6a5a93370371a33fb815d7ae47b60c7021c86a (diff) | |
download | go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.gz go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.zst go-tangerine-ca376ead88a5a26626a90abdb62f4de7f6313822.zip |
les, light: LES/2 protocol version (#14970)
This PR implements the new LES protocol version extensions:
* new and more efficient Merkle proofs reply format (when replying to
a multiple Merkle proofs request, we just send a single set of trie
nodes containing all necessary nodes)
* BBT (BloomBitsTrie) works similarly to the existing CHT and contains
the bloombits search data to speed up log searches
* GetTxStatusMsg returns the inclusion position or the
pending/queued/unknown state of a transaction referenced by hash
* an optional signature of new block data (number/hash/td) can be
included in AnnounceMsg to provide an option for "very light
clients" (mobile/embedded devices) to skip expensive Ethash check
and accept multiple signatures of somewhat trusted servers (still a
lot better than trusting a single server completely and retrieving
everything through RPC). The new client mode is not implemented in
this PR, just the protocol extension.
Diffstat (limited to 'core/chain_indexer.go')
-rw-r--r-- | core/chain_indexer.go | 69 |
1 files changed, 50 insertions, 19 deletions
diff --git a/core/chain_indexer.go b/core/chain_indexer.go index f4c207dcc..837c908ab 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -36,13 +36,14 @@ import ( type ChainIndexerBackend interface { // Reset initiates the processing of a new chain segment, potentially terminating // any partially completed operations (in case of a reorg). - Reset(section uint64) + Reset(section uint64, lastSectionHead common.Hash) error // Process crunches through the next header in the chain segment. The caller // will ensure a sequential order of headers. Process(header *types.Header) - // Commit finalizes the section metadata and stores it into the database. + // Commit finalizes the section metadata and stores it into the database. This + // interface will usually be a batch writer. Commit() error } @@ -100,11 +101,34 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken return c } +// AddKnownSectionHead marks a new section head as known/processed if it is newer +// than the already known best section head +func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) { + c.lock.Lock() + defer c.lock.Unlock() + + if section < c.storedSections { + return + } + c.setSectionHead(section, shead) + c.setValidSections(section + 1) +} + +// IndexerChain interface is used for connecting the indexer to a blockchain +type IndexerChain interface { + CurrentHeader() *types.Header + SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription +} + // Start creates a goroutine to feed chain head events into the indexer for // cascading background processing. Children do not need to be started, they // are notified about new events by their parents. -func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) { - go c.eventLoop(currentHeader, chainEventer) +func (c *ChainIndexer) Start(chain IndexerChain) { + ch := make(chan ChainEvent, 10) + sub := chain.SubscribeChainEvent(ch) + currentHeader := chain.CurrentHeader() + + go c.eventLoop(currentHeader, ch, sub) } // Close tears down all goroutines belonging to the indexer and returns any error @@ -125,12 +149,14 @@ func (c *ChainIndexer) Close() error { errs = append(errs, err) } } + // Close all children for _, child := range c.children { if err := child.Close(); err != nil { errs = append(errs, err) } } + // Return any failures switch { case len(errs) == 0: @@ -147,12 +173,10 @@ func (c *ChainIndexer) Close() error { // eventLoop is a secondary - optional - event loop of the indexer which is only // started for the outermost indexer to push chain head events into a processing // queue. -func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) { +func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) { // Mark the chain indexer as active, requiring an additional teardown atomic.StoreUint32(&c.active, 1) - events := make(chan ChainEvent, 10) - sub := chainEventer(events) defer sub.Unsubscribe() // Fire the initial new head event to start any outstanding processing @@ -169,7 +193,7 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func( errc <- nil return - case ev, ok := <-events: + case ev, ok := <-ch: // Received a new event, ensure it's not nil (closing) and update if !ok { errc := <-c.quit @@ -178,7 +202,9 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func( } header := ev.Block.Header() if header.ParentHash != prevHash { - c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true) + if h := FindCommonAncestor(c.chainDb, prevHeader, header); h != nil { + c.newHead(h.Number.Uint64(), true) + } } c.newHead(header.Number.Uint64(), false) @@ -233,9 +259,10 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) { // down into the processing backend. func (c *ChainIndexer) updateLoop() { var ( - updating bool - updated time.Time + updated time.Time + updateMsg bool ) + for { select { case errc := <-c.quit: @@ -250,7 +277,7 @@ func (c *ChainIndexer) updateLoop() { // Periodically print an upgrade log message to the user if time.Since(updated) > 8*time.Second { if c.knownSections > c.storedSections+1 { - updating = true + updateMsg = true c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections) } updated = time.Now() @@ -259,7 +286,7 @@ func (c *ChainIndexer) updateLoop() { section := c.storedSections var oldHead common.Hash if section > 0 { - oldHead = c.sectionHead(section - 1) + oldHead = c.SectionHead(section - 1) } // Process the newly defined section in the background c.lock.Unlock() @@ -270,11 +297,11 @@ func (c *ChainIndexer) updateLoop() { c.lock.Lock() // If processing succeeded and no reorgs occcurred, mark the section completed - if err == nil && oldHead == c.sectionHead(section-1) { + if err == nil && oldHead == c.SectionHead(section-1) { c.setSectionHead(section, newHead) c.setValidSections(section + 1) - if c.storedSections == c.knownSections && updating { - updating = false + if c.storedSections == c.knownSections && updateMsg { + updateMsg = false c.log.Info("Finished upgrading chain index") } @@ -311,7 +338,11 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com c.log.Trace("Processing new chain section", "section", section) // Reset and partial processing - c.backend.Reset(section) + + if err := c.backend.Reset(section, lastHead); err != nil { + c.setValidSections(0) + return common.Hash{}, err + } for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ { hash := GetCanonicalHash(c.chainDb, number) @@ -341,7 +372,7 @@ func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) { c.lock.Lock() defer c.lock.Unlock() - return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1) + return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1) } // AddChildIndexer adds a child ChainIndexer that can use the output of this one @@ -383,7 +414,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) { // sectionHead retrieves the last block hash of a processed section from the // index database. -func (c *ChainIndexer) sectionHead(section uint64) common.Hash { +func (c *ChainIndexer) SectionHead(section uint64) common.Hash { var data [8]byte binary.BigEndian.PutUint64(data[:], section) |