diff options
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go')
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go | 345 |
1 files changed, 216 insertions, 129 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go index 501006717..225920002 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/batch.go @@ -9,13 +9,15 @@ package leveldb import ( "encoding/binary" "fmt" + "io" "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/memdb" "github.com/syndtr/goleveldb/leveldb/storage" ) -// ErrBatchCorrupted records reason of batch corruption. +// ErrBatchCorrupted records reason of batch corruption. This error will be +// wrapped with errors.ErrCorrupted. type ErrBatchCorrupted struct { Reason string } @@ -29,8 +31,9 @@ func newErrBatchCorrupted(reason string) error { } const ( - batchHdrLen = 8 + 4 - batchGrowRec = 3000 + batchHeaderLen = 8 + 4 + batchGrowRec = 3000 + batchBufioSize = 16 ) // BatchReplay wraps basic batch operations. @@ -39,34 +42,46 @@ type BatchReplay interface { Delete(key []byte) } +type batchIndex struct { + keyType keyType + keyPos, keyLen int + valuePos, valueLen int +} + +func (index batchIndex) k(data []byte) []byte { + return data[index.keyPos : index.keyPos+index.keyLen] +} + +func (index batchIndex) v(data []byte) []byte { + if index.valueLen != 0 { + return data[index.valuePos : index.valuePos+index.valueLen] + } + return nil +} + +func (index batchIndex) kv(data []byte) (key, value []byte) { + return index.k(data), index.v(data) +} + // Batch is a write batch. type Batch struct { - data []byte - rLen, bLen int - seq uint64 - sync bool + data []byte + index []batchIndex + + // internalLen is sums of key/value pair length plus 8-bytes internal key. + internalLen int } func (b *Batch) grow(n int) { - off := len(b.data) - if off == 0 { - off = batchHdrLen - if b.data != nil { - b.data = b.data[:off] - } - } - if cap(b.data)-off < n { - if b.data == nil { - b.data = make([]byte, off, off+n) - } else { - odata := b.data - div := 1 - if b.rLen > batchGrowRec { - div = b.rLen / batchGrowRec - } - b.data = make([]byte, off, off+n+(off-batchHdrLen)/div) - copy(b.data, odata) + o := len(b.data) + if cap(b.data)-o < n { + div := 1 + if len(b.index) > batchGrowRec { + div = len(b.index) / batchGrowRec } + ndata := make([]byte, o, o+n+o/div) + copy(ndata, b.data) + b.data = ndata } } @@ -76,32 +91,36 @@ func (b *Batch) appendRec(kt keyType, key, value []byte) { n += binary.MaxVarintLen32 + len(value) } b.grow(n) - off := len(b.data) - data := b.data[:off+n] - data[off] = byte(kt) - off++ - off += binary.PutUvarint(data[off:], uint64(len(key))) - copy(data[off:], key) - off += len(key) + index := batchIndex{keyType: kt} + o := len(b.data) + data := b.data[:o+n] + data[o] = byte(kt) + o++ + o += binary.PutUvarint(data[o:], uint64(len(key))) + index.keyPos = o + index.keyLen = len(key) + o += copy(data[o:], key) if kt == keyTypeVal { - off += binary.PutUvarint(data[off:], uint64(len(value))) - copy(data[off:], value) - off += len(value) + o += binary.PutUvarint(data[o:], uint64(len(value))) + index.valuePos = o + index.valueLen = len(value) + o += copy(data[o:], value) } - b.data = data[:off] - b.rLen++ - // Include 8-byte ikey header - b.bLen += len(key) + len(value) + 8 + b.data = data[:o] + b.index = append(b.index, index) + b.internalLen += index.keyLen + index.valueLen + 8 } // Put appends 'put operation' of the given key/value pair to the batch. -// It is safe to modify the contents of the argument after Put returns. +// It is safe to modify the contents of the argument after Put returns but not +// before. func (b *Batch) Put(key, value []byte) { b.appendRec(keyTypeVal, key, value) } // Delete appends 'delete operation' of the given key to the batch. -// It is safe to modify the contents of the argument after Delete returns. +// It is safe to modify the contents of the argument after Delete returns but +// not before. func (b *Batch) Delete(key []byte) { b.appendRec(keyTypeDel, key, nil) } @@ -111,7 +130,7 @@ func (b *Batch) Delete(key []byte) { // The returned slice is not its own copy, so the contents should not be // modified. func (b *Batch) Dump() []byte { - return b.encode() + return b.data } // Load loads given slice into the batch. Previous contents of the batch @@ -119,144 +138,212 @@ func (b *Batch) Dump() []byte { // The given slice will not be copied and will be used as batch buffer, so // it is not safe to modify the contents of the slice. func (b *Batch) Load(data []byte) error { - return b.decode(0, data) + return b.decode(data, -1) } // Replay replays batch contents. func (b *Batch) Replay(r BatchReplay) error { - return b.decodeRec(func(i int, kt keyType, key, value []byte) error { - switch kt { + for _, index := range b.index { + switch index.keyType { case keyTypeVal: - r.Put(key, value) + r.Put(index.k(b.data), index.v(b.data)) case keyTypeDel: - r.Delete(key) + r.Delete(index.k(b.data)) } - return nil - }) + } + return nil } // Len returns number of records in the batch. func (b *Batch) Len() int { - return b.rLen + return len(b.index) } // Reset resets the batch. func (b *Batch) Reset() { b.data = b.data[:0] - b.seq = 0 - b.rLen = 0 - b.bLen = 0 - b.sync = false + b.index = b.index[:0] + b.internalLen = 0 } -func (b *Batch) init(sync bool) { - b.sync = sync +func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error { + for i, index := range b.index { + if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil { + return err + } + } + return nil } func (b *Batch) append(p *Batch) { - if p.rLen > 0 { - b.grow(len(p.data) - batchHdrLen) - b.data = append(b.data, p.data[batchHdrLen:]...) - b.rLen += p.rLen - b.bLen += p.bLen - } - if p.sync { - b.sync = true + ob := len(b.data) + oi := len(b.index) + b.data = append(b.data, p.data...) + b.index = append(b.index, p.index...) + b.internalLen += p.internalLen + + // Updating index offset. + if ob != 0 { + for ; oi < len(b.index); oi++ { + index := &b.index[oi] + index.keyPos += ob + if index.valueLen != 0 { + index.valuePos += ob + } + } } } -// size returns sums of key/value pair length plus 8-bytes ikey. -func (b *Batch) size() int { - return b.bLen -} - -func (b *Batch) encode() []byte { - b.grow(0) - binary.LittleEndian.PutUint64(b.data, b.seq) - binary.LittleEndian.PutUint32(b.data[8:], uint32(b.rLen)) - - return b.data +func (b *Batch) decode(data []byte, expectedLen int) error { + b.data = data + b.index = b.index[:0] + b.internalLen = 0 + err := decodeBatch(data, func(i int, index batchIndex) error { + b.index = append(b.index, index) + b.internalLen += index.keyLen + index.valueLen + 8 + return nil + }) + if err != nil { + return err + } + if expectedLen >= 0 && len(b.index) != expectedLen { + return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index))) + } + return nil } -func (b *Batch) decode(prevSeq uint64, data []byte) error { - if len(data) < batchHdrLen { - return newErrBatchCorrupted("too short") +func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error { + var ik []byte + for i, index := range b.index { + ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType) + if err := mdb.Put(ik, index.v(b.data)); err != nil { + return err + } } + return nil +} - b.seq = binary.LittleEndian.Uint64(data) - if b.seq < prevSeq { - return newErrBatchCorrupted("invalid sequence number") - } - b.rLen = int(binary.LittleEndian.Uint32(data[8:])) - if b.rLen < 0 { - return newErrBatchCorrupted("invalid records length") +func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error { + var ik []byte + for i, index := range b.index { + ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType) + if err := mdb.Delete(ik); err != nil { + return err + } } - // No need to be precise at this point, it won't be used anyway - b.bLen = len(data) - batchHdrLen - b.data = data - return nil } -func (b *Batch) decodeRec(f func(i int, kt keyType, key, value []byte) error) error { - off := batchHdrLen - for i := 0; i < b.rLen; i++ { - if off >= len(b.data) { - return newErrBatchCorrupted("invalid records length") - } +func newBatch() interface{} { + return &Batch{} +} - kt := keyType(b.data[off]) - if kt > keyTypeVal { - panic(kt) - return newErrBatchCorrupted("bad record: invalid type") +func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error { + var index batchIndex + for i, o := 0, 0; o < len(data); i++ { + // Key type. + index.keyType = keyType(data[o]) + if index.keyType > keyTypeVal { + return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType))) } - off++ + o++ - x, n := binary.Uvarint(b.data[off:]) - off += n - if n <= 0 || off+int(x) > len(b.data) { + // Key. + x, n := binary.Uvarint(data[o:]) + o += n + if n <= 0 || o+int(x) > len(data) { return newErrBatchCorrupted("bad record: invalid key length") } - key := b.data[off : off+int(x)] - off += int(x) - var value []byte - if kt == keyTypeVal { - x, n := binary.Uvarint(b.data[off:]) - off += n - if n <= 0 || off+int(x) > len(b.data) { + index.keyPos = o + index.keyLen = int(x) + o += index.keyLen + + // Value. + if index.keyType == keyTypeVal { + x, n = binary.Uvarint(data[o:]) + o += n + if n <= 0 || o+int(x) > len(data) { return newErrBatchCorrupted("bad record: invalid value length") } - value = b.data[off : off+int(x)] - off += int(x) + index.valuePos = o + index.valueLen = int(x) + o += index.valueLen + } else { + index.valuePos = 0 + index.valueLen = 0 } - if err := f(i, kt, key, value); err != nil { + if err := fn(i, index); err != nil { return err } } - return nil } -func (b *Batch) memReplay(to *memdb.DB) error { - var ikScratch []byte - return b.decodeRec(func(i int, kt keyType, key, value []byte) error { - ikScratch = makeInternalKey(ikScratch, key, b.seq+uint64(i), kt) - return to.Put(ikScratch, value) +func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) { + seq, batchLen, err = decodeBatchHeader(data) + if err != nil { + return 0, 0, err + } + if seq < expectSeq { + return 0, 0, newErrBatchCorrupted("invalid sequence number") + } + data = data[batchHeaderLen:] + var ik []byte + var decodedLen int + err = decodeBatch(data, func(i int, index batchIndex) error { + if i >= batchLen { + return newErrBatchCorrupted("invalid records length") + } + ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType) + if err := mdb.Put(ik, index.v(data)); err != nil { + return err + } + decodedLen++ + return nil }) + if err == nil && decodedLen != batchLen { + err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen)) + } + return } -func (b *Batch) memDecodeAndReplay(prevSeq uint64, data []byte, to *memdb.DB) error { - if err := b.decode(prevSeq, data); err != nil { - return err +func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte { + dst = ensureBuffer(dst, batchHeaderLen) + binary.LittleEndian.PutUint64(dst, seq) + binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen)) + return dst +} + +func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) { + if len(data) < batchHeaderLen { + return 0, 0, newErrBatchCorrupted("too short") + } + + seq = binary.LittleEndian.Uint64(data) + batchLen = int(binary.LittleEndian.Uint32(data[8:])) + if batchLen < 0 { + return 0, 0, newErrBatchCorrupted("invalid records length") } - return b.memReplay(to) + return } -func (b *Batch) revertMemReplay(to *memdb.DB) error { - var ikScratch []byte - return b.decodeRec(func(i int, kt keyType, key, value []byte) error { - ikScratch := makeInternalKey(ikScratch, key, b.seq+uint64(i), kt) - return to.Delete(ikScratch) - }) +func batchesLen(batches []*Batch) int { + batchLen := 0 + for _, batch := range batches { + batchLen += batch.Len() + } + return batchLen +} + +func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error { + if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil { + return err + } + for _, batch := range batches { + if _, err := wr.Write(batch.data); err != nil { + return err + } + } + return nil } |