diff options
Diffstat (limited to 'swarm/network/stream/stream.go')
-rw-r--r-- | swarm/network/stream/stream.go | 76 |
1 files changed, 51 insertions, 25 deletions
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 695ff0c50..32e107823 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math" + "reflect" "sync" "time" @@ -87,6 +88,9 @@ type Registry struct { intervalsStore state.Store autoRetrieval bool //automatically subscribe to retrieve request stream maxPeerServers int + spec *protocols.Spec //this protocol's spec + balance protocols.Balance //implements protocols.Balance, for accounting + prices protocols.Prices //implements protocols.Prices, provides prices to accounting } // RegistryOptions holds optional values for NewRegistry constructor. @@ -99,7 +103,7 @@ type RegistryOptions struct { } // NewRegistry is Streamer constructor -func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry { +func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry { if options == nil { options = &RegistryOptions{} } @@ -119,7 +123,10 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy intervalsStore: intervalsStore, autoRetrieval: retrieval, maxPeerServers: options.MaxPeerServers, + balance: balance, } + streamer.setupSpec() + streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer @@ -228,6 +235,17 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy return streamer } +//we need to construct a spec instance per node instance +func (r *Registry) setupSpec() { + //first create the "bare" spec + r.createSpec() + //if balance is nil, this node has been started without swap support (swapEnabled flag is false) + if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() { + //swap is enabled, so setup the hook + r.spec.Hook = protocols.NewAccounting(r.balance, r.prices) + } +} + // RegisterClient registers an incoming streamer constructor func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error)) { r.clientMu.Lock() @@ -492,7 +510,7 @@ func (r *Registry) updateSyncing() { } func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error { - peer := protocols.NewPeer(p, rw, Spec) + peer := protocols.NewPeer(p, rw, r.spec) bp := network.NewBzzPeer(peer) np := network.NewPeer(bp, r.delivery.kad) r.delivery.kad.On(np) @@ -716,35 +734,43 @@ func (c *clientParams) clientCreated() { close(c.clientCreatedC) } -// Spec is the spec of the streamer protocol -var Spec = &protocols.Spec{ - Name: "stream", - Version: 8, - MaxMsgSize: 10 * 1024 * 1024, - Messages: []interface{}{ - UnsubscribeMsg{}, - OfferedHashesMsg{}, - WantedHashesMsg{}, - TakeoverProofMsg{}, - SubscribeMsg{}, - RetrieveRequestMsg{}, - ChunkDeliveryMsgRetrieval{}, - SubscribeErrorMsg{}, - RequestSubscriptionMsg{}, - QuitMsg{}, - ChunkDeliveryMsgSyncing{}, - }, +//GetSpec returns the streamer spec to callers +//This used to be a global variable but for simulations with +//multiple nodes its fields (notably the Hook) would be overwritten +func (r *Registry) GetSpec() *protocols.Spec { + return r.spec +} + +func (r *Registry) createSpec() { + // Spec is the spec of the streamer protocol + var spec = &protocols.Spec{ + Name: "stream", + Version: 8, + MaxMsgSize: 10 * 1024 * 1024, + Messages: []interface{}{ + UnsubscribeMsg{}, + OfferedHashesMsg{}, + WantedHashesMsg{}, + TakeoverProofMsg{}, + SubscribeMsg{}, + RetrieveRequestMsg{}, + ChunkDeliveryMsgRetrieval{}, + SubscribeErrorMsg{}, + RequestSubscriptionMsg{}, + QuitMsg{}, + ChunkDeliveryMsgSyncing{}, + }, + } + r.spec = spec } func (r *Registry) Protocols() []p2p.Protocol { return []p2p.Protocol{ { - Name: Spec.Name, - Version: Spec.Version, - Length: Spec.Length(), + Name: r.spec.Name, + Version: r.spec.Version, + Length: r.spec.Length(), Run: r.runProtocol, - // NodeInfo: , - // PeerInfo: , }, } } |