aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-07-02 19:13:46 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-08-21 15:30:57 +0800
commitc51e153b5c5327f971e7b410e49e7babfc3f0b8e (patch)
tree3a2f25a668a887fcb1026ae323480ac5fda3548a
parentd51d0022cee91d6588186455afbe6e54fae2cbf7 (diff)
downloadgo-tangerine-c51e153b5c5327f971e7b410e49e7babfc3f0b8e.tar.gz
go-tangerine-c51e153b5c5327f971e7b410e49e7babfc3f0b8e.tar.zst
go-tangerine-c51e153b5c5327f971e7b410e49e7babfc3f0b8e.zip
eth, metrics, p2p: prepare metrics and net packets to eth/62
-rw-r--r--eth/downloader/downloader.go3
-rw-r--r--eth/handler.go15
-rw-r--r--eth/metrics.go84
-rw-r--r--eth/peer.go50
-rw-r--r--eth/protocol.go25
-rw-r--r--metrics/metrics.go12
-rw-r--r--p2p/metrics.go8
7 files changed, 149 insertions, 48 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index e3e22a784..23d2e045e 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -39,6 +39,7 @@ import (
const (
eth60 = 60 // Constant to check for old protocol support
eth61 = 61 // Constant to check for new protocol support
+ eth62 = 62 // Constant to check for experimental protocol support
)
var (
@@ -329,7 +330,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
if err = d.fetchBlocks60(); err != nil {
return err
}
- case eth61:
+ case eth61, eth62:
// New eth/61, use forward, concurrent hash and block retrieval algorithm
number, err := d.findAncestor(p)
if err != nil {
diff --git a/eth/handler.go b/eth/handler.go
index 5d233dd96..6c1895bbd 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -176,7 +176,7 @@ func (pm *ProtocolManager) Stop() {
}
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
- return newPeer(pv, nv, p, rw)
+ return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
}
// handle is the callback invoked to manage the life cycle of an eth peer. When
@@ -281,14 +281,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
case BlockHashesMsg:
// A batch of hashes arrived to one of our previous requests
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
- reqHashInPacketsMeter.Mark(1)
var hashes []common.Hash
if err := msgStream.Decode(&hashes); err != nil {
break
}
- reqHashInTrafficMeter.Mark(int64(32 * len(hashes)))
-
// Deliver them all to the downloader for queuing
err := pm.downloader.DeliverHashes(p.id, hashes)
if err != nil {
@@ -340,7 +337,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
case BlocksMsg:
// Decode the arrived block message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
- reqBlockInPacketsMeter.Mark(1)
var blocks []*types.Block
if err := msgStream.Decode(&blocks); err != nil {
@@ -349,7 +345,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
// Update the receive timestamp of each block
for _, block := range blocks {
- reqBlockInTrafficMeter.Mark(block.Size().Int64())
block.ReceivedAt = msg.ReceivedAt
}
// Filter out any explicitly requested blocks, deliver the rest to the downloader
@@ -365,9 +360,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msgStream.Decode(&hashes); err != nil {
break
}
- propHashInPacketsMeter.Mark(1)
- propHashInTrafficMeter.Mark(int64(32 * len(hashes)))
-
// Mark the hashes as present at the remote node
for _, hash := range hashes {
p.MarkBlock(hash)
@@ -390,9 +382,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
- propBlockInPacketsMeter.Mark(1)
- propBlockInTrafficMeter.Mark(request.Block.Size().Int64())
-
if err := request.Block.ValidateFields(); err != nil {
return errResp(ErrDecode, "block validation %v: %v", msg, err)
}
@@ -427,7 +416,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&txs); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- propTxnInPacketsMeter.Mark(1)
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
@@ -436,7 +424,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkTransaction(tx.Hash())
// Log it's arrival for later analysis
- propTxnInTrafficMeter.Mark(tx.Size().Int64())
jsonlogger.LogJson(&logger.EthTxReceived{
TxHash: tx.Hash().Hex(),
RemoteId: p.ID().String(),
diff --git a/eth/metrics.go b/eth/metrics.go
index 625b90b67..13745dc43 100644
--- a/eth/metrics.go
+++ b/eth/metrics.go
@@ -18,6 +18,7 @@ package eth
import (
"github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/p2p"
)
var (
@@ -41,4 +42,87 @@ var (
reqBlockInTrafficMeter = metrics.NewMeter("eth/req/blocks/in/traffic")
reqBlockOutPacketsMeter = metrics.NewMeter("eth/req/blocks/out/packets")
reqBlockOutTrafficMeter = metrics.NewMeter("eth/req/blocks/out/traffic")
+ reqHeaderInPacketsMeter = metrics.NewMeter("eth/req/header/in/packets")
+ reqHeaderInTrafficMeter = metrics.NewMeter("eth/req/header/in/traffic")
+ reqHeaderOutPacketsMeter = metrics.NewMeter("eth/req/header/out/packets")
+ reqHeaderOutTrafficMeter = metrics.NewMeter("eth/req/header/out/traffic")
+ reqStateInPacketsMeter = metrics.NewMeter("eth/req/state/in/packets")
+ reqStateInTrafficMeter = metrics.NewMeter("eth/req/state/in/traffic")
+ reqStateOutPacketsMeter = metrics.NewMeter("eth/req/state/out/packets")
+ reqStateOutTrafficMeter = metrics.NewMeter("eth/req/state/out/traffic")
+ miscInPacketsMeter = metrics.NewMeter("eth/misc/in/packets")
+ miscInTrafficMeter = metrics.NewMeter("eth/misc/in/traffic")
+ miscOutPacketsMeter = metrics.NewMeter("eth/misc/out/packets")
+ miscOutTrafficMeter = metrics.NewMeter("eth/misc/out/traffic")
)
+
+// meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of
+// accumulating the above defined metrics based on the data stream contents.
+type meteredMsgReadWriter struct {
+ p2p.MsgReadWriter
+}
+
+// newMeteredMsgWriter wraps a p2p MsgReadWriter with metering support. If the
+// metrics system is disabled, this fucntion returns the original object.
+func newMeteredMsgWriter(rw p2p.MsgReadWriter) p2p.MsgReadWriter {
+ if !metrics.Enabled {
+ return rw
+ }
+ return &meteredMsgReadWriter{rw}
+}
+
+func (rw *meteredMsgReadWriter) ReadMsg() (p2p.Msg, error) {
+ // Read the message and short circuit in case of an error
+ msg, err := rw.MsgReadWriter.ReadMsg()
+ if err != nil {
+ return msg, err
+ }
+ // Account for the data traffic
+ packets, traffic := miscInPacketsMeter, miscInTrafficMeter
+ switch msg.Code {
+ case BlockHashesMsg:
+ packets, traffic = reqHashInPacketsMeter, reqHashInTrafficMeter
+ case BlocksMsg:
+ packets, traffic = reqBlockInPacketsMeter, reqBlockInTrafficMeter
+ case BlockHeadersMsg:
+ packets, traffic = reqHeaderInPacketsMeter, reqHeaderInTrafficMeter
+ case NodeDataMsg:
+ packets, traffic = reqStateInPacketsMeter, reqStateInTrafficMeter
+ case NewBlockHashesMsg:
+ packets, traffic = propHashInPacketsMeter, propHashInTrafficMeter
+ case NewBlockMsg:
+ packets, traffic = propBlockInPacketsMeter, propBlockInTrafficMeter
+ case TxMsg:
+ packets, traffic = propTxnInPacketsMeter, propTxnInTrafficMeter
+ }
+ packets.Mark(1)
+ traffic.Mark(int64(msg.Size))
+
+ return msg, err
+}
+
+func (rw *meteredMsgReadWriter) WriteMsg(msg p2p.Msg) error {
+ // Account for the data traffic
+ packets, traffic := miscOutPacketsMeter, miscOutTrafficMeter
+ switch msg.Code {
+ case BlockHashesMsg:
+ packets, traffic = reqHashOutPacketsMeter, reqHashOutTrafficMeter
+ case BlocksMsg:
+ packets, traffic = reqBlockOutPacketsMeter, reqBlockOutTrafficMeter
+ case BlockHeadersMsg:
+ packets, traffic = reqHeaderOutPacketsMeter, reqHeaderOutTrafficMeter
+ case NodeDataMsg:
+ packets, traffic = reqStateOutPacketsMeter, reqStateOutTrafficMeter
+ case NewBlockHashesMsg:
+ packets, traffic = propHashOutPacketsMeter, propHashOutTrafficMeter
+ case NewBlockMsg:
+ packets, traffic = propBlockOutPacketsMeter, propBlockOutTrafficMeter
+ case TxMsg:
+ packets, traffic = propTxnOutPacketsMeter, propTxnOutTrafficMeter
+ }
+ packets.Mark(1)
+ traffic.Mark(int64(msg.Size))
+
+ // Send the packet to the p2p layer
+ return rw.MsgReadWriter.WriteMsg(msg)
+}
diff --git a/eth/peer.go b/eth/peer.go
index ade1f37ea..c17cdfca7 100644
--- a/eth/peer.go
+++ b/eth/peer.go
@@ -129,9 +129,7 @@ func (p *peer) MarkTransaction(hash common.Hash) {
// SendTransactions sends transactions to the peer and includes the hashes
// in its transaction hash set for future reference.
func (p *peer) SendTransactions(txs types.Transactions) error {
- propTxnOutPacketsMeter.Mark(1)
for _, tx := range txs {
- propTxnOutTrafficMeter.Mark(tx.Size().Int64())
p.knownTxs.Add(tx.Hash())
}
return p2p.Send(p.rw, TxMsg, txs)
@@ -139,27 +137,17 @@ func (p *peer) SendTransactions(txs types.Transactions) error {
// SendBlockHashes sends a batch of known hashes to the remote peer.
func (p *peer) SendBlockHashes(hashes []common.Hash) error {
- reqHashOutPacketsMeter.Mark(1)
- reqHashOutTrafficMeter.Mark(int64(32 * len(hashes)))
-
return p2p.Send(p.rw, BlockHashesMsg, hashes)
}
// SendBlocks sends a batch of blocks to the remote peer.
func (p *peer) SendBlocks(blocks []*types.Block) error {
- reqBlockOutPacketsMeter.Mark(1)
- for _, block := range blocks {
- reqBlockOutTrafficMeter.Mark(block.Size().Int64())
- }
return p2p.Send(p.rw, BlocksMsg, blocks)
}
// SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash) error {
- propHashOutPacketsMeter.Mark(1)
- propHashOutTrafficMeter.Mark(int64(32 * len(hashes)))
-
for _, hash := range hashes {
p.knownBlocks.Add(hash)
}
@@ -168,33 +156,55 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash) error {
// SendNewBlock propagates an entire block to a remote peer.
func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
- propBlockOutPacketsMeter.Mark(1)
- propBlockOutTrafficMeter.Mark(block.Size().Int64())
-
p.knownBlocks.Add(block.Hash())
return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
}
+// SendBlockHeaders sends a batch of block headers to the remote peer.
+func (p *peer) SendBlockHeaders(headers []*types.Header) error {
+ return p2p.Send(p.rw, BlockHeadersMsg, headers)
+}
+
+// SendNodeData sends a batch of arbitrary internal data, corresponding to the
+// hashes requested.
+func (p *peer) SendNodeData(data [][]byte) error {
+ return p2p.Send(p.rw, NodeDataMsg, data)
+}
+
// RequestHashes fetches a batch of hashes from a peer, starting at from, going
// towards the genesis block.
func (p *peer) RequestHashes(from common.Hash) error {
- glog.V(logger.Debug).Infof("Peer [%s] fetching hashes (%d) from %x...\n", p.id, downloader.MaxHashFetch, from[:4])
+ glog.V(logger.Debug).Infof("%v fetching hashes (%d) from %x...\n", p, downloader.MaxHashFetch, from[:4])
return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesData{from, uint64(downloader.MaxHashFetch)})
}
-// RequestHashesFromNumber fetches a batch of hashes from a peer, starting at the
-// requested block number, going upwards towards the genesis block.
+// RequestHashesFromNumber fetches a batch of hashes from a peer, starting at
+// the requested block number, going upwards towards the genesis block.
func (p *peer) RequestHashesFromNumber(from uint64, count int) error {
- glog.V(logger.Debug).Infof("Peer [%s] fetching hashes (%d) from #%d...\n", p.id, count, from)
+ glog.V(logger.Debug).Infof("%v fetching hashes (%d) from #%d...\n", p, count, from)
return p2p.Send(p.rw, GetBlockHashesFromNumberMsg, getBlockHashesFromNumberData{from, uint64(count)})
}
// RequestBlocks fetches a batch of blocks corresponding to the specified hashes.
func (p *peer) RequestBlocks(hashes []common.Hash) error {
- glog.V(logger.Debug).Infof("[%s] fetching %v blocks\n", p.id, len(hashes))
+ glog.V(logger.Debug).Infof("%v fetching %v blocks\n", p, len(hashes))
return p2p.Send(p.rw, GetBlocksMsg, hashes)
}
+// RequestHeaders fetches a batch of blocks' headers corresponding to the
+// specified hashes.
+func (p *peer) RequestHeaders(hashes []common.Hash) error {
+ glog.V(logger.Debug).Infof("%v fetching %v headers\n", p, len(hashes))
+ return p2p.Send(p.rw, GetBlockHeadersMsg, hashes)
+}
+
+// RequestNodeData fetches a batch of arbitrary data from a node's known state
+// data, corresponding to the specified hashes.
+func (p *peer) RequestNodeData(hashes []common.Hash) error {
+ glog.V(logger.Debug).Infof("%v fetching %v state data\n", p, len(hashes))
+ return p2p.Send(p.rw, GetNodeDataMsg, hashes)
+}
+
// Handshake executes the eth protocol handshake, negotiating version number,
// network IDs, difficulties, head and genesis blocks.
func (p *peer) Handshake(td *big.Int, head common.Hash, genesis common.Hash) error {
diff --git a/eth/protocol.go b/eth/protocol.go
index b8c9b50d0..fcc5f21e2 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -24,10 +24,10 @@ import (
)
// Supported versions of the eth protocol (first is primary).
-var ProtocolVersions = []uint{61, 60}
+var ProtocolVersions = []uint{62, 61, 60}
// Number of implemented message corresponding to different protocol versions.
-var ProtocolLengths = []uint64{9, 8}
+var ProtocolLengths = []uint64{13, 9, 8}
const (
NetworkId = 1
@@ -36,6 +36,7 @@ const (
// eth protocol message codes
const (
+ // Protocol messages belonging to eth/60
StatusMsg = iota
NewBlockHashesMsg
TxMsg
@@ -44,7 +45,15 @@ const (
GetBlocksMsg
BlocksMsg
NewBlockMsg
+
+ // Protocol messages belonging to eth/61
GetBlockHashesFromNumberMsg
+
+ // Protocol messages belonging to eth/62
+ GetBlockHeadersMsg
+ BlockHeadersMsg
+ GetNodeDataMsg
+ NodeDataMsg
)
type errCode int
@@ -102,15 +111,14 @@ type statusData struct {
GenesisBlock common.Hash
}
-// getBlockHashesData is the network packet for the hash based block retrieval
-// message.
+// getBlockHashesData is the network packet for the hash based hash retrieval.
type getBlockHashesData struct {
Hash common.Hash
Amount uint64
}
-// getBlockHashesFromNumberData is the network packet for the number based block
-// retrieval message.
+// getBlockHashesFromNumberData is the network packet for the number based hash
+// retrieval.
type getBlockHashesFromNumberData struct {
Number uint64
Amount uint64
@@ -121,3 +129,8 @@ type newBlockData struct {
Block *types.Block
TD *big.Int
}
+
+// nodeDataData is the network response packet for a node data retrieval.
+type nodeDataData []struct {
+ Value []byte
+}
diff --git a/metrics/metrics.go b/metrics/metrics.go
index 6d1a065ed..fcf8b5c32 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -31,8 +31,8 @@ import (
// MetricsEnabledFlag is the CLI flag name to use to enable metrics collections.
var MetricsEnabledFlag = "metrics"
-// enabled is the flag specifying if metrics are enable or not.
-var enabled = false
+// Enabled is the flag specifying if metrics are enable or not.
+var Enabled = false
// Init enables or disables the metrics system. Since we need this to run before
// any other code gets to create meters and timers, we'll actually do an ugly hack
@@ -41,7 +41,7 @@ func init() {
for _, arg := range os.Args {
if strings.TrimLeft(arg, "-") == MetricsEnabledFlag {
glog.V(logger.Info).Infof("Enabling metrics collection")
- enabled = true
+ Enabled = true
}
}
}
@@ -49,7 +49,7 @@ func init() {
// NewMeter create a new metrics Meter, either a real one of a NOP stub depending
// on the metrics flag.
func NewMeter(name string) metrics.Meter {
- if !enabled {
+ if !Enabled {
return new(metrics.NilMeter)
}
return metrics.GetOrRegisterMeter(name, metrics.DefaultRegistry)
@@ -58,7 +58,7 @@ func NewMeter(name string) metrics.Meter {
// NewTimer create a new metrics Timer, either a real one of a NOP stub depending
// on the metrics flag.
func NewTimer(name string) metrics.Timer {
- if !enabled {
+ if !Enabled {
return new(metrics.NilTimer)
}
return metrics.GetOrRegisterTimer(name, metrics.DefaultRegistry)
@@ -68,7 +68,7 @@ func NewTimer(name string) metrics.Timer {
// process.
func CollectProcessMetrics(refresh time.Duration) {
// Short circuit if the metrics system is disabled
- if !enabled {
+ if !Enabled {
return
}
// Create the various data collectors
diff --git a/p2p/metrics.go b/p2p/metrics.go
index f98cac274..98b61901d 100644
--- a/p2p/metrics.go
+++ b/p2p/metrics.go
@@ -38,8 +38,14 @@ type meteredConn struct {
}
// newMeteredConn creates a new metered connection, also bumping the ingress or
-// egress connection meter.
+// egress connection meter. If the metrics system is disabled, this function
+// returns the original object.
func newMeteredConn(conn net.Conn, ingress bool) net.Conn {
+ // Short circuit if metrics are disabled
+ if !metrics.Enabled {
+ return conn
+ }
+ // Otherwise bump the connection counters and wrap the connection
if ingress {
ingressConnectMeter.Mark(1)
} else {