diff options
author | holisticode <holistic.computing@gmail.com> | 2019-01-08 07:59:00 +0800 |
---|---|---|
committer | Viktor TrĂ³n <viktor.tron@gmail.com> | 2019-01-08 07:59:00 +0800 |
commit | ae857e74bfda1f961dc5741441e5b36a2bb9aa93 (patch) | |
tree | e8d75eb51adccc215c63855b235e83ee45698e85 | |
parent | 56a3f6c03cc3c7ae38ab7354f8615c014bb2102a (diff) | |
download | dexon-ae857e74bfda1f961dc5741441e5b36a2bb9aa93.tar.gz dexon-ae857e74bfda1f961dc5741441e5b36a2bb9aa93.tar.zst dexon-ae857e74bfda1f961dc5741441e5b36a2bb9aa93.zip |
swarm, p2p/protocols: Stream accounting (#18337)
* swarm: completed 1st phase of swap accounting
* swarm, p2p/protocols: added stream pricing
* swarm/network/stream: gofmt simplify stream.go
* swarm: fixed review comments
* swarm: used snapshots for swap tests
* swarm: custom retrieve for swap (less cascaded requests at any one time)
* swarm: addressed PR comments
* swarm: log output formatting
* swarm: removed parallelism in swap tests
* swarm: swap tests simplification
* swarm: removed swap_test.go
* swarm/network/stream: added prefix space for comments
* swarm/network/stream: unit test for prices
* swarm/network/stream: don't hardcode price
* swarm/network/stream: fixed invalid price check
-rw-r--r-- | p2p/protocols/accounting.go | 148 | ||||
-rw-r--r-- | swarm/network/stream/stream.go | 110 | ||||
-rw-r--r-- | swarm/network/stream/streamer_test.go | 31 |
3 files changed, 185 insertions, 104 deletions
diff --git a/p2p/protocols/accounting.go b/p2p/protocols/accounting.go index 770406a27..bdc490e59 100644 --- a/p2p/protocols/accounting.go +++ b/p2p/protocols/accounting.go @@ -22,31 +22,33 @@ import ( "github.com/ethereum/go-ethereum/metrics" ) -//define some metrics +// define some metrics var ( - //All metrics are cumulative + // All metrics are cumulative - //total amount of units credited + // total amount of units credited mBalanceCredit metrics.Counter - //total amount of units debited + // total amount of units debited mBalanceDebit metrics.Counter - //total amount of bytes credited + // total amount of bytes credited mBytesCredit metrics.Counter - //total amount of bytes debited + // total amount of bytes debited mBytesDebit metrics.Counter - //total amount of credited messages + // total amount of credited messages mMsgCredit metrics.Counter - //total amount of debited messages + // total amount of debited messages mMsgDebit metrics.Counter - //how many times local node had to drop remote peers + // how many times local node had to drop remote peers mPeerDrops metrics.Counter - //how many times local node overdrafted and dropped + // how many times local node overdrafted and dropped mSelfDrops metrics.Counter + + MetricsRegistry metrics.Registry ) -//Prices defines how prices are being passed on to the accounting instance +// Prices defines how prices are being passed on to the accounting instance type Prices interface { - //Return the Price for a message + // Return the Price for a message Price(interface{}) *Price } @@ -57,20 +59,20 @@ const ( Receiver = Payer(false) ) -//Price represents the costs of a message +// Price represents the costs of a message type Price struct { - Value uint64 // - PerByte bool //True if the price is per byte or for unit + Value uint64 + PerByte bool // True if the price is per byte or for unit Payer Payer } -//For gives back the price for a message -//A protocol provides the message price in absolute value -//This method then returns the correct signed amount, -//depending on who pays, which is identified by the `payer` argument: -//`Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument. -//Thus: If Sending and sender pays, amount positive, otherwise negative -//If Receiving, and receiver pays, amount positive, otherwise negative +// For gives back the price for a message +// A protocol provides the message price in absolute value +// This method then returns the correct signed amount, +// depending on who pays, which is identified by the `payer` argument: +// `Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument. +// Thus: If Sending and sender pays, amount positive, otherwise negative +// If Receiving, and receiver pays, amount positive, otherwise negative func (p *Price) For(payer Payer, size uint32) int64 { price := p.Value if p.PerByte { @@ -82,22 +84,22 @@ func (p *Price) For(payer Payer, size uint32) int64 { return int64(price) } -//Balance is the actual accounting instance -//Balance defines the operations needed for accounting -//Implementations internally maintain the balance for every peer +// Balance is the actual accounting instance +// Balance defines the operations needed for accounting +// Implementations internally maintain the balance for every peer type Balance interface { - //Adds amount to the local balance with remote node `peer`; - //positive amount = credit local node - //negative amount = debit local node + // Adds amount to the local balance with remote node `peer`; + // positive amount = credit local node + // negative amount = debit local node Add(amount int64, peer *Peer) error } -//Accounting implements the Hook interface -//It interfaces to the balances through the Balance interface, -//while interfacing with protocols and its prices through the Prices interface +// Accounting implements the Hook interface +// It interfaces to the balances through the Balance interface, +// while interfacing with protocols and its prices through the Prices interface type Accounting struct { - Balance //interface to accounting logic - Prices //interface to prices logic + Balance // interface to accounting logic + Prices // interface to prices logic } func NewAccounting(balance Balance, po Prices) *Accounting { @@ -108,79 +110,77 @@ func NewAccounting(balance Balance, po Prices) *Accounting { return ah } -//SetupAccountingMetrics creates a separate registry for p2p accounting metrics; -//this registry should be independent of any other metrics as it persists at different endpoints. -//It also instantiates the given metrics and starts the persisting go-routine which -//at the passed interval writes the metrics to a LevelDB +// SetupAccountingMetrics creates a separate registry for p2p accounting metrics; +// this registry should be independent of any other metrics as it persists at different endpoints. +// It also instantiates the given metrics and starts the persisting go-routine which +// at the passed interval writes the metrics to a LevelDB func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics { - //create an empty registry - registry := metrics.NewRegistry() - //instantiate the metrics - mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", registry) - mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", registry) - mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", registry) - mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", registry) - mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", registry) - mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", registry) - mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", registry) - mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", registry) - //create the DB and start persisting - return NewAccountingMetrics(registry, reportInterval, path) + // create an empty registry + MetricsRegistry = metrics.NewRegistry() + // instantiate the metrics + mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", MetricsRegistry) + mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", MetricsRegistry) + mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", MetricsRegistry) + mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", MetricsRegistry) + mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", MetricsRegistry) + mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", MetricsRegistry) + mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", MetricsRegistry) + mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", MetricsRegistry) + // create the DB and start persisting + return NewAccountingMetrics(MetricsRegistry, reportInterval, path) } -//Implement Hook.Send // Send takes a peer, a size and a msg and -// - calculates the cost for the local node sending a msg of size to peer using the Prices interface -// - credits/debits local node using balance interface +// - calculates the cost for the local node sending a msg of size to peer using the Prices interface +// - credits/debits local node using balance interface func (ah *Accounting) Send(peer *Peer, size uint32, msg interface{}) error { - //get the price for a message (through the protocol spec) + // get the price for a message (through the protocol spec) price := ah.Price(msg) - //this message doesn't need accounting + // this message doesn't need accounting if price == nil { return nil } - //evaluate the price for sending messages + // evaluate the price for sending messages costToLocalNode := price.For(Sender, size) - //do the accounting + // do the accounting err := ah.Add(costToLocalNode, peer) - //record metrics: just increase counters for user-facing metrics + // record metrics: just increase counters for user-facing metrics ah.doMetrics(costToLocalNode, size, err) return err } -//Implement Hook.Receive // Receive takes a peer, a size and a msg and -// - calculates the cost for the local node receiving a msg of size from peer using the Prices interface -// - credits/debits local node using balance interface +// - calculates the cost for the local node receiving a msg of size from peer using the Prices interface +// - credits/debits local node using balance interface func (ah *Accounting) Receive(peer *Peer, size uint32, msg interface{}) error { - //get the price for a message (through the protocol spec) + // get the price for a message (through the protocol spec) price := ah.Price(msg) - //this message doesn't need accounting + // this message doesn't need accounting if price == nil { return nil } - //evaluate the price for receiving messages + // evaluate the price for receiving messages costToLocalNode := price.For(Receiver, size) - //do the accounting + // do the accounting err := ah.Add(costToLocalNode, peer) - //record metrics: just increase counters for user-facing metrics + // record metrics: just increase counters for user-facing metrics ah.doMetrics(costToLocalNode, size, err) return err } -//record some metrics -//this is not an error handling. `err` is returned by both `Send` and `Receive` -//`err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped. -//if the limit has been violated and `err` is thus not nil: -// * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped -// * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft" +// record some metrics +// this is not an error handling. `err` is returned by both `Send` and `Receive` +// `err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped. +// if the limit has been violated and `err` is thus not nil: +// * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped +// * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft" func (ah *Accounting) doMetrics(price int64, size uint32, err error) { if price > 0 { mBalanceCredit.Inc(price) mBytesCredit.Inc(int64(size)) mMsgCredit.Inc(1) if err != nil { - //increase the number of times a remote node has been dropped due to "overdraft" + // increase the number of times a remote node has been dropped due to "overdraft" mPeerDrops.Inc(1) } } else { @@ -188,7 +188,7 @@ func (ah *Accounting) doMetrics(price int64, size uint32, err error) { mBytesDebit.Inc(int64(size)) mMsgDebit.Inc(1) if err != nil { - //increase the number of times the local node has done an "overdraft" in respect to other nodes + // increase the number of times the local node has done an "overdraft" in respect to other nodes mSelfDrops.Inc(1) } } diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 090bef8d1..2e2c3c418 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -48,28 +48,28 @@ const ( HashSize = 32 ) -//Enumerate options for syncing and retrieval +// Enumerate options for syncing and retrieval type SyncingOption int type RetrievalOption int -//Syncing options +// Syncing options const ( - //Syncing disabled + // Syncing disabled SyncingDisabled SyncingOption = iota - //Register the client and the server but not subscribe + // Register the client and the server but not subscribe SyncingRegisterOnly - //Both client and server funcs are registered, subscribe sent automatically + // Both client and server funcs are registered, subscribe sent automatically SyncingAutoSubscribe ) const ( - //Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only) + // Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only) RetrievalDisabled RetrievalOption = iota - //Only the client side of the retrieve request is registered. - //(light nodes do not serve retrieve requests) - //once the client is registered, subscription to retrieve request stream is always sent + // Only the client side of the retrieve request is registered. + // (light nodes do not serve retrieve requests) + // once the client is registered, subscription to retrieve request stream is always sent RetrievalClientOnly - //Both client and server funcs are registered, subscribe sent automatically + // Both client and server funcs are registered, subscribe sent automatically RetrievalEnabled ) @@ -86,18 +86,18 @@ type Registry struct { peers map[enode.ID]*Peer delivery *Delivery intervalsStore state.Store - autoRetrieval bool //automatically subscribe to retrieve request stream + autoRetrieval bool // automatically subscribe to retrieve request stream maxPeerServers int - spec *protocols.Spec //this protocol's spec - balance protocols.Balance //implements protocols.Balance, for accounting - prices protocols.Prices //implements protocols.Prices, provides prices to accounting + balance protocols.Balance // implements protocols.Balance, for accounting + prices protocols.Prices // implements protocols.Prices, provides prices to accounting + spec *protocols.Spec // this protocol's spec } // RegistryOptions holds optional values for NewRegistry constructor. type RegistryOptions struct { SkipCheck bool - Syncing SyncingOption //Defines syncing behavior - Retrieval RetrievalOption //Defines retrieval behavior + Syncing SyncingOption // Defines syncing behavior + Retrieval RetrievalOption // Defines retrieval behavior SyncUpdateDelay time.Duration MaxPeerServers int // The limit of servers for each peer in registry } @@ -110,7 +110,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy if options.SyncUpdateDelay <= 0 { options.SyncUpdateDelay = 15 * time.Second } - //check if retriaval has been disabled + // check if retrieval has been disabled retrieval := options.Retrieval != RetrievalDisabled streamer := &Registry{ @@ -130,7 +130,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer - //if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only) + // if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only) if options.Retrieval == RetrievalEnabled { streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) { if !live { @@ -140,20 +140,20 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy }) } - //if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests) + // if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests) if options.Retrieval != RetrievalDisabled { streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) }) } - //If syncing is not disabled, the syncing functions are registered (both client and server) + // If syncing is not disabled, the syncing functions are registered (both client and server) if options.Syncing != SyncingDisabled { RegisterSwarmSyncerServer(streamer, syncChunkStore) RegisterSwarmSyncerClient(streamer, syncChunkStore) } - //if syncing is set to automatically subscribe to the syncing stream, start the subscription process + // if syncing is set to automatically subscribe to the syncing stream, start the subscription process if options.Syncing == SyncingAutoSubscribe { // latestIntC function ensures that // - receiving from the in chan is not blocked by processing inside the for loop @@ -235,13 +235,17 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy return streamer } -//we need to construct a spec instance per node instance +// This is an accounted protocol, therefore we need to provide a pricing Hook to the spec +// For simulations to be able to run multiple nodes and not override the hook's balance, +// we need to construct a spec instance per node instance func (r *Registry) setupSpec() { - //first create the "bare" spec + // first create the "bare" spec r.createSpec() - //if balance is nil, this node has been started without swap support (swapEnabled flag is false) + // now create the pricing object + r.createPriceOracle() + // if balance is nil, this node has been started without swap support (swapEnabled flag is false) if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() { - //swap is enabled, so setup the hook + // swap is enabled, so setup the hook r.spec.Hook = protocols.NewAccounting(r.balance, r.prices) } } @@ -533,11 +537,11 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { return p.handleWantedHashesMsg(ctx, msg) case *ChunkDeliveryMsgRetrieval: - //handling chunk delivery is the same for retrieval and syncing, so let's cast the msg + // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) case *ChunkDeliveryMsgSyncing: - //handling chunk delivery is the same for retrieval and syncing, so let's cast the msg + // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) case *RetrieveRequestMsg: @@ -726,9 +730,9 @@ func (c *clientParams) clientCreated() { close(c.clientCreatedC) } -//GetSpec returns the streamer spec to callers -//This used to be a global variable but for simulations with -//multiple nodes its fields (notably the Hook) would be overwritten +// GetSpec returns the streamer spec to callers +// This used to be a global variable but for simulations with +// multiple nodes its fields (notably the Hook) would be overwritten func (r *Registry) GetSpec() *protocols.Spec { return r.spec } @@ -756,6 +760,52 @@ func (r *Registry) createSpec() { r.spec = spec } +// An accountable message needs some meta information attached to it +// in order to evaluate the correct price +type StreamerPrices struct { + priceMatrix map[reflect.Type]*protocols.Price + registry *Registry +} + +// Price implements the accounting interface and returns the price for a specific message +func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price { + t := reflect.TypeOf(msg).Elem() + return sp.priceMatrix[t] +} + +// Instead of hardcoding the price, get it +// through a function - it could be quite complex in the future +func (sp *StreamerPrices) getRetrieveRequestMsgPrice() uint64 { + return uint64(1) +} + +// Instead of hardcoding the price, get it +// through a function - it could be quite complex in the future +func (sp *StreamerPrices) getChunkDeliveryMsgRetrievalPrice() uint64 { + return uint64(1) +} + +// createPriceOracle sets up a matrix which can be queried to get +// the price for a message via the Price method +func (r *Registry) createPriceOracle() { + sp := &StreamerPrices{ + registry: r, + } + sp.priceMatrix = map[reflect.Type]*protocols.Price{ + reflect.TypeOf(ChunkDeliveryMsgRetrieval{}): { + Value: sp.getChunkDeliveryMsgRetrievalPrice(), // arbitrary price for now + PerByte: true, + Payer: protocols.Receiver, + }, + reflect.TypeOf(RetrieveRequestMsg{}): { + Value: sp.getRetrieveRequestMsgPrice(), // arbitrary price for now + PerByte: false, + Payer: protocols.Sender, + }, + } + r.prices = sp +} + func (r *Registry) Protocols() []p2p.Protocol { return []p2p.Protocol{ { diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 77fe55d34..e1b1c8286 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -921,3 +921,34 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { } } } + +//TestHasPriceImplementation is to check that the Registry has a +//`Price` interface implementation +func TestHasPriceImplementation(t *testing.T) { + _, r, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + Retrieval: RetrievalDisabled, + Syncing: SyncingDisabled, + }) + defer teardown() + if err != nil { + t.Fatal(err) + } + + if r.prices == nil { + t.Fatal("No prices implementation available for the stream protocol") + } + + pricesInstance, ok := r.prices.(*StreamerPrices) + if !ok { + t.Fatal("`Registry` does not have the expected Prices instance") + } + price := pricesInstance.Price(&ChunkDeliveryMsgRetrieval{}) + if price == nil || price.Value == 0 || price.Value != pricesInstance.getChunkDeliveryMsgRetrievalPrice() { + t.Fatal("No prices set for chunk delivery msg") + } + + price = pricesInstance.Price(&RetrieveRequestMsg{}) + if price == nil || price.Value == 0 || price.Value != pricesInstance.getRetrieveRequestMsgPrice() { + t.Fatal("No prices set for chunk delivery msg") + } +} |