diff options
author | gary rong <garyrong0905@gmail.com> | 2018-07-12 16:07:51 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2018-07-12 16:07:51 +0800 |
commit | e8824f6e7409a2064b1165c5b6cc6b2d390573ce (patch) | |
tree | 6b8a57248492cb0253c21ee87b5932ecd6cc0ab0 /vendor/github.com | |
parent | a9835c1816bc49ee54c82b4f2a5b05cbcd89881b (diff) | |
download | dexon-e8824f6e7409a2064b1165c5b6cc6b2d390573ce.tar.gz dexon-e8824f6e7409a2064b1165c5b6cc6b2d390573ce.tar.zst dexon-e8824f6e7409a2064b1165c5b6cc6b2d390573ce.zip |
vendor, ethdb: resume write operation asap (#17144)
* vendor: update leveldb
* ethdb: remove useless warning log
Diffstat (limited to 'vendor/github.com')
4 files changed, 243 insertions, 117 deletions
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go index b6563e87e..28e50906a 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -640,6 +640,16 @@ func (db *DB) tableNeedCompaction() bool { return v.needCompaction() } +// resumeWrite returns an indicator whether we should resume write operation if enough level0 files are compacted. +func (db *DB) resumeWrite() bool { + v := db.s.version() + defer v.release() + if v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() { + return true + } + return false +} + func (db *DB) pauseCompaction(ch chan<- struct{}) { select { case ch <- struct{}{}: @@ -653,6 +663,7 @@ type cCmd interface { } type cAuto struct { + // Note for table compaction, an empty ackC represents it's a compaction waiting command. ackC chan<- error } @@ -765,8 +776,10 @@ func (db *DB) mCompaction() { } func (db *DB) tCompaction() { - var x cCmd - var ackQ []cCmd + var ( + x cCmd + ackQ, waitQ []cCmd + ) defer func() { if x := recover(); x != nil { @@ -778,6 +791,10 @@ func (db *DB) tCompaction() { ackQ[i].ack(ErrClosed) ackQ[i] = nil } + for i := range waitQ { + waitQ[i].ack(ErrClosed) + waitQ[i] = nil + } if x != nil { x.ack(ErrClosed) } @@ -795,12 +812,25 @@ func (db *DB) tCompaction() { return default: } + // Resume write operation as soon as possible. + if len(waitQ) > 0 && db.resumeWrite() { + for i := range waitQ { + waitQ[i].ack(nil) + waitQ[i] = nil + } + waitQ = waitQ[:0] + } } else { for i := range ackQ { ackQ[i].ack(nil) ackQ[i] = nil } ackQ = ackQ[:0] + for i := range waitQ { + waitQ[i].ack(nil) + waitQ[i] = nil + } + waitQ = waitQ[:0] select { case x = <-db.tcompCmdC: case ch := <-db.tcompPauseC: @@ -813,7 +843,11 @@ func (db *DB) tCompaction() { if x != nil { switch cmd := x.(type) { case cAuto: - ackQ = append(ackQ, x) + if cmd.ackC != nil { + waitQ = append(waitQ, x) + } else { + ackQ = append(ackQ, x) + } case cRange: x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max)) default: diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go index 1189decac..9ba71fd6d 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go @@ -9,10 +9,12 @@ package storage import ( "errors" "fmt" + "io" "io/ioutil" "os" "path/filepath" "runtime" + "sort" "strconv" "strings" "sync" @@ -42,6 +44,30 @@ func (lock *fileStorageLock) Unlock() { } } +type int64Slice []int64 + +func (p int64Slice) Len() int { return len(p) } +func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +func writeFileSynced(filename string, data []byte, perm os.FileMode) error { + f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + if err != nil { + return err + } + n, err := f.Write(data) + if err == nil && n < len(data) { + err = io.ErrShortWrite + } + if err1 := f.Sync(); err == nil { + err = err1 + } + if err1 := f.Close(); err == nil { + err = err1 + } + return err +} + const logSizeThreshold = 1024 * 1024 // 1 MiB // fileStorage is a file-system backed storage. @@ -60,7 +86,7 @@ type fileStorage struct { day int } -// OpenFile returns a new filesytem-backed storage implementation with the given +// OpenFile returns a new filesystem-backed storage implementation with the given // path. This also acquire a file lock, so any subsequent attempt to open the // same path will fail. // @@ -189,7 +215,8 @@ func (fs *fileStorage) doLog(t time.Time, str string) { // write fs.buf = append(fs.buf, []byte(str)...) fs.buf = append(fs.buf, '\n') - fs.logw.Write(fs.buf) + n, _ := fs.logw.Write(fs.buf) + fs.logSize += int64(n) } func (fs *fileStorage) Log(str string) { @@ -210,7 +237,46 @@ func (fs *fileStorage) log(str string) { } } -func (fs *fileStorage) SetMeta(fd FileDesc) (err error) { +func (fs *fileStorage) setMeta(fd FileDesc) error { + content := fsGenName(fd) + "\n" + // Check and backup old CURRENT file. + currentPath := filepath.Join(fs.path, "CURRENT") + if _, err := os.Stat(currentPath); err == nil { + b, err := ioutil.ReadFile(currentPath) + if err != nil { + fs.log(fmt.Sprintf("backup CURRENT: %v", err)) + return err + } + if string(b) == content { + // Content not changed, do nothing. + return nil + } + if err := writeFileSynced(currentPath+".bak", b, 0644); err != nil { + fs.log(fmt.Sprintf("backup CURRENT: %v", err)) + return err + } + } else if !os.IsNotExist(err) { + return err + } + path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num) + if err := writeFileSynced(path, []byte(content), 0644); err != nil { + fs.log(fmt.Sprintf("create CURRENT.%d: %v", fd.Num, err)) + return err + } + // Replace CURRENT file. + if err := rename(path, currentPath); err != nil { + fs.log(fmt.Sprintf("rename CURRENT.%d: %v", fd.Num, err)) + return err + } + // Sync root directory. + if err := syncDir(fs.path); err != nil { + fs.log(fmt.Sprintf("syncDir: %v", err)) + return err + } + return nil +} + +func (fs *fileStorage) SetMeta(fd FileDesc) error { if !FileDescOk(fd) { return ErrInvalidFile } @@ -223,44 +289,10 @@ func (fs *fileStorage) SetMeta(fd FileDesc) (err error) { if fs.open < 0 { return ErrClosed } - defer func() { - if err != nil { - fs.log(fmt.Sprintf("CURRENT: %v", err)) - } - }() - path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num) - w, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - return - } - _, err = fmt.Fprintln(w, fsGenName(fd)) - if err != nil { - fs.log(fmt.Sprintf("write CURRENT.%d: %v", fd.Num, err)) - return - } - if err = w.Sync(); err != nil { - fs.log(fmt.Sprintf("flush CURRENT.%d: %v", fd.Num, err)) - return - } - if err = w.Close(); err != nil { - fs.log(fmt.Sprintf("close CURRENT.%d: %v", fd.Num, err)) - return - } - if err != nil { - return - } - if err = rename(path, filepath.Join(fs.path, "CURRENT")); err != nil { - fs.log(fmt.Sprintf("rename CURRENT.%d: %v", fd.Num, err)) - return - } - // Sync root directory. - if err = syncDir(fs.path); err != nil { - fs.log(fmt.Sprintf("syncDir: %v", err)) - } - return + return fs.setMeta(fd) } -func (fs *fileStorage) GetMeta() (fd FileDesc, err error) { +func (fs *fileStorage) GetMeta() (FileDesc, error) { fs.mu.Lock() defer fs.mu.Unlock() if fs.open < 0 { @@ -268,7 +300,7 @@ func (fs *fileStorage) GetMeta() (fd FileDesc, err error) { } dir, err := os.Open(fs.path) if err != nil { - return + return FileDesc{}, err } names, err := dir.Readdirnames(0) // Close the dir first before checking for Readdirnames error. @@ -276,94 +308,134 @@ func (fs *fileStorage) GetMeta() (fd FileDesc, err error) { fs.log(fmt.Sprintf("close dir: %v", ce)) } if err != nil { - return - } - // Find latest CURRENT file. - var rem []string - var pend bool - var cerr error - for _, name := range names { - if strings.HasPrefix(name, "CURRENT") { - pend1 := len(name) > 7 - var pendNum int64 - // Make sure it is valid name for a CURRENT file, otherwise skip it. - if pend1 { - if name[7] != '.' || len(name) < 9 { - fs.log(fmt.Sprintf("skipping %s: invalid file name", name)) - continue - } - var e1 error - if pendNum, e1 = strconv.ParseInt(name[8:], 10, 0); e1 != nil { - fs.log(fmt.Sprintf("skipping %s: invalid file num: %v", name, e1)) - continue - } + return FileDesc{}, err + } + // Try this in order: + // - CURRENT.[0-9]+ ('pending rename' file, descending order) + // - CURRENT + // - CURRENT.bak + // + // Skip corrupted file or file that point to a missing target file. + type currentFile struct { + name string + fd FileDesc + } + tryCurrent := func(name string) (*currentFile, error) { + b, err := ioutil.ReadFile(filepath.Join(fs.path, name)) + if err != nil { + if os.IsNotExist(err) { + err = os.ErrNotExist } - path := filepath.Join(fs.path, name) - r, e1 := os.OpenFile(path, os.O_RDONLY, 0) - if e1 != nil { - return FileDesc{}, e1 + return nil, err + } + var fd FileDesc + if len(b) < 1 || b[len(b)-1] != '\n' || !fsParseNamePtr(string(b[:len(b)-1]), &fd) { + fs.log(fmt.Sprintf("%s: corrupted content: %q", name, b)) + err := &ErrCorrupted{ + Err: errors.New("leveldb/storage: corrupted or incomplete CURRENT file"), } - b, e1 := ioutil.ReadAll(r) - if e1 != nil { - r.Close() - return FileDesc{}, e1 + return nil, err + } + if _, err := os.Stat(filepath.Join(fs.path, fsGenName(fd))); err != nil { + if os.IsNotExist(err) { + fs.log(fmt.Sprintf("%s: missing target file: %s", name, fd)) + err = os.ErrNotExist } - var fd1 FileDesc - if len(b) < 1 || b[len(b)-1] != '\n' || !fsParseNamePtr(string(b[:len(b)-1]), &fd1) { - fs.log(fmt.Sprintf("skipping %s: corrupted or incomplete", name)) - if pend1 { - rem = append(rem, name) - } - if !pend1 || cerr == nil { - metaFd, _ := fsParseName(name) - cerr = &ErrCorrupted{ - Fd: metaFd, - Err: errors.New("leveldb/storage: corrupted or incomplete meta file"), - } - } - } else if pend1 && pendNum != fd1.Num { - fs.log(fmt.Sprintf("skipping %s: inconsistent pending-file num: %d vs %d", name, pendNum, fd1.Num)) - rem = append(rem, name) - } else if fd1.Num < fd.Num { - fs.log(fmt.Sprintf("skipping %s: obsolete", name)) - if pend1 { - rem = append(rem, name) - } + return nil, err + } + return ¤tFile{name: name, fd: fd}, nil + } + tryCurrents := func(names []string) (*currentFile, error) { + var ( + cur *currentFile + // Last corruption error. + lastCerr error + ) + for _, name := range names { + var err error + cur, err = tryCurrent(name) + if err == nil { + break + } else if err == os.ErrNotExist { + // Fallback to the next file. + } else if isCorrupted(err) { + lastCerr = err + // Fallback to the next file. } else { - fd = fd1 - pend = pend1 + // In case the error is due to permission, etc. + return nil, err } - if err := r.Close(); err != nil { - fs.log(fmt.Sprintf("close %s: %v", name, err)) + } + if cur == nil { + err := os.ErrNotExist + if lastCerr != nil { + err = lastCerr } + return nil, err } + return cur, nil } - // Don't remove any files if there is no valid CURRENT file. - if fd.Zero() { - if cerr != nil { - err = cerr - } else { - err = os.ErrNotExist + + // Try 'pending rename' files. + var nums []int64 + for _, name := range names { + if strings.HasPrefix(name, "CURRENT.") && name != "CURRENT.bak" { + i, err := strconv.ParseInt(name[8:], 10, 64) + if err == nil { + nums = append(nums, i) + } } - return } - if !fs.readOnly { - // Rename pending CURRENT file to an effective CURRENT. - if pend { - path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num) - if err := rename(path, filepath.Join(fs.path, "CURRENT")); err != nil { - fs.log(fmt.Sprintf("CURRENT.%d -> CURRENT: %v", fd.Num, err)) - } + var ( + pendCur *currentFile + pendErr = os.ErrNotExist + pendNames []string + ) + if len(nums) > 0 { + sort.Sort(sort.Reverse(int64Slice(nums))) + pendNames = make([]string, len(nums)) + for i, num := range nums { + pendNames[i] = fmt.Sprintf("CURRENT.%d", num) } - // Remove obsolete or incomplete pending CURRENT files. - for _, name := range rem { - path := filepath.Join(fs.path, name) - if err := os.Remove(path); err != nil { - fs.log(fmt.Sprintf("remove %s: %v", name, err)) + pendCur, pendErr = tryCurrents(pendNames) + if pendErr != nil && pendErr != os.ErrNotExist && !isCorrupted(pendErr) { + return FileDesc{}, pendErr + } + } + + // Try CURRENT and CURRENT.bak. + curCur, curErr := tryCurrents([]string{"CURRENT", "CURRENT.bak"}) + if curErr != nil && curErr != os.ErrNotExist && !isCorrupted(curErr) { + return FileDesc{}, curErr + } + + // pendCur takes precedence, but guards against obsolete pendCur. + if pendCur != nil && (curCur == nil || pendCur.fd.Num > curCur.fd.Num) { + curCur = pendCur + } + + if curCur != nil { + // Restore CURRENT file to proper state. + if !fs.readOnly && (curCur.name != "CURRENT" || len(pendNames) != 0) { + // Ignore setMeta errors, however don't delete obsolete files if we + // catch error. + if err := fs.setMeta(curCur.fd); err == nil { + // Remove 'pending rename' files. + for _, name := range pendNames { + if err := os.Remove(filepath.Join(fs.path, name)); err != nil { + fs.log(fmt.Sprintf("remove %s: %v", name, err)) + } + } } } + return curCur.fd, nil } - return + + // Nothing found. + if isCorrupted(pendErr) { + return FileDesc{}, pendErr + } + return FileDesc{}, curErr } func (fs *fileStorage) List(ft FileType) (fds []FileDesc, err error) { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go index 7e2991537..d75f66a9e 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go @@ -67,13 +67,25 @@ func isErrInvalid(err error) bool { if err == os.ErrInvalid { return true } + // Go < 1.8 if syserr, ok := err.(*os.SyscallError); ok && syserr.Err == syscall.EINVAL { return true } + // Go >= 1.8 returns *os.PathError instead + if patherr, ok := err.(*os.PathError); ok && patherr.Err == syscall.EINVAL { + return true + } return false } func syncDir(name string) error { + // As per fsync manpage, Linux seems to expect fsync on directory, however + // some system don't support this, so we will ignore syscall.EINVAL. + // + // From fsync(2): + // Calling fsync() does not necessarily ensure that the entry in the + // directory containing the file has also reached disk. For that an + // explicit fsync() on a file descriptor for the directory is also needed. f, err := os.Open(name) if err != nil { return err diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go index c16bce6b6..4e4a72425 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go @@ -55,6 +55,14 @@ type ErrCorrupted struct { Err error } +func isCorrupted(err error) bool { + switch err.(type) { + case *ErrCorrupted: + return true + } + return false +} + func (e *ErrCorrupted) Error() string { if !e.Fd.Zero() { return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd) |