aboutsummaryrefslogtreecommitdiffstats
path: root/dashboard/dashboard.go
diff options
context:
space:
mode:
authorKurkó Mihály <kurkomisi@users.noreply.github.com>2018-07-11 15:59:04 +0800
committerPéter Szilágyi <peterke@gmail.com>2018-07-11 15:59:04 +0800
commita9835c1816bc49ee54c82b4f2a5b05cbcd89881b (patch)
treee1badefd627aa3a7c4e1937eab22b8fe3eb204d1 /dashboard/dashboard.go
parent2eedbe799f5eb8766e4808d8a1810cc1c90c4b93 (diff)
downloaddexon-a9835c1816bc49ee54c82b4f2a5b05cbcd89881b.tar.gz
dexon-a9835c1816bc49ee54c82b4f2a5b05cbcd89881b.tar.zst
dexon-a9835c1816bc49ee54c82b4f2a5b05cbcd89881b.zip
cmd, dashboard, log: log collection and exploration (#17097)
* cmd, dashboard, internal, log, node: logging feature * cmd, dashboard, internal, log: requested changes * dashboard, vendor: gofmt, govendor, use vendored file watcher * dashboard, log: gofmt -s -w, goimports * dashboard, log: gosimple
Diffstat (limited to 'dashboard/dashboard.go')
-rw-r--r--dashboard/dashboard.go171
1 files changed, 86 insertions, 85 deletions
diff --git a/dashboard/dashboard.go b/dashboard/dashboard.go
index 399fa34c0..55c254869 100644
--- a/dashboard/dashboard.go
+++ b/dashboard/dashboard.go
@@ -32,12 +32,15 @@ import (
"sync/atomic"
"time"
+ "io"
+
"github.com/elastic/gosigar"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
+ "github.com/mohae/deepcopy"
"golang.org/x/net/websocket"
)
@@ -60,10 +63,11 @@ type Dashboard struct {
listener net.Listener
conns map[uint32]*client // Currently live websocket connections
- charts *SystemMessage
- commit string
+ history *Message
lock sync.RWMutex // Lock protecting the dashboard's internals
+ logdir string
+
quit chan chan error // Channel used for graceful exit
wg sync.WaitGroup
}
@@ -71,30 +75,39 @@ type Dashboard struct {
// client represents active websocket connection with a remote browser.
type client struct {
conn *websocket.Conn // Particular live websocket connection
- msg chan Message // Message queue for the update messages
+ msg chan *Message // Message queue for the update messages
logger log.Logger // Logger for the particular live websocket connection
}
// New creates a new dashboard instance with the given configuration.
-func New(config *Config, commit string) (*Dashboard, error) {
+func New(config *Config, commit string, logdir string) *Dashboard {
now := time.Now()
- db := &Dashboard{
+ versionMeta := ""
+ if len(params.VersionMeta) > 0 {
+ versionMeta = fmt.Sprintf(" (%s)", params.VersionMeta)
+ }
+ return &Dashboard{
conns: make(map[uint32]*client),
config: config,
quit: make(chan chan error),
- charts: &SystemMessage{
- ActiveMemory: emptyChartEntries(now, activeMemorySampleLimit, config.Refresh),
- VirtualMemory: emptyChartEntries(now, virtualMemorySampleLimit, config.Refresh),
- NetworkIngress: emptyChartEntries(now, networkIngressSampleLimit, config.Refresh),
- NetworkEgress: emptyChartEntries(now, networkEgressSampleLimit, config.Refresh),
- ProcessCPU: emptyChartEntries(now, processCPUSampleLimit, config.Refresh),
- SystemCPU: emptyChartEntries(now, systemCPUSampleLimit, config.Refresh),
- DiskRead: emptyChartEntries(now, diskReadSampleLimit, config.Refresh),
- DiskWrite: emptyChartEntries(now, diskWriteSampleLimit, config.Refresh),
+ history: &Message{
+ General: &GeneralMessage{
+ Commit: commit,
+ Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta),
+ },
+ System: &SystemMessage{
+ ActiveMemory: emptyChartEntries(now, activeMemorySampleLimit, config.Refresh),
+ VirtualMemory: emptyChartEntries(now, virtualMemorySampleLimit, config.Refresh),
+ NetworkIngress: emptyChartEntries(now, networkIngressSampleLimit, config.Refresh),
+ NetworkEgress: emptyChartEntries(now, networkEgressSampleLimit, config.Refresh),
+ ProcessCPU: emptyChartEntries(now, processCPUSampleLimit, config.Refresh),
+ SystemCPU: emptyChartEntries(now, systemCPUSampleLimit, config.Refresh),
+ DiskRead: emptyChartEntries(now, diskReadSampleLimit, config.Refresh),
+ DiskWrite: emptyChartEntries(now, diskWriteSampleLimit, config.Refresh),
+ },
},
- commit: commit,
+ logdir: logdir,
}
- return db, nil
}
// emptyChartEntries returns a ChartEntry array containing limit number of empty samples.
@@ -108,19 +121,20 @@ func emptyChartEntries(t time.Time, limit int, refresh time.Duration) ChartEntri
return ce
}
-// Protocols is a meaningless implementation of node.Service.
+// Protocols implements the node.Service interface.
func (db *Dashboard) Protocols() []p2p.Protocol { return nil }
-// APIs is a meaningless implementation of node.Service.
+// APIs implements the node.Service interface.
func (db *Dashboard) APIs() []rpc.API { return nil }
-// Start implements node.Service, starting the data collection thread and the listening server of the dashboard.
+// Start starts the data collection thread and the listening server of the dashboard.
+// Implements the node.Service interface.
func (db *Dashboard) Start(server *p2p.Server) error {
log.Info("Starting dashboard")
db.wg.Add(2)
go db.collectData()
- go db.collectLogs() // In case of removing this line change 2 back to 1 in wg.Add.
+ go db.streamLogs()
http.HandleFunc("/", db.webHandler)
http.Handle("/api", websocket.Handler(db.apiHandler))
@@ -136,7 +150,8 @@ func (db *Dashboard) Start(server *p2p.Server) error {
return nil
}
-// Stop implements node.Service, stopping the data collection thread and the connection listener of the dashboard.
+// Stop stops the data collection thread and the connection listener of the dashboard.
+// Implements the node.Service interface.
func (db *Dashboard) Stop() error {
// Close the connection listener.
var errs []error
@@ -194,7 +209,7 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) {
id := atomic.AddUint32(&nextID, 1)
client := &client{
conn: conn,
- msg: make(chan Message, 128),
+ msg: make(chan *Message, 128),
logger: log.New("id", id),
}
done := make(chan struct{})
@@ -218,29 +233,10 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) {
}
}()
- versionMeta := ""
- if len(params.VersionMeta) > 0 {
- versionMeta = fmt.Sprintf(" (%s)", params.VersionMeta)
- }
+ db.lock.Lock()
// Send the past data.
- client.msg <- Message{
- General: &GeneralMessage{
- Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta),
- Commit: db.commit,
- },
- System: &SystemMessage{
- ActiveMemory: db.charts.ActiveMemory,
- VirtualMemory: db.charts.VirtualMemory,
- NetworkIngress: db.charts.NetworkIngress,
- NetworkEgress: db.charts.NetworkEgress,
- ProcessCPU: db.charts.ProcessCPU,
- SystemCPU: db.charts.SystemCPU,
- DiskRead: db.charts.DiskRead,
- DiskWrite: db.charts.DiskWrite,
- },
- }
+ client.msg <- deepcopy.Copy(db.history).(*Message)
// Start tracking the connection and drop at connection loss.
- db.lock.Lock()
db.conns[id] = client
db.lock.Unlock()
defer func() {
@@ -249,29 +245,53 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) {
db.lock.Unlock()
}()
for {
- fail := []byte{}
- if _, err := conn.Read(fail); err != nil {
+ r := new(Request)
+ if err := websocket.JSON.Receive(conn, r); err != nil {
+ if err != io.EOF {
+ client.logger.Warn("Failed to receive request", "err", err)
+ }
close(done)
return
}
- // Ignore all messages
+ if r.Logs != nil {
+ db.handleLogRequest(r.Logs, client)
+ }
+ }
+}
+
+// meterCollector returns a function, which retrieves a specific meter.
+func meterCollector(name string) func() int64 {
+ if metric := metrics.DefaultRegistry.Get(name); metric != nil {
+ m := metric.(metrics.Meter)
+ return func() int64 {
+ return m.Count()
+ }
+ }
+ return func() int64 {
+ return 0
}
}
// collectData collects the required data to plot on the dashboard.
func (db *Dashboard) collectData() {
defer db.wg.Done()
+
systemCPUUsage := gosigar.Cpu{}
systemCPUUsage.Get()
var (
mem runtime.MemStats
- prevNetworkIngress = metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Count()
- prevNetworkEgress = metrics.DefaultRegistry.Get("p2p/OutboundTraffic").(metrics.Meter).Count()
+ collectNetworkIngress = meterCollector("p2p/InboundTraffic")
+ collectNetworkEgress = meterCollector("p2p/OutboundTraffic")
+ collectDiskRead = meterCollector("eth/db/chaindata/disk/read")
+ collectDiskWrite = meterCollector("eth/db/chaindata/disk/write")
+
+ prevNetworkIngress = collectNetworkIngress()
+ prevNetworkEgress = collectNetworkEgress()
prevProcessCPUTime = getProcessCPUTime()
prevSystemCPUUsage = systemCPUUsage
- prevDiskRead = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/read").(metrics.Meter).Count()
- prevDiskWrite = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/write").(metrics.Meter).Count()
+ prevDiskRead = collectDiskRead()
+ prevDiskWrite = collectDiskWrite()
frequency = float64(db.config.Refresh / time.Second)
numCPU = float64(runtime.NumCPU())
@@ -285,12 +305,12 @@ func (db *Dashboard) collectData() {
case <-time.After(db.config.Refresh):
systemCPUUsage.Get()
var (
- curNetworkIngress = metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Count()
- curNetworkEgress = metrics.DefaultRegistry.Get("p2p/OutboundTraffic").(metrics.Meter).Count()
+ curNetworkIngress = collectNetworkIngress()
+ curNetworkEgress = collectNetworkEgress()
curProcessCPUTime = getProcessCPUTime()
curSystemCPUUsage = systemCPUUsage
- curDiskRead = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/read").(metrics.Meter).Count()
- curDiskWrite = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/write").(metrics.Meter).Count()
+ curDiskRead = collectDiskRead()
+ curDiskWrite = collectDiskWrite()
deltaNetworkIngress = float64(curNetworkIngress - prevNetworkIngress)
deltaNetworkEgress = float64(curNetworkEgress - prevNetworkEgress)
@@ -341,14 +361,17 @@ func (db *Dashboard) collectData() {
Time: now,
Value: float64(deltaDiskWrite) / frequency,
}
- db.charts.ActiveMemory = append(db.charts.ActiveMemory[1:], activeMemory)
- db.charts.VirtualMemory = append(db.charts.VirtualMemory[1:], virtualMemory)
- db.charts.NetworkIngress = append(db.charts.NetworkIngress[1:], networkIngress)
- db.charts.NetworkEgress = append(db.charts.NetworkEgress[1:], networkEgress)
- db.charts.ProcessCPU = append(db.charts.ProcessCPU[1:], processCPU)
- db.charts.SystemCPU = append(db.charts.SystemCPU[1:], systemCPU)
- db.charts.DiskRead = append(db.charts.DiskRead[1:], diskRead)
- db.charts.DiskWrite = append(db.charts.DiskRead[1:], diskWrite)
+ sys := db.history.System
+ db.lock.Lock()
+ sys.ActiveMemory = append(sys.ActiveMemory[1:], activeMemory)
+ sys.VirtualMemory = append(sys.VirtualMemory[1:], virtualMemory)
+ sys.NetworkIngress = append(sys.NetworkIngress[1:], networkIngress)
+ sys.NetworkEgress = append(sys.NetworkEgress[1:], networkEgress)
+ sys.ProcessCPU = append(sys.ProcessCPU[1:], processCPU)
+ sys.SystemCPU = append(sys.SystemCPU[1:], systemCPU)
+ sys.DiskRead = append(sys.DiskRead[1:], diskRead)
+ sys.DiskWrite = append(sys.DiskRead[1:], diskWrite)
+ db.lock.Unlock()
db.sendToAll(&Message{
System: &SystemMessage{
@@ -366,34 +389,12 @@ func (db *Dashboard) collectData() {
}
}
-// collectLogs collects and sends the logs to the active dashboards.
-func (db *Dashboard) collectLogs() {
- defer db.wg.Done()
-
- id := 1
- // TODO (kurkomisi): log collection comes here.
- for {
- select {
- case errc := <-db.quit:
- errc <- nil
- return
- case <-time.After(db.config.Refresh / 2):
- db.sendToAll(&Message{
- Logs: &LogsMessage{
- Log: []string{fmt.Sprintf("%-4d: This is a fake log.", id)},
- },
- })
- id++
- }
- }
-}
-
// sendToAll sends the given message to the active dashboards.
func (db *Dashboard) sendToAll(msg *Message) {
db.lock.Lock()
for _, c := range db.conns {
select {
- case c.msg <- *msg:
+ case c.msg <- msg:
default:
c.conn.Close()
}