diff options
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go')
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go | 113 |
1 files changed, 58 insertions, 55 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go index 007c02cde..e4fa98d92 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go @@ -17,15 +17,15 @@ import ( // Logging. type dropper struct { - s *session - file storage.File + s *session + fd storage.FileDesc } func (d dropper) Drop(err error) { if e, ok := err.(*journal.ErrCorrupted); ok { - d.s.logf("journal@drop %s-%d S·%s %q", d.file.Type(), d.file.Num(), shortenb(e.Size), e.Reason) + d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason) } else { - d.s.logf("journal@drop %s-%d %q", d.file.Type(), d.file.Num(), err) + d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err) } } @@ -34,25 +34,9 @@ func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf // File utils. -func (s *session) getJournalFile(num uint64) storage.File { - return s.stor.GetFile(num, storage.TypeJournal) -} - -func (s *session) getTableFile(num uint64) storage.File { - return s.stor.GetFile(num, storage.TypeTable) -} - -func (s *session) getFiles(t storage.FileType) ([]storage.File, error) { - return s.stor.GetFiles(t) -} - -func (s *session) newTemp() storage.File { - num := atomic.AddUint64(&s.stTempFileNum, 1) - 1 - return s.stor.GetFile(num, storage.TypeTemp) -} - -func (s *session) tableFileFromRecord(r atRecord) *tFile { - return newTableFile(s.getTableFile(r.num), r.size, r.imin, r.imax) +func (s *session) newTemp() storage.FileDesc { + num := atomic.AddInt64(&s.stTempFileNum, 1) - 1 + return storage.FileDesc{storage.TypeTemp, num} } // Session state. @@ -80,47 +64,65 @@ func (s *session) setVersion(v *version) { } // Get current unused file number. -func (s *session) nextFileNum() uint64 { - return atomic.LoadUint64(&s.stNextFileNum) +func (s *session) nextFileNum() int64 { + return atomic.LoadInt64(&s.stNextFileNum) } // Set current unused file number to num. -func (s *session) setNextFileNum(num uint64) { - atomic.StoreUint64(&s.stNextFileNum, num) +func (s *session) setNextFileNum(num int64) { + atomic.StoreInt64(&s.stNextFileNum, num) } // Mark file number as used. -func (s *session) markFileNum(num uint64) { +func (s *session) markFileNum(num int64) { nextFileNum := num + 1 for { old, x := s.stNextFileNum, nextFileNum if old > x { x = old } - if atomic.CompareAndSwapUint64(&s.stNextFileNum, old, x) { + if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) { break } } } // Allocate a file number. -func (s *session) allocFileNum() uint64 { - return atomic.AddUint64(&s.stNextFileNum, 1) - 1 +func (s *session) allocFileNum() int64 { + return atomic.AddInt64(&s.stNextFileNum, 1) - 1 } // Reuse given file number. -func (s *session) reuseFileNum(num uint64) { +func (s *session) reuseFileNum(num int64) { for { old, x := s.stNextFileNum, num if old != x+1 { x = old } - if atomic.CompareAndSwapUint64(&s.stNextFileNum, old, x) { + if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) { break } } } +// Set compaction ptr at given level; need external synchronization. +func (s *session) setCompPtr(level int, ik iKey) { + if level >= len(s.stCompPtrs) { + newCompPtrs := make([]iKey, level+1) + copy(newCompPtrs, s.stCompPtrs) + s.stCompPtrs = newCompPtrs + } + s.stCompPtrs[level] = append(iKey{}, ik...) +} + +// Get compaction ptr at given level; need external synchronization. +func (s *session) getCompPtr(level int) iKey { + if level >= len(s.stCompPtrs) { + return nil + } + return s.stCompPtrs[level] +} + // Manifest related utils. // Fill given session record obj with current states; need external @@ -149,29 +151,28 @@ func (s *session) fillRecord(r *sessionRecord, snapshot bool) { // Mark if record has been committed, this will update session state; // need external synchronization. -func (s *session) recordCommited(r *sessionRecord) { - if r.has(recJournalNum) { - s.stJournalNum = r.journalNum +func (s *session) recordCommited(rec *sessionRecord) { + if rec.has(recJournalNum) { + s.stJournalNum = rec.journalNum } - if r.has(recPrevJournalNum) { - s.stPrevJournalNum = r.prevJournalNum + if rec.has(recPrevJournalNum) { + s.stPrevJournalNum = rec.prevJournalNum } - if r.has(recSeqNum) { - s.stSeqNum = r.seqNum + if rec.has(recSeqNum) { + s.stSeqNum = rec.seqNum } - for _, p := range r.compPtrs { - s.stCompPtrs[p.level] = iKey(p.ikey) + for _, r := range rec.compPtrs { + s.setCompPtr(r.level, iKey(r.ikey)) } } // Create a new manifest file; need external synchronization. func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { - num := s.allocFileNum() - file := s.stor.GetFile(num, storage.TypeManifest) - writer, err := file.Create() + fd := storage.FileDesc{storage.TypeManifest, s.allocFileNum()} + writer, err := s.stor.Create(fd) if err != nil { return } @@ -182,7 +183,7 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { defer v.release() } if rec == nil { - rec = &sessionRecord{numLevel: s.o.GetNumLevel()} + rec = &sessionRecord{} } s.fillRecord(rec, true) v.fillRecord(rec) @@ -196,16 +197,16 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { if s.manifestWriter != nil { s.manifestWriter.Close() } - if s.manifestFile != nil { - s.manifestFile.Remove() + if !s.manifestFd.Nil() { + s.stor.Remove(s.manifestFd) } - s.manifestFile = file + s.manifestFd = fd s.manifestWriter = writer s.manifest = jw } else { writer.Close() - file.Remove() - s.reuseFileNum(num) + s.stor.Remove(fd) + s.reuseFileNum(fd.Num) } }() @@ -221,7 +222,7 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { if err != nil { return } - err = s.stor.SetManifest(file) + err = s.stor.SetMeta(fd) return } @@ -240,9 +241,11 @@ func (s *session) flushManifest(rec *sessionRecord) (err error) { if err != nil { return } - err = s.manifestWriter.Sync() - if err != nil { - return + if !s.o.GetNoSync() { + err = s.manifestWriter.Sync() + if err != nil { + return + } } s.recordCommited(rec) return |