diff --git a/beacon/light/request/scheduler.go b/beacon/light/request/scheduler.go
new file mode 100644
index 0000000000..7f16d77d1e
--- /dev/null
+++ b/beacon/light/request/scheduler.go
@@ -0,0 +1,401 @@
+// Copyright 2023 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package request
+
+import (
+ "sync"
+
+ "github.com/ethereum/go-ethereum/log"
+)
+
+// Module represents a mechanism which is typically responsible for downloading
+// and updating a passive data structure. It does not directly interact with the
+// servers. It can start requests using the Requester interface, maintain its
+// internal state by receiving and processing Events and update its target data
+// structure based on the obtained data.
+// It is the Scheduler's responsibility to feed events to the modules and to keep
+// calling Process as long as there might be something to process, allowing the
+// modules to start the best possible requests through the Requester interface.
+// Modules are called by the Scheduler whenever the global trigger is fired. All
+// events fire the trigger. Changing a target data structure also triggers another
+// processing round as it could make further actions possible, either by the same
+// or by another Module.
+type Module interface {
+ // Process is a non-blocking function responsible for starting requests,
+ // processing events and updating the target data structure(s) and the
+ // internal state of the module. Module state typically consists of information
+ // about pending requests and registered servers.
+ // Process is always called after an event is received or after a target data
+ // structure has been changed.
+ //
+ // Note: Process functions of different modules are never called concurrently;
+ // they are called by Scheduler in the same order of priority as they were
+ // registered in.
+ Process(Requester, []Event)
+}
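+
+// A minimal illustrative Module sketch (hypothetical, not part of the package):
+// it keeps track of its own pending requests, finalizes them on EvResponse or
+// EvFail, and starts a new request through the Requester whenever it has none
+// pending and a suitable server is available.
+//
+//	type exampleModule struct {
+//		pending map[ServerAndID]struct{}
+//	}
+//
+//	func (m *exampleModule) Process(requester Requester, events []Event) {
+//		if m.pending == nil {
+//			m.pending = make(map[ServerAndID]struct{})
+//		}
+//		for _, ev := range events {
+//			if !ev.IsRequestEvent() {
+//				continue
+//			}
+//			sid, _, _ := ev.RequestInfo()
+//			if ev.Type == EvResponse || ev.Type == EvFail {
+//				delete(m.pending, sid) // request finalized
+//			}
+//		}
+//		if len(m.pending) == 0 {
+//			if servers := requester.CanSendTo(); len(servers) > 0 {
+//				id := requester.Send(servers[0], "example request")
+//				m.pending[ServerAndID{Server: servers[0], ID: id}] = struct{}{}
+//			}
+//		}
+//	}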
+
+// Requester allows Modules to obtain the list of currently available servers,
+// start new requests and report server failure when a response has been proven
+// to be invalid in the processing phase.
+// Note that all Requester functions should be safe to call from Module.Process.
+type Requester interface {
+ CanSendTo() []Server
+ Send(Server, Request) ID
+ Fail(Server, string)
+}
+
+// Scheduler is a modular network data retrieval framework that coordinates multiple
+// servers and retrieval mechanisms (modules). It implements a trigger mechanism
+// that calls the Process function of registered modules whenever either the state
+// of existing data structures or events coming from registered servers could
+// allow new operations.
+type Scheduler struct {
+ lock sync.Mutex
+ modules []Module // first has highest priority
+ names map[Module]string
+ servers map[server]struct{}
+ targets map[targetData]uint64
+
+ requesterLock sync.RWMutex
+ serverOrder []server
+ pending map[ServerAndID]pendingRequest
+
+ // eventLock guards access to the events list. Note that eventLock can be
+ // acquired both with and without holding lock, but lock must never be
+ // acquired while eventLock is held.
+ eventLock sync.Mutex
+ events []Event
+ stopCh chan chan struct{}
+
+ triggerCh chan struct{} // restarts waiting sync loop
+ // if the trigger has already been fired then a send to testWaitCh blocks
+ // until the triggered processing round has finished
+ testWaitCh chan struct{}
+}
+
+type (
+ // Server identifies a server without allowing any direct interaction.
+ // Note: the unexported server interface is used internally by the Scheduler,
+ // while the modules never interact with servers directly. In order to make
+ // module testing easier, the opaque Server type is used in events and by
+ // the modules.
+ Server any
+ Request any
+ Response any
+ ID uint64
+ ServerAndID struct {
+ Server Server
+ ID ID
+ }
+)
+
+// targetData represents a registered target data structure that increases its
+// ChangeCounter whenever it has been changed.
+type targetData interface {
+ ChangeCounter() uint64
+}
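+
+// A minimal illustrative target sketch (hypothetical, not part of the package):
+// the structure increments its counter on every mutation, which lets the
+// Scheduler detect that another processing round might be useful.
+//
+//	type exampleTarget struct {
+//		head    uint64
+//		counter uint64
+//	}
+//
+//	func (t *exampleTarget) SetHead(head uint64) {
+//		t.head = head
+//		t.counter++
+//	}
+//
+//	func (t *exampleTarget) ChangeCounter() uint64 { return t.counter }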
+
+// pendingRequest keeps track of sent and not yet finalized requests and their
+// sender modules.
+type pendingRequest struct {
+ request Request
+ module Module
+}
+
+// NewScheduler creates a new Scheduler.
+func NewScheduler() *Scheduler {
+ s := &Scheduler{
+ servers: make(map[server]struct{}),
+ names: make(map[Module]string),
+ pending: make(map[ServerAndID]pendingRequest),
+ targets: make(map[targetData]uint64),
+ stopCh: make(chan chan struct{}),
+ triggerCh: make(chan struct{}, 1),
+ // Note: testWaitCh should not have capacity in order to ensure that after
+ // a trigger happens testWaitCh will block until the resulting processing
+ // round has been finished
+ testWaitCh: make(chan struct{}),
+ }
+ return s
+}
+
+// RegisterTarget registers a target data structure, ensuring that any changes
+// made to it trigger a new round of Module.Process calls, giving a chance to
+// modules to react to the changes.
+func (s *Scheduler) RegisterTarget(t targetData) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.targets[t] = 0
+}
+
+// RegisterModule registers a module. Should be called before starting the scheduler.
+// In each processing round the order of module processing depends on the order of
+// registration.
+func (s *Scheduler) RegisterModule(m Module, name string) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.modules = append(s.modules, m)
+ s.names[m] = name
+}
+
+// RegisterServer registers a new server.
+func (s *Scheduler) RegisterServer(server server) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.addEvent(Event{Type: EvRegistered, Server: server})
+ server.subscribe(func(event Event) {
+ event.Server = server
+ s.addEvent(event)
+ })
+}
+
+// UnregisterServer removes a registered server.
+func (s *Scheduler) UnregisterServer(server server) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ server.unsubscribe()
+ s.addEvent(Event{Type: EvUnregistered, Server: server})
+}
+
+// Start starts the scheduler. It should be called after registering all modules
+// and before registering any servers.
+func (s *Scheduler) Start() {
+ go s.syncLoop()
+}
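+
+// A typical setup sequence following the ordering constraints above (target,
+// module and srv are hypothetical placeholders; srv would usually be obtained
+// from NewServer):
+//
+//	s := NewScheduler()
+//	s.RegisterTarget(target)
+//	s.RegisterModule(module, "module")
+//	s.Start()
+//	s.RegisterServer(srv)
+//	// ... synchronization runs ...
+//	s.UnregisterServer(srv)
+//	s.Stop()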
+
+// Stop stops the scheduler.
+func (s *Scheduler) Stop() {
+ stop := make(chan struct{})
+ s.stopCh <- stop
+ <-stop
+ s.lock.Lock()
+ for server := range s.servers {
+ server.unsubscribe()
+ }
+ s.servers = nil
+ s.lock.Unlock()
+}
+
+// syncLoop is the main event loop responsible for event/data processing and
+// sending new requests.
+// A round of processing starts whenever the global trigger is fired. Triggers
+// fired during a processing round ensure that there is going to be a next round.
+func (s *Scheduler) syncLoop() {
+ for {
+ s.lock.Lock()
+ s.processRound()
+ s.lock.Unlock()
+ loop:
+ for {
+ select {
+ case stop := <-s.stopCh:
+ close(stop)
+ return
+ case <-s.triggerCh:
+ break loop
+ case <-s.testWaitCh:
+ }
+ }
+ }
+}
+
+// targetChanged returns true if a registered target data structure has been
+// changed since the last call to this function.
+func (s *Scheduler) targetChanged() (changed bool) {
+ for target, counter := range s.targets {
+ if newCounter := target.ChangeCounter(); newCounter != counter {
+ s.targets[target] = newCounter
+ changed = true
+ }
+ }
+ return
+}
+
+// processRound runs an entire processing round. It calls the Process functions
+// of all modules, passing all relevant events and repeating Process calls as
+// long as any changes have been made to the registered target data structures.
+// Once all events have been processed and a stable state has been achieved,
+// requests are generated and sent if necessary and possible.
+func (s *Scheduler) processRound() {
+ for {
+ log.Debug("Processing modules")
+ filteredEvents := s.filterEvents()
+ for _, module := range s.modules {
+ log.Debug("Processing module", "name", s.names[module], "events", len(filteredEvents[module]))
+ module.Process(requester{s, module}, filteredEvents[module])
+ }
+ if !s.targetChanged() {
+ break
+ }
+ }
+}
+
+// Trigger starts a new processing round. If fired during processing, it ensures
+// another full round of processing all modules.
+func (s *Scheduler) Trigger() {
+ select {
+ case s.triggerCh <- struct{}{}:
+ default:
+ }
+}
+
+// addEvent adds an event to be processed in the next round. Note that it can be
+// called whether or not the lock mutex is held, making it safe for use in the
+// server event callback.
+func (s *Scheduler) addEvent(event Event) {
+ s.eventLock.Lock()
+ s.events = append(s.events, event)
+ s.eventLock.Unlock()
+ s.Trigger()
+}
+
+// filterEvents sorts each Event either as a request event or a server event,
+// depending on its type. Request events are also grouped in a map based on the
+// module that originally initiated the request. It also ensures that no events
+// related to a server are returned before EvRegistered or after EvUnregistered.
+// In case of an EvUnregistered server event it also closes all pending requests
+// to the given server by adding a failed request event (EvFail), ensuring that
+// all requests get finalized and thereby allowing the module logic to be safe
+// and simple.
+func (s *Scheduler) filterEvents() map[Module][]Event {
+ s.eventLock.Lock()
+ events := s.events
+ s.events = nil
+ s.eventLock.Unlock()
+
+ s.requesterLock.Lock()
+ defer s.requesterLock.Unlock()
+
+ filteredEvents := make(map[Module][]Event)
+ for _, event := range events {
+ server := event.Server.(server)
+ if _, ok := s.servers[server]; !ok && event.Type != EvRegistered {
+ continue // before EvRegistered or after EvUnregistered, discard
+ }
+
+ if event.IsRequestEvent() {
+ sid, _, _ := event.RequestInfo()
+ pending, ok := s.pending[sid]
+ if !ok {
+ continue // request already closed, ignore further events
+ }
+ if event.Type == EvResponse || event.Type == EvFail {
+ delete(s.pending, sid) // final event, close pending request
+ }
+ filteredEvents[pending.module] = append(filteredEvents[pending.module], event)
+ } else {
+ switch event.Type {
+ case EvRegistered:
+ s.servers[server] = struct{}{}
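+ // insert the new server at the front of the round-robin order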
+ s.serverOrder = append(s.serverOrder, nil)
+ copy(s.serverOrder[1:], s.serverOrder[:len(s.serverOrder)-1])
+ s.serverOrder[0] = server
+ case EvUnregistered:
+ s.closePending(event.Server, filteredEvents)
+ delete(s.servers, server)
+ for i, srv := range s.serverOrder {
+ if srv == server {
+ copy(s.serverOrder[i:len(s.serverOrder)-1], s.serverOrder[i+1:])
+ s.serverOrder = s.serverOrder[:len(s.serverOrder)-1]
+ break
+ }
+ }
+ }
+ for _, module := range s.modules {
+ filteredEvents[module] = append(filteredEvents[module], event)
+ }
+ }
+ }
+ return filteredEvents
+}
+
+// closePending closes all pending requests to the given server and adds an EvFail
+// event to properly finalize them.
+func (s *Scheduler) closePending(server Server, filteredEvents map[Module][]Event) {
+ for sid, pending := range s.pending {
+ if sid.Server == server {
+ filteredEvents[pending.module] = append(filteredEvents[pending.module], Event{
+ Type: EvFail,
+ Server: server,
+ Data: RequestResponse{
+ ID: sid.ID,
+ Request: pending.request,
+ },
+ })
+ delete(s.pending, sid)
+ }
+ }
+}
+
+// requester implements Requester. Note that while requester basically wraps
+// Scheduler (with the added information of the currently processed Module), all
+// functions are safe to call from Module.Process which is running while
+// the Scheduler.lock mutex is held.
+type requester struct {
+ *Scheduler
+ module Module
+}
+
+// CanSendTo returns the list of currently available servers. It also returns
+// them in an order of least to most recently used, ensuring a round-robin usage
+// of suitable servers if the module always chooses the first suitable one.
+func (s requester) CanSendTo() []Server {
+ s.requesterLock.RLock()
+ defer s.requesterLock.RUnlock()
+
+ list := make([]Server, 0, len(s.serverOrder))
+ for _, server := range s.serverOrder {
+ if server.canRequestNow() {
+ list = append(list, server)
+ }
+ }
+ return list
+}
+
+// Send sends a request and adds an entry to Scheduler.pending map, ensuring that
+// related request events will be delivered to the sender Module.
+func (s requester) Send(srv Server, req Request) ID {
+ s.requesterLock.Lock()
+ defer s.requesterLock.Unlock()
+
+ server := srv.(server)
+ id := server.sendRequest(req)
+ sid := ServerAndID{Server: srv, ID: id}
+ s.pending[sid] = pendingRequest{request: req, module: s.module}
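+ // move the server to the end of the round-robin order (most recently used)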
+ for i, ss := range s.serverOrder {
+ if ss == server {
+ copy(s.serverOrder[i:len(s.serverOrder)-1], s.serverOrder[i+1:])
+ s.serverOrder[len(s.serverOrder)-1] = server
+ return id
+ }
+ }
+ log.Error("Target server not found in ordered list of registered servers")
+ return id
+}
+
+// Fail should be called when a server delivers invalid or useless information.
+// Calling Fail disables the given server for a period that is initially short
+// but grows exponentially if failures happen frequently. This results in
+// somewhat fault-tolerant operation that avoids hammering servers with requests
+// that they cannot serve, while still giving them a chance periodically.
+func (s requester) Fail(srv Server, desc string) {
+ srv.(server).fail(desc)
+}
diff --git a/beacon/light/request/scheduler_test.go b/beacon/light/request/scheduler_test.go
new file mode 100644
index 0000000000..310e33af77
--- /dev/null
+++ b/beacon/light/request/scheduler_test.go
@@ -0,0 +1,122 @@
+package request
+
+import (
+ "reflect"
+ "testing"
+)
+
+func TestEventFilter(t *testing.T) {
+ s := NewScheduler()
+ module1 := &testModule{name: "module1"}
+ module2 := &testModule{name: "module2"}
+ s.RegisterModule(module1, "module1")
+ s.RegisterModule(module2, "module2")
+ s.Start()
+ // startup process round without events
+ s.testWaitCh <- struct{}{}
+ module1.expProcess(t, nil)
+ module2.expProcess(t, nil)
+ srv := &testServer{}
+ // register server; both modules should receive server event
+ s.RegisterServer(srv)
+ s.testWaitCh <- struct{}{}
+ module1.expProcess(t, []Event{
+ {Type: EvRegistered, Server: srv},
+ })
+ module2.expProcess(t, []Event{
+ {Type: EvRegistered, Server: srv},
+ })
+ // let module1 send a request
+ srv.canRequest = 1
+ module1.sendReq = testRequest
+ s.Trigger()
+ // in first triggered round module1 sends the request, no events yet
+ s.testWaitCh <- struct{}{}
+ module1.expProcess(t, nil)
+ module2.expProcess(t, nil)
+ // server emits EvTimeout; only module1 should receive it
+ srv.eventCb(Event{Type: EvTimeout, Data: RequestResponse{ID: 1, Request: testRequest}})
+ s.testWaitCh <- struct{}{}
+ module1.expProcess(t, []Event{
+ {Type: EvTimeout, Server: srv, Data: RequestResponse{ID: 1, Request: testRequest}},
+ })
+ module2.expProcess(t, nil)
+ // unregister server; both modules should receive server event
+ s.UnregisterServer(srv)
+ s.testWaitCh <- struct{}{}
+ module1.expProcess(t, []Event{
+ // module1 should also receive EvFail on its pending request
+ {Type: EvFail, Server: srv, Data: RequestResponse{ID: 1, Request: testRequest}},
+ {Type: EvUnregistered, Server: srv},
+ })
+ module2.expProcess(t, []Event{
+ {Type: EvUnregistered, Server: srv},
+ })
+ // response after server unregistered; should be discarded
+ srv.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}})
+ s.testWaitCh <- struct{}{}
+ module1.expProcess(t, nil)
+ module2.expProcess(t, nil)
+ // no more process rounds expected; shut down
+ s.testWaitCh <- struct{}{}
+ module1.expNoMoreProcess(t)
+ module2.expNoMoreProcess(t)
+ s.Stop()
+}
+
+type testServer struct {
+ eventCb func(Event)
+ lastID ID
+ canRequest int
+}
+
+func (s *testServer) subscribe(eventCb func(Event)) {
+ s.eventCb = eventCb
+}
+
+func (s *testServer) canRequestNow() bool {
+ return s.canRequest > 0
+}
+
+func (s *testServer) sendRequest(req Request) ID {
+ s.canRequest--
+ s.lastID++
+ return s.lastID
+}
+
+func (s *testServer) fail(string) {}
+func (s *testServer) unsubscribe() {}
+
+type testModule struct {
+ name string
+ processed [][]Event
+ sendReq Request
+}
+
+func (m *testModule) Process(requester Requester, events []Event) {
+ m.processed = append(m.processed, events)
+ if m.sendReq != nil {
+ if cs := requester.CanSendTo(); len(cs) > 0 {
+ requester.Send(cs[0], m.sendReq)
+ }
+ }
+}
+
+func (m *testModule) expProcess(t *testing.T, expEvents []Event) {
+ if len(m.processed) == 0 {
+ t.Errorf("Missing call to %s.Process", m.name)
+ return
+ }
+ events := m.processed[0]
+ m.processed = m.processed[1:]
+ if !reflect.DeepEqual(events, expEvents) {
+ t.Errorf("Call to %s.Process with wrong events (expected %v, got %v)", m.name, expEvents, events)
+ }
+}
+
+func (m *testModule) expNoMoreProcess(t *testing.T) {
+ for len(m.processed) > 0 {
+ t.Errorf("Unexpected call to %s.Process with events %v", m.name, m.processed[0])
+ m.processed = m.processed[1:]
+ }
+}
diff --git a/beacon/light/request/server.go b/beacon/light/request/server.go
new file mode 100644
index 0000000000..999f64178a
--- /dev/null
+++ b/beacon/light/request/server.go
@@ -0,0 +1,439 @@
+// Copyright 2023 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package request
+
+import (
+ "math"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/mclock"
+ "github.com/ethereum/go-ethereum/log"
+)
+
+var (
+ // request events
+ EvResponse = &EventType{Name: "response", requestEvent: true} // data: RequestResponse; sent by requestServer
+ EvFail = &EventType{Name: "fail", requestEvent: true} // data: RequestResponse; sent by requestServer
+ EvTimeout = &EventType{Name: "timeout", requestEvent: true} // data: RequestResponse; sent by serverWithTimeout
+ // server events
+ EvRegistered = &EventType{Name: "registered"} // data: nil; sent by Scheduler
+ EvUnregistered = &EventType{Name: "unregistered"} // data: nil; sent by Scheduler
+ EvCanRequestAgain = &EventType{Name: "canRequestAgain"} // data: nil; sent by serverWithLimits
+)
+
+const (
+ softRequestTimeout = time.Second // allow resending request to a different server but do not cancel yet
+ hardRequestTimeout = time.Second * 10 // cancel request
+)
+
+const (
+ // serverWithLimits parameters
+ parallelAdjustUp = 0.1 // adjust parallelLimit up in case of success under full load
+ parallelAdjustDown = 1 // adjust parallelLimit down in case of timeout/failure
+ minParallelLimit = 1 // parallelLimit lower bound
+ defaultParallelLimit = 3 // parallelLimit initial value
+ minFailureDelay = time.Millisecond * 100 // minimum disable time in case of request failure
+ maxFailureDelay = time.Minute // maximum disable time in case of request failure
+ maxServerEventBuffer = 5 // server event allowance buffer limit
+ maxServerEventRate = time.Second // server event allowance buffer recharge rate
+)
+
+// requestServer can send requests in a non-blocking way and feed back events
+// through the event callback. After each request it should send back either
+// EvResponse or EvFail. Additionally, it may also send application-defined
+// events that the Modules can interpret.
+type requestServer interface {
+ Subscribe(eventCallback func(Event))
+ SendRequest(ID, Request)
+ Unsubscribe()
+}
+
+// server is implemented by a requestServer wrapped into serverWithTimeout and
+// serverWithLimits and is used by Scheduler.
+// In addition to requestServer functionality, server can also handle timeouts,
+// limit the number of parallel in-flight requests and temporarily disable
+// new requests based on timeouts and response failures.
+type server interface {
+ subscribe(eventCallback func(Event))
+ canRequestNow() bool
+ sendRequest(Request) ID
+ fail(string)
+ unsubscribe()
+}
+
+// NewServer wraps a requestServer and returns a server.
+func NewServer(rs requestServer, clock mclock.Clock) server {
+ s := &serverWithLimits{}
+ s.parent = rs
+ s.serverWithTimeout.init(clock)
+ s.init()
+ return s
+}
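+
+// Illustrative use (rs being a hypothetical requestServer implementation, for
+// example an API client wrapper):
+//
+//	srv := NewServer(rs, mclock.System{})
+//	scheduler.RegisterServer(srv)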
+
+// EventType identifies an event type, either related to a request or the server
+// in general. Server events can also be externally defined.
+type EventType struct {
+ Name string
+ requestEvent bool // all request events are pre-defined in request package
+}
+
+// Event describes an event where the type of Data depends on Type.
+// Server field is not required when sent through the event callback; it is filled
+// out when processed by the Scheduler. Note that the Scheduler can also create
+// and send events (EvRegistered, EvUnregistered) directly.
+type Event struct {
+ Type *EventType
+ Server Server // filled by Scheduler
+ Data any
+}
+
+// IsRequestEvent returns true if the event is a request event
+func (e *Event) IsRequestEvent() bool {
+ return e.Type.requestEvent
+}
+
+// RequestInfo assumes that the event is a request event and returns its contents
+// in a convenient form.
+func (e *Event) RequestInfo() (ServerAndID, Request, Response) {
+ data := e.Data.(RequestResponse)
+ return ServerAndID{Server: e.Server, ID: data.ID}, data.Request, data.Response
+}
+
+// RequestResponse is the Data type of request events.
+type RequestResponse struct {
+ ID ID
+ Request Request
+ Response Response
+}
+
+// serverWithTimeout wraps a requestServer and introduces timeouts.
+// The request's lifecycle is concluded when EvResponse or EvFail is emitted by
+// the parent requestServer. If this does not happen within softRequestTimeout
+// then EvTimeout is emitted, after which the final EvResponse or EvFail is still
+// guaranteed to follow.
+// If the parent fails to send this final event within hardRequestTimeout then
+// serverWithTimeout emits EvFail and discards any further events from the
+// parent related to the given request.
+type serverWithTimeout struct {
+ parent requestServer
+ lock sync.Mutex
+ clock mclock.Clock
+ childEventCb func(event Event)
+ timeouts map[ID]mclock.Timer
+ lastID ID
+}
+
+// init initializes serverWithTimeout
+func (s *serverWithTimeout) init(clock mclock.Clock) {
+ s.clock = clock
+ s.timeouts = make(map[ID]mclock.Timer)
+}
+
+// subscribe subscribes to events which include parent (requestServer) events
+// plus EvTimeout.
+func (s *serverWithTimeout) subscribe(eventCallback func(event Event)) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.childEventCb = eventCallback
+ s.parent.Subscribe(s.eventCallback)
+}
+
+// sendRequest generates a new request ID, sets up the soft timeout timer, then
+// sends the request through the parent (requestServer).
+func (s *serverWithTimeout) sendRequest(request Request) (reqId ID) {
+ s.lock.Lock()
+ s.lastID++
+ id := s.lastID
+ s.startTimeout(RequestResponse{ID: id, Request: request})
+ s.lock.Unlock()
+ s.parent.SendRequest(id, request)
+ return id
+}
+
+// eventCallback is called by parent (requestServer) event subscription.
+func (s *serverWithTimeout) eventCallback(event Event) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ switch event.Type {
+ case EvResponse, EvFail:
+ id := event.Data.(RequestResponse).ID
+ if timer, ok := s.timeouts[id]; ok {
+ // Note: if stopping the timer is unsuccessful then the resulting AfterFunc
+ // call will just do nothing
+ timer.Stop()
+ delete(s.timeouts, id)
+ s.childEventCb(event)
+ }
+ default:
+ s.childEventCb(event)
+ }
+}
+
+// startTimeout starts a timeout timer for the given request.
+func (s *serverWithTimeout) startTimeout(reqData RequestResponse) {
+ id := reqData.ID
+ s.timeouts[id] = s.clock.AfterFunc(softRequestTimeout, func() {
+ s.lock.Lock()
+ if _, ok := s.timeouts[id]; !ok {
+ s.lock.Unlock()
+ return
+ }
+ s.timeouts[id] = s.clock.AfterFunc(hardRequestTimeout-softRequestTimeout, func() {
+ s.lock.Lock()
+ if _, ok := s.timeouts[id]; !ok {
+ s.lock.Unlock()
+ return
+ }
+ delete(s.timeouts, id)
+ childEventCb := s.childEventCb
+ s.lock.Unlock()
+ childEventCb(Event{Type: EvFail, Data: reqData})
+ })
+ childEventCb := s.childEventCb
+ s.lock.Unlock()
+ childEventCb(Event{Type: EvTimeout, Data: reqData})
+ })
+}
+
+// unsubscribe stops all pending timeout timers and unsubscribes from the parent
+// (requestServer).
+func (s *serverWithTimeout) unsubscribe() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ for _, timer := range s.timeouts {
+ if timer != nil {
+ timer.Stop()
+ }
+ }
+ s.childEventCb = nil
+ s.parent.Unsubscribe()
+}
+
+// serverWithLimits wraps serverWithTimeout and implements server. It limits the
+// number of parallel in-flight requests and prevents sending new requests when a
+// pending one has already timed out. Server events are also rate limited.
+// It also implements a failure delay mechanism that adds an exponentially growing
+// delay each time a request fails (wrong answer or hard timeout). This makes the
+// syncing mechanism less brittle as temporary failures of the server might happen
+// sometimes, but still avoids hammering a non-functional server with requests.
+type serverWithLimits struct {
+ serverWithTimeout
+ lock sync.Mutex
+ childEventCb func(event Event)
+ softTimeouts map[ID]struct{}
+ pendingCount, timeoutCount int
+ parallelLimit float32
+ sendEvent bool
+ delayTimer mclock.Timer
+ delayCounter int
+ failureDelayEnd mclock.AbsTime
+ failureDelay float64
+ serverEventBuffer int
+ eventBufferUpdated mclock.AbsTime
+}
+
+// init initializes serverWithLimits
+func (s *serverWithLimits) init() {
+ s.softTimeouts = make(map[ID]struct{})
+ s.parallelLimit = defaultParallelLimit
+ s.serverEventBuffer = maxServerEventBuffer
+}
+
+// subscribe subscribes to events which include parent (serverWithTimeout) events
+// plus EvCanRequestAgain.
+func (s *serverWithLimits) subscribe(eventCallback func(event Event)) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.childEventCb = eventCallback
+ s.serverWithTimeout.subscribe(s.eventCallback)
+}
+
+// eventCallback is called by parent (serverWithTimeout) event subscription.
+func (s *serverWithLimits) eventCallback(event Event) {
+ s.lock.Lock()
+ var sendCanRequestAgain bool
+ passEvent := true
+ switch event.Type {
+ case EvTimeout:
+ id := event.Data.(RequestResponse).ID
+ s.softTimeouts[id] = struct{}{}
+ s.timeoutCount++
+ s.parallelLimit -= parallelAdjustDown
+ if s.parallelLimit < minParallelLimit {
+ s.parallelLimit = minParallelLimit
+ }
+ log.Debug("Server timeout", "count", s.timeoutCount, "parallelLimit", s.parallelLimit)
+ case EvResponse, EvFail:
+ id := event.Data.(RequestResponse).ID
+ if _, ok := s.softTimeouts[id]; ok {
+ delete(s.softTimeouts, id)
+ s.timeoutCount--
+ log.Debug("Server timeout finalized", "count", s.timeoutCount, "parallelLimit", s.parallelLimit)
+ }
+ if event.Type == EvResponse && s.pendingCount >= int(s.parallelLimit) {
+ s.parallelLimit += parallelAdjustUp
+ }
+ s.pendingCount--
+ if s.canRequest() {
+ sendCanRequestAgain = s.sendEvent
+ s.sendEvent = false
+ }
+ if event.Type == EvFail {
+ s.failLocked("failed request")
+ }
+ default:
+ // server event; check rate limit
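+ // The event allowance recharges at a rate of one event per maxServerEventRate,
+ // up to a maximum of maxServerEventBuffer; events arriving when the allowance
+ // is exhausted are dropped and not passed on to the child callback.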
+ if s.serverEventBuffer < maxServerEventBuffer {
+ now := s.clock.Now()
+ sinceUpdate := time.Duration(now - s.eventBufferUpdated)
+ if sinceUpdate >= maxServerEventRate*time.Duration(maxServerEventBuffer-s.serverEventBuffer) {
+ s.serverEventBuffer = maxServerEventBuffer
+ s.eventBufferUpdated = now
+ } else {
+ addBuffer := int(sinceUpdate / maxServerEventRate)
+ s.serverEventBuffer += addBuffer
+ s.eventBufferUpdated += mclock.AbsTime(maxServerEventRate * time.Duration(addBuffer))
+ }
+ }
+ if s.serverEventBuffer > 0 {
+ s.serverEventBuffer--
+ } else {
+ passEvent = false
+ }
+ }
+ childEventCb := s.childEventCb
+ s.lock.Unlock()
+ if passEvent {
+ childEventCb(event)
+ }
+ if sendCanRequestAgain {
+ childEventCb(Event{Type: EvCanRequestAgain})
+ }
+}
+
+// sendRequest sends a request through the parent (serverWithTimeout).
+func (s *serverWithLimits) sendRequest(request Request) (reqId ID) {
+ s.lock.Lock()
+ s.pendingCount++
+ s.lock.Unlock()
+ return s.serverWithTimeout.sendRequest(request)
+}
+
+// unsubscribe stops the failure delay timer, removes the event callback and
+// unsubscribes from the parent (serverWithTimeout).
+func (s *serverWithLimits) unsubscribe() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if s.delayTimer != nil {
+ s.delayTimer.Stop()
+ s.delayTimer = nil
+ }
+ s.childEventCb = nil
+ s.serverWithTimeout.unsubscribe()
+}
+
+// canRequest checks whether a new request can be started.
+func (s *serverWithLimits) canRequest() bool {
+ if s.delayTimer != nil || s.pendingCount >= int(s.parallelLimit) || s.timeoutCount > 0 {
+ return false
+ }
+ if s.parallelLimit < minParallelLimit {
+ s.parallelLimit = minParallelLimit
+ }
+ return true
+}
+
+// canRequestNow checks whether a new request can be started, according to the
+// current in-flight request count and parallelLimit, and also the failure delay
+// timer.
+// If it returns false then it is guaranteed that an EvCanRequestAgain will be
+// sent whenever the server becomes available for requesting again.
+func (s *serverWithLimits) canRequestNow() bool {
+ var sendCanRequestAgain bool
+ s.lock.Lock()
+ canRequest := s.canRequest()
+ if canRequest {
+ sendCanRequestAgain = s.sendEvent
+ s.sendEvent = false
+ }
+ childEventCb := s.childEventCb
+ s.lock.Unlock()
+ if sendCanRequestAgain {
+ childEventCb(Event{Type: EvCanRequestAgain})
+ }
+ return canRequest
+}
+
+// delay sets the delay timer to the given duration, disabling new requests for
+// the given period.
+func (s *serverWithLimits) delay(delay time.Duration) {
+ if s.delayTimer != nil {
+ // Note: if stopping the timer is unsuccessful then the resulting AfterFunc
+ // call will just do nothing
+ s.delayTimer.Stop()
+ s.delayTimer = nil
+ }
+
+ s.delayCounter++
+ delayCounter := s.delayCounter
+ log.Debug("Server delay started", "length", delay)
+ s.delayTimer = s.clock.AfterFunc(delay, func() {
+ log.Debug("Server delay ended", "length", delay)
+ var sendCanRequestAgain bool
+ s.lock.Lock()
+ if s.delayTimer != nil && s.delayCounter == delayCounter { // do nothing if there is a new timer now
+ s.delayTimer = nil
+ if s.canRequest() {
+ sendCanRequestAgain = s.sendEvent
+ s.sendEvent = false
+ }
+ }
+ childEventCb := s.childEventCb
+ s.lock.Unlock()
+ if sendCanRequestAgain {
+ childEventCb(Event{Type: EvCanRequestAgain})
+ }
+ })
+}
+
+// fail reports that a response from the server was found invalid by the processing
+// Module, disabling new requests for a dynamically adjusted time period.
+func (s *serverWithLimits) fail(desc string) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.failLocked(desc)
+}
+
+// failLocked calculates the dynamic failure delay and applies it.
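+// For illustration, assuming the constants above: back-to-back failures yield
+// delays of roughly 100ms, 200ms, 400ms and so on, while every failure-free
+// maxFailureDelay period elapsed after the previous delay has ended halves the
+// stored delay again through the 2^(-dt/maxFailureDelay) decay term.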
+func (s *serverWithLimits) failLocked(desc string) {
+ log.Debug("Server error", "description", desc)
+ s.failureDelay *= 2
+ now := s.clock.Now()
+ if now > s.failureDelayEnd {
+ s.failureDelay *= math.Pow(2, -float64(now-s.failureDelayEnd)/float64(maxFailureDelay))
+ }
+ if s.failureDelay < float64(minFailureDelay) {
+ s.failureDelay = float64(minFailureDelay)
+ }
+ s.failureDelayEnd = now + mclock.AbsTime(s.failureDelay)
+ s.delay(time.Duration(s.failureDelay))
+}
diff --git a/beacon/light/request/server_test.go b/beacon/light/request/server_test.go
new file mode 100644
index 0000000000..b6b9edf9a0
--- /dev/null
+++ b/beacon/light/request/server_test.go
@@ -0,0 +1,158 @@
+package request
+
+import (
+ "testing"
+
+ "github.com/ethereum/go-ethereum/common/mclock"
+)
+
+const (
+ testRequest = "Life, the Universe, and Everything"
+ testResponse = 42
+)
+
+var testEventType = &EventType{Name: "testEvent"}
+
+func TestServerEvents(t *testing.T) {
+ rs := &testRequestServer{}
+ clock := &mclock.Simulated{}
+ srv := NewServer(rs, clock)
+ var lastEventType *EventType
+ srv.subscribe(func(event Event) { lastEventType = event.Type })
+ evTypeName := func(evType *EventType) string {
+ if evType == nil {
+ return "none"
+ }
+ return evType.Name
+ }
+ expEvent := func(expType *EventType) {
+ if lastEventType != expType {
+ t.Errorf("Wrong event type (expected %s, got %s)", evTypeName(expType), evTypeName(lastEventType))
+ }
+ lastEventType = nil
+ }
+ // user events should simply be passed through
+ rs.eventCb(Event{Type: testEventType})
+ expEvent(testEventType)
+ // send request, soft timeout, then valid response
+ srv.sendRequest(testRequest)
+ clock.WaitForTimers(1)
+ clock.Run(softRequestTimeout)
+ expEvent(EvTimeout)
+ rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}})
+ expEvent(EvResponse)
+ // send request, hard timeout (response after hard timeout should be ignored)
+ srv.sendRequest(testRequest)
+ clock.WaitForTimers(1)
+ clock.Run(softRequestTimeout)
+ expEvent(EvTimeout)
+ clock.WaitForTimers(1)
+ clock.Run(hardRequestTimeout)
+ expEvent(EvFail)
+ rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}})
+ expEvent(nil)
+}
+
+func TestServerParallel(t *testing.T) {
+ rs := &testRequestServer{}
+ srv := NewServer(rs, &mclock.Simulated{})
+ srv.subscribe(func(event Event) {})
+
+ expSend := func(expSent int) {
+ var sent int
+ for sent <= expSent {
+ if !srv.canRequestNow() {
+ break
+ }
+ sent++
+ srv.sendRequest(testRequest)
+ }
+ if sent != expSent {
+ t.Errorf("Wrong number of parallel requests accepted (expected %d, got %d)", expSent, sent)
+ }
+ }
+ // max out parallel allowance
+ expSend(defaultParallelLimit)
+ // 1 answered, should accept 1 more
+ rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}})
+ expSend(1)
+ // 2 answered, should accept 2 more
+ rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 2, Request: testRequest, Response: testResponse}})
+ rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 3, Request: testRequest, Response: testResponse}})
+ expSend(2)
+ // failed request, should decrease allowance and not accept more
+ rs.eventCb(Event{Type: EvFail, Data: RequestResponse{ID: 4, Request: testRequest}})
+ expSend(0)
+ srv.unsubscribe()
+}
+
+func TestServerFail(t *testing.T) {
+ rs := &testRequestServer{}
+ clock := &mclock.Simulated{}
+ srv := NewServer(rs, clock)
+ srv.subscribe(func(event Event) {})
+ expCanRequest := func(expCanRequest bool) {
+ if canRequest := srv.canRequestNow(); canRequest != expCanRequest {
+ t.Errorf("Wrong result for canRequestNow (expected %v, got %v)", expCanRequest, canRequest)
+ }
+ }
+ // timed out request
+ expCanRequest(true)
+ srv.sendRequest(testRequest)
+ clock.WaitForTimers(1)
+ expCanRequest(true)
+ clock.Run(softRequestTimeout)
+ expCanRequest(false) // cannot request when there is a timed out request
+ rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}})
+ expCanRequest(true)
+ // explicit server.Fail
+ srv.fail("")
+ clock.WaitForTimers(1)
+ expCanRequest(false) // cannot request for a while after a failure
+ clock.Run(minFailureDelay)
+ expCanRequest(true)
+ // request returned with EvFail
+ srv.sendRequest(testRequest)
+ rs.eventCb(Event{Type: EvFail, Data: RequestResponse{ID: 2, Request: testRequest}})
+ clock.WaitForTimers(1)
+ expCanRequest(false) // EvFail should also start failure delay
+ clock.Run(minFailureDelay)
+ expCanRequest(false) // second failure delay is longer, should still be disabled
+ clock.Run(minFailureDelay)
+ expCanRequest(true)
+ srv.unsubscribe()
+}
+
+func TestServerEventRateLimit(t *testing.T) {
+ rs := &testRequestServer{}
+ clock := &mclock.Simulated{}
+ srv := NewServer(rs, clock)
+ var eventCount int
+ srv.subscribe(func(event Event) {
+ if !event.IsRequestEvent() {
+ eventCount++
+ }
+ })
+ expEvents := func(send, expAllowed int) {
+ eventCount = 0
+ for sent := 0; sent < send; sent++ {
+ rs.eventCb(Event{Type: testEventType})
+ }
+ if eventCount != expAllowed {
+ t.Errorf("Wrong number of server events passing rate limitation (sent %d, expected %d, got %d)", send, expAllowed, eventCount)
+ }
+ }
+ expEvents(maxServerEventBuffer+5, maxServerEventBuffer)
+ clock.Run(maxServerEventRate)
+ expEvents(5, 1)
+ clock.Run(maxServerEventRate * maxServerEventBuffer * 2)
+ expEvents(maxServerEventBuffer+5, maxServerEventBuffer)
+}
+
+type testRequestServer struct {
+ eventCb func(Event)
+}
+
+func (rs *testRequestServer) Subscribe(eventCb func(Event)) { rs.eventCb = eventCb }
+func (rs *testRequestServer) SendRequest(ID, Request) {}
+func (rs *testRequestServer) Unsubscribe() {}