aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/syncer.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/syncer.go')
-rw-r--r--swarm/network/stream/syncer.go94
1 files changed, 45 insertions, 49 deletions
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go
index d7febe4a3..e9811a678 100644
--- a/swarm/network/stream/syncer.go
+++ b/swarm/network/stream/syncer.go
@@ -28,7 +28,6 @@ import (
)
const (
- // BatchSize = 2
BatchSize = 128
)
@@ -38,35 +37,37 @@ const (
// * (live/non-live historical) chunk syncing per proximity bin
type SwarmSyncerServer struct {
po uint8
- db *storage.DBAPI
+ store storage.SyncChunkStore
sessionAt uint64
start uint64
+ live bool
quit chan struct{}
}
// NewSwarmSyncerServer is contructor for SwarmSyncerServer
-func NewSwarmSyncerServer(live bool, po uint8, db *storage.DBAPI) (*SwarmSyncerServer, error) {
- sessionAt := db.CurrentBucketStorageIndex(po)
+func NewSwarmSyncerServer(live bool, po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error) {
+ sessionAt := syncChunkStore.BinIndex(po)
var start uint64
if live {
start = sessionAt
}
return &SwarmSyncerServer{
po: po,
- db: db,
+ store: syncChunkStore,
sessionAt: sessionAt,
start: start,
+ live: live,
quit: make(chan struct{}),
}, nil
}
-func RegisterSwarmSyncerServer(streamer *Registry, db *storage.DBAPI) {
+func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore) {
streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) {
po, err := ParseSyncBinKey(t)
if err != nil {
return nil, err
}
- return NewSwarmSyncerServer(live, po, db)
+ return NewSwarmSyncerServer(live, po, syncChunkStore)
})
// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) {
// return NewOutgoingProvableSwarmSyncer(po, db)
@@ -78,27 +79,35 @@ func (s *SwarmSyncerServer) Close() {
close(s.quit)
}
-// GetSection retrieves the actual chunk from localstore
+// GetData retrieves the actual chunk from netstore
func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
- chunk, err := s.db.Get(ctx, storage.Address(key))
- if err == storage.ErrFetching {
- <-chunk.ReqC
- } else if err != nil {
+ chunk, err := s.store.Get(ctx, storage.Address(key))
+ if err != nil {
return nil, err
}
- return chunk.SData, nil
+ return chunk.Data(), nil
}
// GetBatch retrieves the next batch of hashes from the dbstore
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
var batch []byte
i := 0
- if from == 0 {
- from = s.start
- }
- if to <= from || from >= s.sessionAt {
- to = math.MaxUint64
+ if s.live {
+ if from == 0 {
+ from = s.start
+ }
+ if to <= from || from >= s.sessionAt {
+ to = math.MaxUint64
+ }
+ } else {
+ if (to < from && to != 0) || from > s.sessionAt {
+ return nil, 0, 0, nil, nil
+ }
+ if to == 0 || to > s.sessionAt {
+ to = s.sessionAt
+ }
}
+
var ticker *time.Ticker
defer func() {
if ticker != nil {
@@ -119,8 +128,8 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
}
metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1)
- err := s.db.Iterator(from, to, s.po, func(addr storage.Address, idx uint64) bool {
- batch = append(batch, addr[:]...)
+ err := s.store.Iterator(from, to, s.po, func(key storage.Address, idx uint64) bool {
+ batch = append(batch, key[:]...)
i++
to = idx
return i < BatchSize
@@ -134,7 +143,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
wait = true
}
- log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.db.CurrentBucketStorageIndex(s.po))
+ log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.store.BinIndex(s.po))
return batch, from, to, nil, nil
}
@@ -146,28 +155,26 @@ type SwarmSyncerClient struct {
sessionReader storage.LazySectionReader
retrieveC chan *storage.Chunk
storeC chan *storage.Chunk
- db *storage.DBAPI
+ store storage.SyncChunkStore
// chunker storage.Chunker
- currentRoot storage.Address
- requestFunc func(chunk *storage.Chunk)
- end, start uint64
- peer *Peer
- ignoreExistingRequest bool
- stream Stream
+ currentRoot storage.Address
+ requestFunc func(chunk *storage.Chunk)
+ end, start uint64
+ peer *Peer
+ stream Stream
}
// NewSwarmSyncerClient is a contructor for provable data exchange syncer
-func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool, stream Stream) (*SwarmSyncerClient, error) {
+func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error) {
return &SwarmSyncerClient{
- db: db,
- peer: p,
- ignoreExistingRequest: ignoreExistingRequest,
- stream: stream,
+ store: store,
+ peer: p,
+ stream: stream,
}, nil
}
// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer
-// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Key, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
+// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient {
// retrieveC := make(storage.Chunk, chunksCap)
// RunChunkRequestor(p, retrieveC)
// storeC := make(storage.Chunk, chunksCap)
@@ -204,26 +211,15 @@ func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool
// RegisterSwarmSyncerClient registers the client constructor function for
// to handle incoming sync streams
-func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) {
+func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore) {
streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, db, true, NewStream("SYNC", t, live))
+ return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live))
})
}
// NeedData
-func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func()) {
- chunk, _ := s.db.GetOrCreateRequest(ctx, key)
- // TODO: we may want to request from this peer anyway even if the request exists
-
- // ignoreExistingRequest is temporary commented out until its functionality is verified.
- // For now, this optimization can be disabled.
- if chunk.ReqC == nil { //|| (s.ignoreExistingRequest && !created) {
- return nil
- }
- // create request and wait until the chunk data arrives and is stored
- return func() {
- chunk.WaitToStore()
- }
+func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
+ return s.store.FetchFunc(ctx, key)
}
// BatchDone