aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--p2p/protocols/accounting.go148
-rw-r--r--swarm/network/stream/stream.go110
-rw-r--r--swarm/network/stream/streamer_test.go31
3 files changed, 185 insertions, 104 deletions
diff --git a/p2p/protocols/accounting.go b/p2p/protocols/accounting.go
index 770406a27..bdc490e59 100644
--- a/p2p/protocols/accounting.go
+++ b/p2p/protocols/accounting.go
@@ -22,31 +22,33 @@ import (
"github.com/ethereum/go-ethereum/metrics"
)
-//define some metrics
+// define some metrics
var (
- //All metrics are cumulative
+ // All metrics are cumulative
- //total amount of units credited
+ // total amount of units credited
mBalanceCredit metrics.Counter
- //total amount of units debited
+ // total amount of units debited
mBalanceDebit metrics.Counter
- //total amount of bytes credited
+ // total amount of bytes credited
mBytesCredit metrics.Counter
- //total amount of bytes debited
+ // total amount of bytes debited
mBytesDebit metrics.Counter
- //total amount of credited messages
+ // total amount of credited messages
mMsgCredit metrics.Counter
- //total amount of debited messages
+ // total amount of debited messages
mMsgDebit metrics.Counter
- //how many times local node had to drop remote peers
+ // how many times local node had to drop remote peers
mPeerDrops metrics.Counter
- //how many times local node overdrafted and dropped
+ // how many times local node overdrafted and dropped
mSelfDrops metrics.Counter
+
+ MetricsRegistry metrics.Registry
)
-//Prices defines how prices are being passed on to the accounting instance
+// Prices defines how prices are being passed on to the accounting instance
type Prices interface {
- //Return the Price for a message
+ // Return the Price for a message
Price(interface{}) *Price
}
@@ -57,20 +59,20 @@ const (
Receiver = Payer(false)
)
-//Price represents the costs of a message
+// Price represents the costs of a message
type Price struct {
- Value uint64 //
- PerByte bool //True if the price is per byte or for unit
+ Value uint64
+ PerByte bool // True if the price is per byte or for unit
Payer Payer
}
-//For gives back the price for a message
-//A protocol provides the message price in absolute value
-//This method then returns the correct signed amount,
-//depending on who pays, which is identified by the `payer` argument:
-//`Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument.
-//Thus: If Sending and sender pays, amount positive, otherwise negative
-//If Receiving, and receiver pays, amount positive, otherwise negative
+// For gives back the price for a message
+// A protocol provides the message price in absolute value
+// This method then returns the correct signed amount,
+// depending on who pays, which is identified by the `payer` argument:
+// `Send` will pass a `Sender` payer, `Receive` will pass the `Receiver` argument.
+// Thus: If Sending and sender pays, amount positive, otherwise negative
+// If Receiving, and receiver pays, amount positive, otherwise negative
func (p *Price) For(payer Payer, size uint32) int64 {
price := p.Value
if p.PerByte {
@@ -82,22 +84,22 @@ func (p *Price) For(payer Payer, size uint32) int64 {
return int64(price)
}
-//Balance is the actual accounting instance
-//Balance defines the operations needed for accounting
-//Implementations internally maintain the balance for every peer
+// Balance is the actual accounting instance
+// Balance defines the operations needed for accounting
+// Implementations internally maintain the balance for every peer
type Balance interface {
- //Adds amount to the local balance with remote node `peer`;
- //positive amount = credit local node
- //negative amount = debit local node
+ // Adds amount to the local balance with remote node `peer`;
+ // positive amount = credit local node
+ // negative amount = debit local node
Add(amount int64, peer *Peer) error
}
-//Accounting implements the Hook interface
-//It interfaces to the balances through the Balance interface,
-//while interfacing with protocols and its prices through the Prices interface
+// Accounting implements the Hook interface
+// It interfaces to the balances through the Balance interface,
+// while interfacing with protocols and its prices through the Prices interface
type Accounting struct {
- Balance //interface to accounting logic
- Prices //interface to prices logic
+ Balance // interface to accounting logic
+ Prices // interface to prices logic
}
func NewAccounting(balance Balance, po Prices) *Accounting {
@@ -108,79 +110,77 @@ func NewAccounting(balance Balance, po Prices) *Accounting {
return ah
}
-//SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
-//this registry should be independent of any other metrics as it persists at different endpoints.
-//It also instantiates the given metrics and starts the persisting go-routine which
-//at the passed interval writes the metrics to a LevelDB
+// SetupAccountingMetrics creates a separate registry for p2p accounting metrics;
+// this registry should be independent of any other metrics as it persists at different endpoints.
+// It also instantiates the given metrics and starts the persisting go-routine which
+// at the passed interval writes the metrics to a LevelDB
func SetupAccountingMetrics(reportInterval time.Duration, path string) *AccountingMetrics {
- //create an empty registry
- registry := metrics.NewRegistry()
- //instantiate the metrics
- mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", registry)
- mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", registry)
- mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", registry)
- mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", registry)
- mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", registry)
- mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", registry)
- mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", registry)
- mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", registry)
- //create the DB and start persisting
- return NewAccountingMetrics(registry, reportInterval, path)
+ // create an empty registry
+ MetricsRegistry = metrics.NewRegistry()
+ // instantiate the metrics
+ mBalanceCredit = metrics.NewRegisteredCounterForced("account.balance.credit", MetricsRegistry)
+ mBalanceDebit = metrics.NewRegisteredCounterForced("account.balance.debit", MetricsRegistry)
+ mBytesCredit = metrics.NewRegisteredCounterForced("account.bytes.credit", MetricsRegistry)
+ mBytesDebit = metrics.NewRegisteredCounterForced("account.bytes.debit", MetricsRegistry)
+ mMsgCredit = metrics.NewRegisteredCounterForced("account.msg.credit", MetricsRegistry)
+ mMsgDebit = metrics.NewRegisteredCounterForced("account.msg.debit", MetricsRegistry)
+ mPeerDrops = metrics.NewRegisteredCounterForced("account.peerdrops", MetricsRegistry)
+ mSelfDrops = metrics.NewRegisteredCounterForced("account.selfdrops", MetricsRegistry)
+ // create the DB and start persisting
+ return NewAccountingMetrics(MetricsRegistry, reportInterval, path)
}
-//Implement Hook.Send
// Send takes a peer, a size and a msg and
-// - calculates the cost for the local node sending a msg of size to peer using the Prices interface
-// - credits/debits local node using balance interface
+// - calculates the cost for the local node sending a msg of size to peer using the Prices interface
+// - credits/debits local node using balance interface
func (ah *Accounting) Send(peer *Peer, size uint32, msg interface{}) error {
- //get the price for a message (through the protocol spec)
+ // get the price for a message (through the protocol spec)
price := ah.Price(msg)
- //this message doesn't need accounting
+ // this message doesn't need accounting
if price == nil {
return nil
}
- //evaluate the price for sending messages
+ // evaluate the price for sending messages
costToLocalNode := price.For(Sender, size)
- //do the accounting
+ // do the accounting
err := ah.Add(costToLocalNode, peer)
- //record metrics: just increase counters for user-facing metrics
+ // record metrics: just increase counters for user-facing metrics
ah.doMetrics(costToLocalNode, size, err)
return err
}
-//Implement Hook.Receive
// Receive takes a peer, a size and a msg and
-// - calculates the cost for the local node receiving a msg of size from peer using the Prices interface
-// - credits/debits local node using balance interface
+// - calculates the cost for the local node receiving a msg of size from peer using the Prices interface
+// - credits/debits local node using balance interface
func (ah *Accounting) Receive(peer *Peer, size uint32, msg interface{}) error {
- //get the price for a message (through the protocol spec)
+ // get the price for a message (through the protocol spec)
price := ah.Price(msg)
- //this message doesn't need accounting
+ // this message doesn't need accounting
if price == nil {
return nil
}
- //evaluate the price for receiving messages
+ // evaluate the price for receiving messages
costToLocalNode := price.For(Receiver, size)
- //do the accounting
+ // do the accounting
err := ah.Add(costToLocalNode, peer)
- //record metrics: just increase counters for user-facing metrics
+ // record metrics: just increase counters for user-facing metrics
ah.doMetrics(costToLocalNode, size, err)
return err
}
-//record some metrics
-//this is not an error handling. `err` is returned by both `Send` and `Receive`
-//`err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped.
-//if the limit has been violated and `err` is thus not nil:
-// * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped
-// * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft"
+// record some metrics
+// this is not an error handling. `err` is returned by both `Send` and `Receive`
+// `err` will only be non-nil if a limit has been violated (overdraft), in which case the peer has been dropped.
+// if the limit has been violated and `err` is thus not nil:
+// * if the price is positive, local node has been credited; thus `err` implicitly signals the REMOTE has been dropped
+// * if the price is negative, local node has been debited, thus `err` implicitly signals LOCAL node "overdraft"
func (ah *Accounting) doMetrics(price int64, size uint32, err error) {
if price > 0 {
mBalanceCredit.Inc(price)
mBytesCredit.Inc(int64(size))
mMsgCredit.Inc(1)
if err != nil {
- //increase the number of times a remote node has been dropped due to "overdraft"
+ // increase the number of times a remote node has been dropped due to "overdraft"
mPeerDrops.Inc(1)
}
} else {
@@ -188,7 +188,7 @@ func (ah *Accounting) doMetrics(price int64, size uint32, err error) {
mBytesDebit.Inc(int64(size))
mMsgDebit.Inc(1)
if err != nil {
- //increase the number of times the local node has done an "overdraft" in respect to other nodes
+ // increase the number of times the local node has done an "overdraft" in respect to other nodes
mSelfDrops.Inc(1)
}
}
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 090bef8d1..2e2c3c418 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -48,28 +48,28 @@ const (
HashSize = 32
)
-//Enumerate options for syncing and retrieval
+// Enumerate options for syncing and retrieval
type SyncingOption int
type RetrievalOption int
-//Syncing options
+// Syncing options
const (
- //Syncing disabled
+ // Syncing disabled
SyncingDisabled SyncingOption = iota
- //Register the client and the server but not subscribe
+ // Register the client and the server but not subscribe
SyncingRegisterOnly
- //Both client and server funcs are registered, subscribe sent automatically
+ // Both client and server funcs are registered, subscribe sent automatically
SyncingAutoSubscribe
)
const (
- //Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
+ // Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
RetrievalDisabled RetrievalOption = iota
- //Only the client side of the retrieve request is registered.
- //(light nodes do not serve retrieve requests)
- //once the client is registered, subscription to retrieve request stream is always sent
+ // Only the client side of the retrieve request is registered.
+ // (light nodes do not serve retrieve requests)
+ // once the client is registered, subscription to retrieve request stream is always sent
RetrievalClientOnly
- //Both client and server funcs are registered, subscribe sent automatically
+ // Both client and server funcs are registered, subscribe sent automatically
RetrievalEnabled
)
@@ -86,18 +86,18 @@ type Registry struct {
peers map[enode.ID]*Peer
delivery *Delivery
intervalsStore state.Store
- autoRetrieval bool //automatically subscribe to retrieve request stream
+ autoRetrieval bool // automatically subscribe to retrieve request stream
maxPeerServers int
- spec *protocols.Spec //this protocol's spec
- balance protocols.Balance //implements protocols.Balance, for accounting
- prices protocols.Prices //implements protocols.Prices, provides prices to accounting
+ balance protocols.Balance // implements protocols.Balance, for accounting
+ prices protocols.Prices // implements protocols.Prices, provides prices to accounting
+ spec *protocols.Spec // this protocol's spec
}
// RegistryOptions holds optional values for NewRegistry constructor.
type RegistryOptions struct {
SkipCheck bool
- Syncing SyncingOption //Defines syncing behavior
- Retrieval RetrievalOption //Defines retrieval behavior
+ Syncing SyncingOption // Defines syncing behavior
+ Retrieval RetrievalOption // Defines retrieval behavior
SyncUpdateDelay time.Duration
MaxPeerServers int // The limit of servers for each peer in registry
}
@@ -110,7 +110,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
if options.SyncUpdateDelay <= 0 {
options.SyncUpdateDelay = 15 * time.Second
}
- //check if retriaval has been disabled
+ // check if retrieval has been disabled
retrieval := options.Retrieval != RetrievalDisabled
streamer := &Registry{
@@ -130,7 +130,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
- //if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
+ // if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
if options.Retrieval == RetrievalEnabled {
streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) {
if !live {
@@ -140,20 +140,20 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
})
}
- //if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
+ // if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
if options.Retrieval != RetrievalDisabled {
streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
})
}
- //If syncing is not disabled, the syncing functions are registered (both client and server)
+ // If syncing is not disabled, the syncing functions are registered (both client and server)
if options.Syncing != SyncingDisabled {
RegisterSwarmSyncerServer(streamer, syncChunkStore)
RegisterSwarmSyncerClient(streamer, syncChunkStore)
}
- //if syncing is set to automatically subscribe to the syncing stream, start the subscription process
+ // if syncing is set to automatically subscribe to the syncing stream, start the subscription process
if options.Syncing == SyncingAutoSubscribe {
// latestIntC function ensures that
// - receiving from the in chan is not blocked by processing inside the for loop
@@ -235,13 +235,17 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
return streamer
}
-//we need to construct a spec instance per node instance
+// This is an accounted protocol, therefore we need to provide a pricing Hook to the spec
+// For simulations to be able to run multiple nodes and not override the hook's balance,
+// we need to construct a spec instance per node instance
func (r *Registry) setupSpec() {
- //first create the "bare" spec
+ // first create the "bare" spec
r.createSpec()
- //if balance is nil, this node has been started without swap support (swapEnabled flag is false)
+ // now create the pricing object
+ r.createPriceOracle()
+ // if balance is nil, this node has been started without swap support (swapEnabled flag is false)
if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() {
- //swap is enabled, so setup the hook
+ // swap is enabled, so setup the hook
r.spec.Hook = protocols.NewAccounting(r.balance, r.prices)
}
}
@@ -533,11 +537,11 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
return p.handleWantedHashesMsg(ctx, msg)
case *ChunkDeliveryMsgRetrieval:
- //handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
+ // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
case *ChunkDeliveryMsgSyncing:
- //handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
+ // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg
return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg)))
case *RetrieveRequestMsg:
@@ -726,9 +730,9 @@ func (c *clientParams) clientCreated() {
close(c.clientCreatedC)
}
-//GetSpec returns the streamer spec to callers
-//This used to be a global variable but for simulations with
-//multiple nodes its fields (notably the Hook) would be overwritten
+// GetSpec returns the streamer spec to callers
+// This used to be a global variable but for simulations with
+// multiple nodes its fields (notably the Hook) would be overwritten
func (r *Registry) GetSpec() *protocols.Spec {
return r.spec
}
@@ -756,6 +760,52 @@ func (r *Registry) createSpec() {
r.spec = spec
}
+// An accountable message needs some meta information attached to it
+// in order to evaluate the correct price
+type StreamerPrices struct {
+ priceMatrix map[reflect.Type]*protocols.Price
+ registry *Registry
+}
+
+// Price implements the accounting interface and returns the price for a specific message
+func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price {
+ t := reflect.TypeOf(msg).Elem()
+ return sp.priceMatrix[t]
+}
+
+// Instead of hardcoding the price, get it
+// through a function - it could be quite complex in the future
+func (sp *StreamerPrices) getRetrieveRequestMsgPrice() uint64 {
+ return uint64(1)
+}
+
+// Instead of hardcoding the price, get it
+// through a function - it could be quite complex in the future
+func (sp *StreamerPrices) getChunkDeliveryMsgRetrievalPrice() uint64 {
+ return uint64(1)
+}
+
+// createPriceOracle sets up a matrix which can be queried to get
+// the price for a message via the Price method
+func (r *Registry) createPriceOracle() {
+ sp := &StreamerPrices{
+ registry: r,
+ }
+ sp.priceMatrix = map[reflect.Type]*protocols.Price{
+ reflect.TypeOf(ChunkDeliveryMsgRetrieval{}): {
+ Value: sp.getChunkDeliveryMsgRetrievalPrice(), // arbitrary price for now
+ PerByte: true,
+ Payer: protocols.Receiver,
+ },
+ reflect.TypeOf(RetrieveRequestMsg{}): {
+ Value: sp.getRetrieveRequestMsgPrice(), // arbitrary price for now
+ PerByte: false,
+ Payer: protocols.Sender,
+ },
+ }
+ r.prices = sp
+}
+
func (r *Registry) Protocols() []p2p.Protocol {
return []p2p.Protocol{
{
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index 77fe55d34..e1b1c8286 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -921,3 +921,34 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
}
}
}
+
+//TestHasPriceImplementation is to check that the Registry has a
+//`Price` interface implementation
+func TestHasPriceImplementation(t *testing.T) {
+ _, r, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingDisabled,
+ })
+ defer teardown()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if r.prices == nil {
+ t.Fatal("No prices implementation available for the stream protocol")
+ }
+
+ pricesInstance, ok := r.prices.(*StreamerPrices)
+ if !ok {
+ t.Fatal("`Registry` does not have the expected Prices instance")
+ }
+ price := pricesInstance.Price(&ChunkDeliveryMsgRetrieval{})
+ if price == nil || price.Value == 0 || price.Value != pricesInstance.getChunkDeliveryMsgRetrievalPrice() {
+ t.Fatal("No prices set for chunk delivery msg")
+ }
+
+ price = pricesInstance.Price(&RetrieveRequestMsg{})
+ if price == nil || price.Value == 0 || price.Value != pricesInstance.getRetrieveRequestMsgPrice() {
+ t.Fatal("No prices set for chunk delivery msg")
+ }
+}