diff options
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go')
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go | 755 |
1 files changed, 755 insertions, 0 deletions
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
	"errors"
	"fmt"
	"io"
	"os"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/syndtr/goleveldb/leveldb/iterator"
	"github.com/syndtr/goleveldb/leveldb/journal"
	"github.com/syndtr/goleveldb/leveldb/memdb"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/storage"
	"github.com/syndtr/goleveldb/leveldb/table"
	"github.com/syndtr/goleveldb/leveldb/util"
)

// DB is a LevelDB database.
type DB struct {
	// Need 64-bit alignment.
	// seq is kept first so that 64-bit atomic ops on it are safe on
	// 32-bit platforms, which require 8-byte alignment.
	seq uint64

	s *session

	// MemDB
	memMu             sync.RWMutex // guards mem/frozenMem access
	mem               *memdb.DB    // current (writable) memdb
	frozenMem         *memdb.DB    // memdb pending compaction to a table file
	journal           *journal.Writer
	journalWriter     storage.Writer
	journalFile       storage.File
	frozenJournalFile storage.File // journal backing frozenMem; removed after flush
	frozenSeq         uint64

	// Snapshot
	snapsMu   sync.Mutex
	snapsRoot snapshotElement // sentinel of the snapshot linked list

	// Write
	writeC       chan *Batch
	writeMergedC chan bool
	writeLockC   chan struct{} // capacity 1; acts as the write mutex
	writeAckC    chan error
	journalC     chan *Batch
	journalAckC  chan error

	// Compaction
	tcompCmdC     chan cCmd
	tcompPauseC   chan chan<- struct{}
	tcompTriggerC chan struct{} // capacity 1; coalesces table-compaction triggers
	mcompCmdC     chan cCmd
	mcompTriggerC chan struct{} // capacity 1; coalesces memdb-compaction triggers
	compErrC      chan error
	compErrSetC   chan error
	compStats     [kNumLevels]cStats

	// Close
	closeW sync.WaitGroup
	closeC chan struct{} // closed to signal background goroutines to stop
	closed uint32
	closer io.Closer // extra resource (e.g. file storage) closed with the DB
}

// openDB builds a DB on top of an already-recovered session, replays the
// journal, cleans obsolete files and starts the background goroutines.
// On success the returned DB has a finalizer installed that calls Close.
func openDB(s *session) (*DB, error) {
	s.log("db@open opening")
	start := time.Now()
	db := &DB{
		s: s,
		// Initial sequence
		seq: s.stSeq,
		// Write
		writeC:       make(chan *Batch),
		writeMergedC: make(chan bool),
		writeLockC:   make(chan struct{}, 1),
		writeAckC:    make(chan error),
		journalC:     make(chan *Batch),
		journalAckC:  make(chan error),
		// Compaction
		tcompCmdC:     make(chan cCmd),
		tcompPauseC:   make(chan chan<- struct{}),
		tcompTriggerC: make(chan struct{}, 1),
		mcompCmdC:     make(chan cCmd),
		mcompTriggerC: make(chan struct{}, 1),
		compErrC:      make(chan error),
		compErrSetC:   make(chan error),
		// Close
		closeC: make(chan struct{}),
	}
	db.initSnapshot()

	// Replay any leftover journal records into the memdb / tables;
	// this also advances db.seq past the last journaled batch.
	if err := db.recoverJournal(); err != nil {
		return nil, err
	}

	// Remove any obsolete files.
	if err := db.checkAndCleanFiles(); err != nil {
		// Close journal.
		if db.journal != nil {
			db.journal.Close()
			db.journalWriter.Close()
		}
		return nil, err
	}

	// Don't include compaction error goroutine into wait group.
	go db.compactionError()

	// Three goroutines are waited on by Close via closeW.
	db.closeW.Add(3)
	go db.tCompaction()
	go db.mCompaction()
	go db.jWriter()

	s.logf("db@open done T·%v", time.Since(start))

	runtime.SetFinalizer(db, (*DB).Close)
	return db, nil
}

