aboutsummaryrefslogtreecommitdiffstats
path: root/ethdb/database.go
diff options
context:
space:
mode:
Diffstat (limited to 'ethdb/database.go')
-rw-r--r--ethdb/database.go151
1 files changed, 69 insertions, 82 deletions
diff --git a/ethdb/database.go b/ethdb/database.go
index e82528f25..7d5fb0b9e 100644
--- a/ethdb/database.go
+++ b/ethdb/database.go
@@ -17,8 +17,6 @@
package ethdb
import (
- "fmt"
- "path/filepath"
"strconv"
"strings"
"sync"
@@ -37,20 +35,6 @@ import (
var OpenFileLimit = 64
-// cacheRatio specifies how the total allotted cache is distributed between the
-// various system databases.
-var cacheRatio = map[string]float64{
- "chaindata": 1.0,
- "lightchaindata": 1.0,
-}
-
-// handleRatio specifies how the total allotted file descriptors is distributed
-// between the various system databases.
-var handleRatio = map[string]float64{
- "chaindata": 1.0,
- "lightchaindata": 1.0,
-}
-
type LDBDatabase struct {
fn string // filename for reporting
db *leveldb.DB // LevelDB instance
@@ -67,20 +51,22 @@ type LDBDatabase struct {
quitLock sync.Mutex // Mutex protecting the quit channel access
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
+
+ log log.Logger // Contextual logger tracking the database path
}
// NewLDBDatabase returns a LevelDB wrapped object.
func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) {
- // Calculate the cache and file descriptor allowance for this particular database
- cache = int(float64(cache) * cacheRatio[filepath.Base(file)])
+ logger := log.New("database", file)
+
+ // Ensure we have some minimal caching and file guarantees
if cache < 16 {
cache = 16
}
- handles = int(float64(handles) * handleRatio[filepath.Base(file)])
if handles < 16 {
handles = 16
}
- log.Info(fmt.Sprintf("Allotted %dMB cache and %d file handles to %s", cache, handles, file))
+ logger.Info("Allocated cache and file handles", "cache", cache, "handles", handles)
// Open the db and recover any potential corruptions
db, err := leveldb.OpenFile(file, &opt.Options{
@@ -97,8 +83,9 @@ func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) {
return nil, err
}
return &LDBDatabase{
- fn: file,
- db: db,
+ fn: file,
+ db: db,
+ log: logger,
}, nil
}
@@ -108,103 +95,103 @@ func (db *LDBDatabase) Path() string {
}
// Put puts the given key / value to the queue
-func (self *LDBDatabase) Put(key []byte, value []byte) error {
+func (db *LDBDatabase) Put(key []byte, value []byte) error {
// Measure the database put latency, if requested
- if self.putTimer != nil {
- defer self.putTimer.UpdateSince(time.Now())
+ if db.putTimer != nil {
+ defer db.putTimer.UpdateSince(time.Now())
}
// Generate the data to write to disk, update the meter and write
//value = rle.Compress(value)
- if self.writeMeter != nil {
- self.writeMeter.Mark(int64(len(value)))
+ if db.writeMeter != nil {
+ db.writeMeter.Mark(int64(len(value)))
}
- return self.db.Put(key, value, nil)
+ return db.db.Put(key, value, nil)
}
// Get returns the given key if it's present.
-func (self *LDBDatabase) Get(key []byte) ([]byte, error) {
+func (db *LDBDatabase) Get(key []byte) ([]byte, error) {
// Measure the database get latency, if requested
- if self.getTimer != nil {
- defer self.getTimer.UpdateSince(time.Now())
+ if db.getTimer != nil {
+ defer db.getTimer.UpdateSince(time.Now())
}
// Retrieve the key and increment the miss counter if not found
- dat, err := self.db.Get(key, nil)
+ dat, err := db.db.Get(key, nil)
if err != nil {
- if self.missMeter != nil {
- self.missMeter.Mark(1)
+ if db.missMeter != nil {
+ db.missMeter.Mark(1)
}
return nil, err
}
// Otherwise update the actually retrieved amount of data
- if self.readMeter != nil {
- self.readMeter.Mark(int64(len(dat)))
+ if db.readMeter != nil {
+ db.readMeter.Mark(int64(len(dat)))
}
return dat, nil
//return rle.Decompress(dat)
}
// Delete deletes the key from the queue and database
-func (self *LDBDatabase) Delete(key []byte) error {
+func (db *LDBDatabase) Delete(key []byte) error {
// Measure the database delete latency, if requested
- if self.delTimer != nil {
- defer self.delTimer.UpdateSince(time.Now())
+ if db.delTimer != nil {
+ defer db.delTimer.UpdateSince(time.Now())
}
// Execute the actual operation
- return self.db.Delete(key, nil)
+ return db.db.Delete(key, nil)
}
-func (self *LDBDatabase) NewIterator() iterator.Iterator {
- return self.db.NewIterator(nil, nil)
+func (db *LDBDatabase) NewIterator() iterator.Iterator {
+ return db.db.NewIterator(nil, nil)
}
-func (self *LDBDatabase) Close() {
+func (db *LDBDatabase) Close() {
// Stop the metrics collection to avoid internal database races
- self.quitLock.Lock()
- defer self.quitLock.Unlock()
+ db.quitLock.Lock()
+ defer db.quitLock.Unlock()
- if self.quitChan != nil {
+ if db.quitChan != nil {
errc := make(chan error)
- self.quitChan <- errc
+ db.quitChan <- errc
if err := <-errc; err != nil {
- log.Error(fmt.Sprintf("metrics failure in '%s': %v\n", self.fn, err))
+ db.log.Error("Metrics collection failed", "err", err)
}
}
- err := self.db.Close()
+ err := db.db.Close()
if err == nil {
- log.Info(fmt.Sprint("closed db:", self.fn))
+ db.log.Info("Database closed")
} else {
- log.Error(fmt.Sprintf("error closing db %s: %v", self.fn, err))
+ db.log.Error("Failed to close database", "err", err)
}
}
-func (self *LDBDatabase) LDB() *leveldb.DB {
- return self.db
+func (db *LDBDatabase) LDB() *leveldb.DB {
+ return db.db
}
// Meter configures the database metrics collectors and
-func (self *LDBDatabase) Meter(prefix string) {
+func (db *LDBDatabase) Meter(prefix string) {
// Short circuit metering if the metrics system is disabled
if !metrics.Enabled {
return
}
// Initialize all the metrics collector at the requested prefix
- self.getTimer = metrics.NewTimer(prefix + "user/gets")
- self.putTimer = metrics.NewTimer(prefix + "user/puts")
- self.delTimer = metrics.NewTimer(prefix + "user/dels")
- self.missMeter = metrics.NewMeter(prefix + "user/misses")
- self.readMeter = metrics.NewMeter(prefix + "user/reads")
- self.writeMeter = metrics.NewMeter(prefix + "user/writes")
- self.compTimeMeter = metrics.NewMeter(prefix + "compact/time")
- self.compReadMeter = metrics.NewMeter(prefix + "compact/input")
- self.compWriteMeter = metrics.NewMeter(prefix + "compact/output")
+ db.getTimer = metrics.NewTimer(prefix + "user/gets")
+ db.putTimer = metrics.NewTimer(prefix + "user/puts")
+ db.delTimer = metrics.NewTimer(prefix + "user/dels")
+ db.missMeter = metrics.NewMeter(prefix + "user/misses")
+ db.readMeter = metrics.NewMeter(prefix + "user/reads")
+ db.writeMeter = metrics.NewMeter(prefix + "user/writes")
+ db.compTimeMeter = metrics.NewMeter(prefix + "compact/time")
+ db.compReadMeter = metrics.NewMeter(prefix + "compact/input")
+ db.compWriteMeter = metrics.NewMeter(prefix + "compact/output")
// Create a quit channel for the periodic collector and run it
- self.quitLock.Lock()
- self.quitChan = make(chan chan error)
- self.quitLock.Unlock()
+ db.quitLock.Lock()
+ db.quitChan = make(chan chan error)
+ db.quitLock.Unlock()
- go self.meter(3 * time.Second)
+ go db.meter(3 * time.Second)
}
// meter periodically retrieves internal leveldb counters and reports them to
@@ -218,7 +205,7 @@ func (self *LDBDatabase) Meter(prefix string) {
// 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294
// 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884
// 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000
-func (self *LDBDatabase) meter(refresh time.Duration) {
+func (db *LDBDatabase) meter(refresh time.Duration) {
// Create the counters to store current and previous values
counters := make([][]float64, 2)
for i := 0; i < 2; i++ {
@@ -227,9 +214,9 @@ func (self *LDBDatabase) meter(refresh time.Duration) {
// Iterate ad infinitum and collect the stats
for i := 1; ; i++ {
// Retrieve the database stats
- stats, err := self.db.GetProperty("leveldb.stats")
+ stats, err := db.db.GetProperty("leveldb.stats")
if err != nil {
- log.Error(fmt.Sprintf("failed to read database stats: %v", err))
+ db.log.Error("Failed to read database stats", "err", err)
return
}
// Find the compaction table, skip the header
@@ -238,7 +225,7 @@ func (self *LDBDatabase) meter(refresh time.Duration) {
lines = lines[1:]
}
if len(lines) <= 3 {
- log.Error(fmt.Sprintf("compaction table not found"))
+ db.log.Error("Compaction table not found")
return
}
lines = lines[3:]
@@ -253,27 +240,27 @@ func (self *LDBDatabase) meter(refresh time.Duration) {
break
}
for idx, counter := range parts[3:] {
- if value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64); err != nil {
- log.Error(fmt.Sprintf("compaction entry parsing failed: %v", err))
+ value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
+ if err != nil {
+ db.log.Error("Compaction entry parsing failed", "err", err)
return
- } else {
- counters[i%2][idx] += value
}
+ counters[i%2][idx] += value
}
}
// Update all the requested meters
- if self.compTimeMeter != nil {
- self.compTimeMeter.Mark(int64((counters[i%2][0] - counters[(i-1)%2][0]) * 1000 * 1000 * 1000))
+ if db.compTimeMeter != nil {
+ db.compTimeMeter.Mark(int64((counters[i%2][0] - counters[(i-1)%2][0]) * 1000 * 1000 * 1000))
}
- if self.compReadMeter != nil {
- self.compReadMeter.Mark(int64((counters[i%2][1] - counters[(i-1)%2][1]) * 1024 * 1024))
+ if db.compReadMeter != nil {
+ db.compReadMeter.Mark(int64((counters[i%2][1] - counters[(i-1)%2][1]) * 1024 * 1024))
}
- if self.compWriteMeter != nil {
- self.compWriteMeter.Mark(int64((counters[i%2][2] - counters[(i-1)%2][2]) * 1024 * 1024))
+ if db.compWriteMeter != nil {
+ db.compWriteMeter.Mark(int64((counters[i%2][2] - counters[(i-1)%2][2]) * 1024 * 1024))
}
// Sleep a bit, then repeat the stats collection
select {
- case errc := <-self.quitChan:
+ case errc := <-db.quitChan:
// Quit requesting, stop hammering the database
errc <- nil
return