diff options
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go')
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go | 312 |
1 files changed, 32 insertions, 280 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go index b3906f7fc..a8d7b54dc 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go @@ -11,14 +11,11 @@ import ( "io" "os" "sync" - "sync/atomic" "github.com/syndtr/goleveldb/leveldb/errors" - "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/journal" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/storage" - "github.com/syndtr/goleveldb/leveldb/util" ) type ErrManifestCorrupted struct { @@ -30,28 +27,28 @@ func (e *ErrManifestCorrupted) Error() string { return fmt.Sprintf("leveldb: manifest corrupted (field '%s'): %s", e.Field, e.Reason) } -func newErrManifestCorrupted(f storage.File, field, reason string) error { - return errors.NewErrCorrupted(f, &ErrManifestCorrupted{field, reason}) +func newErrManifestCorrupted(fd storage.FileDesc, field, reason string) error { + return errors.NewErrCorrupted(fd, &ErrManifestCorrupted{field, reason}) } // session represent a persistent database session. type session struct { // Need 64-bit alignment. - stNextFileNum uint64 // current unused file number - stJournalNum uint64 // current journal file number; need external synchronization - stPrevJournalNum uint64 // prev journal file number; no longer used; for compatibility with older version of leveldb + stNextFileNum int64 // current unused file number + stJournalNum int64 // current journal file number; need external synchronization + stPrevJournalNum int64 // prev journal file number; no longer used; for compatibility with older version of leveldb + stTempFileNum int64 stSeqNum uint64 // last mem compacted seq; need external synchronization - stTempFileNum uint64 stor storage.Storage - storLock util.Releaser + storLock storage.Lock o *cachedOptions icmp *iComparer tops *tOps manifest *journal.Writer manifestWriter storage.Writer - manifestFile storage.File + manifestFd storage.FileDesc stCompPtrs []iKey // compaction pointers; need external synchronization stVersion *version // current version @@ -68,9 +65,8 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) { return } s = &session{ - stor: stor, - storLock: storLock, - stCompPtrs: make([]iKey, o.GetNumLevel()), + stor: stor, + storLock: storLock, } s.setOptions(o) s.tops = newTableOps(s) @@ -90,7 +86,6 @@ func (s *session) close() { } s.manifest = nil s.manifestWriter = nil - s.manifestFile = nil s.stVersion = nil } @@ -111,27 +106,31 @@ func (s *session) recover() (err error) { if os.IsNotExist(err) { // Don't return os.ErrNotExist if the underlying storage contains // other files that belong to LevelDB. So the DB won't get trashed. - if files, _ := s.stor.GetFiles(storage.TypeAll); len(files) > 0 { - err = &errors.ErrCorrupted{File: &storage.FileInfo{Type: storage.TypeManifest}, Err: &errors.ErrMissingFiles{}} + if fds, _ := s.stor.List(storage.TypeAll); len(fds) > 0 { + err = &errors.ErrCorrupted{Fd: storage.FileDesc{Type: storage.TypeManifest}, Err: &errors.ErrMissingFiles{}} } } }() - m, err := s.stor.GetManifest() + fd, err := s.stor.GetMeta() if err != nil { return } - reader, err := m.Open() + reader, err := s.stor.Open(fd) if err != nil { return } defer reader.Close() - strict := s.o.GetStrict(opt.StrictManifest) - jr := journal.NewReader(reader, dropper{s, m}, strict, true) - staging := s.stVersion.newStaging() - rec := &sessionRecord{numLevel: s.o.GetNumLevel()} + var ( + // Options. + strict = s.o.GetStrict(opt.StrictManifest) + + jr = journal.NewReader(reader, dropper{s, fd}, strict, true) + rec = &sessionRecord{} + staging = s.stVersion.newStaging() + ) for { var r io.Reader r, err = jr.Next() @@ -140,23 +139,23 @@ func (s *session) recover() (err error) { err = nil break } - return errors.SetFile(err, m) + return errors.SetFd(err, fd) } err = rec.decode(r) if err == nil { // save compact pointers for _, r := range rec.compPtrs { - s.stCompPtrs[r.level] = iKey(r.ikey) + s.setCompPtr(r.level, iKey(r.ikey)) } // commit record to version staging staging.commit(rec) } else { - err = errors.SetFile(err, m) + err = errors.SetFd(err, fd) if strict || !errors.IsCorrupted(err) { return } else { - s.logf("manifest error: %v (skipped)", errors.SetFile(err, m)) + s.logf("manifest error: %v (skipped)", errors.SetFd(err, fd)) } } rec.resetCompPtrs() @@ -166,18 +165,18 @@ func (s *session) recover() (err error) { switch { case !rec.has(recComparer): - return newErrManifestCorrupted(m, "comparer", "missing") + return newErrManifestCorrupted(fd, "comparer", "missing") case rec.comparer != s.icmp.uName(): - return newErrManifestCorrupted(m, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer)) + return newErrManifestCorrupted(fd, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer)) case !rec.has(recNextFileNum): - return newErrManifestCorrupted(m, "next-file-num", "missing") + return newErrManifestCorrupted(fd, "next-file-num", "missing") case !rec.has(recJournalNum): - return newErrManifestCorrupted(m, "journal-file-num", "missing") + return newErrManifestCorrupted(fd, "journal-file-num", "missing") case !rec.has(recSeqNum): - return newErrManifestCorrupted(m, "seq-num", "missing") + return newErrManifestCorrupted(fd, "seq-num", "missing") } - s.manifestFile = m + s.manifestFd = fd s.setVersion(staging.finish()) s.setNextFileNum(rec.nextFileNum) s.recordCommited(rec) @@ -206,250 +205,3 @@ func (s *session) commit(r *sessionRecord) (err error) { return } - -// Pick a compaction based on current state; need external synchronization. -func (s *session) pickCompaction() *compaction { - v := s.version() - - var level int - var t0 tFiles - if v.cScore >= 1 { - level = v.cLevel - cptr := s.stCompPtrs[level] - tables := v.tables[level] - for _, t := range tables { - if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 { - t0 = append(t0, t) - break - } - } - if len(t0) == 0 { - t0 = append(t0, tables[0]) - } - } else { - if p := atomic.LoadPointer(&v.cSeek); p != nil { - ts := (*tSet)(p) - level = ts.level - t0 = append(t0, ts.table) - } else { - v.release() - return nil - } - } - - return newCompaction(s, v, level, t0) -} - -// Create compaction from given level and range; need external synchronization. -func (s *session) getCompactionRange(level int, umin, umax []byte) *compaction { - v := s.version() - - t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0) - if len(t0) == 0 { - v.release() - return nil - } - - // Avoid compacting too much in one shot in case the range is large. - // But we cannot do this for level-0 since level-0 files can overlap - // and we must not pick one file and drop another older file if the - // two files overlap. - if level > 0 { - limit := uint64(v.s.o.GetCompactionSourceLimit(level)) - total := uint64(0) - for i, t := range t0 { - total += t.size - if total >= limit { - s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1) - t0 = t0[:i+1] - break - } - } - } - - return newCompaction(s, v, level, t0) -} - -func newCompaction(s *session, v *version, level int, t0 tFiles) *compaction { - c := &compaction{ - s: s, - v: v, - level: level, - tables: [2]tFiles{t0, nil}, - maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)), - tPtrs: make([]int, s.o.GetNumLevel()), - } - c.expand() - c.save() - return c -} - -// compaction represent a compaction state. -type compaction struct { - s *session - v *version - - level int - tables [2]tFiles - maxGPOverlaps uint64 - - gp tFiles - gpi int - seenKey bool - gpOverlappedBytes uint64 - imin, imax iKey - tPtrs []int - released bool - - snapGPI int - snapSeenKey bool - snapGPOverlappedBytes uint64 - snapTPtrs []int -} - -func (c *compaction) save() { - c.snapGPI = c.gpi - c.snapSeenKey = c.seenKey - c.snapGPOverlappedBytes = c.gpOverlappedBytes - c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...) -} - -func (c *compaction) restore() { - c.gpi = c.snapGPI - c.seenKey = c.snapSeenKey - c.gpOverlappedBytes = c.snapGPOverlappedBytes - c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...) -} - -func (c *compaction) release() { - if !c.released { - c.released = true - c.v.release() - } -} - -// Expand compacted tables; need external synchronization. -func (c *compaction) expand() { - limit := uint64(c.s.o.GetCompactionExpandLimit(c.level)) - vt0, vt1 := c.v.tables[c.level], c.v.tables[c.level+1] - - t0, t1 := c.tables[0], c.tables[1] - imin, imax := t0.getRange(c.s.icmp) - // We expand t0 here just incase ukey hop across tables. - t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.level == 0) - if len(t0) != len(c.tables[0]) { - imin, imax = t0.getRange(c.s.icmp) - } - t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false) - // Get entire range covered by compaction. - amin, amax := append(t0, t1...).getRange(c.s.icmp) - - // See if we can grow the number of inputs in "level" without - // changing the number of "level+1" files we pick up. - if len(t1) > 0 { - exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.level == 0) - if len(exp0) > len(t0) && t1.size()+exp0.size() < limit { - xmin, xmax := exp0.getRange(c.s.icmp) - exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false) - if len(exp1) == len(t1) { - c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)", - c.level, c.level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())), - len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size()))) - imin, imax = xmin, xmax - t0, t1 = exp0, exp1 - amin, amax = append(t0, t1...).getRange(c.s.icmp) - } - } - } - - // Compute the set of grandparent files that overlap this compaction - // (parent == level+1; grandparent == level+2) - if c.level+2 < c.s.o.GetNumLevel() { - c.gp = c.v.tables[c.level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false) - } - - c.tables[0], c.tables[1] = t0, t1 - c.imin, c.imax = imin, imax -} - -// Check whether compaction is trivial. -func (c *compaction) trivial() bool { - return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= c.maxGPOverlaps -} - -func (c *compaction) baseLevelForKey(ukey []byte) bool { - for level, tables := range c.v.tables[c.level+2:] { - for c.tPtrs[level] < len(tables) { - t := tables[c.tPtrs[level]] - if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 { - // We've advanced far enough. - if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 { - // Key falls in this file's range, so definitely not base level. - return false - } - break - } - c.tPtrs[level]++ - } - } - return true -} - -func (c *compaction) shouldStopBefore(ikey iKey) bool { - for ; c.gpi < len(c.gp); c.gpi++ { - gp := c.gp[c.gpi] - if c.s.icmp.Compare(ikey, gp.imax) <= 0 { - break - } - if c.seenKey { - c.gpOverlappedBytes += gp.size - } - } - c.seenKey = true - - if c.gpOverlappedBytes > c.maxGPOverlaps { - // Too much overlap for current output; start new output. - c.gpOverlappedBytes = 0 - return true - } - return false -} - -// Creates an iterator. -func (c *compaction) newIterator() iterator.Iterator { - // Creates iterator slice. - icap := len(c.tables) - if c.level == 0 { - // Special case for level-0 - icap = len(c.tables[0]) + 1 - } - its := make([]iterator.Iterator, 0, icap) - - // Options. - ro := &opt.ReadOptions{ - DontFillCache: true, - Strict: opt.StrictOverride, - } - strict := c.s.o.GetStrict(opt.StrictCompaction) - if strict { - ro.Strict |= opt.StrictReader - } - - for i, tables := range c.tables { - if len(tables) == 0 { - continue - } - - // Level-0 is not sorted and may overlaps each other. - if c.level+i == 0 { - for _, t := range tables { - its = append(its, c.s.tops.newIterator(t, nil, ro)) - } - } else { - it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict) - its = append(its, it) - } - } - - return iterator.NewMergedIterator(its, c.s.icmp, strict) -} |