diff options
Diffstat (limited to 'trie/sync.go')
-rw-r--r-- | trie/sync.go | 74 |
1 files changed, 62 insertions, 12 deletions
diff --git a/trie/sync.go b/trie/sync.go index 168501392..9e8449431 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -28,6 +28,10 @@ import ( // node it did not request. var ErrNotRequested = errors.New("not requested") +// ErrAlreadyProcessed is returned by the trie sync when it's requested to process a +// node it already processed previously. +var ErrAlreadyProcessed = errors.New("already processed") + // request represents a scheduled or already in-flight state retrieval request. type request struct { hash common.Hash // Hash of the node data content to retrieve @@ -48,6 +52,21 @@ type SyncResult struct { Data []byte // Data content of the retrieved node } +// syncMemBatch is an in-memory buffer of successfully downloaded but not yet +// persisted data items. +type syncMemBatch struct { + batch map[common.Hash][]byte // In-memory membatch of recently ocmpleted items + order []common.Hash // Order of completion to prevent out-of-order data loss +} + +// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes. +func newSyncMemBatch() *syncMemBatch { + return &syncMemBatch{ + batch: make(map[common.Hash][]byte), + order: make([]common.Hash, 0, 256), + } +} + // TrieSyncLeafCallback is a callback type invoked when a trie sync reaches a // leaf node. It's used by state syncing to check if the leaf node requires some // further data syncing. @@ -57,7 +76,8 @@ type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error // unknown trie hashes to retrieve, accepts node data associated with said hashes // and reconstructs the trie step by step until all is done. type TrieSync struct { - database DatabaseReader + database DatabaseReader // Persistent database to check for existing entries + membatch *syncMemBatch // Memory buffer to avoid frequest database writes requests map[common.Hash]*request // Pending requests pertaining to a key hash queue *prque.Prque // Priority queue with the pending requests } @@ -66,6 +86,7 @@ type TrieSync struct { func NewTrieSync(root common.Hash, database DatabaseReader, callback TrieSyncLeafCallback) *TrieSync { ts := &TrieSync{ database: database, + membatch: newSyncMemBatch(), requests: make(map[common.Hash]*request), queue: prque.New(), } @@ -79,6 +100,9 @@ func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, c if root == emptyRoot { return } + if _, ok := s.membatch.batch[root]; ok { + return + } key := root.Bytes() blob, _ := s.database.Get(key) if local, err := decodeNode(key, blob, 0); local != nil && err == nil { @@ -111,6 +135,9 @@ func (s *TrieSync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) if hash == emptyState { return } + if _, ok := s.membatch.batch[hash]; ok { + return + } if blob, _ := s.database.Get(hash.Bytes()); blob != nil { return } @@ -144,7 +171,7 @@ func (s *TrieSync) Missing(max int) []common.Hash { // Process injects a batch of retrieved trie nodes data, returning if something // was committed to the database and also the index of an entry if processing of // it failed. -func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, error) { +func (s *TrieSync) Process(results []SyncResult) (bool, int, error) { committed := false for i, item := range results { @@ -153,10 +180,13 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, if request == nil { return committed, i, ErrNotRequested } + if request.data != nil { + return committed, i, ErrAlreadyProcessed + } // If the item is a raw entry request, commit directly if request.raw { request.data = item.Data - s.commit(request, dbw) + s.commit(request) committed = true continue } @@ -173,7 +203,7 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, return committed, i, err } if len(requests) == 0 && request.deps == 0 { - s.commit(request, dbw) + s.commit(request) committed = true continue } @@ -185,6 +215,22 @@ func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, return committed, 0, nil } +// Commit flushes the data stored in the internal membatch out to persistent +// storage, returning th enumber of items written and any occurred error. +func (s *TrieSync) Commit(dbw DatabaseWriter) (int, error) { + // Dump the membatch into a database dbw + for i, key := range s.membatch.order { + if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil { + return i, err + } + } + written := len(s.membatch.order) + + // Drop the membatch data and return + s.membatch = newSyncMemBatch() + return written, nil +} + // Pending returns the number of state entries currently pending for download. func (s *TrieSync) Pending() int { return len(s.requests) @@ -246,13 +292,17 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) { // If the child references another node, resolve or schedule if node, ok := (child.node).(hashNode); ok { // Try to resolve the node from the local database + hash := common.BytesToHash(node) + if _, ok := s.membatch.batch[hash]; ok { + continue + } blob, _ := s.database.Get(node) if local, err := decodeNode(node[:], blob, 0); local != nil && err == nil { continue } // Locally unknown node, schedule for retrieval requests = append(requests, &request{ - hash: common.BytesToHash(node), + hash: hash, parents: []*request{req}, depth: child.depth, callback: req.callback, @@ -262,21 +312,21 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) { return requests, nil } -// commit finalizes a retrieval request and stores it into the database. If any +// commit finalizes a retrieval request and stores it into the membatch. If any // of the referencing parent requests complete due to this commit, they are also // committed themselves. -func (s *TrieSync) commit(req *request, dbw DatabaseWriter) (err error) { - // Write the node content to disk - if err := dbw.Put(req.hash[:], req.data); err != nil { - return err - } +func (s *TrieSync) commit(req *request) (err error) { + // Write the node content to the membatch + s.membatch.batch[req.hash] = req.data + s.membatch.order = append(s.membatch.order, req.hash) + delete(s.requests, req.hash) // Check all parents for completion for _, parent := range req.parents { parent.deps-- if parent.deps == 0 { - if err := s.commit(parent, dbw); err != nil { + if err := s.commit(parent); err != nil { return err } } |