aboutsummaryrefslogtreecommitdiffstats
path: root/swarm
diff options
context:
space:
mode:
authorAnton Evangelatov <anton.evangelatov@gmail.com>2018-08-20 20:10:30 +0800
committerPéter Szilágyi <peterke@gmail.com>2018-08-20 20:10:30 +0800
commita8aa89accb0bcd4f24cd430d8f100e0ff9e239d0 (patch)
tree3e07be2fa796af0999e774feaab8b7b8122d8f4a /swarm
parentc4078fc80527b2cf0c2fd3e8771e9db06a597152 (diff)
downloaddexon-a8aa89accb0bcd4f24cd430d8f100e0ff9e239d0.tar.gz
dexon-a8aa89accb0bcd4f24cd430d8f100e0ff9e239d0.tar.zst
dexon-a8aa89accb0bcd4f24cd430d8f100e0ff9e239d0.zip
swarm/storage: cleanup task - remove bigger chunks (#17424)
Diffstat (limited to 'swarm')
-rw-r--r--swarm/storage/ldbstore.go67
1 files changed, 45 insertions, 22 deletions
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go
index 7920ee767..b95aa13b0 100644
--- a/swarm/storage/ldbstore.go
+++ b/swarm/storage/ldbstore.go
@@ -36,6 +36,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
"github.com/syndtr/goleveldb/leveldb"
@@ -384,14 +385,13 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) {
}
func (s *LDBStore) Cleanup() {
- //Iterates over the database and checks that there are no faulty chunks
+ //Iterates over the database and checks that there are no chunks bigger than 4kb
+ var errorsFound, removed, total int
+
it := s.db.NewIterator()
- startPosition := []byte{keyIndex}
- it.Seek(startPosition)
- var key []byte
- var errorsFound, total int
- for it.Valid() {
- key = it.Key()
+ defer it.Release()
+ for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
+ key := it.Key()
if (key == nil) || (key[0] != keyIndex) {
break
}
@@ -399,27 +399,50 @@ func (s *LDBStore) Cleanup() {
var index dpaDBIndex
err := decodeIndex(it.Value(), &index)
if err != nil {
- it.Next()
+ log.Warn("Cannot decode")
+ errorsFound++
continue
}
- data, err := s.db.Get(getDataKey(index.Idx, s.po(Address(key[1:]))))
+ hash := key[1:]
+ po := s.po(hash)
+ datakey := getDataKey(index.Idx, po)
+ data, err := s.db.Get(datakey)
if err != nil {
- log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key[:], err))
- s.delete(index.Idx, getIndexKey(key[1:]), s.po(Address(key[1:])))
- errorsFound++
- } else {
- hasher := s.hashfunc()
- hasher.Write(data[32:])
- hash := hasher.Sum(nil)
- if !bytes.Equal(hash, key[1:]) {
- log.Warn(fmt.Sprintf("Found invalid chunk. Hash mismatch. hash=%x, key=%x", hash, key[:]))
- s.delete(index.Idx, getIndexKey(key[1:]), s.po(Address(key[1:])))
+ found := false
+
+ // highest possible proximity is 255
+ for po = 1; po <= 255; po++ {
+ datakey = getDataKey(index.Idx, po)
+ data, err = s.db.Get(datakey)
+ if err == nil {
+ found = true
+ break
+ }
+ }
+
+ if !found {
+ log.Warn(fmt.Sprintf("Chunk %x found but count not be accessed with any po", key[:]))
+ errorsFound++
+ continue
}
}
- it.Next()
+
+ c := &Chunk{}
+ ck := data[:32]
+ decodeData(data, c)
+
+ cs := int64(binary.LittleEndian.Uint64(c.SData[:8]))
+ log.Trace("chunk", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs)
+
+ if len(c.SData) > chunk.DefaultSize+8 {
+ log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs)
+ s.delete(index.Idx, getIndexKey(key[1:]), po)
+ removed++
+ errorsFound++
+ }
}
- it.Release()
- log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
+
+ log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
}
func (s *LDBStore) ReIndex() {