diff options
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go')
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go | 281 |
1 files changed, 137 insertions, 144 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go index 447407aba..a94cf4c84 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -11,109 +11,79 @@ import ( "time" "github.com/syndtr/goleveldb/leveldb/errors" - "github.com/syndtr/goleveldb/leveldb/memdb" "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/storage" ) var ( errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting") ) -type cStats struct { - sync.Mutex +type cStat struct { duration time.Duration - read uint64 - write uint64 + read int64 + write int64 } -func (p *cStats) add(n *cStatsStaging) { - p.Lock() +func (p *cStat) add(n *cStatStaging) { p.duration += n.duration p.read += n.read p.write += n.write - p.Unlock() } -func (p *cStats) get() (duration time.Duration, read, write uint64) { - p.Lock() - defer p.Unlock() +func (p *cStat) get() (duration time.Duration, read, write int64) { return p.duration, p.read, p.write } -type cStatsStaging struct { +type cStatStaging struct { start time.Time duration time.Duration on bool - read uint64 - write uint64 + read int64 + write int64 } -func (p *cStatsStaging) startTimer() { +func (p *cStatStaging) startTimer() { if !p.on { p.start = time.Now() p.on = true } } -func (p *cStatsStaging) stopTimer() { +func (p *cStatStaging) stopTimer() { if p.on { p.duration += time.Since(p.start) p.on = false } } -type cMem struct { - s *session - level int - rec *sessionRecord -} - -func newCMem(s *session) *cMem { - return &cMem{s: s, rec: &sessionRecord{numLevel: s.o.GetNumLevel()}} +type cStats struct { + lk sync.Mutex + stats []cStat } -func (c *cMem) flush(mem *memdb.DB, level int) error { - s := c.s - - // Write memdb to table. - iter := mem.NewIterator(nil) - defer iter.Release() - t, n, err := s.tops.createFrom(iter) - if err != nil { - return err - } - - // Pick level. - if level < 0 { - v := s.version() - level = v.pickLevel(t.imin.ukey(), t.imax.ukey()) - v.release() +func (p *cStats) addStat(level int, n *cStatStaging) { + p.lk.Lock() + if level >= len(p.stats) { + newStats := make([]cStat, level+1) + copy(newStats, p.stats) + p.stats = newStats } - c.rec.addTableFile(level, t) - - s.logf("mem@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax) - - c.level = level - return nil + p.stats[level].add(n) + p.lk.Unlock() } -func (c *cMem) reset() { - c.rec = &sessionRecord{numLevel: c.s.o.GetNumLevel()} -} - -func (c *cMem) commit(journal, seq uint64) error { - c.rec.setJournalNum(journal) - c.rec.setSeqNum(seq) - - // Commit changes. - return c.s.commit(c.rec) +func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) { + p.lk.Lock() + defer p.lk.Unlock() + if level < len(p.stats) { + return p.stats[level].get() + } + return } func (db *DB) compactionError() { - var ( - err error - wlocked bool - ) + var err error noerr: // No error. for { @@ -121,7 +91,7 @@ noerr: case err = <-db.compErrSetC: switch { case err == nil: - case errors.IsCorrupted(err): + case err == ErrReadOnly, errors.IsCorrupted(err): goto hasperr default: goto haserr @@ -139,7 +109,7 @@ haserr: switch { case err == nil: goto noerr - case errors.IsCorrupted(err): + case err == ErrReadOnly, errors.IsCorrupted(err): goto hasperr default: } @@ -155,9 +125,9 @@ hasperr: case db.compPerErrC <- err: case db.writeLockC <- struct{}{}: // Hold write lock, so that write won't pass-through. - wlocked = true + db.compWriteLocking = true case _, _ = <-db.closeC: - if wlocked { + if db.compWriteLocking { // We should release the lock or Close will hang. <-db.writeLockC } @@ -286,22 +256,27 @@ func (db *DB) compactionExitTransact() { panic(errCompactionTransactExiting) } +func (db *DB) compactionCommit(name string, rec *sessionRecord) { + db.compCommitLk.Lock() + defer db.compCommitLk.Unlock() // Defer is necessary. + db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error { + return db.s.commit(rec) + }, nil) +} + func (db *DB) memCompaction() { - mem := db.getFrozenMem() - if mem == nil { + mdb := db.getFrozenMem() + if mdb == nil { return } - defer mem.decref() - - c := newCMem(db.s) - stats := new(cStatsStaging) + defer mdb.decref() - db.logf("mem@flush N·%d S·%s", mem.mdb.Len(), shortenb(mem.mdb.Size())) + db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size())) // Don't compact empty memdb. - if mem.mdb.Len() == 0 { - db.logf("mem@flush skipping") - // drop frozen mem + if mdb.Len() == 0 { + db.logf("memdb@flush skipping") + // drop frozen memdb db.dropFrozenMem() return } @@ -317,35 +292,44 @@ func (db *DB) memCompaction() { return } - db.compactionTransactFunc("mem@flush", func(cnt *compactionTransactCounter) (err error) { + var ( + rec = &sessionRecord{} + stats = &cStatStaging{} + flushLevel int + ) + + // Generate tables. + db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) { stats.startTimer() - defer stats.stopTimer() - return c.flush(mem.mdb, -1) + flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel) + stats.stopTimer() + return }, func() error { - for _, r := range c.rec.addedTables { - db.logf("mem@flush revert @%d", r.num) - f := db.s.getTableFile(r.num) - if err := f.Remove(); err != nil { + for _, r := range rec.addedTables { + db.logf("memdb@flush revert @%d", r.num) + if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil { return err } } return nil }) - db.compactionTransactFunc("mem@commit", func(cnt *compactionTransactCounter) (err error) { - stats.startTimer() - defer stats.stopTimer() - return c.commit(db.journalFile.Num(), db.frozenSeq) - }, nil) + rec.setJournalNum(db.journalFd.Num) + rec.setSeqNum(db.frozenSeq) + + // Commit. + stats.startTimer() + db.compactionCommit("memdb", rec) + stats.stopTimer() - db.logf("mem@flush committed F·%d T·%v", len(c.rec.addedTables), stats.duration) + db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration) - for _, r := range c.rec.addedTables { + for _, r := range rec.addedTables { stats.write += r.size } - db.compStats[c.level].add(stats) + db.compStats.addStat(flushLevel, stats) - // Drop frozen mem. + // Drop frozen memdb. db.dropFrozenMem() // Resume table compaction. @@ -359,7 +343,7 @@ func (db *DB) memCompaction() { } // Trigger table compaction. - db.compSendTrigger(db.tcompCmdC) + db.compTrigger(db.tcompCmdC) } type tableCompactionBuilder struct { @@ -367,7 +351,7 @@ type tableCompactionBuilder struct { s *session c *compaction rec *sessionRecord - stat0, stat1 *cStatsStaging + stat0, stat1 *cStatStaging snapHasLastUkey bool snapLastUkey []byte @@ -421,9 +405,9 @@ func (b *tableCompactionBuilder) flush() error { if err != nil { return err } - b.rec.addTableFile(b.c.level+1, t) + b.rec.addTableFile(b.c.sourceLevel+1, t) b.stat1.write += t.size - b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.level+1, t.file.Num(), b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax) + b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax) b.tw = nil return nil } @@ -546,8 +530,7 @@ func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error { func (b *tableCompactionBuilder) revert() error { for _, at := range b.rec.addedTables { b.s.logf("table@build revert @%d", at.num) - f := b.s.getTableFile(at.num) - if err := f.Remove(); err != nil { + if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil { return err } } @@ -557,31 +540,31 @@ func (b *tableCompactionBuilder) revert() error { func (db *DB) tableCompaction(c *compaction, noTrivial bool) { defer c.release() - rec := &sessionRecord{numLevel: db.s.o.GetNumLevel()} - rec.addCompPtr(c.level, c.imax) + rec := &sessionRecord{} + rec.addCompPtr(c.sourceLevel, c.imax) if !noTrivial && c.trivial() { - t := c.tables[0][0] - db.logf("table@move L%d@%d -> L%d", c.level, t.file.Num(), c.level+1) - rec.delTable(c.level, t.file.Num()) - rec.addTableFile(c.level+1, t) + t := c.levels[0][0] + db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1) + rec.delTable(c.sourceLevel, t.fd.Num) + rec.addTableFile(c.sourceLevel+1, t) db.compactionTransactFunc("table@move", func(cnt *compactionTransactCounter) (err error) { return db.s.commit(rec) }, nil) return } - var stats [2]cStatsStaging - for i, tables := range c.tables { + var stats [2]cStatStaging + for i, tables := range c.levels { for _, t := range tables { stats[i].read += t.size // Insert deleted tables into record - rec.delTable(c.level+i, t.file.Num()) + rec.delTable(c.sourceLevel+i, t.fd.Num) } } sourceSize := int(stats[0].read + stats[1].read) minSeq := db.minSeq() - db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.level, len(c.tables[0]), c.level+1, len(c.tables[1]), shortenb(sourceSize), minSeq) + db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq) b := &tableCompactionBuilder{ db: db, @@ -591,49 +574,60 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) { stat1: &stats[1], minSeq: minSeq, strict: db.s.o.GetStrict(opt.StrictCompaction), - tableSize: db.s.o.GetCompactionTableSize(c.level + 1), + tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1), } db.compactionTransact("table@build", b) - // Commit changes - db.compactionTransactFunc("table@commit", func(cnt *compactionTransactCounter) (err error) { - stats[1].startTimer() - defer stats[1].stopTimer() - return db.s.commit(rec) - }, nil) + // Commit. + stats[1].startTimer() + db.compactionCommit("table", rec) + stats[1].stopTimer() resultSize := int(stats[1].write) db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration) // Save compaction stats for i := range stats { - db.compStats[c.level+1].add(&stats[i]) + db.compStats.addStat(c.sourceLevel+1, &stats[i]) } } -func (db *DB) tableRangeCompaction(level int, umin, umax []byte) { +func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error { db.logf("table@compaction range L%d %q:%q", level, umin, umax) - if level >= 0 { - if c := db.s.getCompactionRange(level, umin, umax); c != nil { + if c := db.s.getCompactionRange(level, umin, umax, true); c != nil { db.tableCompaction(c, true) } } else { - v := db.s.version() - m := 1 - for i, t := range v.tables[1:] { - if t.overlaps(db.s.icmp, umin, umax, false) { - m = i + 1 + // Retry until nothing to compact. + for { + compacted := false + + // Scan for maximum level with overlapped tables. + v := db.s.version() + m := 1 + for i := m; i < len(v.levels); i++ { + tables := v.levels[i] + if tables.overlaps(db.s.icmp, umin, umax, false) { + m = i + } } - } - v.release() + v.release() - for level := 0; level < m; level++ { - if c := db.s.getCompactionRange(level, umin, umax); c != nil { - db.tableCompaction(c, true) + for level := 0; level < m; level++ { + if c := db.s.getCompactionRange(level, umin, umax, false); c != nil { + db.tableCompaction(c, true) + compacted = true + } + } + + if !compacted { + break } } } + + return nil } func (db *DB) tableAutoCompaction() { @@ -660,11 +654,11 @@ type cCmd interface { ack(err error) } -type cIdle struct { +type cAuto struct { ackC chan<- error } -func (r cIdle) ack(err error) { +func (r cAuto) ack(err error) { if r.ackC != nil { defer func() { recover() @@ -688,13 +682,21 @@ func (r cRange) ack(err error) { } } +// This will trigger auto compaction but will not wait for it. +func (db *DB) compTrigger(compC chan<- cCmd) { + select { + case compC <- cAuto{}: + default: + } +} + // This will trigger auto compation and/or wait for all compaction to be done. -func (db *DB) compSendIdle(compC chan<- cCmd) (err error) { +func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) { ch := make(chan error) defer close(ch) // Send cmd. select { - case compC <- cIdle{ch}: + case compC <- cAuto{ch}: case err = <-db.compErrC: return case _, _ = <-db.closeC: @@ -710,16 +712,8 @@ func (db *DB) compSendIdle(compC chan<- cCmd) (err error) { return err } -// This will trigger auto compaction but will not wait for it. -func (db *DB) compSendTrigger(compC chan<- cCmd) { - select { - case compC <- cIdle{}: - default: - } -} - // Send range compaction request. -func (db *DB) compSendRange(compC chan<- cCmd, level int, min, max []byte) (err error) { +func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) { ch := make(chan error) defer close(ch) // Send cmd. @@ -759,7 +753,7 @@ func (db *DB) mCompaction() { select { case x = <-db.mcompCmdC: switch x.(type) { - case cIdle: + case cAuto: db.memCompaction() x.ack(nil) x = nil @@ -820,11 +814,10 @@ func (db *DB) tCompaction() { } if x != nil { switch cmd := x.(type) { - case cIdle: + case cAuto: ackQ = append(ackQ, x) case cRange: - db.tableRangeCompaction(cmd.level, cmd.min, cmd.max) - x.ack(nil) + x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max)) default: panic("leveldb: unknown command") } |