658 lines
22 KiB
Go
658 lines
22 KiB
Go
// Copyright 2023 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
// Package pebble implements the key-value database layer based on pebble.
|
|
package pebble
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"runtime"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/cockroachdb/pebble"
|
|
"github.com/cockroachdb/pebble/bloom"
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/ethdb"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/metrics"
|
|
)
|
|
|
|
const (
	// minCache is the smallest cache allowance, in megabytes, that New accepts;
	// smaller requests are raised to this value.
	minCache = 16

	// minHandles is the smallest number of file handles that New accepts;
	// smaller requests are raised to this value.
	minHandles = 16

	// metricsGatheringInterval is how often the metrics goroutine polls pebble
	// for compaction, io and write-stall statistics.
	metricsGatheringInterval = 3 * time.Second

	// degradationWarnInterval is the minimum spacing between two warnings
	// reporting that the pebble database cannot keep up with requested writes.
	degradationWarnInterval = time.Minute
)
|
|
|
|
// Database is a persistent key-value store based on the pebble storage engine.
|
|
// Apart from basic data storage functionality it also supports batch writes and
|
|
// iterating over the keyspace in binary-alphabetical order.
|
|
type Database struct {
|
|
fn string // filename for reporting
|
|
db *pebble.DB // Underlying pebble storage engine
|
|
|
|
compTimeMeter *metrics.Meter // Meter for measuring the total time spent in database compaction
|
|
compReadMeter *metrics.Meter // Meter for measuring the data read during compaction
|
|
compWriteMeter *metrics.Meter // Meter for measuring the data written during compaction
|
|
writeDelayNMeter *metrics.Meter // Meter for measuring the write delay number due to database compaction
|
|
writeDelayMeter *metrics.Meter // Meter for measuring the write delay duration due to database compaction
|
|
diskSizeGauge *metrics.Gauge // Gauge for tracking the size of all the levels in the database
|
|
diskReadMeter *metrics.Meter // Meter for measuring the effective amount of data read
|
|
diskWriteMeter *metrics.Meter // Meter for measuring the effective amount of data written
|
|
memCompGauge *metrics.Gauge // Gauge for tracking the number of memory compaction
|
|
level0CompGauge *metrics.Gauge // Gauge for tracking the number of table compaction in level0
|
|
nonlevel0CompGauge *metrics.Gauge // Gauge for tracking the number of table compaction in non0 level
|
|
seekCompGauge *metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
|
|
manualMemAllocGauge *metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated
|
|
|
|
levelsGauge []*metrics.Gauge // Gauge for tracking the number of tables in levels
|
|
|
|
quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
|
|
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
|
|
closed bool // keep track of whether we're Closed
|
|
|
|
log log.Logger // Contextual logger tracking the database path
|
|
|
|
activeComp int // Current number of active compactions
|
|
compStartTime time.Time // The start time of the earliest currently-active compaction
|
|
compTime atomic.Int64 // Total time spent in compaction in ns
|
|
level0Comp atomic.Uint32 // Total number of level-zero compactions
|
|
nonLevel0Comp atomic.Uint32 // Total number of non level-zero compactions
|
|
|
|
writeStalled atomic.Bool // Flag whether the write is stalled
|
|
writeDelayStartTime time.Time // The start time of the latest write stall
|
|
writeDelayCount atomic.Int64 // Total number of write stall counts
|
|
writeDelayTime atomic.Int64 // Total time spent in write stalls
|
|
|
|
writeOptions *pebble.WriteOptions
|
|
}
|
|
|
|
func (d *Database) onCompactionBegin(info pebble.CompactionInfo) {
|
|
if d.activeComp == 0 {
|
|
d.compStartTime = time.Now()
|
|
}
|
|
l0 := info.Input[0]
|
|
if l0.Level == 0 {
|
|
d.level0Comp.Add(1)
|
|
} else {
|
|
d.nonLevel0Comp.Add(1)
|
|
}
|
|
d.activeComp++
|
|
}
|
|
|
|
func (d *Database) onCompactionEnd(info pebble.CompactionInfo) {
|
|
if d.activeComp == 1 {
|
|
d.compTime.Add(int64(time.Since(d.compStartTime)))
|
|
} else if d.activeComp == 0 {
|
|
panic("should not happen")
|
|
}
|
|
d.activeComp--
|
|
}
|
|
|
|
func (d *Database) onWriteStallBegin(b pebble.WriteStallBeginInfo) {
|
|
d.writeDelayStartTime = time.Now()
|
|
d.writeDelayCount.Add(1)
|
|
d.writeStalled.Store(true)
|
|
}
|
|
|
|
func (d *Database) onWriteStallEnd() {
|
|
d.writeDelayTime.Add(int64(time.Since(d.writeDelayStartTime)))
|
|
d.writeStalled.Store(false)
|
|
}
|
|
|
|
// panicLogger silences Pebble's internal logger: informational and error
// messages are dropped, while fatal messages are turned into a panic.
//
// TODO(karalabe): Remove when Pebble sets this as the default.
type panicLogger struct{}

