diff options
author | lash <nolash@users.noreply.github.com> | 2018-10-12 22:25:38 +0800 |
---|---|---|
committer | Anton Evangelatov <anton.evangelatov@gmail.com> | 2018-10-12 22:25:38 +0800 |
commit | dc3c3fb1e177c5d01ae3ca63717130eea924271e (patch) | |
tree | ae4c947e9fc1ead31e6a976b5f220e69e8623c50 | |
parent | 862d6f2fbf55313805a96cf2e17cb2493e25aa3b (diff) | |
download | dexon-dc3c3fb1e177c5d01ae3ca63717130eea924271e.tar.gz dexon-dc3c3fb1e177c5d01ae3ca63717130eea924271e.tar.zst dexon-dc3c3fb1e177c5d01ae3ca63717130eea924271e.zip |
swarm/storage: Add accessCnt for GC (#17845)
-rw-r--r-- | swarm/storage/ldbstore.go | 282 | ||||
-rw-r--r-- | swarm/storage/ldbstore_test.go | 219 |
2 files changed, 358 insertions, 143 deletions
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 2a7f51cb3..49508911f 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -32,7 +32,6 @@ import ( "fmt" "io" "io/ioutil" - "sort" "sync" "github.com/ethereum/go-ethereum/metrics" @@ -44,8 +43,13 @@ import ( ) const ( - gcArrayFreeRatio = 0.1 - maxGCitems = 5000 // max number of items to be gc'd per call to collectGarbage() + defaultGCRatio = 10 + defaultMaxGCRound = 10000 + defaultMaxGCBatch = 5000 + + wEntryCnt = 1 << 0 + wIndexCnt = 1 << 1 + wAccessCnt = 1 << 2 ) var ( @@ -61,6 +65,7 @@ var ( keyData = byte(6) keyDistanceCnt = byte(7) keySchema = []byte{8} + keyGCIdx = byte(9) // access to chunk data index, used by garbage collection in ascending order from first entry ) var ( @@ -68,7 +73,7 @@ var ( ) type gcItem struct { - idx uint64 + idx *dpaDBIndex value uint64 idxKey []byte po uint8 @@ -89,6 +94,16 @@ func NewLDBStoreParams(storeparams *StoreParams, path string) *LDBStoreParams { } } +type garbage struct { + maxRound int // maximum number of chunks to delete in one garbage collection round + maxBatch int // maximum number of chunks to delete in one db request batch + ratio int // 1/x ratio to calculate the number of chunks to gc on a low capacity db + count int // number of chunks deleted in running round + target int // number of chunks to delete in running round + batch *dbBatch // the delete batch + runC chan struct{} // struct in chan means gc is NOT running +} + type LDBStore struct { db *LDBDatabase @@ -102,12 +117,12 @@ type LDBStore struct { hashfunc SwarmHasher po func(Address) uint8 - batchC chan bool batchesC chan struct{} closed bool batch *dbBatch lock sync.RWMutex quit chan struct{} + gc *garbage // Functions encodeDataFunc is used to bypass // the default functionality of DbStore with @@ -166,9 +181,33 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) { data, _ = s.db.Get(keyDataIdx) s.dataIdx = BytesToU64(data) + // set up garbage collection + s.gc = &garbage{ + maxBatch: defaultMaxGCBatch, + maxRound: defaultMaxGCRound, + ratio: defaultGCRatio, + } + + s.gc.runC = make(chan struct{}, 1) + s.gc.runC <- struct{}{} + return s, nil } +// initialize and set values for processing of gc round +func (s *LDBStore) startGC(c int) { + + s.gc.count = 0 + // calculate the target number of deletions + if c >= s.gc.maxRound { + s.gc.target = s.gc.maxRound + } else { + s.gc.target = c / s.gc.ratio + } + s.gc.batch = newBatch() + log.Debug("startgc", "requested", c, "target", s.gc.target) +} + // NewMockDbStore creates a new instance of DbStore with // mockStore set to a provided value. If mockStore argument is nil, // this function behaves exactly as NewDbStore. @@ -225,6 +264,31 @@ func getDataKey(idx uint64, po uint8) []byte { return key } +func getGCIdxKey(index *dpaDBIndex) []byte { + key := make([]byte, 9) + key[0] = keyGCIdx + binary.BigEndian.PutUint64(key[1:], index.Access) + return key +} + +func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte { + val := make([]byte, 41) // po = 1, index.Index = 8, Address = 32 + val[0] = po + binary.BigEndian.PutUint64(val[1:], index.Idx) + copy(val[9:], addr) + return val +} + +func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) { + index = &dpaDBIndex{ + Idx: binary.BigEndian.Uint64(val[1:]), + Access: binary.BigEndian.Uint64(accessCnt), + } + po = val[0] + addr = val[9:] + return +} + func encodeIndex(index *dpaDBIndex) []byte { data, _ := rlp.EncodeToBytes(index) return data @@ -247,55 +311,70 @@ func decodeData(addr Address, data []byte) (*chunk, error) { return NewChunk(addr, data[32:]), nil } -func (s *LDBStore) collectGarbage(ratio float32) { - log.Trace("collectGarbage", "ratio", ratio) +func (s *LDBStore) collectGarbage() error { - metrics.GetOrRegisterCounter("ldbstore.collectgarbage", nil).Inc(1) + // prevent duplicate gc from starting when one is already running + select { + case <-s.gc.runC: + default: + return nil + } - it := s.db.NewIterator() - defer it.Release() + s.lock.Lock() + entryCnt := s.entryCnt + s.lock.Unlock() - garbage := []*gcItem{} - gcnt := 0 + metrics.GetOrRegisterCounter("ldbstore.collectgarbage", nil).Inc(1) - for ok := it.Seek([]byte{keyIndex}); ok && (gcnt < maxGCitems) && (uint64(gcnt) < s.entryCnt); ok = it.Next() { - itkey := it.Key() + // calculate the amount of chunks to collect and reset counter + s.startGC(int(entryCnt)) + log.Debug("collectGarbage", "target", s.gc.target, "entryCnt", entryCnt) - if (itkey == nil) || (itkey[0] != keyIndex) { - break - } + var totalDeleted int + for s.gc.count < s.gc.target { + it := s.db.NewIterator() + ok := it.Seek([]byte{keyGCIdx}) + var singleIterationCount int - // it.Key() contents change on next call to it.Next(), so we must copy it - key := make([]byte, len(it.Key())) - copy(key, it.Key()) + // every batch needs a lock so we avoid entries changing accessidx in the meantime + s.lock.Lock() + for ; ok && (singleIterationCount < s.gc.maxBatch); ok = it.Next() { - val := it.Value() + // quit if no more access index keys + itkey := it.Key() + if (itkey == nil) || (itkey[0] != keyGCIdx) { + break + } - var index dpaDBIndex + // get chunk data entry from access index + val := it.Value() + index, po, hash := parseGCIdxEntry(itkey[1:], val) + keyIdx := make([]byte, 33) + keyIdx[0] = keyIndex + copy(keyIdx[1:], hash) - hash := key[1:] - decodeIndex(val, &index) - po := s.po(hash) + // add delete operation to batch + s.delete(s.gc.batch.Batch, index, keyIdx, po) + singleIterationCount++ + s.gc.count++ - gci := &gcItem{ - idxKey: key, - idx: index.Idx, - value: index.Access, // the smaller, the more likely to be gc'd. see sort comparator below. - po: po, + // break if target is not on max garbage batch boundary + if s.gc.count >= s.gc.target { + break + } } - garbage = append(garbage, gci) - gcnt++ + s.writeBatch(s.gc.batch, wEntryCnt) + s.lock.Unlock() + it.Release() + log.Trace("garbage collect batch done", "batch", singleIterationCount, "total", s.gc.count) } - sort.Slice(garbage[:gcnt], func(i, j int) bool { return garbage[i].value < garbage[j].value }) - - cutoff := int(float32(gcnt) * ratio) - metrics.GetOrRegisterCounter("ldbstore.collectgarbage.delete", nil).Inc(int64(cutoff)) + s.gc.runC <- struct{}{} + log.Debug("garbage collect done", "c", s.gc.count) - for i := 0; i < cutoff; i++ { - s.delete(garbage[i].idx, garbage[i].idxKey, garbage[i].po) - } + metrics.GetOrRegisterCounter("ldbstore.collectgarbage.delete", nil).Inc(int64(totalDeleted)) + return nil } // Export writes all chunks from the store to a tar archive, returning the @@ -474,7 +553,7 @@ func (s *LDBStore) Cleanup(f func(*chunk) bool) { // if chunk is to be removed if f(c) { log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs) - s.delete(index.Idx, getIndexKey(key[1:]), po) + s.deleteNow(&index, getIndexKey(key[1:]), po) removed++ errorsFound++ } @@ -526,24 +605,43 @@ func (s *LDBStore) ReIndex() { log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total)) } -func (s *LDBStore) Delete(addr Address) { +// Delete is removes a chunk and updates indices. +// Is thread safe +func (s *LDBStore) Delete(addr Address) error { s.lock.Lock() defer s.lock.Unlock() ikey := getIndexKey(addr) - var indx dpaDBIndex - s.tryAccessIdx(ikey, &indx) + idata, err := s.db.Get(ikey) + if err != nil { + return err + } - s.delete(indx.Idx, ikey, s.po(addr)) + var idx dpaDBIndex + decodeIndex(idata, &idx) + proximity := s.po(addr) + return s.deleteNow(&idx, ikey, proximity) } -func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) { +// executes one delete operation immediately +// see *LDBStore.delete +func (s *LDBStore) deleteNow(idx *dpaDBIndex, idxKey []byte, po uint8) error { + batch := new(leveldb.Batch) + s.delete(batch, idx, idxKey, po) + return s.db.Write(batch) +} + +// adds a delete chunk operation to the provided batch +// if called directly, decrements entrycount regardless if the chunk exists upon deletion. Risk of wrap to max uint64 +func (s *LDBStore) delete(batch *leveldb.Batch, idx *dpaDBIndex, idxKey []byte, po uint8) { metrics.GetOrRegisterCounter("ldbstore.delete", nil).Inc(1) - batch := new(leveldb.Batch) + gcIdxKey := getGCIdxKey(idx) + batch.Delete(gcIdxKey) + dataKey := getDataKey(idx.Idx, po) + batch.Delete(dataKey) batch.Delete(idxKey) - batch.Delete(getDataKey(idx, po)) s.entryCnt-- dbEntryCount.Dec(1) cntKey := make([]byte, 2) @@ -551,7 +649,6 @@ func (s *LDBStore) delete(idx uint64, idxKey []byte, po uint8) { cntKey[1] = po batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt)) batch.Put(cntKey, U64ToBytes(s.bucketCnt[po])) - s.db.Write(batch) } func (s *LDBStore) BinIndex(po uint8) uint64 { @@ -572,6 +669,9 @@ func (s *LDBStore) CurrentStorageIndex() uint64 { return s.dataIdx } +// Put adds a chunk to the database, adding indices and incrementing global counters. +// If it already exists, it merely increments the access count of the existing entry. +// Is thread safe func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error { metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1) log.Trace("ldbstore.put", "key", chunk.Address()) @@ -594,7 +694,7 @@ func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error { if err != nil { s.doPut(chunk, &index, po) } else { - log.Trace("ldbstore.put: chunk already exists, only update access", "key", chunk.Address) + log.Debug("ldbstore.put: chunk already exists, only update access", "key", chunk.Address(), "po", po) decodeIndex(idata, &index) } index.Access = s.accessCnt @@ -602,6 +702,10 @@ func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error { idata = encodeIndex(&index) s.batch.Put(ikey, idata) + // add the access-chunkindex index for garbage collection + gcIdxKey := getGCIdxKey(&index) + gcIdxData := getGCIdxValue(&index, po, chunk.Address()) + s.batch.Put(gcIdxKey, gcIdxData) s.lock.Unlock() select { @@ -617,7 +721,7 @@ func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error { } } -// force putting into db, does not check access index +// force putting into db, does not check or update necessary indices func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) { data := s.encodeDataFunc(chunk) dkey := getDataKey(s.dataIdx, po) @@ -659,38 +763,26 @@ func (s *LDBStore) writeCurrentBatch() error { if l == 0 { return nil } - e := s.entryCnt - d := s.dataIdx - a := s.accessCnt s.batch = newBatch() - b.err = s.writeBatch(b, e, d, a) + b.err = s.writeBatch(b, wEntryCnt|wAccessCnt|wIndexCnt) close(b.c) - for e > s.capacity { - log.Trace("for >", "e", e, "s.capacity", s.capacity) - // Collect garbage in a separate goroutine - // to be able to interrupt this loop by s.quit. - done := make(chan struct{}) - go func() { - s.collectGarbage(gcArrayFreeRatio) - log.Trace("collectGarbage closing done") - close(done) - }() - - select { - case <-s.quit: - return errors.New("CollectGarbage terminated due to quit") - case <-done: - } - e = s.entryCnt + if s.entryCnt >= s.capacity { + go s.collectGarbage() } return nil } // must be called non concurrently -func (s *LDBStore) writeBatch(b *dbBatch, entryCnt, dataIdx, accessCnt uint64) error { - b.Put(keyEntryCnt, U64ToBytes(entryCnt)) - b.Put(keyDataIdx, U64ToBytes(dataIdx)) - b.Put(keyAccessCnt, U64ToBytes(accessCnt)) +func (s *LDBStore) writeBatch(b *dbBatch, wFlag uint8) error { + if wFlag&wEntryCnt > 0 { + b.Put(keyEntryCnt, U64ToBytes(s.entryCnt)) + } + if wFlag&wIndexCnt > 0 { + b.Put(keyDataIdx, U64ToBytes(s.dataIdx)) + } + if wFlag&wAccessCnt > 0 { + b.Put(keyAccessCnt, U64ToBytes(s.accessCnt)) + } l := b.Len() if err := s.db.Write(b.Batch); err != nil { return fmt.Errorf("unable to write batch: %v", err) @@ -713,17 +805,22 @@ func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte { } // try to find index; if found, update access cnt and return true -func (s *LDBStore) tryAccessIdx(ikey []byte, index *dpaDBIndex) bool { +func (s *LDBStore) tryAccessIdx(ikey []byte, po uint8, index *dpaDBIndex) bool { idata, err := s.db.Get(ikey) if err != nil { return false } decodeIndex(idata, index) + oldGCIdxKey := getGCIdxKey(index) s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt)) s.accessCnt++ index.Access = s.accessCnt idata = encodeIndex(index) s.batch.Put(ikey, idata) + newGCIdxKey := getGCIdxKey(index) + newGCIdxData := getGCIdxValue(index, po, ikey) + s.batch.Delete(oldGCIdxKey) + s.batch.Put(newGCIdxKey, newGCIdxData) select { case s.batchesC <- struct{}{}: default: @@ -755,6 +852,9 @@ func (s *LDBStore) PutSchema(schema string) error { return s.db.Put(keySchema, []byte(schema)) } +// Get retrieves the chunk matching the provided key from the database. +// If the chunk entry does not exist, it returns an error +// Updates access count and is thread safe func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) { metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1) log.Trace("ldbstore.get", "key", addr) @@ -764,12 +864,14 @@ func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) return s.get(addr) } +// TODO: To conform with other private methods of this object indices should not be updated func (s *LDBStore) get(addr Address) (chunk *chunk, err error) { var indx dpaDBIndex if s.closed { return nil, ErrDBClosed } - if s.tryAccessIdx(getIndexKey(addr), &indx) { + proximity := s.po(addr) + if s.tryAccessIdx(getIndexKey(addr), proximity, &indx) { var data []byte if s.getDataFunc != nil { // if getDataFunc is defined, use it to retrieve the chunk data @@ -780,13 +882,12 @@ func (s *LDBStore) get(addr Address) (chunk *chunk, err error) { } } else { // default DbStore functionality to retrieve chunk data - proximity := s.po(addr) datakey := getDataKey(indx.Idx, proximity) data, err = s.db.Get(datakey) log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", indx.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity) if err != nil { log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err) - s.delete(indx.Idx, getIndexKey(addr), s.po(addr)) + s.deleteNow(&indx, getIndexKey(addr), s.po(addr)) return } } @@ -813,33 +914,14 @@ func newMockGetDataFunc(mockStore *mock.NodeStore) func(addr Address) (data []by } } -func (s *LDBStore) updateAccessCnt(addr Address) { - - s.lock.Lock() - defer s.lock.Unlock() - - var index dpaDBIndex - s.tryAccessIdx(getIndexKey(addr), &index) // result_chn == nil, only update access cnt - -} - func (s *LDBStore) setCapacity(c uint64) { s.lock.Lock() defer s.lock.Unlock() s.capacity = c - if s.entryCnt > c { - ratio := float32(1.01) - float32(c)/float32(s.entryCnt) - if ratio < gcArrayFreeRatio { - ratio = gcArrayFreeRatio - } - if ratio > 1 { - ratio = 1 - } - for s.entryCnt > c { - s.collectGarbage(ratio) - } + for s.entryCnt > c { + s.collectGarbage() } } diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go index 14a42b5e3..48af8c57c 100644 --- a/swarm/storage/ldbstore_test.go +++ b/swarm/storage/ldbstore_test.go @@ -22,6 +22,8 @@ import ( "fmt" "io/ioutil" "os" + "strconv" + "strings" "testing" "time" @@ -297,27 +299,73 @@ func TestLDBStoreWithoutCollectGarbage(t *testing.T) { } // TestLDBStoreCollectGarbage tests that we can put more chunks than LevelDB's capacity, and -// retrieve only some of them, because garbage collection must have cleared some of them +// retrieve only some of them, because garbage collection must have partially cleared the store +// Also tests that we can delete chunks and that we can trigger garbage collection func TestLDBStoreCollectGarbage(t *testing.T) { - capacity := 500 - n := 2000 + + // below max ronud + cap := defaultMaxGCRound / 2 + t.Run(fmt.Sprintf("A/%d/%d", cap, cap*4), testLDBStoreCollectGarbage) + t.Run(fmt.Sprintf("B/%d/%d", cap, cap*4), testLDBStoreRemoveThenCollectGarbage) + + // at max round + cap = defaultMaxGCRound + t.Run(fmt.Sprintf("A/%d/%d", cap, cap*4), testLDBStoreCollectGarbage) + t.Run(fmt.Sprintf("B/%d/%d", cap, cap*4), testLDBStoreRemoveThenCollectGarbage) + + // more than max around, not on threshold + cap = defaultMaxGCRound * 1.1 + t.Run(fmt.Sprintf("A/%d/%d", cap, cap*4), testLDBStoreCollectGarbage) + t.Run(fmt.Sprintf("B/%d/%d", cap, cap*4), testLDBStoreRemoveThenCollectGarbage) + +} + +func testLDBStoreCollectGarbage(t *testing.T) { + params := strings.Split(t.Name(), "/") + capacity, err := strconv.Atoi(params[2]) + if err != nil { + t.Fatal(err) + } + n, err := strconv.Atoi(params[3]) + if err != nil { + t.Fatal(err) + } ldb, cleanup := newLDBStore(t) ldb.setCapacity(uint64(capacity)) defer cleanup() - chunks, err := mputRandomChunks(ldb, n, int64(ch.DefaultSize)) - if err != nil { - t.Fatal(err.Error()) - } - log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) + // retrieve the gc round target count for the db capacity + ldb.startGC(capacity) + roundTarget := ldb.gc.target + + // split put counts to gc target count threshold, and wait for gc to finish in between + var allChunks []Chunk + remaining := n + for remaining > 0 { + var putCount int + if remaining < roundTarget { + putCount = remaining + } else { + putCount = roundTarget + } + remaining -= putCount + chunks, err := mputRandomChunks(ldb, putCount, int64(ch.DefaultSize)) + if err != nil { + t.Fatal(err.Error()) + } + allChunks = append(allChunks, chunks...) + log.Debug("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt, "cap", capacity, "n", n) - // wait for garbage collection to kick in on the responsible actor - time.Sleep(1 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + waitGc(ctx, ldb) + } + // attempt gets on all put chunks var missing int - for _, ch := range chunks { - ret, err := ldb.Get(context.Background(), ch.Address()) + for _, ch := range allChunks { + ret, err := ldb.Get(context.TODO(), ch.Address()) if err == ErrChunkNotFound || err == ldberrors.ErrNotFound { missing++ continue @@ -333,8 +381,10 @@ func TestLDBStoreCollectGarbage(t *testing.T) { log.Trace("got back chunk", "chunk", ret) } - if missing < n-capacity { - t.Fatalf("gc failure: expected to miss %v chunks, but only %v are actually missing", n-capacity, missing) + // all surplus chunks should be missing + expectMissing := roundTarget + (((n - capacity) / roundTarget) * roundTarget) + if missing != expectMissing { + t.Fatalf("gc failure: expected to miss %v chunks, but only %v are actually missing", expectMissing, missing) } log.Info("ldbstore", "total", n, "missing", missing, "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) @@ -367,7 +417,6 @@ func TestLDBStoreAddRemove(t *testing.T) { if i%2 == 0 { // expect even chunks to be missing if err == nil { - // if err != ErrChunkNotFound { t.Fatal("expected chunk to be missing, but got no error") } } else { @@ -383,30 +432,48 @@ func TestLDBStoreAddRemove(t *testing.T) { } } -// TestLDBStoreRemoveThenCollectGarbage tests that we can delete chunks and that we can trigger garbage collection -func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) { - capacity := 11 - surplus := 4 +func testLDBStoreRemoveThenCollectGarbage(t *testing.T) { + + params := strings.Split(t.Name(), "/") + capacity, err := strconv.Atoi(params[2]) + if err != nil { + t.Fatal(err) + } + n, err := strconv.Atoi(params[3]) + if err != nil { + t.Fatal(err) + } ldb, cleanup := newLDBStore(t) + defer cleanup() ldb.setCapacity(uint64(capacity)) - n := capacity - - chunks := []Chunk{} - for i := 0; i < n+surplus; i++ { + // put capacity count number of chunks + chunks := make([]Chunk, n) + for i := 0; i < n; i++ { c := GenerateRandomChunk(ch.DefaultSize) - chunks = append(chunks, c) + chunks[i] = c log.Trace("generate random chunk", "idx", i, "chunk", c) } for i := 0; i < n; i++ { - ldb.Put(context.TODO(), chunks[i]) + err := ldb.Put(context.TODO(), chunks[i]) + if err != nil { + t.Fatal(err) + } } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + waitGc(ctx, ldb) + // delete all chunks + // (only count the ones actually deleted, the rest will have been gc'd) + deletes := 0 for i := 0; i < n; i++ { - ldb.Delete(chunks[i].Address()) + if ldb.Delete(chunks[i].Address()) == nil { + deletes++ + } } log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) @@ -415,37 +482,49 @@ func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) { t.Fatalf("ldb.entrCnt expected 0 got %v", ldb.entryCnt) } - expAccessCnt := uint64(n * 2) + // the manual deletes will have increased accesscnt, so we need to add this when we verify the current count + expAccessCnt := uint64(n) if ldb.accessCnt != expAccessCnt { - t.Fatalf("ldb.accessCnt expected %v got %v", expAccessCnt, ldb.entryCnt) + t.Fatalf("ldb.accessCnt expected %v got %v", expAccessCnt, ldb.accessCnt) } - cleanup() + // retrieve the gc round target count for the db capacity + ldb.startGC(capacity) + roundTarget := ldb.gc.target - ldb, cleanup = newLDBStore(t) - capacity = 10 - ldb.setCapacity(uint64(capacity)) - defer cleanup() - - n = capacity + surplus + remaining := n + var puts int + for remaining > 0 { + var putCount int + if remaining < roundTarget { + putCount = remaining + } else { + putCount = roundTarget + } + remaining -= putCount + for putCount > 0 { + ldb.Put(context.TODO(), chunks[puts]) + log.Debug("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt, "cap", capacity, "n", n, "puts", puts, "remaining", remaining, "roundtarget", roundTarget) + puts++ + putCount-- + } - for i := 0; i < n; i++ { - ldb.Put(context.TODO(), chunks[i]) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + waitGc(ctx, ldb) } - // wait for garbage collection - time.Sleep(1 * time.Second) - // expect first surplus chunks to be missing, because they have the smallest access value - for i := 0; i < surplus; i++ { + expectMissing := roundTarget + (((n - capacity) / roundTarget) * roundTarget) + for i := 0; i < expectMissing; i++ { _, err := ldb.Get(context.TODO(), chunks[i].Address()) if err == nil { - t.Fatal("expected surplus chunk to be missing, but got no error") + t.Fatalf("expected surplus chunk %d to be missing, but got no error", i) } } // expect last chunks to be present, as they have the largest access value - for i := surplus; i < surplus+capacity; i++ { + for i := expectMissing; i < n; i++ { ret, err := ldb.Get(context.TODO(), chunks[i].Address()) if err != nil { t.Fatalf("chunk %v: expected no error, but got %s", i, err) @@ -455,3 +534,57 @@ func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) { } } } + +// TestLDBStoreCollectGarbageAccessUnlikeIndex tests garbage collection where accesscount differs from indexcount +func TestLDBStoreCollectGarbageAccessUnlikeIndex(t *testing.T) { + + capacity := defaultMaxGCRound * 2 + n := capacity - 1 + + ldb, cleanup := newLDBStore(t) + ldb.setCapacity(uint64(capacity)) + defer cleanup() + + chunks, err := mputRandomChunks(ldb, n, int64(ch.DefaultSize)) + if err != nil { + t.Fatal(err.Error()) + } + log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) + + // set first added capacity/2 chunks to highest accesscount + for i := 0; i < capacity/2; i++ { + _, err := ldb.Get(context.TODO(), chunks[i].Address()) + if err != nil { + t.Fatalf("fail add chunk #%d - %s: %v", i, chunks[i].Address(), err) + } + } + _, err = mputRandomChunks(ldb, 2, int64(ch.DefaultSize)) + if err != nil { + t.Fatal(err.Error()) + } + + // wait for garbage collection to kick in on the responsible actor + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + waitGc(ctx, ldb) + + var missing int + for i, ch := range chunks[2 : capacity/2] { + ret, err := ldb.Get(context.TODO(), ch.Address()) + if err == ErrChunkNotFound || err == ldberrors.ErrNotFound { + t.Fatalf("fail find chunk #%d - %s: %v", i, ch.Address(), err) + } + + if !bytes.Equal(ret.Data(), ch.Data()) { + t.Fatal("expected to get the same data back, but got smth else") + } + log.Trace("got back chunk", "chunk", ret) + } + + log.Info("ldbstore", "total", n, "missing", missing, "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) +} + +func waitGc(ctx context.Context, ldb *LDBStore) { + <-ldb.gc.runC + ldb.gc.runC <- struct{}{} +} |