From 0851646e480f3fff0d6cdd900fc1034960b993f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= Date: Thu, 9 Apr 2020 11:55:32 +0200 Subject: [PATCH] les, les/lespay/client: add service value statistics and API (#20837) This PR adds service value measurement statistics to the light client. It also adds a private API that makes these statistics accessible. A follow-up PR will add the new server pool which uses these statistics to select servers with good performance. This document describes the function of the new components: https://gist.github.com/zsfelfoldi/3c7ace895234b7b345ab4f71dab102d4 Co-authored-by: rjl493456442 Co-authored-by: rjl493456442 --- internal/web3ext/web3ext.go | 32 ++ les/benchmark.go | 2 +- les/client.go | 50 ++- les/client_handler.go | 8 + les/lespay/client/api.go | 107 +++++ les/lespay/client/requestbasket.go | 285 +++++++++++++ les/lespay/client/requestbasket_test.go | 161 ++++++++ les/lespay/client/timestats.go | 237 +++++++++++ les/lespay/client/timestats_test.go | 137 +++++++ les/lespay/client/valuetracker.go | 515 ++++++++++++++++++++++++ les/lespay/client/valuetracker_test.go | 135 +++++++ les/peer.go | 122 +++++- les/protocol.go | 63 ++- les/txrelay.go | 2 +- les/utils/expiredvalue.go | 202 ++++++++++ les/utils/expiredvalue_test.go | 116 ++++++ p2p/enode/node.go | 6 +- 17 files changed, 2142 insertions(+), 38 deletions(-) create mode 100644 les/lespay/client/api.go create mode 100644 les/lespay/client/requestbasket.go create mode 100644 les/lespay/client/requestbasket_test.go create mode 100644 les/lespay/client/timestats.go create mode 100644 les/lespay/client/timestats_test.go create mode 100644 les/lespay/client/valuetracker.go create mode 100644 les/lespay/client/valuetracker_test.go create mode 100644 les/utils/expiredvalue.go create mode 100644 les/utils/expiredvalue_test.go diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 71f0a2ed5d..4d85f0597d 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -33,6 +33,7 @@ var Modules = map[string]string{ "swarmfs": SwarmfsJs, "txpool": TxpoolJs, "les": LESJs, + "lespay": LESPayJs, } const ChequebookJs = ` @@ -856,3 +857,34 @@ web3._extend({ ] }); ` + +const LESPayJs = ` +web3._extend({ + property: 'lespay', + methods: + [ + new web3._extend.Method({ + name: 'distribution', + call: 'lespay_distribution', + params: 2 + }), + new web3._extend.Method({ + name: 'timeout', + call: 'lespay_timeout', + params: 2 + }), + new web3._extend.Method({ + name: 'value', + call: 'lespay_value', + params: 2 + }), + ], + properties: + [ + new web3._extend.Property({ + name: 'requestStats', + getter: 'lespay_requestStats' + }), + ] +}); +` diff --git a/les/benchmark.go b/les/benchmark.go index dbb10a5c2a..a146de2fed 100644 --- a/les/benchmark.go +++ b/les/benchmark.go @@ -191,7 +191,7 @@ func (b *benchmarkTxSend) init(h *serverHandler, count int) error { func (b *benchmarkTxSend) request(peer *serverPeer, index int) error { enc, _ := rlp.EncodeToBytes(types.Transactions{b.txs[index]}) - return peer.sendTxs(0, enc) + return peer.sendTxs(0, 1, enc) } // benchmarkTxStatus implements requestBenchmark diff --git a/les/client.go b/les/client.go index dfd0909778..94ac4ce847 100644 --- a/les/client.go +++ b/les/client.go @@ -19,6 +19,7 @@ package les import ( "fmt" + "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -37,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/les/checkpointoracle" + lpc "github.com/ethereum/go-ethereum/les/lespay/client" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" @@ -49,15 +51,16 @@ import ( type LightEthereum struct { lesCommons - peers *serverPeerSet - reqDist *requestDistributor - retriever *retrieveManager - odr *LesOdr - relay *lesTxRelay - handler *clientHandler - txPool *light.TxPool - blockchain *light.LightChain - serverPool *serverPool + peers *serverPeerSet + reqDist *requestDistributor + retriever *retrieveManager + odr *LesOdr + relay *lesTxRelay + handler *clientHandler + txPool *light.TxPool + blockchain *light.LightChain + serverPool *serverPool + valueTracker *lpc.ValueTracker bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports @@ -74,6 +77,10 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { if err != nil { return nil, err } + lespayDb, err := ctx.OpenDatabase("lespay", 0, 0, "eth/db/lespay") + if err != nil { + return nil, err + } chainConfig, genesisHash, genesisErr := core.SetupGenesisBlockWithOverride(chainDb, config.Genesis, config.OverrideIstanbul, config.OverrideMuirGlacier) if _, isCompat := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !isCompat { @@ -99,7 +106,9 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { bloomRequests: make(chan chan *bloombits.Retrieval), bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations), serverPool: newServerPool(chainDb, config.UltraLightServers), + valueTracker: lpc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)), } + peers.subscribe((*vtSubscription)(leth.valueTracker)) leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool) leth.relay = newLesTxRelay(peers, leth.retriever) @@ -154,6 +163,23 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { return leth, nil } +// vtSubscription implements serverPeerSubscriber +type vtSubscription lpc.ValueTracker + +// registerPeer implements serverPeerSubscriber +func (v *vtSubscription) registerPeer(p *serverPeer) { + vt := (*lpc.ValueTracker)(v) + p.setValueTracker(vt, vt.Register(p.ID())) + p.updateVtParams() +} + +// unregisterPeer implements serverPeerSubscriber +func (v *vtSubscription) unregisterPeer(p *serverPeer) { + vt := (*lpc.ValueTracker)(v) + vt.Unregister(p.ID()) + p.setValueTracker(nil, nil) +} + type LightDummyAPI struct{} // Etherbase is the address that mining rewards will be send to @@ -207,6 +233,11 @@ func (s *LightEthereum) APIs() []rpc.API { Version: "1.0", Service: NewPrivateLightAPI(&s.lesCommons), Public: false, + }, { + Namespace: "lespay", + Version: "1.0", + Service: lpc.NewPrivateClientAPI(s.valueTracker), + Public: false, }, }...) } @@ -266,6 +297,7 @@ func (s *LightEthereum) Stop() error { s.engine.Close() s.eventMux.Stop() s.serverPool.stop() + s.valueTracker.Stop() s.chainDb.Close() s.wg.Wait() log.Info("Light ethereum stopped") diff --git a/les/client_handler.go b/les/client_handler.go index 9d8b989012..6367fdb6be 100644 --- a/les/client_handler.go +++ b/les/client_handler.go @@ -180,6 +180,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error { return errResp(ErrRequestRejected, "") } p.updateFlowControl(update) + p.updateVtParams() if req.Hash != (common.Hash{}) { if p.announceType == announceTypeNone { @@ -205,6 +206,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.fcServer.ReceivedReply(resp.ReqID, resp.BV) + p.answeredRequest(resp.ReqID) if h.fetcher.requestedID(resp.ReqID) { h.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers) } else { @@ -222,6 +224,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.fcServer.ReceivedReply(resp.ReqID, resp.BV) + p.answeredRequest(resp.ReqID) deliverMsg = &Msg{ MsgType: MsgBlockBodies, ReqID: resp.ReqID, @@ -237,6 +240,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.fcServer.ReceivedReply(resp.ReqID, resp.BV) + p.answeredRequest(resp.ReqID) deliverMsg = &Msg{ MsgType: MsgCode, ReqID: resp.ReqID, @@ -252,6 +256,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.fcServer.ReceivedReply(resp.ReqID, resp.BV) + p.answeredRequest(resp.ReqID) deliverMsg = &Msg{ MsgType: MsgReceipts, ReqID: resp.ReqID, @@ -267,6 +272,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.fcServer.ReceivedReply(resp.ReqID, resp.BV) + p.answeredRequest(resp.ReqID) deliverMsg = &Msg{ MsgType: MsgProofsV2, ReqID: resp.ReqID, @@ -282,6 +288,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.fcServer.ReceivedReply(resp.ReqID, resp.BV) + p.answeredRequest(resp.ReqID) deliverMsg = &Msg{ MsgType: MsgHelperTrieProofs, ReqID: resp.ReqID, @@ -297,6 +304,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } p.fcServer.ReceivedReply(resp.ReqID, resp.BV) + p.answeredRequest(resp.ReqID) deliverMsg = &Msg{ MsgType: MsgTxStatus, ReqID: resp.ReqID, diff --git a/les/lespay/client/api.go b/les/lespay/client/api.go new file mode 100644 index 0000000000..5ad6ffd77e --- /dev/null +++ b/les/lespay/client/api.go @@ -0,0 +1,107 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package client + +import ( + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/les/utils" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +// PrivateClientAPI implements the lespay client side API +type PrivateClientAPI struct { + vt *ValueTracker +} + +// NewPrivateClientAPI creates a PrivateClientAPI +func NewPrivateClientAPI(vt *ValueTracker) *PrivateClientAPI { + return &PrivateClientAPI{vt} +} + +// parseNodeStr converts either an enode address or a plain hex node id to enode.ID +func parseNodeStr(nodeStr string) (enode.ID, error) { + if id, err := enode.ParseID(nodeStr); err == nil { + return id, nil + } + if node, err := enode.Parse(enode.ValidSchemes, nodeStr); err == nil { + return node.ID(), nil + } else { + return enode.ID{}, err + } +} + +// RequestStats returns the current contents of the reference request basket, with +// request values meaning average per request rather than total. +func (api *PrivateClientAPI) RequestStats() []RequestStatsItem { + return api.vt.RequestStats() +} + +// Distribution returns a distribution as a series of (X, Y) chart coordinates, +// where the X axis is the response time in seconds while the Y axis is the amount of +// service value received with a response time close to the X coordinate. +// The distribution is optionally normalized to a sum of 1. +// If nodeStr == "" then the global distribution is returned, otherwise the individual +// distribution of the specified server node. +func (api *PrivateClientAPI) Distribution(nodeStr string, normalized bool) (RtDistribution, error) { + var expFactor utils.ExpirationFactor + if !normalized { + expFactor = utils.ExpFactor(api.vt.StatsExpirer().LogOffset(mclock.Now())) + } + if nodeStr == "" { + return api.vt.RtStats().Distribution(normalized, expFactor), nil + } + if id, err := parseNodeStr(nodeStr); err == nil { + return api.vt.GetNode(id).RtStats().Distribution(normalized, expFactor), nil + } else { + return RtDistribution{}, err + } +} + +// Timeout suggests a timeout value based on either the global distribution or the +// distribution of the specified node. The parameter is the desired rate of timeouts +// assuming a similar distribution in the future. +// Note that the actual timeout should have a sensible minimum bound so that operating +// under ideal working conditions for a long time (for example, using a local server +// with very low response times) will not make it very hard for the system to accommodate +// longer response times in the future. +func (api *PrivateClientAPI) Timeout(nodeStr string, failRate float64) (float64, error) { + if nodeStr == "" { + return float64(api.vt.RtStats().Timeout(failRate)) / float64(time.Second), nil + } + if id, err := parseNodeStr(nodeStr); err == nil { + return float64(api.vt.GetNode(id).RtStats().Timeout(failRate)) / float64(time.Second), nil + } else { + return 0, err + } +} + +// Value calculates the total service value provided either globally or by the specified +// server node, using a weight function based on the given timeout. +func (api *PrivateClientAPI) Value(nodeStr string, timeout float64) (float64, error) { + wt := TimeoutWeights(time.Duration(timeout * float64(time.Second))) + expFactor := utils.ExpFactor(api.vt.StatsExpirer().LogOffset(mclock.Now())) + if nodeStr == "" { + return api.vt.RtStats().Value(wt, expFactor), nil + } + if id, err := parseNodeStr(nodeStr); err == nil { + return api.vt.GetNode(id).RtStats().Value(wt, expFactor), nil + } else { + return 0, err + } +} diff --git a/les/lespay/client/requestbasket.go b/les/lespay/client/requestbasket.go new file mode 100644 index 0000000000..55d4b165df --- /dev/null +++ b/les/lespay/client/requestbasket.go @@ -0,0 +1,285 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package client + +import ( + "io" + + "github.com/ethereum/go-ethereum/les/utils" + "github.com/ethereum/go-ethereum/rlp" +) + +const basketFactor = 1000000 // reference basket amount and value scale factor + +// referenceBasket keeps track of global request usage statistics and the usual prices +// of each used request type relative to each other. The amounts in the basket are scaled +// up by basketFactor because of the exponential expiration of long-term statistical data. +// Values are scaled so that the sum of all amounts and the sum of all values are equal. +// +// reqValues represent the internal relative value estimates for each request type and are +// calculated as value / amount. The average reqValue of all used requests is 1. +// In other words: SUM(refBasket[type].amount * reqValue[type]) = SUM(refBasket[type].amount) +type referenceBasket struct { + basket requestBasket + reqValues []float64 // contents are read only, new slice is created for each update +} + +// serverBasket collects served request amount and value statistics for a single server. +// +// Values are gradually transferred to the global reference basket with a long time +// constant so that each server basket represents long term usage and price statistics. +// When the transferred part is added to the reference basket the values are scaled so +// that their sum equals the total value calculated according to the previous reqValues. +// The ratio of request values coming from the server basket represent the pricing of +// the specific server and modify the global estimates with a weight proportional to +// the amount of service provided by the server. +type serverBasket struct { + basket requestBasket + rvFactor float64 +} + +type ( + // requestBasket holds amounts and values for each request type. + // These values are exponentially expired (see utils.ExpiredValue). The power of 2 + // exponent is applicable to all values within. + requestBasket struct { + items []basketItem + exp uint64 + } + // basketItem holds amount and value for a single request type. Value is the total + // relative request value accumulated for served requests while amount is the counter + // for each request type. + // Note that these values are both scaled up by basketFactor because of the exponential + // expiration. + basketItem struct { + amount, value uint64 + } +) + +// setExp sets the power of 2 exponent of the structure, scaling base values (the amounts +// and request values) up or down if necessary. +func (b *requestBasket) setExp(exp uint64) { + if exp > b.exp { + shift := exp - b.exp + for i, item := range b.items { + item.amount >>= shift + item.value >>= shift + b.items[i] = item + } + b.exp = exp + } + if exp < b.exp { + shift := b.exp - exp + for i, item := range b.items { + item.amount <<= shift + item.value <<= shift + b.items[i] = item + } + b.exp = exp + } +} + +// init initializes a new server basket with the given service vector size (number of +// different request types) +func (s *serverBasket) init(size int) { + if s.basket.items == nil { + s.basket.items = make([]basketItem, size) + } +} + +// add adds the give type and amount of requests to the basket. Cost is calculated +// according to the server's own cost table. +func (s *serverBasket) add(reqType, reqAmount uint32, reqCost uint64, expFactor utils.ExpirationFactor) { + s.basket.setExp(expFactor.Exp) + i := &s.basket.items[reqType] + i.amount += uint64(float64(uint64(reqAmount)*basketFactor) * expFactor.Factor) + i.value += uint64(float64(reqCost) * s.rvFactor * expFactor.Factor) +} + +// updateRvFactor updates the request value factor that scales server costs into the +// local value dimensions. +func (s *serverBasket) updateRvFactor(rvFactor float64) { + s.rvFactor = rvFactor +} + +// transfer decreases amounts and values in the basket with the given ratio and +// moves the removed amounts into a new basket which is returned and can be added +// to the global reference basket. +func (s *serverBasket) transfer(ratio float64) requestBasket { + res := requestBasket{ + items: make([]basketItem, len(s.basket.items)), + exp: s.basket.exp, + } + for i, v := range s.basket.items { + ta := uint64(float64(v.amount) * ratio) + tv := uint64(float64(v.value) * ratio) + if ta > v.amount { + ta = v.amount + } + if tv > v.value { + tv = v.value + } + s.basket.items[i] = basketItem{v.amount - ta, v.value - tv} + res.items[i] = basketItem{ta, tv} + } + return res +} + +// init initializes the reference basket with the given service vector size (number of +// different request types) +func (r *referenceBasket) init(size int) { + r.reqValues = make([]float64, size) + r.normalize() + r.updateReqValues() +} + +// add adds the transferred part of a server basket to the reference basket while scaling +// value amounts so that their sum equals the total value calculated according to the +// previous reqValues. +func (r *referenceBasket) add(newBasket requestBasket) { + r.basket.setExp(newBasket.exp) + // scale newBasket to match service unit value + var ( + totalCost uint64 + totalValue float64 + ) + for i, v := range newBasket.items { + totalCost += v.value + totalValue += float64(v.amount) * r.reqValues[i] + } + if totalCost > 0 { + // add to reference with scaled values + scaleValues := totalValue / float64(totalCost) + for i, v := range newBasket.items { + r.basket.items[i].amount += v.amount + r.basket.items[i].value += uint64(float64(v.value) * scaleValues) + } + } + r.updateReqValues() +} + +// updateReqValues recalculates reqValues after adding transferred baskets. Note that +// values should be normalized first. +func (r *referenceBasket) updateReqValues() { + r.reqValues = make([]float64, len(r.reqValues)) + for i, b := range r.basket.items { + if b.amount > 0 { + r.reqValues[i] = float64(b.value) / float64(b.amount) + } else { + r.reqValues[i] = 0 + } + } +} + +// normalize ensures that the sum of values equal the sum of amounts in the basket. +func (r *referenceBasket) normalize() { + var sumAmount, sumValue uint64 + for _, b := range r.basket.items { + sumAmount += b.amount + sumValue += b.value + } + add := float64(int64(sumAmount-sumValue)) / float64(sumValue) + for i, b := range r.basket.items { + b.value += uint64(int64(float64(b.value) * add)) + r.basket.items[i] = b + } +} + +// reqValueFactor calculates the request value factor applicable to the server with +// the given announced request cost list +func (r *referenceBasket) reqValueFactor(costList []uint64) float64 { + var ( + totalCost float64 + totalValue uint64 + ) + for i, b := range r.basket.items { + totalCost += float64(costList[i]) * float64(b.amount) // use floats to avoid overflow + totalValue += b.value + } + if totalCost < 1 { + return 0 + } + return float64(totalValue) * basketFactor / totalCost +} + +// EncodeRLP implements rlp.Encoder +func (b *basketItem) EncodeRLP(w io.Writer) error { + return rlp.Encode(w, []interface{}{b.amount, b.value}) +} + +// DecodeRLP implements rlp.Decoder +func (b *basketItem) DecodeRLP(s *rlp.Stream) error { + var item struct { + Amount, Value uint64 + } + if err := s.Decode(&item); err != nil { + return err + } + b.amount, b.value = item.Amount, item.Value + return nil +} + +// EncodeRLP implements rlp.Encoder +func (r *requestBasket) EncodeRLP(w io.Writer) error { + return rlp.Encode(w, []interface{}{r.items, r.exp}) +} + +// DecodeRLP implements rlp.Decoder +func (r *requestBasket) DecodeRLP(s *rlp.Stream) error { + var enc struct { + Items []basketItem + Exp uint64 + } + if err := s.Decode(&enc); err != nil { + return err + } + r.items, r.exp = enc.Items, enc.Exp + return nil +} + +// convertMapping converts a basket loaded from the database into the current format. +// If the available request types and their mapping into the service vector differ from +// the one used when saving the basket then this function reorders old fields and fills +// in previously unknown fields by scaling up amounts and values taken from the +// initialization basket. +func (r requestBasket) convertMapping(oldMapping, newMapping []string, initBasket requestBasket) requestBasket { + nameMap := make(map[string]int) + for i, name := range oldMapping { + nameMap[name] = i + } + rc := requestBasket{items: make([]basketItem, len(newMapping))} + var scale, oldScale, newScale float64 + for i, name := range newMapping { + if ii, ok := nameMap[name]; ok { + rc.items[i] = r.items[ii] + oldScale += float64(initBasket.items[i].amount) * float64(initBasket.items[i].amount) + newScale += float64(rc.items[i].amount) * float64(initBasket.items[i].amount) + } + } + if oldScale > 1e-10 { + scale = newScale / oldScale + } else { + scale = 1 + } + for i, name := range newMapping { + if _, ok := nameMap[name]; !ok { + rc.items[i].amount = uint64(float64(initBasket.items[i].amount) * scale) + rc.items[i].value = uint64(float64(initBasket.items[i].value) * scale) + } + } + return rc +} diff --git a/les/lespay/client/requestbasket_test.go b/les/lespay/client/requestbasket_test.go new file mode 100644 index 0000000000..7c5f87c618 --- /dev/null +++ b/les/lespay/client/requestbasket_test.go @@ -0,0 +1,161 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package client + +import ( + "math/rand" + "testing" + + "github.com/ethereum/go-ethereum/les/utils" +) + +func checkU64(t *testing.T, name string, value, exp uint64) { + if value != exp { + t.Errorf("Incorrect value for %s: got %d, expected %d", name, value, exp) + } +} + +func checkF64(t *testing.T, name string, value, exp, tol float64) { + if value < exp-tol || value > exp+tol { + t.Errorf("Incorrect value for %s: got %f, expected %f", name, value, exp) + } +} + +func TestServerBasket(t *testing.T) { + var s serverBasket + s.init(2) + // add some requests with different request value factors + s.updateRvFactor(1) + noexp := utils.ExpirationFactor{Factor: 1} + s.add(0, 1000, 10000, noexp) + s.add(1, 3000, 60000, noexp) + s.updateRvFactor(10) + s.add(0, 4000, 4000, noexp) + s.add(1, 2000, 4000, noexp) + s.updateRvFactor(10) + // check basket contents directly + checkU64(t, "s.basket[0].amount", s.basket.items[0].amount, 5000*basketFactor) + checkU64(t, "s.basket[0].value", s.basket.items[0].value, 50000) + checkU64(t, "s.basket[1].amount", s.basket.items[1].amount, 5000*basketFactor) + checkU64(t, "s.basket[1].value", s.basket.items[1].value, 100000) + // transfer 50% of the contents of the basket + transfer1 := s.transfer(0.5) + checkU64(t, "transfer1[0].amount", transfer1.items[0].amount, 2500*basketFactor) + checkU64(t, "transfer1[0].value", transfer1.items[0].value, 25000) + checkU64(t, "transfer1[1].amount", transfer1.items[1].amount, 2500*basketFactor) + checkU64(t, "transfer1[1].value", transfer1.items[1].value, 50000) + // add more requests + s.updateRvFactor(100) + s.add(0, 1000, 100, noexp) + // transfer 25% of the contents of the basket + transfer2 := s.transfer(0.25) + checkU64(t, "transfer2[0].amount", transfer2.items[0].amount, (2500+1000)/4*basketFactor) + checkU64(t, "transfer2[0].value", transfer2.items[0].value, (25000+10000)/4) + checkU64(t, "transfer2[1].amount", transfer2.items[1].amount, 2500/4*basketFactor) + checkU64(t, "transfer2[1].value", transfer2.items[1].value, 50000/4) +} + +func TestConvertMapping(t *testing.T) { + b := requestBasket{items: []basketItem{{3, 3}, {1, 1}, {2, 2}}} + oldMap := []string{"req3", "req1", "req2"} + newMap := []string{"req1", "req2", "req3", "req4"} + init := requestBasket{items: []basketItem{{2, 2}, {4, 4}, {6, 6}, {8, 8}}} + bc := b.convertMapping(oldMap, newMap, init) + checkU64(t, "bc[0].amount", bc.items[0].amount, 1) + checkU64(t, "bc[1].amount", bc.items[1].amount, 2) + checkU64(t, "bc[2].amount", bc.items[2].amount, 3) + checkU64(t, "bc[3].amount", bc.items[3].amount, 4) // 8 should be scaled down to 4 +} + +func TestReqValueFactor(t *testing.T) { + var ref referenceBasket + ref.basket = requestBasket{items: make([]basketItem, 4)} + for i := range ref.basket.items { + ref.basket.items[i].amount = uint64(i+1) * basketFactor + ref.basket.items[i].value = uint64(i+1) * basketFactor + } + ref.init(4) + rvf := ref.reqValueFactor([]uint64{1000, 2000, 3000, 4000}) + // expected value is (1000000+2000000+3000000+4000000) / (1*1000+2*2000+3*3000+4*4000) = 10000000/30000 = 333.333 + checkF64(t, "reqValueFactor", rvf, 333.333, 1) +} + +func TestNormalize(t *testing.T) { + for cycle := 0; cycle < 100; cycle += 1 { + // Initialize data for testing + valueRange, lower := 1000000, 1000000 + ref := referenceBasket{basket: requestBasket{items: make([]basketItem, 10)}} + for i := 0; i < 10; i++ { + ref.basket.items[i].amount = uint64(rand.Intn(valueRange) + lower) + ref.basket.items[i].value = uint64(rand.Intn(valueRange) + lower) + } + ref.normalize() + + // Check whether SUM(amount) ~= SUM(value) + var sumAmount, sumValue uint64 + for i := 0; i < 10; i++ { + sumAmount += ref.basket.items[i].amount + sumValue += ref.basket.items[i].value + } + var epsilon = 0.01 + if float64(sumAmount)*(1+epsilon) < float64(sumValue) || float64(sumAmount)*(1-epsilon) > float64(sumValue) { + t.Fatalf("Failed to normalize sumAmount: %d sumValue: %d", sumAmount, sumValue) + } + } +} + +func TestReqValueAdjustment(t *testing.T) { + var s1, s2 serverBasket + s1.init(3) + s2.init(3) + cost1 := []uint64{30000, 60000, 90000} + cost2 := []uint64{100000, 200000, 300000} + var ref referenceBasket + ref.basket = requestBasket{items: make([]basketItem, 3)} + for i := range ref.basket.items { + ref.basket.items[i].amount = 123 * basketFactor + ref.basket.items[i].value = 123 * basketFactor + } + ref.init(3) + // initial reqValues are expected to be {1, 1, 1} + checkF64(t, "reqValues[0]", ref.reqValues[0], 1, 0.01) + checkF64(t, "reqValues[1]", ref.reqValues[1], 1, 0.01) + checkF64(t, "reqValues[2]", ref.reqValues[2], 1, 0.01) + var logOffset utils.Fixed64 + for period := 0; period < 1000; period++ { + exp := utils.ExpFactor(logOffset) + s1.updateRvFactor(ref.reqValueFactor(cost1)) + s2.updateRvFactor(ref.reqValueFactor(cost2)) + // throw in random requests into each basket using their internal pricing + for i := 0; i < 1000; i++ { + reqType, reqAmount := uint32(rand.Intn(3)), uint32(rand.Intn(10)+1) + reqCost := uint64(reqAmount) * cost1[reqType] + s1.add(reqType, reqAmount, reqCost, exp) + reqType, reqAmount = uint32(rand.Intn(3)), uint32(rand.Intn(10)+1) + reqCost = uint64(reqAmount) * cost2[reqType] + s2.add(reqType, reqAmount, reqCost, exp) + } + ref.add(s1.transfer(0.1)) + ref.add(s2.transfer(0.1)) + ref.normalize() + ref.updateReqValues() + logOffset += utils.Float64ToFixed64(0.1) + } + checkF64(t, "reqValues[0]", ref.reqValues[0], 0.5, 0.01) + checkF64(t, "reqValues[1]", ref.reqValues[1], 1, 0.01) + checkF64(t, "reqValues[2]", ref.reqValues[2], 1.5, 0.01) +} diff --git a/les/lespay/client/timestats.go b/les/lespay/client/timestats.go new file mode 100644 index 0000000000..7f1ffdbe26 --- /dev/null +++ b/les/lespay/client/timestats.go @@ -0,0 +1,237 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package client + +import ( + "io" + "math" + "time" + + "github.com/ethereum/go-ethereum/les/utils" + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + minResponseTime = time.Millisecond * 50 + maxResponseTime = time.Second * 10 + timeStatLength = 32 + weightScaleFactor = 1000000 +) + +// ResponseTimeStats is the response time distribution of a set of answered requests, +// weighted with request value, either served by a single server or aggregated for +// multiple servers. +// It it a fixed length (timeStatLength) distribution vector with linear interpolation. +// The X axis (the time values) are not linear, they should be transformed with +// TimeToStatScale and StatScaleToTime. +type ( + ResponseTimeStats struct { + stats [timeStatLength]uint64 + exp uint64 + } + ResponseTimeWeights [timeStatLength]float64 +) + +var timeStatsLogFactor = (timeStatLength - 1) / (math.Log(float64(maxResponseTime)/float64(minResponseTime)) + 1) + +// TimeToStatScale converts a response time to a distribution vector index. The index +// is represented by a float64 so that linear interpolation can be applied. +func TimeToStatScale(d time.Duration) float64 { + if d < 0 { + return 0 + } + r := float64(d) / float64(minResponseTime) + if r > 1 { + r = math.Log(r) + 1 + } + r *= timeStatsLogFactor + if r > timeStatLength-1 { + return timeStatLength - 1 + } + return r +} + +// StatScaleToTime converts a distribution vector index to a response time. The index +// is represented by a float64 so that linear interpolation can be applied. +func StatScaleToTime(r float64) time.Duration { + r /= timeStatsLogFactor + if r > 1 { + r = math.Exp(r - 1) + } + return time.Duration(r * float64(minResponseTime)) +} + +// TimeoutWeights calculates the weight function used for calculating service value +// based on the response time distribution of the received service. +// It is based on the request timeout value of the system. It consists of a half cosine +// function starting with 1, crossing zero at timeout and reaching -1 at 2*timeout. +// After 2*timeout the weight is constant -1. +func TimeoutWeights(timeout time.Duration) (res ResponseTimeWeights) { + for i := range res { + t := StatScaleToTime(float64(i)) + if t < 2*timeout { + res[i] = math.Cos(math.Pi / 2 * float64(t) / float64(timeout)) + } else { + res[i] = -1 + } + } + return +} + +// EncodeRLP implements rlp.Encoder +func (rt *ResponseTimeStats) EncodeRLP(w io.Writer) error { + enc := struct { + Stats [timeStatLength]uint64 + Exp uint64 + }{rt.stats, rt.exp} + return rlp.Encode(w, &enc) +} + +// DecodeRLP implements rlp.Decoder +func (rt *ResponseTimeStats) DecodeRLP(s *rlp.Stream) error { + var enc struct { + Stats [timeStatLength]uint64 + Exp uint64 + } + if err := s.Decode(&enc); err != nil { + return err + } + rt.stats, rt.exp = enc.Stats, enc.Exp + return nil +} + +// Add adds a new response time with the given weight to the distribution. +func (rt *ResponseTimeStats) Add(respTime time.Duration, weight float64, expFactor utils.ExpirationFactor) { + rt.setExp(expFactor.Exp) + weight *= expFactor.Factor * weightScaleFactor + r := TimeToStatScale(respTime) + i := int(r) + r -= float64(i) + rt.stats[i] += uint64(weight * (1 - r)) + if i < timeStatLength-1 { + rt.stats[i+1] += uint64(weight * r) + } +} + +// setExp sets the power of 2 exponent of the structure, scaling base values (the vector +// itself) up or down if necessary. +func (rt *ResponseTimeStats) setExp(exp uint64) { + if exp > rt.exp { + shift := exp - rt.exp + for i, v := range rt.stats { + rt.stats[i] = v >> shift + } + rt.exp = exp + } + if exp < rt.exp { + shift := rt.exp - exp + for i, v := range rt.stats { + rt.stats[i] = v << shift + } + rt.exp = exp + } +} + +// Value calculates the total service value based on the given distribution, using the +// specified weight function. +func (rt ResponseTimeStats) Value(weights ResponseTimeWeights, expFactor utils.ExpirationFactor) float64 { + var v float64 + for i, s := range rt.stats { + v += float64(s) * weights[i] + } + if v < 0 { + return 0 + } + return expFactor.Value(v, rt.exp) / weightScaleFactor +} + +// AddStats adds the given ResponseTimeStats to the current one. +func (rt *ResponseTimeStats) AddStats(s *ResponseTimeStats) { + rt.setExp(s.exp) + for i, v := range s.stats { + rt.stats[i] += v + } +} + +// SubStats subtracts the given ResponseTimeStats from the current one. +func (rt *ResponseTimeStats) SubStats(s *ResponseTimeStats) { + rt.setExp(s.exp) + for i, v := range s.stats { + if v < rt.stats[i] { + rt.stats[i] -= v + } else { + rt.stats[i] = 0 + } + } +} + +// Timeout suggests a timeout value based on the previous distribution. The parameter +// is the desired rate of timeouts assuming a similar distribution in the future. +// Note that the actual timeout should have a sensible minimum bound so that operating +// under ideal working conditions for a long time (for example, using a local server +// with very low response times) will not make it very hard for the system to accommodate +// longer response times in the future. +func (rt ResponseTimeStats) Timeout(failRatio float64) time.Duration { + var sum uint64 + for _, v := range rt.stats { + sum += v + } + s := uint64(float64(sum) * failRatio) + i := timeStatLength - 1 + for i > 0 && s >= rt.stats[i] { + s -= rt.stats[i] + i-- + } + r := float64(i) + 0.5 + if rt.stats[i] > 0 { + r -= float64(s) / float64(rt.stats[i]) + } + if r < 0 { + r = 0 + } + th := StatScaleToTime(r) + if th > maxResponseTime { + th = maxResponseTime + } + return th +} + +// RtDistribution represents a distribution as a series of (X, Y) chart coordinates, +// where the X axis is the response time in seconds while the Y axis is the amount of +// service value received with a response time close to the X coordinate. +type RtDistribution [timeStatLength][2]float64 + +// Distribution returns a RtDistribution, optionally normalized to a sum of 1. +func (rt ResponseTimeStats) Distribution(normalized bool, expFactor utils.ExpirationFactor) (res RtDistribution) { + var mul float64 + if normalized { + var sum uint64 + for _, v := range rt.stats { + sum += v + } + if sum > 0 { + mul = 1 / float64(sum) + } + } else { + mul = expFactor.Value(float64(1)/weightScaleFactor, rt.exp) + } + for i, v := range rt.stats { + res[i][0] = float64(StatScaleToTime(float64(i))) / float64(time.Second) + res[i][1] = float64(v) * mul + } + return +} diff --git a/les/lespay/client/timestats_test.go b/les/lespay/client/timestats_test.go new file mode 100644 index 0000000000..a28460171e --- /dev/null +++ b/les/lespay/client/timestats_test.go @@ -0,0 +1,137 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package client + +import ( + "math" + "math/rand" + "testing" + "time" + + "github.com/ethereum/go-ethereum/les/utils" +) + +func TestTransition(t *testing.T) { + var epsilon = 0.01 + var cases = []time.Duration{ + time.Millisecond, minResponseTime, + time.Second, time.Second * 5, maxResponseTime, + } + for _, c := range cases { + got := StatScaleToTime(TimeToStatScale(c)) + if float64(got)*(1+epsilon) < float64(c) || float64(got)*(1-epsilon) > float64(c) { + t.Fatalf("Failed to transition back") + } + } + // If the time is too large(exceeds the max response time. + got := StatScaleToTime(TimeToStatScale(2 * maxResponseTime)) + if float64(got)*(1+epsilon) < float64(maxResponseTime) || float64(got)*(1-epsilon) > float64(maxResponseTime) { + t.Fatalf("Failed to transition back") + } +} + +var maxResponseWeights = TimeoutWeights(maxResponseTime) + +func TestValue(t *testing.T) { + noexp := utils.ExpirationFactor{Factor: 1} + for i := 0; i < 1000; i++ { + max := minResponseTime + time.Duration(rand.Int63n(int64(maxResponseTime-minResponseTime))) + min := minResponseTime + time.Duration(rand.Int63n(int64(max-minResponseTime))) + timeout := max/2 + time.Duration(rand.Int63n(int64(maxResponseTime-max/2))) + s := makeRangeStats(min, max, 1000, noexp) + value := s.Value(TimeoutWeights(timeout), noexp) + // calculate the average weight (the average of the given range of the half cosine + // weight function). + minx := math.Pi / 2 * float64(min) / float64(timeout) + maxx := math.Pi / 2 * float64(max) / float64(timeout) + avgWeight := (math.Sin(maxx) - math.Sin(minx)) / (maxx - minx) + expv := 1000 * avgWeight + if expv < 0 { + expv = 0 + } + if value < expv-10 || value > expv+10 { + t.Errorf("Value failed (expected %v, got %v)", expv, value) + } + } +} + +func TestAddSubExpire(t *testing.T) { + var ( + sum1, sum2 ResponseTimeStats + sum1ValueExp, sum2ValueExp float64 + logOffset utils.Fixed64 + ) + for i := 0; i < 1000; i++ { + exp := utils.ExpFactor(logOffset) + max := minResponseTime + time.Duration(rand.Int63n(int64(maxResponseTime-minResponseTime))) + min := minResponseTime + time.Duration(rand.Int63n(int64(max-minResponseTime))) + s := makeRangeStats(min, max, 1000, exp) + value := s.Value(maxResponseWeights, exp) + sum1.AddStats(&s) + sum1ValueExp += value + if rand.Intn(2) == 1 { + sum2.AddStats(&s) + sum2ValueExp += value + } + logOffset += utils.Float64ToFixed64(0.001 / math.Log(2)) + sum1ValueExp -= sum1ValueExp * 0.001 + sum2ValueExp -= sum2ValueExp * 0.001 + } + exp := utils.ExpFactor(logOffset) + sum1Value := sum1.Value(maxResponseWeights, exp) + if sum1Value < sum1ValueExp*0.99 || sum1Value > sum1ValueExp*1.01 { + t.Errorf("sum1Value failed (expected %v, got %v)", sum1ValueExp, sum1Value) + } + sum2Value := sum2.Value(maxResponseWeights, exp) + if sum2Value < sum2ValueExp*0.99 || sum2Value > sum2ValueExp*1.01 { + t.Errorf("sum2Value failed (expected %v, got %v)", sum2ValueExp, sum2Value) + } + diff := sum1 + diff.SubStats(&sum2) + diffValue := diff.Value(maxResponseWeights, exp) + diffValueExp := sum1ValueExp - sum2ValueExp + if diffValue < diffValueExp*0.99 || diffValue > diffValueExp*1.01 { + t.Errorf("diffValue failed (expected %v, got %v)", diffValueExp, diffValue) + } +} + +func TestTimeout(t *testing.T) { + testTimeoutRange(t, 0, time.Second) + testTimeoutRange(t, time.Second, time.Second*2) + testTimeoutRange(t, time.Second, maxResponseTime) +} + +func testTimeoutRange(t *testing.T, min, max time.Duration) { + s := makeRangeStats(min, max, 1000, utils.ExpirationFactor{Factor: 1}) + for i := 2; i < 9; i++ { + to := s.Timeout(float64(i) / 10) + exp := max - (max-min)*time.Duration(i)/10 + tol := (max - min) / 50 + if to < exp-tol || to > exp+tol { + t.Errorf("Timeout failed (expected %v, got %v)", exp, to) + } + } +} + +func makeRangeStats(min, max time.Duration, amount float64, exp utils.ExpirationFactor) ResponseTimeStats { + var s ResponseTimeStats + amount /= 1000 + for i := 0; i < 1000; i++ { + s.Add(min+(max-min)*time.Duration(i)/999, amount, exp) + } + return s +} diff --git a/les/lespay/client/valuetracker.go b/les/lespay/client/valuetracker.go new file mode 100644 index 0000000000..92bfd694ed --- /dev/null +++ b/les/lespay/client/valuetracker.go @@ -0,0 +1,515 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package client + +import ( + "bytes" + "fmt" + "math" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/les/utils" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + vtVersion = 1 // database encoding format for ValueTracker + nvtVersion = 1 // database encoding format for NodeValueTracker +) + +var ( + vtKey = []byte("vt:") + vtNodeKey = []byte("vtNode:") +) + +// NodeValueTracker collects service value statistics for a specific server node +type NodeValueTracker struct { + lock sync.Mutex + + rtStats, lastRtStats ResponseTimeStats + lastTransfer mclock.AbsTime + basket serverBasket + reqCosts []uint64 + reqValues *[]float64 +} + +// init initializes a NodeValueTracker. +// Note that the contents of the referenced reqValues slice will not change; a new +// reference is passed if the values are updated by ValueTracker. +func (nv *NodeValueTracker) init(now mclock.AbsTime, reqValues *[]float64) { + reqTypeCount := len(*reqValues) + nv.reqCosts = make([]uint64, reqTypeCount) + nv.lastTransfer = now + nv.reqValues = reqValues + nv.basket.init(reqTypeCount) +} + +// updateCosts updates the request cost table of the server. The request value factor +// is also updated based on the given cost table and the current reference basket. +// Note that the contents of the referenced reqValues slice will not change; a new +// reference is passed if the values are updated by ValueTracker. +func (nv *NodeValueTracker) updateCosts(reqCosts []uint64, reqValues *[]float64, rvFactor float64) { + nv.lock.Lock() + defer nv.lock.Unlock() + + nv.reqCosts = reqCosts + nv.reqValues = reqValues + nv.basket.updateRvFactor(rvFactor) +} + +// transferStats returns request basket and response time statistics that should be +// added to the global statistics. The contents of the server's own request basket are +// gradually transferred to the main reference basket and removed from the server basket +// with the specified transfer rate. +// The response time statistics are retained at both places and therefore the global +// distribution is always the sum of the individual server distributions. +func (nv *NodeValueTracker) transferStats(now mclock.AbsTime, transferRate float64) (requestBasket, ResponseTimeStats) { + nv.lock.Lock() + defer nv.lock.Unlock() + + dt := now - nv.lastTransfer + nv.lastTransfer = now + if dt < 0 { + dt = 0 + } + recentRtStats := nv.rtStats + recentRtStats.SubStats(&nv.lastRtStats) + nv.lastRtStats = nv.rtStats + return nv.basket.transfer(-math.Expm1(-transferRate * float64(dt))), recentRtStats +} + +// RtStats returns the node's own response time distribution statistics +func (nv *NodeValueTracker) RtStats() ResponseTimeStats { + nv.lock.Lock() + defer nv.lock.Unlock() + + return nv.rtStats +} + +// ValueTracker coordinates service value calculation for individual servers and updates +// global statistics +type ValueTracker struct { + clock mclock.Clock + lock sync.Mutex + quit chan chan struct{} + db ethdb.KeyValueStore + connected map[enode.ID]*NodeValueTracker + reqTypeCount int + + refBasket referenceBasket + mappings [][]string + currentMapping int + initRefBasket requestBasket + rtStats ResponseTimeStats + + transferRate float64 + statsExpLock sync.RWMutex + statsExpRate, offlineExpRate float64 + statsExpirer utils.Expirer + statsExpFactor utils.ExpirationFactor +} + +type valueTrackerEncV1 struct { + Mappings [][]string + RefBasketMapping uint + RefBasket requestBasket + RtStats ResponseTimeStats + ExpOffset, SavedAt uint64 +} + +type nodeValueTrackerEncV1 struct { + RtStats ResponseTimeStats + ServerBasketMapping uint + ServerBasket requestBasket +} + +// RequestInfo is an initializer structure for the service vector. +type RequestInfo struct { + // Name identifies the request type and is used for re-mapping the service vector if necessary + Name string + // InitAmount and InitValue are used to initialize the reference basket + InitAmount, InitValue float64 +} + +// NewValueTracker creates a new ValueTracker and loads its previously saved state from +// the database if possible. +func NewValueTracker(db ethdb.KeyValueStore, clock mclock.Clock, reqInfo []RequestInfo, updatePeriod time.Duration, transferRate, statsExpRate, offlineExpRate float64) *ValueTracker { + now := clock.Now() + + initRefBasket := requestBasket{items: make([]basketItem, len(reqInfo))} + mapping := make([]string, len(reqInfo)) + + var sumAmount, sumValue float64 + for _, req := range reqInfo { + sumAmount += req.InitAmount + sumValue += req.InitAmount * req.InitValue + } + scaleValues := sumAmount * basketFactor / sumValue + for i, req := range reqInfo { + mapping[i] = req.Name + initRefBasket.items[i].amount = uint64(req.InitAmount * basketFactor) + initRefBasket.items[i].value = uint64(req.InitAmount * req.InitValue * scaleValues) + } + + vt := &ValueTracker{ + clock: clock, + connected: make(map[enode.ID]*NodeValueTracker), + quit: make(chan chan struct{}), + db: db, + reqTypeCount: len(initRefBasket.items), + initRefBasket: initRefBasket, + transferRate: transferRate, + statsExpRate: statsExpRate, + offlineExpRate: offlineExpRate, + } + if vt.loadFromDb(mapping) != nil { + // previous state not saved or invalid, init with default values + vt.refBasket.basket = initRefBasket + vt.mappings = [][]string{mapping} + vt.currentMapping = 0 + } + vt.statsExpirer.SetRate(now, statsExpRate) + vt.refBasket.init(vt.reqTypeCount) + vt.periodicUpdate() + + go func() { + for { + select { + case <-clock.After(updatePeriod): + vt.lock.Lock() + vt.periodicUpdate() + vt.lock.Unlock() + case quit := <-vt.quit: + close(quit) + return + } + } + }() + return vt +} + +// StatsExpirer returns the statistics expirer so that other values can be expired +// with the same rate as the service value statistics. +func (vt *ValueTracker) StatsExpirer() *utils.Expirer { + return &vt.statsExpirer +} + +// loadFromDb loads the value tracker's state from the database and converts saved +// request basket index mapping if it does not match the specified index to name mapping. +func (vt *ValueTracker) loadFromDb(mapping []string) error { + enc, err := vt.db.Get(vtKey) + if err != nil { + return err + } + r := bytes.NewReader(enc) + var version uint + if err := rlp.Decode(r, &version); err != nil { + log.Error("Decoding value tracker state failed", "err", err) + return err + } + if version != vtVersion { + log.Error("Unknown ValueTracker version", "stored", version, "current", nvtVersion) + return fmt.Errorf("Unknown ValueTracker version %d (current version is %d)", version, vtVersion) + } + var vte valueTrackerEncV1 + if err := rlp.Decode(r, &vte); err != nil { + log.Error("Decoding value tracker state failed", "err", err) + return err + } + logOffset := utils.Fixed64(vte.ExpOffset) + dt := time.Now().UnixNano() - int64(vte.SavedAt) + if dt > 0 { + logOffset += utils.Float64ToFixed64(float64(dt) * vt.offlineExpRate / math.Log(2)) + } + vt.statsExpirer.SetLogOffset(vt.clock.Now(), logOffset) + vt.rtStats = vte.RtStats + vt.mappings = vte.Mappings + vt.currentMapping = -1 +loop: + for i, m := range vt.mappings { + if len(m) != len(mapping) { + continue loop + } + for j, s := range mapping { + if m[j] != s { + continue loop + } + } + vt.currentMapping = i + break + } + if vt.currentMapping == -1 { + vt.currentMapping = len(vt.mappings) + vt.mappings = append(vt.mappings, mapping) + } + if int(vte.RefBasketMapping) == vt.currentMapping { + vt.refBasket.basket = vte.RefBasket + } else { + if vte.RefBasketMapping >= uint(len(vt.mappings)) { + log.Error("Unknown request basket mapping", "stored", vte.RefBasketMapping, "current", vt.currentMapping) + return fmt.Errorf("Unknown request basket mapping %d (current version is %d)", vte.RefBasketMapping, vt.currentMapping) + } + vt.refBasket.basket = vte.RefBasket.convertMapping(vt.mappings[vte.RefBasketMapping], mapping, vt.initRefBasket) + } + return nil +} + +// saveToDb saves the value tracker's state to the database +func (vt *ValueTracker) saveToDb() { + vte := valueTrackerEncV1{ + Mappings: vt.mappings, + RefBasketMapping: uint(vt.currentMapping), + RefBasket: vt.refBasket.basket, + RtStats: vt.rtStats, + ExpOffset: uint64(vt.statsExpirer.LogOffset(vt.clock.Now())), + SavedAt: uint64(time.Now().UnixNano()), + } + enc1, err := rlp.EncodeToBytes(uint(vtVersion)) + if err != nil { + log.Error("Encoding value tracker state failed", "err", err) + return + } + enc2, err := rlp.EncodeToBytes(&vte) + if err != nil { + log.Error("Encoding value tracker state failed", "err", err) + return + } + if err := vt.db.Put(vtKey, append(enc1, enc2...)); err != nil { + log.Error("Saving value tracker state failed", "err", err) + } +} + +// Stop saves the value tracker's state and each loaded node's individual state and +// returns after shutting the internal goroutines down. +func (vt *ValueTracker) Stop() { + quit := make(chan struct{}) + vt.quit <- quit + <-quit + vt.lock.Lock() + vt.periodicUpdate() + for id, nv := range vt.connected { + vt.saveNode(id, nv) + } + vt.connected = nil + vt.saveToDb() + vt.lock.Unlock() +} + +// Register adds a server node to the value tracker +func (vt *ValueTracker) Register(id enode.ID) *NodeValueTracker { + vt.lock.Lock() + defer vt.lock.Unlock() + + if vt.connected == nil { + // ValueTracker has already been stopped + return nil + } + nv := vt.loadOrNewNode(id) + nv.init(vt.clock.Now(), &vt.refBasket.reqValues) + vt.connected[id] = nv + return nv +} + +// Unregister removes a server node from the value tracker +func (vt *ValueTracker) Unregister(id enode.ID) { + vt.lock.Lock() + defer vt.lock.Unlock() + + if nv := vt.connected[id]; nv != nil { + vt.saveNode(id, nv) + delete(vt.connected, id) + } +} + +// GetNode returns an individual server node's value tracker. If it did not exist before +// then a new node is created. +func (vt *ValueTracker) GetNode(id enode.ID) *NodeValueTracker { + vt.lock.Lock() + defer vt.lock.Unlock() + + return vt.loadOrNewNode(id) +} + +// loadOrNewNode returns an individual server node's value tracker. If it did not exist before +// then a new node is created. +func (vt *ValueTracker) loadOrNewNode(id enode.ID) *NodeValueTracker { + if nv, ok := vt.connected[id]; ok { + return nv + } + nv := &NodeValueTracker{lastTransfer: vt.clock.Now()} + enc, err := vt.db.Get(append(vtNodeKey, id[:]...)) + if err != nil { + return nv + } + r := bytes.NewReader(enc) + var version uint + if err := rlp.Decode(r, &version); err != nil { + log.Error("Failed to decode node value tracker", "id", id, "err", err) + return nv + } + if version != nvtVersion { + log.Error("Unknown NodeValueTracker version", "stored", version, "current", nvtVersion) + return nv + } + var nve nodeValueTrackerEncV1 + if err := rlp.Decode(r, &nve); err != nil { + log.Error("Failed to decode node value tracker", "id", id, "err", err) + return nv + } + nv.rtStats = nve.RtStats + nv.lastRtStats = nve.RtStats + if int(nve.ServerBasketMapping) == vt.currentMapping { + nv.basket.basket = nve.ServerBasket + } else { + if nve.ServerBasketMapping >= uint(len(vt.mappings)) { + log.Error("Unknown request basket mapping", "stored", nve.ServerBasketMapping, "current", vt.currentMapping) + return nv + } + nv.basket.basket = nve.ServerBasket.convertMapping(vt.mappings[nve.ServerBasketMapping], vt.mappings[vt.currentMapping], vt.initRefBasket) + } + return nv +} + +// saveNode saves a server node's value tracker to the database +func (vt *ValueTracker) saveNode(id enode.ID, nv *NodeValueTracker) { + recentRtStats := nv.rtStats + recentRtStats.SubStats(&nv.lastRtStats) + vt.rtStats.AddStats(&recentRtStats) + nv.lastRtStats = nv.rtStats + + nve := nodeValueTrackerEncV1{ + RtStats: nv.rtStats, + ServerBasketMapping: uint(vt.currentMapping), + ServerBasket: nv.basket.basket, + } + enc1, err := rlp.EncodeToBytes(uint(nvtVersion)) + if err != nil { + log.Error("Failed to encode service value information", "id", id, "err", err) + return + } + enc2, err := rlp.EncodeToBytes(&nve) + if err != nil { + log.Error("Failed to encode service value information", "id", id, "err", err) + return + } + if err := vt.db.Put(append(vtNodeKey, id[:]...), append(enc1, enc2...)); err != nil { + log.Error("Failed to save service value information", "id", id, "err", err) + } +} + +// UpdateCosts updates the node value tracker's request cost table +func (vt *ValueTracker) UpdateCosts(nv *NodeValueTracker, reqCosts []uint64) { + vt.lock.Lock() + defer vt.lock.Unlock() + + nv.updateCosts(reqCosts, &vt.refBasket.reqValues, vt.refBasket.reqValueFactor(reqCosts)) +} + +// RtStats returns the global response time distribution statistics +func (vt *ValueTracker) RtStats() ResponseTimeStats { + vt.lock.Lock() + defer vt.lock.Unlock() + + vt.periodicUpdate() + return vt.rtStats +} + +// periodicUpdate transfers individual node data to the global statistics, normalizes +// the reference basket and updates request values. The global state is also saved to +// the database with each update. +func (vt *ValueTracker) periodicUpdate() { + now := vt.clock.Now() + vt.statsExpLock.Lock() + vt.statsExpFactor = utils.ExpFactor(vt.statsExpirer.LogOffset(now)) + vt.statsExpLock.Unlock() + + for _, nv := range vt.connected { + basket, rtStats := nv.transferStats(now, vt.transferRate) + vt.refBasket.add(basket) + vt.rtStats.AddStats(&rtStats) + } + vt.refBasket.normalize() + vt.refBasket.updateReqValues() + for _, nv := range vt.connected { + nv.updateCosts(nv.reqCosts, &vt.refBasket.reqValues, vt.refBasket.reqValueFactor(nv.reqCosts)) + } + vt.saveToDb() +} + +type ServedRequest struct { + ReqType, Amount uint32 +} + +// Served adds a served request to the node's statistics. An actual request may be composed +// of one or more request types (service vector indices). +func (vt *ValueTracker) Served(nv *NodeValueTracker, reqs []ServedRequest, respTime time.Duration) { + vt.statsExpLock.RLock() + expFactor := vt.statsExpFactor + vt.statsExpLock.RUnlock() + + nv.lock.Lock() + defer nv.lock.Unlock() + + var value float64 + for _, r := range reqs { + nv.basket.add(r.ReqType, r.Amount, nv.reqCosts[r.ReqType]*uint64(r.Amount), expFactor) + value += (*nv.reqValues)[r.ReqType] * float64(r.Amount) + } + nv.rtStats.Add(respTime, value, vt.statsExpFactor) +} + +type RequestStatsItem struct { + Name string + ReqAmount, ReqValue float64 +} + +// RequestStats returns the current contents of the reference request basket, with +// request values meaning average per request rather than total. +func (vt *ValueTracker) RequestStats() []RequestStatsItem { + vt.statsExpLock.RLock() + expFactor := vt.statsExpFactor + vt.statsExpLock.RUnlock() + vt.lock.Lock() + defer vt.lock.Unlock() + + vt.periodicUpdate() + res := make([]RequestStatsItem, len(vt.refBasket.basket.items)) + for i, item := range vt.refBasket.basket.items { + res[i].Name = vt.mappings[vt.currentMapping][i] + res[i].ReqAmount = expFactor.Value(float64(item.amount)/basketFactor, vt.refBasket.basket.exp) + res[i].ReqValue = vt.refBasket.reqValues[i] + } + return res +} + +// TotalServiceValue returns the total service value provided by the given node (as +// a function of the weights which are calculated from the request timeout value). +func (vt *ValueTracker) TotalServiceValue(nv *NodeValueTracker, weights ResponseTimeWeights) float64 { + vt.statsExpLock.RLock() + expFactor := vt.statsExpFactor + vt.statsExpLock.RUnlock() + + nv.lock.Lock() + defer nv.lock.Unlock() + + return nv.rtStats.Value(weights, expFactor) +} diff --git a/les/lespay/client/valuetracker_test.go b/les/lespay/client/valuetracker_test.go new file mode 100644 index 0000000000..ad398749e9 --- /dev/null +++ b/les/lespay/client/valuetracker_test.go @@ -0,0 +1,135 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package client + +import ( + "math" + "math/rand" + "strconv" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/ethdb/memorydb" + "github.com/ethereum/go-ethereum/p2p/enode" + + "github.com/ethereum/go-ethereum/les/utils" +) + +const ( + testReqTypes = 3 + testNodeCount = 5 + testReqCount = 10000 + testRounds = 10 +) + +func TestValueTracker(t *testing.T) { + db := memorydb.New() + clock := &mclock.Simulated{} + requestList := make([]RequestInfo, testReqTypes) + relPrices := make([]float64, testReqTypes) + totalAmount := make([]uint64, testReqTypes) + for i := range requestList { + requestList[i] = RequestInfo{Name: "testreq" + strconv.Itoa(i), InitAmount: 1, InitValue: 1} + totalAmount[i] = 1 + relPrices[i] = rand.Float64() + 0.1 + } + nodes := make([]*NodeValueTracker, testNodeCount) + for round := 0; round < testRounds; round++ { + makeRequests := round < testRounds-2 + useExpiration := round == testRounds-1 + var expRate float64 + if useExpiration { + expRate = math.Log(2) / float64(time.Hour*100) + } + + vt := NewValueTracker(db, clock, requestList, time.Minute, 1/float64(time.Hour), expRate, expRate) + updateCosts := func(i int) { + costList := make([]uint64, testReqTypes) + baseCost := rand.Float64()*10000000 + 100000 + for j := range costList { + costList[j] = uint64(baseCost * relPrices[j]) + } + vt.UpdateCosts(nodes[i], costList) + } + for i := range nodes { + nodes[i] = vt.Register(enode.ID{byte(i)}) + updateCosts(i) + } + if makeRequests { + for i := 0; i < testReqCount; i++ { + reqType := rand.Intn(testReqTypes) + reqAmount := rand.Intn(10) + 1 + node := rand.Intn(testNodeCount) + respTime := time.Duration((rand.Float64() + 1) * float64(time.Second) * float64(node+1) / testNodeCount) + totalAmount[reqType] += uint64(reqAmount) + vt.Served(nodes[node], []ServedRequest{{uint32(reqType), uint32(reqAmount)}}, respTime) + clock.Run(time.Second) + } + } else { + clock.Run(time.Hour * 100) + if useExpiration { + for i, a := range totalAmount { + totalAmount[i] = a / 2 + } + } + } + vt.Stop() + var sumrp, sumrv float64 + for i, rp := range relPrices { + sumrp += rp + sumrv += vt.refBasket.reqValues[i] + } + for i, rp := range relPrices { + ratio := vt.refBasket.reqValues[i] * sumrp / (rp * sumrv) + if ratio < 0.99 || ratio > 1.01 { + t.Errorf("reqValues (%v) does not match relPrices (%v)", vt.refBasket.reqValues, relPrices) + break + } + } + exp := utils.ExpFactor(vt.StatsExpirer().LogOffset(clock.Now())) + basketAmount := make([]uint64, testReqTypes) + for i, bi := range vt.refBasket.basket.items { + basketAmount[i] += uint64(exp.Value(float64(bi.amount), vt.refBasket.basket.exp)) + } + if makeRequests { + // if we did not make requests in this round then we expect all amounts to be + // in the reference basket + for _, node := range nodes { + for i, bi := range node.basket.basket.items { + basketAmount[i] += uint64(exp.Value(float64(bi.amount), node.basket.basket.exp)) + } + } + } + for i, a := range basketAmount { + amount := a / basketFactor + if amount+10 < totalAmount[i] || amount > totalAmount[i]+10 { + t.Errorf("totalAmount[%d] mismatch in round %d (expected %d, got %d)", i, round, totalAmount[i], amount) + } + } + var sumValue float64 + for _, node := range nodes { + s := node.RtStats() + sumValue += s.Value(maxResponseWeights, exp) + } + s := vt.RtStats() + mainValue := s.Value(maxResponseWeights, exp) + if sumValue < mainValue-10 || sumValue > mainValue+10 { + t.Errorf("Main rtStats value does not match sum of node rtStats values in round %d (main %v, sum %v)", round, mainValue, sumValue) + } + } +} diff --git a/les/peer.go b/les/peer.go index c2f4235eb4..d14a5e0d1e 100644 --- a/les/peer.go +++ b/les/peer.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/les/flowcontrol" + lpc "github.com/ethereum/go-ethereum/les/lespay/client" "github.com/ethereum/go-ethereum/les/utils" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/p2p" @@ -356,8 +357,12 @@ type serverPeer struct { checkpointNumber uint64 // The block height which the checkpoint is registered. checkpoint params.TrustedCheckpoint // The advertised checkpoint sent by server. - poolEntry *poolEntry // Statistic for server peer. - fcServer *flowcontrol.ServerNode // Client side mirror token bucket. + poolEntry *poolEntry // Statistic for server peer. + fcServer *flowcontrol.ServerNode // Client side mirror token bucket. + vtLock sync.Mutex + valueTracker *lpc.ValueTracker + nodeValueTracker *lpc.NodeValueTracker + sentReqs map[uint64]sentReqEntry // Statistics errCount int // Counter the invalid responses server has replied @@ -428,62 +433,71 @@ func sendRequest(w p2p.MsgWriter, msgcode, reqID uint64, data interface{}) error return p2p.Send(w, msgcode, req{reqID, data}) } +func (p *serverPeer) sendRequest(msgcode, reqID uint64, data interface{}, amount int) error { + p.sentRequest(reqID, uint32(msgcode), uint32(amount)) + return sendRequest(p.rw, msgcode, reqID, data) +} + // requestHeadersByHash fetches a batch of blocks' headers corresponding to the // specified header query, based on the hash of an origin block. func (p *serverPeer) requestHeadersByHash(reqID uint64, origin common.Hash, amount int, skip int, reverse bool) error { p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse) - return sendRequest(p.rw, GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) + return p.sendRequest(GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}, amount) } // requestHeadersByNumber fetches a batch of blocks' headers corresponding to the // specified header query, based on the number of an origin block. func (p *serverPeer) requestHeadersByNumber(reqID, origin uint64, amount int, skip int, reverse bool) error { p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse) - return sendRequest(p.rw, GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) + return p.sendRequest(GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}, amount) } // requestBodies fetches a batch of blocks' bodies corresponding to the hashes // specified. func (p *serverPeer) requestBodies(reqID uint64, hashes []common.Hash) error { p.Log().Debug("Fetching batch of block bodies", "count", len(hashes)) - return sendRequest(p.rw, GetBlockBodiesMsg, reqID, hashes) + return p.sendRequest(GetBlockBodiesMsg, reqID, hashes, len(hashes)) } // requestCode fetches a batch of arbitrary data from a node's known state // data, corresponding to the specified hashes. func (p *serverPeer) requestCode(reqID uint64, reqs []CodeReq) error { p.Log().Debug("Fetching batch of codes", "count", len(reqs)) - return sendRequest(p.rw, GetCodeMsg, reqID, reqs) + return p.sendRequest(GetCodeMsg, reqID, reqs, len(reqs)) } // requestReceipts fetches a batch of transaction receipts from a remote node. func (p *serverPeer) requestReceipts(reqID uint64, hashes []common.Hash) error { p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) - return sendRequest(p.rw, GetReceiptsMsg, reqID, hashes) + return p.sendRequest(GetReceiptsMsg, reqID, hashes, len(hashes)) } // requestProofs fetches a batch of merkle proofs from a remote node. func (p *serverPeer) requestProofs(reqID uint64, reqs []ProofReq) error { p.Log().Debug("Fetching batch of proofs", "count", len(reqs)) - return sendRequest(p.rw, GetProofsV2Msg, reqID, reqs) + return p.sendRequest(GetProofsV2Msg, reqID, reqs, len(reqs)) } // requestHelperTrieProofs fetches a batch of HelperTrie merkle proofs from a remote node. func (p *serverPeer) requestHelperTrieProofs(reqID uint64, reqs []HelperTrieReq) error { p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs)) - return sendRequest(p.rw, GetHelperTrieProofsMsg, reqID, reqs) + return p.sendRequest(GetHelperTrieProofsMsg, reqID, reqs, len(reqs)) } // requestTxStatus fetches a batch of transaction status records from a remote node. func (p *serverPeer) requestTxStatus(reqID uint64, txHashes []common.Hash) error { p.Log().Debug("Requesting transaction status", "count", len(txHashes)) - return sendRequest(p.rw, GetTxStatusMsg, reqID, txHashes) + return p.sendRequest(GetTxStatusMsg, reqID, txHashes, len(txHashes)) } // SendTxStatus creates a reply with a batch of transactions to be added to the remote transaction pool. -func (p *serverPeer) sendTxs(reqID uint64, txs rlp.RawValue) error { - p.Log().Debug("Sending batch of transactions", "size", len(txs)) - return sendRequest(p.rw, SendTxV2Msg, reqID, txs) +func (p *serverPeer) sendTxs(reqID uint64, amount int, txs rlp.RawValue) error { + p.Log().Debug("Sending batch of transactions", "amount", amount, "size", len(txs)) + sizeFactor := (len(txs) + txSizeCostLimit/2) / txSizeCostLimit + if sizeFactor > amount { + amount = sizeFactor + } + return p.sendRequest(SendTxV2Msg, reqID, txs, amount) } // waitBefore implements distPeer interface @@ -532,6 +546,7 @@ func (p *serverPeer) getTxRelayCost(amount, size int) uint64 { // HasBlock checks if the peer has a given block func (p *serverPeer) HasBlock(hash common.Hash, number uint64, hasState bool) bool { p.lock.RLock() + head := p.headInfo.Number var since, recent uint64 if hasState { @@ -630,6 +645,87 @@ func (p *serverPeer) Handshake(td *big.Int, head common.Hash, headNum uint64, ge }) } +// setValueTracker sets the value tracker references for connected servers. Note that the +// references should be removed upon disconnection by setValueTracker(nil, nil). +func (p *serverPeer) setValueTracker(vt *lpc.ValueTracker, nvt *lpc.NodeValueTracker) { + p.vtLock.Lock() + p.valueTracker = vt + p.nodeValueTracker = nvt + if nvt != nil { + p.sentReqs = make(map[uint64]sentReqEntry) + } else { + p.sentReqs = nil + } + p.vtLock.Unlock() +} + +// updateVtParams updates the server's price table in the value tracker. +func (p *serverPeer) updateVtParams() { + p.vtLock.Lock() + defer p.vtLock.Unlock() + + if p.nodeValueTracker == nil { + return + } + reqCosts := make([]uint64, len(requestList)) + for code, costs := range p.fcCosts { + if m, ok := requestMapping[uint32(code)]; ok { + reqCosts[m.first] = costs.baseCost + costs.reqCost + if m.rest != -1 { + reqCosts[m.rest] = costs.reqCost + } + } + } + p.valueTracker.UpdateCosts(p.nodeValueTracker, reqCosts) +} + +// sentReqEntry remembers sent requests and their sending times +type sentReqEntry struct { + reqType, amount uint32 + at mclock.AbsTime +} + +// sentRequest marks a request sent at the current moment to this server. +func (p *serverPeer) sentRequest(id uint64, reqType, amount uint32) { + p.vtLock.Lock() + if p.sentReqs != nil { + p.sentReqs[id] = sentReqEntry{reqType, amount, mclock.Now()} + } + p.vtLock.Unlock() +} + +// answeredRequest marks a request answered at the current moment by this server. +func (p *serverPeer) answeredRequest(id uint64) { + p.vtLock.Lock() + if p.sentReqs == nil { + p.vtLock.Unlock() + return + } + e, ok := p.sentReqs[id] + delete(p.sentReqs, id) + vt := p.valueTracker + nvt := p.nodeValueTracker + p.vtLock.Unlock() + if !ok { + return + } + var ( + vtReqs [2]lpc.ServedRequest + reqCount int + ) + m := requestMapping[e.reqType] + if m.rest == -1 || e.amount <= 1 { + reqCount = 1 + vtReqs[0] = lpc.ServedRequest{ReqType: uint32(m.first), Amount: e.amount} + } else { + reqCount = 2 + vtReqs[0] = lpc.ServedRequest{ReqType: uint32(m.first), Amount: 1} + vtReqs[1] = lpc.ServedRequest{ReqType: uint32(m.rest), Amount: e.amount - 1} + } + dt := time.Duration(mclock.Now() - e.at) + vt.Served(nvt, vtReqs[:reqCount], dt) +} + // clientPeer represents each node to which the les server is connected. // The node here refers to the light client. type clientPeer struct { diff --git a/les/protocol.go b/les/protocol.go index 36af88aea6..f8ad94a7b5 100644 --- a/les/protocol.go +++ b/les/protocol.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + lpc "github.com/ethereum/go-ethereum/les/lespay/client" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rlp" ) @@ -77,19 +78,59 @@ const ( ) type requestInfo struct { - name string - maxCount uint64 + name string + maxCount uint64 + refBasketFirst, refBasketRest float64 } -var requests = map[uint64]requestInfo{ - GetBlockHeadersMsg: {"GetBlockHeaders", MaxHeaderFetch}, - GetBlockBodiesMsg: {"GetBlockBodies", MaxBodyFetch}, - GetReceiptsMsg: {"GetReceipts", MaxReceiptFetch}, - GetCodeMsg: {"GetCode", MaxCodeFetch}, - GetProofsV2Msg: {"GetProofsV2", MaxProofsFetch}, - GetHelperTrieProofsMsg: {"GetHelperTrieProofs", MaxHelperTrieProofsFetch}, - SendTxV2Msg: {"SendTxV2", MaxTxSend}, - GetTxStatusMsg: {"GetTxStatus", MaxTxStatus}, +// reqMapping maps an LES request to one or two lespay service vector entries. +// If rest != -1 and the request type is used with amounts larger than one then the +// first one of the multi-request is mapped to first while the rest is mapped to rest. +type reqMapping struct { + first, rest int +} + +var ( + // requests describes the available LES request types and their initializing amounts + // in the lespay/client.ValueTracker reference basket. Initial values are estimates + // based on the same values as the server's default cost estimates (reqAvgTimeCost). + requests = map[uint64]requestInfo{ + GetBlockHeadersMsg: {"GetBlockHeaders", MaxHeaderFetch, 10, 1000}, + GetBlockBodiesMsg: {"GetBlockBodies", MaxBodyFetch, 1, 0}, + GetReceiptsMsg: {"GetReceipts", MaxReceiptFetch, 1, 0}, + GetCodeMsg: {"GetCode", MaxCodeFetch, 1, 0}, + GetProofsV2Msg: {"GetProofsV2", MaxProofsFetch, 10, 0}, + GetHelperTrieProofsMsg: {"GetHelperTrieProofs", MaxHelperTrieProofsFetch, 10, 100}, + SendTxV2Msg: {"SendTxV2", MaxTxSend, 1, 0}, + GetTxStatusMsg: {"GetTxStatus", MaxTxStatus, 10, 0}, + } + requestList []lpc.RequestInfo + requestMapping map[uint32]reqMapping +) + +// init creates a request list and mapping between protocol message codes and lespay +// service vector indices. +func init() { + requestMapping = make(map[uint32]reqMapping) + for code, req := range requests { + cost := reqAvgTimeCost[code] + rm := reqMapping{len(requestList), -1} + requestList = append(requestList, lpc.RequestInfo{ + Name: req.name + ".first", + InitAmount: req.refBasketFirst, + InitValue: float64(cost.baseCost + cost.reqCost), + }) + if req.refBasketRest != 0 { + rm.rest = len(requestList) + requestList = append(requestList, lpc.RequestInfo{ + Name: req.name + ".rest", + InitAmount: req.refBasketRest, + InitValue: float64(cost.reqCost), + }) + } + requestMapping[uint32(code)] = rm + } + } type errCode int diff --git a/les/txrelay.go b/les/txrelay.go index 595c4d5808..4f6c15025e 100644 --- a/les/txrelay.go +++ b/les/txrelay.go @@ -144,7 +144,7 @@ func (ltrx *lesTxRelay) send(txs types.Transactions, count int) { peer := dp.(*serverPeer) cost := peer.getTxRelayCost(len(ll), len(enc)) peer.fcServer.QueuedRequest(reqID, cost) - return func() { peer.sendTxs(reqID, enc) } + return func() { peer.sendTxs(reqID, len(ll), enc) } }, } go ltrx.retriever.retrieve(context.Background(), reqID, rq, func(p distPeer, msg *Msg) error { return nil }, ltrx.stop) diff --git a/les/utils/expiredvalue.go b/les/utils/expiredvalue.go new file mode 100644 index 0000000000..2d6e5d4bb8 --- /dev/null +++ b/les/utils/expiredvalue.go @@ -0,0 +1,202 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package utils + +import ( + "math" + + "github.com/ethereum/go-ethereum/common/mclock" +) + +// ExpiredValue is a scalar value that is continuously expired (decreased +// exponentially) based on the provided logarithmic expiration offset value. +// +// The formula for value calculation is: base*2^(exp-logOffset). In order to +// simplify the calculation of ExpiredValue, its value is expressed in the form +// of an exponent with a base of 2. +// +// Also here is a trick to reduce a lot of calculations. In theory, when a value X +// decays over time and then a new value Y is added, the final result should be +// X*2^(exp-logOffset)+Y. However it's very hard to represent in memory. +// So the trick is using the idea of inflation instead of exponential decay. At this +// moment the temporary value becomes: X*2^exp+Y*2^logOffset_1, apply the exponential +// decay when we actually want to calculate the value. +// +// e.g. +// t0: V = 100 +// t1: add 30, inflationary value is: 100 + 30/0.3, 0.3 is the decay coefficient +// t2: get value, decay coefficient is 0.2 now, final result is: 200*0.2 = 40 +type ExpiredValue struct { + Base, Exp uint64 // rlp encoding works by default +} + +// ExpirationFactor is calculated from logOffset. 1 <= Factor < 2 and Factor*2^Exp +// describes the multiplier applicable for additions and the divider for readouts. +// If logOffset changes slowly then it saves some expensive operations to not calculate +// them for each addition and readout but cache this intermediate form for some time. +// It is also useful for structures where multiple values are expired with the same +// Expirer. +type ExpirationFactor struct { + Exp uint64 + Factor float64 +} + +// ExpFactor calculates ExpirationFactor based on logOffset +func ExpFactor(logOffset Fixed64) ExpirationFactor { + return ExpirationFactor{Exp: logOffset.ToUint64(), Factor: logOffset.Fraction().Pow2()} +} + +// Value calculates the expired value based on a floating point base and integer +// power-of-2 exponent. This function should be used by multi-value expired structures. +func (e ExpirationFactor) Value(base float64, exp uint64) float64 { + res := base / e.Factor + if exp > e.Exp { + res *= float64(uint64(1) << (exp - e.Exp)) + } + if exp < e.Exp { + res /= float64(uint64(1) << (e.Exp - exp)) + } + return res +} + +// value calculates the value at the given moment. +func (e ExpiredValue) Value(logOffset Fixed64) uint64 { + offset := Uint64ToFixed64(e.Exp) - logOffset + return uint64(float64(e.Base) * offset.Pow2()) +} + +// add adds a signed value at the given moment +func (e *ExpiredValue) Add(amount int64, logOffset Fixed64) int64 { + integer, frac := logOffset.ToUint64(), logOffset.Fraction() + factor := frac.Pow2() + base := factor * float64(amount) + if integer < e.Exp { + base /= math.Pow(2, float64(e.Exp-integer)) + } + if integer > e.Exp { + e.Base >>= (integer - e.Exp) + e.Exp = integer + } + if base >= 0 || uint64(-base) <= e.Base { + e.Base += uint64(base) + return amount + } + net := int64(-float64(e.Base) / factor) + e.Base = 0 + return net +} + +// addExp adds another ExpiredValue +func (e *ExpiredValue) AddExp(a ExpiredValue) { + if e.Exp > a.Exp { + a.Base >>= (e.Exp - a.Exp) + } + if e.Exp < a.Exp { + e.Base >>= (a.Exp - e.Exp) + e.Exp = a.Exp + } + e.Base += a.Base +} + +// subExp subtracts another ExpiredValue +func (e *ExpiredValue) SubExp(a ExpiredValue) { + if e.Exp > a.Exp { + a.Base >>= (e.Exp - a.Exp) + } + if e.Exp < a.Exp { + e.Base >>= (a.Exp - e.Exp) + e.Exp = a.Exp + } + if e.Base > a.Base { + e.Base -= a.Base + } else { + e.Base = 0 + } +} + +// Expirer changes logOffset with a linear rate which can be changed during operation. +// It is not thread safe, if access by multiple goroutines is needed then it should be +// encapsulated into a locked structure. +// Note that if neither SetRate nor SetLogOffset are used during operation then LogOffset +// is thread safe. +type Expirer struct { + logOffset Fixed64 + rate float64 + lastUpdate mclock.AbsTime +} + +// SetRate changes the expiration rate which is the inverse of the time constant in +// nanoseconds. +func (e *Expirer) SetRate(now mclock.AbsTime, rate float64) { + dt := now - e.lastUpdate + if dt > 0 { + e.logOffset += Fixed64(logToFixedFactor * float64(dt) * e.rate) + } + e.lastUpdate = now + e.rate = rate +} + +// SetLogOffset sets logOffset instantly. +func (e *Expirer) SetLogOffset(now mclock.AbsTime, logOffset Fixed64) { + e.lastUpdate = now + e.logOffset = logOffset +} + +// LogOffset returns the current logarithmic offset. +func (e *Expirer) LogOffset(now mclock.AbsTime) Fixed64 { + dt := now - e.lastUpdate + if dt <= 0 { + return e.logOffset + } + return e.logOffset + Fixed64(logToFixedFactor*float64(dt)*e.rate) +} + +// fixedFactor is the fixed point multiplier factor used by Fixed64. +const fixedFactor = 0x1000000 + +// Fixed64 implements 64-bit fixed point arithmetic functions. +type Fixed64 int64 + +// Uint64ToFixed64 converts uint64 integer to Fixed64 format. +func Uint64ToFixed64(f uint64) Fixed64 { + return Fixed64(f * fixedFactor) +} + +// float64ToFixed64 converts float64 to Fixed64 format. +func Float64ToFixed64(f float64) Fixed64 { + return Fixed64(f * fixedFactor) +} + +// toUint64 converts Fixed64 format to uint64. +func (f64 Fixed64) ToUint64() uint64 { + return uint64(f64) / fixedFactor +} + +// fraction returns the fractional part of a Fixed64 value. +func (f64 Fixed64) Fraction() Fixed64 { + return f64 % fixedFactor +} + +var ( + logToFixedFactor = float64(fixedFactor) / math.Log(2) + fixedToLogFactor = math.Log(2) / float64(fixedFactor) +) + +// pow2Fixed returns the base 2 power of the fixed point value. +func (f64 Fixed64) Pow2() float64 { + return math.Exp(float64(f64) * fixedToLogFactor) +} diff --git a/les/utils/expiredvalue_test.go b/les/utils/expiredvalue_test.go new file mode 100644 index 0000000000..4894a12b12 --- /dev/null +++ b/les/utils/expiredvalue_test.go @@ -0,0 +1,116 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package utils + +import "testing" + +func TestValueExpiration(t *testing.T) { + var cases = []struct { + input ExpiredValue + timeOffset Fixed64 + expect uint64 + }{ + {ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(0), 128}, + {ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(1), 64}, + {ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(2), 32}, + {ExpiredValue{Base: 128, Exp: 2}, Uint64ToFixed64(2), 128}, + {ExpiredValue{Base: 128, Exp: 2}, Uint64ToFixed64(3), 64}, + } + for _, c := range cases { + if got := c.input.Value(c.timeOffset); got != c.expect { + t.Fatalf("Value mismatch, want=%d, got=%d", c.expect, got) + } + } +} + +func TestValueAddition(t *testing.T) { + var cases = []struct { + input ExpiredValue + addend int64 + timeOffset Fixed64 + expect uint64 + expectNet int64 + }{ + // Addition + {ExpiredValue{Base: 128, Exp: 0}, 128, Uint64ToFixed64(0), 256, 128}, + {ExpiredValue{Base: 128, Exp: 2}, 128, Uint64ToFixed64(0), 640, 128}, + + // Addition with offset + {ExpiredValue{Base: 128, Exp: 0}, 128, Uint64ToFixed64(1), 192, 128}, + {ExpiredValue{Base: 128, Exp: 2}, 128, Uint64ToFixed64(1), 384, 128}, + {ExpiredValue{Base: 128, Exp: 2}, 128, Uint64ToFixed64(3), 192, 128}, + + // Subtraction + {ExpiredValue{Base: 128, Exp: 0}, -64, Uint64ToFixed64(0), 64, -64}, + {ExpiredValue{Base: 128, Exp: 0}, -128, Uint64ToFixed64(0), 0, -128}, + {ExpiredValue{Base: 128, Exp: 0}, -192, Uint64ToFixed64(0), 0, -128}, + + // Subtraction with offset + {ExpiredValue{Base: 128, Exp: 0}, -64, Uint64ToFixed64(1), 0, -64}, + {ExpiredValue{Base: 128, Exp: 0}, -128, Uint64ToFixed64(1), 0, -64}, + {ExpiredValue{Base: 128, Exp: 2}, -128, Uint64ToFixed64(1), 128, -128}, + {ExpiredValue{Base: 128, Exp: 2}, -128, Uint64ToFixed64(2), 0, -128}, + } + for _, c := range cases { + if net := c.input.Add(c.addend, c.timeOffset); net != c.expectNet { + t.Fatalf("Net amount mismatch, want=%d, got=%d", c.expectNet, net) + } + if got := c.input.Value(c.timeOffset); got != c.expect { + t.Fatalf("Value mismatch, want=%d, got=%d", c.expect, got) + } + } +} + +func TestExpiredValueAddition(t *testing.T) { + var cases = []struct { + input ExpiredValue + another ExpiredValue + timeOffset Fixed64 + expect uint64 + }{ + {ExpiredValue{Base: 128, Exp: 0}, ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(0), 256}, + {ExpiredValue{Base: 128, Exp: 1}, ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(0), 384}, + {ExpiredValue{Base: 128, Exp: 0}, ExpiredValue{Base: 128, Exp: 1}, Uint64ToFixed64(0), 384}, + {ExpiredValue{Base: 128, Exp: 0}, ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(1), 128}, + } + for _, c := range cases { + c.input.AddExp(c.another) + if got := c.input.Value(c.timeOffset); got != c.expect { + t.Fatalf("Value mismatch, want=%d, got=%d", c.expect, got) + } + } +} + +func TestExpiredValueSubtraction(t *testing.T) { + var cases = []struct { + input ExpiredValue + another ExpiredValue + timeOffset Fixed64 + expect uint64 + }{ + {ExpiredValue{Base: 128, Exp: 0}, ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(0), 0}, + {ExpiredValue{Base: 128, Exp: 0}, ExpiredValue{Base: 128, Exp: 1}, Uint64ToFixed64(0), 0}, + {ExpiredValue{Base: 128, Exp: 1}, ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(0), 128}, + {ExpiredValue{Base: 128, Exp: 1}, ExpiredValue{Base: 128, Exp: 0}, Uint64ToFixed64(1), 64}, + } + for _, c := range cases { + c.input.SubExp(c.another) + if got := c.input.Value(c.timeOffset); got != c.expect { + t.Fatalf("Value mismatch, want=%d, got=%d", c.expect, got) + } + } +} diff --git a/p2p/enode/node.go b/p2p/enode/node.go index 9eb2544ffe..3f6cda6d4a 100644 --- a/p2p/enode/node.go +++ b/p2p/enode/node.go @@ -217,7 +217,7 @@ func (n ID) MarshalText() ([]byte, error) { // UnmarshalText implements the encoding.TextUnmarshaler interface. func (n *ID) UnmarshalText(text []byte) error { - id, err := parseID(string(text)) + id, err := ParseID(string(text)) if err != nil { return err } @@ -229,14 +229,14 @@ func (n *ID) UnmarshalText(text []byte) error { // The string may be prefixed with 0x. // It panics if the string is not a valid ID. func HexID(in string) ID { - id, err := parseID(in) + id, err := ParseID(in) if err != nil { panic(err) } return id } -func parseID(in string) (ID, error) { +func ParseID(in string) (ID, error) { var id ID b, err := hex.DecodeString(strings.TrimPrefix(in, "0x")) if err != nil {