From 0e7efd696bb028ea63ab4e0004b0856791628595 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 25 Oct 2021 16:24:27 +0200 Subject: [PATCH] core/rawdb, ethdb: introduce batched/atomic reads from ancients (#23566) This PR adds a new accessor method to the freezer database. This new view offers a consistent interface, guaranteeing that all individual tables (headers, bodies etc) are all on the same number, and that this number is not changes (added/truncated) while the operation is performing. --- core/rawdb/accessors_chain.go | 199 +++++++++++++--------------------- core/rawdb/database.go | 20 +++- core/rawdb/freezer.go | 21 +++- core/rawdb/table.go | 10 +- ethdb/database.go | 17 ++- 5 files changed, 129 insertions(+), 138 deletions(-) diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index ed1c71e202..7f26c3a420 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -35,20 +35,15 @@ import ( // ReadCanonicalHash retrieves the hash assigned to a canonical block number. func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash { - data, _ := db.Ancient(freezerHashTable, number) - if len(data) == 0 { - data, _ = db.Get(headerHashKey(number)) - // In the background freezer is moving data from leveldb to flatten files. - // So during the first check for ancient db, the data is not yet in there, - // but when we reach into leveldb, the data was already moved. That would - // result in a not found error. + var data []byte + db.ReadAncients(func(reader ethdb.AncientReader) error { + data, _ = reader.Ancient(freezerHashTable, number) if len(data) == 0 { - data, _ = db.Ancient(freezerHashTable, number) + // Get it by hash from leveldb + data, _ = db.Get(headerHashKey(number)) } - } - if len(data) == 0 { - return common.Hash{} - } + return nil + }) return common.BytesToHash(data) } @@ -304,32 +299,25 @@ func WriteFastTxLookupLimit(db ethdb.KeyValueWriter, number uint64) { // ReadHeaderRLP retrieves a block header in its raw RLP database encoding. func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { - // First try to look up the data in ancient database. Extra hash - // comparison is necessary since ancient database only maintains - // the canonical data. - data, _ := db.Ancient(freezerHeaderTable, number) - if len(data) > 0 && crypto.Keccak256Hash(data) == hash { - return data - } - // Then try to look up the data in leveldb. - data, _ = db.Get(headerKey(number, hash)) - if len(data) > 0 { - return data - } - // In the background freezer is moving data from leveldb to flatten files. - // So during the first check for ancient db, the data is not yet in there, - // but when we reach into leveldb, the data was already moved. That would - // result in a not found error. - data, _ = db.Ancient(freezerHeaderTable, number) - if len(data) > 0 && crypto.Keccak256Hash(data) == hash { - return data - } - return nil // Can't find the data anywhere. + var data []byte + db.ReadAncients(func(reader ethdb.AncientReader) error { + // First try to look up the data in ancient database. Extra hash + // comparison is necessary since ancient database only maintains + // the canonical data. + data, _ = reader.Ancient(freezerHeaderTable, number) + if len(data) > 0 && crypto.Keccak256Hash(data) == hash { + return nil + } + // If not, try reading from leveldb + data, _ = db.Get(headerKey(number, hash)) + return nil + }) + return data } // HasHeader verifies the existence of a block header corresponding to the hash. func HasHeader(db ethdb.Reader, hash common.Hash, number uint64) bool { - if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash { + if isCanon(db, number, hash) { return true } if has, err := db.Has(headerKey(number, hash)); !has || err != nil { @@ -389,53 +377,48 @@ func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number } } +// isCanon is an internal utility method, to check whether the given number/hash +// is part of the ancient (canon) set. +func isCanon(reader ethdb.AncientReader, number uint64, hash common.Hash) bool { + h, err := reader.Ancient(freezerHashTable, number) + if err != nil { + return false + } + return bytes.Equal(h, hash[:]) +} + // ReadBodyRLP retrieves the block body (transactions and uncles) in RLP encoding. func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { // First try to look up the data in ancient database. Extra hash // comparison is necessary since ancient database only maintains // the canonical data. - data, _ := db.Ancient(freezerBodiesTable, number) - if len(data) > 0 { - h, _ := db.Ancient(freezerHashTable, number) - if common.BytesToHash(h) == hash { - return data + var data []byte + db.ReadAncients(func(reader ethdb.AncientReader) error { + // Check if the data is in ancients + if isCanon(reader, number, hash) { + data, _ = reader.Ancient(freezerBodiesTable, number) + return nil } - } - // Then try to look up the data in leveldb. - data, _ = db.Get(blockBodyKey(number, hash)) - if len(data) > 0 { - return data - } - // In the background freezer is moving data from leveldb to flatten files. - // So during the first check for ancient db, the data is not yet in there, - // but when we reach into leveldb, the data was already moved. That would - // result in a not found error. - data, _ = db.Ancient(freezerBodiesTable, number) - if len(data) > 0 { - h, _ := db.Ancient(freezerHashTable, number) - if common.BytesToHash(h) == hash { - return data - } - } - return nil // Can't find the data anywhere. + // If not, try reading from leveldb + data, _ = db.Get(blockBodyKey(number, hash)) + return nil + }) + return data } // ReadCanonicalBodyRLP retrieves the block body (transactions and uncles) for the canonical // block at number, in RLP encoding. func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue { - // If it's an ancient one, we don't need the canonical hash - data, _ := db.Ancient(freezerBodiesTable, number) - if len(data) == 0 { - // Need to get the hash - data, _ = db.Get(blockBodyKey(number, ReadCanonicalHash(db, number))) - // In the background freezer is moving data from leveldb to flatten files. - // So during the first check for ancient db, the data is not yet in there, - // but when we reach into leveldb, the data was already moved. That would - // result in a not found error. - if len(data) == 0 { - data, _ = db.Ancient(freezerBodiesTable, number) + var data []byte + db.ReadAncients(func(reader ethdb.AncientReader) error { + data, _ = reader.Ancient(freezerBodiesTable, number) + if len(data) > 0 { + return nil } - } + // Get it by hash from leveldb + data, _ = db.Get(blockBodyKey(number, ReadCanonicalHash(db, number))) + return nil + }) return data } @@ -448,7 +431,7 @@ func WriteBodyRLP(db ethdb.KeyValueWriter, hash common.Hash, number uint64, rlp // HasBody verifies the existence of a block body corresponding to the hash. func HasBody(db ethdb.Reader, hash common.Hash, number uint64) bool { - if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash { + if isCanon(db, number, hash) { return true } if has, err := db.Has(blockBodyKey(number, hash)); !has || err != nil { @@ -489,33 +472,18 @@ func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { // ReadTdRLP retrieves a block's total difficulty corresponding to the hash in RLP encoding. func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { - // First try to look up the data in ancient database. Extra hash - // comparison is necessary since ancient database only maintains - // the canonical data. - data, _ := db.Ancient(freezerDifficultyTable, number) - if len(data) > 0 { - h, _ := db.Ancient(freezerHashTable, number) - if common.BytesToHash(h) == hash { - return data + var data []byte + db.ReadAncients(func(reader ethdb.AncientReader) error { + // Check if the data is in ancients + if isCanon(reader, number, hash) { + data, _ = reader.Ancient(freezerDifficultyTable, number) + return nil } - } - // Then try to look up the data in leveldb. - data, _ = db.Get(headerTDKey(number, hash)) - if len(data) > 0 { - return data - } - // In the background freezer is moving data from leveldb to flatten files. - // So during the first check for ancient db, the data is not yet in there, - // but when we reach into leveldb, the data was already moved. That would - // result in a not found error. - data, _ = db.Ancient(freezerDifficultyTable, number) - if len(data) > 0 { - h, _ := db.Ancient(freezerHashTable, number) - if common.BytesToHash(h) == hash { - return data - } - } - return nil // Can't find the data anywhere. + // If not, try reading from leveldb + data, _ = db.Get(headerTDKey(number, hash)) + return nil + }) + return data } // ReadTd retrieves a block's total difficulty corresponding to the hash. @@ -553,7 +521,7 @@ func DeleteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { // HasReceipts verifies the existence of all the transaction receipts belonging // to a block. func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool { - if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash { + if isCanon(db, number, hash) { return true } if has, err := db.Has(blockReceiptsKey(number, hash)); !has || err != nil { @@ -564,33 +532,18 @@ func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool { // ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding. func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { - // First try to look up the data in ancient database. Extra hash - // comparison is necessary since ancient database only maintains - // the canonical data. - data, _ := db.Ancient(freezerReceiptTable, number) - if len(data) > 0 { - h, _ := db.Ancient(freezerHashTable, number) - if common.BytesToHash(h) == hash { - return data + var data []byte + db.ReadAncients(func(reader ethdb.AncientReader) error { + // Check if the data is in ancients + if isCanon(reader, number, hash) { + data, _ = reader.Ancient(freezerReceiptTable, number) + return nil } - } - // Then try to look up the data in leveldb. - data, _ = db.Get(blockReceiptsKey(number, hash)) - if len(data) > 0 { - return data - } - // In the background freezer is moving data from leveldb to flatten files. - // So during the first check for ancient db, the data is not yet in there, - // but when we reach into leveldb, the data was already moved. That would - // result in a not found error. - data, _ = db.Ancient(freezerReceiptTable, number) - if len(data) > 0 { - h, _ := db.Ancient(freezerHashTable, number) - if common.BytesToHash(h) == hash { - return data - } - } - return nil // Can't find the data anywhere. + // If not, try reading from leveldb + data, _ = db.Get(blockReceiptsKey(number, hash)) + return nil + }) + return data } // ReadRawReceipts retrieves all the transaction receipts belonging to a block. diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 0e116ef999..f7c7f5d336 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -89,8 +89,8 @@ func (db *nofreezedb) Ancient(kind string, number uint64) ([]byte, error) { return nil, errNotSupported } -// ReadAncients returns an error as we don't have a backing chain freezer. -func (db *nofreezedb) ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) { +// AncientRange returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) AncientRange(kind string, start, max, maxByteSize uint64) ([][]byte, error) { return nil, errNotSupported } @@ -119,6 +119,22 @@ func (db *nofreezedb) Sync() error { return errNotSupported } +func (db *nofreezedb) ReadAncients(fn func(reader ethdb.AncientReader) error) (err error) { + // Unlike other ancient-related methods, this method does not return + // errNotSupported when invoked. + // The reason for this is that the caller might want to do several things: + // 1. Check if something is in freezer, + // 2. If not, check leveldb. + // + // This will work, since the ancient-checks inside 'fn' will return errors, + // and the leveldb work will continue. + // + // If we instead were to return errNotSupported here, then the caller would + // have to explicitly check for that, having an extra clause to do the + // non-ancient operations. + return fn(db) +} + // NewDatabase creates a high level database on top of a given key-value data // store without a freezer moving immutable chain segments into cold storage. func NewDatabase(db ethdb.KeyValueStore) ethdb.Database { diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 4cecbc50f4..e19c202adc 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -80,8 +80,9 @@ type freezer struct { frozen uint64 // Number of blocks already frozen threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests) - // This lock synchronizes writers and the truncate operation. - writeLock sync.Mutex + // This lock synchronizes writers and the truncate operation, as well as + // the "atomic" (batched) read operations. + writeLock sync.RWMutex writeBatch *freezerBatch readonly bool @@ -201,12 +202,12 @@ func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) { return nil, errUnknownTable } -// ReadAncients retrieves multiple items in sequence, starting from the index 'start'. +// AncientRange retrieves multiple items in sequence, starting from the index 'start'. // It will return // - at most 'max' items, // - at least 1 item (even if exceeding the maxByteSize), but will otherwise // return as many items as fit into maxByteSize. -func (f *freezer) ReadAncients(kind string, start, count, maxBytes uint64) ([][]byte, error) { +func (f *freezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) { if table := f.tables[kind]; table != nil { return table.RetrieveItems(start, count, maxBytes) } @@ -222,8 +223,8 @@ func (f *freezer) Ancients() (uint64, error) { func (f *freezer) AncientSize(kind string) (uint64, error) { // This needs the write lock to avoid data races on table fields. // Speed doesn't matter here, AncientSize is for debugging. - f.writeLock.Lock() - defer f.writeLock.Unlock() + f.writeLock.RLock() + defer f.writeLock.RUnlock() if table := f.tables[kind]; table != nil { return table.size() @@ -231,6 +232,14 @@ func (f *freezer) AncientSize(kind string) (uint64, error) { return 0, errUnknownTable } +// ReadAncients runs the given read operation while ensuring that no writes take place +// on the underlying freezer. +func (f *freezer) ReadAncients(fn func(ethdb.AncientReader) error) (err error) { + f.writeLock.RLock() + defer f.writeLock.RUnlock() + return fn(f) +} + // ModifyAncients runs the given write operation. func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) { if f.readonly { diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 02e23b5174..91fc31b660 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -62,10 +62,10 @@ func (t *table) Ancient(kind string, number uint64) ([]byte, error) { return t.db.Ancient(kind, number) } -// ReadAncients is a noop passthrough that just forwards the request to the underlying +// AncientRange is a noop passthrough that just forwards the request to the underlying // database. -func (t *table) ReadAncients(kind string, start, count, maxBytes uint64) ([][]byte, error) { - return t.db.ReadAncients(kind, start, count, maxBytes) +func (t *table) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) { + return t.db.AncientRange(kind, start, count, maxBytes) } // Ancients is a noop passthrough that just forwards the request to the underlying @@ -85,6 +85,10 @@ func (t *table) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (int64, erro return t.db.ModifyAncients(fn) } +func (t *table) ReadAncients(fn func(reader ethdb.AncientReader) error) (err error) { + return t.db.ReadAncients(fn) +} + // TruncateAncients is a noop passthrough that just forwards the request to the underlying // database. func (t *table) TruncateAncients(items uint64) error { diff --git a/ethdb/database.go b/ethdb/database.go index 3c6500d1dc..0a5729c6c1 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -76,12 +76,12 @@ type AncientReader interface { // Ancient retrieves an ancient binary blob from the append-only immutable files. Ancient(kind string, number uint64) ([]byte, error) - // ReadAncients retrieves multiple items in sequence, starting from the index 'start'. + // AncientRange retrieves multiple items in sequence, starting from the index 'start'. // It will return // - at most 'count' items, // - at least 1 item (even if exceeding the maxBytes), but will otherwise // return as many items as fit into maxBytes. - ReadAncients(kind string, start, count, maxBytes uint64) ([][]byte, error) + AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) // Ancients returns the ancient item numbers in the ancient store. Ancients() (uint64, error) @@ -90,6 +90,15 @@ type AncientReader interface { AncientSize(kind string) (uint64, error) } +// AncientBatchReader is the interface for 'batched' or 'atomic' reading. +type AncientBatchReader interface { + AncientReader + + // ReadAncients runs the given read operation while ensuring that no writes take place + // on the underlying freezer. + ReadAncients(fn func(AncientReader) error) (err error) +} + // AncientWriter contains the methods required to write to immutable ancient data. type AncientWriter interface { // ModifyAncients runs a write operation on the ancient store. @@ -117,7 +126,7 @@ type AncientWriteOp interface { // immutable ancient data. type Reader interface { KeyValueReader - AncientReader + AncientBatchReader } // Writer contains the methods required to write data to both key-value as well as @@ -130,7 +139,7 @@ type Writer interface { // AncientStore contains all the methods required to allow handling different // ancient data stores backing immutable chain data store. type AncientStore interface { - AncientReader + AncientBatchReader AncientWriter io.Closer }