diff options
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go')
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go | 78 |
1 files changed, 39 insertions, 39 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go index eb6abd0fb..a02cb2c50 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go @@ -53,14 +53,13 @@ type DB struct { aliveSnaps, aliveIters int32 // Write. - writeC chan *Batch + batchPool sync.Pool + writeMergeC chan writeMerge writeMergedC chan bool writeLockC chan struct{} writeAckC chan error writeDelay time.Duration writeDelayN int - journalC chan *Batch - journalAckC chan error tr *Transaction // Compaction. @@ -94,12 +93,11 @@ func openDB(s *session) (*DB, error) { // Snapshot snapsList: list.New(), // Write - writeC: make(chan *Batch), + batchPool: sync.Pool{New: newBatch}, + writeMergeC: make(chan writeMerge), writeMergedC: make(chan bool), writeLockC: make(chan struct{}, 1), writeAckC: make(chan error), - journalC: make(chan *Batch), - journalAckC: make(chan error), // Compaction tcompCmdC: make(chan cCmd), tcompPauseC: make(chan chan<- struct{}), @@ -144,10 +142,10 @@ func openDB(s *session) (*DB, error) { if readOnly { db.SetReadOnly() } else { - db.closeW.Add(3) + db.closeW.Add(2) go db.tCompaction() go db.mCompaction() - go db.jWriter() + // go db.jWriter() } s.logf("db@open done T·%v", time.Since(start)) @@ -162,10 +160,10 @@ func openDB(s *session) (*DB, error) { // os.ErrExist error. // // Open will return an error with type of ErrCorrupted if corruption -// detected in the DB. Corrupted DB can be recovered with Recover -// function. +// detected in the DB. Use errors.IsCorrupted to test whether an error is +// due to corruption. Corrupted DB can be recovered with Recover function. // -// The returned DB instance is goroutine-safe. +// The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) { s, err := newSession(stor, o) @@ -202,13 +200,13 @@ func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) { // os.ErrExist error. // // OpenFile uses standard file-system backed storage implementation as -// desribed in the leveldb/storage package. +// described in the leveldb/storage package. // // OpenFile will return an error with type of ErrCorrupted if corruption -// detected in the DB. Corrupted DB can be recovered with Recover -// function. +// detected in the DB. Use errors.IsCorrupted to test whether an error is +// due to corruption. Corrupted DB can be recovered with Recover function. // -// The returned DB instance is goroutine-safe. +// The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func OpenFile(path string, o *opt.Options) (db *DB, err error) { stor, err := storage.OpenFile(path, o.GetReadOnly()) @@ -229,7 +227,7 @@ func OpenFile(path string, o *opt.Options) (db *DB, err error) { // The DB must already exist or it will returns an error. // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. // -// The returned DB instance is goroutine-safe. +// The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) { s, err := newSession(stor, o) @@ -255,10 +253,10 @@ func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) { // The DB must already exist or it will returns an error. // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. // -// RecoverFile uses standard file-system backed storage implementation as desribed +// RecoverFile uses standard file-system backed storage implementation as described // in the leveldb/storage package. // -// The returned DB instance is goroutine-safe. +// The returned DB instance is safe for concurrent use. // The DB must be closed after use, by calling Close method. func RecoverFile(path string, o *opt.Options) (db *DB, err error) { stor, err := storage.OpenFile(path, false) @@ -504,10 +502,11 @@ func (db *DB) recoverJournal() error { checksum = db.s.o.GetStrict(opt.StrictJournalChecksum) writeBuffer = db.s.o.GetWriteBuffer() - jr *journal.Reader - mdb = memdb.New(db.s.icmp, writeBuffer) - buf = &util.Buffer{} - batch = &Batch{} + jr *journal.Reader + mdb = memdb.New(db.s.icmp, writeBuffer) + buf = &util.Buffer{} + batchSeq uint64 + batchLen int ) for _, fd := range fds { @@ -526,7 +525,7 @@ func (db *DB) recoverJournal() error { } // Flush memdb and remove obsolete journal file. - if !ofd.Nil() { + if !ofd.Zero() { if mdb.Len() > 0 { if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil { fr.Close() @@ -569,7 +568,8 @@ func (db *DB) recoverJournal() error { fr.Close() return errors.SetFd(err, fd) } - if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil { + batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb) + if err != nil { if !strict && errors.IsCorrupted(err) { db.s.logf("journal error: %v (skipped)", err) // We won't apply sequence number as it might be corrupted. @@ -581,7 +581,7 @@ func (db *DB) recoverJournal() error { } // Save sequence number. - db.seq = batch.seq + uint64(batch.Len()) + db.seq = batchSeq + uint64(batchLen) // Flush it if large enough. if mdb.Size() >= writeBuffer { @@ -624,7 +624,7 @@ func (db *DB) recoverJournal() error { } // Remove the last obsolete journal file. - if !ofd.Nil() { + if !ofd.Zero() { db.s.stor.Remove(ofd) } @@ -661,9 +661,10 @@ func (db *DB) recoverJournalRO() error { db.logf("journal@recovery RO·Mode F·%d", len(fds)) var ( - jr *journal.Reader - buf = &util.Buffer{} - batch = &Batch{} + jr *journal.Reader + buf = &util.Buffer{} + batchSeq uint64 + batchLen int ) for _, fd := range fds { @@ -703,7 +704,8 @@ func (db *DB) recoverJournalRO() error { fr.Close() return errors.SetFd(err, fd) } - if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil { + batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb) + if err != nil { if !strict && errors.IsCorrupted(err) { db.s.logf("journal error: %v (skipped)", err) // We won't apply sequence number as it might be corrupted. @@ -715,7 +717,7 @@ func (db *DB) recoverJournalRO() error { } // Save sequence number. - db.seq = batch.seq + uint64(batch.Len()) + db.seq = batchSeq + uint64(batchLen) } fr.Close() @@ -856,7 +858,7 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) { // NewIterator returns an iterator for the latest snapshot of the // underlying DB. -// The returned iterator is not goroutine-safe, but it is safe to use +// The returned iterator is not safe for concurrent use, but it is safe to use // multiple iterators concurrently, with each in a dedicated goroutine. // It is also safe to use an iterator concurrently with modifying its // underlying DB. The resultant key/value pairs are guaranteed to be @@ -1062,6 +1064,8 @@ func (db *DB) Close() error { if db.journal != nil { db.journal.Close() db.journalWriter.Close() + db.journal = nil + db.journalWriter = nil } if db.writeDelayN > 0 { @@ -1077,15 +1081,11 @@ func (db *DB) Close() error { if err1 := db.closer.Close(); err == nil { err = err1 } + db.closer = nil } - // NIL'ing pointers. - db.s = nil - db.mem = nil - db.frozenMem = nil - db.journal = nil - db.journalWriter = nil - db.closer = nil + // Clear memdbs. + db.clearMems() return err } |