diff options
Diffstat (limited to 'Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go')
-rw-r--r-- | Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go | 701 |
1 files changed, 480 insertions, 221 deletions
diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go index 8acb9f720..6f38e84b3 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go @@ -8,27 +8,41 @@ package table import ( "encoding/binary" - "errors" "fmt" "io" "sort" "strings" + "sync" - "code.google.com/p/snappy-go/snappy" + "github.com/syndtr/gosnappy/snappy" "github.com/syndtr/goleveldb/leveldb/cache" "github.com/syndtr/goleveldb/leveldb/comparer" + "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/filter" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/storage" "github.com/syndtr/goleveldb/leveldb/util" ) var ( - ErrNotFound = util.ErrNotFound - ErrIterReleased = errors.New("leveldb/table: iterator released") + ErrNotFound = errors.ErrNotFound + ErrReaderReleased = errors.New("leveldb/table: reader released") + ErrIterReleased = errors.New("leveldb/table: iterator released") ) +type ErrCorrupted struct { + Pos int64 + Size int64 + Kind string + Reason string +} + +func (e *ErrCorrupted) Error() string { + return fmt.Sprintf("leveldb/table: corruption on %s (pos=%d): %s", e.Kind, e.Pos, e.Reason) +} + func max(x, y int) int { if x > y { return x @@ -37,40 +51,33 @@ func max(x, y int) int { } type block struct { - cmp comparer.BasicComparer + bpool *util.BufferPool + bh blockHandle data []byte restartsLen int restartsOffset int - // Whether checksum is verified and valid. - checksum bool } -func (b *block) seek(rstart, rlimit int, key []byte) (index, offset int, err error) { - n := b.restartsOffset - data := b.data - cmp := b.cmp - +func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) { index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool { - offset := int(binary.LittleEndian.Uint32(data[n+4*(rstart+i):])) - offset += 1 // shared always zero, since this is a restart point - v1, n1 := binary.Uvarint(data[offset:]) // key length - _, n2 := binary.Uvarint(data[offset+n1:]) // value length + offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) + offset += 1 // shared always zero, since this is a restart point + v1, n1 := binary.Uvarint(b.data[offset:]) // key length + _, n2 := binary.Uvarint(b.data[offset+n1:]) // value length m := offset + n1 + n2 - return cmp.Compare(data[m:m+int(v1)], key) > 0 + return cmp.Compare(b.data[m:m+int(v1)], key) > 0 }) + rstart - 1 if index < rstart { // The smallest key is greater-than key sought. index = rstart } - offset = int(binary.LittleEndian.Uint32(data[n+4*index:])) + offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:])) return } func (b *block) restartIndex(rstart, rlimit, offset int) int { - n := b.restartsOffset - data := b.data return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool { - return int(binary.LittleEndian.Uint32(data[n+4*(rstart+i):])) > offset + return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) > offset }) + rstart - 1 } @@ -81,7 +88,7 @@ func (b *block) restartOffset(index int) int { func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) { if offset >= b.restartsOffset { if offset != b.restartsOffset { - err = errors.New("leveldb/table: Reader: BlockEntry: invalid block (block entries offset not aligned)") + err = &ErrCorrupted{Reason: "entries offset not aligned"} } return } @@ -91,7 +98,7 @@ func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) m := n0 + n1 + n2 n = m + int(v1) + int(v2) if n0 <= 0 || n1 <= 0 || n2 <= 0 || offset+n > b.restartsOffset { - err = errors.New("leveldb/table: Reader: invalid block (block entries corrupted)") + err = &ErrCorrupted{Reason: "entries corrupted"} return } key = b.data[offset+m : offset+m+int(v1)] @@ -100,43 +107,10 @@ func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) return } -func (b *block) newIterator(slice *util.Range, inclLimit bool, cache util.Releaser) *blockIter { - bi := &blockIter{ - block: b, - cache: cache, - // Valid key should never be nil. - key: make([]byte, 0), - dir: dirSOI, - riStart: 0, - riLimit: b.restartsLen, - offsetStart: 0, - offsetRealStart: 0, - offsetLimit: b.restartsOffset, - } - if slice != nil { - if slice.Start != nil { - if bi.Seek(slice.Start) { - bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset) - bi.offsetStart = b.restartOffset(bi.riStart) - bi.offsetRealStart = bi.prevOffset - } else { - bi.riStart = b.restartsLen - bi.offsetStart = b.restartsOffset - bi.offsetRealStart = b.restartsOffset - } - } - if slice.Limit != nil { - if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) { - bi.offsetLimit = bi.prevOffset - bi.riLimit = bi.restartIndex + 1 - } - } - bi.reset() - if bi.offsetStart > bi.offsetLimit { - bi.sErr(errors.New("leveldb/table: Reader: invalid slice range")) - } - } - return bi +func (b *block) Release() { + b.bpool.Put(b.data) + b.bpool = nil + b.data = nil } type dir int @@ -150,10 +124,12 @@ const ( ) type blockIter struct { - block *block - cache, releaser util.Releaser - key, value []byte - offset int + tr *Reader + block *block + blockReleaser util.Releaser + releaser util.Releaser + key, value []byte + offset int // Previous offset, only filled by Next. prevOffset int prevNode []int @@ -250,7 +226,7 @@ func (i *blockIter) Seek(key []byte) bool { return false } - ri, offset, err := i.block.seek(i.riStart, i.riLimit, key) + ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key) if err != nil { i.sErr(err) return false @@ -261,7 +237,7 @@ func (i *blockIter) Seek(key []byte) bool { i.dir = dirForward } for i.Next() { - if i.block.cmp.Compare(i.key, key) >= 0 { + if i.tr.cmp.Compare(i.key, key) >= 0 { return true } } @@ -286,7 +262,7 @@ func (i *blockIter) Next() bool { for i.offset < i.offsetRealStart { key, value, nShared, n, err := i.block.entry(i.offset) if err != nil { - i.sErr(err) + i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err)) return false } if n == 0 { @@ -300,13 +276,13 @@ func (i *blockIter) Next() bool { if i.offset >= i.offsetLimit { i.dir = dirEOI if i.offset != i.offsetLimit { - i.sErr(errors.New("leveldb/table: Reader: Next: invalid block (block entries offset not aligned)")) + i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned")) } return false } key, value, nShared, n, err := i.block.entry(i.offset) if err != nil { - i.sErr(err) + i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err)) return false } if n == 0 { @@ -391,7 +367,7 @@ func (i *blockIter) Prev() bool { for { key, value, nShared, n, err := i.block.entry(offset) if err != nil { - i.sErr(err) + i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err)) return false } if offset >= i.offsetRealStart { @@ -410,7 +386,7 @@ func (i *blockIter) Prev() bool { // Stop if target offset reached. if offset >= i.offset { if offset != i.offset { - i.sErr(errors.New("leveldb/table: Reader: Prev: invalid block (block entries offset not aligned)")) + i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned")) return false } @@ -437,25 +413,33 @@ func (i *blockIter) Value() []byte { } func (i *blockIter) Release() { - i.prevNode = nil - i.prevKeys = nil - i.key = nil - i.value = nil - i.dir = dirReleased - if i.cache != nil { - i.cache.Release() - i.cache = nil - } - if i.releaser != nil { - i.releaser.Release() - i.releaser = nil + if i.dir != dirReleased { + i.tr = nil + i.block = nil + i.prevNode = nil + i.prevKeys = nil + i.key = nil + i.value = nil + i.dir = dirReleased + if i.blockReleaser != nil { + i.blockReleaser.Release() + i.blockReleaser = nil + } + if i.releaser != nil { + i.releaser.Release() + i.releaser = nil + } } } func (i *blockIter) SetReleaser(releaser util.Releaser) { - if i.dir > dirReleased { - i.releaser = releaser + if i.dir == dirReleased { + panic(util.ErrReleased) } + if i.releaser != nil && releaser != nil { + panic(util.ErrHasReleaser) + } + i.releaser = releaser } func (i *blockIter) Valid() bool { @@ -467,21 +451,21 @@ func (i *blockIter) Error() error { } type filterBlock struct { - filter filter.Filter + bpool *util.BufferPool data []byte oOffset int baseLg uint filtersNum int } -func (b *filterBlock) contains(offset uint64, key []byte) bool { +func (b *filterBlock) contains(filter filter.Filter, offset uint64, key []byte) bool { i := int(offset >> b.baseLg) if i < b.filtersNum { o := b.data[b.oOffset+i*4:] n := int(binary.LittleEndian.Uint32(o)) m := int(binary.LittleEndian.Uint32(o[4:])) if n < m && m <= b.oOffset { - return b.filter.Contains(b.data[n:m], key) + return filter.Contains(b.data[n:m], key) } else if n == m { return false } @@ -489,12 +473,17 @@ func (b *filterBlock) contains(offset uint64, key []byte) bool { return true } +func (b *filterBlock) Release() { + b.bpool.Put(b.data) + b.bpool = nil + b.data = nil +} + type indexIter struct { - blockIter - tableReader *Reader - slice *util.Range + *blockIter + tr *Reader + slice *util.Range // Options - checksum bool fillCache bool } @@ -505,95 +494,173 @@ func (i *indexIter) Get() iterator.Iterator { } dataBH, n := decodeBlockHandle(value) if n == 0 { - return iterator.NewEmptyIterator(errors.New("leveldb/table: Reader: invalid table (bad data block handle)")) + return iterator.NewEmptyIterator(i.tr.newErrCorruptedBH(i.tr.indexBH, "bad data block handle")) } + var slice *util.Range if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) { slice = i.slice } - return i.tableReader.getDataIter(dataBH, slice, i.checksum, i.fillCache) + return i.tr.getDataIterErr(dataBH, slice, i.tr.verifyChecksum, i.fillCache) } // Reader is a table reader. type Reader struct { + mu sync.RWMutex + fi *storage.FileInfo reader io.ReaderAt - cache cache.Namespace + cache *cache.CacheGetter err error + bpool *util.BufferPool // Options - cmp comparer.Comparer - filter filter.Filter - checksum bool - strictIter bool + o *opt.Options + cmp comparer.Comparer + filter filter.Filter + verifyChecksum bool - dataEnd int64 - indexBlock *block - filterBlock *filterBlock + dataEnd int64 + metaBH, indexBH, filterBH blockHandle + indexBlock *block + filterBlock *filterBlock } -func verifyChecksum(data []byte) bool { - n := len(data) - 4 - checksum0 := binary.LittleEndian.Uint32(data[n:]) - checksum1 := util.NewCRC(data[:n]).Value() - return checksum0 == checksum1 +func (r *Reader) blockKind(bh blockHandle) string { + switch bh.offset { + case r.metaBH.offset: + return "meta-block" + case r.indexBH.offset: + return "index-block" + case r.filterBH.offset: + if r.filterBH.length > 0 { + return "filter-block" + } + } + return "data-block" } -func (r *Reader) readRawBlock(bh blockHandle, checksum bool) ([]byte, error) { - data := make([]byte, bh.length+blockTrailerLen) +func (r *Reader) newErrCorrupted(pos, size int64, kind, reason string) error { + return &errors.ErrCorrupted{File: r.fi, Err: &ErrCorrupted{Pos: pos, Size: size, Kind: kind, Reason: reason}} +} + +func (r *Reader) newErrCorruptedBH(bh blockHandle, reason string) error { + return r.newErrCorrupted(int64(bh.offset), int64(bh.length), r.blockKind(bh), reason) +} + +func (r *Reader) fixErrCorruptedBH(bh blockHandle, err error) error { + if cerr, ok := err.(*ErrCorrupted); ok { + cerr.Pos = int64(bh.offset) + cerr.Size = int64(bh.length) + cerr.Kind = r.blockKind(bh) + return &errors.ErrCorrupted{File: r.fi, Err: cerr} + } + return err +} + +func (r *Reader) readRawBlock(bh blockHandle, verifyChecksum bool) ([]byte, error) { + data := r.bpool.Get(int(bh.length + blockTrailerLen)) if _, err := r.reader.ReadAt(data, int64(bh.offset)); err != nil && err != io.EOF { return nil, err } - if checksum || r.checksum { - if !verifyChecksum(data) { - return nil, errors.New("leveldb/table: Reader: invalid block (checksum mismatch)") + + if verifyChecksum { + n := bh.length + 1 + checksum0 := binary.LittleEndian.Uint32(data[n:]) + checksum1 := util.NewCRC(data[:n]).Value() + if checksum0 != checksum1 { + r.bpool.Put(data) + return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("checksum mismatch, want=%#x got=%#x", checksum0, checksum1)) } } + switch data[bh.length] { case blockTypeNoCompression: data = data[:bh.length] case blockTypeSnappyCompression: - var err error - data, err = snappy.Decode(nil, data[:bh.length]) + decLen, err := snappy.DecodedLen(data[:bh.length]) if err != nil { - return nil, err + return nil, r.newErrCorruptedBH(bh, err.Error()) + } + decData := r.bpool.Get(decLen) + decData, err = snappy.Decode(decData, data[:bh.length]) + r.bpool.Put(data) + if err != nil { + r.bpool.Put(decData) + return nil, r.newErrCorruptedBH(bh, err.Error()) } + data = decData default: - return nil, fmt.Errorf("leveldb/table: Reader: unknown block compression type: %d", data[bh.length]) + r.bpool.Put(data) + return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("unknown compression type %#x", data[bh.length])) } return data, nil } -func (r *Reader) readBlock(bh blockHandle, checksum bool) (*block, error) { - data, err := r.readRawBlock(bh, checksum) +func (r *Reader) readBlock(bh blockHandle, verifyChecksum bool) (*block, error) { + data, err := r.readRawBlock(bh, verifyChecksum) if err != nil { return nil, err } restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:])) b := &block{ - cmp: r.cmp, + bpool: r.bpool, + bh: bh, data: data, restartsLen: restartsLen, restartsOffset: len(data) - (restartsLen+1)*4, - checksum: checksum || r.checksum, } return b, nil } -func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterBlock, error) { +func (r *Reader) readBlockCached(bh blockHandle, verifyChecksum, fillCache bool) (*block, util.Releaser, error) { + if r.cache != nil { + var ( + err error + ch *cache.Handle + ) + if fillCache { + ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) { + var b *block + b, err = r.readBlock(bh, verifyChecksum) + if err != nil { + return 0, nil + } + return cap(b.data), b + }) + } else { + ch = r.cache.Get(bh.offset, nil) + } + if ch != nil { + b, ok := ch.Value().(*block) + if !ok { + ch.Release() + return nil, nil, errors.New("leveldb/table: inconsistent block type") + } + return b, ch, err + } else if err != nil { + return nil, nil, err + } + } + + b, err := r.readBlock(bh, verifyChecksum) + return b, b, err +} + +func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) { data, err := r.readRawBlock(bh, true) if err != nil { return nil, err } n := len(data) if n < 5 { - return nil, errors.New("leveldb/table: Reader: invalid filter block (too short)") + return nil, r.newErrCorruptedBH(bh, "too short") } m := n - 5 oOffset := int(binary.LittleEndian.Uint32(data[m:])) if oOffset > m { - return nil, errors.New("leveldb/table: Reader: invalid filter block (invalid offset)") + return nil, r.newErrCorruptedBH(bh, "invalid data-offsets offset") } b := &filterBlock{ - filter: filter, + bpool: r.bpool, data: data, oOffset: oOffset, baseLg: uint(data[n-1]), @@ -602,44 +669,111 @@ func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterB return b, nil } -func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator { +func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) { if r.cache != nil { - // Get/set block cache. - var err error - cache, ok := r.cache.Get(dataBH.offset, func() (ok bool, value interface{}, charge int, fin cache.SetFin) { - if !fillCache { - return + var ( + err error + ch *cache.Handle + ) + if fillCache { + ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) { + var b *filterBlock + b, err = r.readFilterBlock(bh) + if err != nil { + return 0, nil + } + return cap(b.data), b + }) + } else { + ch = r.cache.Get(bh.offset, nil) + } + if ch != nil { + b, ok := ch.Value().(*filterBlock) + if !ok { + ch.Release() + return nil, nil, errors.New("leveldb/table: inconsistent block type") } - var dataBlock *block - dataBlock, err = r.readBlock(dataBH, checksum) - if err == nil { - ok = true - value = dataBlock - charge = int(dataBH.length) + return b, ch, err + } else if err != nil { + return nil, nil, err + } + } + + b, err := r.readFilterBlock(bh) + return b, b, err +} + +func (r *Reader) getIndexBlock(fillCache bool) (b *block, rel util.Releaser, err error) { + if r.indexBlock == nil { + return r.readBlockCached(r.indexBH, true, fillCache) + } + return r.indexBlock, util.NoopReleaser{}, nil +} + +func (r *Reader) getFilterBlock(fillCache bool) (*filterBlock, util.Releaser, error) { + if r.filterBlock == nil { + return r.readFilterBlockCached(r.filterBH, fillCache) + } + return r.filterBlock, util.NoopReleaser{}, nil +} + +func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Range, inclLimit bool) *blockIter { + bi := &blockIter{ + tr: r, + block: b, + blockReleaser: bReleaser, + // Valid key should never be nil. + key: make([]byte, 0), + dir: dirSOI, + riStart: 0, + riLimit: b.restartsLen, + offsetStart: 0, + offsetRealStart: 0, + offsetLimit: b.restartsOffset, + } + if slice != nil { + if slice.Start != nil { + if bi.Seek(slice.Start) { + bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset) + bi.offsetStart = b.restartOffset(bi.riStart) + bi.offsetRealStart = bi.prevOffset + } else { + bi.riStart = b.restartsLen + bi.offsetStart = b.restartsOffset + bi.offsetRealStart = b.restartsOffset } - return - }) - if err != nil { - return iterator.NewEmptyIterator(err) } - if ok { - dataBlock := cache.Value().(*block) - if !dataBlock.checksum && (r.checksum || checksum) { - if !verifyChecksum(dataBlock.data) { - return iterator.NewEmptyIterator(errors.New("leveldb/table: Reader: invalid block (checksum mismatch)")) - } - dataBlock.checksum = true + if slice.Limit != nil { + if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) { + bi.offsetLimit = bi.prevOffset + bi.riLimit = bi.restartIndex + 1 } - iter := dataBlock.newIterator(slice, false, cache) - return iter + } + bi.reset() + if bi.offsetStart > bi.offsetLimit { + bi.sErr(errors.New("leveldb/table: invalid slice range")) } } - dataBlock, err := r.readBlock(dataBH, checksum) + return bi +} + +func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator { + b, rel, err := r.readBlockCached(dataBH, verifyChecksum, fillCache) if err != nil { return iterator.NewEmptyIterator(err) } - iter := dataBlock.newIterator(slice, false, nil) - return iter + return r.newBlockIter(b, rel, slice, false) +} + +func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator { + r.mu.RLock() + defer r.mu.RUnlock() + + if r.err != nil { + return iterator.NewEmptyIterator(r.err) + } + + return r.getDataIter(dataBH, slice, verifyChecksum, fillCache) } // NewIterator creates an iterator from the table. @@ -653,35 +787,44 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi // when not used. // // Also read Iterator documentation of the leveldb/iterator package. - func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { + r.mu.RLock() + defer r.mu.RUnlock() + if r.err != nil { return iterator.NewEmptyIterator(r.err) } + fillCache := !ro.GetDontFillCache() + indexBlock, rel, err := r.getIndexBlock(fillCache) + if err != nil { + return iterator.NewEmptyIterator(err) + } index := &indexIter{ - blockIter: *r.indexBlock.newIterator(slice, true, nil), - tableReader: r, - slice: slice, - checksum: ro.GetStrict(opt.StrictBlockChecksum), - fillCache: !ro.GetDontFillCache(), + blockIter: r.newBlockIter(indexBlock, rel, slice, true), + tr: r, + slice: slice, + fillCache: !ro.GetDontFillCache(), } - return iterator.NewIndexedIterator(index, r.strictIter || ro.GetStrict(opt.StrictIterator), false) + return iterator.NewIndexedIterator(index, opt.GetStrict(r.o, ro, opt.StrictReader)) } -// Find finds key/value pair whose key is greater than or equal to the -// given key. It returns ErrNotFound if the table doesn't contain -// such pair. -// -// The caller should not modify the contents of the returned slice, but -// it is safe to modify the contents of the argument after Find returns. -func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err error) { +func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bool) (rkey, value []byte, err error) { + r.mu.RLock() + defer r.mu.RUnlock() + if r.err != nil { err = r.err return } - index := r.indexBlock.newIterator(nil, true, nil) + indexBlock, rel, err := r.getIndexBlock(true) + if err != nil { + return + } + defer rel.Release() + + index := r.newBlockIter(indexBlock, nil, nil, true) defer index.Release() if !index.Seek(key) { err = index.Error() @@ -692,14 +835,23 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err } dataBH, n := decodeBlockHandle(index.Value()) if n == 0 { - err = errors.New("leveldb/table: Reader: invalid table (bad data block handle)") + r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle") return } - if r.filterBlock != nil && !r.filterBlock.contains(dataBH.offset, key) { - err = ErrNotFound - return + if filtered && r.filter != nil { + filterBlock, frel, ferr := r.getFilterBlock(true) + if ferr == nil { + if !filterBlock.contains(r.filter, dataBH.offset, key) { + frel.Release() + return nil, nil, ErrNotFound + } + frel.Release() + } else if !errors.IsCorrupted(ferr) { + err = ferr + return + } } - data := r.getDataIter(dataBH, nil, ro.GetStrict(opt.StrictBlockChecksum), !ro.GetDontFillCache()) + data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache()) defer data.Release() if !data.Seek(key) { err = data.Error() @@ -708,23 +860,64 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err } return } + // Don't use block buffer, no need to copy the buffer. rkey = data.Key() - value = data.Value() + if !noValue { + if r.bpool == nil { + value = data.Value() + } else { + // Use block buffer, and since the buffer will be recycled, the buffer + // need to be copied. + value = append([]byte{}, data.Value()...) + } + } + return +} + +// Find finds key/value pair whose key is greater than or equal to the +// given key. It returns ErrNotFound if the table doesn't contain +// such pair. +// If filtered is true then the nearest 'block' will be checked against +// 'filter data' (if present) and will immediately return ErrNotFound if +// 'filter data' indicates that such pair doesn't exist. +// +// The caller may modify the contents of the returned slice as it is its +// own copy. +// It is safe to modify the contents of the argument after Find returns. +func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, value []byte, err error) { + return r.find(key, filtered, ro, false) +} + +// Find finds key that is greater than or equal to the given key. +// It returns ErrNotFound if the table doesn't contain such key. +// If filtered is true then the nearest 'block' will be checked against +// 'filter data' (if present) and will immediately return ErrNotFound if +// 'filter data' indicates that such key doesn't exist. +// +// The caller may modify the contents of the returned slice as it is its +// own copy. +// It is safe to modify the contents of the argument after Find returns. +func (r *Reader) FindKey(key []byte, filtered bool, ro *opt.ReadOptions) (rkey []byte, err error) { + rkey, _, err = r.find(key, filtered, ro, true) return } // Get gets the value for the given key. It returns errors.ErrNotFound // if the table does not contain the key. // -// The caller should not modify the contents of the returned slice, but -// it is safe to modify the contents of the argument after Get returns. +// The caller may modify the contents of the returned slice as it is its +// own copy. +// It is safe to modify the contents of the argument after Find returns. func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { + r.mu.RLock() + defer r.mu.RUnlock() + if r.err != nil { err = r.err return } - rkey, value, err := r.Find(key, ro) + rkey, value, err := r.find(key, false, ro, false) if err == nil && r.cmp.Compare(rkey, key) != 0 { value = nil err = ErrNotFound @@ -736,17 +929,26 @@ func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) // // It is safe to modify the contents of the argument after Get returns. func (r *Reader) OffsetOf(key []byte) (offset int64, err error) { + r.mu.RLock() + defer r.mu.RUnlock() + if r.err != nil { err = r.err return } - index := r.indexBlock.newIterator(nil, true, nil) + indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true) + if err != nil { + return + } + defer rel.Release() + + index := r.newBlockIter(indexBlock, nil, nil, true) defer index.Release() if index.Seek(key) { dataBH, n := decodeBlockHandle(index.Value()) if n == 0 { - err = errors.New("leveldb/table: Reader: invalid table (bad data block handle)") + r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle") return } offset = int64(dataBH.offset) @@ -759,90 +961,147 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) { return } -// NewReader creates a new initialized table reader for the file. -// The cache is optional and can be nil. -func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, o *opt.Options) *Reader { - r := &Reader{ - reader: f, - cache: cache, - cmp: o.GetComparer(), - checksum: o.GetStrict(opt.StrictBlockChecksum), - strictIter: o.GetStrict(opt.StrictIterator), +// Release implements util.Releaser. +// It also close the file if it is an io.Closer. +func (r *Reader) Release() { + r.mu.Lock() + defer r.mu.Unlock() + + if closer, ok := r.reader.(io.Closer); ok { + closer.Close() + } + if r.indexBlock != nil { + r.indexBlock.Release() + r.indexBlock = nil } + if r.filterBlock != nil { + r.filterBlock.Release() + r.filterBlock = nil + } + r.reader = nil + r.cache = nil + r.bpool = nil + r.err = ErrReaderReleased +} + +// NewReader creates a new initialized table reader for the file. +// The fi, cache and bpool is optional and can be nil. +// +// The returned table reader instance is goroutine-safe. +func NewReader(f io.ReaderAt, size int64, fi *storage.FileInfo, cache *cache.CacheGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) { if f == nil { - r.err = errors.New("leveldb/table: Reader: nil file") - return r + return nil, errors.New("leveldb/table: nil file") } + + r := &Reader{ + fi: fi, + reader: f, + cache: cache, + bpool: bpool, + o: o, + cmp: o.GetComparer(), + verifyChecksum: o.GetStrict(opt.StrictBlockChecksum), + } + if size < footerLen { - r.err = errors.New("leveldb/table: Reader: invalid table (file size is too small)") - return r + r.err = r.newErrCorrupted(0, size, "table", "too small") + return r, nil } + + footerPos := size - footerLen var footer [footerLen]byte - if _, err := r.reader.ReadAt(footer[:], size-footerLen); err != nil && err != io.EOF { - r.err = fmt.Errorf("leveldb/table: Reader: invalid table (could not read footer): %v", err) + if _, err := r.reader.ReadAt(footer[:], footerPos); err != nil && err != io.EOF { + return nil, err } if string(footer[footerLen-len(magic):footerLen]) != magic { - r.err = errors.New("leveldb/table: Reader: invalid table (bad magic number)") - return r + r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad magic number") + return r, nil } + + var n int // Decode the metaindex block handle. - metaBH, n := decodeBlockHandle(footer[:]) + r.metaBH, n = decodeBlockHandle(footer[:]) if n == 0 { - r.err = errors.New("leveldb/table: Reader: invalid table (bad metaindex block handle)") - return r + r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad metaindex block handle") + return r, nil } + // Decode the index block handle. - indexBH, n := decodeBlockHandle(footer[n:]) + r.indexBH, n = decodeBlockHandle(footer[n:]) if n == 0 { - r.err = errors.New("leveldb/table: Reader: invalid table (bad index block handle)") - return r - } - // Read index block. - r.indexBlock, r.err = r.readBlock(indexBH, true) - if r.err != nil { - return r + r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad index block handle") + return r, nil } + // Read metaindex block. - metaBlock, err := r.readBlock(metaBH, true) + metaBlock, err := r.readBlock(r.metaBH, true) if err != nil { - r.err = err - return r + if errors.IsCorrupted(err) { + r.err = err + return r, nil + } else { + return nil, err + } } + // Set data end. - r.dataEnd = int64(metaBH.offset) - metaIter := metaBlock.newIterator(nil, false, nil) + r.dataEnd = int64(r.metaBH.offset) + + // Read metaindex. + metaIter := r.newBlockIter(metaBlock, nil, nil, true) for metaIter.Next() { key := string(metaIter.Key()) if !strings.HasPrefix(key, "filter.") { continue } fn := key[7:] - var filter filter.Filter if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn { - filter = f0 + r.filter = f0 } else { for _, f0 := range o.GetAltFilters() { if f0.Name() == fn { - filter = f0 + r.filter = f0 break } } } - if filter != nil { + if r.filter != nil { filterBH, n := decodeBlockHandle(metaIter.Value()) if n == 0 { continue } + r.filterBH = filterBH // Update data end. r.dataEnd = int64(filterBH.offset) - filterBlock, err := r.readFilterBlock(filterBH, filter) - if err != nil { - continue - } - r.filterBlock = filterBlock break } } metaIter.Release() - return r + metaBlock.Release() + + // Cache index and filter block locally, since we don't have global cache. + if cache == nil { + r.indexBlock, err = r.readBlock(r.indexBH, true) + if err != nil { + if errors.IsCorrupted(err) { + r.err = err + return r, nil + } else { + return nil, err + } + } + if r.filter != nil { + r.filterBlock, err = r.readFilterBlock(r.filterBH) + if err != nil { + if !errors.IsCorrupted(err) { + return nil, err + } + + // Don't use filter then. + r.filter = nil + } + } + } + + return r, nil } |