aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorlash <nolash@users.noreply.github.com>2018-11-15 21:57:03 +0800
committerAnton Evangelatov <anton.evangelatov@gmail.com>2018-11-15 21:57:03 +0800
commita6942b9f25bd6c67a5ab9e80ab95aae25a08da6d (patch)
tree3c6fb2287364f2e21daa171905c4c877c8485535
parent17d67c5834679f2b27ef08eddfce3b3a154a96a8 (diff)
downloadgo-tangerine-a6942b9f25bd6c67a5ab9e80ab95aae25a08da6d.tar.gz
go-tangerine-a6942b9f25bd6c67a5ab9e80ab95aae25a08da6d.tar.zst
go-tangerine-a6942b9f25bd6c67a5ab9e80ab95aae25a08da6d.zip
swarm/storage: Batched database migration (#18113)
-rw-r--r--swarm/storage/ldbstore.go123
-rw-r--r--swarm/storage/ldbstore_test.go32
2 files changed, 127 insertions, 28 deletions
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go
index fbae59fac..bd4f6b916 100644
--- a/swarm/storage/ldbstore.go
+++ b/swarm/storage/ldbstore.go
@@ -284,7 +284,7 @@ func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte {
return val
}
-func parseGCIdxKey(key []byte) (byte, []byte) {
+func parseIdxKey(key []byte) (byte, []byte) {
return key[0], key[1:]
}
@@ -589,7 +589,7 @@ func (s *LDBStore) CleanGCIndex() error {
it.Seek([]byte{keyGCIdx})
var gcDeletes int
for it.Valid() {
- rowType, _ := parseGCIdxKey(it.Key())
+ rowType, _ := parseIdxKey(it.Key())
if rowType != keyGCIdx {
break
}
@@ -601,47 +601,113 @@ func (s *LDBStore) CleanGCIndex() error {
if err := s.db.Write(&batch); err != nil {
return err
}
+ batch.Reset()
- it.Seek([]byte{keyIndex})
- var idx dpaDBIndex
+ it.Release()
+
+ // corrected po index pointer values
var poPtrs [256]uint64
- for it.Valid() {
- rowType, chunkHash := parseGCIdxKey(it.Key())
- if rowType != keyIndex {
- break
+
+ // set to true if chunk count not on 4096 iteration boundary
+ var doneIterating bool
+
+ // last key index in previous iteration
+ lastIdxKey := []byte{keyIndex}
+
+ // counter for debug output
+ var cleanBatchCount int
+
+ // go through all key index entries
+ for !doneIterating {
+ cleanBatchCount++
+ var idxs []dpaDBIndex
+ var chunkHashes [][]byte
+ var pos []uint8
+ it := s.db.NewIterator()
+
+ it.Seek(lastIdxKey)
+
+ // 4096 is just a nice number, don't look for any hidden meaning here...
+ var i int
+ for i = 0; i < 4096; i++ {
+
+ // this really shouldn't happen unless database is empty
+ // but let's keep it to be safe
+ if !it.Valid() {
+ doneIterating = true
+ break
+ }
+
+ // if it's not keyindex anymore we're done iterating
+ rowType, chunkHash := parseIdxKey(it.Key())
+ if rowType != keyIndex {
+ doneIterating = true
+ break
+ }
+
+ // decode the retrieved index
+ var idx dpaDBIndex
+ err := decodeIndex(it.Value(), &idx)
+ if err != nil {
+ return fmt.Errorf("corrupt index: %v", err)
+ }
+ po := s.po(chunkHash)
+ lastIdxKey = it.Key()
+
+ // if we don't find the data key, remove the entry
+ // if we find it, add to the array of new gc indices to create
+ dataKey := getDataKey(idx.Idx, po)
+ _, err = s.db.Get(dataKey)
+ if err != nil {
+ log.Warn("deleting inconsistent index (missing data)", "key", chunkHash)
+ batch.Delete(it.Key())
+ } else {
+ idxs = append(idxs, idx)
+ chunkHashes = append(chunkHashes, chunkHash)
+ pos = append(pos, po)
+ okEntryCount++
+ if idx.Idx > poPtrs[po] {
+ poPtrs[po] = idx.Idx
+ }
+ }
+ totalEntryCount++
+ it.Next()
}
- err := decodeIndex(it.Value(), &idx)
+ it.Release()
+
+ // flush the key index corrections
+ err := s.db.Write(&batch)
if err != nil {
- return fmt.Errorf("corrupt index: %v", err)
+ return err
}
- po := s.po(chunkHash)
+ batch.Reset()
- // if we don't find the data key, remove the entry
- dataKey := getDataKey(idx.Idx, po)
- _, err = s.db.Get(dataKey)
- if err != nil {
- log.Warn("deleting inconsistent index (missing data)", "key", chunkHash)
- batch.Delete(it.Key())
- } else {
- gcIdxKey := getGCIdxKey(&idx)
- gcIdxData := getGCIdxValue(&idx, po, chunkHash)
+ // add correct gc indices
+ for i, okIdx := range idxs {
+ gcIdxKey := getGCIdxKey(&okIdx)
+ gcIdxData := getGCIdxValue(&okIdx, pos[i], chunkHashes[i])
batch.Put(gcIdxKey, gcIdxData)
- log.Trace("clean ok", "key", chunkHash, "gcKey", gcIdxKey, "gcData", gcIdxData)
- okEntryCount++
- if idx.Idx > poPtrs[po] {
- poPtrs[po] = idx.Idx
- }
+ log.Trace("clean ok", "key", chunkHashes[i], "gcKey", gcIdxKey, "gcData", gcIdxData)
}
- totalEntryCount++
- it.Next()
+
+ // flush them
+ err = s.db.Write(&batch)
+ if err != nil {
+ return err
+ }
+ batch.Reset()
+
+ log.Debug("clean gc index pass", "batch", cleanBatchCount, "checked", i, "kept", len(idxs))
}
- it.Release()
log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len())
+ // lastly add updated entry count
var entryCount [8]byte
binary.BigEndian.PutUint64(entryCount[:], okEntryCount)
batch.Put(keyEntryCnt, entryCount[:])
+
+ // and add the new po index pointers
var poKey [2]byte
poKey[0] = keyDistanceCnt
for i, poPtr := range poPtrs {
@@ -655,6 +721,7 @@ func (s *LDBStore) CleanGCIndex() error {
}
}
+ // if you made it this far your harddisk has survived. Congratulations
return s.db.Write(&batch)
}
diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go
index 07557980c..9c10b3629 100644
--- a/swarm/storage/ldbstore_test.go
+++ b/swarm/storage/ldbstore_test.go
@@ -761,6 +761,38 @@ func TestCleanIndex(t *testing.T) {
t.Fatalf("expected sum of bin indices to be 3, was %d", binTotal)
}
}
+
+ // check that the iterator quits properly
+ chunks, err = mputRandomChunks(ldb, 4100, 4096)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ po = ldb.po(chunks[4099].Address()[:])
+ dataKey = make([]byte, 10)
+ dataKey[0] = keyData
+ dataKey[1] = byte(po)
+ binary.BigEndian.PutUint64(dataKey[2:], 4099+3)
+ if _, err := ldb.db.Get(dataKey); err != nil {
+ t.Fatal(err)
+ }
+ if err := ldb.db.Delete(dataKey); err != nil {
+ t.Fatal(err)
+ }
+
+ if err := ldb.CleanGCIndex(); err != nil {
+ t.Fatal(err)
+ }
+
+ // entrycount should now be one less of added chunks
+ c, err = ldb.db.Get(keyEntryCnt)
+ if err != nil {
+ t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
+ }
+ entryCount = binary.BigEndian.Uint64(c)
+ if entryCount != 4099+2 {
+ t.Fatalf("expected entrycnt to be 2, was %d", c)
+ }
}
func waitGc(ctx context.Context, ldb *LDBStore) {