// Open opens or creates a DB for the given storage.
// The DB will be created if not exist, unless ErrorIfMissing is true.
// Also, if ErrorIfExist is true and the DB exist Open will returns
// os.ErrExist error.
//
// Open will return an error with type of ErrCorrupted if corruption
// detected in the DB. Corrupted DB can be recovered with Recover
// function.
//
// The DB must be closed after use, by calling Close method.
func Open(p storage.Storage, o *opt.Options) (db *DB, err error) {
	s, err := newSession(p, o)
	if err != nil {
		return
	}
	// Release the session if anything below fails (err is the named return).
	defer func() {
		if err != nil {
			s.close()
			s.release()
		}
	}()

	err = s.recover()
	if err != nil {
		// Only a missing DB is recoverable here, and only when the
		// caller did not ask for ErrorIfMissing.
		if !os.IsNotExist(err) || s.o.GetErrorIfMissing() {
			return
		}
		err = s.create()
		if err != nil {
			return
		}
	} else if s.o.GetErrorIfExist() {
		err = os.ErrExist
		return
	}

	return openDB(s)
}

// OpenFile opens or creates a DB for the given path.
// The DB will be created if not exist, unless ErrorIfMissing is true.
// Also, if ErrorIfExist is true and the DB exist OpenFile will returns
// os.ErrExist error.
//
// OpenFile uses standard file-system backed storage implementation as
// desribed in the leveldb/storage package.
//
// OpenFile will return an error with type of ErrCorrupted if corruption
// detected in the DB. Corrupted DB can be recovered with Recover
// function.
//
// The DB must be closed after use, by calling Close method.
+func OpenFile(path string, o *opt.Options) (db *DB, err error) { + stor, err := storage.OpenFile(path) + if err != nil { + return + } + db, err = Open(stor, o) + if err != nil { + stor.Close() + } else { + db.closer = stor + } + return +} + +// Recover recovers and opens a DB with missing or corrupted manifest files +// for the given storage. It will ignore any manifest files, valid or not. +// The DB must already exist or it will returns an error. +// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. +// +// The DB must be closed after use, by calling Close method. +func Recover(p storage.Storage, o *opt.Options) (db *DB, err error) { + s, err := newSession(p, o) + if err != nil { + return + } + defer func() { + if err != nil { + s.close() + s.release() + } + }() + + err = recoverTable(s, o) + if err != nil { + return + } + return openDB(s) +} + +// RecoverFile recovers and opens a DB with missing or corrupted manifest files +// for the given path. It will ignore any manifest files, valid or not. +// The DB must already exist or it will returns an error. +// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. +// +// RecoverFile uses standard file-system backed storage implementation as desribed +// in the leveldb/storage package. +// +// The DB must be closed after use, by calling Close method. 
func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
	stor, err := storage.OpenFile(path)
	if err != nil {
		return
	}
	db, err = Recover(stor, o)
	if err != nil {
		stor.Close()
	} else {
		// The DB takes ownership of the storage and closes it in Close.
		db.closer = stor
	}
	return
}