// Infof discards the message.
func (l panicLogger) Infof(format string, args ...any) {}

// Errorf discards the message.
func (l panicLogger) Errorf(format string, args ...any) {}

// Fatalf panics with an error built from the formatted message, prefixed
// with "fatal: ".
func (l panicLogger) Fatalf(format string, args ...any) {
	err := fmt.Errorf("fatal: "+format, args...)
	panic(err)
}
|
|
|
|
// New returns a wrapped pebble DB object. The namespace is the prefix that the
|
|
// metrics reporting should use for surfacing internal stats.
|
|
func New(file string, cache int, handles int, namespace string, readonly bool) (*Database, error) {
|
|
// Ensure we have some minimal caching and file guarantees
|
|
if cache < minCache {
|
|
cache = minCache
|
|
}
|
|
if handles < minHandles {
|
|
handles = minHandles
|
|
}
|
|
logger := log.New("database", file)
|
|
logger.Info("Allocated cache and file handles", "cache", common.StorageSize(cache*1024*1024), "handles", handles)
|
|
|
|
// The max memtable size is limited by the uint32 offsets stored in
|
|
// internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
|
|
//
|
|
// - MaxUint32 on 64-bit platforms;
|
|
// - MaxInt on 32-bit platforms.
|
|
//
|
|
// It is used when slices are limited to Uint32 on 64-bit platforms (the
|
|
// length limit for slices is naturally MaxInt on 32-bit platforms).
|
|
//
|
|
// Taken from https://github.com/cockroachdb/pebble/blob/master/internal/constants/constants.go
|
|
maxMemTableSize := (1<<31)<<(^uint(0)>>63) - 1
|
|
|
|
// Two memory tables is configured which is identical to leveldb,
|
|
// including a frozen memory table and another live one.
|
|
memTableLimit := 2
|
|
memTableSize := cache * 1024 * 1024 / 2 / memTableLimit
|
|
|
|
// The memory table size is currently capped at maxMemTableSize-1 due to a
|
|
// known bug in the pebble where maxMemTableSize is not recognized as a
|
|
// valid size.
|
|
//
|
|
// TODO use the maxMemTableSize as the maximum table size once the issue
|
|
// in pebble is fixed.
|
|
if memTableSize >= maxMemTableSize {
|
|
memTableSize = maxMemTableSize - 1
|
|
}
|
|
db := &Database{
|
|
fn: file,
|
|
log: logger,
|
|
quitChan: make(chan chan error),
|
|
writeOptions: &pebble.WriteOptions{Sync: false},
|
|
}
|
|
opt := &pebble.Options{
|
|
// Pebble has a single combined cache area and the write
|
|
// buffers are taken from this too. Assign all available
|
|
// memory allowance for cache.
|
|
Cache: pebble.NewCache(int64(cache * 1024 * 1024)),
|
|
MaxOpenFiles: handles,
|
|
|
|
// The size of memory table(as well as the write buffer).
|
|
// Note, there may have more than two memory tables in the system.
|
|
MemTableSize: uint64(memTableSize),
|
|
|
|
// MemTableStopWritesThreshold places a hard limit on the size
|
|
// of the existent MemTables(including the frozen one).
|
|
// Note, this must be the number of tables not the size of all memtables
|
|
// according to https://github.com/cockroachdb/pebble/blob/master/options.go#L738-L742
|
|
// and to https://github.com/cockroachdb/pebble/blob/master/db.go#L1892-L1903.
|
|
MemTableStopWritesThreshold: memTableLimit,
|
|
|
|
// The default compaction concurrency(1 thread),
|
|
// Here use all available CPUs for faster compaction.
|
|
MaxConcurrentCompactions: runtime.NumCPU,
|
|
|
|
// Per-level options. Options for at least one level must be specified. The
|
|
// options for the last level are used for all subsequent levels.
|
|
Levels: []pebble.LevelOptions{
|
|
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
|
|
{TargetFileSize: 4 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
|
|
{TargetFileSize: 8 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
|
|
{TargetFileSize: 16 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
|
|
{TargetFileSize: 32 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
|
|
{TargetFileSize: 64 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
|
|
{TargetFileSize: 128 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
|
|
},
|
|
ReadOnly: readonly,
|
|
EventListener: &pebble.EventListener{
|
|
CompactionBegin: db.onCompactionBegin,
|
|
CompactionEnd: db.onCompactionEnd,
|
|
WriteStallBegin: db.onWriteStallBegin,
|
|
WriteStallEnd: db.onWriteStallEnd,
|
|
},
|
|
Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble
|
|
}
|
|
// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
|
|
// for more details.
|
|
opt.Experimental.ReadSamplingMultiplier = -1
|
|
|
|
// Open the db and recover any potential corruptions
|
|
innerDB, err := pebble.Open(file, opt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
db.db = innerDB
|
|
|
|
db.compTimeMeter = metrics.GetOrRegisterMeter(namespace+"compact/time", nil)
|
|
db.compReadMeter = metrics.GetOrRegisterMeter(namespace+"compact/input", nil)
|
|
db.compWriteMeter = metrics.GetOrRegisterMeter(namespace+"compact/output", nil)
|
|
db.diskSizeGauge = metrics.GetOrRegisterGauge(namespace+"disk/size", nil)
|
|
db.diskReadMeter = metrics.GetOrRegisterMeter(namespace+"disk/read", nil)
|
|
db.diskWriteMeter = metrics.GetOrRegisterMeter(namespace+"disk/write", nil)
|
|
db.writeDelayMeter = metrics.GetOrRegisterMeter(namespace+"compact/writedelay/duration", nil)
|
|
db.writeDelayNMeter = metrics.GetOrRegisterMeter(namespace+"compact/writedelay/counter", nil)
|
|
db.memCompGauge = metrics.GetOrRegisterGauge(namespace+"compact/memory", nil)
|
|
db.level0CompGauge = metrics.GetOrRegisterGauge(namespace+"compact/level0", nil)
|
|
db.nonlevel0CompGauge = metrics.GetOrRegisterGauge(namespace+"compact/nonlevel0", nil)
|
|
db.seekCompGauge = metrics.GetOrRegisterGauge(namespace+"compact/seek", nil)
|
|
db.manualMemAllocGauge = metrics.GetOrRegisterGauge(namespace+"memory/manualalloc", nil)
|
|
|
|
// Start up the metrics gathering and return
|
|
go db.meter(metricsGatheringInterval, namespace)
|
|
return db, nil
|
|
}
|
|
|
|
// Close stops the metrics collection, flushes any pending data to disk and closes
|
|
// all io accesses to the underlying key-value store.
|
|
func (d *Database) Close() error {
|
|
d.quitLock.Lock()
|
|
defer d.quitLock.Unlock()
|
|
// Allow double closing, simplifies things
|
|
if d.closed {
|
|
return nil
|
|
}
|
|
d.closed = true
|
|
if d.quitChan != nil {
|
|
errc := make(chan error)
|
|
d.quitChan <- errc
|
|
if err := <-errc; err != nil {
|
|
d.log.Error("Metrics collection failed", "err", err)
|
|
}
|
|
d.quitChan = nil
|
|
}
|
|
return d.db.Close()
|
|
}
|
|
|
|
// Has retrieves if a key is present in the key-value store.
|
|
func (d *Database) Has(key []byte) (bool, error) {
|
|
d.quitLock.RLock()
|
|
defer d.quitLock.RUnlock()
|
|
if d.closed {
|
|
return false, pebble.ErrClosed
|
|
}
|
|
_, closer, err := d.db.Get(key)
|
|
if err == pebble.ErrNotFound {
|
|
return false, nil
|
|
} else if err != nil {
|
|
return false, err
|
|
}
|
|
if err = closer.Close(); err != nil {
|
|
return false, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// Get retrieves the given key if it's present in the key-value store.
|
|
func (d *Database) Get(key []byte) ([]byte, error) {
|
|
d.quitLock.RLock()
|
|
defer d.quitLock.RUnlock()
|
|
if d.closed {
|
|
return nil, pebble.ErrClosed
|
|
}
|
|
dat, closer, err := d.db.Get(key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ret := make([]byte, len(dat))
|
|
copy(ret, dat)
|
|
if err = closer.Close(); err != nil {
|
|
return nil, err
|
|
}
|
|
return ret, nil
|
|
}
|
|
|
|
// Put inserts the given value into the key-value store.
|
|
func (d *Database) Put(key []byte, value []byte) error {
|
|
d.quitLock.RLock()
|
|
defer d.quitLock.RUnlock()
|
|
if d.closed {
|
|
return pebble.ErrClosed
|
|
}
|
|
return d.db.Set(key, value, d.writeOptions)
|
|
}
|
|
|
|
// Delete removes the key from the key-value store.
|
|
func (d *Database) Delete(key []byte) error {
|
|
d.quitLock.RLock()
|
|
defer d.quitLock.RUnlock()
|
|
if d.closed {
|
|
return pebble.ErrClosed
|
|
}
|
|
return d.db.Delete(key, d.writeOptions)
|
|
}
|
|
|
|
// DeleteRange deletes all of the keys (and values) in the range [start,end)
|
|
// (inclusive on start, exclusive on end).
|
|
func (d *Database) DeleteRange(start, end []byte) error {
|
|
d.quitLock.RLock()
|
|
defer d.quitLock.RUnlock()
|
|
if d.closed {
|
|
return pebble.ErrClosed
|
|
}
|
|
return d.db.DeleteRange(start, end, d.writeOptions)
|
|
}
|
|
|
|
// NewBatch creates a write-only key-value store that buffers changes to its host
|
|
// database until a final write is called.
|
|
func (d *Database) NewBatch() ethdb.Batch {
|
|
return &batch{
|
|
b: d.db.NewBatch(),
|
|
db: d,
|
|
}
|
|
}
|
|
|
|
// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
|
|
func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
|
|
return &batch{
|
|
b: d.db.NewBatchWithSize(size),
|
|
db: d,
|
|
}
|
|
}
|
|
|
|
// upperBound returns the smallest key that is greater than every key starting
// with prefix: the prefix with its trailing 0xff bytes dropped and the last
// remaining byte incremented. It returns nil when no such key exists, i.e.
// when prefix is empty or consists solely of 0xff bytes. The input slice is
// not modified.
func upperBound(prefix []byte) (limit []byte) {
	// Find the last byte that can be incremented without overflowing.
	last := len(prefix) - 1
	for last >= 0 && prefix[last] == 0xff {
		last--
	}
	if last < 0 {
		return nil
	}
	limit = make([]byte, last+1)
	copy(limit, prefix)
	limit[last]++
	return limit
}
|
|
|
|
// Stat returns the internal metrics of Pebble in a text format. It's a developer
|
|
// method to read everything there is to read, independent of Pebble version.
|
|
func (d *Database) Stat() (string, error) {
|
|
return d.db.Metrics().String(), nil
|
|
}
|
|
|
|
// Compact flattens the underlying data store for the given key range. In essence,
|
|
// deleted and overwritten versions are discarded, and the data is rearranged to
|
|
// reduce the cost of operations needed to access them.
|
|
//
|
|
// A nil start is treated as a key before all keys in the data store; a nil limit
|
|
// is treated as a key after all keys in the data store. If both is nil then it
|
|
// will compact entire data store.
|
|
func (d *Database) Compact(start []byte, limit []byte) error {
|
|
// There is no special flag to represent the end of key range
|
|
// in pebble(nil in leveldb). Use an ugly hack to construct a
|
|
// large key to represent it.
|
|
// Note any prefixed database entry will be smaller than this
|
|
// flag, as for trie nodes we need the 32 byte 0xff because
|
|
// there might be a shared prefix starting with a number of
|
|
// 0xff-s, so 32 ensures than only a hash collision could touch it.
|
|
// https://github.com/cockroachdb/pebble/issues/2359#issuecomment-1443995833
|
|
if limit == nil {
|
|
limit = bytes.Repeat([]byte{0xff}, 32)
|
|
}
|
|
return d.db.Compact(start, limit, true) // Parallelization is preferred
|
|
}
|
|
|
|
// Path returns the path to the database directory.
|
|
func (d *Database) Path() string {
|
|
return d.fn
|
|
}
|
|
|
|
// meter periodically retrieves internal pebble counters and reports them to
|
|
// the metrics subsystem.
|
|
func (d *Database) meter(refresh time.Duration, namespace string) {
|
|
var errc chan error
|
|
timer := time.NewTimer(refresh)
|
|
defer timer.Stop()
|
|
|
|
// Create storage and warning log tracer for write delay.
|
|
var (
|
|
compTimes [2]int64
|
|
compWrites [2]int64
|
|
compReads [2]int64
|
|
|
|
nWrites [2]int64
|
|
|
|
writeDelayTimes [2]int64
|
|
writeDelayCounts [2]int64
|
|
lastWriteStallReport time.Time
|
|
)
|
|
|
|
// Iterate ad infinitum and collect the stats
|
|
for i := 1; errc == nil; i++ {
|
|
var (
|
|
compWrite int64
|
|
compRead int64
|
|
nWrite int64
|
|
|
|
stats = d.db.Metrics()
|
|
compTime = d.compTime.Load()
|
|
writeDelayCount = d.writeDelayCount.Load()
|
|
writeDelayTime = d.writeDelayTime.Load()
|
|
nonLevel0CompCount = int64(d.nonLevel0Comp.Load())
|
|
level0CompCount = int64(d.level0Comp.Load())
|
|
)
|
|
writeDelayTimes[i%2] = writeDelayTime
|
|
writeDelayCounts[i%2] = writeDelayCount
|
|
compTimes[i%2] = compTime
|
|
|
|
for _, levelMetrics := range stats.Levels {
|
|
nWrite += int64(levelMetrics.BytesCompacted)
|
|
nWrite += int64(levelMetrics.BytesFlushed)
|
|
compWrite += int64(levelMetrics.BytesCompacted)
|
|
compRead += int64(levelMetrics.BytesRead)
|
|
}
|
|
|
|
nWrite += int64(stats.WAL.BytesWritten)
|
|
|
|
compWrites[i%2] = compWrite
|
|
compReads[i%2] = compRead
|
|
nWrites[i%2] = nWrite
|
|
|
|
if d.writeDelayNMeter != nil {
|
|
d.writeDelayNMeter.Mark(writeDelayCounts[i%2] - writeDelayCounts[(i-1)%2])
|
|
}
|
|
if d.writeDelayMeter != nil {
|
|
d.writeDelayMeter.Mark(writeDelayTimes[i%2] - writeDelayTimes[(i-1)%2])
|
|
}
|
|
// Print a warning log if writing has been stalled for a while. The log will
|
|
// be printed per minute to avoid overwhelming users.
|
|
if d.writeStalled.Load() && writeDelayCounts[i%2] == writeDelayCounts[(i-1)%2] &&
|
|
time.Now().After(lastWriteStallReport.Add(degradationWarnInterval)) {
|
|
d.log.Warn("Database compacting, degraded performance")
|
|
lastWriteStallReport = time.Now()
|
|
}
|
|
if d.compTimeMeter != nil {
|
|
d.compTimeMeter.Mark(compTimes[i%2] - compTimes[(i-1)%2])
|
|
}
|
|
if d.compReadMeter != nil {
|
|
d.compReadMeter.Mark(compReads[i%2] - compReads[(i-1)%2])
|
|
}
|
|
if d.compWriteMeter != nil {
|
|
d.compWriteMeter.Mark(compWrites[i%2] - compWrites[(i-1)%2])
|
|
}
|
|
if d.diskSizeGauge != nil {
|
|
d.diskSizeGauge.Update(int64(stats.DiskSpaceUsage()))
|
|
}
|
|
if d.diskReadMeter != nil {
|
|
d.diskReadMeter.Mark(0) // pebble doesn't track non-compaction reads
|
|
}
|
|
if d.diskWriteMeter != nil {
|
|
d.diskWriteMeter.Mark(nWrites[i%2] - nWrites[(i-1)%2])
|
|
}
|
|
// See https://github.com/cockroachdb/pebble/pull/1628#pullrequestreview-1026664054
|
|
manuallyAllocated := stats.BlockCache.Size + int64(stats.MemTable.Size) + int64(stats.MemTable.ZombieSize)
|
|
d.manualMemAllocGauge.Update(manuallyAllocated)
|
|
d.memCompGauge.Update(stats.Flush.Count)
|
|
d.nonlevel0CompGauge.Update(nonLevel0CompCount)
|
|
d.level0CompGauge.Update(level0CompCount)
|
|
d.seekCompGauge.Update(stats.Compact.ReadCount)
|
|
|
|
for i, level := range stats.Levels {
|
|
// Append metrics for additional layers
|
|
if i >= len(d.levelsGauge) {
|
|
d.levelsGauge = append(d.levelsGauge, metrics.GetOrRegisterGauge(namespace+fmt.Sprintf("tables/level%v", i), nil))
|
|
}
|
|
d.levelsGauge[i].Update(level.NumFiles)
|
|
}
|
|
|
|
// Sleep a bit, then repeat the stats collection
|
|
select {
|
|
case errc = <-d.quitChan:
|
|
// Quit requesting, stop hammering the database
|
|
case <-timer.C:
|
|
timer.Reset(refresh)
|
|
// Timeout, gather a new set of stats
|
|
}
|
|
}
|
|
errc <- nil
|
|
}
|
|
|
|
// batch is a write-only batch that commits changes to its host database
|
|
// when Write is called. A batch cannot be used concurrently.
|
|
type batch struct {
|
|
b *pebble.Batch
|
|
db *Database
|
|
size int
|
|
}
|
|
|
|
// Put inserts the given value into the batch for later committing.
|
|
func (b *batch) Put(key, value []byte) error {
|
|
if err := b.b.Set(key, value, nil); err != nil {
|
|
return err
|
|
}
|
|
b.size += len(key) + len(value)
|
|
return nil
|
|
}
|
|
|
|
// Delete inserts the key removal into the batch for later committing.
|
|
func (b *batch) Delete(key []byte) error {
|
|
if err := b.b.Delete(key, nil); err != nil {
|
|
return err
|
|
}
|
|
b.size += len(key)
|
|
return nil
|
|
}
|
|
|
|
// ValueSize retrieves the amount of data queued up for writing.
|
|
func (b *batch) ValueSize() int {
|
|
return b.size
|
|
}
|
|
|
|
// Write flushes any accumulated data to disk.
|
|
func (b *batch) Write() error {
|
|
b.db.quitLock.RLock()
|
|
defer b.db.quitLock.RUnlock()
|
|
if b.db.closed {
|
|
return pebble.ErrClosed
|
|
}
|
|
return b.b.Commit(b.db.writeOptions)
|
|
}
|
|
|
|
// Reset resets the batch for reuse.
|
|
func (b *batch) Reset() {
|
|
b.b.Reset()
|
|
b.size = 0
|
|
}
|
|
|
|
// Replay replays the batch contents.
|
|
func (b *batch) Replay(w ethdb.KeyValueWriter) error {
|
|
reader := b.b.Reader()
|
|
for {
|
|
kind, k, v, ok, err := reader.Next()
|
|
if !ok || err != nil {
|
|
return err
|
|
}
|
|
// The (k,v) slices might be overwritten if the batch is reset/reused,
|
|
// and the receiver should copy them if they are to be retained long-term.
|
|
if kind == pebble.InternalKeyKindSet {
|
|
if err = w.Put(k, v); err != nil {
|
|
return err
|
|
}
|
|
} else if kind == pebble.InternalKeyKindDelete {
|
|
if err = w.Delete(k); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
return fmt.Errorf("unhandled operation, keytype: %v", kind)
|
|
}
|
|
}
|
|
}
|
|
|
|
// pebbleIterator is a wrapper of underlying iterator in storage engine.
|
|
// The purpose of this structure is to implement the missing APIs.
|
|
//
|
|
// The pebble iterator is not thread-safe.
|
|
type pebbleIterator struct {
|
|
iter *pebble.Iterator
|
|
moved bool
|
|
released bool
|
|
}
|
|
|
|
// NewIterator creates a binary-alphabetical iterator over a subset
|
|
// of database content with a particular key prefix, starting at a particular
|
|
// initial key (or after, if it does not exist).
|
|
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
|
|
iter, _ := d.db.NewIter(&pebble.IterOptions{
|
|
LowerBound: append(prefix, start...),
|
|
UpperBound: upperBound(prefix),
|
|
})
|
|
iter.First()
|
|
return &pebbleIterator{iter: iter, moved: true, released: false}
|
|
}
|
|
|
|
// Next moves the iterator to the next key/value pair. It returns whether the
|
|
// iterator is exhausted.
|
|
func (iter *pebbleIterator) Next() bool {
|
|
if iter.moved {
|
|
iter.moved = false
|
|
return iter.iter.Valid()
|
|
}
|
|
return iter.iter.Next()
|
|
}
|
|
|
|
// Error returns any accumulated error. Exhausting all the key/value pairs
|
|
// is not considered to be an error.
|
|
func (iter *pebbleIterator) Error() error {
|
|
return iter.iter.Error()
|
|
}
|
|
|
|
// Key returns the key of the current key/value pair, or nil if done. The caller
|
|
// should not modify the contents of the returned slice, and its contents may
|
|
// change on the next call to Next.
|
|
func (iter *pebbleIterator) Key() []byte {
|
|
return iter.iter.Key()
|
|
}
|
|
|
|
// Value returns the value of the current key/value pair, or nil if done. The
|
|
// caller should not modify the contents of the returned slice, and its contents
|
|
// may change on the next call to Next.
|
|
func (iter *pebbleIterator) Value() []byte {
|
|
return iter.iter.Value()
|
|
}
|
|
|
|
// Release releases associated resources. Release should always succeed and can
|
|
// be called multiple times without causing error.
|
|
func (iter *pebbleIterator) Release() {
|
|
if !iter.released {
|
|
iter.iter.Close()
|
|
iter.released = true
|
|
}
|
|
}
|