diff options
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go')
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go | 113 |
1 files changed, 80 insertions, 33 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go index e1cf30c53..5200be6fc 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go @@ -45,9 +45,9 @@ func (db *DB) jWriter() { } } -func (db *DB) rotateMem(n int) (mem *memDB, err error) { +func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) { // Wait for pending memdb compaction. - err = db.compSendIdle(db.mcompCmdC) + err = db.compTriggerWait(db.mcompCmdC) if err != nil { return } @@ -59,46 +59,50 @@ func (db *DB) rotateMem(n int) (mem *memDB, err error) { } // Schedule memdb compaction. - db.compSendTrigger(db.mcompCmdC) + if wait { + err = db.compTriggerWait(db.mcompCmdC) + } else { + db.compTrigger(db.mcompCmdC) + } return } -func (db *DB) flush(n int) (mem *memDB, nn int, err error) { +func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) { delayed := false flush := func() (retry bool) { v := db.s.version() defer v.release() - mem = db.getEffectiveMem() + mdb = db.getEffectiveMem() defer func() { if retry { - mem.decref() - mem = nil + mdb.decref() + mdb = nil } }() - nn = mem.mdb.Free() + mdbFree = mdb.Free() switch { case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed: delayed = true time.Sleep(time.Millisecond) - case nn >= n: + case mdbFree >= n: return false case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger(): delayed = true - err = db.compSendIdle(db.tcompCmdC) + err = db.compTriggerWait(db.tcompCmdC) if err != nil { return false } default: // Allow memdb to grow if it has no entry. - if mem.mdb.Len() == 0 { - nn = n + if mdb.Len() == 0 { + mdbFree = n } else { - mem.decref() - mem, err = db.rotateMem(n) + mdb.decref() + mdb, err = db.rotateMem(n, false) if err == nil { - nn = mem.mdb.Free() + mdbFree = mdb.Free() } else { - nn = 0 + mdbFree = 0 } } return false @@ -129,7 +133,20 @@ func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { return } - b.init(wo.GetSync()) + b.init(wo.GetSync() && !db.s.o.GetNoSync()) + + if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() { + // Writes using transaction. + tr, err1 := db.OpenTransaction() + if err1 != nil { + return err1 + } + if err1 := tr.Write(b, wo); err1 != nil { + tr.Discard() + return err1 + } + return tr.Commit() + } // The write happen synchronously. select { @@ -137,6 +154,8 @@ func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { if <-db.writeMergedC { return <-db.writeAckC } + // Continue, the write lock already acquired by previous writer + // and handed out to us. case db.writeLockC <- struct{}{}: case err = <-db.compPerErrC: return @@ -148,6 +167,7 @@ func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { danglingMerge := false defer func() { if danglingMerge { + // Only one dangling merge at most, so this is safe. db.writeMergedC <- false } else { <-db.writeLockC @@ -157,18 +177,18 @@ func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { } }() - mem, memFree, err := db.flush(b.size()) + mdb, mdbFree, err := db.flush(b.size()) if err != nil { return } - defer mem.decref() + defer mdb.decref() // Calculate maximum size of the batch. m := 1 << 20 if x := b.size(); x <= 128<<10 { m = x + (128 << 10) } - m = minInt(m, memFree) + m = minInt(m, mdbFree) // Merge with other batch. drain: @@ -197,7 +217,7 @@ drain: select { case db.journalC <- b: // Write into memdb - if berr := b.memReplay(mem.mdb); berr != nil { + if berr := b.memReplay(mdb.DB); berr != nil { panic(berr) } case err = <-db.compPerErrC: @@ -211,7 +231,7 @@ drain: case err = <-db.journalAckC: if err != nil { // Revert memdb if error detected - if berr := b.revertMemReplay(mem.mdb); berr != nil { + if berr := b.revertMemReplay(mdb.DB); berr != nil { panic(berr) } return @@ -225,7 +245,7 @@ drain: if err != nil { return } - if berr := b.memReplay(mem.mdb); berr != nil { + if berr := b.memReplay(mdb.DB); berr != nil { panic(berr) } } @@ -233,8 +253,8 @@ drain: // Set last seq number. db.addSeq(uint64(b.Len())) - if b.size() >= memFree { - db.rotateMem(0) + if b.size() >= mdbFree { + db.rotateMem(0, false) } return } @@ -249,8 +269,7 @@ func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error { return db.Write(b, wo) } -// Delete deletes the value for the given key. It returns ErrNotFound if -// the DB does not contain the key. +// Delete deletes the value for the given key. // // It is safe to modify the contents of the arguments after Delete returns. func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error { @@ -290,16 +309,16 @@ func (db *DB) CompactRange(r util.Range) error { } // Check for overlaps in memdb. - mem := db.getEffectiveMem() - defer mem.decref() - if isMemOverlaps(db.s.icmp, mem.mdb, r.Start, r.Limit) { + mdb := db.getEffectiveMem() + defer mdb.decref() + if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) { // Memdb compaction. - if _, err := db.rotateMem(0); err != nil { + if _, err := db.rotateMem(0, false); err != nil { <-db.writeLockC return err } <-db.writeLockC - if err := db.compSendIdle(db.mcompCmdC); err != nil { + if err := db.compTriggerWait(db.mcompCmdC); err != nil { return err } } else { @@ -307,5 +326,33 @@ func (db *DB) CompactRange(r util.Range) error { } // Table compaction. - return db.compSendRange(db.tcompCmdC, -1, r.Start, r.Limit) + return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit) +} + +// SetReadOnly makes DB read-only. It will stay read-only until reopened. +func (db *DB) SetReadOnly() error { + if err := db.ok(); err != nil { + return err + } + + // Lock writer. + select { + case db.writeLockC <- struct{}{}: + db.compWriteLocking = true + case err := <-db.compPerErrC: + return err + case _, _ = <-db.closeC: + return ErrClosed + } + + // Set compaction read-only. + select { + case db.compErrSetC <- ErrReadOnly: + case perr := <-db.compPerErrC: + return perr + case _, _ = <-db.closeC: + return ErrClosed + } + + return nil } |