// recoverTable rebuilds a fresh manifest from the table files found in the
// storage, ignoring any existing manifest. Partially corrupted tables are
// rewritten (keeping only valid records); fully unreadable tables are
// skipped. All recovered tables are registered at level 0.
func recoverTable(s *session, o *opt.Options) error {
	ff0, err := s.getFiles(storage.TypeTable)
	if err != nil {
		return err
	}
	ff1 := files(ff0)
	ff1.sort()

	var mSeq uint64           // highest sequence number seen across all tables
	var good, corrupted int   // totals across all tables, for logging
	rec := new(sessionRecord)
	// buildTable copies every valid record from iter into a new temporary
	// table file and returns it; on error the temp file is removed.
	buildTable := func(iter iterator.Iterator) (tmp storage.File, size int64, err error) {
		tmp = s.newTemp()
		writer, err := tmp.Create()
		if err != nil {
			return
		}
		defer func() {
			writer.Close()
			if err != nil {
				tmp.Remove()
				tmp = nil
			}
		}()
		tw := table.NewWriter(writer, o)
		// Copy records.
		for iter.Next() {
			key := iter.Key()
			if validIkey(key) {
				err = tw.Append(key, iter.Value())
				if err != nil {
					return
				}
			}
		}
		err = iter.Error()
		if err != nil {
			return
		}
		err = tw.Close()
		if err != nil {
			return
		}
		err = writer.Sync()
		if err != nil {
			return
		}
		size = int64(tw.BytesLen())
		return
	}
	// recoverTable scans one table file, counting good/corrupted records
	// and block errors, rebuilding the file if it is only partially bad.
	recoverTable := func(file storage.File) error {
		s.logf("table@recovery recovering @%d", file.Num())
		reader, err := file.Open()
		if err != nil {
			return err
		}
		defer reader.Close()
		// Get file size.
		size, err := reader.Seek(0, 2)
		if err != nil {
			return err
		}
		var tSeq uint64
		var tgood, tcorrupted, blockerr int
		var min, max []byte // smallest/largest internal key seen, for the manifest entry
		tr := table.NewReader(reader, size, nil, o)
		iter := tr.NewIterator(nil, nil)
		// Count block-level read errors instead of aborting the scan.
		iter.(iterator.ErrorCallbackSetter).SetErrorCallback(func(err error) {
			s.logf("table@recovery found error @%d %q", file.Num(), err)
			blockerr++
		})
		// Scan the table.
		for iter.Next() {
			key := iter.Key()
			_, seq, _, ok := parseIkey(key)
			if !ok {
				tcorrupted++
				continue
			}
			tgood++
			if seq > tSeq {
				tSeq = seq
			}
			// Keys arrive in sorted order, so the first is min and the
			// latest is max.
			if min == nil {
				min = append([]byte{}, key...)
			}
			max = append(max[:0], key...)
		}
		if err := iter.Error(); err != nil {
			iter.Release()
			return err
		}
		iter.Release()
		if tgood > 0 {
			if tcorrupted > 0 || blockerr > 0 {
				// Rebuild the table.
				s.logf("table@recovery rebuilding @%d", file.Num())
				iter := tr.NewIterator(nil, nil)
				tmp, newSize, err := buildTable(iter)
				iter.Release()
				if err != nil {
					return err
				}
				reader.Close()
				if err := file.Replace(tmp); err != nil {
					return err
				}
				size = newSize
			}
			if tSeq > mSeq {
				mSeq = tSeq
			}
			// Add table to level 0.
			rec.addTable(0, file.Num(), uint64(size), min, max)
			s.logf("table@recovery recovered @%d N·%d C·%d B·%d S·%d Q·%d", file.Num(), tgood, tcorrupted, blockerr, size, tSeq)
		} else {
			s.logf("table@recovery unrecoverable @%d C·%d B·%d S·%d", file.Num(), tcorrupted, blockerr, size)
		}

		good += tgood
		corrupted += tcorrupted

		return nil
	}
	// Recover all tables.
	if len(ff1) > 0 {
		s.logf("table@recovery F·%d", len(ff1))
		// Ensure the file-number allocator is past the highest table number.
		s.markFileNum(ff1[len(ff1)-1].Num())
		for _, file := range ff1 {
			if err := recoverTable(file); err != nil {
				return err
			}
		}
		s.logf("table@recovery recovered F·%d N·%d C·%d Q·%d", len(ff1), good, corrupted, mSeq)
	}
	// Set sequence number.
	rec.setSeq(mSeq + 1)
	// Create new manifest.
	if err := s.create(); err != nil {
		return err
	}
	// Commit.
	return s.commit(rec)
}

