trie: concurrent commit (#30545)
This change makes the trie commit operation concurrent, if the number of changes exceed 100. Co-authored-by: stevemilk <wangpeculiar@gmail.com> Co-authored-by: Gary Rong <garyrong0905@gmail.com>
This commit is contained in:
parent
16f64098b9
commit
f4dc7530b1
|
@ -18,6 +18,7 @@ package trie
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/trie/trienode"
|
||||
|
@ -42,12 +43,12 @@ func newCommitter(nodeset *trienode.NodeSet, tracer *tracer, collectLeaf bool) *
|
|||
}
|
||||
|
||||
// Commit collapses a node down into a hash node.
|
||||
func (c *committer) Commit(n node) hashNode {
|
||||
return c.commit(nil, n).(hashNode)
|
||||
func (c *committer) Commit(n node, parallel bool) hashNode {
|
||||
return c.commit(nil, n, parallel).(hashNode)
|
||||
}
|
||||
|
||||
// commit collapses a node down into a hash node and returns it.
|
||||
func (c *committer) commit(path []byte, n node) node {
|
||||
func (c *committer) commit(path []byte, n node, parallel bool) node {
|
||||
// if this path is clean, use available cached data
|
||||
hash, dirty := n.cache()
|
||||
if hash != nil && !dirty {
|
||||
|
@ -62,7 +63,7 @@ func (c *committer) commit(path []byte, n node) node {
|
|||
// If the child is fullNode, recursively commit,
|
||||
// otherwise it can only be hashNode or valueNode.
|
||||
if _, ok := cn.Val.(*fullNode); ok {
|
||||
collapsed.Val = c.commit(append(path, cn.Key...), cn.Val)
|
||||
collapsed.Val = c.commit(append(path, cn.Key...), cn.Val, false)
|
||||
}
|
||||
// The key needs to be copied, since we're adding it to the
|
||||
// modified nodeset.
|
||||
|
@ -73,7 +74,7 @@ func (c *committer) commit(path []byte, n node) node {
|
|||
}
|
||||
return collapsed
|
||||
case *fullNode:
|
||||
hashedKids := c.commitChildren(path, cn)
|
||||
hashedKids := c.commitChildren(path, cn, parallel)
|
||||
collapsed := cn.copy()
|
||||
collapsed.Children = hashedKids
|
||||
|
||||
|
@ -91,8 +92,12 @@ func (c *committer) commit(path []byte, n node) node {
|
|||
}
|
||||
|
||||
// commitChildren commits the children of the given fullnode
|
||||
func (c *committer) commitChildren(path []byte, n *fullNode) [17]node {
|
||||
var children [17]node
|
||||
func (c *committer) commitChildren(path []byte, n *fullNode, parallel bool) [17]node {
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
nodesMu sync.Mutex
|
||||
children [17]node
|
||||
)
|
||||
for i := 0; i < 16; i++ {
|
||||
child := n.Children[i]
|
||||
if child == nil {
|
||||
|
@ -108,7 +113,24 @@ func (c *committer) commitChildren(path []byte, n *fullNode) [17]node {
|
|||
// Commit the child recursively and store the "hashed" value.
|
||||
// Note the returned node can be some embedded nodes, so it's
|
||||
// possible the type is not hashNode.
|
||||
children[i] = c.commit(append(path, byte(i)), child)
|
||||
if !parallel {
|
||||
children[i] = c.commit(append(path, byte(i)), child, false)
|
||||
} else {
|
||||
wg.Add(1)
|
||||
go func(index int) {
|
||||
p := append(path, byte(index))
|
||||
childSet := trienode.NewNodeSet(c.nodes.Owner)
|
||||
childCommitter := newCommitter(childSet, c.tracer, c.collectLeaf)
|
||||
children[index] = childCommitter.commit(p, child, false)
|
||||
nodesMu.Lock()
|
||||
c.nodes.MergeSet(childSet)
|
||||
nodesMu.Unlock()
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
}
|
||||
if parallel {
|
||||
wg.Wait()
|
||||
}
|
||||
// For the 17th child, it's possible the type is valuenode.
|
||||
if n.Children[16] != nil {
|
||||
|
|
13
trie/trie.go
13
trie/trie.go
|
@ -49,6 +49,9 @@ type Trie struct {
|
|||
// actually unhashed nodes.
|
||||
unhashed int
|
||||
|
||||
// uncommitted is the number of updates since last commit.
|
||||
uncommitted int
|
||||
|
||||
// reader is the handler trie can retrieve nodes from.
|
||||
reader *trieReader
|
||||
|
||||
|
@ -67,9 +70,10 @@ func (t *Trie) Copy() *Trie {
|
|||
root: t.root,
|
||||
owner: t.owner,
|
||||
committed: t.committed,
|
||||
unhashed: t.unhashed,
|
||||
reader: t.reader,
|
||||
tracer: t.tracer.copy(),
|
||||
uncommitted: t.uncommitted,
|
||||
unhashed: t.unhashed,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -309,6 +313,7 @@ func (t *Trie) Update(key, value []byte) error {
|
|||
|
||||
func (t *Trie) update(key, value []byte) error {
|
||||
t.unhashed++
|
||||
t.uncommitted++
|
||||
k := keybytesToHex(key)
|
||||
if len(value) != 0 {
|
||||
_, n, err := t.insert(t.root, nil, k, valueNode(value))
|
||||
|
@ -422,6 +427,7 @@ func (t *Trie) Delete(key []byte) error {
|
|||
if t.committed {
|
||||
return ErrCommitted
|
||||
}
|
||||
t.uncommitted++
|
||||
t.unhashed++
|
||||
k := keybytesToHex(key)
|
||||
_, n, err := t.delete(t.root, nil, k)
|
||||
|
@ -642,7 +648,9 @@ func (t *Trie) Commit(collectLeaf bool) (common.Hash, *trienode.NodeSet) {
|
|||
for _, path := range t.tracer.deletedNodes() {
|
||||
nodes.AddNode([]byte(path), trienode.NewDeleted())
|
||||
}
|
||||
t.root = newCommitter(nodes, t.tracer, collectLeaf).Commit(t.root)
|
||||
// If the number of changes is below 100, we let one thread handle it
|
||||
t.root = newCommitter(nodes, t.tracer, collectLeaf).Commit(t.root, t.uncommitted > 100)
|
||||
t.uncommitted = 0
|
||||
return rootHash, nodes
|
||||
}
|
||||
|
||||
|
@ -678,6 +686,7 @@ func (t *Trie) Reset() {
|
|||
t.root = nil
|
||||
t.owner = common.Hash{}
|
||||
t.unhashed = 0
|
||||
t.uncommitted = 0
|
||||
t.tracer.reset()
|
||||
t.committed = false
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"math/rand"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"testing/quick"
|
||||
|
||||
|
@ -35,6 +36,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/internal/testrand"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/trie/trienode"
|
||||
"github.com/holiman/uint256"
|
||||
|
@ -1206,3 +1208,105 @@ func FuzzTrie(f *testing.F) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkCommit(b *testing.B) {
|
||||
benchmarkCommit(b, 100)
|
||||
benchmarkCommit(b, 500)
|
||||
benchmarkCommit(b, 2000)
|
||||
benchmarkCommit(b, 5000)
|
||||
}
|
||||
|
||||
func benchmarkCommit(b *testing.B, n int) {
|
||||
b.Run(fmt.Sprintf("commit-%vnodes-sequential", n), func(b *testing.B) {
|
||||
testCommit(b, n, false)
|
||||
})
|
||||
b.Run(fmt.Sprintf("commit-%vnodes-parallel", n), func(b *testing.B) {
|
||||
testCommit(b, n, true)
|
||||
})
|
||||
}
|
||||
|
||||
func testCommit(b *testing.B, n int, parallel bool) {
|
||||
tries := make([]*Trie, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
tries[i] = NewEmpty(nil)
|
||||
for j := 0; j < n; j++ {
|
||||
key := testrand.Bytes(32)
|
||||
val := testrand.Bytes(32)
|
||||
tries[i].Update(key, val)
|
||||
}
|
||||
tries[i].Hash()
|
||||
if !parallel {
|
||||
tries[i].uncommitted = 0
|
||||
}
|
||||
}
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < len(tries); i++ {
|
||||
tries[i].Commit(true)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCommitCorrect(t *testing.T) {
|
||||
var paraTrie = NewEmpty(nil)
|
||||
var refTrie = NewEmpty(nil)
|
||||
|
||||
for j := 0; j < 5000; j++ {
|
||||
key := testrand.Bytes(32)
|
||||
val := testrand.Bytes(32)
|
||||
paraTrie.Update(key, val)
|
||||
refTrie.Update(common.CopyBytes(key), common.CopyBytes(val))
|
||||
}
|
||||
paraTrie.Hash()
|
||||
refTrie.Hash()
|
||||
refTrie.uncommitted = 0
|
||||
|
||||
haveRoot, haveNodes := paraTrie.Commit(true)
|
||||
wantRoot, wantNodes := refTrie.Commit(true)
|
||||
|
||||
if haveRoot != wantRoot {
|
||||
t.Fatalf("have %x want %x", haveRoot, wantRoot)
|
||||
}
|
||||
have := printSet(haveNodes)
|
||||
want := printSet(wantNodes)
|
||||
if have != want {
|
||||
i := 0
|
||||
for i = 0; i < len(have); i++ {
|
||||
if have[i] != want[i] {
|
||||
break
|
||||
}
|
||||
}
|
||||
if i > 100 {
|
||||
i -= 100
|
||||
}
|
||||
t.Fatalf("have != want\nhave %q\nwant %q", have[i:], want[i:])
|
||||
}
|
||||
}
|
||||
func printSet(set *trienode.NodeSet) string {
|
||||
var out = new(strings.Builder)
|
||||
fmt.Fprintf(out, "nodeset owner: %v\n", set.Owner)
|
||||
var paths []string
|
||||
for k := range set.Nodes {
|
||||
paths = append(paths, k)
|
||||
}
|
||||
sort.Strings(paths)
|
||||
|
||||
for _, path := range paths {
|
||||
n := set.Nodes[path]
|
||||
// Deletion
|
||||
if n.IsDeleted() {
|
||||
fmt.Fprintf(out, " [-]: %x\n", path)
|
||||
continue
|
||||
}
|
||||
// Insertion or update
|
||||
fmt.Fprintf(out, " [+/*]: %x -> %v \n", path, n.Hash)
|
||||
}
|
||||
sort.Slice(set.Leaves, func(i, j int) bool {
|
||||
a := set.Leaves[i]
|
||||
b := set.Leaves[j]
|
||||
return bytes.Compare(a.Parent[:], b.Parent[:]) < 0
|
||||
})
|
||||
for _, n := range set.Leaves {
|
||||
fmt.Fprintf(out, "[leaf]: %v\n", n)
|
||||
}
|
||||
return out.String()
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package trienode
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"maps"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
|
@ -99,6 +100,23 @@ func (set *NodeSet) AddNode(path []byte, n *Node) {
|
|||
set.Nodes[string(path)] = n
|
||||
}
|
||||
|
||||
// MergeSet merges this 'set' with 'other'. It assumes that the sets are disjoint,
|
||||
// and thus does not deduplicate data (count deletes, dedup leaves etc).
|
||||
func (set *NodeSet) MergeSet(other *NodeSet) error {
|
||||
if set.Owner != other.Owner {
|
||||
return fmt.Errorf("nodesets belong to different owner are not mergeable %x-%x", set.Owner, other.Owner)
|
||||
}
|
||||
maps.Copy(set.Nodes, other.Nodes)
|
||||
|
||||
set.deletes += other.deletes
|
||||
set.updates += other.updates
|
||||
|
||||
// Since we assume the sets are disjoint, we can safely append leaves
|
||||
// like this without deduplication.
|
||||
set.Leaves = append(set.Leaves, other.Leaves...)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Merge adds a set of nodes into the set.
|
||||
func (set *NodeSet) Merge(owner common.Hash, nodes map[string]*Node) error {
|
||||
if set.Owner != owner {
|
||||
|
|
Loading…
Reference in New Issue