diff options
Diffstat (limited to 'vendor/github.com/syndtr/goleveldb/leveldb/db_state.go')
-rw-r--r-- | vendor/github.com/syndtr/goleveldb/leveldb/db_state.go | 234 |
1 files changed, 234 insertions, 0 deletions
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go new file mode 100644 index 000000000..85b02d24b --- /dev/null +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go @@ -0,0 +1,234 @@ +// Copyright (c) 2013, Suryandaru Triandana <syndtr@gmail.com> +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package leveldb + +import ( + "sync/atomic" + "time" + + "github.com/syndtr/goleveldb/leveldb/journal" + "github.com/syndtr/goleveldb/leveldb/memdb" + "github.com/syndtr/goleveldb/leveldb/storage" +) + +type memDB struct { + db *DB + *memdb.DB + ref int32 +} + +func (m *memDB) getref() int32 { + return atomic.LoadInt32(&m.ref) +} + +func (m *memDB) incref() { + atomic.AddInt32(&m.ref, 1) +} + +func (m *memDB) decref() { + if ref := atomic.AddInt32(&m.ref, -1); ref == 0 { + // Only put back memdb with std capacity. + if m.Capacity() == m.db.s.o.GetWriteBuffer() { + m.Reset() + m.db.mpoolPut(m.DB) + } + m.db = nil + m.DB = nil + } else if ref < 0 { + panic("negative memdb ref") + } +} + +// Get latest sequence number. +func (db *DB) getSeq() uint64 { + return atomic.LoadUint64(&db.seq) +} + +// Atomically adds delta to seq. +func (db *DB) addSeq(delta uint64) { + atomic.AddUint64(&db.seq, delta) +} + +func (db *DB) setSeq(seq uint64) { + atomic.StoreUint64(&db.seq, seq) +} + +func (db *DB) sampleSeek(ikey internalKey) { + v := db.s.version() + if v.sampleSeek(ikey) { + // Trigger table compaction. + db.compTrigger(db.tcompCmdC) + } + v.release() +} + +func (db *DB) mpoolPut(mem *memdb.DB) { + if !db.isClosed() { + select { + case db.memPool <- mem: + default: + } + } +} + +func (db *DB) mpoolGet(n int) *memDB { + var mdb *memdb.DB + select { + case mdb = <-db.memPool: + default: + } + if mdb == nil || mdb.Capacity() < n { + mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n)) + } + return &memDB{ + db: db, + DB: mdb, + } +} + +func (db *DB) mpoolDrain() { + ticker := time.NewTicker(30 * time.Second) + for { + select { + case <-ticker.C: + select { + case <-db.memPool: + default: + } + case <-db.closeC: + ticker.Stop() + // Make sure the pool is drained. + select { + case <-db.memPool: + case <-time.After(time.Second): + } + close(db.memPool) + return + } + } +} + +// Create new memdb and froze the old one; need external synchronization. +// newMem only called synchronously by the writer. +func (db *DB) newMem(n int) (mem *memDB, err error) { + fd := storage.FileDesc{Type: storage.TypeJournal, Num: db.s.allocFileNum()} + w, err := db.s.stor.Create(fd) + if err != nil { + db.s.reuseFileNum(fd.Num) + return + } + + db.memMu.Lock() + defer db.memMu.Unlock() + + if db.frozenMem != nil { + panic("still has frozen mem") + } + + if db.journal == nil { + db.journal = journal.NewWriter(w) + } else { + db.journal.Reset(w) + db.journalWriter.Close() + db.frozenJournalFd = db.journalFd + } + db.journalWriter = w + db.journalFd = fd + db.frozenMem = db.mem + mem = db.mpoolGet(n) + mem.incref() // for self + mem.incref() // for caller + db.mem = mem + // The seq only incremented by the writer. And whoever called newMem + // should hold write lock, so no need additional synchronization here. + db.frozenSeq = db.seq + return +} + +// Get all memdbs. +func (db *DB) getMems() (e, f *memDB) { + db.memMu.RLock() + defer db.memMu.RUnlock() + if db.mem != nil { + db.mem.incref() + } else if !db.isClosed() { + panic("nil effective mem") + } + if db.frozenMem != nil { + db.frozenMem.incref() + } + return db.mem, db.frozenMem +} + +// Get effective memdb. +func (db *DB) getEffectiveMem() *memDB { + db.memMu.RLock() + defer db.memMu.RUnlock() + if db.mem != nil { + db.mem.incref() + } else if !db.isClosed() { + panic("nil effective mem") + } + return db.mem +} + +// Check whether we has frozen memdb. +func (db *DB) hasFrozenMem() bool { + db.memMu.RLock() + defer db.memMu.RUnlock() + return db.frozenMem != nil +} + +// Get frozen memdb. +func (db *DB) getFrozenMem() *memDB { + db.memMu.RLock() + defer db.memMu.RUnlock() + if db.frozenMem != nil { + db.frozenMem.incref() + } + return db.frozenMem +} + +// Drop frozen memdb; assume that frozen memdb isn't nil. +func (db *DB) dropFrozenMem() { + db.memMu.Lock() + if err := db.s.stor.Remove(db.frozenJournalFd); err != nil { + db.logf("journal@remove removing @%d %q", db.frozenJournalFd.Num, err) + } else { + db.logf("journal@remove removed @%d", db.frozenJournalFd.Num) + } + db.frozenJournalFd = storage.FileDesc{} + db.frozenMem.decref() + db.frozenMem = nil + db.memMu.Unlock() +} + +// Clear mems ptr; used by DB.Close(). +func (db *DB) clearMems() { + db.memMu.Lock() + db.mem = nil + db.frozenMem = nil + db.memMu.Unlock() +} + +// Set closed flag; return true if not already closed. +func (db *DB) setClosed() bool { + return atomic.CompareAndSwapUint32(&db.closed, 0, 1) +} + +// Check whether DB was closed. +func (db *DB) isClosed() bool { + return atomic.LoadUint32(&db.closed) != 0 +} + +// Check read ok status. +func (db *DB) ok() error { + if db.isClosed() { + return ErrClosed + } + return nil +} |