// recoverJournal replays every journal file at or after the manifest's
// journal number into a memdb, flushing to level-0 tables as the write
// buffer fills, then installs a fresh journal/memdb pair. It advances
// d.seq past the last replayed batch.
func (d *DB) recoverJournal() error {
	s := d.s

	ff0, err := s.getFiles(storage.TypeJournal)
	if err != nil {
		return err
	}
	ff1 := files(ff0)
	ff1.sort()
	ff2 := make([]storage.File, 0, len(ff1))
	for _, file := range ff1 {
		// Only journals not yet reflected in the manifest are replayed.
		if file.Num() >= s.stJournalNum || file.Num() == s.stPrevJournalNum {
			s.markFileNum(file.Num())
			ff2 = append(ff2, file)
		}
	}

	var jr *journal.Reader
	var of storage.File // previously replayed journal, pending commit+removal
	var mem *memdb.DB
	batch := new(Batch)
	cm := newCMem(s)
	buf := new(util.Buffer)
	// Options.
	strict := s.o.GetStrict(opt.StrictJournal)
	checksum := s.o.GetStrict(opt.StrictJournalChecksum)
	writeBuffer := s.o.GetWriteBuffer()
	recoverJournal := func(file storage.File) error {
		s.logf("journal@recovery recovering @%d", file.Num())
		reader, err := file.Open()
		if err != nil {
			return err
		}
		defer reader.Close()
		if jr == nil {
			jr = journal.NewReader(reader, dropper{s, file}, strict, checksum)
		} else {
			jr.Reset(reader, dropper{s, file}, strict, checksum)
		}
		// Finish off the previous journal: flush its remaining records,
		// commit progress to the manifest, then delete it. Only then is
		// it safe to start consuming the next journal.
		if of != nil {
			if mem.Len() > 0 {
				if err := cm.flush(mem, 0); err != nil {
					return err
				}
			}
			if err := cm.commit(file.Num(), d.seq); err != nil {
				return err
			}
			cm.reset()
			of.Remove()
			of = nil
		}
		// Reset memdb.
		mem.Reset()
		for {
			r, err := jr.Next()
			if err != nil {
				if err == io.EOF {
					break
				}
				return err
			}
			buf.Reset()
			if _, err := buf.ReadFrom(r); err != nil {
				// In non-strict mode a torn record is skipped.
				if strict {
					return err
				}
				continue
			}
			if err := batch.decode(buf.Bytes()); err != nil {
				return err
			}
			if err := batch.memReplay(mem); err != nil {
				return err
			}
			// Advance the DB sequence past this batch.
			d.seq = batch.seq + uint64(batch.len())
			if mem.Size() >= writeBuffer {
				// Large enough, flush it.
				if err := cm.flush(mem, 0); err != nil {
					return err
				}
				// Reset memdb.
				mem.Reset()
			}
		}
		of = file
		return nil
	}
	// Recover all journals.
	if len(ff2) > 0 {
		s.logf("journal@recovery F·%d", len(ff2))
		mem = memdb.New(s.icmp, writeBuffer)
		for _, file := range ff2 {
			if err := recoverJournal(file); err != nil {
				return err
			}
		}
		// Flush the last journal.
		if mem.Len() > 0 {
			if err := cm.flush(mem, 0); err != nil {
				return err
			}
		}
	}
	// Create a new journal.
	if _, err := d.newMem(0); err != nil {
		return err
	}
	// Commit.
	if err := cm.commit(d.journalFile.Num(), d.seq); err != nil {
		// Close journal.
		if d.journal != nil {
			d.journal.Close()
			d.journalWriter.Close()
		}
		return err
	}
	// Remove the last journal.
	if of != nil {
		of.Remove()
	}
	return nil
}

// get looks key up at the given sequence number: first in the current and
// frozen memdbs, then in the current table version. A tDel record found in
// a memdb yields ErrNotFound without consulting the tables.
func (d *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
	s := d.s

	ikey := newIKey(key, seq, tSeek)

	em, fm := d.getMems()
	for _, m := range [...]*memdb.DB{em, fm} {
		if m == nil {
			continue
		}
		mk, mv, me := m.Find(ikey)
		if me == nil {
			ukey, _, t, ok := parseIkey(mk)
			// Find returns the nearest key >= ikey; it only answers this
			// lookup if the user-key portion actually matches.
			if ok && s.icmp.uCompare(ukey, key) == 0 {
				if t == tDel {
					return nil, ErrNotFound
				}
				return mv, nil
			}
		} else if me != ErrNotFound {
			return nil, me
		}
	}

	v := s.version()
	value, cSched, err := v.get(ikey, ro)
	v.release()
	if cSched {
		// Trigger table compaction.
		d.compTrigger(d.tcompTriggerC)
	}
	return
}

// Get gets the value for the given key. It returns ErrNotFound if the
// DB does not contain the key.
//
// The caller should not modify the contents of the returned slice, but
// it is safe to modify the contents of the argument after Get returns.
func (d *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
	err = d.ok()
	if err != nil {
		return
	}

	// Read at the current sequence number (i.e. the latest state).
	return d.get(key, d.getSeq(), ro)
}

