diff options
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go')
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go | 68 |
1 files changed, 38 insertions, 30 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go index d4db9d6dd..0207e221e 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go @@ -12,14 +12,19 @@ import ( "github.com/syndtr/goleveldb/leveldb/journal" "github.com/syndtr/goleveldb/leveldb/memdb" + "github.com/syndtr/goleveldb/leveldb/storage" ) type memDB struct { - db *DB - mdb *memdb.DB + db *DB + *memdb.DB ref int32 } +func (m *memDB) getref() int32 { + return atomic.LoadInt32(&m.ref) +} + func (m *memDB) incref() { atomic.AddInt32(&m.ref, 1) } @@ -27,12 +32,12 @@ func (m *memDB) incref() { func (m *memDB) decref() { if ref := atomic.AddInt32(&m.ref, -1); ref == 0 { // Only put back memdb with std capacity. - if m.mdb.Capacity() == m.db.s.o.GetWriteBuffer() { - m.mdb.Reset() - m.db.mpoolPut(m.mdb) + if m.Capacity() == m.db.s.o.GetWriteBuffer() { + m.Reset() + m.db.mpoolPut(m.DB) } m.db = nil - m.mdb = nil + m.DB = nil } else if ref < 0 { panic("negative memdb ref") } @@ -48,11 +53,15 @@ func (db *DB) addSeq(delta uint64) { atomic.AddUint64(&db.seq, delta) } +func (db *DB) setSeq(seq uint64) { + atomic.StoreUint64(&db.seq, seq) +} + func (db *DB) sampleSeek(ikey iKey) { v := db.s.version() if v.sampleSeek(ikey) { // Trigger table compaction. - db.compSendTrigger(db.tcompCmdC) + db.compTrigger(db.tcompCmdC) } v.release() } @@ -67,12 +76,18 @@ func (db *DB) mpoolPut(mem *memdb.DB) { } } -func (db *DB) mpoolGet() *memdb.DB { +func (db *DB) mpoolGet(n int) *memDB { + var mdb *memdb.DB select { - case mem := <-db.memPool: - return mem + case mdb = <-db.memPool: default: - return nil + } + if mdb == nil || mdb.Capacity() < n { + mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n)) + } + return &memDB{ + db: db, + DB: mdb, } } @@ -95,11 +110,10 @@ func (db *DB) mpoolDrain() { // Create new memdb and froze the old one; need external synchronization. // newMem only called synchronously by the writer. func (db *DB) newMem(n int) (mem *memDB, err error) { - num := db.s.allocFileNum() - file := db.s.getJournalFile(num) - w, err := file.Create() + fd := storage.FileDesc{Type: storage.TypeJournal, Num: db.s.allocFileNum()} + w, err := db.s.stor.Create(fd) if err != nil { - db.s.reuseFileNum(num) + db.s.reuseFileNum(fd.Num) return } @@ -115,20 +129,14 @@ func (db *DB) newMem(n int) (mem *memDB, err error) { } else { db.journal.Reset(w) db.journalWriter.Close() - db.frozenJournalFile = db.journalFile + db.frozenJournalFd = db.journalFd } db.journalWriter = w - db.journalFile = file + db.journalFd = fd db.frozenMem = db.mem - mdb := db.mpoolGet() - if mdb == nil || mdb.Capacity() < n { - mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n)) - } - mem = &memDB{ - db: db, - mdb: mdb, - ref: 2, - } + mem = db.mpoolGet(n) + mem.incref() // for self + mem.incref() // for caller db.mem = mem // The seq only incremented by the writer. And whoever called newMem // should hold write lock, so no need additional synchronization here. @@ -181,12 +189,12 @@ func (db *DB) getFrozenMem() *memDB { // Drop frozen memdb; assume that frozen memdb isn't nil. func (db *DB) dropFrozenMem() { db.memMu.Lock() - if err := db.frozenJournalFile.Remove(); err != nil { - db.logf("journal@remove removing @%d %q", db.frozenJournalFile.Num(), err) + if err := db.s.stor.Remove(db.frozenJournalFd); err != nil { + db.logf("journal@remove removing @%d %q", db.frozenJournalFd.Num, err) } else { - db.logf("journal@remove removed @%d", db.frozenJournalFile.Num()) + db.logf("journal@remove removed @%d", db.frozenJournalFd.Num) } - db.frozenJournalFile = nil + db.frozenJournalFd = storage.FileDesc{} db.frozenMem.decref() db.frozenMem = nil db.memMu.Unlock() |