diff options
author | Kurkó Mihály <kurkomisi@users.noreply.github.com> | 2018-07-11 15:59:04 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2018-07-11 15:59:04 +0800 |
commit | a9835c1816bc49ee54c82b4f2a5b05cbcd89881b (patch) | |
tree | e1badefd627aa3a7c4e1937eab22b8fe3eb204d1 /dashboard/dashboard.go | |
parent | 2eedbe799f5eb8766e4808d8a1810cc1c90c4b93 (diff) | |
download | dexon-a9835c1816bc49ee54c82b4f2a5b05cbcd89881b.tar.gz dexon-a9835c1816bc49ee54c82b4f2a5b05cbcd89881b.tar.zst dexon-a9835c1816bc49ee54c82b4f2a5b05cbcd89881b.zip |
cmd, dashboard, log: log collection and exploration (#17097)
* cmd, dashboard, internal, log, node: logging feature
* cmd, dashboard, internal, log: requested changes
* dashboard, vendor: gofmt, govendor, use vendored file watcher
* dashboard, log: gofmt -s -w, goimports
* dashboard, log: gosimple
Diffstat (limited to 'dashboard/dashboard.go')
-rw-r--r-- | dashboard/dashboard.go | 171 |
1 files changed, 86 insertions, 85 deletions
diff --git a/dashboard/dashboard.go b/dashboard/dashboard.go index 399fa34c0..55c254869 100644 --- a/dashboard/dashboard.go +++ b/dashboard/dashboard.go @@ -32,12 +32,15 @@ import ( "sync/atomic" "time" + "io" + "github.com/elastic/gosigar" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" + "github.com/mohae/deepcopy" "golang.org/x/net/websocket" ) @@ -60,10 +63,11 @@ type Dashboard struct { listener net.Listener conns map[uint32]*client // Currently live websocket connections - charts *SystemMessage - commit string + history *Message lock sync.RWMutex // Lock protecting the dashboard's internals + logdir string + quit chan chan error // Channel used for graceful exit wg sync.WaitGroup } @@ -71,30 +75,39 @@ type Dashboard struct { // client represents active websocket connection with a remote browser. type client struct { conn *websocket.Conn // Particular live websocket connection - msg chan Message // Message queue for the update messages + msg chan *Message // Message queue for the update messages logger log.Logger // Logger for the particular live websocket connection } // New creates a new dashboard instance with the given configuration. -func New(config *Config, commit string) (*Dashboard, error) { +func New(config *Config, commit string, logdir string) *Dashboard { now := time.Now() - db := &Dashboard{ + versionMeta := "" + if len(params.VersionMeta) > 0 { + versionMeta = fmt.Sprintf(" (%s)", params.VersionMeta) + } + return &Dashboard{ conns: make(map[uint32]*client), config: config, quit: make(chan chan error), - charts: &SystemMessage{ - ActiveMemory: emptyChartEntries(now, activeMemorySampleLimit, config.Refresh), - VirtualMemory: emptyChartEntries(now, virtualMemorySampleLimit, config.Refresh), - NetworkIngress: emptyChartEntries(now, networkIngressSampleLimit, config.Refresh), - NetworkEgress: emptyChartEntries(now, networkEgressSampleLimit, config.Refresh), - ProcessCPU: emptyChartEntries(now, processCPUSampleLimit, config.Refresh), - SystemCPU: emptyChartEntries(now, systemCPUSampleLimit, config.Refresh), - DiskRead: emptyChartEntries(now, diskReadSampleLimit, config.Refresh), - DiskWrite: emptyChartEntries(now, diskWriteSampleLimit, config.Refresh), + history: &Message{ + General: &GeneralMessage{ + Commit: commit, + Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta), + }, + System: &SystemMessage{ + ActiveMemory: emptyChartEntries(now, activeMemorySampleLimit, config.Refresh), + VirtualMemory: emptyChartEntries(now, virtualMemorySampleLimit, config.Refresh), + NetworkIngress: emptyChartEntries(now, networkIngressSampleLimit, config.Refresh), + NetworkEgress: emptyChartEntries(now, networkEgressSampleLimit, config.Refresh), + ProcessCPU: emptyChartEntries(now, processCPUSampleLimit, config.Refresh), + SystemCPU: emptyChartEntries(now, systemCPUSampleLimit, config.Refresh), + DiskRead: emptyChartEntries(now, diskReadSampleLimit, config.Refresh), + DiskWrite: emptyChartEntries(now, diskWriteSampleLimit, config.Refresh), + }, }, - commit: commit, + logdir: logdir, } - return db, nil } // emptyChartEntries returns a ChartEntry array containing limit number of empty samples. @@ -108,19 +121,20 @@ func emptyChartEntries(t time.Time, limit int, refresh time.Duration) ChartEntri return ce } -// Protocols is a meaningless implementation of node.Service. +// Protocols implements the node.Service interface. func (db *Dashboard) Protocols() []p2p.Protocol { return nil } -// APIs is a meaningless implementation of node.Service. +// APIs implements the node.Service interface. func (db *Dashboard) APIs() []rpc.API { return nil } -// Start implements node.Service, starting the data collection thread and the listening server of the dashboard. +// Start starts the data collection thread and the listening server of the dashboard. +// Implements the node.Service interface. func (db *Dashboard) Start(server *p2p.Server) error { log.Info("Starting dashboard") db.wg.Add(2) go db.collectData() - go db.collectLogs() // In case of removing this line change 2 back to 1 in wg.Add. + go db.streamLogs() http.HandleFunc("/", db.webHandler) http.Handle("/api", websocket.Handler(db.apiHandler)) @@ -136,7 +150,8 @@ func (db *Dashboard) Start(server *p2p.Server) error { return nil } -// Stop implements node.Service, stopping the data collection thread and the connection listener of the dashboard. +// Stop stops the data collection thread and the connection listener of the dashboard. +// Implements the node.Service interface. func (db *Dashboard) Stop() error { // Close the connection listener. var errs []error @@ -194,7 +209,7 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) { id := atomic.AddUint32(&nextID, 1) client := &client{ conn: conn, - msg: make(chan Message, 128), + msg: make(chan *Message, 128), logger: log.New("id", id), } done := make(chan struct{}) @@ -218,29 +233,10 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) { } }() - versionMeta := "" - if len(params.VersionMeta) > 0 { - versionMeta = fmt.Sprintf(" (%s)", params.VersionMeta) - } + db.lock.Lock() // Send the past data. - client.msg <- Message{ - General: &GeneralMessage{ - Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta), - Commit: db.commit, - }, - System: &SystemMessage{ - ActiveMemory: db.charts.ActiveMemory, - VirtualMemory: db.charts.VirtualMemory, - NetworkIngress: db.charts.NetworkIngress, - NetworkEgress: db.charts.NetworkEgress, - ProcessCPU: db.charts.ProcessCPU, - SystemCPU: db.charts.SystemCPU, - DiskRead: db.charts.DiskRead, - DiskWrite: db.charts.DiskWrite, - }, - } + client.msg <- deepcopy.Copy(db.history).(*Message) // Start tracking the connection and drop at connection loss. - db.lock.Lock() db.conns[id] = client db.lock.Unlock() defer func() { @@ -249,29 +245,53 @@ func (db *Dashboard) apiHandler(conn *websocket.Conn) { db.lock.Unlock() }() for { - fail := []byte{} - if _, err := conn.Read(fail); err != nil { + r := new(Request) + if err := websocket.JSON.Receive(conn, r); err != nil { + if err != io.EOF { + client.logger.Warn("Failed to receive request", "err", err) + } close(done) return } - // Ignore all messages + if r.Logs != nil { + db.handleLogRequest(r.Logs, client) + } + } +} + +// meterCollector returns a function, which retrieves a specific meter. +func meterCollector(name string) func() int64 { + if metric := metrics.DefaultRegistry.Get(name); metric != nil { + m := metric.(metrics.Meter) + return func() int64 { + return m.Count() + } + } + return func() int64 { + return 0 } } // collectData collects the required data to plot on the dashboard. func (db *Dashboard) collectData() { defer db.wg.Done() + systemCPUUsage := gosigar.Cpu{} systemCPUUsage.Get() var ( mem runtime.MemStats - prevNetworkIngress = metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Count() - prevNetworkEgress = metrics.DefaultRegistry.Get("p2p/OutboundTraffic").(metrics.Meter).Count() + collectNetworkIngress = meterCollector("p2p/InboundTraffic") + collectNetworkEgress = meterCollector("p2p/OutboundTraffic") + collectDiskRead = meterCollector("eth/db/chaindata/disk/read") + collectDiskWrite = meterCollector("eth/db/chaindata/disk/write") + + prevNetworkIngress = collectNetworkIngress() + prevNetworkEgress = collectNetworkEgress() prevProcessCPUTime = getProcessCPUTime() prevSystemCPUUsage = systemCPUUsage - prevDiskRead = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/read").(metrics.Meter).Count() - prevDiskWrite = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/write").(metrics.Meter).Count() + prevDiskRead = collectDiskRead() + prevDiskWrite = collectDiskWrite() frequency = float64(db.config.Refresh / time.Second) numCPU = float64(runtime.NumCPU()) @@ -285,12 +305,12 @@ func (db *Dashboard) collectData() { case <-time.After(db.config.Refresh): systemCPUUsage.Get() var ( - curNetworkIngress = metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Count() - curNetworkEgress = metrics.DefaultRegistry.Get("p2p/OutboundTraffic").(metrics.Meter).Count() + curNetworkIngress = collectNetworkIngress() + curNetworkEgress = collectNetworkEgress() curProcessCPUTime = getProcessCPUTime() curSystemCPUUsage = systemCPUUsage - curDiskRead = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/read").(metrics.Meter).Count() - curDiskWrite = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/write").(metrics.Meter).Count() + curDiskRead = collectDiskRead() + curDiskWrite = collectDiskWrite() deltaNetworkIngress = float64(curNetworkIngress - prevNetworkIngress) deltaNetworkEgress = float64(curNetworkEgress - prevNetworkEgress) @@ -341,14 +361,17 @@ func (db *Dashboard) collectData() { Time: now, Value: float64(deltaDiskWrite) / frequency, } - db.charts.ActiveMemory = append(db.charts.ActiveMemory[1:], activeMemory) - db.charts.VirtualMemory = append(db.charts.VirtualMemory[1:], virtualMemory) - db.charts.NetworkIngress = append(db.charts.NetworkIngress[1:], networkIngress) - db.charts.NetworkEgress = append(db.charts.NetworkEgress[1:], networkEgress) - db.charts.ProcessCPU = append(db.charts.ProcessCPU[1:], processCPU) - db.charts.SystemCPU = append(db.charts.SystemCPU[1:], systemCPU) - db.charts.DiskRead = append(db.charts.DiskRead[1:], diskRead) - db.charts.DiskWrite = append(db.charts.DiskRead[1:], diskWrite) + sys := db.history.System + db.lock.Lock() + sys.ActiveMemory = append(sys.ActiveMemory[1:], activeMemory) + sys.VirtualMemory = append(sys.VirtualMemory[1:], virtualMemory) + sys.NetworkIngress = append(sys.NetworkIngress[1:], networkIngress) + sys.NetworkEgress = append(sys.NetworkEgress[1:], networkEgress) + sys.ProcessCPU = append(sys.ProcessCPU[1:], processCPU) + sys.SystemCPU = append(sys.SystemCPU[1:], systemCPU) + sys.DiskRead = append(sys.DiskRead[1:], diskRead) + sys.DiskWrite = append(sys.DiskRead[1:], diskWrite) + db.lock.Unlock() db.sendToAll(&Message{ System: &SystemMessage{ @@ -366,34 +389,12 @@ func (db *Dashboard) collectData() { } } -// collectLogs collects and sends the logs to the active dashboards. -func (db *Dashboard) collectLogs() { - defer db.wg.Done() - - id := 1 - // TODO (kurkomisi): log collection comes here. - for { - select { - case errc := <-db.quit: - errc <- nil - return - case <-time.After(db.config.Refresh / 2): - db.sendToAll(&Message{ - Logs: &LogsMessage{ - Log: []string{fmt.Sprintf("%-4d: This is a fake log.", id)}, - }, - }) - id++ - } - } -} - // sendToAll sends the given message to the active dashboards. func (db *Dashboard) sendToAll(msg *Message) { db.lock.Lock() for _, c := range db.conns { select { - case c.msg <- *msg: + case c.msg <- msg: default: c.conn.Close() } |