From eceba4c6f74fad27bf17cd2bdf63d2559779aa93 Mon Sep 17 00:00:00 2001
From: Zsolt Felfoldi
Date: Thu, 1 Feb 2024 06:58:04 +0100
Subject: [PATCH] beacon/light/request: simple request framework

---
 beacon/light/request/scheduler.go      | 401 ++++++++++++++++++++++
 beacon/light/request/scheduler_test.go | 122 +++++++
 beacon/light/request/server.go         | 439 +++++++++++++++++++++++++
 beacon/light/request/server_test.go    | 158 +++++++++
 4 files changed, 1120 insertions(+)
 create mode 100644 beacon/light/request/scheduler.go
 create mode 100644 beacon/light/request/scheduler_test.go
 create mode 100644 beacon/light/request/server.go
 create mode 100644 beacon/light/request/server_test.go

diff --git a/beacon/light/request/scheduler.go b/beacon/light/request/scheduler.go
new file mode 100644
index 0000000000..7f16d77d1e
--- /dev/null
+++ b/beacon/light/request/scheduler.go
@@ -0,0 +1,401 @@
+// Copyright 2023 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package request
+
+import (
+	"sync"
+
+	"github.com/ethereum/go-ethereum/log"
+)
+
+// Module represents a mechanism which is typically responsible for downloading
+// and updating a passive data structure. It does not directly interact with the
+// servers. It can start requests using the Requester interface, maintain its
+// internal state by receiving and processing Events and update its target data
+// structure based on the obtained data.
+// It is the Scheduler's responsibility to feed events to the modules and call
+// Process as long as there might be something to process; the modules themselves
+// generate and start the best possible requests through the Requester passed to
+// Process.
+// Modules are called by Scheduler whenever a global trigger is fired. All events
+// fire the trigger. Changing a target data structure also triggers another
+// processing round as it could make further actions possible either by the same
+// or another Module.
+type Module interface {
+	// Process is a non-blocking function responsible for starting requests,
+	// processing events and updating the target data structure(s) and the
+	// internal state of the module. Module state typically consists of information
+	// about pending requests and registered servers.
+	// Process is always called after an event is received or after a target data
+	// structure has been changed.
+	//
+	// Note: Process functions of different modules are never called concurrently;
+	// they are called by Scheduler in the same order of priority as they were
+	// registered in.
+	Process(Requester, []Event)
+}
+
+// Requester allows Modules to obtain the list of currently available servers,
+// start new requests and report server failure when a response has been proven
+// to be invalid in the processing phase.
+// Note that all Requester functions should be safe to call from Module.Process.
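+//
+// A minimal sketch of how a module might drive requests from Process (the
+// myModule type and its helper methods are illustrative placeholders, not part
+// of this package):
+//
+//	func (m *myModule) Process(requester Requester, events []Event) {
+//	    for _, ev := range events {
+//	        if ev.IsRequestEvent() {
+//	            sid, req, resp := ev.RequestInfo()
+//	            m.applyEvent(ev.Type, sid, req, resp) // update pending-request state
+//	        }
+//	    }
+//	    if m.needsData() {
+//	        if servers := requester.CanSendTo(); len(servers) > 0 {
+//	            requester.Send(servers[0], m.nextRequest())
+//	        }
+//	    }
+//	}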
+type Requester interface { + CanSendTo() []Server + Send(Server, Request) ID + Fail(Server, string) +} + +// Scheduler is a modular network data retrieval framework that coordinates multiple +// servers and retrieval mechanisms (modules). It implements a trigger mechanism +// that calls the Process function of registered modules whenever either the state +// of existing data structures or events coming from registered servers could +// allow new operations. +type Scheduler struct { + lock sync.Mutex + modules []Module // first has highest priority + names map[Module]string + servers map[server]struct{} + targets map[targetData]uint64 + + requesterLock sync.RWMutex + serverOrder []server + pending map[ServerAndID]pendingRequest + + // eventLock guards access to the events list. Note that eventLock can be + // locked either while lock is locked or unlocked but lock cannot be locked + // while eventLock is locked. + eventLock sync.Mutex + events []Event + stopCh chan chan struct{} + + triggerCh chan struct{} // restarts waiting sync loop + // if trigger has already been fired then send to testWaitCh blocks until + // the triggered processing round is finished + testWaitCh chan struct{} +} + +type ( + // Server identifies a server without allowing any direct interaction. + // Note: server interface is used by Scheduler and Tracker but not used by + // the modules that do not interact with them directly. + // In order to make module testing easier, Server interface is used in + // events and modules. + Server any + Request any + Response any + ID uint64 + ServerAndID struct { + Server Server + ID ID + } +) + +// targetData represents a registered target data structure that increases its +// ChangeCounter whenever it has been changed. +type targetData interface { + ChangeCounter() uint64 +} + +// pendingRequest keeps track of sent and not yet finalized requests and their +// sender modules. +type pendingRequest struct { + request Request + module Module +} + +// NewScheduler creates a new Scheduler. +func NewScheduler() *Scheduler { + s := &Scheduler{ + servers: make(map[server]struct{}), + names: make(map[Module]string), + pending: make(map[ServerAndID]pendingRequest), + targets: make(map[targetData]uint64), + stopCh: make(chan chan struct{}), + // Note: testWaitCh should not have capacity in order to ensure + // that after a trigger happens testWaitCh will block until the resulting + // processing round has been finished + triggerCh: make(chan struct{}, 1), + testWaitCh: make(chan struct{}), + } + return s +} + +// RegisterTarget registers a target data structure, ensuring that any changes +// made to it trigger a new round of Module.Process calls, giving a chance to +// modules to react to the changes. +func (s *Scheduler) RegisterTarget(t targetData) { + s.lock.Lock() + defer s.lock.Unlock() + + s.targets[t] = 0 +} + +// RegisterModule registers a module. Should be called before starting the scheduler. +// In each processing round the order of module processing depends on the order of +// registration. +func (s *Scheduler) RegisterModule(m Module, name string) { + s.lock.Lock() + defer s.lock.Unlock() + + s.modules = append(s.modules, m) + s.names[m] = name +} + +// RegisterServer registers a new server. 
+func (s *Scheduler) RegisterServer(server server) { + s.lock.Lock() + defer s.lock.Unlock() + + s.addEvent(Event{Type: EvRegistered, Server: server}) + server.subscribe(func(event Event) { + event.Server = server + s.addEvent(event) + }) +} + +// UnregisterServer removes a registered server. +func (s *Scheduler) UnregisterServer(server server) { + s.lock.Lock() + defer s.lock.Unlock() + + server.unsubscribe() + s.addEvent(Event{Type: EvUnregistered, Server: server}) +} + +// Start starts the scheduler. It should be called after registering all modules +// and before registering any servers. +func (s *Scheduler) Start() { + go s.syncLoop() +} + +// Stop stops the scheduler. +func (s *Scheduler) Stop() { + stop := make(chan struct{}) + s.stopCh <- stop + <-stop + s.lock.Lock() + for server := range s.servers { + server.unsubscribe() + } + s.servers = nil + s.lock.Unlock() +} + +// syncLoop is the main event loop responsible for event/data processing and +// sending new requests. +// A round of processing starts whenever the global trigger is fired. Triggers +// fired during a processing round ensure that there is going to be a next round. +func (s *Scheduler) syncLoop() { + for { + s.lock.Lock() + s.processRound() + s.lock.Unlock() + loop: + for { + select { + case stop := <-s.stopCh: + close(stop) + return + case <-s.triggerCh: + break loop + case <-s.testWaitCh: + } + } + } +} + +// targetChanged returns true if a registered target data structure has been +// changed since the last call to this function. +func (s *Scheduler) targetChanged() (changed bool) { + for target, counter := range s.targets { + if newCounter := target.ChangeCounter(); newCounter != counter { + s.targets[target] = newCounter + changed = true + } + } + return +} + +// processRound runs an entire processing round. It calls the Process functions +// of all modules, passing all relevant events and repeating Process calls as +// long as any changes have been made to the registered target data structures. +// Once all events have been processed and a stable state has been achieved, +// requests are generated and sent if necessary and possible. +func (s *Scheduler) processRound() { + for { + log.Debug("Processing modules") + filteredEvents := s.filterEvents() + for _, module := range s.modules { + log.Debug("Processing module", "name", s.names[module], "events", len(filteredEvents[module])) + module.Process(requester{s, module}, filteredEvents[module]) + } + if !s.targetChanged() { + break + } + } +} + +// Trigger starts a new processing round. If fired during processing, it ensures +// another full round of processing all modules. +func (s *Scheduler) Trigger() { + select { + case s.triggerCh <- struct{}{}: + default: + } +} + +// addEvent adds an event to be processed in the next round. Note that it can be +// called regardless of the state of the lock mutex, making it safe for use in +// the server event callback. +func (s *Scheduler) addEvent(event Event) { + s.eventLock.Lock() + s.events = append(s.events, event) + s.eventLock.Unlock() + s.Trigger() +} + +// filterEvent sorts each Event either as a request event or a server event, +// depending on its type. Request events are also sorted in a map based on the +// module that originally initiated the request. It also ensures that no events +// related to a server are returned before EvRegistered or after EvUnregistered. 
+// In case of an EvUnregistered server event it also closes all pending requests +// to the given server by adding a failed request event (EvFail), ensuring that +// all requests get finalized and thereby allowing the module logic to be safe +// and simple. +func (s *Scheduler) filterEvents() map[Module][]Event { + s.eventLock.Lock() + events := s.events + s.events = nil + s.eventLock.Unlock() + + s.requesterLock.Lock() + defer s.requesterLock.Unlock() + + filteredEvents := make(map[Module][]Event) + for _, event := range events { + server := event.Server.(server) + if _, ok := s.servers[server]; !ok && event.Type != EvRegistered { + continue // before EvRegister or after EvUnregister, discard + } + + if event.IsRequestEvent() { + sid, _, _ := event.RequestInfo() + pending, ok := s.pending[sid] + if !ok { + continue // request already closed, ignore further events + } + if event.Type == EvResponse || event.Type == EvFail { + delete(s.pending, sid) // final event, close pending request + } + filteredEvents[pending.module] = append(filteredEvents[pending.module], event) + } else { + switch event.Type { + case EvRegistered: + s.servers[server] = struct{}{} + s.serverOrder = append(s.serverOrder, nil) + copy(s.serverOrder[1:], s.serverOrder[:len(s.serverOrder)-1]) + s.serverOrder[0] = server + case EvUnregistered: + s.closePending(event.Server, filteredEvents) + delete(s.servers, server) + for i, srv := range s.serverOrder { + if srv == server { + copy(s.serverOrder[i:len(s.serverOrder)-1], s.serverOrder[i+1:]) + s.serverOrder = s.serverOrder[:len(s.serverOrder)-1] + break + } + } + } + for _, module := range s.modules { + filteredEvents[module] = append(filteredEvents[module], event) + } + } + } + return filteredEvents +} + +// closePending closes all pending requests to the given server and adds an EvFail +// event to properly finalize them +func (s *Scheduler) closePending(server Server, filteredEvents map[Module][]Event) { + for sid, pending := range s.pending { + if sid.Server == server { + filteredEvents[pending.module] = append(filteredEvents[pending.module], Event{ + Type: EvFail, + Server: server, + Data: RequestResponse{ + ID: sid.ID, + Request: pending.request, + }, + }) + delete(s.pending, sid) + } + } +} + +// requester implements Requester. Note that while requester basically wraps +// Scheduler (with the added information of the currently processed Module), all +// functions are safe to call from Module.Process which is running while +// the Scheduler.lock mutex is held. +type requester struct { + *Scheduler + module Module +} + +// CanSendTo returns the list of currently available servers. It also returns +// them in an order of least to most recently used, ensuring a round-robin usage +// of suitable servers if the module always chooses the first suitable one. +func (s requester) CanSendTo() []Server { + s.requesterLock.RLock() + defer s.requesterLock.RUnlock() + + list := make([]Server, 0, len(s.serverOrder)) + for _, server := range s.serverOrder { + if server.canRequestNow() { + list = append(list, server) + } + } + return list +} + +// Send sends a request and adds an entry to Scheduler.pending map, ensuring that +// related request events will be delivered to the sender Module. 
+func (s requester) Send(srv Server, req Request) ID { + s.requesterLock.Lock() + defer s.requesterLock.Unlock() + + server := srv.(server) + id := server.sendRequest(req) + sid := ServerAndID{Server: srv, ID: id} + s.pending[sid] = pendingRequest{request: req, module: s.module} + for i, ss := range s.serverOrder { + if ss == server { + copy(s.serverOrder[i:len(s.serverOrder)-1], s.serverOrder[i+1:]) + s.serverOrder[len(s.serverOrder)-1] = server + return id + } + } + log.Error("Target server not found in ordered list of registered servers") + return id +} + +// Fail should be called when a server delivers invalid or useless information. +// Calling Fail disables the given server for a period that is initially short +// but is exponentially growing if it happens frequently. This results in a +// somewhat fault tolerant operation that avoids hammering servers with requests +// that they cannot serve but still gives them a chance periodically. +func (s requester) Fail(srv Server, desc string) { + srv.(server).fail(desc) +} diff --git a/beacon/light/request/scheduler_test.go b/beacon/light/request/scheduler_test.go new file mode 100644 index 0000000000..310e33af77 --- /dev/null +++ b/beacon/light/request/scheduler_test.go @@ -0,0 +1,122 @@ +package request + +import ( + "reflect" + "testing" +) + +func TestEventFilter(t *testing.T) { + s := NewScheduler() + module1 := &testModule{name: "module1"} + module2 := &testModule{name: "module2"} + s.RegisterModule(module1, "module1") + s.RegisterModule(module2, "module2") + s.Start() + // startup process round without events + s.testWaitCh <- struct{}{} + module1.expProcess(t, nil) + module2.expProcess(t, nil) + srv := &testServer{} + // register server; both modules should receive server event + s.RegisterServer(srv) + s.testWaitCh <- struct{}{} + module1.expProcess(t, []Event{ + Event{Type: EvRegistered, Server: srv}, + }) + module2.expProcess(t, []Event{ + Event{Type: EvRegistered, Server: srv}, + }) + // let module1 send a request + srv.canRequest = 1 + module1.sendReq = testRequest + s.Trigger() + // in first triggered round module1 sends the request, no events yet + s.testWaitCh <- struct{}{} + module1.expProcess(t, nil) + module2.expProcess(t, nil) + // server emits EvTimeout; only module1 should receive it + srv.eventCb(Event{Type: EvTimeout, Data: RequestResponse{ID: 1, Request: testRequest}}) + s.testWaitCh <- struct{}{} + module1.expProcess(t, []Event{ + Event{Type: EvTimeout, Server: srv, Data: RequestResponse{ID: 1, Request: testRequest}}, + }) + module2.expProcess(t, nil) + // unregister server; both modules should receive server event + s.UnregisterServer(srv) + s.testWaitCh <- struct{}{} + module1.expProcess(t, []Event{ + // module1 should also receive EvFail on its pending request + Event{Type: EvFail, Server: srv, Data: RequestResponse{ID: 1, Request: testRequest}}, + Event{Type: EvUnregistered, Server: srv}, + }) + module2.expProcess(t, []Event{ + Event{Type: EvUnregistered, Server: srv}, + }) + // response after server unregistered; should be discarded + srv.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}}) + s.testWaitCh <- struct{}{} + module1.expProcess(t, nil) + module2.expProcess(t, nil) + // no more process rounds expected; shut down + s.testWaitCh <- struct{}{} + module1.expNoMoreProcess(t) + module2.expNoMoreProcess(t) + s.Stop() +} + +type testServer struct { + eventCb func(Event) + lastID ID + canRequest int +} + +func (s *testServer) subscribe(eventCb 
func(Event)) { + s.eventCb = eventCb +} + +func (s *testServer) canRequestNow() bool { + return s.canRequest > 0 +} + +func (s *testServer) sendRequest(req Request) ID { + s.canRequest-- + s.lastID++ + return s.lastID +} + +func (s *testServer) fail(string) {} +func (s *testServer) unsubscribe() {} + +type testModule struct { + name string + processed [][]Event + sendReq Request +} + +func (m *testModule) Process(requester Requester, events []Event) { + m.processed = append(m.processed, events) + if m.sendReq != nil { + if cs := requester.CanSendTo(); len(cs) > 0 { + requester.Send(cs[0], m.sendReq) + } + } +} + +func (m *testModule) expProcess(t *testing.T, expEvents []Event) { + if len(m.processed) == 0 { + t.Errorf("Missing call to %s.Process", m.name) + return + } + events := m.processed[0] + m.processed = m.processed[1:] + if !reflect.DeepEqual(events, expEvents) { + t.Errorf("Call to %s.Process with wrong events (expected %v, got %v)", m.name, expEvents, events) + } +} + +func (m *testModule) expNoMoreProcess(t *testing.T) { + for len(m.processed) > 0 { + t.Errorf("Unexpected call to %s.Process with events %v", m.name, m.processed[0]) + m.processed = m.processed[1:] + } +} diff --git a/beacon/light/request/server.go b/beacon/light/request/server.go new file mode 100644 index 0000000000..999f64178a --- /dev/null +++ b/beacon/light/request/server.go @@ -0,0 +1,439 @@ +// Copyright 2023 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . 
+ +package request + +import ( + "math" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/log" +) + +var ( + // request events + EvResponse = &EventType{Name: "response", requestEvent: true} // data: RequestResponse; sent by requestServer + EvFail = &EventType{Name: "fail", requestEvent: true} // data: RequestResponse; sent by requestServer + EvTimeout = &EventType{Name: "timeout", requestEvent: true} // data: RequestResponse; sent by serverWithTimeout + // server events + EvRegistered = &EventType{Name: "registered"} // data: nil; sent by Scheduler + EvUnregistered = &EventType{Name: "unregistered"} // data: nil; sent by Scheduler + EvCanRequestAgain = &EventType{Name: "canRequestAgain"} // data: nil; sent by serverWithLimits +) + +const ( + softRequestTimeout = time.Second // allow resending request to a different server but do not cancel yet + hardRequestTimeout = time.Second * 10 // cancel request +) + +const ( + // serverWithLimits parameters + parallelAdjustUp = 0.1 // adjust parallelLimit up in case of success under full load + parallelAdjustDown = 1 // adjust parallelLimit down in case of timeout/failure + minParallelLimit = 1 // parallelLimit lower bound + defaultParallelLimit = 3 // parallelLimit initial value + minFailureDelay = time.Millisecond * 100 // minimum disable time in case of request failure + maxFailureDelay = time.Minute // maximum disable time in case of request failure + maxServerEventBuffer = 5 // server event allowance buffer limit + maxServerEventRate = time.Second // server event allowance buffer recharge rate +) + +// requestServer can send requests in a non-blocking way and feed back events +// through the event callback. After each request it should send back either +// EvResponse or EvFail. Additionally, it may also send application-defined +// events that the Modules can interpret. +type requestServer interface { + Subscribe(eventCallback func(Event)) + SendRequest(ID, Request) + Unsubscribe() +} + +// server is implemented by a requestServer wrapped into serverWithTimeout and +// serverWithLimits and is used by Scheduler. +// In addition to requestServer functionality, server can also handle timeouts, +// limit the number of parallel in-flight requests and temporarily disable +// new requests based on timeouts and response failures. +type server interface { + subscribe(eventCallback func(Event)) + canRequestNow() bool + sendRequest(Request) ID + fail(string) + unsubscribe() +} + +// NewServer wraps a requestServer and returns a server +func NewServer(rs requestServer, clock mclock.Clock) server { + s := &serverWithLimits{} + s.parent = rs + s.serverWithTimeout.init(clock) + s.init() + return s +} + +// EventType identifies an event type, either related to a request or the server +// in general. Server events can also be externally defined. +type EventType struct { + Name string + requestEvent bool // all request events are pre-defined in request package +} + +// Event describes an event where the type of Data depends on Type. +// Server field is not required when sent through the event callback; it is filled +// out when processed by the Scheduler. Note that the Scheduler can also create +// and send events (EvRegistered, EvUnregistered) directly. 
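+//
+// As an illustrative sketch (id, req and resp are placeholders), a requestServer
+// implementation would typically conclude a request by invoking its subscribed
+// callback with
+//
+//	eventCallback(Event{Type: EvResponse, Data: RequestResponse{ID: id, Request: req, Response: resp}})
+//
+// leaving the Server field empty; the Scheduler fills it in before dispatching
+// the event to the registered modules.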
+type Event struct { + Type *EventType + Server Server // filled by Scheduler + Data any +} + +// IsRequestEvent returns true if the event is a request event +func (e *Event) IsRequestEvent() bool { + return e.Type.requestEvent +} + +// RequestInfo assumes that the event is a request event and returns its contents +// in a convenient form. +func (e *Event) RequestInfo() (ServerAndID, Request, Response) { + data := e.Data.(RequestResponse) + return ServerAndID{Server: e.Server, ID: data.ID}, data.Request, data.Response +} + +// RequestResponse is the Data type of request events. +type RequestResponse struct { + ID ID + Request Request + Response Response +} + +// serverWithTimeout wraps a requestServer and introduces timeouts. +// The request's lifecycle is concluded if EvResponse or EvFail emitted by the +// parent requestServer. If this does not happen until softRequestTimeout then +// EvTimeout is emitted, after which the final EvResponse or EvFail is still +// guaranteed to follow. +// If the parent fails to send this final event for hardRequestTimeout then +// serverWithTimeout emits EvFail and discards any further events from the +// parent related to the given request. +type serverWithTimeout struct { + parent requestServer + lock sync.Mutex + clock mclock.Clock + childEventCb func(event Event) + timeouts map[ID]mclock.Timer + lastID ID +} + +// init initializes serverWithTimeout +func (s *serverWithTimeout) init(clock mclock.Clock) { + s.clock = clock + s.timeouts = make(map[ID]mclock.Timer) +} + +// subscribe subscribes to events which include parent (requestServer) events +// plus EvTimeout. +func (s *serverWithTimeout) subscribe(eventCallback func(event Event)) { + s.lock.Lock() + defer s.lock.Unlock() + + s.childEventCb = eventCallback + s.parent.Subscribe(s.eventCallback) +} + +// sendRequest generated a new request ID, emits EvRequest, sets up the timeout +// timer, then sends the request through the parent (requestServer). +func (s *serverWithTimeout) sendRequest(request Request) (reqId ID) { + s.lock.Lock() + s.lastID++ + id := s.lastID + s.startTimeout(RequestResponse{ID: id, Request: request}) + s.lock.Unlock() + s.parent.SendRequest(id, request) + return id +} + +// eventCallback is called by parent (requestServer) event subscription. +func (s *serverWithTimeout) eventCallback(event Event) { + s.lock.Lock() + defer s.lock.Unlock() + + switch event.Type { + case EvResponse, EvFail: + id := event.Data.(RequestResponse).ID + if timer, ok := s.timeouts[id]; ok { + // Note: if stopping the timer is unsuccessful then the resulting AfterFunc + // call will just do nothing + timer.Stop() + delete(s.timeouts, id) + s.childEventCb(event) + } + default: + s.childEventCb(event) + } +} + +// startTimeout starts a timeout timer for the given request. +func (s *serverWithTimeout) startTimeout(reqData RequestResponse) { + id := reqData.ID + s.timeouts[id] = s.clock.AfterFunc(softRequestTimeout, func() { + s.lock.Lock() + if _, ok := s.timeouts[id]; !ok { + s.lock.Unlock() + return + } + s.timeouts[id] = s.clock.AfterFunc(hardRequestTimeout-softRequestTimeout, func() { + s.lock.Lock() + if _, ok := s.timeouts[id]; !ok { + s.lock.Unlock() + return + } + delete(s.timeouts, id) + childEventCb := s.childEventCb + s.lock.Unlock() + childEventCb(Event{Type: EvFail, Data: reqData}) + }) + childEventCb := s.childEventCb + s.lock.Unlock() + childEventCb(Event{Type: EvTimeout, Data: reqData}) + }) +} + +// stop stops all goroutines associated with the server. 
+func (s *serverWithTimeout) unsubscribe() { + s.lock.Lock() + defer s.lock.Unlock() + + for _, timer := range s.timeouts { + if timer != nil { + timer.Stop() + } + } + s.childEventCb = nil + s.parent.Unsubscribe() +} + +// serverWithLimits wraps serverWithTimeout and implements server. It limits the +// number of parallel in-flight requests and prevents sending new requests when a +// pending one has already timed out. Server events are also rate limited. +// It also implements a failure delay mechanism that adds an exponentially growing +// delay each time a request fails (wrong answer or hard timeout). This makes the +// syncing mechanism less brittle as temporary failures of the server might happen +// sometimes, but still avoids hammering a non-functional server with requests. +type serverWithLimits struct { + serverWithTimeout + lock sync.Mutex + childEventCb func(event Event) + softTimeouts map[ID]struct{} + pendingCount, timeoutCount int + parallelLimit float32 + sendEvent bool + delayTimer mclock.Timer + delayCounter int + failureDelayEnd mclock.AbsTime + failureDelay float64 + serverEventBuffer int + eventBufferUpdated mclock.AbsTime +} + +// init initializes serverWithLimits +func (s *serverWithLimits) init() { + s.softTimeouts = make(map[ID]struct{}) + s.parallelLimit = defaultParallelLimit + s.serverEventBuffer = maxServerEventBuffer +} + +// subscribe subscribes to events which include parent (serverWithTimeout) events +// plus EvCanRequstAgain. +func (s *serverWithLimits) subscribe(eventCallback func(event Event)) { + s.lock.Lock() + defer s.lock.Unlock() + + s.childEventCb = eventCallback + s.serverWithTimeout.subscribe(s.eventCallback) +} + +// eventCallback is called by parent (serverWithTimeout) event subscription. +func (s *serverWithLimits) eventCallback(event Event) { + s.lock.Lock() + var sendCanRequestAgain bool + passEvent := true + switch event.Type { + case EvTimeout: + id := event.Data.(RequestResponse).ID + s.softTimeouts[id] = struct{}{} + s.timeoutCount++ + s.parallelLimit -= parallelAdjustDown + if s.parallelLimit < minParallelLimit { + s.parallelLimit = minParallelLimit + } + log.Debug("Server timeout", "count", s.timeoutCount, "parallelLimit", s.parallelLimit) + case EvResponse, EvFail: + id := event.Data.(RequestResponse).ID + if _, ok := s.softTimeouts[id]; ok { + delete(s.softTimeouts, id) + s.timeoutCount-- + log.Debug("Server timeout finalized", "count", s.timeoutCount, "parallelLimit", s.parallelLimit) + } + if event.Type == EvResponse && s.pendingCount >= int(s.parallelLimit) { + s.parallelLimit += parallelAdjustUp + } + s.pendingCount-- + if s.canRequest() { + sendCanRequestAgain = s.sendEvent + s.sendEvent = false + } + if event.Type == EvFail { + s.failLocked("failed request") + } + default: + // server event; check rate limit + if s.serverEventBuffer < maxServerEventBuffer { + now := s.clock.Now() + sinceUpdate := time.Duration(now - s.eventBufferUpdated) + if sinceUpdate >= maxServerEventRate*time.Duration(maxServerEventBuffer-s.serverEventBuffer) { + s.serverEventBuffer = maxServerEventBuffer + s.eventBufferUpdated = now + } else { + addBuffer := int(sinceUpdate / maxServerEventRate) + s.serverEventBuffer += addBuffer + s.eventBufferUpdated += mclock.AbsTime(maxServerEventRate * time.Duration(addBuffer)) + } + } + if s.serverEventBuffer > 0 { + s.serverEventBuffer-- + } else { + passEvent = false + } + } + childEventCb := s.childEventCb + s.lock.Unlock() + if passEvent { + childEventCb(event) + } + if sendCanRequestAgain { + 
childEventCb(Event{Type: EvCanRequestAgain}) + } +} + +// sendRequest sends a request through the parent (serverWithTimeout). +func (s *serverWithLimits) sendRequest(request Request) (reqId ID) { + s.lock.Lock() + s.pendingCount++ + s.lock.Unlock() + return s.serverWithTimeout.sendRequest(request) +} + +// stop stops all goroutines associated with the server. +func (s *serverWithLimits) unsubscribe() { + s.lock.Lock() + defer s.lock.Unlock() + + if s.delayTimer != nil { + s.delayTimer.Stop() + s.delayTimer = nil + } + s.childEventCb = nil + s.serverWithTimeout.unsubscribe() +} + +// canRequest checks whether a new request can be started. +func (s *serverWithLimits) canRequest() bool { + if s.delayTimer != nil || s.pendingCount >= int(s.parallelLimit) || s.timeoutCount > 0 { + return false + } + if s.parallelLimit < minParallelLimit { + s.parallelLimit = minParallelLimit + } + return true +} + +// canRequestNow checks whether a new request can be started, according to the +// current in-flight request count and parallelLimit, and also the failure delay +// timer. +// If it returns false then it is guaranteed that an EvCanRequestAgain will be +// sent whenever the server becomes available for requesting again. +func (s *serverWithLimits) canRequestNow() bool { + var sendCanRequestAgain bool + s.lock.Lock() + canRequest := s.canRequest() + if canRequest { + sendCanRequestAgain = s.sendEvent + s.sendEvent = false + } + childEventCb := s.childEventCb + s.lock.Unlock() + if sendCanRequestAgain { + childEventCb(Event{Type: EvCanRequestAgain}) + } + return canRequest +} + +// delay sets the delay timer to the given duration, disabling new requests for +// the given period. +func (s *serverWithLimits) delay(delay time.Duration) { + if s.delayTimer != nil { + // Note: if stopping the timer is unsuccessful then the resulting AfterFunc + // call will just do nothing + s.delayTimer.Stop() + s.delayTimer = nil + } + + s.delayCounter++ + delayCounter := s.delayCounter + log.Debug("Server delay started", "length", delay) + s.delayTimer = s.clock.AfterFunc(delay, func() { + log.Debug("Server delay ended", "length", delay) + var sendCanRequestAgain bool + s.lock.Lock() + if s.delayTimer != nil && s.delayCounter == delayCounter { // do nothing if there is a new timer now + s.delayTimer = nil + if s.canRequest() { + sendCanRequestAgain = s.sendEvent + s.sendEvent = false + } + } + childEventCb := s.childEventCb + s.lock.Unlock() + if sendCanRequestAgain { + childEventCb(Event{Type: EvCanRequestAgain}) + } + }) +} + +// fail reports that a response from the server was found invalid by the processing +// Module, disabling new requests for a dynamically adjused time period. +func (s *serverWithLimits) fail(desc string) { + s.lock.Lock() + defer s.lock.Unlock() + + s.failLocked(desc) +} + +// failLocked calculates the dynamic failure delay and applies it. 
+func (s *serverWithLimits) failLocked(desc string) { + log.Debug("Server error", "description", desc) + s.failureDelay *= 2 + now := s.clock.Now() + if now > s.failureDelayEnd { + s.failureDelay *= math.Pow(2, -float64(now-s.failureDelayEnd)/float64(maxFailureDelay)) + } + if s.failureDelay < float64(minFailureDelay) { + s.failureDelay = float64(minFailureDelay) + } + s.failureDelayEnd = now + mclock.AbsTime(s.failureDelay) + s.delay(time.Duration(s.failureDelay)) +} diff --git a/beacon/light/request/server_test.go b/beacon/light/request/server_test.go new file mode 100644 index 0000000000..b6b9edf9a0 --- /dev/null +++ b/beacon/light/request/server_test.go @@ -0,0 +1,158 @@ +package request + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common/mclock" +) + +const ( + testRequest = "Life, the Universe, and Everything" + testResponse = 42 +) + +var testEventType = &EventType{Name: "testEvent"} + +func TestServerEvents(t *testing.T) { + rs := &testRequestServer{} + clock := &mclock.Simulated{} + srv := NewServer(rs, clock) + var lastEventType *EventType + srv.subscribe(func(event Event) { lastEventType = event.Type }) + evTypeName := func(evType *EventType) string { + if evType == nil { + return "none" + } + return evType.Name + } + expEvent := func(expType *EventType) { + if lastEventType != expType { + t.Errorf("Wrong event type (expected %s, got %s)", evTypeName(expType), evTypeName(lastEventType)) + } + lastEventType = nil + } + // user events should simply be passed through + rs.eventCb(Event{Type: testEventType}) + expEvent(testEventType) + // send request, soft timeout, then valid response + srv.sendRequest(testRequest) + clock.WaitForTimers(1) + clock.Run(softRequestTimeout) + expEvent(EvTimeout) + rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}}) + expEvent(EvResponse) + // send request, hard timeout (response after hard timeout should be ignored) + srv.sendRequest(testRequest) + clock.WaitForTimers(1) + clock.Run(softRequestTimeout) + expEvent(EvTimeout) + clock.WaitForTimers(1) + clock.Run(hardRequestTimeout) + expEvent(EvFail) + rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}}) + expEvent(nil) +} + +func TestServerParallel(t *testing.T) { + rs := &testRequestServer{} + srv := NewServer(rs, &mclock.Simulated{}) + srv.subscribe(func(event Event) {}) + + expSend := func(expSent int) { + var sent int + for sent <= expSent { + if !srv.canRequestNow() { + break + } + sent++ + srv.sendRequest(testRequest) + } + if sent != expSent { + t.Errorf("Wrong number of parallel requests accepted (expected %d, got %d)", expSent, sent) + } + } + // max out parallel allowance + expSend(defaultParallelLimit) + // 1 answered, should accept 1 more + rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}}) + expSend(1) + // 2 answered, should accept 2 more + rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 2, Request: testRequest, Response: testResponse}}) + rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 3, Request: testRequest, Response: testResponse}}) + expSend(2) + // failed request, should decrease allowance and not accept more + rs.eventCb(Event{Type: EvFail, Data: RequestResponse{ID: 4, Request: testRequest}}) + expSend(0) + srv.unsubscribe() +} + +func TestServerFail(t *testing.T) { + rs := &testRequestServer{} + clock := &mclock.Simulated{} + srv := NewServer(rs, clock) + 
srv.subscribe(func(event Event) {}) + expCanRequest := func(expCanRequest bool) { + if canRequest := srv.canRequestNow(); canRequest != expCanRequest { + t.Errorf("Wrong result for canRequestNow (expected %v, got %v)", expCanRequest, canRequest) + } + } + // timed out request + expCanRequest(true) + srv.sendRequest(testRequest) + clock.WaitForTimers(1) + expCanRequest(true) + clock.Run(softRequestTimeout) + expCanRequest(false) // cannot request when there is a timed out request + rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}}) + expCanRequest(true) + // explicit server.Fail + srv.fail("") + clock.WaitForTimers(1) + expCanRequest(false) // cannot request for a while after a failure + clock.Run(minFailureDelay) + expCanRequest(true) + // request returned with EvFail + srv.sendRequest(testRequest) + rs.eventCb(Event{Type: EvFail, Data: RequestResponse{ID: 2, Request: testRequest}}) + clock.WaitForTimers(1) + expCanRequest(false) // EvFail should also start failure delay + clock.Run(minFailureDelay) + expCanRequest(false) // second failure delay is longer, should still be disabled + clock.Run(minFailureDelay) + expCanRequest(true) + srv.unsubscribe() +} + +func TestServerEventRateLimit(t *testing.T) { + rs := &testRequestServer{} + clock := &mclock.Simulated{} + srv := NewServer(rs, clock) + var eventCount int + srv.subscribe(func(event Event) { + if !event.IsRequestEvent() { + eventCount++ + } + }) + expEvents := func(send, expAllowed int) { + eventCount = 0 + for sent := 0; sent < send; sent++ { + rs.eventCb(Event{Type: testEventType}) + } + if eventCount != expAllowed { + t.Errorf("Wrong number of server events passing rate limitation (sent %d, expected %d, got %d)", send, expAllowed, eventCount) + } + } + expEvents(maxServerEventBuffer+5, maxServerEventBuffer) + clock.Run(maxServerEventRate) + expEvents(5, 1) + clock.Run(maxServerEventRate * maxServerEventBuffer * 2) + expEvents(maxServerEventBuffer+5, maxServerEventBuffer) +} + +type testRequestServer struct { + eventCb func(Event) +} + +func (rs *testRequestServer) Subscribe(eventCb func(Event)) { rs.eventCb = eventCb } +func (rs *testRequestServer) SendRequest(ID, Request) {} +func (rs *testRequestServer) Unsubscribe() {}
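A minimal end-to-end wiring sketch, assuming package-internal use as in the tests above (the module and requestServer arguments stand in for real implementations and are not part of this patch; the call order follows the Start/RegisterServer documentation):

	func runSketch(module Module, rs requestServer) {
		s := NewScheduler()
		s.RegisterModule(module, "myModule")  // register modules first, in priority order
		s.Start()                             // then start the sync loop
		srv := NewServer(rs, mclock.System{}) // wrap with timeout and limit handling
		s.RegisterServer(srv)                 // modules receive EvRegistered
		// requests are started from module.Process through the Requester interface
		s.UnregisterServer(srv) // pending requests are finalized with EvFail
		s.Stop()
	}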