diff options
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go')
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go | 565 |
1 files changed, 307 insertions, 258 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go index 46cc9d070..cbe1dc103 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go @@ -17,11 +17,12 @@ import ( "strings" "sync" "time" - - "github.com/syndtr/goleveldb/leveldb/util" ) -var errFileOpen = errors.New("leveldb/storage: file still open") +var ( + errFileOpen = errors.New("leveldb/storage: file still open") + errReadOnly = errors.New("leveldb/storage: storage is read-only") +) type fileLock interface { release() error @@ -32,40 +33,52 @@ type fileStorageLock struct { } func (lock *fileStorageLock) Release() { - fs := lock.fs - fs.mu.Lock() - defer fs.mu.Unlock() - if fs.slock == lock { - fs.slock = nil + if lock.fs != nil { + lock.fs.mu.Lock() + defer lock.fs.mu.Unlock() + if lock.fs.slock == lock { + lock.fs.slock = nil + } } - return } +const logSizeThreshold = 1024 * 1024 // 1 MiB + // fileStorage is a file-system backed storage. type fileStorage struct { - path string - - mu sync.Mutex - flock fileLock - slock *fileStorageLock - logw *os.File - buf []byte + path string + readOnly bool + + mu sync.Mutex + flock fileLock + slock *fileStorageLock + logw *os.File + logSize int64 + buf []byte // Opened file counter; if open < 0 means closed. open int day int } // OpenFile returns a new filesytem-backed storage implementation with the given -// path. This also hold a file lock, so any subsequent attempt to open the same -// path will fail. +// path. This also acquire a file lock, so any subsequent attempt to open the +// same path will fail. // // The storage must be closed after use, by calling Close method. -func OpenFile(path string) (Storage, error) { - if err := os.MkdirAll(path, 0755); err != nil { +func OpenFile(path string, readOnly bool) (Storage, error) { + if fi, err := os.Stat(path); err == nil { + if !fi.IsDir() { + return nil, fmt.Errorf("leveldb/storage: open %s: not a directory", path) + } + } else if os.IsNotExist(err) && !readOnly { + if err := os.MkdirAll(path, 0755); err != nil { + return nil, err + } + } else { return nil, err } - flock, err := newFileLock(filepath.Join(path, "LOCK")) + flock, err := newFileLock(filepath.Join(path, "LOCK"), readOnly) if err != nil { return nil, err } @@ -76,23 +89,42 @@ func OpenFile(path string) (Storage, error) { } }() - rename(filepath.Join(path, "LOG"), filepath.Join(path, "LOG.old")) - logw, err := os.OpenFile(filepath.Join(path, "LOG"), os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - return nil, err + var ( + logw *os.File + logSize int64 + ) + if !readOnly { + logw, err = os.OpenFile(filepath.Join(path, "LOG"), os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + logSize, err = logw.Seek(0, os.SEEK_END) + if err != nil { + logw.Close() + return nil, err + } } - fs := &fileStorage{path: path, flock: flock, logw: logw} + fs := &fileStorage{ + path: path, + readOnly: readOnly, + flock: flock, + logw: logw, + logSize: logSize, + } runtime.SetFinalizer(fs, (*fileStorage).Close) return fs, nil } -func (fs *fileStorage) Lock() (util.Releaser, error) { +func (fs *fileStorage) Lock() (Lock, error) { fs.mu.Lock() defer fs.mu.Unlock() if fs.open < 0 { return nil, ErrClosed } + if fs.readOnly { + return &fileStorageLock{}, nil + } if fs.slock != nil { return nil, ErrLocked } @@ -101,7 +133,7 @@ func (fs *fileStorage) Lock() (util.Releaser, error) { } func itoa(buf []byte, i int, wid int) []byte { - var u uint = uint(i) + u := uint(i) if u == 0 && wid <= 1 { return append(buf, '0') } @@ -126,6 +158,22 @@ func (fs *fileStorage) printDay(t time.Time) { } func (fs *fileStorage) doLog(t time.Time, str string) { + if fs.logSize > logSizeThreshold { + // Rotate log file. + fs.logw.Close() + fs.logw = nil + fs.logSize = 0 + rename(filepath.Join(fs.path, "LOG"), filepath.Join(fs.path, "LOG.old")) + } + if fs.logw == nil { + var err error + fs.logw, err = os.OpenFile(filepath.Join(fs.path, "LOG"), os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return + } + // Force printDay on new log file. + fs.day = 0 + } fs.printDay(t) hour, min, sec := t.Clock() msec := t.Nanosecond() / 1e3 @@ -145,65 +193,71 @@ func (fs *fileStorage) doLog(t time.Time, str string) { } func (fs *fileStorage) Log(str string) { - t := time.Now() - fs.mu.Lock() - defer fs.mu.Unlock() - if fs.open < 0 { - return + if !fs.readOnly { + t := time.Now() + fs.mu.Lock() + defer fs.mu.Unlock() + if fs.open < 0 { + return + } + fs.doLog(t, str) } - fs.doLog(t, str) } func (fs *fileStorage) log(str string) { - fs.doLog(time.Now(), str) + if !fs.readOnly { + fs.doLog(time.Now(), str) + } } -func (fs *fileStorage) GetFile(num uint64, t FileType) File { - return &file{fs: fs, num: num, t: t} -} +func (fs *fileStorage) SetMeta(fd FileDesc) (err error) { + if !FileDescOk(fd) { + return ErrInvalidFile + } + if fs.readOnly { + return errReadOnly + } -func (fs *fileStorage) GetFiles(t FileType) (ff []File, err error) { fs.mu.Lock() defer fs.mu.Unlock() if fs.open < 0 { - return nil, ErrClosed + return ErrClosed } - dir, err := os.Open(fs.path) + defer func() { + if err != nil { + fs.log(fmt.Sprintf("CURRENT: %v", err)) + } + }() + path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num) + w, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { return } - fnn, err := dir.Readdirnames(0) - // Close the dir first before checking for Readdirnames error. - if err := dir.Close(); err != nil { - fs.log(fmt.Sprintf("close dir: %v", err)) + _, err = fmt.Fprintln(w, fsGenName(fd)) + // Close the file first. + if cerr := w.Close(); cerr != nil { + fs.log(fmt.Sprintf("close CURRENT.%d: %v", fd.Num, cerr)) } if err != nil { return } - f := &file{fs: fs} - for _, fn := range fnn { - if f.parse(fn) && (f.t&t) != 0 { - ff = append(ff, f) - f = &file{fs: fs} - } - } - return + return rename(path, filepath.Join(fs.path, "CURRENT")) } -func (fs *fileStorage) GetManifest() (f File, err error) { +func (fs *fileStorage) GetMeta() (fd FileDesc, err error) { fs.mu.Lock() defer fs.mu.Unlock() if fs.open < 0 { - return nil, ErrClosed + return FileDesc{}, ErrClosed } dir, err := os.Open(fs.path) if err != nil { return } - fnn, err := dir.Readdirnames(0) + names, err := dir.Readdirnames(0) // Close the dir first before checking for Readdirnames error. - if err := dir.Close(); err != nil { - fs.log(fmt.Sprintf("close dir: %v", err)) + if ce := dir.Close(); ce != nil { + fs.log(fmt.Sprintf("close dir: %v", ce)) } if err != nil { return @@ -212,55 +266,64 @@ func (fs *fileStorage) GetManifest() (f File, err error) { var rem []string var pend bool var cerr error - for _, fn := range fnn { - if strings.HasPrefix(fn, "CURRENT") { - pend1 := len(fn) > 7 + for _, name := range names { + if strings.HasPrefix(name, "CURRENT") { + pend1 := len(name) > 7 + var pendNum int64 // Make sure it is valid name for a CURRENT file, otherwise skip it. if pend1 { - if fn[7] != '.' || len(fn) < 9 { - fs.log(fmt.Sprintf("skipping %s: invalid file name", fn)) + if name[7] != '.' || len(name) < 9 { + fs.log(fmt.Sprintf("skipping %s: invalid file name", name)) continue } - if _, e1 := strconv.ParseUint(fn[8:], 10, 0); e1 != nil { - fs.log(fmt.Sprintf("skipping %s: invalid file num: %v", fn, e1)) + var e1 error + if pendNum, e1 = strconv.ParseInt(name[8:], 10, 0); e1 != nil { + fs.log(fmt.Sprintf("skipping %s: invalid file num: %v", name, e1)) continue } } - path := filepath.Join(fs.path, fn) + path := filepath.Join(fs.path, name) r, e1 := os.OpenFile(path, os.O_RDONLY, 0) if e1 != nil { - return nil, e1 + return FileDesc{}, e1 } b, e1 := ioutil.ReadAll(r) if e1 != nil { r.Close() - return nil, e1 + return FileDesc{}, e1 } - f1 := &file{fs: fs} - if len(b) < 1 || b[len(b)-1] != '\n' || !f1.parse(string(b[:len(b)-1])) { - fs.log(fmt.Sprintf("skipping %s: corrupted or incomplete", fn)) + var fd1 FileDesc + if len(b) < 1 || b[len(b)-1] != '\n' || !fsParseNamePtr(string(b[:len(b)-1]), &fd1) { + fs.log(fmt.Sprintf("skipping %s: corrupted or incomplete", name)) if pend1 { - rem = append(rem, fn) + rem = append(rem, name) } if !pend1 || cerr == nil { - cerr = fmt.Errorf("leveldb/storage: corrupted or incomplete %s file", fn) + metaFd, _ := fsParseName(name) + cerr = &ErrCorrupted{ + Fd: metaFd, + Err: errors.New("leveldb/storage: corrupted or incomplete meta file"), + } } - } else if f != nil && f1.Num() < f.Num() { - fs.log(fmt.Sprintf("skipping %s: obsolete", fn)) + } else if pend1 && pendNum != fd1.Num { + fs.log(fmt.Sprintf("skipping %s: inconsistent pending-file num: %d vs %d", name, pendNum, fd1.Num)) + rem = append(rem, name) + } else if fd1.Num < fd.Num { + fs.log(fmt.Sprintf("skipping %s: obsolete", name)) if pend1 { - rem = append(rem, fn) + rem = append(rem, name) } } else { - f = f1 + fd = fd1 pend = pend1 } if err := r.Close(); err != nil { - fs.log(fmt.Sprintf("close %s: %v", fn, err)) + fs.log(fmt.Sprintf("close %s: %v", name, err)) } } } // Don't remove any files if there is no valid CURRENT file. - if f == nil { + if fd.Nil() { if cerr != nil { err = cerr } else { @@ -268,267 +331,253 @@ func (fs *fileStorage) GetManifest() (f File, err error) { } return } - // Rename pending CURRENT file to an effective CURRENT. - if pend { - path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), f.Num()) - if err := rename(path, filepath.Join(fs.path, "CURRENT")); err != nil { - fs.log(fmt.Sprintf("CURRENT.%d -> CURRENT: %v", f.Num(), err)) + if !fs.readOnly { + // Rename pending CURRENT file to an effective CURRENT. + if pend { + path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num) + if err := rename(path, filepath.Join(fs.path, "CURRENT")); err != nil { + fs.log(fmt.Sprintf("CURRENT.%d -> CURRENT: %v", fd.Num, err)) + } } - } - // Remove obsolete or incomplete pending CURRENT files. - for _, fn := range rem { - path := filepath.Join(fs.path, fn) - if err := os.Remove(path); err != nil { - fs.log(fmt.Sprintf("remove %s: %v", fn, err)) + // Remove obsolete or incomplete pending CURRENT files. + for _, name := range rem { + path := filepath.Join(fs.path, name) + if err := os.Remove(path); err != nil { + fs.log(fmt.Sprintf("remove %s: %v", name, err)) + } } } return } -func (fs *fileStorage) SetManifest(f File) (err error) { +func (fs *fileStorage) List(ft FileType) (fds []FileDesc, err error) { fs.mu.Lock() defer fs.mu.Unlock() if fs.open < 0 { - return ErrClosed - } - f2, ok := f.(*file) - if !ok || f2.t != TypeManifest { - return ErrInvalidFile + return nil, ErrClosed } - defer func() { - if err != nil { - fs.log(fmt.Sprintf("CURRENT: %v", err)) - } - }() - path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), f2.Num()) - w, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + dir, err := os.Open(fs.path) if err != nil { - return err + return } - _, err = fmt.Fprintln(w, f2.name()) - // Close the file first. - if err := w.Close(); err != nil { - fs.log(fmt.Sprintf("close CURRENT.%d: %v", f2.num, err)) + names, err := dir.Readdirnames(0) + // Close the dir first before checking for Readdirnames error. + if cerr := dir.Close(); cerr != nil { + fs.log(fmt.Sprintf("close dir: %v", cerr)) } - if err != nil { - return err + if err == nil { + for _, name := range names { + if fd, ok := fsParseName(name); ok && fd.Type&ft != 0 { + fds = append(fds, fd) + } + } } - return rename(path, filepath.Join(fs.path, "CURRENT")) + return } -func (fs *fileStorage) Close() error { +func (fs *fileStorage) Open(fd FileDesc) (Reader, error) { + if !FileDescOk(fd) { + return nil, ErrInvalidFile + } + fs.mu.Lock() defer fs.mu.Unlock() if fs.open < 0 { - return ErrClosed - } - // Clear the finalizer. - runtime.SetFinalizer(fs, nil) - - if fs.open > 0 { - fs.log(fmt.Sprintf("refuse to close, %d files still open", fs.open)) - return fmt.Errorf("leveldb/storage: cannot close, %d files still open", fs.open) + return nil, ErrClosed } - fs.open = -1 - e1 := fs.logw.Close() - err := fs.flock.release() - if err == nil { - err = e1 + of, err := os.OpenFile(filepath.Join(fs.path, fsGenName(fd)), os.O_RDONLY, 0) + if err != nil { + if fsHasOldName(fd) && os.IsNotExist(err) { + of, err = os.OpenFile(filepath.Join(fs.path, fsGenOldName(fd)), os.O_RDONLY, 0) + if err == nil { + goto ok + } + } + return nil, err } - return err -} - -type fileWrap struct { - *os.File - f *file +ok: + fs.open++ + return &fileWrap{File: of, fs: fs, fd: fd}, nil } -func (fw fileWrap) Sync() error { - if err := fw.File.Sync(); err != nil { - return err +func (fs *fileStorage) Create(fd FileDesc) (Writer, error) { + if !FileDescOk(fd) { + return nil, ErrInvalidFile } - if fw.f.Type() == TypeManifest { - // Also sync parent directory if file type is manifest. - // See: https://code.google.com/p/leveldb/issues/detail?id=190. - if err := syncDir(fw.f.fs.path); err != nil { - return err - } + if fs.readOnly { + return nil, errReadOnly } - return nil -} -func (fw fileWrap) Close() error { - f := fw.f - f.fs.mu.Lock() - defer f.fs.mu.Unlock() - if !f.open { - return ErrClosed + fs.mu.Lock() + defer fs.mu.Unlock() + if fs.open < 0 { + return nil, ErrClosed } - f.open = false - f.fs.open-- - err := fw.File.Close() + of, err := os.OpenFile(filepath.Join(fs.path, fsGenName(fd)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { - f.fs.log(fmt.Sprintf("close %s.%d: %v", f.Type(), f.Num(), err)) + return nil, err } - return err + fs.open++ + return &fileWrap{File: of, fs: fs, fd: fd}, nil } -type file struct { - fs *fileStorage - num uint64 - t FileType - open bool -} - -func (f *file) Open() (Reader, error) { - f.fs.mu.Lock() - defer f.fs.mu.Unlock() - if f.fs.open < 0 { - return nil, ErrClosed +func (fs *fileStorage) Remove(fd FileDesc) error { + if !FileDescOk(fd) { + return ErrInvalidFile } - if f.open { - return nil, errFileOpen + if fs.readOnly { + return errReadOnly } - of, err := os.OpenFile(f.path(), os.O_RDONLY, 0) + + fs.mu.Lock() + defer fs.mu.Unlock() + if fs.open < 0 { + return ErrClosed + } + err := os.Remove(filepath.Join(fs.path, fsGenName(fd))) if err != nil { - if f.hasOldName() && os.IsNotExist(err) { - of, err = os.OpenFile(f.oldPath(), os.O_RDONLY, 0) - if err == nil { - goto ok + if fsHasOldName(fd) && os.IsNotExist(err) { + if e1 := os.Remove(filepath.Join(fs.path, fsGenOldName(fd))); !os.IsNotExist(e1) { + fs.log(fmt.Sprintf("remove %s: %v (old name)", fd, err)) + err = e1 } + } else { + fs.log(fmt.Sprintf("remove %s: %v", fd, err)) } - return nil, err } -ok: - f.open = true - f.fs.open++ - return fileWrap{of, f}, nil + return err } -func (f *file) Create() (Writer, error) { - f.fs.mu.Lock() - defer f.fs.mu.Unlock() - if f.fs.open < 0 { - return nil, ErrClosed +func (fs *fileStorage) Rename(oldfd, newfd FileDesc) error { + if !FileDescOk(oldfd) || !FileDescOk(newfd) { + return ErrInvalidFile } - if f.open { - return nil, errFileOpen + if oldfd == newfd { + return nil } - of, err := os.OpenFile(f.path(), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - return nil, err + if fs.readOnly { + return errReadOnly } - f.open = true - f.fs.open++ - return fileWrap{of, f}, nil + + fs.mu.Lock() + defer fs.mu.Unlock() + if fs.open < 0 { + return ErrClosed + } + return rename(filepath.Join(fs.path, fsGenName(oldfd)), filepath.Join(fs.path, fsGenName(newfd))) } -func (f *file) Replace(newfile File) error { - f.fs.mu.Lock() - defer f.fs.mu.Unlock() - if f.fs.open < 0 { +func (fs *fileStorage) Close() error { + fs.mu.Lock() + defer fs.mu.Unlock() + if fs.open < 0 { return ErrClosed } - newfile2, ok := newfile.(*file) - if !ok { - return ErrInvalidFile + // Clear the finalizer. + runtime.SetFinalizer(fs, nil) + + if fs.open > 0 { + fs.log(fmt.Sprintf("close: warning, %d files still open", fs.open)) } - if f.open || newfile2.open { - return errFileOpen + fs.open = -1 + if fs.logw != nil { + fs.logw.Close() } - return rename(newfile2.path(), f.path()) + return fs.flock.release() } -func (f *file) Type() FileType { - return f.t +type fileWrap struct { + *os.File + fs *fileStorage + fd FileDesc + closed bool } -func (f *file) Num() uint64 { - return f.num +func (fw *fileWrap) Sync() error { + if err := fw.File.Sync(); err != nil { + return err + } + if fw.fd.Type == TypeManifest { + // Also sync parent directory if file type is manifest. + // See: https://code.google.com/p/leveldb/issues/detail?id=190. + if err := syncDir(fw.fs.path); err != nil { + fw.fs.log(fmt.Sprintf("syncDir: %v", err)) + return err + } + } + return nil } -func (f *file) Remove() error { - f.fs.mu.Lock() - defer f.fs.mu.Unlock() - if f.fs.open < 0 { +func (fw *fileWrap) Close() error { + fw.fs.mu.Lock() + defer fw.fs.mu.Unlock() + if fw.closed { return ErrClosed } - if f.open { - return errFileOpen - } - err := os.Remove(f.path()) + fw.closed = true + fw.fs.open-- + err := fw.File.Close() if err != nil { - f.fs.log(fmt.Sprintf("remove %s.%d: %v", f.Type(), f.Num(), err)) - } - // Also try remove file with old name, just in case. - if f.hasOldName() { - if e1 := os.Remove(f.oldPath()); !os.IsNotExist(e1) { - f.fs.log(fmt.Sprintf("remove %s.%d: %v (old name)", f.Type(), f.Num(), err)) - err = e1 - } + fw.fs.log(fmt.Sprintf("close %s: %v", fw.fd, err)) } return err } -func (f *file) hasOldName() bool { - return f.t == TypeTable -} - -func (f *file) oldName() string { - switch f.t { - case TypeTable: - return fmt.Sprintf("%06d.sst", f.num) - } - return f.name() -} - -func (f *file) oldPath() string { - return filepath.Join(f.fs.path, f.oldName()) -} - -func (f *file) name() string { - switch f.t { +func fsGenName(fd FileDesc) string { + switch fd.Type { case TypeManifest: - return fmt.Sprintf("MANIFEST-%06d", f.num) + return fmt.Sprintf("MANIFEST-%06d", fd.Num) case TypeJournal: - return fmt.Sprintf("%06d.log", f.num) + return fmt.Sprintf("%06d.log", fd.Num) case TypeTable: - return fmt.Sprintf("%06d.ldb", f.num) + return fmt.Sprintf("%06d.ldb", fd.Num) case TypeTemp: - return fmt.Sprintf("%06d.tmp", f.num) + return fmt.Sprintf("%06d.tmp", fd.Num) default: panic("invalid file type") } } -func (f *file) path() string { - return filepath.Join(f.fs.path, f.name()) +func fsHasOldName(fd FileDesc) bool { + return fd.Type == TypeTable } -func (f *file) parse(name string) bool { - var num uint64 +func fsGenOldName(fd FileDesc) string { + switch fd.Type { + case TypeTable: + return fmt.Sprintf("%06d.sst", fd.Num) + } + return fsGenName(fd) +} + +func fsParseName(name string) (fd FileDesc, ok bool) { var tail string - _, err := fmt.Sscanf(name, "%d.%s", &num, &tail) + _, err := fmt.Sscanf(name, "%d.%s", &fd.Num, &tail) if err == nil { switch tail { case "log": - f.t = TypeJournal + fd.Type = TypeJournal case "ldb", "sst": - f.t = TypeTable + fd.Type = TypeTable case "tmp": - f.t = TypeTemp + fd.Type = TypeTemp default: - return false + return } - f.num = num - return true + return fd, true } - n, _ := fmt.Sscanf(name, "MANIFEST-%d%s", &num, &tail) + n, _ := fmt.Sscanf(name, "MANIFEST-%d%s", &fd.Num, &tail) if n == 1 { - f.t = TypeManifest - f.num = num - return true + fd.Type = TypeManifest + return fd, true } + return +} - return false +func fsParseNamePtr(name string, fd *FileDesc) bool { + _fd, ok := fsParseName(name) + if fd != nil { + *fd = _fd + } + return ok } |