aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream
diff options
context:
space:
mode:
authorholisticode <holistic.computing@gmail.com>2018-11-16 06:41:19 +0800
committerViktor TrĂ³n <viktor.tron@gmail.com>2018-11-16 06:41:19 +0800
commitffe2fc3bc4d77ad3f503d2bc1cdd62eac8d03c5b (patch)
tree5746d3737127f79b7fdb4cc58f72c6bb984b6b88 /swarm/network/stream
parent324027640bcaf137b8c9e96bc26f0833711497af (diff)
downloaddexon-ffe2fc3bc4d77ad3f503d2bc1cdd62eac8d03c5b.tar.gz
dexon-ffe2fc3bc4d77ad3f503d2bc1cdd62eac8d03c5b.tar.zst
dexon-ffe2fc3bc4d77ad3f503d2bc1cdd62eac8d03c5b.zip
Swarm accounting (#18050)
* swarm: completed 1st phase of swap accounting * swarm: swap accounting for swarm with p2p accounting * swarm/swap: addressed PR comments * swarm/swap: ignore ErrNotFound on stateStore.Get() * swarm/swap: GetPeerBalance test; add TODO for chequebook API check * swarm/network/stream: fix NewRegistry calls with new arguments * swarm/swap: address @justelad's PR comments
Diffstat (limited to 'swarm/network/stream')
-rw-r--r--swarm/network/stream/common_test.go2
-rw-r--r--swarm/network/stream/delivery_test.go8
-rw-r--r--swarm/network/stream/intervals_test.go2
-rw-r--r--swarm/network/stream/snapshot_retrieval_test.go2
-rw-r--r--swarm/network/stream/snapshot_sync_test.go4
-rw-r--r--swarm/network/stream/stream.go76
-rw-r--r--swarm/network/stream/syncer_test.go2
7 files changed, 61 insertions, 35 deletions
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go
index 721b873b7..c5f1fa176 100644
--- a/swarm/network/stream/common_test.go
+++ b/swarm/network/stream/common_test.go
@@ -114,7 +114,7 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest
delivery := NewDelivery(to, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
- streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), registryOptions)
+ streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), registryOptions, nil)
teardown := func() {
streamer.Close()
removeDataDir()
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 6b6025115..a6173a389 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -290,7 +290,7 @@ func TestRequestFromPeers(t *testing.T) {
Peer: protocolsPeer,
}, to)
to.On(peer)
- r := NewRegistry(addr.ID(), delivery, nil, nil, nil)
+ r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)
// an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
sp := &Peer{
@@ -331,7 +331,7 @@ func TestRequestFromPeersWithLightNode(t *testing.T) {
Peer: protocolsPeer,
}, to)
to.On(peer)
- r := NewRegistry(addr.ID(), delivery, nil, nil, nil)
+ r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)
// an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
sp := &Peer{
Peer: protocolsPeer,
@@ -480,7 +480,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
Retrieval: RetrievalEnabled,
- })
+ }, nil)
bucket.Store(bucketKeyRegistry, r)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
@@ -655,7 +655,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
Syncing: SyncingDisabled,
Retrieval: RetrievalDisabled,
SyncUpdateDelay: 0,
- })
+ }, nil)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index b9525d4a4..defb6df50 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -84,7 +84,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
Retrieval: RetrievalDisabled,
Syncing: SyncingRegisterOnly,
SkipCheck: skipCheck,
- })
+ }, nil)
bucket.Store(bucketKeyRegistry, r)
r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) {
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index ad1519341..5ea0b1511 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -130,7 +130,7 @@ func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s no
Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
- })
+ }, nil)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 96c37bddc..6b92c32ae 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -166,7 +166,7 @@ func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Servic
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
- })
+ }, nil)
bucket.Store(bucketKeyRegistry, r)
@@ -360,7 +360,7 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingRegisterOnly,
- })
+ }, nil)
bucket.Store(bucketKeyRegistry, r)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 695ff0c50..32e107823 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math"
+ "reflect"
"sync"
"time"
@@ -87,6 +88,9 @@ type Registry struct {
intervalsStore state.Store
autoRetrieval bool //automatically subscribe to retrieve request stream
maxPeerServers int
+ spec *protocols.Spec //this protocol's spec
+ balance protocols.Balance //implements protocols.Balance, for accounting
+ prices protocols.Prices //implements protocols.Prices, provides prices to accounting
}
// RegistryOptions holds optional values for NewRegistry constructor.
@@ -99,7 +103,7 @@ type RegistryOptions struct {
}
// NewRegistry is Streamer constructor
-func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
+func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
if options == nil {
options = &RegistryOptions{}
}
@@ -119,7 +123,10 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
intervalsStore: intervalsStore,
autoRetrieval: retrieval,
maxPeerServers: options.MaxPeerServers,
+ balance: balance,
}
+ streamer.setupSpec()
+
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
@@ -228,6 +235,17 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
return streamer
}
+//we need to construct a spec instance per node instance
+func (r *Registry) setupSpec() {
+ //first create the "bare" spec
+ r.createSpec()
+ //if balance is nil, this node has been started without swap support (swapEnabled flag is false)
+ if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() {
+ //swap is enabled, so setup the hook
+ r.spec.Hook = protocols.NewAccounting(r.balance, r.prices)
+ }
+}
+
// RegisterClient registers an incoming streamer constructor
func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error)) {
r.clientMu.Lock()
@@ -492,7 +510,7 @@ func (r *Registry) updateSyncing() {
}
func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
- peer := protocols.NewPeer(p, rw, Spec)
+ peer := protocols.NewPeer(p, rw, r.spec)
bp := network.NewBzzPeer(peer)
np := network.NewPeer(bp, r.delivery.kad)
r.delivery.kad.On(np)
@@ -716,35 +734,43 @@ func (c *clientParams) clientCreated() {
close(c.clientCreatedC)
}
-// Spec is the spec of the streamer protocol
-var Spec = &protocols.Spec{
- Name: "stream",
- Version: 8,
- MaxMsgSize: 10 * 1024 * 1024,
- Messages: []interface{}{
- UnsubscribeMsg{},
- OfferedHashesMsg{},
- WantedHashesMsg{},
- TakeoverProofMsg{},
- SubscribeMsg{},
- RetrieveRequestMsg{},
- ChunkDeliveryMsgRetrieval{},
- SubscribeErrorMsg{},
- RequestSubscriptionMsg{},
- QuitMsg{},
- ChunkDeliveryMsgSyncing{},
- },
+//GetSpec returns the streamer spec to callers
+//This used to be a global variable but for simulations with
+//multiple nodes its fields (notably the Hook) would be overwritten
+func (r *Registry) GetSpec() *protocols.Spec {
+ return r.spec
+}
+
+func (r *Registry) createSpec() {
+ // Spec is the spec of the streamer protocol
+ var spec = &protocols.Spec{
+ Name: "stream",
+ Version: 8,
+ MaxMsgSize: 10 * 1024 * 1024,
+ Messages: []interface{}{
+ UnsubscribeMsg{},
+ OfferedHashesMsg{},
+ WantedHashesMsg{},
+ TakeoverProofMsg{},
+ SubscribeMsg{},
+ RetrieveRequestMsg{},
+ ChunkDeliveryMsgRetrieval{},
+ SubscribeErrorMsg{},
+ RequestSubscriptionMsg{},
+ QuitMsg{},
+ ChunkDeliveryMsgSyncing{},
+ },
+ }
+ r.spec = spec
}
func (r *Registry) Protocols() []p2p.Protocol {
return []p2p.Protocol{
{
- Name: Spec.Name,
- Version: Spec.Version,
- Length: Spec.Length(),
+ Name: r.spec.Name,
+ Version: r.spec.Version,
+ Length: r.spec.Length(),
Run: r.runProtocol,
- // NodeInfo: ,
- // PeerInfo: ,
},
}
}
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index f4e055451..fe20bab26 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -118,7 +118,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SkipCheck: skipCheck,
- })
+ }, nil)
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)