From 9742b7a07455ecaab02940525cf48db55f7664f4 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Fri, 14 Feb 2025 23:33:54 +0100 Subject: [PATCH 1/5] 1 --- cmd/workload/filtercmd.go | 287 ++++++++++++++++++++++++++++++++++++++ cmd/workload/main.go | 95 +++++++++++++ 2 files changed, 382 insertions(+) create mode 100644 cmd/workload/filtercmd.go create mode 100644 cmd/workload/main.go diff --git a/cmd/workload/filtercmd.go b/cmd/workload/filtercmd.go new file mode 100644 index 0000000000..b0f3cd4415 --- /dev/null +++ b/cmd/workload/filtercmd.go @@ -0,0 +1,287 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see . + +package main + +import ( + "context" + "fmt" + "math" + "math/big" + "math/rand" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/rpc" + "github.com/urfave/cli/v2" +) + +const ( + maxFilterRange = 100000 + maxFilterResultSize = 100 + filterBuckets = 10 + maxFilterBucketSize = 100 + filterSeedChance = 10 + filterMergeChance = 45 +) + +var ( + filterCommand = &cli.Command{ + Name: "filter", + Usage: "Runs range log filter workload test against an RPC endpoint", + ArgsUsage: "", + Action: filterTestCmd, + Flags: []cli.Flag{}, + } +) + +func filterTestCmd(ctx *cli.Context) error { + f := filterTest{ec: makeEthClient(ctx)} + f.getFinalizedBlock() + query := f.newQuery() + f.query(&query) + if len(query.results) > 0 && len(query.results) <= maxFilterResultSize { + for { + extQuery := f.extendQuery(query) + f.query(&extQuery) + if len(query.results) <= maxFilterResultSize { + break + } + query = extQuery + } + f.storeQuery(query) + } + return nil +} + +func (f *filterTest) storeQuery(query filterQuery) { + logRatio := math.Log(float64(len(query.results))*maxFilterRange/float64(query.end+1-query.begin)) / math.Log(maxFilterRange*maxFilterResultSize) + bucket := int(math.Floor(logRatio * filterBuckets)) + if bucket >= filterBuckets { + bucket = filterBuckets - 1 + } + if len(f.stored[bucket]) < maxFilterBucketSize { + f.stored[bucket] = append(f.stored[bucket], query) + } else { + f.stored[bucket][rand.Intn(len(f.stored[bucket]))] = query + } +} + +func (f *filterTest) extendQuery(q filterQuery) filterQuery { + rangeLen := q.end + 1 - q.begin + extLen := rand.Int63n(rangeLen) + 1 + extBefore := rand.Int63n(extLen + 1) + if extBefore > q.begin { + extBefore = q.begin + } + return filterQuery{ + begin: q.begin - extBefore, + end: q.end + extLen - extBefore, + addresses: q.addresses, + topics: q.topics, + } +} + +func (f *filterTest) newQuery() filterQuery { + for { + t := rand.Intn(100) + if t < filterSeedChance { + return f.newSeedQuery() + } + if t < filterSeedChance+filterMergeChance { + if query, ok := f.newMergedQuery(); ok { + return query + } + continue + } + if query, ok 
:= f.newNarrowedQuery(); ok { + return query + } + } +} + +func (f *filterTest) newSeedQuery() filterQuery { + block := rand.Int63n(f.finalized + 1) + return filterQuery{ + begin: block, + end: block, + } +} + +func (f *filterTest) newMergedQuery() (filterQuery, bool) { + count := f.queryCount() + if count < 2 { + return filterQuery{}, false + } + pick1 := rand.Intn(count) + pick2 := pick1 + for pick2 == pick1 { + pick2 = rand.Intn(count) + } + q1 := f.pickQuery(pick1) + q2 := f.pickQuery(pick2) + var ( + m filterQuery + block int64 + topicCount int + ) + if rand.Intn(2) == 0 { + block = q1.begin + rand.Int63n(q1.end+1-q1.begin) + topicCount = len(q1.topics) + } else { + block = q2.begin + rand.Int63n(q2.end+1-q2.begin) + topicCount = len(q2.topics) + } + m.begin = block + m.end = block + m.topics = make([][]common.Hash, topicCount) + for _, addr := range q1.addresses { + if rand.Intn(2) == 0 { + m.addresses = append(m.addresses, addr) + } + } + for _, addr := range q2.addresses { + if rand.Intn(2) == 0 { + m.addresses = append(m.addresses, addr) + } + } + for i := range m.topics { + if len(q1.topics) > i { + for _, topic := range q1.topics[i] { + if rand.Intn(2) == 0 { + m.topics[i] = append(m.topics[i], topic) + } + } + } + if len(q2.topics) > i { + for _, topic := range q2.topics[i] { + if rand.Intn(2) == 0 { + m.topics[i] = append(m.topics[i], topic) + } + } + } + } + return m, true +} + +func (f *filterTest) newNarrowedQuery() (filterQuery, bool) { + count := f.queryCount() + if count < 1 { + return filterQuery{}, false + } + q := f.pickQuery(rand.Intn(count)) + log := q.results[rand.Intn(len(q.results))] + var emptyCount int + if len(q.addresses) == 0 { + emptyCount++ + } + for i := range log.Topics { + if len(q.topics) <= i || len(q.topics[i]) == 0 { + emptyCount++ + } + } + var query filterQuery + if emptyCount == 0 { + return query, false + } + query.addresses, query.topics = q.addresses, q.topics + pick := rand.Intn(emptyCount) + if len(q.addresses) == 0 { + if pick == 0 { + q.addresses = []common.Address{log.Address} + return query, true + } + pick-- + } + for i := range log.Topics { + if len(q.topics) <= i || len(q.topics[i]) == 0 { + if pick == 0 { + if len(q.topics) <= i { + q.topics = append(q.topics, make([][]common.Hash, i+1-len(q.topics))...) 
+ } + q.topics[i] = []common.Hash{log.Topics[i]} + return query, true + } + pick-- + } + } + panic(nil) +} + +func (f *filterTest) queryCount() int { + var count int + for _, list := range f.stored { + count += len(list) + } + return count +} + +func (f *filterTest) pickQuery(pick int) filterQuery { + for _, list := range f.stored { + if pick < len(list) { + return list[pick] + } + pick -= len(list) + } + panic(nil) +} + +type filterTest struct { + ec *ethclient.Client + finalized int64 + stored [filterBuckets][]filterQuery +} + +type filterQuery struct { + begin, end int64 + addresses []common.Address + topics [][]common.Hash + results []types.Log + err error +} + +func (f *filterTest) getFinalizedBlock() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + header, err := f.ec.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) + if err != nil { + fmt.Println("finalized header error", err) + return + } + f.finalized = header.Number.Int64() + fmt.Println("finalized header updated", f.finalized) +} + +func (f *filterTest) query(query *filterQuery) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + logs, err := f.ec.FilterLogs(ctx, ethereum.FilterQuery{ + FromBlock: big.NewInt(query.begin), + ToBlock: big.NewInt(query.end), + Addresses: query.addresses, + Topics: query.topics, + }) + if err != nil { + query.err = err + fmt.Println("filter query error", err) + return + } + query.results = logs + fmt.Println("filter query results", len(logs)) +} diff --git a/cmd/workload/main.go b/cmd/workload/main.go new file mode 100644 index 0000000000..9c46b12e7d --- /dev/null +++ b/cmd/workload/main.go @@ -0,0 +1,95 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see . + +package main + +import ( + "fmt" + "os" + + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/internal/debug" + "github.com/ethereum/go-ethereum/internal/flags" + "github.com/urfave/cli/v2" +) + +var app = flags.NewApp("go-ethereum workload test tool") + +func init() { + app.Flags = append(app.Flags, debug.Flags...) + app.Before = func(ctx *cli.Context) error { + flags.MigrateGlobalFlags(ctx) + return debug.Setup(ctx) + } + app.After = func(ctx *cli.Context) error { + debug.Exit() + return nil + } + app.CommandNotFound = func(ctx *cli.Context, cmd string) { + fmt.Fprintf(os.Stderr, "No such command: %s\n", cmd) + os.Exit(1) + } + + // Add subcommands. + app.Commands = []*cli.Command{ + filterCommand, + } +} + +func main() { + exit(app.Run(os.Args)) +} + +// commandHasFlag returns true if the current command supports the given flag. 
+func commandHasFlag(ctx *cli.Context, flag cli.Flag) bool { + names := flag.Names() + set := make(map[string]struct{}, len(names)) + for _, name := range names { + set[name] = struct{}{} + } + for _, ctx := range ctx.Lineage() { + if ctx.Command != nil { + for _, f := range ctx.Command.Flags { + for _, name := range f.Names() { + if _, ok := set[name]; ok { + return true + } + } + } + } + } + return false +} + +func makeEthClient(ctx *cli.Context) *ethclient.Client { + if ctx.NArg() < 1 { + exit("missing RPC endpoint URL as command-line argument") + } + url := ctx.Args().First() + cl, err := ethclient.Dial(url) + if err != nil { + exit(fmt.Errorf("Could not create RPC client at %s: %v", url, err)) + } + return cl +} + +func exit(err interface{}) { + if err == nil { + os.Exit(0) + } + fmt.Fprintln(os.Stderr, err) + os.Exit(1) +} From d52e5ca7a1d2100b5fd204ed8d95ba414a92d5c3 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Sat, 15 Feb 2025 00:19:09 +0100 Subject: [PATCH 2/5] 11 --- cmd/workload/filtercmd.go | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/cmd/workload/filtercmd.go b/cmd/workload/filtercmd.go index b0f3cd4415..0e456d3631 100644 --- a/cmd/workload/filtercmd.go +++ b/cmd/workload/filtercmd.go @@ -53,19 +53,26 @@ var ( func filterTestCmd(ctx *cli.Context) error { f := filterTest{ec: makeEthClient(ctx)} - f.getFinalizedBlock() - query := f.newQuery() - f.query(&query) - if len(query.results) > 0 && len(query.results) <= maxFilterResultSize { - for { - extQuery := f.extendQuery(query) - f.query(&extQuery) - if len(query.results) <= maxFilterResultSize { - break - } - query = extQuery + for { + select { + case <-ctx.Done(): + return nil + default: + } + f.getFinalizedBlock() + query := f.newQuery() + f.query(&query) + if len(query.results) > 0 && len(query.results) <= maxFilterResultSize { + for { + extQuery := f.extendQuery(query) + f.query(&extQuery) + if len(query.results) <= maxFilterResultSize { + break + } + query = extQuery + } + f.storeQuery(query) } - f.storeQuery(query) } return nil } @@ -81,6 +88,11 @@ func (f *filterTest) storeQuery(query filterQuery) { } else { f.stored[bucket][rand.Intn(len(f.stored[bucket]))] = query } + fmt.Print("stored") + for _, list := range f.stored { + fmt.Print(" ", len(list)) + } + fmt.Println() } func (f *filterTest) extendQuery(q filterQuery) filterQuery { From 9a17c98ccdfdfa6bc2dfd49c63aacebfc7a0550b Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Sat, 15 Feb 2025 10:08:55 +0100 Subject: [PATCH 3/5] 1 --- cmd/workload/filtercmd.go | 255 +++++++++++++++++++++++++++++--------- 1 file changed, 193 insertions(+), 62 deletions(-) diff --git a/cmd/workload/filtercmd.go b/cmd/workload/filtercmd.go index 0e456d3631..eb388f8ca4 100644 --- a/cmd/workload/filtercmd.go +++ b/cmd/workload/filtercmd.go @@ -22,20 +22,23 @@ import ( "math" "math/big" "math/rand" + "os" "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/urfave/cli/v2" ) const ( - maxFilterRange = 100000 - maxFilterResultSize = 100 - filterBuckets = 10 + maxFilterRange = 10000000 + maxFilterResultSize = 300 + filterBuckets = 16 maxFilterBucketSize = 100 filterSeedChance = 10 filterMergeChance = 45 @@ -53,6 +56,7 @@ var ( func filterTestCmd(ctx *cli.Context) error { f := 
filterTest{ec: makeEthClient(ctx)} + lastWrite := time.Now() for { select { case <-ctx.Done(): @@ -61,23 +65,54 @@ func filterTestCmd(ctx *cli.Context) error { } f.getFinalizedBlock() query := f.newQuery() - f.query(&query) + f.query(query) + if query.err != nil { + f.failed = append(f.failed, query) + continue + } if len(query.results) > 0 && len(query.results) <= maxFilterResultSize { for { extQuery := f.extendQuery(query) - f.query(&extQuery) - if len(query.results) <= maxFilterResultSize { + if extQuery == nil { + break + } + f.query(extQuery) + if extQuery.err == nil && len(extQuery.results) < len(query.results) { + extQuery.err = fmt.Errorf("invalid result length; old range %d %d; old length %d; new range %d %d; new length %d; addresses %v; topics %v", + query.begin, query.end, len(query.results), + extQuery.begin, extQuery.end, len(extQuery.results), + extQuery.addresses, extQuery.topics, + ) + } + if extQuery.err != nil { + f.failed = append(f.failed, extQuery) + break + } + if len(extQuery.results) > maxFilterResultSize { break } query = extQuery } f.storeQuery(query) + if time.Since(lastWrite) > time.Second*10 { + f.writeQueries("filter_queries") + f.writeFailed("filter_errors") + lastWrite = time.Now() + } } } return nil } -func (f *filterTest) storeQuery(query filterQuery) { +type filterTest struct { + ec *ethclient.Client + finalized int64 + stored [filterBuckets][]*filterQuery + failed []*filterQuery +} + +func (f *filterTest) storeQuery(query *filterQuery) { + query.resultHash = query.calculateHash() logRatio := math.Log(float64(len(query.results))*maxFilterRange/float64(query.end+1-query.begin)) / math.Log(maxFilterRange*maxFilterResultSize) bucket := int(math.Floor(logRatio * filterBuckets)) if bucket >= filterBuckets { @@ -95,61 +130,72 @@ func (f *filterTest) storeQuery(query filterQuery) { fmt.Println() } -func (f *filterTest) extendQuery(q filterQuery) filterQuery { +func (f *filterTest) extendQuery(q *filterQuery) *filterQuery { rangeLen := q.end + 1 - q.begin extLen := rand.Int63n(rangeLen) + 1 + if rangeLen+extLen > maxFilterRange { + return nil + } extBefore := rand.Int63n(extLen + 1) if extBefore > q.begin { extBefore = q.begin } - return filterQuery{ + extAfter := extLen - extBefore + if q.end+extAfter > f.finalized { + d := f.finalized - q.end - extAfter + extAfter -= d + if extBefore+d <= q.begin { + extBefore += d + } else { + extBefore = q.begin + } + } + return &filterQuery{ begin: q.begin - extBefore, - end: q.end + extLen - extBefore, + end: q.end + extAfter, addresses: q.addresses, topics: q.topics, } } -func (f *filterTest) newQuery() filterQuery { +func (f *filterTest) newQuery() *filterQuery { for { t := rand.Intn(100) if t < filterSeedChance { + fmt.Println("* seed") return f.newSeedQuery() } if t < filterSeedChance+filterMergeChance { - if query, ok := f.newMergedQuery(); ok { + if query := f.newMergedQuery(); query != nil { + fmt.Println("* merged") return query } + fmt.Println("* merged x") continue } - if query, ok := f.newNarrowedQuery(); ok { + if query := f.newNarrowedQuery(); query != nil { + fmt.Println("* narrowed") return query } + fmt.Println("* narrowed x") } } -func (f *filterTest) newSeedQuery() filterQuery { +func (f *filterTest) newSeedQuery() *filterQuery { block := rand.Int63n(f.finalized + 1) - return filterQuery{ + return &filterQuery{ begin: block, end: block, } } -func (f *filterTest) newMergedQuery() (filterQuery, bool) { - count := f.queryCount() - if count < 2 { - return filterQuery{}, false +func (f *filterTest) 
newMergedQuery() *filterQuery { + q1 := f.randomQuery() + q2 := f.randomQuery() + if q1 == nil || q2 == nil || q1 == q2 { + return nil } - pick1 := rand.Intn(count) - pick2 := pick1 - for pick2 == pick1 { - pick2 = rand.Intn(count) - } - q1 := f.pickQuery(pick1) - q2 := f.pickQuery(pick2) var ( - m filterQuery block int64 topicCount int ) @@ -160,9 +206,11 @@ func (f *filterTest) newMergedQuery() (filterQuery, bool) { block = q2.begin + rand.Int63n(q2.end+1-q2.begin) topicCount = len(q2.topics) } - m.begin = block - m.end = block - m.topics = make([][]common.Hash, topicCount) + m := &filterQuery{ + begin: block, + end: block, + topics: make([][]common.Hash, topicCount), + } for _, addr := range q1.addresses { if rand.Intn(2) == 0 { m.addresses = append(m.addresses, addr) @@ -189,15 +237,14 @@ func (f *filterTest) newMergedQuery() (filterQuery, bool) { } } } - return m, true + return m } -func (f *filterTest) newNarrowedQuery() (filterQuery, bool) { - count := f.queryCount() - if count < 1 { - return filterQuery{}, false +func (f *filterTest) newNarrowedQuery() *filterQuery { + q := f.randomQuery() + if q == nil { + return nil } - q := f.pickQuery(rand.Intn(count)) log := q.results[rand.Intn(len(q.results))] var emptyCount int if len(q.addresses) == 0 { @@ -208,16 +255,20 @@ func (f *filterTest) newNarrowedQuery() (filterQuery, bool) { emptyCount++ } } - var query filterQuery if emptyCount == 0 { - return query, false + return nil + } + query := &filterQuery{ + begin: q.begin, + end: q.end, + addresses: q.addresses, + topics: q.topics, } - query.addresses, query.topics = q.addresses, q.topics pick := rand.Intn(emptyCount) if len(q.addresses) == 0 { if pick == 0 { q.addresses = []common.Address{log.Address} - return query, true + return query } pick-- } @@ -228,7 +279,7 @@ func (f *filterTest) newNarrowedQuery() (filterQuery, bool) { q.topics = append(q.topics, make([][]common.Hash, i+1-len(q.topics))...) 
} q.topics[i] = []common.Hash{log.Topics[i]} - return query, true + return query } pick-- } @@ -236,38 +287,46 @@ func (f *filterTest) newNarrowedQuery() (filterQuery, bool) { panic(nil) } -func (f *filterTest) queryCount() int { - var count int +func (f *filterTest) randomQuery() *filterQuery { + var bucket, bucketCount int for _, list := range f.stored { - count += len(list) - } - return count -} - -func (f *filterTest) pickQuery(pick int) filterQuery { - for _, list := range f.stored { - if pick < len(list) { - return list[pick] + if len(list) > 0 { + bucketCount++ } - pick -= len(list) } - panic(nil) -} - -type filterTest struct { - ec *ethclient.Client - finalized int64 - stored [filterBuckets][]filterQuery + if bucketCount == 0 { + return nil + } + pick := rand.Intn(bucketCount) + for i, list := range f.stored { + if len(list) > 0 { + if pick == 0 { + bucket = i + break + } + pick-- + } + } + return f.stored[bucket][rand.Intn(len(f.stored[bucket]))] } type filterQuery struct { begin, end int64 addresses []common.Address topics [][]common.Hash + resultHash common.Hash results []types.Log err error } +func (fq *filterQuery) calculateHash() common.Hash { + enc, err := rlp.EncodeToBytes(&fq.results) + if err != nil { + exit(fmt.Errorf("Error encoding logs", "error", err)) + } + return crypto.Keccak256Hash(enc) +} + func (f *filterTest) getFinalizedBlock() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() @@ -281,7 +340,7 @@ func (f *filterTest) getFinalizedBlock() { } func (f *filterTest) query(query *filterQuery) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() logs, err := f.ec.FilterLogs(ctx, ethereum.FilterQuery{ FromBlock: big.NewInt(query.begin), @@ -295,5 +354,77 @@ func (f *filterTest) query(query *filterQuery) { return } query.results = logs - fmt.Println("filter query results", len(logs)) + fmt.Println("filter query range", query.end+1-query.begin, "results", len(logs)) +} + +func (f *filterTest) writeQueries(fn string) { + w, err := os.Create(fn) + if err != nil { + exit(fmt.Errorf("Error creating filter pattern file", "name", fn, "error", err)) + return + } + defer w.Close() + + w.WriteString("\t{\n") + for _, list := range f.stored { + w.WriteString("\t\t{\n") + for _, filter := range list { + w.WriteString(fmt.Sprintf("\t\t\t{%d, %d, []common.Address{\n", filter.begin, filter.end)) + for _, addr := range filter.addresses { + w.WriteString(fmt.Sprintf("\t\t\t\t\tcommon.HexToAddress(\"0x%040x\"),\n", addr)) + } + w.WriteString(fmt.Sprintf("\t\t\t\t}, [][]common.Hash{\n")) + for i, topics := range filter.topics { + if i == 0 { + w.WriteString(fmt.Sprintf("\t\t\t\t\t{\n")) + } + for _, topic := range topics { + w.WriteString(fmt.Sprintf("\t\t\t\t\t\tcommon.HexToHash(\"0x%064x\"),\n", topic)) + } + if i == len(filter.topics)-1 { + w.WriteString(fmt.Sprintf("\t\t\t\t\t},\n")) + } else { + w.WriteString(fmt.Sprintf("\t\t\t\t\t}, {\n")) + } + } + w.WriteString(fmt.Sprintf("\t\t\t\t}, common.HexToHash(\"0x%064x\"),\n", filter.resultHash)) + w.WriteString(fmt.Sprintf("\t\t\t},\n")) + } + w.WriteString("\t\t},\n") + } + w.WriteString("\t},\n") +} + +func (f *filterTest) writeFailed(fn string) { + w, err := os.Create(fn) + if err != nil { + exit(fmt.Errorf("Error creating filter error file", "name", fn, "error", err)) + return + } + defer w.Close() + + w.WriteString("\t{\n") + for _, filter := range f.failed { + 
w.WriteString(fmt.Sprintf("\t\t{%d, %d, []common.Address{\n", filter.begin, filter.end)) + for _, addr := range filter.addresses { + w.WriteString(fmt.Sprintf("\t\t\t\tcommon.HexToAddress(\"0x%040x\"),\n", addr)) + } + w.WriteString(fmt.Sprintf("\t\t\t}, [][]common.Hash{\n")) + for i, topics := range filter.topics { + if i == 0 { + w.WriteString(fmt.Sprintf("\t\t\t\t{\n")) + } + for _, topic := range topics { + w.WriteString(fmt.Sprintf("\t\t\t\t\tcommon.HexToHash(\"0x%064x\"),\n", topic)) + } + if i == len(filter.topics)-1 { + w.WriteString(fmt.Sprintf("\t\t\t\t},\n")) + } else { + w.WriteString(fmt.Sprintf("\t\t\t\t}, {\n")) + } + } + w.WriteString(fmt.Sprintf("\t\t\t}, \"%v\"),\n", filter.err)) + w.WriteString(fmt.Sprintf("\t\t},\n")) + } + w.WriteString("\t},\n") } From 45217c64642201162501e3f0bccee72fe2d4c267 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Tue, 18 Feb 2025 02:14:14 +0100 Subject: [PATCH 4/5] 2 --- cmd/workload/filtercmd.go | 205 +++++++++++++++----------------------- 1 file changed, 83 insertions(+), 122 deletions(-) diff --git a/cmd/workload/filtercmd.go b/cmd/workload/filtercmd.go index eb388f8ca4..c50cc243ae 100644 --- a/cmd/workload/filtercmd.go +++ b/cmd/workload/filtercmd.go @@ -18,6 +18,7 @@ package main import ( "context" + "encoding/json" "fmt" "math" "math/big" @@ -66,25 +67,25 @@ func filterTestCmd(ctx *cli.Context) error { f.getFinalizedBlock() query := f.newQuery() f.query(query) - if query.err != nil { + if query.Err != nil { f.failed = append(f.failed, query) continue } if len(query.results) > 0 && len(query.results) <= maxFilterResultSize { for { - extQuery := f.extendQuery(query) + extQuery := f.extendRange(query) if extQuery == nil { break } f.query(extQuery) - if extQuery.err == nil && len(extQuery.results) < len(query.results) { - extQuery.err = fmt.Errorf("invalid result length; old range %d %d; old length %d; new range %d %d; new length %d; addresses %v; topics %v", - query.begin, query.end, len(query.results), - extQuery.begin, extQuery.end, len(extQuery.results), - extQuery.addresses, extQuery.topics, + if extQuery.Err == nil && len(extQuery.results) < len(query.results) { + extQuery.Err = fmt.Errorf("invalid result length; old range %d %d; old length %d; new range %d %d; new length %d; address %v; Topics %v", + query.FromBlock, query.ToBlock, len(query.results), + extQuery.FromBlock, extQuery.ToBlock, len(extQuery.results), + extQuery.Address, extQuery.Topics, ) } - if extQuery.err != nil { + if extQuery.Err != nil { f.failed = append(f.failed, extQuery) break } @@ -112,8 +113,9 @@ type filterTest struct { } func (f *filterTest) storeQuery(query *filterQuery) { - query.resultHash = query.calculateHash() - logRatio := math.Log(float64(len(query.results))*maxFilterRange/float64(query.end+1-query.begin)) / math.Log(maxFilterRange*maxFilterResultSize) + query.ResultHash = new(common.Hash) + *query.ResultHash = query.calculateHash() + logRatio := math.Log(float64(len(query.results))*maxFilterRange/float64(query.ToBlock+1-query.FromBlock)) / math.Log(maxFilterRange*maxFilterResultSize) bucket := int(math.Floor(logRatio * filterBuckets)) if bucket >= filterBuckets { bucket = filterBuckets - 1 @@ -130,31 +132,31 @@ func (f *filterTest) storeQuery(query *filterQuery) { fmt.Println() } -func (f *filterTest) extendQuery(q *filterQuery) *filterQuery { - rangeLen := q.end + 1 - q.begin +func (f *filterTest) extendRange(q *filterQuery) *filterQuery { + rangeLen := q.ToBlock + 1 - q.FromBlock extLen := rand.Int63n(rangeLen) + 1 if rangeLen+extLen > 
maxFilterRange { return nil } extBefore := rand.Int63n(extLen + 1) - if extBefore > q.begin { - extBefore = q.begin + if extBefore > q.FromBlock { + extBefore = q.FromBlock } extAfter := extLen - extBefore - if q.end+extAfter > f.finalized { - d := f.finalized - q.end - extAfter + if q.ToBlock+extAfter > f.finalized { + d := q.ToBlock + extAfter - f.finalized extAfter -= d - if extBefore+d <= q.begin { + if extBefore+d <= q.FromBlock { extBefore += d } else { - extBefore = q.begin + extBefore = q.FromBlock } } return &filterQuery{ - begin: q.begin - extBefore, - end: q.end + extAfter, - addresses: q.addresses, - topics: q.topics, + FromBlock: q.FromBlock - extBefore, + ToBlock: q.ToBlock + extAfter, + Address: q.Address, + Topics: q.Topics, } } @@ -184,8 +186,8 @@ func (f *filterTest) newQuery() *filterQuery { func (f *filterTest) newSeedQuery() *filterQuery { block := rand.Int63n(f.finalized + 1) return &filterQuery{ - begin: block, - end: block, + FromBlock: block, + ToBlock: block, } } @@ -200,39 +202,39 @@ func (f *filterTest) newMergedQuery() *filterQuery { topicCount int ) if rand.Intn(2) == 0 { - block = q1.begin + rand.Int63n(q1.end+1-q1.begin) - topicCount = len(q1.topics) + block = q1.FromBlock + rand.Int63n(q1.ToBlock+1-q1.FromBlock) + topicCount = len(q1.Topics) } else { - block = q2.begin + rand.Int63n(q2.end+1-q2.begin) - topicCount = len(q2.topics) + block = q2.FromBlock + rand.Int63n(q2.ToBlock+1-q2.FromBlock) + topicCount = len(q2.Topics) } m := &filterQuery{ - begin: block, - end: block, - topics: make([][]common.Hash, topicCount), + FromBlock: block, + ToBlock: block, + Topics: make([][]common.Hash, topicCount), } - for _, addr := range q1.addresses { + for _, addr := range q1.Address { if rand.Intn(2) == 0 { - m.addresses = append(m.addresses, addr) + m.Address = append(m.Address, addr) } } - for _, addr := range q2.addresses { + for _, addr := range q2.Address { if rand.Intn(2) == 0 { - m.addresses = append(m.addresses, addr) + m.Address = append(m.Address, addr) } } - for i := range m.topics { - if len(q1.topics) > i { - for _, topic := range q1.topics[i] { + for i := range m.Topics { + if len(q1.Topics) > i { + for _, topic := range q1.Topics[i] { if rand.Intn(2) == 0 { - m.topics[i] = append(m.topics[i], topic) + m.Topics[i] = append(m.Topics[i], topic) } } } - if len(q2.topics) > i { - for _, topic := range q2.topics[i] { + if len(q2.Topics) > i { + for _, topic := range q2.Topics[i] { if rand.Intn(2) == 0 { - m.topics[i] = append(m.topics[i], topic) + m.Topics[i] = append(m.Topics[i], topic) } } } @@ -247,11 +249,11 @@ func (f *filterTest) newNarrowedQuery() *filterQuery { } log := q.results[rand.Intn(len(q.results))] var emptyCount int - if len(q.addresses) == 0 { + if len(q.Address) == 0 { emptyCount++ } for i := range log.Topics { - if len(q.topics) <= i || len(q.topics[i]) == 0 { + if len(q.Topics) <= i || len(q.Topics[i]) == 0 { emptyCount++ } } @@ -259,26 +261,26 @@ func (f *filterTest) newNarrowedQuery() *filterQuery { return nil } query := &filterQuery{ - begin: q.begin, - end: q.end, - addresses: q.addresses, - topics: q.topics, + FromBlock: q.FromBlock, + ToBlock: q.ToBlock, + Address: q.Address, + Topics: q.Topics, } pick := rand.Intn(emptyCount) - if len(q.addresses) == 0 { + if len(q.Address) == 0 { if pick == 0 { - q.addresses = []common.Address{log.Address} + q.Address = []common.Address{log.Address} return query } pick-- } for i := range log.Topics { - if len(q.topics) <= i || len(q.topics[i]) == 0 { + if len(q.Topics) <= i || len(q.Topics[i]) == 0 
{ if pick == 0 { - if len(q.topics) <= i { - q.topics = append(q.topics, make([][]common.Hash, i+1-len(q.topics))...) + if len(q.Topics) <= i { + q.Topics = append(q.Topics, make([][]common.Hash, i+1-len(q.Topics))...) } - q.topics[i] = []common.Hash{log.Topics[i]} + q.Topics[i] = []common.Hash{log.Topics[i]} return query } pick-- @@ -311,12 +313,13 @@ func (f *filterTest) randomQuery() *filterQuery { } type filterQuery struct { - begin, end int64 - addresses []common.Address - topics [][]common.Hash - resultHash common.Hash + FromBlock int64 `json: fromBlock` + ToBlock int64 `json: toBlock` + Address []common.Address `json: address` + Topics [][]common.Hash `json: topics` + ResultHash *common.Hash `json: resultHash, omitEmpty` results []types.Log - err error + Err error `json: error, omitEmpty` } func (fq *filterQuery) calculateHash() common.Hash { @@ -343,88 +346,46 @@ func (f *filterTest) query(query *filterQuery) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() logs, err := f.ec.FilterLogs(ctx, ethereum.FilterQuery{ - FromBlock: big.NewInt(query.begin), - ToBlock: big.NewInt(query.end), - Addresses: query.addresses, - Topics: query.topics, + FromBlock: big.NewInt(query.FromBlock), + ToBlock: big.NewInt(query.ToBlock), + Addresses: query.Address, + Topics: query.Topics, }) if err != nil { - query.err = err + query.Err = err fmt.Println("filter query error", err) return } query.results = logs - fmt.Println("filter query range", query.end+1-query.begin, "results", len(logs)) + fmt.Println("filter query range", query.ToBlock+1-query.FromBlock, "results", len(logs)) } -func (f *filterTest) writeQueries(fn string) { - w, err := os.Create(fn) +func (f *filterTest) readQueries(fn string) { + file, err := os.Open(fn) if err != nil { exit(fmt.Errorf("Error creating filter pattern file", "name", fn, "error", err)) return } - defer w.Close() + json.NewDecoder(file).Decode(f.stored[:]) + file.Close() +} - w.WriteString("\t{\n") - for _, list := range f.stored { - w.WriteString("\t\t{\n") - for _, filter := range list { - w.WriteString(fmt.Sprintf("\t\t\t{%d, %d, []common.Address{\n", filter.begin, filter.end)) - for _, addr := range filter.addresses { - w.WriteString(fmt.Sprintf("\t\t\t\t\tcommon.HexToAddress(\"0x%040x\"),\n", addr)) - } - w.WriteString(fmt.Sprintf("\t\t\t\t}, [][]common.Hash{\n")) - for i, topics := range filter.topics { - if i == 0 { - w.WriteString(fmt.Sprintf("\t\t\t\t\t{\n")) - } - for _, topic := range topics { - w.WriteString(fmt.Sprintf("\t\t\t\t\t\tcommon.HexToHash(\"0x%064x\"),\n", topic)) - } - if i == len(filter.topics)-1 { - w.WriteString(fmt.Sprintf("\t\t\t\t\t},\n")) - } else { - w.WriteString(fmt.Sprintf("\t\t\t\t\t}, {\n")) - } - } - w.WriteString(fmt.Sprintf("\t\t\t\t}, common.HexToHash(\"0x%064x\"),\n", filter.resultHash)) - w.WriteString(fmt.Sprintf("\t\t\t},\n")) - } - w.WriteString("\t\t},\n") +func (f *filterTest) writeQueries(fn string) { + file, err := os.Create(fn) + if err != nil { + exit(fmt.Errorf("Error creating filter pattern file", "name", fn, "error", err)) + return } - w.WriteString("\t},\n") + json.NewEncoder(file).Encode(f.stored[:]) + file.Close() } func (f *filterTest) writeFailed(fn string) { - w, err := os.Create(fn) + file, err := os.Create(fn) if err != nil { exit(fmt.Errorf("Error creating filter error file", "name", fn, "error", err)) return } - defer w.Close() - - w.WriteString("\t{\n") - for _, filter := range f.failed { - w.WriteString(fmt.Sprintf("\t\t{%d, %d, []common.Address{\n", 
filter.begin, filter.end)) - for _, addr := range filter.addresses { - w.WriteString(fmt.Sprintf("\t\t\t\tcommon.HexToAddress(\"0x%040x\"),\n", addr)) - } - w.WriteString(fmt.Sprintf("\t\t\t}, [][]common.Hash{\n")) - for i, topics := range filter.topics { - if i == 0 { - w.WriteString(fmt.Sprintf("\t\t\t\t{\n")) - } - for _, topic := range topics { - w.WriteString(fmt.Sprintf("\t\t\t\t\tcommon.HexToHash(\"0x%064x\"),\n", topic)) - } - if i == len(filter.topics)-1 { - w.WriteString(fmt.Sprintf("\t\t\t\t},\n")) - } else { - w.WriteString(fmt.Sprintf("\t\t\t\t}, {\n")) - } - } - w.WriteString(fmt.Sprintf("\t\t\t}, \"%v\"),\n", filter.err)) - w.WriteString(fmt.Sprintf("\t\t},\n")) - } - w.WriteString("\t},\n") + json.NewEncoder(file).Encode(f.failed) + file.Close() } From 7d99c650a51b4486ba0b7ea3f48d20fbfcf395f8 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Wed, 19 Feb 2025 17:13:57 +0100 Subject: [PATCH 5/5] 1 --- cmd/workload/filtercmd.go | 244 ++++++++++++++++++++++++++++++++------ 1 file changed, 208 insertions(+), 36 deletions(-) diff --git a/cmd/workload/filtercmd.go b/cmd/workload/filtercmd.go index c50cc243ae..89795317ab 100644 --- a/cmd/workload/filtercmd.go +++ b/cmd/workload/filtercmd.go @@ -24,6 +24,7 @@ import ( "math/big" "math/rand" "os" + "sort" "time" "github.com/ethereum/go-ethereum" @@ -31,6 +32,9 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/internal/flags" + + //"github.com/ethereum/go-ethereum/internal/utesting" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/urfave/cli/v2" @@ -47,16 +51,150 @@ const ( var ( filterCommand = &cli.Command{ - Name: "filter", - Usage: "Runs range log filter workload test against an RPC endpoint", + Name: "filter", + Usage: "Log filter workload test commands", + Subcommands: []*cli.Command{ + filterGenCommand, + filterTestCommand, + }, + } + filterGenCommand = &cli.Command{ + Name: "generate", + Usage: "Generates query set for log filter workload test", + ArgsUsage: "", + Action: filterGenCmd, + Flags: []cli.Flag{ + filterQueryFileFlag, + filterErrorFileFlag, + }, + } + filterTestCommand = &cli.Command{ + Name: "test", + Usage: "Runs log filter workload test against an RPC endpoint", ArgsUsage: "", Action: filterTestCmd, - Flags: []cli.Flag{}, + Flags: []cli.Flag{ + filterQueryFileFlag, + filterErrorFileFlag, + }, + } + filterQueryFileFlag = &cli.StringFlag{ + Name: "queries", + Usage: "JSON file containing filter test queries", + Category: flags.TestingCategory, + Value: "filter_queries.json", + } + filterErrorFileFlag = &cli.StringFlag{ + Name: "errors", + Usage: "JSON file containing failed filter queries", + Category: flags.TestingCategory, + Value: "filter_errors.json", } ) +const passCount = 5 + func filterTestCmd(ctx *cli.Context) error { - f := filterTest{ec: makeEthClient(ctx)} + f := newFilterTest(ctx) + if f.loadQueries() == 0 { + exit("No test requests loaded") + } + f.getFinalizedBlock() + + type queryTest struct { + query *filterQuery + bucket, index int + runtime []time.Duration + medianTime time.Duration + } + var queries, processed []queryTest + + for i, bucket := range f.stored[:] { + for j, query := range bucket { + if query.ToBlock > f.finalized { + fmt.Println("invalid range") + continue + } + queries = append(queries, queryTest{query: query, bucket: i, index: j}) + } + } + + var failed, mismatch int + for i := 1; i <= passCount; i++ { + 
fmt.Println("Performance test pass", i, "/", passCount) + for len(queries) > 0 { + pick := rand.Intn(len(queries)) + qt := queries[pick] + queries[pick] = queries[len(queries)-1] + queries = queries[:len(queries)-1] + start := time.Now() + f.query(qt.query) + qt.runtime = append(qt.runtime, time.Since(start)) + sort.Slice(qt.runtime, func(i, j int) bool { return qt.runtime[i] < qt.runtime[j] }) + qt.medianTime = qt.runtime[len(qt.runtime)/2] + if qt.query.Err != nil { + fmt.Println(qt.bucket, qt.index, "err", qt.query.Err) + failed++ + continue + } + if *qt.query.ResultHash != qt.query.calculateHash() { + fmt.Println(qt.bucket, qt.index, "mismatch") + mismatch++ + continue + } + processed = append(processed, qt) + if len(processed)%50 == 0 { + fmt.Println("processed:", len(processed), "remaining", len(queries), "failed:", failed, "result mismatch:", mismatch) + } + } + queries, processed = processed, nil + } + fmt.Println("Done; processed:", len(queries), "failed:", failed, "result mismatch:", mismatch) + + type bucketStats struct { + blocks int64 + count, logs int + runtime time.Duration + } + stats := make([]bucketStats, len(f.stored)) + var wildcardStats bucketStats + for _, qt := range queries { + bs := &stats[qt.bucket] + if qt.query.isWildcard() { + bs = &wildcardStats + } + bs.blocks += qt.query.ToBlock + 1 - qt.query.FromBlock + bs.count++ + bs.logs += len(qt.query.results) + bs.runtime += qt.medianTime + } + + printStats := func(name string, stats *bucketStats) { + if stats.count == 0 { + return + } + fmt.Println(name, "query count", stats.count, "avg block count", float64(stats.blocks)/float64(stats.count), "avg log count", float64(stats.logs)/float64(stats.count), "avg runtime", stats.runtime/time.Duration(stats.count)) + } + + fmt.Println() + for i := range stats { + printStats(fmt.Sprintf("bucket #%d", i), &stats[i]) + } + printStats("wild card queries", &wildcardStats) + fmt.Println() + sort.Slice(queries, func(i, j int) bool { + return queries[i].medianTime > queries[j].medianTime + }) + for i := 0; i < 100; i++ { + q := queries[i] + fmt.Println("Most expensive query #", i+1, "median time", q.medianTime, "max time", q.runtime[len(q.runtime)-1], "results", len(q.query.results), "fromBlock", q.query.FromBlock, "toBlock", q.query.ToBlock, "addresses", q.query.Address, "topics", q.query.Topics) + } + return nil +} + +func filterGenCmd(ctx *cli.Context) error { + f := newFilterTest(ctx) + //f.loadQueries() //TODO lastWrite := time.Now() for { select { @@ -96,8 +234,8 @@ func filterTestCmd(ctx *cli.Context) error { } f.storeQuery(query) if time.Since(lastWrite) > time.Second*10 { - f.writeQueries("filter_queries") - f.writeFailed("filter_errors") + f.writeQueries() + f.writeErrors() lastWrite = time.Now() } } @@ -106,10 +244,19 @@ func filterTestCmd(ctx *cli.Context) error { } type filterTest struct { - ec *ethclient.Client - finalized int64 - stored [filterBuckets][]*filterQuery - failed []*filterQuery + ec *ethclient.Client + finalized int64 + queryFile, errorFile string + stored [filterBuckets][]*filterQuery + failed []*filterQuery +} + +func newFilterTest(ctx *cli.Context) *filterTest { + return &filterTest{ + ec: makeEthClient(ctx), + queryFile: ctx.String(filterQueryFileFlag.Name), + errorFile: ctx.String(filterErrorFileFlag.Name), + } } func (f *filterTest) storeQuery(query *filterQuery) { @@ -263,24 +410,31 @@ func (f *filterTest) newNarrowedQuery() *filterQuery { query := &filterQuery{ FromBlock: q.FromBlock, ToBlock: q.ToBlock, - Address: q.Address, - Topics: q.Topics, + 
Address: make([]common.Address, len(q.Address)), + Topics: make([][]common.Hash, len(q.Topics)), + } + copy(query.Address, q.Address) + for i, topics := range q.Topics { + if len(topics) > 0 { + query.Topics[i] = make([]common.Hash, len(topics)) + copy(query.Topics[i], topics) + } } pick := rand.Intn(emptyCount) - if len(q.Address) == 0 { + if len(query.Address) == 0 { if pick == 0 { - q.Address = []common.Address{log.Address} + query.Address = []common.Address{log.Address} return query } pick-- } for i := range log.Topics { - if len(q.Topics) <= i || len(q.Topics[i]) == 0 { + if len(query.Topics) <= i || len(query.Topics[i]) == 0 { if pick == 0 { - if len(q.Topics) <= i { - q.Topics = append(q.Topics, make([][]common.Hash, i+1-len(q.Topics))...) + if len(query.Topics) <= i { + query.Topics = append(query.Topics, make([][]common.Hash, i+1-len(query.Topics))...) } - q.Topics[i] = []common.Hash{log.Topics[i]} + query.Topics[i] = []common.Hash{log.Topics[i]} return query } pick-- @@ -322,6 +476,18 @@ type filterQuery struct { Err error `json: error, omitEmpty` } +func (fq *filterQuery) isWildcard() bool { + if len(fq.Address) != 0 { + return false + } + for _, topics := range fq.Topics { + if len(topics) != 0 { + return false + } + } + return true +} + func (fq *filterQuery) calculateHash() common.Hash { enc, err := rlp.EncodeToBytes(&fq.results) if err != nil { @@ -357,33 +523,39 @@ func (f *filterTest) query(query *filterQuery) { return } query.results = logs - fmt.Println("filter query range", query.ToBlock+1-query.FromBlock, "results", len(logs)) + //fmt.Println("filter query range", query.ToBlock+1-query.FromBlock, "results", len(logs)) } -func (f *filterTest) readQueries(fn string) { - file, err := os.Open(fn) +func (f *filterTest) loadQueries() int { + file, err := os.Open(f.queryFile) if err != nil { - exit(fmt.Errorf("Error creating filter pattern file", "name", fn, "error", err)) + fmt.Println("Error opening", f.queryFile, ":", err) + return 0 + } + json.NewDecoder(file).Decode(&f.stored) + file.Close() + var count int + for _, bucket := range f.stored { + count += len(bucket) + } + fmt.Println("Loaded", count, "filter test queries") + return count +} + +func (f *filterTest) writeQueries() { + file, err := os.Create(f.queryFile) + if err != nil { + exit(fmt.Errorf("Error creating filter test query file", "name", f.queryFile, "error", err)) return } - json.NewDecoder(file).Decode(f.stored[:]) + json.NewEncoder(file).Encode(&f.stored) file.Close() } -func (f *filterTest) writeQueries(fn string) { - file, err := os.Create(fn) +func (f *filterTest) writeErrors() { + file, err := os.Create(f.errorFile) if err != nil { - exit(fmt.Errorf("Error creating filter pattern file", "name", fn, "error", err)) - return - } - json.NewEncoder(file).Encode(f.stored[:]) - file.Close() -} - -func (f *filterTest) writeFailed(fn string) { - file, err := os.Create(fn) - if err != nil { - exit(fmt.Errorf("Error creating filter error file", "name", fn, "error", err)) + exit(fmt.Errorf("Error creating filter error file", "name", f.errorFile, "error", err)) return } json.NewEncoder(file).Encode(f.failed)
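The query set built by filterTest.storeQuery is spread across buckets by the log of each query's result density (results per block, scaled by maxFilterRange), so both very sparse and very dense filters stay represented for the later performance passes. Below is a minimal, self-contained sketch of that bucketing formula, using the constants from the final patch; the helper name bucketIndex is illustrative and not part of the patch itself.

package main

import (
	"fmt"
	"math"
)

// Constants as defined in cmd/workload/filtercmd.go in the final patch.
const (
	maxFilterRange      = 10000000
	maxFilterResultSize = 300
	filterBuckets       = 16
)

// bucketIndex mirrors the density-based bucketing in filterTest.storeQuery:
// the log of the per-block result density is normalized against the maximum
// possible density and mapped onto filterBuckets buckets, clamping at the top.
func bucketIndex(resultCount int, fromBlock, toBlock int64) int {
	logRatio := math.Log(float64(resultCount)*maxFilterRange/float64(toBlock+1-fromBlock)) /
		math.Log(maxFilterRange*maxFilterResultSize)
	bucket := int(math.Floor(logRatio * filterBuckets))
	if bucket >= filterBuckets {
		bucket = filterBuckets - 1
	}
	return bucket
}

func main() {
	fmt.Println(bucketIndex(1, 100, 100))  // one hit in a single block: dense query, high bucket (11)
	fmt.Println(bucketIndex(1, 0, 999999)) // one hit over a million blocks: sparse query, low bucket (1)
}

Because bucket membership rather than raw count decides where a stored query lands, a handful of expensive dense queries cannot crowd out the cheap sparse ones (or vice versa) once a bucket reaches maxFilterBucketSize and starts replacing random entries.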