aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r--swarm/network/stream/stream.go28
1 files changed, 8 insertions, 20 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index deffdfc3f..1f1f34b7b 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -32,10 +32,8 @@ import (
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
"github.com/ethereum/go-ethereum/swarm/pot"
- "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
- opentracing "github.com/opentracing/opentracing-go"
)
const (
@@ -43,8 +41,8 @@ const (
Mid
High
Top
- PriorityQueue // number of queues
- PriorityQueueCap = 32 // queue capacity
+ PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top
+ PriorityQueueCap = 128 // queue capacity
HashSize = 32
)
@@ -73,7 +71,7 @@ type RegistryOptions struct {
}
// NewRegistry is Streamer constructor
-func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, intervalsStore state.Store, options *RegistryOptions) *Registry {
+func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
if options == nil {
options = &RegistryOptions{}
}
@@ -93,13 +91,13 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, i
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, _ bool) (Server, error) {
- return NewSwarmChunkServer(delivery.db), nil
+ return NewSwarmChunkServer(delivery.chunkStore), nil
})
streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, delivery.db, false, NewStream(swarmChunkServerStreamName, t, live))
+ return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
})
- RegisterSwarmSyncerServer(streamer, db)
- RegisterSwarmSyncerClient(streamer, db)
+ RegisterSwarmSyncerServer(streamer, syncChunkStore)
+ RegisterSwarmSyncerClient(streamer, syncChunkStore)
if options.DoSync {
// latestIntC function ensures that
@@ -325,16 +323,6 @@ func (r *Registry) Quit(peerId discover.NodeID, s Stream) error {
return peer.Send(context.TODO(), msg)
}
-func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error {
- var sp opentracing.Span
- ctx, sp = spancontext.StartSpan(
- ctx,
- "registry.retrieve")
- defer sp.Finish()
-
- return r.delivery.RequestFromPeers(ctx, chunk.Addr[:], r.skipCheck)
-}
-
func (r *Registry) NodeInfo() interface{} {
return nil
}
@@ -557,7 +545,7 @@ func (c client) NextInterval() (start, end uint64, err error) {
// Client interface for incoming peer Streamer
type Client interface {
- NeedData(context.Context, []byte) func()
+ NeedData(context.Context, []byte) func(context.Context) error
BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
Close()
}