beacon/light/request: simple request framework
parent 5c67066a05
commit eceba4c6f7

@ -0,0 +1,401 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package request

import (
	"sync"

	"github.com/ethereum/go-ethereum/log"
)

// Module represents a mechanism which is typically responsible for downloading
// and updating a passive data structure. It does not directly interact with the
// servers. It can start requests using the Requester interface, maintain its
// internal state by receiving and processing Events and update its target data
// structure based on the obtained data.
// It is the Scheduler's responsibility to feed events to the modules and to call
// Process as long as there might be something to process, letting the modules
// start the best possible requests through the Requester interface.
// Modules are called by Scheduler whenever a global trigger is fired. All events
// fire the trigger. Changing a target data structure also triggers a next
// processing round as it could make further actions possible either by the same
// or another Module.
type Module interface {
	// Process is a non-blocking function responsible for starting requests,
	// processing events and updating the target data structure(s) and the
	// internal state of the module. Module state typically consists of information
	// about pending requests and registered servers.
	// Process is always called after an event is received or after a target data
	// structure has been changed.
	//
	// Note: Process functions of different modules are never called concurrently;
	// they are called by Scheduler in the same order of priority as they were
	// registered in.
	Process(Requester, []Event)
}

// Requester allows Modules to obtain the list of momentarily available servers,
// start new requests and report server failure when a response has been proven
// to be invalid in the processing phase.
// Note that all Requester functions should be safe to call from Module.Process.
type Requester interface {
	CanSendTo() []Server
	Send(Server, Request) ID
	Fail(Server, string)
}

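// As an illustrative sketch only (headFetcher and its request payload are
// hypothetical, not part of this package), a minimal Module could track one
// pending request and keep exactly one request in flight:
//
//	type headFetcher struct {
//		pending map[ServerAndID]struct{} // initialized by the module's constructor
//	}
//
//	func (h *headFetcher) Process(requester Requester, events []Event) {
//		for _, ev := range events {
//			if !ev.IsRequestEvent() {
//				continue // EvRegistered / EvUnregistered / custom server events
//			}
//			sid, _, resp := ev.RequestInfo()
//			switch ev.Type {
//			case EvResponse:
//				delete(h.pending, sid)
//				_ = resp // validate and apply to the target data structure here
//			case EvFail:
//				delete(h.pending, sid) // finalized; may be retried on another server
//			case EvTimeout:
//				// soft timeout: the final EvResponse/EvFail is still guaranteed
//			}
//		}
//		if len(h.pending) == 0 {
//			if servers := requester.CanSendTo(); len(servers) > 0 {
//				id := requester.Send(servers[0], "head request placeholder")
//				h.pending[ServerAndID{Server: servers[0], ID: id}] = struct{}{}
//			}
//		}
//	}
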
// Scheduler is a modular network data retrieval framework that coordinates multiple
// servers and retrieval mechanisms (modules). It implements a trigger mechanism
// that calls the Process function of registered modules whenever either the state
// of existing data structures or events coming from registered servers could
// allow new operations.
type Scheduler struct {
	lock    sync.Mutex
	modules []Module // first has highest priority
	names   map[Module]string
	servers map[server]struct{}
	targets map[targetData]uint64

	requesterLock sync.RWMutex
	serverOrder   []server
	pending       map[ServerAndID]pendingRequest

	// eventLock guards access to the events list. Note that eventLock can be
	// locked either while lock is locked or unlocked but lock cannot be locked
	// while eventLock is locked.
	eventLock sync.Mutex
	events    []Event
	stopCh    chan chan struct{}

	triggerCh chan struct{} // restarts waiting sync loop
	// if trigger has already been fired then send to testWaitCh blocks until
	// the triggered processing round is finished
	testWaitCh chan struct{}
}

type (
	// Server identifies a server without allowing any direct interaction.
	// Note: the unexported server interface is used internally by Scheduler,
	// while modules never interact with servers directly.
	// In order to make module testing easier, the Server interface is used in
	// events and modules.
	Server   any
	Request  any
	Response any
	ID       uint64

	ServerAndID struct {
		Server Server
		ID     ID
	}
)

// targetData represents a registered target data structure that increases its
// ChangeCounter whenever it has been changed.
type targetData interface {
	ChangeCounter() uint64
}

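// An illustrative sketch (hypothetical type, not part of this package) of a
// target data structure: ChangeCounter only has to increase on every mutation,
// so a plain counter bumped by each write is sufficient:
//
//	type headStore struct {
//		counter uint64
//		head    uint64 // hypothetical payload
//	}
//
//	func (h *headStore) ChangeCounter() uint64 { return h.counter }
//
//	func (h *headStore) setHead(head uint64) {
//		h.head = head
//		h.counter++ // lets Scheduler know that another processing round may be useful
//	}
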
// pendingRequest keeps track of sent and not yet finalized requests and their
// sender modules.
type pendingRequest struct {
	request Request
	module  Module
}

// NewScheduler creates a new Scheduler.
func NewScheduler() *Scheduler {
	s := &Scheduler{
		servers: make(map[server]struct{}),
		names:   make(map[Module]string),
		pending: make(map[ServerAndID]pendingRequest),
		targets: make(map[targetData]uint64),
		stopCh:  make(chan chan struct{}),
		// Note: testWaitCh should not have capacity in order to ensure
		// that after a trigger happens testWaitCh will block until the resulting
		// processing round has been finished
		triggerCh:  make(chan struct{}, 1),
		testWaitCh: make(chan struct{}),
	}
	return s
}

// RegisterTarget registers a target data structure, ensuring that any changes
// made to it trigger a new round of Module.Process calls, giving a chance to
// modules to react to the changes.
func (s *Scheduler) RegisterTarget(t targetData) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.targets[t] = 0
}

// RegisterModule registers a module. Should be called before starting the scheduler.
// In each processing round the order of module processing depends on the order of
// registration.
func (s *Scheduler) RegisterModule(m Module, name string) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.modules = append(s.modules, m)
	s.names[m] = name
}

// RegisterServer registers a new server.
func (s *Scheduler) RegisterServer(server server) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.addEvent(Event{Type: EvRegistered, Server: server})
	server.subscribe(func(event Event) {
		event.Server = server
		s.addEvent(event)
	})
}

// UnregisterServer removes a registered server.
func (s *Scheduler) UnregisterServer(server server) {
	s.lock.Lock()
	defer s.lock.Unlock()

	server.unsubscribe()
	s.addEvent(Event{Type: EvUnregistered, Server: server})
}

// Start starts the scheduler. It should be called after registering all modules
// and before registering any servers.
func (s *Scheduler) Start() {
	go s.syncLoop()
}

// Stop stops the scheduler.
func (s *Scheduler) Stop() {
	stop := make(chan struct{})
	s.stopCh <- stop
	<-stop
	s.lock.Lock()
	for server := range s.servers {
		server.unsubscribe()
	}
	s.servers = nil
	s.lock.Unlock()
}

// syncLoop is the main event loop responsible for event/data processing and
// sending new requests.
// A round of processing starts whenever the global trigger is fired. Triggers
// fired during a processing round ensure that there is going to be a next round.
func (s *Scheduler) syncLoop() {
	for {
		s.lock.Lock()
		s.processRound()
		s.lock.Unlock()
	loop:
		for {
			select {
			case stop := <-s.stopCh:
				close(stop)
				return
			case <-s.triggerCh:
				break loop
			case <-s.testWaitCh:
			}
		}
	}
}

// targetChanged returns true if a registered target data structure has been
// changed since the last call to this function.
func (s *Scheduler) targetChanged() (changed bool) {
	for target, counter := range s.targets {
		if newCounter := target.ChangeCounter(); newCounter != counter {
			s.targets[target] = newCounter
			changed = true
		}
	}
	return
}

// processRound runs an entire processing round. It calls the Process functions
// of all modules, passing all relevant events and repeating Process calls as
// long as any changes have been made to the registered target data structures.
// Once all events have been processed and a stable state has been achieved,
// requests are generated and sent if necessary and possible.
func (s *Scheduler) processRound() {
	for {
		log.Debug("Processing modules")
		filteredEvents := s.filterEvents()
		for _, module := range s.modules {
			log.Debug("Processing module", "name", s.names[module], "events", len(filteredEvents[module]))
			module.Process(requester{s, module}, filteredEvents[module])
		}
		if !s.targetChanged() {
			break
		}
	}
}

// Trigger starts a new processing round. If fired during processing, it ensures
// another full round of processing all modules.
func (s *Scheduler) Trigger() {
	select {
	case s.triggerCh <- struct{}{}:
	default:
	}
}

// addEvent adds an event to be processed in the next round. Note that it can be
// called regardless of the state of the lock mutex, making it safe for use in
// the server event callback.
func (s *Scheduler) addEvent(event Event) {
	s.eventLock.Lock()
	s.events = append(s.events, event)
	s.eventLock.Unlock()
	s.Trigger()
}

// filterEvents sorts each Event either as a request event or a server event,
// depending on its type. Request events are also sorted in a map based on the
// module that originally initiated the request. It also ensures that no events
// related to a server are returned before EvRegistered or after EvUnregistered.
// In case of an EvUnregistered server event it also closes all pending requests
// to the given server by adding a failed request event (EvFail), ensuring that
// all requests get finalized and thereby allowing the module logic to be safe
// and simple.
func (s *Scheduler) filterEvents() map[Module][]Event {
	s.eventLock.Lock()
	events := s.events
	s.events = nil
	s.eventLock.Unlock()

	s.requesterLock.Lock()
	defer s.requesterLock.Unlock()

	filteredEvents := make(map[Module][]Event)
	for _, event := range events {
		server := event.Server.(server)
		if _, ok := s.servers[server]; !ok && event.Type != EvRegistered {
			continue // before EvRegistered or after EvUnregistered, discard
		}

		if event.IsRequestEvent() {
			sid, _, _ := event.RequestInfo()
			pending, ok := s.pending[sid]
			if !ok {
				continue // request already closed, ignore further events
			}
			if event.Type == EvResponse || event.Type == EvFail {
				delete(s.pending, sid) // final event, close pending request
			}
			filteredEvents[pending.module] = append(filteredEvents[pending.module], event)
		} else {
			switch event.Type {
			case EvRegistered:
				s.servers[server] = struct{}{}
				s.serverOrder = append(s.serverOrder, nil)
				copy(s.serverOrder[1:], s.serverOrder[:len(s.serverOrder)-1])
				s.serverOrder[0] = server
			case EvUnregistered:
				s.closePending(event.Server, filteredEvents)
				delete(s.servers, server)
				for i, srv := range s.serverOrder {
					if srv == server {
						copy(s.serverOrder[i:len(s.serverOrder)-1], s.serverOrder[i+1:])
						s.serverOrder = s.serverOrder[:len(s.serverOrder)-1]
						break
					}
				}
			}
			for _, module := range s.modules {
				filteredEvents[module] = append(filteredEvents[module], event)
			}
		}
	}
	return filteredEvents
}

// closePending closes all pending requests to the given server and adds an EvFail
// event to properly finalize them.
func (s *Scheduler) closePending(server Server, filteredEvents map[Module][]Event) {
	for sid, pending := range s.pending {
		if sid.Server == server {
			filteredEvents[pending.module] = append(filteredEvents[pending.module], Event{
				Type:   EvFail,
				Server: server,
				Data: RequestResponse{
					ID:      sid.ID,
					Request: pending.request,
				},
			})
			delete(s.pending, sid)
		}
	}
}

// requester implements Requester. Note that while requester basically wraps
// Scheduler (with the added information of the currently processed Module), all
// functions are safe to call from Module.Process which is running while
// the Scheduler.lock mutex is held.
type requester struct {
	*Scheduler
	module Module
}

// CanSendTo returns the list of currently available servers. It also returns
// them in an order of least to most recently used, ensuring a round-robin usage
// of suitable servers if the module always chooses the first suitable one.
func (s requester) CanSendTo() []Server {
	s.requesterLock.RLock()
	defer s.requesterLock.RUnlock()

	list := make([]Server, 0, len(s.serverOrder))
	for _, server := range s.serverOrder {
		if server.canRequestNow() {
			list = append(list, server)
		}
	}
	return list
}

// Send sends a request and adds an entry to Scheduler.pending map, ensuring that
// related request events will be delivered to the sender Module.
func (s requester) Send(srv Server, req Request) ID {
	s.requesterLock.Lock()
	defer s.requesterLock.Unlock()

	server := srv.(server)
	id := server.sendRequest(req)
	sid := ServerAndID{Server: srv, ID: id}
	s.pending[sid] = pendingRequest{request: req, module: s.module}
	for i, ss := range s.serverOrder {
		if ss == server {
			copy(s.serverOrder[i:len(s.serverOrder)-1], s.serverOrder[i+1:])
			s.serverOrder[len(s.serverOrder)-1] = server
			return id
		}
	}
	log.Error("Target server not found in ordered list of registered servers")
	return id
}

// Fail should be called when a server delivers invalid or useless information.
// Calling Fail disables the given server for a period that is initially short
// but is exponentially growing if it happens frequently. This results in a
// somewhat fault tolerant operation that avoids hammering servers with requests
// that they cannot serve but still gives them a chance periodically.
func (s requester) Fail(srv Server, desc string) {
	srv.(server).fail(desc)
}
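As a usage sketch (myModule, myTarget and myRequestServer stand for caller-provided implementations and are not defined in this package; the snippet also assumes the common/mclock import), wiring the scheduler together follows the registration order required by Start:

func startSyncer(myModule Module, myTarget targetData, myRequestServer requestServer) *Scheduler {
	s := NewScheduler()
	s.RegisterTarget(myTarget)            // data structure exposing ChangeCounter()
	s.RegisterModule(myModule, "fetcher") // modules are processed in registration order
	s.Start()
	// servers are registered after Start; NewServer adds timeout handling and
	// rate/failure limiting around the raw requestServer
	s.RegisterServer(NewServer(myRequestServer, mclock.System{}))
	return s
}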
@ -0,0 +1,122 @@
package request

import (
	"reflect"
	"testing"
)

func TestEventFilter(t *testing.T) {
	s := NewScheduler()
	module1 := &testModule{name: "module1"}
	module2 := &testModule{name: "module2"}
	s.RegisterModule(module1, "module1")
	s.RegisterModule(module2, "module2")
	s.Start()
	// startup process round without events
	s.testWaitCh <- struct{}{}
	module1.expProcess(t, nil)
	module2.expProcess(t, nil)
	srv := &testServer{}
	// register server; both modules should receive server event
	s.RegisterServer(srv)
	s.testWaitCh <- struct{}{}
	module1.expProcess(t, []Event{
		Event{Type: EvRegistered, Server: srv},
	})
	module2.expProcess(t, []Event{
		Event{Type: EvRegistered, Server: srv},
	})
	// let module1 send a request
	srv.canRequest = 1
	module1.sendReq = testRequest
	s.Trigger()
	// in first triggered round module1 sends the request, no events yet
	s.testWaitCh <- struct{}{}
	module1.expProcess(t, nil)
	module2.expProcess(t, nil)
	// server emits EvTimeout; only module1 should receive it
	srv.eventCb(Event{Type: EvTimeout, Data: RequestResponse{ID: 1, Request: testRequest}})
	s.testWaitCh <- struct{}{}
	module1.expProcess(t, []Event{
		Event{Type: EvTimeout, Server: srv, Data: RequestResponse{ID: 1, Request: testRequest}},
	})
	module2.expProcess(t, nil)
	// unregister server; both modules should receive server event
	s.UnregisterServer(srv)
	s.testWaitCh <- struct{}{}
	module1.expProcess(t, []Event{
		// module1 should also receive EvFail on its pending request
		Event{Type: EvFail, Server: srv, Data: RequestResponse{ID: 1, Request: testRequest}},
		Event{Type: EvUnregistered, Server: srv},
	})
	module2.expProcess(t, []Event{
		Event{Type: EvUnregistered, Server: srv},
	})
	// response after server unregistered; should be discarded
	srv.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}})
	s.testWaitCh <- struct{}{}
	module1.expProcess(t, nil)
	module2.expProcess(t, nil)
	// no more process rounds expected; shut down
	s.testWaitCh <- struct{}{}
	module1.expNoMoreProcess(t)
	module2.expNoMoreProcess(t)
	s.Stop()
}

type testServer struct {
	eventCb    func(Event)
	lastID     ID
	canRequest int
}

func (s *testServer) subscribe(eventCb func(Event)) {
	s.eventCb = eventCb
}

func (s *testServer) canRequestNow() bool {
	return s.canRequest > 0
}

func (s *testServer) sendRequest(req Request) ID {
	s.canRequest--
	s.lastID++
	return s.lastID
}

func (s *testServer) fail(string)  {}
func (s *testServer) unsubscribe() {}

type testModule struct {
	name      string
	processed [][]Event
	sendReq   Request
}

func (m *testModule) Process(requester Requester, events []Event) {
	m.processed = append(m.processed, events)
	if m.sendReq != nil {
		if cs := requester.CanSendTo(); len(cs) > 0 {
			requester.Send(cs[0], m.sendReq)
		}
	}
}

func (m *testModule) expProcess(t *testing.T, expEvents []Event) {
	if len(m.processed) == 0 {
		t.Errorf("Missing call to %s.Process", m.name)
		return
	}
	events := m.processed[0]
	m.processed = m.processed[1:]
	if !reflect.DeepEqual(events, expEvents) {
		t.Errorf("Call to %s.Process with wrong events (expected %v, got %v)", m.name, expEvents, events)
	}
}

func (m *testModule) expNoMoreProcess(t *testing.T) {
	for len(m.processed) > 0 {
		t.Errorf("Unexpected call to %s.Process with events %v", m.name, m.processed[0])
		m.processed = m.processed[1:]
	}
}
@ -0,0 +1,439 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package request

import (
	"math"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common/mclock"
	"github.com/ethereum/go-ethereum/log"
)

var (
	// request events
	EvResponse = &EventType{Name: "response", requestEvent: true} // data: RequestResponse; sent by requestServer
	EvFail     = &EventType{Name: "fail", requestEvent: true}     // data: RequestResponse; sent by requestServer
	EvTimeout  = &EventType{Name: "timeout", requestEvent: true}  // data: RequestResponse; sent by serverWithTimeout
	// server events
	EvRegistered      = &EventType{Name: "registered"}      // data: nil; sent by Scheduler
	EvUnregistered    = &EventType{Name: "unregistered"}    // data: nil; sent by Scheduler
	EvCanRequestAgain = &EventType{Name: "canRequestAgain"} // data: nil; sent by serverWithLimits
)

const (
	softRequestTimeout = time.Second      // allow resending request to a different server but do not cancel yet
	hardRequestTimeout = time.Second * 10 // cancel request
)

const (
	// serverWithLimits parameters
	parallelAdjustUp     = 0.1                    // adjust parallelLimit up in case of success under full load
	parallelAdjustDown   = 1                      // adjust parallelLimit down in case of timeout/failure
	minParallelLimit     = 1                      // parallelLimit lower bound
	defaultParallelLimit = 3                      // parallelLimit initial value
	minFailureDelay      = time.Millisecond * 100 // minimum disable time in case of request failure
	maxFailureDelay      = time.Minute            // maximum disable time in case of request failure
	maxServerEventBuffer = 5                      // server event allowance buffer limit
	maxServerEventRate   = time.Second            // server event allowance buffer recharge rate
)

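// For intuition on the limiter constants above (worked numbers only, no extra
// logic): parallelLimit starts at defaultParallelLimit = 3. A single timeout or
// failure subtracts parallelAdjustDown = 1 (3 -> 2), while each response received
// under full load adds parallelAdjustUp = 0.1, so roughly ten successful round
// trips under full load are needed to earn one parallel slot back (2 -> 3).
// parallelLimit never drops below minParallelLimit = 1.
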
// requestServer can send requests in a non-blocking way and feed back events
// through the event callback. After each request it should send back either
// EvResponse or EvFail. Additionally, it may also send application-defined
// events that the Modules can interpret.
type requestServer interface {
	Subscribe(eventCallback func(Event))
	SendRequest(ID, Request)
	Unsubscribe()
}

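// An illustrative sketch (hypothetical type, not part of this package) of the
// integration point: the caller implements requestServer on top of its own
// network layer and reports the outcome asynchronously through the subscribed
// callback, using the request ID it was given:
//
//	type apiBackend struct {
//		eventCb func(Event)
//	}
//
//	func (s *apiBackend) Subscribe(eventCallback func(Event)) { s.eventCb = eventCallback }
//	func (s *apiBackend) Unsubscribe()                        { s.eventCb = nil }
//
//	func (s *apiBackend) SendRequest(id ID, req Request) {
//		go func() {
//			// perform the network round trip here, then report the outcome
//			s.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: id, Request: req, Response: "..."}})
//			// or, on error:
//			// s.eventCb(Event{Type: EvFail, Data: RequestResponse{ID: id, Request: req}})
//		}()
//	}
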
// server is implemented by a requestServer wrapped into serverWithTimeout and
// serverWithLimits and is used by Scheduler.
// In addition to requestServer functionality, server can also handle timeouts,
// limit the number of parallel in-flight requests and temporarily disable
// new requests based on timeouts and response failures.
type server interface {
	subscribe(eventCallback func(Event))
	canRequestNow() bool
	sendRequest(Request) ID
	fail(string)
	unsubscribe()
}

// NewServer wraps a requestServer and returns a server.
func NewServer(rs requestServer, clock mclock.Clock) server {
	s := &serverWithLimits{}
	s.parent = rs
	s.serverWithTimeout.init(clock)
	s.init()
	return s
}

// EventType identifies an event type, either related to a request or the server
// in general. Server events can also be externally defined.
type EventType struct {
	Name         string
	requestEvent bool // all request events are pre-defined in request package
}

// Event describes an event where the type of Data depends on Type.
// Server field is not required when sent through the event callback; it is filled
// out when processed by the Scheduler. Note that the Scheduler can also create
// and send events (EvRegistered, EvUnregistered) directly.
type Event struct {
	Type   *EventType
	Server Server // filled by Scheduler
	Data   any
}

// IsRequestEvent returns true if the event is a request event.
func (e *Event) IsRequestEvent() bool {
	return e.Type.requestEvent
}

// RequestInfo assumes that the event is a request event and returns its contents
// in a convenient form.
func (e *Event) RequestInfo() (ServerAndID, Request, Response) {
	data := e.Data.(RequestResponse)
	return ServerAndID{Server: e.Server, ID: data.ID}, data.Request, data.Response
}

// RequestResponse is the Data type of request events.
type RequestResponse struct {
	ID       ID
	Request  Request
	Response Response
}

// serverWithTimeout wraps a requestServer and introduces timeouts.
// A request's lifecycle is concluded when EvResponse or EvFail is emitted by the
// parent requestServer. If this does not happen within softRequestTimeout then
// EvTimeout is emitted, after which the final EvResponse or EvFail is still
// guaranteed to follow.
// If the parent fails to send this final event within hardRequestTimeout then
// serverWithTimeout emits EvFail and discards any further events from the
// parent related to the given request.
type serverWithTimeout struct {
	parent       requestServer
	lock         sync.Mutex
	clock        mclock.Clock
	childEventCb func(event Event)
	timeouts     map[ID]mclock.Timer
	lastID       ID
}

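// A concrete timeline for one request with the constants above: the request is
// sent at t=0; if the parent has not emitted EvResponse or EvFail by t=1s
// (softRequestTimeout), an EvTimeout is emitted so that modules may try another
// server without giving up on this one; if the parent still has not answered by
// t=10s (hardRequestTimeout), serverWithTimeout itself emits EvFail and any later
// answer for this request ID is dropped.
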
// init initializes serverWithTimeout
func (s *serverWithTimeout) init(clock mclock.Clock) {
	s.clock = clock
	s.timeouts = make(map[ID]mclock.Timer)
}

// subscribe subscribes to events which include parent (requestServer) events
// plus EvTimeout.
func (s *serverWithTimeout) subscribe(eventCallback func(event Event)) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.childEventCb = eventCallback
	s.parent.Subscribe(s.eventCallback)
}

// sendRequest generates a new request ID, sets up the timeout timer, then sends
// the request through the parent (requestServer).
func (s *serverWithTimeout) sendRequest(request Request) (reqId ID) {
	s.lock.Lock()
	s.lastID++
	id := s.lastID
	s.startTimeout(RequestResponse{ID: id, Request: request})
	s.lock.Unlock()
	s.parent.SendRequest(id, request)
	return id
}

// eventCallback is called by parent (requestServer) event subscription.
func (s *serverWithTimeout) eventCallback(event Event) {
	s.lock.Lock()
	defer s.lock.Unlock()

	switch event.Type {
	case EvResponse, EvFail:
		id := event.Data.(RequestResponse).ID
		if timer, ok := s.timeouts[id]; ok {
			// Note: if stopping the timer is unsuccessful then the resulting AfterFunc
			// call will just do nothing
			timer.Stop()
			delete(s.timeouts, id)
			s.childEventCb(event)
		}
	default:
		s.childEventCb(event)
	}
}

// startTimeout starts a timeout timer for the given request.
func (s *serverWithTimeout) startTimeout(reqData RequestResponse) {
	id := reqData.ID
	s.timeouts[id] = s.clock.AfterFunc(softRequestTimeout, func() {
		s.lock.Lock()
		if _, ok := s.timeouts[id]; !ok {
			s.lock.Unlock()
			return
		}
		s.timeouts[id] = s.clock.AfterFunc(hardRequestTimeout-softRequestTimeout, func() {
			s.lock.Lock()
			if _, ok := s.timeouts[id]; !ok {
				s.lock.Unlock()
				return
			}
			delete(s.timeouts, id)
			childEventCb := s.childEventCb
			s.lock.Unlock()
			childEventCb(Event{Type: EvFail, Data: reqData})
		})
		childEventCb := s.childEventCb
		s.lock.Unlock()
		childEventCb(Event{Type: EvTimeout, Data: reqData})
	})
}

// unsubscribe stops all pending timeout timers, removes the event callback and
// unsubscribes from the parent (requestServer).
func (s *serverWithTimeout) unsubscribe() {
	s.lock.Lock()
	defer s.lock.Unlock()

	for _, timer := range s.timeouts {
		if timer != nil {
			timer.Stop()
		}
	}
	s.childEventCb = nil
	s.parent.Unsubscribe()
}

// serverWithLimits wraps serverWithTimeout and implements server. It limits the
// number of parallel in-flight requests and prevents sending new requests when a
// pending one has already timed out. Server events are also rate limited.
// It also implements a failure delay mechanism that adds an exponentially growing
// delay each time a request fails (wrong answer or hard timeout). This makes the
// syncing mechanism less brittle as temporary failures of the server might happen
// sometimes, but still avoids hammering a non-functional server with requests.
type serverWithLimits struct {
	serverWithTimeout
	lock                       sync.Mutex
	childEventCb               func(event Event)
	softTimeouts               map[ID]struct{}
	pendingCount, timeoutCount int
	parallelLimit              float32
	sendEvent                  bool
	delayTimer                 mclock.Timer
	delayCounter               int
	failureDelayEnd            mclock.AbsTime
	failureDelay               float64
	serverEventBuffer          int
	eventBufferUpdated         mclock.AbsTime
}

// init initializes serverWithLimits
func (s *serverWithLimits) init() {
	s.softTimeouts = make(map[ID]struct{})
	s.parallelLimit = defaultParallelLimit
	s.serverEventBuffer = maxServerEventBuffer
}

// subscribe subscribes to events which include parent (serverWithTimeout) events
// plus EvCanRequestAgain.
func (s *serverWithLimits) subscribe(eventCallback func(event Event)) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.childEventCb = eventCallback
	s.serverWithTimeout.subscribe(s.eventCallback)
}

// eventCallback is called by parent (serverWithTimeout) event subscription.
func (s *serverWithLimits) eventCallback(event Event) {
	s.lock.Lock()
	var sendCanRequestAgain bool
	passEvent := true
	switch event.Type {
	case EvTimeout:
		id := event.Data.(RequestResponse).ID
		s.softTimeouts[id] = struct{}{}
		s.timeoutCount++
		s.parallelLimit -= parallelAdjustDown
		if s.parallelLimit < minParallelLimit {
			s.parallelLimit = minParallelLimit
		}
		log.Debug("Server timeout", "count", s.timeoutCount, "parallelLimit", s.parallelLimit)
	case EvResponse, EvFail:
		id := event.Data.(RequestResponse).ID
		if _, ok := s.softTimeouts[id]; ok {
			delete(s.softTimeouts, id)
			s.timeoutCount--
			log.Debug("Server timeout finalized", "count", s.timeoutCount, "parallelLimit", s.parallelLimit)
		}
		if event.Type == EvResponse && s.pendingCount >= int(s.parallelLimit) {
			s.parallelLimit += parallelAdjustUp
		}
		s.pendingCount--
		if s.canRequest() {
			sendCanRequestAgain = s.sendEvent
			s.sendEvent = false
		}
		if event.Type == EvFail {
			s.failLocked("failed request")
		}
	default:
		// server event; check rate limit
		if s.serverEventBuffer < maxServerEventBuffer {
			now := s.clock.Now()
			sinceUpdate := time.Duration(now - s.eventBufferUpdated)
			if sinceUpdate >= maxServerEventRate*time.Duration(maxServerEventBuffer-s.serverEventBuffer) {
				s.serverEventBuffer = maxServerEventBuffer
				s.eventBufferUpdated = now
			} else {
				addBuffer := int(sinceUpdate / maxServerEventRate)
				s.serverEventBuffer += addBuffer
				s.eventBufferUpdated += mclock.AbsTime(maxServerEventRate * time.Duration(addBuffer))
			}
		}
		if s.serverEventBuffer > 0 {
			s.serverEventBuffer--
		} else {
			passEvent = false
		}
	}
	childEventCb := s.childEventCb
	s.lock.Unlock()
	if passEvent {
		childEventCb(event)
	}
	if sendCanRequestAgain {
		childEventCb(Event{Type: EvCanRequestAgain})
	}
}

// sendRequest sends a request through the parent (serverWithTimeout).
func (s *serverWithLimits) sendRequest(request Request) (reqId ID) {
	s.lock.Lock()
	s.pendingCount++
	s.lock.Unlock()
	return s.serverWithTimeout.sendRequest(request)
}

// unsubscribe stops the failure delay timer, removes the event callback and
// unsubscribes from the parent (serverWithTimeout).
func (s *serverWithLimits) unsubscribe() {
	s.lock.Lock()
	defer s.lock.Unlock()

	if s.delayTimer != nil {
		s.delayTimer.Stop()
		s.delayTimer = nil
	}
	s.childEventCb = nil
	s.serverWithTimeout.unsubscribe()
}

// canRequest checks whether a new request can be started.
func (s *serverWithLimits) canRequest() bool {
	if s.delayTimer != nil || s.pendingCount >= int(s.parallelLimit) || s.timeoutCount > 0 {
		return false
	}
	if s.parallelLimit < minParallelLimit {
		s.parallelLimit = minParallelLimit
	}
	return true
}

// canRequestNow checks whether a new request can be started, according to the
// current in-flight request count and parallelLimit, and also the failure delay
// timer.
// If it returns false then it is guaranteed that an EvCanRequestAgain will be
// sent whenever the server becomes available for requesting again.
func (s *serverWithLimits) canRequestNow() bool {
	var sendCanRequestAgain bool
	s.lock.Lock()
	canRequest := s.canRequest()
	if canRequest {
		sendCanRequestAgain = s.sendEvent
		s.sendEvent = false
	}
	childEventCb := s.childEventCb
	s.lock.Unlock()
	if sendCanRequestAgain {
		childEventCb(Event{Type: EvCanRequestAgain})
	}
	return canRequest
}

// delay sets the delay timer to the given duration, disabling new requests for
// the given period.
func (s *serverWithLimits) delay(delay time.Duration) {
	if s.delayTimer != nil {
		// Note: if stopping the timer is unsuccessful then the resulting AfterFunc
		// call will just do nothing
		s.delayTimer.Stop()
		s.delayTimer = nil
	}

	s.delayCounter++
	delayCounter := s.delayCounter
	log.Debug("Server delay started", "length", delay)
	s.delayTimer = s.clock.AfterFunc(delay, func() {
		log.Debug("Server delay ended", "length", delay)
		var sendCanRequestAgain bool
		s.lock.Lock()
		if s.delayTimer != nil && s.delayCounter == delayCounter { // do nothing if there is a new timer now
			s.delayTimer = nil
			if s.canRequest() {
				sendCanRequestAgain = s.sendEvent
				s.sendEvent = false
			}
		}
		childEventCb := s.childEventCb
		s.lock.Unlock()
		if sendCanRequestAgain {
			childEventCb(Event{Type: EvCanRequestAgain})
		}
	})
}

// fail reports that a response from the server was found invalid by the processing
// Module, disabling new requests for a dynamically adjusted time period.
func (s *serverWithLimits) fail(desc string) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.failLocked(desc)
}

// failLocked calculates the dynamic failure delay and applies it.
func (s *serverWithLimits) failLocked(desc string) {
	log.Debug("Server error", "description", desc)
	s.failureDelay *= 2
	now := s.clock.Now()
	if now > s.failureDelayEnd {
		s.failureDelay *= math.Pow(2, -float64(now-s.failureDelayEnd)/float64(maxFailureDelay))
	}
	if s.failureDelay < float64(minFailureDelay) {
		s.failureDelay = float64(minFailureDelay)
	}
	s.failureDelayEnd = now + mclock.AbsTime(s.failureDelay)
	s.delay(time.Duration(s.failureDelay))
}
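A worked example of the failure delay above (numbers only, no additional logic): the first failure sets failureDelay to minFailureDelay (100ms); if the server fails again as soon as it may be queried, each further failure doubles the delay (200ms, 400ms, ...). There is no hard cap, but the decay term halves the remembered delay for every maxFailureDelay (one minute) that elapses after the previous delay has expired, so a server that recovers gradually returns to short penalties.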
@ -0,0 +1,158 @@
package request

import (
	"testing"

	"github.com/ethereum/go-ethereum/common/mclock"
)

const (
	testRequest  = "Life, the Universe, and Everything"
	testResponse = 42
)

var testEventType = &EventType{Name: "testEvent"}

func TestServerEvents(t *testing.T) {
	rs := &testRequestServer{}
	clock := &mclock.Simulated{}
	srv := NewServer(rs, clock)
	var lastEventType *EventType
	srv.subscribe(func(event Event) { lastEventType = event.Type })
	evTypeName := func(evType *EventType) string {
		if evType == nil {
			return "none"
		}
		return evType.Name
	}
	expEvent := func(expType *EventType) {
		if lastEventType != expType {
			t.Errorf("Wrong event type (expected %s, got %s)", evTypeName(expType), evTypeName(lastEventType))
		}
		lastEventType = nil
	}
	// user events should simply be passed through
	rs.eventCb(Event{Type: testEventType})
	expEvent(testEventType)
	// send request, soft timeout, then valid response
	srv.sendRequest(testRequest)
	clock.WaitForTimers(1)
	clock.Run(softRequestTimeout)
	expEvent(EvTimeout)
	rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}})
	expEvent(EvResponse)
	// send request, hard timeout (response after hard timeout should be ignored)
	srv.sendRequest(testRequest)
	clock.WaitForTimers(1)
	clock.Run(softRequestTimeout)
	expEvent(EvTimeout)
	clock.WaitForTimers(1)
	clock.Run(hardRequestTimeout)
	expEvent(EvFail)
	rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}})
	expEvent(nil)
}

func TestServerParallel(t *testing.T) {
	rs := &testRequestServer{}
	srv := NewServer(rs, &mclock.Simulated{})
	srv.subscribe(func(event Event) {})

	expSend := func(expSent int) {
		var sent int
		for sent <= expSent {
			if !srv.canRequestNow() {
				break
			}
			sent++
			srv.sendRequest(testRequest)
		}
		if sent != expSent {
			t.Errorf("Wrong number of parallel requests accepted (expected %d, got %d)", expSent, sent)
		}
	}
	// max out parallel allowance
	expSend(defaultParallelLimit)
	// 1 answered, should accept 1 more
	rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}})
	expSend(1)
	// 2 answered, should accept 2 more
	rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 2, Request: testRequest, Response: testResponse}})
	rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 3, Request: testRequest, Response: testResponse}})
	expSend(2)
	// failed request, should decrease allowance and not accept more
	rs.eventCb(Event{Type: EvFail, Data: RequestResponse{ID: 4, Request: testRequest}})
	expSend(0)
	srv.unsubscribe()
}

func TestServerFail(t *testing.T) {
	rs := &testRequestServer{}
	clock := &mclock.Simulated{}
	srv := NewServer(rs, clock)
	srv.subscribe(func(event Event) {})
	expCanRequest := func(expCanRequest bool) {
		if canRequest := srv.canRequestNow(); canRequest != expCanRequest {
			t.Errorf("Wrong result for canRequestNow (expected %v, got %v)", expCanRequest, canRequest)
		}
	}
	// timed out request
	expCanRequest(true)
	srv.sendRequest(testRequest)
	clock.WaitForTimers(1)
	expCanRequest(true)
	clock.Run(softRequestTimeout)
	expCanRequest(false) // cannot request when there is a timed out request
	rs.eventCb(Event{Type: EvResponse, Data: RequestResponse{ID: 1, Request: testRequest, Response: testResponse}})
	expCanRequest(true)
	// explicit server.Fail
	srv.fail("")
	clock.WaitForTimers(1)
	expCanRequest(false) // cannot request for a while after a failure
	clock.Run(minFailureDelay)
	expCanRequest(true)
	// request returned with EvFail
	srv.sendRequest(testRequest)
	rs.eventCb(Event{Type: EvFail, Data: RequestResponse{ID: 2, Request: testRequest}})
	clock.WaitForTimers(1)
	expCanRequest(false) // EvFail should also start failure delay
	clock.Run(minFailureDelay)
	expCanRequest(false) // second failure delay is longer, should still be disabled
	clock.Run(minFailureDelay)
	expCanRequest(true)
	srv.unsubscribe()
}

func TestServerEventRateLimit(t *testing.T) {
	rs := &testRequestServer{}
	clock := &mclock.Simulated{}
	srv := NewServer(rs, clock)
	var eventCount int
	srv.subscribe(func(event Event) {
		if !event.IsRequestEvent() {
			eventCount++
		}
	})
	expEvents := func(send, expAllowed int) {
		eventCount = 0
		for sent := 0; sent < send; sent++ {
			rs.eventCb(Event{Type: testEventType})
		}
		if eventCount != expAllowed {
			t.Errorf("Wrong number of server events passing rate limitation (sent %d, expected %d, got %d)", send, expAllowed, eventCount)
		}
	}
	expEvents(maxServerEventBuffer+5, maxServerEventBuffer)
	clock.Run(maxServerEventRate)
	expEvents(5, 1)
	clock.Run(maxServerEventRate * maxServerEventBuffer * 2)
	expEvents(maxServerEventBuffer+5, maxServerEventBuffer)
}

type testRequestServer struct {
	eventCb func(Event)
}

func (rs *testRequestServer) Subscribe(eventCb func(Event)) { rs.eventCb = eventCb }
func (rs *testRequestServer) SendRequest(ID, Request)       {}
func (rs *testRequestServer) Unsubscribe()                  {}