// NewIterator returns an iterator for the latest snapshot of the
// uderlying DB.
// The returned iterator is not goroutine-safe, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its
// underlying DB. The resultant key/value pairs are guaranteed to be
// consistent.
//
// Slice allows slicing the iterator to only contains keys in the given
// range. A nil Range.Start is treated as a key before all keys in the
// DB. And a nil Range.Limit is treated as a key after all keys in
// the DB.
//
// The iterator must be released after use, by calling Release method.
//
// Also read Iterator documentation of the leveldb/iterator package.
+func (d *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { + if err := d.ok(); err != nil { + return iterator.NewEmptyIterator(err) + } + + p := d.newSnapshot() + defer p.Release() + return p.NewIterator(slice, ro) +} + +// GetSnapshot returns a latest snapshot of the underlying DB. A snapshot +// is a frozen snapshot of a DB state at a particular point in time. The +// content of snapshot are guaranteed to be consistent. +// +// The snapshot must be released after use, by calling Release method. +func (d *DB) GetSnapshot() (*Snapshot, error) { + if err := d.ok(); err != nil { + return nil, err + } + + return d.newSnapshot(), nil +} + +// GetProperty returns value of the given property name. +// +// Property names: +// leveldb.num-files-at-level{n} +// Returns the number of filer at level 'n'. +// leveldb.stats +// Returns statistics of the underlying DB. +// leveldb.sstables +// Returns sstables list for each level. +func (d *DB) GetProperty(name string) (value string, err error) { + err = d.ok() + if err != nil { + return + } + + const prefix = "leveldb." 
+ if !strings.HasPrefix(name, prefix) { + return "", errors.New("leveldb: GetProperty: unknown property: " + name) + } + + p := name[len(prefix):] + + s := d.s + v := s.version() + defer v.release() + + switch { + case strings.HasPrefix(p, "num-files-at-level"): + var level uint + var rest string + n, _ := fmt.Scanf("%d%s", &level, &rest) + if n != 1 || level >= kNumLevels { + err = errors.New("leveldb: GetProperty: invalid property: " + name) + } else { + value = fmt.Sprint(v.tLen(int(level))) + } + case p == "stats": + value = "Compactions\n" + + " Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" + + "-------+------------+---------------+---------------+---------------+---------------\n" + for level, tt := range v.tables { + duration, read, write := d.compStats[level].get() + if len(tt) == 0 && duration == 0 { + continue + } + value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n", + level, len(tt), float64(tt.size())/1048576.0, duration.Seconds(), + float64(read)/1048576.0, float64(write)/1048576.0) + } + case p == "sstables": + for level, tt := range v.tables { + value += fmt.Sprintf("--- level %d ---\n", level) + for _, t := range tt { + value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.file.Num(), t.size, t.min, t.max) + } + } + default: + err = errors.New("leveldb: GetProperty: unknown property: " + name) + } + + return +} + +// SizeOf calculates approximate sizes of the given key ranges. +// The length of the returned sizes are equal with the length of the given +// ranges. The returned sizes measure storage space usage, so if the user +// data compresses by a factor of ten, the returned sizes will be one-tenth +// the size of the corresponding user data size. +// The results may not include the sizes of recently written data. 
func (d *DB) SizeOf(ranges []util.Range) (Sizes, error) {
	if err := d.ok(); err != nil {
		return nil, err
	}

	v := d.s.version()
	defer v.release()

	sizes := make(Sizes, 0, len(ranges))
	for _, r := range ranges {
		// Use kMaxSeq so the internal keys sort before every real entry
		// with the same user key, giving a stable offset boundary.
		min := newIKey(r.Start, kMaxSeq, tSeek)
		max := newIKey(r.Limit, kMaxSeq, tSeek)
		start, err := v.offsetOf(min)
		if err != nil {
			return nil, err
		}
		limit, err := v.offsetOf(max)
		if err != nil {
			return nil, err
		}
		var size uint64
		// An inverted range (limit before start) yields size 0.
		if limit >= start {
			size = limit - start
		}
		sizes = append(sizes, size)
	}

	return sizes, nil
}

// Close closes the DB. This will also releases any outstanding snapshot.
//
// It is not safe to close a DB until all outstanding iterators are released.
// It is valid to call Close multiple times. Other methods should not be
// called after the DB has been closed.
func (d *DB) Close() error {
	// setClosed flips the closed flag exactly once; subsequent Close
	// calls return ErrClosed.
	if !d.setClosed() {
		return ErrClosed
	}

	s := d.s
	start := time.Now()
	s.log("db@close closing")

	// Clear the finalizer.
	runtime.SetFinalizer(d, nil)

	// Get compaction error.
	// Non-blocking: report a pending compaction error if one exists.
	var err error
	select {
	case err = <-d.compErrC:
	default:
	}

	// Signal background goroutines to stop...
	close(d.closeC)

	// Wait for the close WaitGroup.
	d.closeW.Wait()

	// Close journal.
	if d.journal != nil {
		d.journal.Close()
		d.journalWriter.Close()
	}

	// Close session.
	s.close()
	s.logf("db@close done T·%v", time.Since(start))
	s.release()

	// Close the attached storage (set by OpenFile/RecoverFile); keep the
	// first error seen.
	if d.closer != nil {
		if err1 := d.closer.Close(); err == nil {
			err = err1
		}
	}

	// Drop references so the garbage collector can reclaim everything
	// even if the caller keeps the *DB alive.
	d.s = nil
	d.mem = nil
	d.frozenMem = nil
	d.journal = nil
	d.journalWriter = nil
	d.journalFile = nil
	d.frozenJournalFile = nil
	d.snapsRoot = snapshotElement{}
	d.closer = nil

	return err
}