aboutsummaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go')
-rw-r--r--Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go78
1 files changed, 39 insertions, 39 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
index eb6abd0fb..a02cb2c50 100644
--- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
+++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go
@@ -53,14 +53,13 @@ type DB struct {
aliveSnaps, aliveIters int32
// Write.
- writeC chan *Batch
+ batchPool sync.Pool
+ writeMergeC chan writeMerge
writeMergedC chan bool
writeLockC chan struct{}
writeAckC chan error
writeDelay time.Duration
writeDelayN int
- journalC chan *Batch
- journalAckC chan error
tr *Transaction
// Compaction.
@@ -94,12 +93,11 @@ func openDB(s *session) (*DB, error) {
// Snapshot
snapsList: list.New(),
// Write
- writeC: make(chan *Batch),
+ batchPool: sync.Pool{New: newBatch},
+ writeMergeC: make(chan writeMerge),
writeMergedC: make(chan bool),
writeLockC: make(chan struct{}, 1),
writeAckC: make(chan error),
- journalC: make(chan *Batch),
- journalAckC: make(chan error),
// Compaction
tcompCmdC: make(chan cCmd),
tcompPauseC: make(chan chan<- struct{}),
@@ -144,10 +142,10 @@ func openDB(s *session) (*DB, error) {
if readOnly {
db.SetReadOnly()
} else {
- db.closeW.Add(3)
+ db.closeW.Add(2)
go db.tCompaction()
go db.mCompaction()
- go db.jWriter()
+ // go db.jWriter()
}
s.logf("db@open done T·%v", time.Since(start))
@@ -162,10 +160,10 @@ func openDB(s *session) (*DB, error) {
// os.ErrExist error.
//
// Open will return an error with type of ErrCorrupted if corruption
-// detected in the DB. Corrupted DB can be recovered with Recover
-// function.
+// detected in the DB. Use errors.IsCorrupted to test whether an error is
+// due to corruption. Corrupted DB can be recovered with Recover function.
//
-// The returned DB instance is goroutine-safe.
+// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
s, err := newSession(stor, o)
@@ -202,13 +200,13 @@ func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
// os.ErrExist error.
//
// OpenFile uses standard file-system backed storage implementation as
-// desribed in the leveldb/storage package.
+// described in the leveldb/storage package.
//
// OpenFile will return an error with type of ErrCorrupted if corruption
-// detected in the DB. Corrupted DB can be recovered with Recover
-// function.
+// detected in the DB. Use errors.IsCorrupted to test whether an error is
+// due to corruption. Corrupted DB can be recovered with Recover function.
//
-// The returned DB instance is goroutine-safe.
+// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func OpenFile(path string, o *opt.Options) (db *DB, err error) {
stor, err := storage.OpenFile(path, o.GetReadOnly())
@@ -229,7 +227,7 @@ func OpenFile(path string, o *opt.Options) (db *DB, err error) {
// The DB must already exist or it will returns an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
//
-// The returned DB instance is goroutine-safe.
+// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
s, err := newSession(stor, o)
@@ -255,10 +253,10 @@ func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
// The DB must already exist or it will returns an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
//
-// RecoverFile uses standard file-system backed storage implementation as desribed
+// RecoverFile uses standard file-system backed storage implementation as described
// in the leveldb/storage package.
//
-// The returned DB instance is goroutine-safe.
+// The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method.
func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
stor, err := storage.OpenFile(path, false)
@@ -504,10 +502,11 @@ func (db *DB) recoverJournal() error {
checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
writeBuffer = db.s.o.GetWriteBuffer()
- jr *journal.Reader
- mdb = memdb.New(db.s.icmp, writeBuffer)
- buf = &util.Buffer{}
- batch = &Batch{}
+ jr *journal.Reader
+ mdb = memdb.New(db.s.icmp, writeBuffer)
+ buf = &util.Buffer{}
+ batchSeq uint64
+ batchLen int
)
for _, fd := range fds {
@@ -526,7 +525,7 @@ func (db *DB) recoverJournal() error {
}
// Flush memdb and remove obsolete journal file.
- if !ofd.Nil() {
+ if !ofd.Zero() {
if mdb.Len() > 0 {
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
fr.Close()
@@ -569,7 +568,8 @@ func (db *DB) recoverJournal() error {
fr.Close()
return errors.SetFd(err, fd)
}
- if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
+ batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
+ if err != nil {
if !strict && errors.IsCorrupted(err) {
db.s.logf("journal error: %v (skipped)", err)
// We won't apply sequence number as it might be corrupted.
@@ -581,7 +581,7 @@ func (db *DB) recoverJournal() error {
}
// Save sequence number.
- db.seq = batch.seq + uint64(batch.Len())
+ db.seq = batchSeq + uint64(batchLen)
// Flush it if large enough.
if mdb.Size() >= writeBuffer {
@@ -624,7 +624,7 @@ func (db *DB) recoverJournal() error {
}
// Remove the last obsolete journal file.
- if !ofd.Nil() {
+ if !ofd.Zero() {
db.s.stor.Remove(ofd)
}
@@ -661,9 +661,10 @@ func (db *DB) recoverJournalRO() error {
db.logf("journal@recovery RO·Mode F·%d", len(fds))
var (
- jr *journal.Reader
- buf = &util.Buffer{}
- batch = &Batch{}
+ jr *journal.Reader
+ buf = &util.Buffer{}
+ batchSeq uint64
+ batchLen int
)
for _, fd := range fds {
@@ -703,7 +704,8 @@ func (db *DB) recoverJournalRO() error {
fr.Close()
return errors.SetFd(err, fd)
}
- if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
+ batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
+ if err != nil {
if !strict && errors.IsCorrupted(err) {
db.s.logf("journal error: %v (skipped)", err)
// We won't apply sequence number as it might be corrupted.
@@ -715,7 +717,7 @@ func (db *DB) recoverJournalRO() error {
}
// Save sequence number.
- db.seq = batch.seq + uint64(batch.Len())
+ db.seq = batchSeq + uint64(batchLen)
}
fr.Close()
@@ -856,7 +858,7 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
// NewIterator returns an iterator for the latest snapshot of the
// underlying DB.
-// The returned iterator is not goroutine-safe, but it is safe to use
+// The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its
// underlying DB. The resultant key/value pairs are guaranteed to be
@@ -1062,6 +1064,8 @@ func (db *DB) Close() error {
if db.journal != nil {
db.journal.Close()
db.journalWriter.Close()
+ db.journal = nil
+ db.journalWriter = nil
}
if db.writeDelayN > 0 {
@@ -1077,15 +1081,11 @@ func (db *DB) Close() error {
if err1 := db.closer.Close(); err == nil {
err = err1
}
+ db.closer = nil
}
- // NIL'ing pointers.
- db.s = nil
- db.mem = nil
- db.frozenMem = nil
- db.journal = nil
- db.journalWriter = nil
- db.closer = nil
+ // Clear memdbs.
+ db.clearMems()
return err
}