diff options
Diffstat (limited to 'ethdb/database.go')
-rw-r--r-- | ethdb/database.go | 102 |
1 files changed, 59 insertions, 43 deletions
diff --git a/ethdb/database.go b/ethdb/database.go index 57d38f7f5..8c557e482 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -37,15 +37,11 @@ type LDBDatabase struct { fn string // filename for reporting db *leveldb.DB // LevelDB instance - getTimer metrics.Timer // Timer for measuring the database get request counts and latencies - putTimer metrics.Timer // Timer for measuring the database put request counts and latencies - delTimer metrics.Timer // Timer for measuring the database delete request counts and latencies - missMeter metrics.Meter // Meter for measuring the missed database get requests - readMeter metrics.Meter // Meter for measuring the database get request data usage - writeMeter metrics.Meter // Meter for measuring the database put request data usage compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction compReadMeter metrics.Meter // Meter for measuring the data read during compaction compWriteMeter metrics.Meter // Meter for measuring the data written during compaction + diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read + diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written quitLock sync.Mutex // Mutex protecting the quit channel access quitChan chan chan error // Quit channel to stop the metrics collection before closing the database @@ -94,16 +90,9 @@ func (db *LDBDatabase) Path() string { // Put puts the given key / value to the queue func (db *LDBDatabase) Put(key []byte, value []byte) error { - // Measure the database put latency, if requested - if db.putTimer != nil { - defer db.putTimer.UpdateSince(time.Now()) - } // Generate the data to write to disk, update the meter and write //value = rle.Compress(value) - if db.writeMeter != nil { - db.writeMeter.Mark(int64(len(value))) - } return db.db.Put(key, value, nil) } @@ -113,32 +102,17 @@ func (db *LDBDatabase) Has(key []byte) (bool, error) { // Get returns the given key if it's present. func (db *LDBDatabase) Get(key []byte) ([]byte, error) { - // Measure the database get latency, if requested - if db.getTimer != nil { - defer db.getTimer.UpdateSince(time.Now()) - } // Retrieve the key and increment the miss counter if not found dat, err := db.db.Get(key, nil) if err != nil { - if db.missMeter != nil { - db.missMeter.Mark(1) - } return nil, err } - // Otherwise update the actually retrieved amount of data - if db.readMeter != nil { - db.readMeter.Mark(int64(len(dat))) - } return dat, nil //return rle.Decompress(dat) } // Delete deletes the key from the queue and database func (db *LDBDatabase) Delete(key []byte) error { - // Measure the database delete latency, if requested - if db.delTimer != nil { - defer db.delTimer.UpdateSince(time.Now()) - } // Execute the actual operation return db.db.Delete(key, nil) } @@ -178,15 +152,11 @@ func (db *LDBDatabase) Meter(prefix string) { return } // Initialize all the metrics collector at the requested prefix - db.getTimer = metrics.NewRegisteredTimer(prefix+"user/gets", nil) - db.putTimer = metrics.NewRegisteredTimer(prefix+"user/puts", nil) - db.delTimer = metrics.NewRegisteredTimer(prefix+"user/dels", nil) - db.missMeter = metrics.NewRegisteredMeter(prefix+"user/misses", nil) - db.readMeter = metrics.NewRegisteredMeter(prefix+"user/reads", nil) - db.writeMeter = metrics.NewRegisteredMeter(prefix+"user/writes", nil) db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil) db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil) db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil) + db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil) + db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil) // Create a quit channel for the periodic collector and run it db.quitLock.Lock() @@ -207,12 +177,17 @@ func (db *LDBDatabase) Meter(prefix string) { // 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294 // 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884 // 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000 +// +// This is how the iostats look like (currently): +// Read(MB):3895.04860 Write(MB):3654.64712 func (db *LDBDatabase) meter(refresh time.Duration) { - // Create the counters to store current and previous values - counters := make([][]float64, 2) + // Create the counters to store current and previous compaction values + compactions := make([][]float64, 2) for i := 0; i < 2; i++ { - counters[i] = make([]float64, 3) + compactions[i] = make([]float64, 3) } + // Create storage for iostats. + var iostats [2]float64 // Iterate ad infinitum and collect the stats for i := 1; ; i++ { // Retrieve the database stats @@ -233,8 +208,8 @@ func (db *LDBDatabase) meter(refresh time.Duration) { lines = lines[3:] // Iterate over all the table rows, and accumulate the entries - for j := 0; j < len(counters[i%2]); j++ { - counters[i%2][j] = 0 + for j := 0; j < len(compactions[i%2]); j++ { + compactions[i%2][j] = 0 } for _, line := range lines { parts := strings.Split(line, "|") @@ -247,19 +222,60 @@ func (db *LDBDatabase) meter(refresh time.Duration) { db.log.Error("Compaction entry parsing failed", "err", err) return } - counters[i%2][idx] += value + compactions[i%2][idx] += value } } // Update all the requested meters if db.compTimeMeter != nil { - db.compTimeMeter.Mark(int64((counters[i%2][0] - counters[(i-1)%2][0]) * 1000 * 1000 * 1000)) + db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000)) } if db.compReadMeter != nil { - db.compReadMeter.Mark(int64((counters[i%2][1] - counters[(i-1)%2][1]) * 1024 * 1024)) + db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024)) } if db.compWriteMeter != nil { - db.compWriteMeter.Mark(int64((counters[i%2][2] - counters[(i-1)%2][2]) * 1024 * 1024)) + db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024)) + } + + // Retrieve the database iostats. + ioStats, err := db.db.GetProperty("leveldb.iostats") + if err != nil { + db.log.Error("Failed to read database iostats", "err", err) + return + } + parts := strings.Split(ioStats, " ") + if len(parts) < 2 { + db.log.Error("Bad syntax of ioStats", "ioStats", ioStats) + return } + r := strings.Split(parts[0], ":") + if len(r) < 2 { + db.log.Error("Bad syntax of read entry", "entry", parts[0]) + return + } + read, err := strconv.ParseFloat(r[1], 64) + if err != nil { + db.log.Error("Read entry parsing failed", "err", err) + return + } + w := strings.Split(parts[1], ":") + if len(w) < 2 { + db.log.Error("Bad syntax of write entry", "entry", parts[1]) + return + } + write, err := strconv.ParseFloat(w[1], 64) + if err != nil { + db.log.Error("Write entry parsing failed", "err", err) + return + } + if db.diskReadMeter != nil { + db.diskReadMeter.Mark(int64((read - iostats[0]) * 1024 * 1024)) + } + if db.diskWriteMeter != nil { + db.diskWriteMeter.Mark(int64((write - iostats[1]) * 1024 * 1024)) + } + iostats[0] = read + iostats[1] = write + // Sleep a bit, then repeat the stats collection select { case errc := <-db.quitChan: |