From 88b1db728826efd499ea407579f41e8b683d6b53 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 9 Oct 2017 12:40:50 +0200 Subject: accounts/keystore: scan key directory without locks held (#15171) The accountCache contains a file cache, and remembers from scan to scan what files were present earlier. Thus, whenever there's a change, the scan phase only bothers processing new and removed files. --- accounts/keystore/account_cache.go | 161 ++++++++++++++++++++++++++++--------- 1 file changed, 122 insertions(+), 39 deletions(-) (limited to 'accounts/keystore/account_cache.go') diff --git a/accounts/keystore/account_cache.go b/accounts/keystore/account_cache.go index dc6ac6ccb..4b08cc202 100644 --- a/accounts/keystore/account_cache.go +++ b/accounts/keystore/account_cache.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "gopkg.in/fatih/set.v0" ) // Minimum amount of time between cache reloads. This limit applies if the platform does @@ -71,6 +72,14 @@ type accountCache struct { byAddr map[common.Address][]accounts.Account throttle *time.Timer notify chan struct{} + fileC fileCache +} + +// fileCache is a cache of files seen during scan of keystore +type fileCache struct { + all *set.SetNonTS // list of all files + mtime time.Time // latest mtime seen + mu sync.RWMutex } func newAccountCache(keydir string) (*accountCache, chan struct{}) { @@ -78,6 +87,7 @@ func newAccountCache(keydir string) (*accountCache, chan struct{}) { keydir: keydir, byAddr: make(map[common.Address][]accounts.Account), notify: make(chan struct{}, 1), + fileC: fileCache{all: set.NewNonTS()}, } ac.watcher = newWatcher(ac) return ac, ac.notify @@ -127,6 +137,23 @@ func (ac *accountCache) delete(removed accounts.Account) { } } +// deleteByFile removes an account referenced by the given path. +func (ac *accountCache) deleteByFile(path string) { + ac.mu.Lock() + defer ac.mu.Unlock() + i := sort.Search(len(ac.all), func(i int) bool { return ac.all[i].URL.Path >= path }) + + if i < len(ac.all) && ac.all[i].URL.Path == path { + removed := ac.all[i] + ac.all = append(ac.all[:i], ac.all[i+1:]...) + if ba := removeAccount(ac.byAddr[removed.Address], removed); len(ba) == 0 { + delete(ac.byAddr, removed.Address) + } else { + ac.byAddr[removed.Address] = ba + } + } +} + func removeAccount(slice []accounts.Account, elem accounts.Account) []accounts.Account { for i := range slice { if slice[i] == elem { @@ -167,15 +194,16 @@ func (ac *accountCache) find(a accounts.Account) (accounts.Account, error) { default: err := &AmbiguousAddrError{Addr: a.Address, Matches: make([]accounts.Account, len(matches))} copy(err.Matches, matches) + sort.Sort(accountsByURL(err.Matches)) return accounts.Account{}, err } } func (ac *accountCache) maybeReload() { ac.mu.Lock() - defer ac.mu.Unlock() if ac.watcher.running { + ac.mu.Unlock() return // A watcher is running and will keep the cache up-to-date. } if ac.throttle == nil { @@ -184,12 +212,15 @@ func (ac *accountCache) maybeReload() { select { case <-ac.throttle.C: default: + ac.mu.Unlock() return // The cache was reloaded recently. } } + // No watcher running, start it. ac.watcher.start() - ac.reload() ac.throttle.Reset(minReloadInterval) + ac.mu.Unlock() + ac.scanAccounts() } func (ac *accountCache) close() { @@ -205,54 +236,76 @@ func (ac *accountCache) close() { ac.mu.Unlock() } -// reload caches addresses of existing accounts. -// Callers must hold ac.mu. -func (ac *accountCache) reload() { - accounts, err := ac.scan() +// scanFiles performs a new scan on the given directory, compares against the already +// cached filenames, and returns file sets: new, missing , modified +func (fc *fileCache) scanFiles(keyDir string) (set.Interface, set.Interface, set.Interface, error) { + t0 := time.Now() + files, err := ioutil.ReadDir(keyDir) + t1 := time.Now() if err != nil { - log.Debug("Failed to reload keystore contents", "err", err) + return nil, nil, nil, err } - ac.all = accounts - sort.Sort(ac.all) - for k := range ac.byAddr { - delete(ac.byAddr, k) - } - for _, a := range accounts { - ac.byAddr[a.Address] = append(ac.byAddr[a.Address], a) - } - select { - case ac.notify <- struct{}{}: - default: + fc.mu.RLock() + prevMtime := fc.mtime + fc.mu.RUnlock() + + filesNow := set.NewNonTS() + moddedFiles := set.NewNonTS() + var newMtime time.Time + for _, fi := range files { + modTime := fi.ModTime() + path := filepath.Join(keyDir, fi.Name()) + if skipKeyFile(fi) { + log.Trace("Ignoring file on account scan", "path", path) + continue + } + filesNow.Add(path) + if modTime.After(prevMtime) { + moddedFiles.Add(path) + } + if modTime.After(newMtime) { + newMtime = modTime + } } - log.Debug("Reloaded keystore contents", "accounts", len(ac.all)) + t2 := time.Now() + + fc.mu.Lock() + // Missing = previous - current + missing := set.Difference(fc.all, filesNow) + // New = current - previous + newFiles := set.Difference(filesNow, fc.all) + // Modified = modified - new + modified := set.Difference(moddedFiles, newFiles) + fc.all = filesNow + fc.mtime = newMtime + fc.mu.Unlock() + t3 := time.Now() + log.Debug("FS scan times", "list", t1.Sub(t0), "set", t2.Sub(t1), "diff", t3.Sub(t2)) + return newFiles, missing, modified, nil } -func (ac *accountCache) scan() ([]accounts.Account, error) { - files, err := ioutil.ReadDir(ac.keydir) +// scanAccounts checks if any changes have occurred on the filesystem, and +// updates the account cache accordingly +func (ac *accountCache) scanAccounts() error { + newFiles, missingFiles, modified, err := ac.fileC.scanFiles(ac.keydir) + t1 := time.Now() if err != nil { - return nil, err + log.Debug("Failed to reload keystore contents", "err", err) + return err } - var ( buf = new(bufio.Reader) - addrs []accounts.Account keyJSON struct { Address string `json:"address"` } ) - for _, fi := range files { - path := filepath.Join(ac.keydir, fi.Name()) - if skipKeyFile(fi) { - log.Trace("Ignoring file on account scan", "path", path) - continue - } - logger := log.New("path", path) - + readAccount := func(path string) *accounts.Account { fd, err := os.Open(path) if err != nil { - logger.Trace("Failed to open keystore file", "err", err) - continue + log.Trace("Failed to open keystore file", "path", path, "err", err) + return nil } + defer fd.Close() buf.Reset(fd) // Parse the address. keyJSON.Address = "" @@ -260,15 +313,45 @@ func (ac *accountCache) scan() ([]accounts.Account, error) { addr := common.HexToAddress(keyJSON.Address) switch { case err != nil: - logger.Debug("Failed to decode keystore key", "err", err) + log.Debug("Failed to decode keystore key", "path", path, "err", err) case (addr == common.Address{}): - logger.Debug("Failed to decode keystore key", "err", "missing or zero address") + log.Debug("Failed to decode keystore key", "path", path, "err", "missing or zero address") default: - addrs = append(addrs, accounts.Account{Address: addr, URL: accounts.URL{Scheme: KeyStoreScheme, Path: path}}) + return &accounts.Account{Address: addr, URL: accounts.URL{Scheme: KeyStoreScheme, Path: path}} } - fd.Close() + return nil } - return addrs, err + + for _, p := range newFiles.List() { + path, _ := p.(string) + a := readAccount(path) + if a != nil { + ac.add(*a) + } + } + for _, p := range missingFiles.List() { + path, _ := p.(string) + ac.deleteByFile(path) + } + + for _, p := range modified.List() { + path, _ := p.(string) + a := readAccount(path) + ac.deleteByFile(path) + if a != nil { + ac.add(*a) + } + } + + t2 := time.Now() + + select { + case ac.notify <- struct{}{}: + default: + } + log.Trace("Handled keystore changes", "time", t2.Sub(t1)) + + return nil } func skipKeyFile(fi os.FileInfo) bool { -- cgit