Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table')
6 files changed, 1671 insertions, 0 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go new file mode 100644 index 000000000..ca598f4f5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go @@ -0,0 +1,131 @@ +// Copyright (c) 2014, Suryandaru Triandana <syndtr@gmail.com> +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package table + +import ( + "encoding/binary" + "fmt" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/syndtr/goleveldb/leveldb/comparer" + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/testutil" + "github.com/syndtr/goleveldb/leveldb/util" +) + +func (b *block) TestNewIterator(slice *util.Range) iterator.Iterator { + return b.newIterator(slice, false, nil) +} + +var _ = testutil.Defer(func() { + Describe("Block", func() { + Build := func(kv *testutil.KeyValue, restartInterval int) *block { + // Building the block. + bw := &blockWriter{ + restartInterval: restartInterval, + scratch: make([]byte, 30), + } + kv.Iterate(func(i int, key, value []byte) { + bw.append(key, value) + }) + bw.finish() + + // Opening the block. + data := bw.buf.Bytes() + restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:])) + return &block{ + cmp: comparer.DefaultComparer, + data: data, + restartsLen: restartsLen, + restartsOffset: len(data) - (restartsLen+1)*4, + } + } + + Describe("read test", func() { + for restartInterval := 1; restartInterval <= 5; restartInterval++ { + Describe(fmt.Sprintf("with restart interval of %d", restartInterval), func() { + kv := &testutil.KeyValue{} + Text := func() string { + return fmt.Sprintf("and %d keys", kv.Len()) + } + + Test := func() { + // Make block. + br := Build(kv, restartInterval) + // Do testing. + testutil.KeyValueTesting(nil, br, kv.Clone()) + } + + Describe(Text(), Test) + + kv.PutString("", "empty") + Describe(Text(), Test) + + kv.PutString("a1", "foo") + Describe(Text(), Test) + + kv.PutString("a2", "v") + Describe(Text(), Test) + + kv.PutString("a3qqwrkks", "hello") + Describe(Text(), Test) + + kv.PutString("a4", "bar") + Describe(Text(), Test) + + kv.PutString("a5111111", "v5") + kv.PutString("a6", "") + kv.PutString("a7", "v7") + kv.PutString("a8", "vvvvvvvvvvvvvvvvvvvvvv8") + kv.PutString("b", "v9") + kv.PutString("c9", "v9") + kv.PutString("c91", "v9") + kv.PutString("d0", "v9") + Describe(Text(), Test) + }) + } + }) + + Describe("out-of-bound slice test", func() { + kv := &testutil.KeyValue{} + kv.PutString("k1", "v1") + kv.PutString("k2", "v2") + kv.PutString("k3abcdefgg", "v3") + kv.PutString("k4", "v4") + kv.PutString("k5", "v5") + for restartInterval := 1; restartInterval <= 5; restartInterval++ { + Describe(fmt.Sprintf("with restart interval of %d", restartInterval), func() { + // Make block. 
+ br := Build(kv, restartInterval) + + Test := func(r *util.Range) func(done Done) { + return func(done Done) { + iter := br.newIterator(r, false, nil) + Expect(iter.Error()).ShouldNot(HaveOccurred()) + + t := testutil.IteratorTesting{ + KeyValue: kv.Clone(), + Iter: iter, + } + + testutil.DoIteratorTesting(&t) + done <- true + } + } + + It("Should do iterations and seeks correctly #0", + Test(&util.Range{Start: []byte("k0"), Limit: []byte("k6")}), 2.0) + + It("Should do iterations and seeks correctly #1", + Test(&util.Range{Start: []byte(""), Limit: []byte("zzzzzzz")}), 2.0) + }) + } + }) + }) +}) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go new file mode 100644 index 000000000..8acb9f720 --- /dev/null +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go @@ -0,0 +1,848 @@ +// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package table + +import ( + "encoding/binary" + "errors" + "fmt" + "io" + "sort" + "strings" + + "code.google.com/p/snappy-go/snappy" + + "github.com/syndtr/goleveldb/leveldb/cache" + "github.com/syndtr/goleveldb/leveldb/comparer" + "github.com/syndtr/goleveldb/leveldb/filter" + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" +) + +var ( + ErrNotFound = util.ErrNotFound + ErrIterReleased = errors.New("leveldb/table: iterator released") +) + +func max(x, y int) int { + if x > y { + return x + } + return y +} + +type block struct { + cmp comparer.BasicComparer + data []byte + restartsLen int + restartsOffset int + // Whether checksum is verified and valid. + checksum bool +} + +func (b *block) seek(rstart, rlimit int, key []byte) (index, offset int, err error) { + n := b.restartsOffset + data := b.data + cmp := b.cmp + + index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool { + offset := int(binary.LittleEndian.Uint32(data[n+4*(rstart+i):])) + offset += 1 // shared always zero, since this is a restart point + v1, n1 := binary.Uvarint(data[offset:]) // key length + _, n2 := binary.Uvarint(data[offset+n1:]) // value length + m := offset + n1 + n2 + return cmp.Compare(data[m:m+int(v1)], key) > 0 + }) + rstart - 1 + if index < rstart { + // The smallest key is greater-than key sought. 
+ index = rstart + } + offset = int(binary.LittleEndian.Uint32(data[n+4*index:])) + return +} + +func (b *block) restartIndex(rstart, rlimit, offset int) int { + n := b.restartsOffset + data := b.data + return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool { + return int(binary.LittleEndian.Uint32(data[n+4*(rstart+i):])) > offset + }) + rstart - 1 +} + +func (b *block) restartOffset(index int) int { + return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:])) +} + +func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) { + if offset >= b.restartsOffset { + if offset != b.restartsOffset { + err = errors.New("leveldb/table: Reader: BlockEntry: invalid block (block entries offset not aligned)") + } + return + } + v0, n0 := binary.Uvarint(b.data[offset:]) // Shared prefix length + v1, n1 := binary.Uvarint(b.data[offset+n0:]) // Key length + v2, n2 := binary.Uvarint(b.data[offset+n0+n1:]) // Value length + m := n0 + n1 + n2 + n = m + int(v1) + int(v2) + if n0 <= 0 || n1 <= 0 || n2 <= 0 || offset+n > b.restartsOffset { + err = errors.New("leveldb/table: Reader: invalid block (block entries corrupted)") + return + } + key = b.data[offset+m : offset+m+int(v1)] + value = b.data[offset+m+int(v1) : offset+n] + nShared = int(v0) + return +} + +func (b *block) newIterator(slice *util.Range, inclLimit bool, cache util.Releaser) *blockIter { + bi := &blockIter{ + block: b, + cache: cache, + // Valid key should never be nil. + key: make([]byte, 0), + dir: dirSOI, + riStart: 0, + riLimit: b.restartsLen, + offsetStart: 0, + offsetRealStart: 0, + offsetLimit: b.restartsOffset, + } + if slice != nil { + if slice.Start != nil { + if bi.Seek(slice.Start) { + bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset) + bi.offsetStart = b.restartOffset(bi.riStart) + bi.offsetRealStart = bi.prevOffset + } else { + bi.riStart = b.restartsLen + bi.offsetStart = b.restartsOffset + bi.offsetRealStart = b.restartsOffset + } + } + if slice.Limit != nil { + if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) { + bi.offsetLimit = bi.prevOffset + bi.riLimit = bi.restartIndex + 1 + } + } + bi.reset() + if bi.offsetStart > bi.offsetLimit { + bi.sErr(errors.New("leveldb/table: Reader: invalid slice range")) + } + } + return bi +} + +type dir int + +const ( + dirReleased dir = iota - 1 + dirSOI + dirEOI + dirBackward + dirForward +) + +type blockIter struct { + block *block + cache, releaser util.Releaser + key, value []byte + offset int + // Previous offset, only filled by Next. + prevOffset int + prevNode []int + prevKeys []byte + restartIndex int + // Iterator direction. + dir dir + // Restart index slice range. + riStart int + riLimit int + // Offset slice range. + offsetStart int + offsetRealStart int + offsetLimit int + // Error. 
+ err error +} + +func (i *blockIter) sErr(err error) { + i.err = err + i.key = nil + i.value = nil + i.prevNode = nil + i.prevKeys = nil +} + +func (i *blockIter) reset() { + if i.dir == dirBackward { + i.prevNode = i.prevNode[:0] + i.prevKeys = i.prevKeys[:0] + } + i.restartIndex = i.riStart + i.offset = i.offsetStart + i.dir = dirSOI + i.key = i.key[:0] + i.value = nil +} + +func (i *blockIter) isFirst() bool { + switch i.dir { + case dirForward: + return i.prevOffset == i.offsetRealStart + case dirBackward: + return len(i.prevNode) == 1 && i.restartIndex == i.riStart + } + return false +} + +func (i *blockIter) isLast() bool { + switch i.dir { + case dirForward, dirBackward: + return i.offset == i.offsetLimit + } + return false +} + +func (i *blockIter) First() bool { + if i.err != nil { + return false + } else if i.dir == dirReleased { + i.err = ErrIterReleased + return false + } + + if i.dir == dirBackward { + i.prevNode = i.prevNode[:0] + i.prevKeys = i.prevKeys[:0] + } + i.dir = dirSOI + return i.Next() +} + +func (i *blockIter) Last() bool { + if i.err != nil { + return false + } else if i.dir == dirReleased { + i.err = ErrIterReleased + return false + } + + if i.dir == dirBackward { + i.prevNode = i.prevNode[:0] + i.prevKeys = i.prevKeys[:0] + } + i.dir = dirEOI + return i.Prev() +} + +func (i *blockIter) Seek(key []byte) bool { + if i.err != nil { + return false + } else if i.dir == dirReleased { + i.err = ErrIterReleased + return false + } + + ri, offset, err := i.block.seek(i.riStart, i.riLimit, key) + if err != nil { + i.sErr(err) + return false + } + i.restartIndex = ri + i.offset = max(i.offsetStart, offset) + if i.dir == dirSOI || i.dir == dirEOI { + i.dir = dirForward + } + for i.Next() { + if i.block.cmp.Compare(i.key, key) >= 0 { + return true + } + } + return false +} + +func (i *blockIter) Next() bool { + if i.dir == dirEOI || i.err != nil { + return false + } else if i.dir == dirReleased { + i.err = ErrIterReleased + return false + } + + if i.dir == dirSOI { + i.restartIndex = i.riStart + i.offset = i.offsetStart + } else if i.dir == dirBackward { + i.prevNode = i.prevNode[:0] + i.prevKeys = i.prevKeys[:0] + } + for i.offset < i.offsetRealStart { + key, value, nShared, n, err := i.block.entry(i.offset) + if err != nil { + i.sErr(err) + return false + } + if n == 0 { + i.dir = dirEOI + return false + } + i.key = append(i.key[:nShared], key...) + i.value = value + i.offset += n + } + if i.offset >= i.offsetLimit { + i.dir = dirEOI + if i.offset != i.offsetLimit { + i.sErr(errors.New("leveldb/table: Reader: Next: invalid block (block entries offset not aligned)")) + } + return false + } + key, value, nShared, n, err := i.block.entry(i.offset) + if err != nil { + i.sErr(err) + return false + } + if n == 0 { + i.dir = dirEOI + return false + } + i.key = append(i.key[:nShared], key...) + i.value = value + i.prevOffset = i.offset + i.offset += n + i.dir = dirForward + return true +} + +func (i *blockIter) Prev() bool { + if i.dir == dirSOI || i.err != nil { + return false + } else if i.dir == dirReleased { + i.err = ErrIterReleased + return false + } + + var ri int + if i.dir == dirForward { + // Change direction. + i.offset = i.prevOffset + if i.offset == i.offsetRealStart { + i.dir = dirSOI + return false + } + ri = i.block.restartIndex(i.restartIndex, i.riLimit, i.offset) + i.dir = dirBackward + } else if i.dir == dirEOI { + // At the end of iterator. 
+ i.restartIndex = i.riLimit + i.offset = i.offsetLimit + if i.offset == i.offsetRealStart { + i.dir = dirSOI + return false + } + ri = i.riLimit - 1 + i.dir = dirBackward + } else if len(i.prevNode) == 1 { + // This is the end of a restart range. + i.offset = i.prevNode[0] + i.prevNode = i.prevNode[:0] + if i.restartIndex == i.riStart { + i.dir = dirSOI + return false + } + i.restartIndex-- + ri = i.restartIndex + } else { + // In the middle of restart range, get from cache. + n := len(i.prevNode) - 3 + node := i.prevNode[n:] + i.prevNode = i.prevNode[:n] + // Get the key. + ko := node[0] + i.key = append(i.key[:0], i.prevKeys[ko:]...) + i.prevKeys = i.prevKeys[:ko] + // Get the value. + vo := node[1] + vl := vo + node[2] + i.value = i.block.data[vo:vl] + i.offset = vl + return true + } + // Build entries cache. + i.key = i.key[:0] + i.value = nil + offset := i.block.restartOffset(ri) + if offset == i.offset { + ri -= 1 + if ri < 0 { + i.dir = dirSOI + return false + } + offset = i.block.restartOffset(ri) + } + i.prevNode = append(i.prevNode, offset) + for { + key, value, nShared, n, err := i.block.entry(offset) + if err != nil { + i.sErr(err) + return false + } + if offset >= i.offsetRealStart { + if i.value != nil { + // Appends 3 variables: + // 1. Previous keys offset + // 2. Value offset in the data block + // 3. Value length + i.prevNode = append(i.prevNode, len(i.prevKeys), offset-len(i.value), len(i.value)) + i.prevKeys = append(i.prevKeys, i.key...) + } + i.value = value + } + i.key = append(i.key[:nShared], key...) + offset += n + // Stop if target offset reached. + if offset >= i.offset { + if offset != i.offset { + i.sErr(errors.New("leveldb/table: Reader: Prev: invalid block (block entries offset not aligned)")) + return false + } + + break + } + } + i.restartIndex = ri + i.offset = offset + return true +} + +func (i *blockIter) Key() []byte { + if i.err != nil || i.dir <= dirEOI { + return nil + } + return i.key +} + +func (i *blockIter) Value() []byte { + if i.err != nil || i.dir <= dirEOI { + return nil + } + return i.value +} + +func (i *blockIter) Release() { + i.prevNode = nil + i.prevKeys = nil + i.key = nil + i.value = nil + i.dir = dirReleased + if i.cache != nil { + i.cache.Release() + i.cache = nil + } + if i.releaser != nil { + i.releaser.Release() + i.releaser = nil + } +} + +func (i *blockIter) SetReleaser(releaser util.Releaser) { + if i.dir > dirReleased { + i.releaser = releaser + } +} + +func (i *blockIter) Valid() bool { + return i.err == nil && (i.dir == dirBackward || i.dir == dirForward) +} + +func (i *blockIter) Error() error { + return i.err +} + +type filterBlock struct { + filter filter.Filter + data []byte + oOffset int + baseLg uint + filtersNum int +} + +func (b *filterBlock) contains(offset uint64, key []byte) bool { + i := int(offset >> b.baseLg) + if i < b.filtersNum { + o := b.data[b.oOffset+i*4:] + n := int(binary.LittleEndian.Uint32(o)) + m := int(binary.LittleEndian.Uint32(o[4:])) + if n < m && m <= b.oOffset { + return b.filter.Contains(b.data[n:m], key) + } else if n == m { + return false + } + } + return true +} + +type indexIter struct { + blockIter + tableReader *Reader + slice *util.Range + // Options + checksum bool + fillCache bool +} + +func (i *indexIter) Get() iterator.Iterator { + value := i.Value() + if value == nil { + return nil + } + dataBH, n := decodeBlockHandle(value) + if n == 0 { + return iterator.NewEmptyIterator(errors.New("leveldb/table: Reader: invalid table (bad data block handle)")) + } + var slice *util.Range + 
if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) { + slice = i.slice + } + return i.tableReader.getDataIter(dataBH, slice, i.checksum, i.fillCache) +} + +// Reader is a table reader. +type Reader struct { + reader io.ReaderAt + cache cache.Namespace + err error + // Options + cmp comparer.Comparer + filter filter.Filter + checksum bool + strictIter bool + + dataEnd int64 + indexBlock *block + filterBlock *filterBlock +} + +func verifyChecksum(data []byte) bool { + n := len(data) - 4 + checksum0 := binary.LittleEndian.Uint32(data[n:]) + checksum1 := util.NewCRC(data[:n]).Value() + return checksum0 == checksum1 +} + +func (r *Reader) readRawBlock(bh blockHandle, checksum bool) ([]byte, error) { + data := make([]byte, bh.length+blockTrailerLen) + if _, err := r.reader.ReadAt(data, int64(bh.offset)); err != nil && err != io.EOF { + return nil, err + } + if checksum || r.checksum { + if !verifyChecksum(data) { + return nil, errors.New("leveldb/table: Reader: invalid block (checksum mismatch)") + } + } + switch data[bh.length] { + case blockTypeNoCompression: + data = data[:bh.length] + case blockTypeSnappyCompression: + var err error + data, err = snappy.Decode(nil, data[:bh.length]) + if err != nil { + return nil, err + } + default: + return nil, fmt.Errorf("leveldb/table: Reader: unknown block compression type: %d", data[bh.length]) + } + return data, nil +} + +func (r *Reader) readBlock(bh blockHandle, checksum bool) (*block, error) { + data, err := r.readRawBlock(bh, checksum) + if err != nil { + return nil, err + } + restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:])) + b := &block{ + cmp: r.cmp, + data: data, + restartsLen: restartsLen, + restartsOffset: len(data) - (restartsLen+1)*4, + checksum: checksum || r.checksum, + } + return b, nil +} + +func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterBlock, error) { + data, err := r.readRawBlock(bh, true) + if err != nil { + return nil, err + } + n := len(data) + if n < 5 { + return nil, errors.New("leveldb/table: Reader: invalid filter block (too short)") + } + m := n - 5 + oOffset := int(binary.LittleEndian.Uint32(data[m:])) + if oOffset > m { + return nil, errors.New("leveldb/table: Reader: invalid filter block (invalid offset)") + } + b := &filterBlock{ + filter: filter, + data: data, + oOffset: oOffset, + baseLg: uint(data[n-1]), + filtersNum: (m - oOffset) / 4, + } + return b, nil +} + +func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator { + if r.cache != nil { + // Get/set block cache. 
+ var err error + cache, ok := r.cache.Get(dataBH.offset, func() (ok bool, value interface{}, charge int, fin cache.SetFin) { + if !fillCache { + return + } + var dataBlock *block + dataBlock, err = r.readBlock(dataBH, checksum) + if err == nil { + ok = true + value = dataBlock + charge = int(dataBH.length) + } + return + }) + if err != nil { + return iterator.NewEmptyIterator(err) + } + if ok { + dataBlock := cache.Value().(*block) + if !dataBlock.checksum && (r.checksum || checksum) { + if !verifyChecksum(dataBlock.data) { + return iterator.NewEmptyIterator(errors.New("leveldb/table: Reader: invalid block (checksum mismatch)")) + } + dataBlock.checksum = true + } + iter := dataBlock.newIterator(slice, false, cache) + return iter + } + } + dataBlock, err := r.readBlock(dataBH, checksum) + if err != nil { + return iterator.NewEmptyIterator(err) + } + iter := dataBlock.newIterator(slice, false, nil) + return iter +} + +// NewIterator creates an iterator from the table. +// +// Slice allows slicing the iterator to only contains keys in the given +// range. A nil Range.Start is treated as a key before all keys in the +// table. And a nil Range.Limit is treated as a key after all keys in +// the table. +// +// The returned iterator is not goroutine-safe and should be released +// when not used. +// +// Also read Iterator documentation of the leveldb/iterator package. + +func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { + if r.err != nil { + return iterator.NewEmptyIterator(r.err) + } + + index := &indexIter{ + blockIter: *r.indexBlock.newIterator(slice, true, nil), + tableReader: r, + slice: slice, + checksum: ro.GetStrict(opt.StrictBlockChecksum), + fillCache: !ro.GetDontFillCache(), + } + return iterator.NewIndexedIterator(index, r.strictIter || ro.GetStrict(opt.StrictIterator), false) +} + +// Find finds key/value pair whose key is greater than or equal to the +// given key. It returns ErrNotFound if the table doesn't contain +// such pair. +// +// The caller should not modify the contents of the returned slice, but +// it is safe to modify the contents of the argument after Find returns. +func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err error) { + if r.err != nil { + err = r.err + return + } + + index := r.indexBlock.newIterator(nil, true, nil) + defer index.Release() + if !index.Seek(key) { + err = index.Error() + if err == nil { + err = ErrNotFound + } + return + } + dataBH, n := decodeBlockHandle(index.Value()) + if n == 0 { + err = errors.New("leveldb/table: Reader: invalid table (bad data block handle)") + return + } + if r.filterBlock != nil && !r.filterBlock.contains(dataBH.offset, key) { + err = ErrNotFound + return + } + data := r.getDataIter(dataBH, nil, ro.GetStrict(opt.StrictBlockChecksum), !ro.GetDontFillCache()) + defer data.Release() + if !data.Seek(key) { + err = data.Error() + if err == nil { + err = ErrNotFound + } + return + } + rkey = data.Key() + value = data.Value() + return +} + +// Get gets the value for the given key. It returns errors.ErrNotFound +// if the table does not contain the key. +// +// The caller should not modify the contents of the returned slice, but +// it is safe to modify the contents of the argument after Get returns. 
+func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { + if r.err != nil { + err = r.err + return + } + + rkey, value, err := r.Find(key, ro) + if err == nil && r.cmp.Compare(rkey, key) != 0 { + value = nil + err = ErrNotFound + } + return +} + +// OffsetOf returns approximate offset for the given key. +// +// It is safe to modify the contents of the argument after Get returns. +func (r *Reader) OffsetOf(key []byte) (offset int64, err error) { + if r.err != nil { + err = r.err + return + } + + index := r.indexBlock.newIterator(nil, true, nil) + defer index.Release() + if index.Seek(key) { + dataBH, n := decodeBlockHandle(index.Value()) + if n == 0 { + err = errors.New("leveldb/table: Reader: invalid table (bad data block handle)") + return + } + offset = int64(dataBH.offset) + return + } + err = index.Error() + if err == nil { + offset = r.dataEnd + } + return +} + +// NewReader creates a new initialized table reader for the file. +// The cache is optional and can be nil. +func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, o *opt.Options) *Reader { + r := &Reader{ + reader: f, + cache: cache, + cmp: o.GetComparer(), + checksum: o.GetStrict(opt.StrictBlockChecksum), + strictIter: o.GetStrict(opt.StrictIterator), + } + if f == nil { + r.err = errors.New("leveldb/table: Reader: nil file") + return r + } + if size < footerLen { + r.err = errors.New("leveldb/table: Reader: invalid table (file size is too small)") + return r + } + var footer [footerLen]byte + if _, err := r.reader.ReadAt(footer[:], size-footerLen); err != nil && err != io.EOF { + r.err = fmt.Errorf("leveldb/table: Reader: invalid table (could not read footer): %v", err) + } + if string(footer[footerLen-len(magic):footerLen]) != magic { + r.err = errors.New("leveldb/table: Reader: invalid table (bad magic number)") + return r + } + // Decode the metaindex block handle. + metaBH, n := decodeBlockHandle(footer[:]) + if n == 0 { + r.err = errors.New("leveldb/table: Reader: invalid table (bad metaindex block handle)") + return r + } + // Decode the index block handle. + indexBH, n := decodeBlockHandle(footer[n:]) + if n == 0 { + r.err = errors.New("leveldb/table: Reader: invalid table (bad index block handle)") + return r + } + // Read index block. + r.indexBlock, r.err = r.readBlock(indexBH, true) + if r.err != nil { + return r + } + // Read metaindex block. + metaBlock, err := r.readBlock(metaBH, true) + if err != nil { + r.err = err + return r + } + // Set data end. + r.dataEnd = int64(metaBH.offset) + metaIter := metaBlock.newIterator(nil, false, nil) + for metaIter.Next() { + key := string(metaIter.Key()) + if !strings.HasPrefix(key, "filter.") { + continue + } + fn := key[7:] + var filter filter.Filter + if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn { + filter = f0 + } else { + for _, f0 := range o.GetAltFilters() { + if f0.Name() == fn { + filter = f0 + break + } + } + } + if filter != nil { + filterBH, n := decodeBlockHandle(metaIter.Value()) + if n == 0 { + continue + } + // Update data end. 
+ r.dataEnd = int64(filterBH.offset) + filterBlock, err := r.readFilterBlock(filterBH, filter) + if err != nil { + continue + } + r.filterBlock = filterBlock + break + } + } + metaIter.Release() + return r +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table.go new file mode 100644 index 000000000..c0ac70d9e --- /dev/null +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table.go @@ -0,0 +1,177 @@ +// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// Package table allows read and write sorted key/value. +package table + +import ( + "encoding/binary" +) + +/* +Table: + +Table is consist of one or more data blocks, an optional filter block +a metaindex block, an index block and a table footer. Metaindex block +is a special block used to keep parameters of the table, such as filter +block name and its block handle. Index block is a special block used to +keep record of data blocks offset and length, index block use one as +restart interval. The key used by index block are the last key of preceding +block, shorter separator of adjacent blocks or shorter successor of the +last key of the last block. Filter block is an optional block contains +sequence of filter data generated by a filter generator. + +Table data structure: + + optional + / + +--------------+--------------+--------------+------+-------+-----------------+-------------+--------+ + | data block 1 | ... | data block n | filter block | metaindex block | index block | footer | + +--------------+--------------+--------------+--------------+-----------------+-------------+--------+ + + Each block followed by a 5-bytes trailer contains compression type and checksum. + +Table block trailer: + + +---------------------------+-------------------+ + | compression type (1-byte) | checksum (4-byte) | + +---------------------------+-------------------+ + + The checksum is a CRC-32 computed using Castagnoli's polynomial. Compression + type also included in the checksum. + +Table footer: + + +------------------- 40-bytes -------------------+ + / \ + +------------------------+--------------------+------+-----------------+ + | metaindex block handle / index block handle / ---- | magic (8-bytes) | + +------------------------+--------------------+------+-----------------+ + + The magic are first 64-bit of SHA-1 sum of "http://code.google.com/p/leveldb/". + +NOTE: All fixed-length integer are little-endian. +*/ + +/* +Block: + +Block is consist of one or more key/value entries and a block trailer. +Block entry shares key prefix with its preceding key until a restart +point reached. A block should contains at least one restart point. +First restart point are always zero. + +Block data structure: + + + restart point + restart point (depends on restart interval) + / / + +---------------+---------------+---------------+---------------+---------+ + | block entry 1 | block entry 2 | ... 
| block entry n | trailer | + +---------------+---------------+---------------+---------------+---------+ + +Key/value entry: + + +---- key len ----+ + / \ + +-------+---------+-----------+---------+--------------------+--------------+----------------+ + | shared (varint) | not shared (varint) | value len (varint) | key (varlen) | value (varlen) | + +-----------------+---------------------+--------------------+--------------+----------------+ + + Block entry shares key prefix with its preceding key: + Conditions: + restart_interval=2 + entry one : key=deck,value=v1 + entry two : key=dock,value=v2 + entry three: key=duck,value=v3 + The entries will be encoded as follow: + + + restart point (offset=0) + restart point (offset=16) + / / + +-----+-----+-----+----------+--------+-----+-----+-----+---------+--------+-----+-----+-----+----------+--------+ + | 0 | 4 | 2 | "deck" | "v1" | 1 | 3 | 2 | "ock" | "v2" | 0 | 4 | 2 | "duck" | "v3" | + +-----+-----+-----+----------+--------+-----+-----+-----+---------+--------+-----+-----+-----+----------+--------+ + \ / \ / \ / + +----------- entry one -----------+ +----------- entry two ----------+ +---------- entry three ----------+ + + The block trailer will contains two restart points: + + +------------+-----------+--------+ + | 0 | 16 | 2 | + +------------+-----------+---+----+ + \ / \ + +-- restart points --+ + restart points length + +Block trailer: + + +-- 4-bytes --+ + / \ + +-----------------+-----------------+-----------------+------------------------------+ + | restart point 1 | .... | restart point n | restart points len (4-bytes) | + +-----------------+-----------------+-----------------+------------------------------+ + + +NOTE: All fixed-length integer are little-endian. +*/ + +/* +Filter block: + +Filter block consist of one or more filter data and a filter block trailer. +The trailer contains filter data offsets, a trailer offset and a 1-byte base Lg. + +Filter block data structure: + + + offset 1 + offset 2 + offset n + trailer offset + / / / / + +---------------+---------------+---------------+---------+ + | filter data 1 | ... | filter data n | trailer | + +---------------+---------------+---------------+---------+ + +Filter block trailer: + + +- 4-bytes -+ + / \ + +---------------+---------------+---------------+-------------------------+------------------+ + | offset 1 | .... | offset n | filter offset (4-bytes) | base Lg (1-byte) | + +-------------- +---------------+---------------+-------------------------+------------------+ + + +NOTE: All fixed-length integer are little-endian. +*/ + +const ( + blockTrailerLen = 5 + footerLen = 48 + + magic = "\x57\xfb\x80\x8b\x24\x75\x47\xdb" + + // The block type gives the per-block compression format. + // These constants are part of the file format and should not be changed. 
+ blockTypeNoCompression = 0 + blockTypeSnappyCompression = 1 + + // Generate new filter every 2KB of data + filterBaseLg = 11 + filterBase = 1 << filterBaseLg +) + +type blockHandle struct { + offset, length uint64 +} + +func decodeBlockHandle(src []byte) (blockHandle, int) { + offset, n := binary.Uvarint(src) + length, m := binary.Uvarint(src[n:]) + if n == 0 || m == 0 { + return blockHandle{}, 0 + } + return blockHandle{offset, length}, n + m +} + +func encodeBlockHandle(dst []byte, b blockHandle) int { + n := binary.PutUvarint(dst, b.offset) + m := binary.PutUvarint(dst[n:], b.length) + return n + m +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_suite_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_suite_test.go new file mode 100644 index 000000000..bc9eb83cc --- /dev/null +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_suite_test.go @@ -0,0 +1,17 @@ +package table + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/syndtr/goleveldb/leveldb/testutil" +) + +func TestTable(t *testing.T) { + testutil.RunDefer() + + RegisterFailHandler(Fail) + RunSpecs(t, "Table Suite") +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go new file mode 100644 index 000000000..d7d3b2a4b --- /dev/null +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go @@ -0,0 +1,119 @@ +// Copyright (c) 2014, Suryandaru Triandana <syndtr@gmail.com> +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package table + +import ( + "bytes" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/testutil" + "github.com/syndtr/goleveldb/leveldb/util" +) + +type tableWrapper struct { + *Reader +} + +func (t tableWrapper) TestFind(key []byte) (rkey, rvalue []byte, err error) { + return t.Reader.Find(key, nil) +} + +func (t tableWrapper) TestGet(key []byte) (value []byte, err error) { + return t.Reader.Get(key, nil) +} + +func (t tableWrapper) TestNewIterator(slice *util.Range) iterator.Iterator { + return t.Reader.NewIterator(slice, nil) +} + +var _ = testutil.Defer(func() { + Describe("Table", func() { + Describe("approximate offset test", func() { + var ( + buf = &bytes.Buffer{} + o = &opt.Options{ + BlockSize: 1024, + Compression: opt.NoCompression, + } + ) + + // Building the table. 
+ tw := NewWriter(buf, o) + tw.Append([]byte("k01"), []byte("hello")) + tw.Append([]byte("k02"), []byte("hello2")) + tw.Append([]byte("k03"), bytes.Repeat([]byte{'x'}, 10000)) + tw.Append([]byte("k04"), bytes.Repeat([]byte{'x'}, 200000)) + tw.Append([]byte("k05"), bytes.Repeat([]byte{'x'}, 300000)) + tw.Append([]byte("k06"), []byte("hello3")) + tw.Append([]byte("k07"), bytes.Repeat([]byte{'x'}, 100000)) + err := tw.Close() + + It("Should be able to approximate offset of a key correctly", func() { + Expect(err).ShouldNot(HaveOccurred()) + + tr := NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), nil, o) + CheckOffset := func(key string, expect, threshold int) { + offset, err := tr.OffsetOf([]byte(key)) + Expect(err).ShouldNot(HaveOccurred()) + Expect(offset).Should(BeNumerically("~", expect, threshold), "Offset of key %q", key) + } + + CheckOffset("k0", 0, 0) + CheckOffset("k01a", 0, 0) + CheckOffset("k02", 0, 0) + CheckOffset("k03", 0, 0) + CheckOffset("k04", 10000, 1000) + CheckOffset("k04a", 210000, 1000) + CheckOffset("k05", 210000, 1000) + CheckOffset("k06", 510000, 1000) + CheckOffset("k07", 510000, 1000) + CheckOffset("xyz", 610000, 2000) + }) + }) + + Describe("read test", func() { + Build := func(kv testutil.KeyValue) testutil.DB { + o := &opt.Options{ + BlockSize: 512, + BlockRestartInterval: 3, + } + buf := &bytes.Buffer{} + + // Building the table. + tw := NewWriter(buf, o) + kv.Iterate(func(i int, key, value []byte) { + tw.Append(key, value) + }) + tw.Close() + + // Opening the table. + tr := NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), nil, o) + return tableWrapper{tr} + } + Test := func(kv *testutil.KeyValue, body func(r *Reader)) func() { + return func() { + db := Build(*kv) + if body != nil { + body(db.(tableWrapper).Reader) + } + testutil.KeyValueTesting(nil, db, *kv) + } + } + + testutil.AllKeyValueTesting(nil, Build) + Describe("with one key per block", Test(testutil.KeyValue_Generate(nil, 9, 1, 10, 512, 512), func(r *Reader) { + It("should have correct blocks number", func() { + Expect(r.indexBlock.restartsLen).Should(Equal(9)) + }) + })) + }) + }) +}) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go new file mode 100644 index 000000000..4e19e93a9 --- /dev/null +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go @@ -0,0 +1,379 @@ +// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
+ +package table + +import ( + "encoding/binary" + "errors" + "fmt" + "io" + + "code.google.com/p/snappy-go/snappy" + + "github.com/syndtr/goleveldb/leveldb/comparer" + "github.com/syndtr/goleveldb/leveldb/filter" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" +) + +func sharedPrefixLen(a, b []byte) int { + i, n := 0, len(a) + if n > len(b) { + n = len(b) + } + for i < n && a[i] == b[i] { + i++ + } + return i +} + +type blockWriter struct { + restartInterval int + buf util.Buffer + nEntries int + prevKey []byte + restarts []uint32 + scratch []byte +} + +func (w *blockWriter) append(key, value []byte) { + nShared := 0 + if w.nEntries%w.restartInterval == 0 { + w.restarts = append(w.restarts, uint32(w.buf.Len())) + } else { + nShared = sharedPrefixLen(w.prevKey, key) + } + n := binary.PutUvarint(w.scratch[0:], uint64(nShared)) + n += binary.PutUvarint(w.scratch[n:], uint64(len(key)-nShared)) + n += binary.PutUvarint(w.scratch[n:], uint64(len(value))) + w.buf.Write(w.scratch[:n]) + w.buf.Write(key[nShared:]) + w.buf.Write(value) + w.prevKey = append(w.prevKey[:0], key...) + w.nEntries++ +} + +func (w *blockWriter) finish() { + // Write restarts entry. + if w.nEntries == 0 { + // Must have at least one restart entry. + w.restarts = append(w.restarts, 0) + } + w.restarts = append(w.restarts, uint32(len(w.restarts))) + for _, x := range w.restarts { + buf4 := w.buf.Alloc(4) + binary.LittleEndian.PutUint32(buf4, x) + } +} + +func (w *blockWriter) reset() { + w.buf.Reset() + w.nEntries = 0 + w.restarts = w.restarts[:0] +} + +func (w *blockWriter) bytesLen() int { + restartsLen := len(w.restarts) + if restartsLen == 0 { + restartsLen = 1 + } + return w.buf.Len() + 4*restartsLen + 4 +} + +type filterWriter struct { + generator filter.FilterGenerator + buf util.Buffer + nKeys int + offsets []uint32 +} + +func (w *filterWriter) add(key []byte) { + if w.generator == nil { + return + } + w.generator.Add(key) + w.nKeys++ +} + +func (w *filterWriter) flush(offset uint64) { + if w.generator == nil { + return + } + for x := int(offset / filterBase); x > len(w.offsets); { + w.generate() + } +} + +func (w *filterWriter) finish() { + if w.generator == nil { + return + } + // Generate last keys. + + if w.nKeys > 0 { + w.generate() + } + w.offsets = append(w.offsets, uint32(w.buf.Len())) + for _, x := range w.offsets { + buf4 := w.buf.Alloc(4) + binary.LittleEndian.PutUint32(buf4, x) + } + w.buf.WriteByte(filterBaseLg) +} + +func (w *filterWriter) generate() { + // Record offset. + w.offsets = append(w.offsets, uint32(w.buf.Len())) + // Generate filters. + if w.nKeys > 0 { + w.generator.Generate(&w.buf) + w.nKeys = 0 + } +} + +// Writer is a table writer. +type Writer struct { + writer io.Writer + err error + // Options + cmp comparer.Comparer + filter filter.Filter + compression opt.Compression + blockSize int + + dataBlock blockWriter + indexBlock blockWriter + filterBlock filterWriter + pendingBH blockHandle + offset uint64 + nEntries int + // Scratch allocated enough for 5 uvarint. Block writer should not use + // first 20-bytes since it will be used to encode block handle, which + // then passed to the block writer itself. + scratch [50]byte + comparerScratch []byte + compressionScratch []byte +} + +func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh blockHandle, err error) { + // Compress the buffer if necessary. + var b []byte + if compression == opt.SnappyCompression { + // Allocate scratch enough for compression and block trailer. 
+ if n := snappy.MaxEncodedLen(buf.Len()) + blockTrailerLen; len(w.compressionScratch) < n { + w.compressionScratch = make([]byte, n) + } + var compressed []byte + compressed, err = snappy.Encode(w.compressionScratch, buf.Bytes()) + if err != nil { + return + } + n := len(compressed) + b = compressed[:n+blockTrailerLen] + b[n] = blockTypeSnappyCompression + } else { + tmp := buf.Alloc(blockTrailerLen) + tmp[0] = blockTypeNoCompression + b = buf.Bytes() + } + + // Calculate the checksum. + n := len(b) - 4 + checksum := util.NewCRC(b[:n]).Value() + binary.LittleEndian.PutUint32(b[n:], checksum) + + // Write the buffer to the file. + _, err = w.writer.Write(b) + if err != nil { + return + } + bh = blockHandle{w.offset, uint64(len(b) - blockTrailerLen)} + w.offset += uint64(len(b)) + return +} + +func (w *Writer) flushPendingBH(key []byte) { + if w.pendingBH.length == 0 { + return + } + var separator []byte + if len(key) == 0 { + separator = w.cmp.Successor(w.comparerScratch[:0], w.dataBlock.prevKey) + } else { + separator = w.cmp.Separator(w.comparerScratch[:0], w.dataBlock.prevKey, key) + } + if separator == nil { + separator = w.dataBlock.prevKey + } else { + w.comparerScratch = separator + } + n := encodeBlockHandle(w.scratch[:20], w.pendingBH) + // Append the block handle to the index block. + w.indexBlock.append(separator, w.scratch[:n]) + // Reset prev key of the data block. + w.dataBlock.prevKey = w.dataBlock.prevKey[:0] + // Clear pending block handle. + w.pendingBH = blockHandle{} +} + +func (w *Writer) finishBlock() error { + w.dataBlock.finish() + bh, err := w.writeBlock(&w.dataBlock.buf, w.compression) + if err != nil { + return err + } + w.pendingBH = bh + // Reset the data block. + w.dataBlock.reset() + // Flush the filter block. + w.filterBlock.flush(w.offset) + return nil +} + +// Append appends key/value pair to the table. The keys passed must +// be in increasing order. +// +// It is safe to modify the contents of the arguments after Append returns. +func (w *Writer) Append(key, value []byte) error { + if w.err != nil { + return w.err + } + if w.nEntries > 0 && w.cmp.Compare(w.dataBlock.prevKey, key) >= 0 { + w.err = fmt.Errorf("leveldb/table: Writer: keys are not in increasing order: %q, %q", w.dataBlock.prevKey, key) + return w.err + } + + w.flushPendingBH(key) + // Append key/value pair to the data block. + w.dataBlock.append(key, value) + // Add key to the filter block. + w.filterBlock.add(key) + + // Finish the data block if block size target reached. + if w.dataBlock.bytesLen() >= w.blockSize { + if err := w.finishBlock(); err != nil { + w.err = err + return w.err + } + } + w.nEntries++ + return nil +} + +// BlocksLen returns number of blocks written so far. +func (w *Writer) BlocksLen() int { + n := w.indexBlock.nEntries + if w.pendingBH.length > 0 { + // Includes the pending block. + n++ + } + return n +} + +// EntriesLen returns number of entries added so far. +func (w *Writer) EntriesLen() int { + return w.nEntries +} + +// BytesLen returns number of bytes written so far. +func (w *Writer) BytesLen() int { + return int(w.offset) +} + +// Close will finalize the table. Calling Append is not possible +// after Close, but calling BlocksLen, EntriesLen and BytesLen +// is still possible. +func (w *Writer) Close() error { + if w.err != nil { + return w.err + } + + // Write the last data block. Or empty data block if there + // aren't any data blocks at all. 
+ if w.dataBlock.nEntries > 0 || w.nEntries == 0 { + if err := w.finishBlock(); err != nil { + w.err = err + return w.err + } + } + w.flushPendingBH(nil) + + // Write the filter block. + var filterBH blockHandle + w.filterBlock.finish() + if buf := &w.filterBlock.buf; buf.Len() > 0 { + filterBH, w.err = w.writeBlock(buf, opt.NoCompression) + if w.err != nil { + return w.err + } + } + + // Write the metaindex block. + if filterBH.length > 0 { + key := []byte("filter." + w.filter.Name()) + n := encodeBlockHandle(w.scratch[:20], filterBH) + w.dataBlock.append(key, w.scratch[:n]) + } + w.dataBlock.finish() + metaindexBH, err := w.writeBlock(&w.dataBlock.buf, w.compression) + if err != nil { + w.err = err + return w.err + } + + // Write the index block. + w.indexBlock.finish() + indexBH, err := w.writeBlock(&w.indexBlock.buf, w.compression) + if err != nil { + w.err = err + return w.err + } + + // Write the table footer. + footer := w.scratch[:footerLen] + for i := range footer { + footer[i] = 0 + } + n := encodeBlockHandle(footer, metaindexBH) + encodeBlockHandle(footer[n:], indexBH) + copy(footer[footerLen-len(magic):], magic) + if _, err := w.writer.Write(footer); err != nil { + w.err = err + return w.err + } + w.offset += footerLen + + w.err = errors.New("leveldb/table: writer is closed") + return nil +} + +// NewWriter creates a new initialized table writer for the file. +// +// Table writer is not goroutine-safe. +func NewWriter(f io.Writer, o *opt.Options) *Writer { + w := &Writer{ + writer: f, + cmp: o.GetComparer(), + filter: o.GetFilter(), + compression: o.GetCompression(), + blockSize: o.GetBlockSize(), + comparerScratch: make([]byte, 0), + } + // data block + w.dataBlock.restartInterval = o.GetBlockRestartInterval() + // The first 20-bytes are used for encoding block handle. + w.dataBlock.scratch = w.scratch[20:] + // index block + w.indexBlock.restartInterval = 1 + w.indexBlock.scratch = w.scratch[20:] + // filter block + if w.filter != nil { + w.filterBlock.generator = w.filter.NewGenerator() + w.filterBlock.flush(0) + } + return w +} |
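
For context on the API this diff vendors in, below is a minimal round-trip sketch modeled on table_test.go: build a table in memory with NewWriter, then read it back with NewReader. It is not part of the diff itself; the import path and option values are assumptions based on the vendored paths and the tests above.

package main

import (
	"bytes"
	"fmt"

	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/table"
)

func main() {
	o := &opt.Options{
		BlockSize:   1024,
		Compression: opt.NoCompression,
	}

	// Build the table in memory; keys must be appended in increasing order.
	buf := &bytes.Buffer{}
	tw := table.NewWriter(buf, o)
	tw.Append([]byte("k01"), []byte("v1"))
	tw.Append([]byte("k02"), []byte("v2"))
	if err := tw.Close(); err != nil {
		panic(err)
	}

	// Open the table again. The cache argument is optional and may be nil,
	// as may the read options (the tests pass nil for both).
	tr := table.NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), nil, o)
	value, err := tr.Get([]byte("k02"), nil)
	if err != nil {
		panic(err)
	}
	fmt.Printf("k02 => %s\n", value) // k02 => v2
}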
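The block handles referenced throughout (footer, metaindex values, index-block values) are just two unsigned varints, offset then length, as encodeBlockHandle/decodeBlockHandle in table.go implement. A self-contained sketch of that encoding using only encoding/binary; the sample offset and length are arbitrary:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// A handle pointing at a block starting at offset 513 with length 16.
	offset, length := uint64(513), uint64(16)

	// Encode: offset varint followed by length varint (encodeBlockHandle).
	dst := make([]byte, 2*binary.MaxVarintLen64)
	n := binary.PutUvarint(dst, offset)
	n += binary.PutUvarint(dst[n:], length)
	fmt.Printf("encoded handle: % x\n", dst[:n]) // 81 04 10

	// Decode it back; a zero read count signals corruption, which is what
	// decodeBlockHandle reports by returning n == 0.
	off, i := binary.Uvarint(dst)
	ln, j := binary.Uvarint(dst[i:])
	if i == 0 || j == 0 {
		panic("bad block handle")
	}
	fmt.Println(off, ln) // 513 16
}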
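The deck/dock/duck example in the table.go documentation can be reproduced with a few lines of Go. This sketch mirrors the entry layout that blockWriter.append produces (shared length, unshared length, value length as uvarints, then the unshared key suffix and the value) with restart_interval=2; it is an illustration, not the writer itself:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// sharedPrefixLen mirrors the helper in writer.go.
func sharedPrefixLen(a, b []byte) int {
	i, n := 0, len(a)
	if n > len(b) {
		n = len(b)
	}
	for i < n && a[i] == b[i] {
		i++
	}
	return i
}

func main() {
	const restartInterval = 2
	keys := []string{"deck", "dock", "duck"}
	values := []string{"v1", "v2", "v3"}

	var buf bytes.Buffer
	var prev []byte
	var scratch [binary.MaxVarintLen64]byte
	for i, key := range keys {
		shared := 0
		if i%restartInterval != 0 {
			shared = sharedPrefixLen(prev, []byte(key))
		} // else: restart point, so shared is always zero
		n := binary.PutUvarint(scratch[:], uint64(shared))
		buf.Write(scratch[:n])
		n = binary.PutUvarint(scratch[:], uint64(len(key)-shared))
		buf.Write(scratch[:n])
		n = binary.PutUvarint(scratch[:], uint64(len(values[i])))
		buf.Write(scratch[:n])
		buf.WriteString(key[shared:])
		buf.WriteString(values[i])
		prev = []byte(key)
	}
	// Matches the documented layout:
	// 0 4 2 "deck" "v1" | 1 3 2 "ock" "v2" | 0 4 2 "duck" "v3"
	fmt.Printf("% x\n", buf.Bytes())
}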
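Finally, the filter block's base Lg (filterBaseLg = 11 in table.go, i.e. a new filter for every 2KB of data offset) determines which filter covers a given data-block offset: filterBlock.contains computes offset >> baseLg. A tiny illustration of that mapping; the sample offsets are arbitrary:

package main

import "fmt"

func main() {
	const filterBaseLg = 11               // from table.go
	const filterBase = 1 << filterBaseLg  // 2048: new filter every 2KB

	// Data blocks starting anywhere in [0, 2048) are checked against
	// filter 0, [2048, 4096) against filter 1, and so on.
	for _, offset := range []uint64{0, 100, 2047, 2048, 5000} {
		fmt.Printf("offset %5d -> filter index %d\n", offset, offset>>filterBaseLg)
	}
}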