diff options
author | holisticode <holistic.computing@gmail.com> | 2018-11-16 06:41:19 +0800 |
---|---|---|
committer | Viktor TrĂ³n <viktor.tron@gmail.com> | 2018-11-16 06:41:19 +0800 |
commit | ffe2fc3bc4d77ad3f503d2bc1cdd62eac8d03c5b (patch) | |
tree | 5746d3737127f79b7fdb4cc58f72c6bb984b6b88 /swarm/network/stream | |
parent | 324027640bcaf137b8c9e96bc26f0833711497af (diff) | |
download | dexon-ffe2fc3bc4d77ad3f503d2bc1cdd62eac8d03c5b.tar.gz dexon-ffe2fc3bc4d77ad3f503d2bc1cdd62eac8d03c5b.tar.zst dexon-ffe2fc3bc4d77ad3f503d2bc1cdd62eac8d03c5b.zip |
Swarm accounting (#18050)
* swarm: completed 1st phase of swap accounting
* swarm: swap accounting for swarm with p2p accounting
* swarm/swap: addressed PR comments
* swarm/swap: ignore ErrNotFound on stateStore.Get()
* swarm/swap: GetPeerBalance test; add TODO for chequebook API check
* swarm/network/stream: fix NewRegistry calls with new arguments
* swarm/swap: address @justelad's PR comments
Diffstat (limited to 'swarm/network/stream')
-rw-r--r-- | swarm/network/stream/common_test.go | 2 | ||||
-rw-r--r-- | swarm/network/stream/delivery_test.go | 8 | ||||
-rw-r--r-- | swarm/network/stream/intervals_test.go | 2 | ||||
-rw-r--r-- | swarm/network/stream/snapshot_retrieval_test.go | 2 | ||||
-rw-r--r-- | swarm/network/stream/snapshot_sync_test.go | 4 | ||||
-rw-r--r-- | swarm/network/stream/stream.go | 76 | ||||
-rw-r--r-- | swarm/network/stream/syncer_test.go | 2 |
7 files changed, 61 insertions, 35 deletions
diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 721b873b7..c5f1fa176 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -114,7 +114,7 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest delivery := NewDelivery(to, netStore) netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), registryOptions) + streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), registryOptions, nil) teardown := func() { streamer.Close() removeDataDir() diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 6b6025115..a6173a389 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -290,7 +290,7 @@ func TestRequestFromPeers(t *testing.T) { Peer: protocolsPeer, }, to) to.On(peer) - r := NewRegistry(addr.ID(), delivery, nil, nil, nil) + r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil) // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished sp := &Peer{ @@ -331,7 +331,7 @@ func TestRequestFromPeersWithLightNode(t *testing.T) { Peer: protocolsPeer, }, to) to.On(peer) - r := NewRegistry(addr.ID(), delivery, nil, nil, nil) + r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil) // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished sp := &Peer{ Peer: protocolsPeer, @@ -480,7 +480,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck SkipCheck: skipCheck, Syncing: SyncingDisabled, Retrieval: RetrievalEnabled, - }) + }, nil) bucket.Store(bucketKeyRegistry, r) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) @@ -655,7 +655,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip Syncing: SyncingDisabled, Retrieval: RetrievalDisabled, SyncUpdateDelay: 0, - }) + }, nil) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index b9525d4a4..defb6df50 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -84,7 +84,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { Retrieval: RetrievalDisabled, Syncing: SyncingRegisterOnly, SkipCheck: skipCheck, - }) + }, nil) bucket.Store(bucketKeyRegistry, r) r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) { diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index ad1519341..5ea0b1511 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -130,7 +130,7 @@ func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s no Retrieval: RetrievalEnabled, Syncing: SyncingAutoSubscribe, SyncUpdateDelay: 3 * time.Second, - }) + }, nil) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 96c37bddc..6b92c32ae 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -166,7 +166,7 @@ func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Servic Retrieval: RetrievalDisabled, Syncing: SyncingAutoSubscribe, SyncUpdateDelay: 3 * time.Second, - }) + }, nil) bucket.Store(bucketKeyRegistry, r) @@ -360,7 +360,7 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ Retrieval: RetrievalDisabled, Syncing: SyncingRegisterOnly, - }) + }, nil) bucket.Store(bucketKeyRegistry, r) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 695ff0c50..32e107823 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math" + "reflect" "sync" "time" @@ -87,6 +88,9 @@ type Registry struct { intervalsStore state.Store autoRetrieval bool //automatically subscribe to retrieve request stream maxPeerServers int + spec *protocols.Spec //this protocol's spec + balance protocols.Balance //implements protocols.Balance, for accounting + prices protocols.Prices //implements protocols.Prices, provides prices to accounting } // RegistryOptions holds optional values for NewRegistry constructor. @@ -99,7 +103,7 @@ type RegistryOptions struct { } // NewRegistry is Streamer constructor -func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry { +func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry { if options == nil { options = &RegistryOptions{} } @@ -119,7 +123,10 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy intervalsStore: intervalsStore, autoRetrieval: retrieval, maxPeerServers: options.MaxPeerServers, + balance: balance, } + streamer.setupSpec() + streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer @@ -228,6 +235,17 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy return streamer } +//we need to construct a spec instance per node instance +func (r *Registry) setupSpec() { + //first create the "bare" spec + r.createSpec() + //if balance is nil, this node has been started without swap support (swapEnabled flag is false) + if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() { + //swap is enabled, so setup the hook + r.spec.Hook = protocols.NewAccounting(r.balance, r.prices) + } +} + // RegisterClient registers an incoming streamer constructor func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error)) { r.clientMu.Lock() @@ -492,7 +510,7 @@ func (r *Registry) updateSyncing() { } func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error { - peer := protocols.NewPeer(p, rw, Spec) + peer := protocols.NewPeer(p, rw, r.spec) bp := network.NewBzzPeer(peer) np := network.NewPeer(bp, r.delivery.kad) r.delivery.kad.On(np) @@ -716,35 +734,43 @@ func (c *clientParams) clientCreated() { close(c.clientCreatedC) } -// Spec is the spec of the streamer protocol -var Spec = &protocols.Spec{ - Name: "stream", - Version: 8, - MaxMsgSize: 10 * 1024 * 1024, - Messages: []interface{}{ - UnsubscribeMsg{}, - OfferedHashesMsg{}, - WantedHashesMsg{}, - TakeoverProofMsg{}, - SubscribeMsg{}, - RetrieveRequestMsg{}, - ChunkDeliveryMsgRetrieval{}, - SubscribeErrorMsg{}, - RequestSubscriptionMsg{}, - QuitMsg{}, - ChunkDeliveryMsgSyncing{}, - }, +//GetSpec returns the streamer spec to callers +//This used to be a global variable but for simulations with +//multiple nodes its fields (notably the Hook) would be overwritten +func (r *Registry) GetSpec() *protocols.Spec { + return r.spec +} + +func (r *Registry) createSpec() { + // Spec is the spec of the streamer protocol + var spec = &protocols.Spec{ + Name: "stream", + Version: 8, + MaxMsgSize: 10 * 1024 * 1024, + Messages: []interface{}{ + UnsubscribeMsg{}, + OfferedHashesMsg{}, + WantedHashesMsg{}, + TakeoverProofMsg{}, + SubscribeMsg{}, + RetrieveRequestMsg{}, + ChunkDeliveryMsgRetrieval{}, + SubscribeErrorMsg{}, + RequestSubscriptionMsg{}, + QuitMsg{}, + ChunkDeliveryMsgSyncing{}, + }, + } + r.spec = spec } func (r *Registry) Protocols() []p2p.Protocol { return []p2p.Protocol{ { - Name: Spec.Name, - Version: Spec.Version, - Length: Spec.Length(), + Name: r.spec.Name, + Version: r.spec.Version, + Length: r.spec.Length(), Run: r.runProtocol, - // NodeInfo: , - // PeerInfo: , }, } } diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index f4e055451..fe20bab26 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -118,7 +118,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck Retrieval: RetrievalDisabled, Syncing: SyncingAutoSubscribe, SkipCheck: skipCheck, - }) + }, nil) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) bucket.Store(bucketKeyFileStore, fileStore) |