ethdb/pebble: add level file metrics (#28271)

This commit is contained in:
rjl493456442 2023-10-11 15:18:18 +08:00 committed by GitHub
parent 8976a0c97a
commit 7776a3214a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 19 additions and 9 deletions

View File

@ -71,6 +71,8 @@ type Database struct {
seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated
levelsGauge []metrics.Gauge // Gauge for tracking the number of tables in levels
quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
closed bool // keep track of whether we're Closed closed bool // keep track of whether we're Closed
@ -230,7 +232,7 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
db.manualMemAllocGauge = metrics.NewRegisteredGauge(namespace+"memory/manualalloc", nil) db.manualMemAllocGauge = metrics.NewRegisteredGauge(namespace+"memory/manualalloc", nil)
// Start up the metrics gathering and return // Start up the metrics gathering and return
go db.meter(metricsGatheringInterval) go db.meter(metricsGatheringInterval, namespace)
return db, nil return db, nil
} }
@ -427,7 +429,7 @@ func (d *Database) Path() string {
// meter periodically retrieves internal pebble counters and reports them to // meter periodically retrieves internal pebble counters and reports them to
// the metrics subsystem. // the metrics subsystem.
func (d *Database) meter(refresh time.Duration) { func (d *Database) meter(refresh time.Duration, namespace string) {
var errc chan error var errc chan error
timer := time.NewTimer(refresh) timer := time.NewTimer(refresh)
defer timer.Stop() defer timer.Stop()
@ -450,7 +452,7 @@ func (d *Database) meter(refresh time.Duration) {
compRead int64 compRead int64
nWrite int64 nWrite int64
metrics = d.db.Metrics() stats = d.db.Metrics()
compTime = d.compTime.Load() compTime = d.compTime.Load()
writeDelayCount = d.writeDelayCount.Load() writeDelayCount = d.writeDelayCount.Load()
writeDelayTime = d.writeDelayTime.Load() writeDelayTime = d.writeDelayTime.Load()
@ -461,14 +463,14 @@ func (d *Database) meter(refresh time.Duration) {
writeDelayCounts[i%2] = writeDelayCount writeDelayCounts[i%2] = writeDelayCount
compTimes[i%2] = compTime compTimes[i%2] = compTime
for _, levelMetrics := range metrics.Levels { for _, levelMetrics := range stats.Levels {
nWrite += int64(levelMetrics.BytesCompacted) nWrite += int64(levelMetrics.BytesCompacted)
nWrite += int64(levelMetrics.BytesFlushed) nWrite += int64(levelMetrics.BytesFlushed)
compWrite += int64(levelMetrics.BytesCompacted) compWrite += int64(levelMetrics.BytesCompacted)
compRead += int64(levelMetrics.BytesRead) compRead += int64(levelMetrics.BytesRead)
} }
nWrite += int64(metrics.WAL.BytesWritten) nWrite += int64(stats.WAL.BytesWritten)
compWrites[i%2] = compWrite compWrites[i%2] = compWrite
compReads[i%2] = compRead compReads[i%2] = compRead
@ -490,7 +492,7 @@ func (d *Database) meter(refresh time.Duration) {
d.compWriteMeter.Mark(compWrites[i%2] - compWrites[(i-1)%2]) d.compWriteMeter.Mark(compWrites[i%2] - compWrites[(i-1)%2])
} }
if d.diskSizeGauge != nil { if d.diskSizeGauge != nil {
d.diskSizeGauge.Update(int64(metrics.DiskSpaceUsage())) d.diskSizeGauge.Update(int64(stats.DiskSpaceUsage()))
} }
if d.diskReadMeter != nil { if d.diskReadMeter != nil {
d.diskReadMeter.Mark(0) // pebble doesn't track non-compaction reads d.diskReadMeter.Mark(0) // pebble doesn't track non-compaction reads
@ -499,12 +501,20 @@ func (d *Database) meter(refresh time.Duration) {
d.diskWriteMeter.Mark(nWrites[i%2] - nWrites[(i-1)%2]) d.diskWriteMeter.Mark(nWrites[i%2] - nWrites[(i-1)%2])
} }
// See https://github.com/cockroachdb/pebble/pull/1628#pullrequestreview-1026664054 // See https://github.com/cockroachdb/pebble/pull/1628#pullrequestreview-1026664054
manuallyAllocated := metrics.BlockCache.Size + int64(metrics.MemTable.Size) + int64(metrics.MemTable.ZombieSize) manuallyAllocated := stats.BlockCache.Size + int64(stats.MemTable.Size) + int64(stats.MemTable.ZombieSize)
d.manualMemAllocGauge.Update(manuallyAllocated) d.manualMemAllocGauge.Update(manuallyAllocated)
d.memCompGauge.Update(metrics.Flush.Count) d.memCompGauge.Update(stats.Flush.Count)
d.nonlevel0CompGauge.Update(nonLevel0CompCount) d.nonlevel0CompGauge.Update(nonLevel0CompCount)
d.level0CompGauge.Update(level0CompCount) d.level0CompGauge.Update(level0CompCount)
d.seekCompGauge.Update(metrics.Compact.ReadCount) d.seekCompGauge.Update(stats.Compact.ReadCount)
for i, level := range stats.Levels {
// Append metrics for additional layers
if i >= len(d.levelsGauge) {
d.levelsGauge = append(d.levelsGauge, metrics.NewRegisteredGauge(namespace+fmt.Sprintf("tables/level%v", i), nil))
}
d.levelsGauge[i].Update(level.NumFiles)
}
// Sleep a bit, then repeat the stats collection // Sleep a bit, then repeat the stats collection
select { select {