aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-06-22 01:23:37 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-06-24 23:33:33 +0800
commit6994a3daaa0acfc8431e33da535c85e23ab319e0 (patch)
tree124b792dfa89a7897b2414d156814e2c56cd0ed0
parent821e01b0139eee9bfab9647e4ac1f2d6f1fb01bc (diff)
downloaddexon-6994a3daaa0acfc8431e33da535c85e23ab319e0.tar.gz
dexon-6994a3daaa0acfc8431e33da535c85e23ab319e0.tar.zst
dexon-6994a3daaa0acfc8431e33da535c85e23ab319e0.zip
p2p: instrument P2P networking layer
-rw-r--r--p2p/dial.go4
-rw-r--r--p2p/metrics.go49
-rw-r--r--p2p/server.go6
3 files changed, 56 insertions, 3 deletions
diff --git a/p2p/dial.go b/p2p/dial.go
index b82d6d1f5..45cd8116b 100644
--- a/p2p/dial.go
+++ b/p2p/dial.go
@@ -196,7 +196,9 @@ func (t *dialTask) Do(srv *Server) {
glog.V(logger.Detail).Infof("dial error: %v", err)
return
}
- srv.setupConn(fd, t.flags, t.dest)
+ mfd := newMeteredConn(fd, false)
+
+ srv.setupConn(mfd, t.flags, t.dest)
}
func (t *dialTask) String() string {
return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP)
diff --git a/p2p/metrics.go b/p2p/metrics.go
new file mode 100644
index 000000000..fbe5b1e90
--- /dev/null
+++ b/p2p/metrics.go
@@ -0,0 +1,49 @@
+// Contains the meters and timers used by the networking layer.
+
+package p2p
+
+import (
+ "net"
+
+ "github.com/rcrowley/go-metrics"
+)
+
+var (
+ ingressConnectMeter = metrics.GetOrRegisterMeter("p2p/InboundConnects", metrics.DefaultRegistry)
+ ingressTrafficMeter = metrics.GetOrRegisterMeter("p2p/InboundTraffic", metrics.DefaultRegistry)
+ egressConnectMeter = metrics.GetOrRegisterMeter("p2p/OutboundConnects", metrics.DefaultRegistry)
+ egressTrafficMeter = metrics.GetOrRegisterMeter("p2p/OutboundTraffic", metrics.DefaultRegistry)
+)
+
+// meteredConn is a wrapper around a network TCP connection that meters both the
+// inbound and outbound network traffic.
+type meteredConn struct {
+ *net.TCPConn // Network connection to wrap with metering
+}
+
+// newMeteredConn creates a new metered connection, also bumping the ingress or
+// egress connection meter.
+func newMeteredConn(conn net.Conn, ingress bool) net.Conn {
+ if ingress {
+ ingressConnectMeter.Mark(1)
+ } else {
+ egressConnectMeter.Mark(1)
+ }
+ return &meteredConn{conn.(*net.TCPConn)}
+}
+
+// Read delegates a network read to the underlying connection, bumping the ingress
+// traffic meter along the way.
+func (c *meteredConn) Read(b []byte) (n int, err error) {
+ n, err = c.TCPConn.Read(b)
+ ingressTrafficMeter.Mark(int64(n))
+ return
+}
+
+// Write delegates a network write to the underlying connection, bumping the
+// egress traffic meter along the way.
+func (c *meteredConn) Write(b []byte) (n int, err error) {
+ n, err = c.TCPConn.Write(b)
+ egressTrafficMeter.Mark(int64(n))
+ return
+}
diff --git a/p2p/server.go b/p2p/server.go
index 5eff70345..9078841a8 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -548,9 +548,11 @@ func (srv *Server) listenLoop() {
if err != nil {
return
}
- glog.V(logger.Debug).Infof("Accepted conn %v\n", fd.RemoteAddr())
+ mfd := newMeteredConn(fd, true)
+
+ glog.V(logger.Debug).Infof("Accepted conn %v\n", mfd.RemoteAddr())
go func() {
- srv.setupConn(fd, inboundConn, nil)
+ srv.setupConn(mfd, inboundConn, nil)
slots <- struct{}{}
}()
}