diff options
Diffstat (limited to 'swarm/network/stream/syncer.go')
-rw-r--r-- | swarm/network/stream/syncer.go | 82 |
1 files changed, 23 insertions, 59 deletions
diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index c573da5d2..79b04a307 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -34,27 +34,27 @@ const ( // * live request delivery with or without checkback // * (live/non-live historical) chunk syncing per proximity bin type SwarmSyncerServer struct { - po uint8 - store chunk.FetchStore - quit chan struct{} + po uint8 + netStore *storage.NetStore + quit chan struct{} } // NewSwarmSyncerServer is constructor for SwarmSyncerServer -func NewSwarmSyncerServer(po uint8, syncChunkStore chunk.FetchStore) (*SwarmSyncerServer, error) { +func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore) (*SwarmSyncerServer, error) { return &SwarmSyncerServer{ - po: po, - store: syncChunkStore, - quit: make(chan struct{}), + po: po, + netStore: netStore, + quit: make(chan struct{}), }, nil } -func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore chunk.FetchStore) { +func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) { streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) { po, err := ParseSyncBinKey(t) if err != nil { return nil, err } - return NewSwarmSyncerServer(po, syncChunkStore) + return NewSwarmSyncerServer(po, netStore) }) // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { // return NewOutgoingProvableSwarmSyncer(po, db) @@ -68,7 +68,7 @@ func (s *SwarmSyncerServer) Close() { // GetData retrieves the actual chunk from netstore func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - ch, err := s.store.Get(ctx, chunk.ModeGetSync, storage.Address(key)) + ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key)) if err != nil { return nil, err } @@ -77,7 +77,7 @@ func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, er // SessionIndex returns current storage bin (po) index. func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { - return s.store.LastPullSubscriptionBinID(s.po) + return s.netStore.LastPullSubscriptionBinID(s.po) } // SetNextBatch retrieves the next batch of hashes from the localstore. @@ -88,7 +88,7 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { // are added in batchTimeout period, the batch will be returned. This function // will block until new chunks are received from localstore pull subscription. func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { - descriptors, stop := s.store.SubscribePull(context.Background(), s.po, from, to) + descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to) defer stop() const batchTimeout = 2 * time.Second @@ -118,7 +118,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 // This is the most naive approach to label the chunk as synced // allowing it to be garbage collected. A proper way requires // validating that the chunk is successfully stored by the peer. - err := s.store.Set(context.Background(), chunk.ModeSetSync, d.Address) + err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address) if err != nil { return nil, 0, 0, nil, err } @@ -158,67 +158,31 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 // SwarmSyncerClient type SwarmSyncerClient struct { - store chunk.FetchStore - peer *Peer - stream Stream + netStore *storage.NetStore + peer *Peer + stream Stream } // NewSwarmSyncerClient is a contructor for provable data exchange syncer -func NewSwarmSyncerClient(p *Peer, store chunk.FetchStore, stream Stream) (*SwarmSyncerClient, error) { +func NewSwarmSyncerClient(p *Peer, netStore *storage.NetStore, stream Stream) (*SwarmSyncerClient, error) { return &SwarmSyncerClient{ - store: store, - peer: p, - stream: stream, + netStore: netStore, + peer: p, + stream: stream, }, nil } -// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer -// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient { -// retrieveC := make(storage.Chunk, chunksCap) -// RunChunkRequestor(p, retrieveC) -// storeC := make(storage.Chunk, chunksCap) -// RunChunkStorer(store, storeC) -// s := &SwarmSyncerClient{ -// po: po, -// priority: priority, -// sessionAt: sessionAt, -// start: index, -// end: index, -// nextC: make(chan struct{}, 1), -// intervals: intervals, -// sessionRoot: sessionRoot, -// sessionReader: chunker.Join(sessionRoot, retrieveC), -// retrieveC: retrieveC, -// storeC: storeC, -// } -// return s -// } - -// // StartSyncing is called on the Peer to start the syncing process -// // the idea is that it is called only after kademlia is close to healthy -// func StartSyncing(s *Streamer, peerId enode.ID, po uint8, nn bool) { -// lastPO := po -// if nn { -// lastPO = maxPO -// } -// -// for i := po; i <= lastPO; i++ { -// s.Subscribe(peerId, "SYNC", newSyncLabel("LIVE", po), 0, 0, High, true) -// s.Subscribe(peerId, "SYNC", newSyncLabel("HISTORY", po), 0, 0, Mid, false) -// } -// } - // RegisterSwarmSyncerClient registers the client constructor function for // to handle incoming sync streams -func RegisterSwarmSyncerClient(streamer *Registry, store chunk.FetchStore) { +func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore) { streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live)) + return NewSwarmSyncerClient(p, netStore, NewStream("SYNC", t, live)) }) } // NeedData func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) { - return s.store.FetchFunc(ctx, key) + return s.netStore.FetchFunc(ctx, key) } // BatchDone |