diff options
Diffstat (limited to 'swarm/storage/common_test.go')
-rw-r--r-- | swarm/storage/common_test.go | 192 |
1 files changed, 132 insertions, 60 deletions
diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index dc1a3ab35..33133edd7 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -23,16 +23,20 @@ import ( "flag" "fmt" "io" + "io/ioutil" + "os" "sync" "testing" "time" "github.com/ethereum/go-ethereum/log" + ch "github.com/ethereum/go-ethereum/swarm/chunk" colorable "github.com/mattn/go-colorable" ) var ( - loglevel = flag.Int("loglevel", 3, "verbosity of logs") + loglevel = flag.Int("loglevel", 3, "verbosity of logs") + getTimeout = 30 * time.Second ) func init() { @@ -56,47 +60,73 @@ func brokenLimitReader(data io.Reader, size int, errAt int) *brokenLimitedReader } } -func mputRandomChunks(store ChunkStore, processors int, n int, chunksize int64) (hs []Address) { - return mput(store, processors, n, GenerateRandomChunk) +func newLDBStore(t *testing.T) (*LDBStore, func()) { + dir, err := ioutil.TempDir("", "bzz-storage-test") + if err != nil { + t.Fatal(err) + } + log.Trace("memstore.tempdir", "dir", dir) + + ldbparams := NewLDBStoreParams(NewDefaultStoreParams(), dir) + db, err := NewLDBStore(ldbparams) + if err != nil { + t.Fatal(err) + } + + cleanup := func() { + db.Close() + err := os.RemoveAll(dir) + if err != nil { + t.Fatal(err) + } + } + + return db, cleanup } -func mput(store ChunkStore, processors int, n int, f func(i int64) *Chunk) (hs []Address) { - wg := sync.WaitGroup{} - wg.Add(processors) - c := make(chan *Chunk) - for i := 0; i < processors; i++ { +func mputRandomChunks(store ChunkStore, n int, chunksize int64) ([]Chunk, error) { + return mput(store, n, GenerateRandomChunk) +} + +func mputChunks(store ChunkStore, chunks ...Chunk) error { + i := 0 + f := func(n int64) Chunk { + chunk := chunks[i] + i++ + return chunk + } + _, err := mput(store, len(chunks), f) + return err +} + +func mput(store ChunkStore, n int, f func(i int64) Chunk) (hs []Chunk, err error) { + // put to localstore and wait for stored channel + // does not check delivery error state + errc := make(chan error) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + for i := int64(0); i < int64(n); i++ { + chunk := f(ch.DefaultSize) go func() { - defer wg.Done() - for chunk := range c { - wg.Add(1) - chunk := chunk - store.Put(context.TODO(), chunk) - go func() { - defer wg.Done() - <-chunk.dbStoredC - }() + select { + case errc <- store.Put(ctx, chunk): + case <-ctx.Done(): } }() + hs = append(hs, chunk) } - fa := f - if _, ok := store.(*MemStore); ok { - fa = func(i int64) *Chunk { - chunk := f(i) - chunk.markAsStored() - return chunk - } - } + + // wait for all chunks to be stored for i := 0; i < n; i++ { - chunk := fa(int64(i)) - hs = append(hs, chunk.Addr) - c <- chunk + err := <-errc + if err != nil { + return nil, err + } } - close(c) - wg.Wait() - return hs + return hs, nil } -func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error) error { +func mget(store ChunkStore, hs []Address, f func(h Address, chunk Chunk) error) error { wg := sync.WaitGroup{} wg.Add(len(hs)) errc := make(chan error) @@ -104,6 +134,7 @@ func mget(store ChunkStore, hs []Address, f func(h Address, chunk *Chunk) error) for _, k := range hs { go func(h Address) { defer wg.Done() + // TODO: write timeout with context chunk, err := store.Get(context.TODO(), h) if err != nil { errc <- err @@ -143,57 +174,54 @@ func (r *brokenLimitedReader) Read(buf []byte) (int, error) { return r.lr.Read(buf) } -func generateRandomData(l int) (r io.Reader, slice []byte) { - slice = make([]byte, l) - if _, err := rand.Read(slice); err != nil { - panic("rand error") +func testStoreRandom(m ChunkStore, n int, chunksize int64, t *testing.T) { + chunks, err := mputRandomChunks(m, n, chunksize) + if err != nil { + t.Fatalf("expected no error, got %v", err) } - r = io.LimitReader(bytes.NewReader(slice), int64(l)) - return -} - -func testStoreRandom(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) { - hs := mputRandomChunks(m, processors, n, chunksize) - err := mget(m, hs, nil) + err = mget(m, chunkAddresses(chunks), nil) if err != nil { t.Fatalf("testStore failed: %v", err) } } -func testStoreCorrect(m ChunkStore, processors int, n int, chunksize int64, t *testing.T) { - hs := mputRandomChunks(m, processors, n, chunksize) - f := func(h Address, chunk *Chunk) error { - if !bytes.Equal(h, chunk.Addr) { - return fmt.Errorf("key does not match retrieved chunk Key") +func testStoreCorrect(m ChunkStore, n int, chunksize int64, t *testing.T) { + chunks, err := mputRandomChunks(m, n, chunksize) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + f := func(h Address, chunk Chunk) error { + if !bytes.Equal(h, chunk.Address()) { + return fmt.Errorf("key does not match retrieved chunk Address") } hasher := MakeHashFunc(DefaultHash)() - hasher.ResetWithLength(chunk.SData[:8]) - hasher.Write(chunk.SData[8:]) + hasher.ResetWithLength(chunk.SpanBytes()) + hasher.Write(chunk.Payload()) exp := hasher.Sum(nil) if !bytes.Equal(h, exp) { return fmt.Errorf("key is not hash of chunk data") } return nil } - err := mget(m, hs, f) + err = mget(m, chunkAddresses(chunks), f) if err != nil { t.Fatalf("testStore failed: %v", err) } } -func benchmarkStorePut(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) { - chunks := make([]*Chunk, n) +func benchmarkStorePut(store ChunkStore, n int, chunksize int64, b *testing.B) { + chunks := make([]Chunk, n) i := 0 - f := func(dataSize int64) *Chunk { + f := func(dataSize int64) Chunk { chunk := GenerateRandomChunk(dataSize) chunks[i] = chunk i++ return chunk } - mput(store, processors, n, f) + mput(store, n, f) - f = func(dataSize int64) *Chunk { + f = func(dataSize int64) Chunk { chunk := chunks[i] i++ return chunk @@ -204,18 +232,62 @@ func benchmarkStorePut(store ChunkStore, processors int, n int, chunksize int64, for j := 0; j < b.N; j++ { i = 0 - mput(store, processors, n, f) + mput(store, n, f) } } -func benchmarkStoreGet(store ChunkStore, processors int, n int, chunksize int64, b *testing.B) { - hs := mputRandomChunks(store, processors, n, chunksize) +func benchmarkStoreGet(store ChunkStore, n int, chunksize int64, b *testing.B) { + chunks, err := mputRandomChunks(store, n, chunksize) + if err != nil { + b.Fatalf("expected no error, got %v", err) + } b.ReportAllocs() b.ResetTimer() + addrs := chunkAddresses(chunks) for i := 0; i < b.N; i++ { - err := mget(store, hs, nil) + err := mget(store, addrs, nil) if err != nil { b.Fatalf("mget failed: %v", err) } } } + +// MapChunkStore is a very simple ChunkStore implementation to store chunks in a map in memory. +type MapChunkStore struct { + chunks map[string]Chunk + mu sync.RWMutex +} + +func NewMapChunkStore() *MapChunkStore { + return &MapChunkStore{ + chunks: make(map[string]Chunk), + } +} + +func (m *MapChunkStore) Put(_ context.Context, ch Chunk) error { + m.mu.Lock() + defer m.mu.Unlock() + m.chunks[ch.Address().Hex()] = ch + return nil +} + +func (m *MapChunkStore) Get(_ context.Context, ref Address) (Chunk, error) { + m.mu.RLock() + defer m.mu.RUnlock() + chunk := m.chunks[ref.Hex()] + if chunk == nil { + return nil, ErrChunkNotFound + } + return chunk, nil +} + +func (m *MapChunkStore) Close() { +} + +func chunkAddresses(chunks []Chunk) []Address { + addrs := make([]Address, len(chunks)) + for i, ch := range chunks { + addrs[i] = ch.Address() + } + return addrs +} |